From 33a8c1cb59304d92d517e3ba511bf233c729597f Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Sun, 6 Nov 2005 16:50:38 +0000 Subject: Tested new ioqueue framework on Linux with select and epoll git-svn-id: http://svn.pjsip.org/repos/pjproject/main@14 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/src/pj/ioqueue_select.c | 394 +++++++++++++++++++++--------------------- 1 file changed, 198 insertions(+), 196 deletions(-) (limited to 'pjlib/src/pj/ioqueue_select.c') diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c index 24e68564..c2051681 100644 --- a/pjlib/src/pj/ioqueue_select.c +++ b/pjlib/src/pj/ioqueue_select.c @@ -20,11 +20,11 @@ #include #include #include - -/* - * Include declaration from common abstraction. - */ -#include "ioqueue_common_abs.h" + +/* + * Include declaration from common abstraction. + */ +#include "ioqueue_common_abs.h" /* * ISSUES with ioqueue_select() @@ -38,30 +38,30 @@ * */ #define THIS_FILE "ioq_select" - -/* - * The select ioqueue relies on socket functions (pj_sock_xxx()) to return - * the correct error code. - */ -#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) -# error "Error reporting must be enabled for this function to work!" -#endif - -/** - * Get the number of descriptors in the set. This is defined in sock_select.c - * This function will only return the number of sockets set from PJ_FD_SET - * operation. When the set is modified by other means (such as by select()), - * the count will not be reflected here. - * - * That's why don't export this function in the header file, to avoid - * misunderstanding. - * - * @param fdsetp The descriptor set. - * - * @return Number of descriptors in the set. - */ -PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); - + +/* + * The select ioqueue relies on socket functions (pj_sock_xxx()) to return + * the correct error code. + */ +#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) +# error "Error reporting must be enabled for this function to work!" +#endif + +/** + * Get the number of descriptors in the set. This is defined in sock_select.c + * This function will only return the number of sockets set from PJ_FD_SET + * operation. When the set is modified by other means (such as by select()), + * the count will not be reflected here. + * + * That's why don't export this function in the header file, to avoid + * misunderstanding. + * + * @param fdsetp The descriptor set. + * + * @return Number of descriptors in the set. + */ +PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); + /* * During debugging build, VALIDATE_FD_SET is set. @@ -72,12 +72,12 @@ PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); #else # define VALIDATE_FD_SET 0 #endif - + /* * This describes each key. */ struct pj_ioqueue_key_t -{ +{ DECLARE_COMMON_KEY }; @@ -86,7 +86,7 @@ struct pj_ioqueue_key_t */ struct pj_ioqueue_t { - DECLARE_COMMON_IOQUEUE + DECLARE_COMMON_IOQUEUE unsigned max, count; pj_ioqueue_key_t key_list; @@ -96,11 +96,11 @@ struct pj_ioqueue_t pj_fd_set_t xfdset; #endif }; - -/* Include implementation for common abstraction after we declare - * pj_ioqueue_key_t and pj_ioqueue_t. - */ -#include "ioqueue_common_abs.c" + +/* Include implementation for common abstraction after we declare + * pj_ioqueue_key_t and pj_ioqueue_t. + */ +#include "ioqueue_common_abs.c" /* * pj_ioqueue_create() @@ -111,22 +111,22 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, pj_ioqueue_t **p_ioqueue) { - pj_ioqueue_t *ioqueue; + pj_ioqueue_t *ioqueue; pj_lock_t *lock; pj_status_t rc; - - /* Check that arguments are valid. */ - PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && - max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, - PJ_EINVAL); - - /* Check that size of pj_ioqueue_op_key_t is sufficient */ - PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= - sizeof(union operation_key), PJ_EBUG); - - ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); - - ioqueue_init(ioqueue); + + /* Check that arguments are valid. */ + PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && + max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, + PJ_EINVAL); + + /* Check that size of pj_ioqueue_op_key_t is sufficient */ + PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= + sizeof(union operation_key), PJ_EBUG); + + ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); + + ioqueue_init(ioqueue); ioqueue->max = max_fd; ioqueue->count = 0; @@ -141,8 +141,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, if (rc != PJ_SUCCESS) return rc; - rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); - if (rc != PJ_SUCCESS) + rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); + if (rc != PJ_SUCCESS) return rc; PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); @@ -159,8 +159,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) { PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); - - pj_lock_acquire(ioqueue->lock); + + pj_lock_acquire(ioqueue->lock); return ioqueue_destroy(ioqueue); } @@ -203,16 +203,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, } /* Create key. */ - key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); - rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); - if (rc != PJ_SUCCESS) - return rc; + key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); + rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); + if (rc != PJ_SUCCESS) { + key = NULL; + goto on_return; + } /* Register */ pj_list_insert_before(&ioqueue->key_list, key); ++ioqueue->count; -on_return: +on_return: /* On error, socket may be left in non-blocking mode. */ *p_key = key; pj_lock_release(ioqueue->lock); @@ -226,13 +228,13 @@ on_return: * Unregister handle from ioqueue. */ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) -{ - pj_ioqueue_t *ioqueue; +{ + pj_ioqueue_t *ioqueue; PJ_ASSERT_RETURN(key, PJ_EINVAL); - - ioqueue = key->ioqueue; - + + ioqueue = key->ioqueue; + pj_lock_acquire(ioqueue->lock); pj_assert(ioqueue->count > 0); @@ -243,21 +245,21 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) #if PJ_HAS_TCP PJ_FD_CLR(key->fd, &ioqueue->xfdset); #endif - - /* ioqueue_destroy may try to acquire key's mutex. - * Since normally the order of locking is to lock key's mutex first - * then ioqueue's mutex, ioqueue_destroy may deadlock unless we - * release ioqueue's mutex first. - */ - pj_lock_release(ioqueue->lock); - - /* Destroy the key. */ - ioqueue_destroy_key(key); + + /* ioqueue_destroy may try to acquire key's mutex. + * Since normally the order of locking is to lock key's mutex first + * then ioqueue's mutex, ioqueue_destroy may deadlock unless we + * release ioqueue's mutex first. + */ + pj_lock_release(ioqueue->lock); + + /* Destroy the key. */ + ioqueue_destroy_key(key); return PJ_SUCCESS; } - + /* This supposed to check whether the fd_set values are consistent * with the operation currently set in each key. */ @@ -307,54 +309,54 @@ static void validate_sets(const pj_ioqueue_t *ioqueue, } } #endif /* VALIDATE_FD_SET */ - - -/* ioqueue_remove_from_set() - * This function is called from ioqueue_dispatch_event() to instruct - * the ioqueue to remove the specified descriptor from ioqueue's descriptor - * set for the specified event. - */ -static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, - pj_sock_t fd, - enum ioqueue_event_type event_type) -{ - pj_lock_acquire(ioqueue->lock); - - if (event_type == READABLE_EVENT) - PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset); - else if (event_type == WRITEABLE_EVENT) - PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset); - else if (event_type == EXCEPTION_EVENT) - PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset); - else - pj_assert(0); - - pj_lock_release(ioqueue->lock); -} - -/* - * ioqueue_add_to_set() - * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc - * to instruct the ioqueue to add the specified handle to ioqueue's descriptor - * set for the specified event. - */ -static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, - pj_sock_t fd, - enum ioqueue_event_type event_type ) -{ - pj_lock_acquire(ioqueue->lock); - - if (event_type == READABLE_EVENT) - PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset); - else if (event_type == WRITEABLE_EVENT) - PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset); - else if (event_type == EXCEPTION_EVENT) - PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset); - else - pj_assert(0); - - pj_lock_release(ioqueue->lock); -} + + +/* ioqueue_remove_from_set() + * This function is called from ioqueue_dispatch_event() to instruct + * the ioqueue to remove the specified descriptor from ioqueue's descriptor + * set for the specified event. + */ +static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type) +{ + pj_lock_acquire(ioqueue->lock); + + if (event_type == READABLE_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset); + else if (event_type == WRITEABLE_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset); + else if (event_type == EXCEPTION_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset); + else + pj_assert(0); + + pj_lock_release(ioqueue->lock); +} + +/* + * ioqueue_add_to_set() + * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc + * to instruct the ioqueue to add the specified handle to ioqueue's descriptor + * set for the specified event. + */ +static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type ) +{ + pj_lock_acquire(ioqueue->lock); + + if (event_type == READABLE_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset); + else if (event_type == WRITEABLE_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset); + else if (event_type == EXCEPTION_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset); + else + pj_assert(0); + + pj_lock_release(ioqueue->lock); +} /* * pj_ioqueue_poll() @@ -378,19 +380,19 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) pj_fd_set_t rfdset, wfdset, xfdset; int count, counter; pj_ioqueue_key_t *h; - struct event - { - pj_ioqueue_key_t *key; - enum event_type event_type; - } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; - - PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); + struct event + { + pj_ioqueue_key_t *key; + enum ioqueue_event_type event_type; + } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; + + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); /* Lock ioqueue before making fd_set copies */ pj_lock_acquire(ioqueue->lock); - - /* We will only do select() when there are sockets to be polled. - * Otherwise select() will return error. + + /* We will only do select() when there are sockets to be polled. + * Otherwise select() will return error. */ if (PJ_FD_COUNT(&ioqueue->rfdset)==0 && PJ_FD_COUNT(&ioqueue->wfdset)==0 && @@ -422,71 +424,71 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) if (count <= 0) return count; - else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) - count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; + else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) + count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; - /* Scan descriptor sets for event and add the events in the event - * array to be processed later in this function. We do this so that - * events can be processed in parallel without holding ioqueue lock. + /* Scan descriptor sets for event and add the events in the event + * array to be processed later in this function. We do this so that + * events can be processed in parallel without holding ioqueue lock. */ pj_lock_acquire(ioqueue->lock); - - counter = 0; - - /* Scan for writable sockets first to handle piggy-back data - * coming with accept(). - */ - h = ioqueue->key_list.next; - for ( ; h!=&ioqueue->key_list && counternext) { - if ( (key_has_pending_write(h) || key_has_pending_connect(h)) - && PJ_FD_ISSET(h->fd, &wfdset)) - { - event[counter].key = h; - event[counter].event_type = WRITEABLE_EVENT; - ++counter; - } - - /* Scan for readable socket. */ - if ((key_has_pending_read(h) || key_has_pending_accept(h)) - && PJ_FD_ISSET(h->fd, &rfdset)) - { - event[counter].key = h; - event[counter].event_type = READABLE_EVENT; - ++counter; } - -#if PJ_HAS_TCP - if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { - event[counter].key = h; - event[counter].event_type = EXCEPTION_EVENT; - ++counter; - } -#endif - } - - pj_lock_release(ioqueue->lock); - - count = counter; - - /* Now process all events. The dispatch functions will take care - * of locking in each of the key - */ - for (counter=0; counterkey_list.next; + for ( ; h!=&ioqueue->key_list && counternext) { + if ( (key_has_pending_write(h) || key_has_pending_connect(h)) + && PJ_FD_ISSET(h->fd, &wfdset)) + { + event[counter].key = h; + event[counter].event_type = WRITEABLE_EVENT; + ++counter; + } + + /* Scan for readable socket. */ + if ((key_has_pending_read(h) || key_has_pending_accept(h)) + && PJ_FD_ISSET(h->fd, &rfdset)) + { + event[counter].key = h; + event[counter].event_type = READABLE_EVENT; + ++counter; + } + +#if PJ_HAS_TCP + if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { + event[counter].key = h; + event[counter].event_type = EXCEPTION_EVENT; + ++counter; + } +#endif + } + + pj_lock_release(ioqueue->lock); + + count = counter; + + /* Now process all events. The dispatch functions will take care + * of locking in each of the key + */ + for (counter=0; counter