From a286f67546eb45c9779660decfb990b38203a268 Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Sat, 8 Jul 2006 19:46:43 +0000 Subject: Attempted to fix epoll for Linux git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@592 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/src/pj/ioqueue_common_abs.c | 31 ++++++++-------- pjlib/src/pj/ioqueue_common_abs.h | 4 +-- pjlib/src/pj/ioqueue_epoll.c | 74 +++++++++++++++++++++++++++++++-------- pjlib/src/pj/ioqueue_select.c | 16 ++++----- pjlib/src/pjlib-test/ioq_tcp.c | 27 +++++++------- pjlib/src/pjlib-test/ioq_udp.c | 10 +++--- pjlib/src/pjlib-test/test.h | 4 +-- 7 files changed, 106 insertions(+), 60 deletions(-) diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index 95ca6d6a..982c8080 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -196,8 +196,8 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) /* Clear operation. */ h->connecting = 0; - ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); - ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); + ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT); #if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) @@ -272,7 +272,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) pj_list_erase(write_op); if (pj_list_empty(&h->write_list)) - ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT); } @@ -325,7 +325,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) /* Clear operation if there's no more data to send. */ if (pj_list_empty(&h->write_list)) - ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT); } @@ -378,7 +378,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) /* Clear bit in fdset if there is no more pending accept */ if (pj_list_empty(&h->accept_list)) - ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT); rc=pj_sock_accept(h->fd, accept_op->accept_fd, accept_op->rmt_addr, accept_op->addrlen); @@ -411,7 +411,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) /* Clear fdset if there is no pending read. */ if (pj_list_empty(&h->read_list)) - ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT); bytes_read = read_op->size; @@ -518,8 +518,8 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, /* Clear operation. */ h->connecting = 0; - ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); - ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); + ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT); + ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT); pj_mutex_unlock(h->mutex); @@ -585,7 +585,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, pj_mutex_lock(key->mutex); pj_list_insert_before(&key->read_list, read_op); - ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT); + ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); pj_mutex_unlock(key->mutex); return PJ_EPENDING; @@ -653,7 +653,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, pj_mutex_lock(key->mutex); pj_list_insert_before(&key->read_list, read_op); - ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT); + ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); pj_mutex_unlock(key->mutex); return PJ_EPENDING; @@ -759,7 +759,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, pj_mutex_lock(key->mutex); pj_list_insert_before(&key->write_list, write_op); - ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT); + ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); pj_mutex_unlock(key->mutex); return PJ_EPENDING; @@ -827,6 +827,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, } status = status; } + PJ_LOG(3,(THIS_FILE, "pending write operation!!")); } /* @@ -876,7 +877,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, pj_mutex_lock(key->mutex); pj_list_insert_before(&key->write_list, write_op); - ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT); + ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); pj_mutex_unlock(key->mutex); return PJ_EPENDING; @@ -945,7 +946,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, pj_mutex_lock(key->mutex); pj_list_insert_before(&key->accept_list, accept_op); - ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT); + ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); pj_mutex_unlock(key->mutex); return PJ_EPENDING; @@ -981,8 +982,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, /* Pending! */ pj_mutex_lock(key->mutex); key->connecting = PJ_TRUE; - ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT); - ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT); + ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); + ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT); pj_mutex_unlock(key->mutex); return PJ_EPENDING; } else { diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h index e7d05561..e56065d7 100644 --- a/pjlib/src/pj/ioqueue_common_abs.h +++ b/pjlib/src/pj/ioqueue_common_abs.h @@ -128,9 +128,9 @@ enum ioqueue_event_type }; static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, - pj_sock_t fd, + pj_ioqueue_key_t *key, enum ioqueue_event_type event_type ); static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, - pj_sock_t fd, + pj_ioqueue_key_t *key, enum ioqueue_event_type event_type); diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c index 14a6cc32..66dd13e5 100644 --- a/pjlib/src/pj/ioqueue_epoll.c +++ b/pjlib/src/pj/ioqueue_epoll.c @@ -141,8 +141,8 @@ #define THIS_FILE "ioq_epoll" -#define TRACE_(expr) PJ_LOG(3,expr) -//#define TRACE_(expr) +//#define TRACE_(expr) PJ_LOG(3,expr) +#define TRACE_(expr) /* * Include common ioqueue abstraction. @@ -157,6 +157,12 @@ struct pj_ioqueue_key_t DECLARE_COMMON_KEY }; +struct queue +{ + pj_ioqueue_key_t *key; + enum ioqueue_event_type event_type; +}; + /* * This describes the I/O queue. */ @@ -167,6 +173,8 @@ struct pj_ioqueue_t unsigned max, count; pj_ioqueue_key_t hlist; int epfd; + struct epoll_event *events; + struct queue *queue; }; /* Include implementation for common abstraction after we declare @@ -229,6 +237,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, return PJ_RETURN_OS_ERROR(pj_get_native_os_error()); } + ioqueue->events = pj_pool_calloc(pool, max_fd, sizeof(struct epoll_event)); + PJ_ASSERT_RETURN(ioqueue->events != NULL, PJ_ENOMEM); + + ioqueue->queue = pj_pool_calloc(pool, max_fd, sizeof(struct queue)); + PJ_ASSERT_RETURN(ioqueue->queue != NULL, PJ_ENOMEM); + PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue)); *p_ioqueue = ioqueue; @@ -305,7 +319,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, } /* os_epoll_ctl. */ - ev.events = EPOLLIN | EPOLLOUT | EPOLLERR; + ev.events = EPOLLIN | EPOLLERR; ev.epoll_data = (epoll_data_type)key; status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev); if (status < 0) { @@ -322,6 +336,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, pj_list_insert_before(&ioqueue->hlist, key); ++ioqueue->count; + //TRACE_((THIS_FILE, "socket registered, count=%d", ioqueue->count)); + on_return: *p_key = key; pj_lock_release(ioqueue->lock); @@ -373,9 +389,16 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) * set for the specified event. */ static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, - pj_sock_t fd, + pj_ioqueue_key_t *key, enum ioqueue_event_type event_type) { + if (event_type == WRITEABLE_EVENT) { + struct epoll_event ev; + + ev.events = EPOLLIN | EPOLLERR; + ev.epoll_data = (epoll_data_type)key; + os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev); + } } /* @@ -385,9 +408,16 @@ static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, * set for the specified event. */ static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, - pj_sock_t fd, + pj_ioqueue_key_t *key, enum ioqueue_event_type event_type ) { + if (event_type == WRITEABLE_EVENT) { + struct epoll_event ev; + + ev.events = EPOLLIN | EPOLLOUT | EPOLLERR; + ev.epoll_data = (epoll_data_type)key; + os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev); + } } /* @@ -397,22 +427,31 @@ static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { int i, count, processed; - struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; int msec; - struct queue { - pj_ioqueue_key_t *key; - enum ioqueue_event_type event_type; - } queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; + struct epoll_event *events = ioqueue->events; + struct queue *queue = ioqueue->queue; + pj_timestamp t1, t2; PJ_CHECK_STACK(); msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000; - - count = os_epoll_wait( ioqueue->epfd, events, PJ_ARRAY_SIZE(events), msec); - if (count == 0) + + TRACE_((THIS_FILE, "start os_epoll_wait, msec=%d", msec)); + pj_get_timestamp(&t1); + + count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec); + if (count == 0) { + TRACE_((THIS_FILE, "os_epoll_wait timed out")); return count; - else if (count < 0) + } + else if (count < 0) { + TRACE_((THIS_FILE, "os_epoll_wait error")); return -pj_get_netos_error(); + } + + pj_get_timestamp(&t2); + TRACE_((THIS_FILE, "os_epoll_wait returns %d, time=%d usec", + count, pj_elapsed_usec(&t1, &t2))); /* Lock ioqueue. */ pj_lock_acquire(ioqueue->lock); @@ -421,6 +460,8 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type) events[i].epoll_data; + TRACE_((THIS_FILE, "event %d: events=%d", i, events[i].events)); + /* * Check readability. */ @@ -486,6 +527,11 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) if (count > 0 && !processed && msec > 0) { pj_thread_sleep(msec); } + + pj_get_timestamp(&t1); + TRACE_((THIS_FILE, "ioqueue_poll() returns %d, time=%d usec", + processed, pj_elapsed_usec(&t2, &t1))); + return processed; } diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c index 33fcae7d..93adf07d 100644 --- a/pjlib/src/pj/ioqueue_select.c +++ b/pjlib/src/pj/ioqueue_select.c @@ -542,17 +542,17 @@ static void validate_sets(const pj_ioqueue_t *ioqueue, * set for the specified event. */ static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, - pj_sock_t fd, + pj_ioqueue_key_t *key, enum ioqueue_event_type event_type) { pj_lock_acquire(ioqueue->lock); if (event_type == READABLE_EVENT) - PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset); + PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset); else if (event_type == WRITEABLE_EVENT) - PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset); + PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset); else if (event_type == EXCEPTION_EVENT) - PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset); + PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset); else pj_assert(0); @@ -566,17 +566,17 @@ static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, * set for the specified event. */ static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, - pj_sock_t fd, + pj_ioqueue_key_t *key, enum ioqueue_event_type event_type ) { pj_lock_acquire(ioqueue->lock); if (event_type == READABLE_EVENT) - PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset); + PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset); else if (event_type == WRITEABLE_EVENT) - PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset); + PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset); else if (event_type == EXCEPTION_EVENT) - PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset); + PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset); else pj_assert(0); diff --git a/pjlib/src/pjlib-test/ioq_tcp.c b/pjlib/src/pjlib-test/ioq_tcp.c index 7f004d9f..4a546f03 100644 --- a/pjlib/src/pjlib-test/ioq_tcp.c +++ b/pjlib/src/pjlib-test/ioq_tcp.c @@ -38,7 +38,6 @@ #if PJ_HAS_TCP #define THIS_FILE "test_tcp" -#define PORT 50000 #define NON_EXISTANT_PORT 50123 #define LOOP 100 #define BUF_MIN_SIZE 32 @@ -257,14 +256,21 @@ static int compliance_test_0(void) } // Bind server socket. - memset(&addr, 0, sizeof(addr)); - addr.sin_family = PJ_AF_INET; - addr.sin_port = pj_htons(PORT); - if (pj_sock_bind(ssock, &addr, sizeof(addr))) { + pj_sockaddr_in_init(&addr, 0, 0); + if ((rc=pj_sock_bind(ssock, &addr, sizeof(addr))) != 0 ) { app_perror("...bind error", rc); status=-10; goto on_error; } + // Get server address. + client_addr_len = sizeof(addr); + rc = pj_sock_getsockname(ssock, &addr, &client_addr_len); + if (rc != PJ_SUCCESS) { + app_perror("...ERROR in pj_sock_getsockname()", rc); + status=-15; goto on_error; + } + addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1")); + // Create I/O Queue. rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); if (rc != PJ_SUCCESS) { @@ -302,12 +308,6 @@ static int compliance_test_0(void) ++pending_op; } - // Initialize remote address. - memset(&addr, 0, sizeof(addr)); - addr.sin_family = PJ_AF_INET; - addr.sin_port = pj_htons(PORT); - addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1")); - // Client socket connect() status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr)); if (status!=PJ_SUCCESS && status != PJ_EPENDING) { @@ -461,10 +461,7 @@ static int compliance_test_1(void) } // Initialize remote address. - memset(&addr, 0, sizeof(addr)); - addr.sin_family = PJ_AF_INET; - addr.sin_port = pj_htons(NON_EXISTANT_PORT); - addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1")); + pj_sockaddr_in_init(&addr, pj_cstr(&s, "127.0.0.1"), NON_EXISTANT_PORT); // Client socket connect() status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr)); diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c index d26e8fd9..dfc818fb 100644 --- a/pjlib/src/pjlib-test/ioq_udp.c +++ b/pjlib/src/pjlib-test/ioq_udp.c @@ -40,7 +40,7 @@ #define THIS_FILE "test_udp" #define PORT 51233 -#define LOOP 100 +#define LOOP 2 ///#define LOOP 2 #define BUF_MIN_SIZE 32 #define BUF_MAX_SIZE 2048 @@ -817,7 +817,7 @@ int udp_ioqueue_test() int status; int bufsize, sock_count; - goto pass1; + //goto pass1; PJ_LOG(3, (THIS_FILE, "...compliance test (%s)", pj_ioqueue_name())); if ((status=compliance_test()) != 0) { @@ -836,6 +836,8 @@ int udp_ioqueue_test() return status; } + //return 0; + PJ_LOG(4, (THIS_FILE, "...benchmarking different buffer size:")); PJ_LOG(4, (THIS_FILE, "... note: buf=bytes sent, fds=# of fds, " "elapsed=in timer ticks")); @@ -847,7 +849,7 @@ pass1: PJ_LOG(3, (THIS_FILE, "... (bytes) (nanosec)")); PJ_LOG(3, (THIS_FILE, "...=====================================")); - goto pass2; + //goto pass2; for (bufsize=BUF_MIN_SIZE; bufsize <= BUF_MAX_SIZE; bufsize *= 2) { if ((status=bench_test(bufsize, SOCK_INACTIVE_MIN)) != 0) @@ -859,7 +861,7 @@ pass2: sock_count<=SOCK_INACTIVE_MAX+2; sock_count *= 2) { - PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count)); + //PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count)); if ((status=bench_test(bufsize, sock_count-2)) != 0) return status; } diff --git a/pjlib/src/pjlib-test/test.h b/pjlib/src/pjlib-test/test.h index b7fd6ded..d552ea40 100644 --- a/pjlib/src/pjlib-test/test.h +++ b/pjlib/src/pjlib-test/test.h @@ -24,7 +24,7 @@ #define GROUP_LIBC 0 #define GROUP_OS 0 #define GROUP_DATA_STRUCTURE 0 -#define GROUP_NETWORK 0 +#define GROUP_NETWORK 1 #define GROUP_FILE 0 #define INCLUDE_ERRNO_TEST GROUP_LIBC @@ -45,7 +45,7 @@ #define INCLUDE_SOCK_TEST GROUP_NETWORK #define INCLUDE_SOCK_PERF_TEST GROUP_NETWORK #define INCLUDE_SELECT_TEST GROUP_NETWORK -#define INCLUDE_UDP_IOQUEUE_TEST 1 //GROUP_NETWORK +#define INCLUDE_UDP_IOQUEUE_TEST GROUP_NETWORK #define INCLUDE_TCP_IOQUEUE_TEST GROUP_NETWORK #define INCLUDE_IOQUEUE_PERF_TEST GROUP_NETWORK #define INCLUDE_IOQUEUE_UNREG_TEST GROUP_NETWORK -- cgit v1.2.3