diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-06 16:50:38 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-06 16:50:38 +0000 |
commit | 33a8c1cb59304d92d517e3ba511bf233c729597f (patch) | |
tree | e6cb65930121480465db749bf5916fa2708ca633 /pjlib/src/pj/ioqueue_select.c | |
parent | 6d5fbe07f3dc84c10ea75c5584fe8b5513278d08 (diff) |
Tested new ioqueue framework on Linux with select and epoll
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@14 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj/ioqueue_select.c')
-rw-r--r-- | pjlib/src/pj/ioqueue_select.c | 394 |
1 files changed, 198 insertions, 196 deletions
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c index 24e68564..c2051681 100644 --- a/pjlib/src/pj/ioqueue_select.c +++ b/pjlib/src/pj/ioqueue_select.c @@ -20,11 +20,11 @@ #include <pj/compat/socket.h> #include <pj/sock_select.h> #include <pj/errno.h> -
-/*
- * Include declaration from common abstraction.
- */
-#include "ioqueue_common_abs.h"
+ +/* + * Include declaration from common abstraction. + */ +#include "ioqueue_common_abs.h" /* * ISSUES with ioqueue_select() @@ -38,30 +38,30 @@ * */ #define THIS_FILE "ioq_select" -
-/*
- * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
- * the correct error code.
- */
-#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
-# error "Error reporting must be enabled for this function to work!"
-#endif
-
-/**
- * Get the number of descriptors in the set. This is defined in sock_select.c
- * This function will only return the number of sockets set from PJ_FD_SET
- * operation. When the set is modified by other means (such as by select()),
- * the count will not be reflected here.
- *
- * That's why don't export this function in the header file, to avoid
- * misunderstanding.
- *
- * @param fdsetp The descriptor set.
- *
- * @return Number of descriptors in the set.
- */
-PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
-
+ +/* + * The select ioqueue relies on socket functions (pj_sock_xxx()) to return + * the correct error code. + */ +#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100) +# error "Error reporting must be enabled for this function to work!" +#endif + +/** + * Get the number of descriptors in the set. This is defined in sock_select.c + * This function will only return the number of sockets set from PJ_FD_SET + * operation. When the set is modified by other means (such as by select()), + * the count will not be reflected here. + * + * That's why don't export this function in the header file, to avoid + * misunderstanding. + * + * @param fdsetp The descriptor set. + * + * @return Number of descriptors in the set. + */ +PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); + /* * During debugging build, VALIDATE_FD_SET is set. @@ -72,12 +72,12 @@ PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); #else # define VALIDATE_FD_SET 0 #endif -
+ /* * This describes each key. */ struct pj_ioqueue_key_t -{
+{ DECLARE_COMMON_KEY }; @@ -86,7 +86,7 @@ struct pj_ioqueue_key_t */ struct pj_ioqueue_t { - DECLARE_COMMON_IOQUEUE
+ DECLARE_COMMON_IOQUEUE unsigned max, count; pj_ioqueue_key_t key_list; @@ -96,11 +96,11 @@ struct pj_ioqueue_t pj_fd_set_t xfdset; #endif }; -
-/* Include implementation for common abstraction after we declare
- * pj_ioqueue_key_t and pj_ioqueue_t.
- */
-#include "ioqueue_common_abs.c"
+ +/* Include implementation for common abstraction after we declare + * pj_ioqueue_key_t and pj_ioqueue_t. + */ +#include "ioqueue_common_abs.c" /* * pj_ioqueue_create() @@ -111,22 +111,22 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, pj_ioqueue_t **p_ioqueue) { - pj_ioqueue_t *ioqueue;
+ pj_ioqueue_t *ioqueue; pj_lock_t *lock; pj_status_t rc; -
- /* Check that arguments are valid. */
- PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
- max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
- PJ_EINVAL);
-
- /* Check that size of pj_ioqueue_op_key_t is sufficient */
- PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
- sizeof(union operation_key), PJ_EBUG);
-
- ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
-
- ioqueue_init(ioqueue);
+ + /* Check that arguments are valid. */ + PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && + max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, + PJ_EINVAL); + + /* Check that size of pj_ioqueue_op_key_t is sufficient */ + PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= + sizeof(union operation_key), PJ_EBUG); + + ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); + + ioqueue_init(ioqueue); ioqueue->max = max_fd; ioqueue->count = 0; @@ -141,8 +141,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, if (rc != PJ_SUCCESS) return rc; - rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
- if (rc != PJ_SUCCESS)
+ rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE); + if (rc != PJ_SUCCESS) return rc; PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); @@ -159,8 +159,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) { PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); -
- pj_lock_acquire(ioqueue->lock);
+ + pj_lock_acquire(ioqueue->lock); return ioqueue_destroy(ioqueue); } @@ -203,16 +203,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, } /* Create key. */ - key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
- rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
- if (rc != PJ_SUCCESS)
- return rc;
+ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); + rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); + if (rc != PJ_SUCCESS) { + key = NULL; + goto on_return; + } /* Register */ pj_list_insert_before(&ioqueue->key_list, key); ++ioqueue->count; -on_return:
+on_return: /* On error, socket may be left in non-blocking mode. */ *p_key = key; pj_lock_release(ioqueue->lock); @@ -226,13 +228,13 @@ on_return: * Unregister handle from ioqueue. */ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) -{
- pj_ioqueue_t *ioqueue;
+{ + pj_ioqueue_t *ioqueue; PJ_ASSERT_RETURN(key, PJ_EINVAL); -
- ioqueue = key->ioqueue;
-
+ + ioqueue = key->ioqueue; + pj_lock_acquire(ioqueue->lock); pj_assert(ioqueue->count > 0); @@ -243,21 +245,21 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) #if PJ_HAS_TCP PJ_FD_CLR(key->fd, &ioqueue->xfdset); #endif -
- /* ioqueue_destroy may try to acquire key's mutex.
- * Since normally the order of locking is to lock key's mutex first
- * then ioqueue's mutex, ioqueue_destroy may deadlock unless we
- * release ioqueue's mutex first.
- */
- pj_lock_release(ioqueue->lock);
-
- /* Destroy the key. */
- ioqueue_destroy_key(key);
+ + /* ioqueue_destroy may try to acquire key's mutex. + * Since normally the order of locking is to lock key's mutex first + * then ioqueue's mutex, ioqueue_destroy may deadlock unless we + * release ioqueue's mutex first. + */ + pj_lock_release(ioqueue->lock); + + /* Destroy the key. */ + ioqueue_destroy_key(key); return PJ_SUCCESS; } -
+ /* This supposed to check whether the fd_set values are consistent * with the operation currently set in each key. */ @@ -307,54 +309,54 @@ static void validate_sets(const pj_ioqueue_t *ioqueue, } } #endif /* VALIDATE_FD_SET */ -
- -/* ioqueue_remove_from_set()
- * This function is called from ioqueue_dispatch_event() to instruct
- * the ioqueue to remove the specified descriptor from ioqueue's descriptor
- * set for the specified event.
- */
-static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type)
-{
- pj_lock_acquire(ioqueue->lock);
-
- if (event_type == READABLE_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
- else if (event_type == WRITEABLE_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
- else if (event_type == EXCEPTION_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
- else
- pj_assert(0);
-
- pj_lock_release(ioqueue->lock);
-}
-
-/*
- * ioqueue_add_to_set()
- * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
- * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
- * set for the specified event.
- */
-static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
- enum ioqueue_event_type event_type )
-{
- pj_lock_acquire(ioqueue->lock);
-
- if (event_type == READABLE_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
- else if (event_type == WRITEABLE_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
- else if (event_type == EXCEPTION_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
- else
- pj_assert(0);
-
- pj_lock_release(ioqueue->lock);
-}
+ + +/* ioqueue_remove_from_set() + * This function is called from ioqueue_dispatch_event() to instruct + * the ioqueue to remove the specified descriptor from ioqueue's descriptor + * set for the specified event. + */ +static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type) +{ + pj_lock_acquire(ioqueue->lock); + + if (event_type == READABLE_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset); + else if (event_type == WRITEABLE_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset); + else if (event_type == EXCEPTION_EVENT) + PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset); + else + pj_assert(0); + + pj_lock_release(ioqueue->lock); +} + +/* + * ioqueue_add_to_set() + * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc + * to instruct the ioqueue to add the specified handle to ioqueue's descriptor + * set for the specified event. + */ +static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, + pj_sock_t fd, + enum ioqueue_event_type event_type ) +{ + pj_lock_acquire(ioqueue->lock); + + if (event_type == READABLE_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset); + else if (event_type == WRITEABLE_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset); + else if (event_type == EXCEPTION_EVENT) + PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset); + else + pj_assert(0); + + pj_lock_release(ioqueue->lock); +} /* * pj_ioqueue_poll() @@ -378,19 +380,19 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) pj_fd_set_t rfdset, wfdset, xfdset; int count, counter; pj_ioqueue_key_t *h; - struct event
- {
- pj_ioqueue_key_t *key;
- enum event_type event_type;
- } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
-
- PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+ struct event + { + pj_ioqueue_key_t *key; + enum ioqueue_event_type event_type; + } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; + + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); /* Lock ioqueue before making fd_set copies */ pj_lock_acquire(ioqueue->lock); -
- /* We will only do select() when there are sockets to be polled.
- * Otherwise select() will return error.
+ + /* We will only do select() when there are sockets to be polled. + * Otherwise select() will return error. */ if (PJ_FD_COUNT(&ioqueue->rfdset)==0 && PJ_FD_COUNT(&ioqueue->wfdset)==0 && @@ -422,71 +424,71 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) if (count <= 0) return count; - else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
- count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
+ else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) + count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; - /* Scan descriptor sets for event and add the events in the event
- * array to be processed later in this function. We do this so that
- * events can be processed in parallel without holding ioqueue lock.
+ /* Scan descriptor sets for event and add the events in the event + * array to be processed later in this function. We do this so that + * events can be processed in parallel without holding ioqueue lock. */ pj_lock_acquire(ioqueue->lock); -
- counter = 0;
- - /* Scan for writable sockets first to handle piggy-back data
- * coming with accept().
- */
- h = ioqueue->key_list.next;
- for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
- if ( (key_has_pending_write(h) || key_has_pending_connect(h))
- && PJ_FD_ISSET(h->fd, &wfdset))
- {
- event[counter].key = h;
- event[counter].event_type = WRITEABLE_EVENT;
- ++counter;
- }
-
- /* Scan for readable socket. */
- if ((key_has_pending_read(h) || key_has_pending_accept(h))
- && PJ_FD_ISSET(h->fd, &rfdset))
- {
- event[counter].key = h;
- event[counter].event_type = READABLE_EVENT;
- ++counter; }
-
-#if PJ_HAS_TCP
- if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {
- event[counter].key = h;
- event[counter].event_type = EXCEPTION_EVENT;
- ++counter;
- }
-#endif
- }
-
- pj_lock_release(ioqueue->lock);
-
- count = counter;
-
- /* Now process all events. The dispatch functions will take care
- * of locking in each of the key
- */
- for (counter=0; counter<count; ++counter) {
- switch (event[counter].event_type) {
- case READABLE_EVENT:
- ioqueue_dispatch_read_event(ioqueue, event[counter].key);
- break;
- case WRITEABLE_EVENT:
- ioqueue_dispatch_write_event(ioqueue, event[counter].key);
- break;
- case EXCEPTION_EVENT:
- ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
- break;
- case NO_EVENT:
- default:
- pj_assert(!"Invalid event!");
- break;
- }
- }
+ + counter = 0; + + /* Scan for writable sockets first to handle piggy-back data + * coming with accept(). + */ + h = ioqueue->key_list.next; + for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) { + if ( (key_has_pending_write(h) || key_has_pending_connect(h)) + && PJ_FD_ISSET(h->fd, &wfdset)) + { + event[counter].key = h; + event[counter].event_type = WRITEABLE_EVENT; + ++counter; + } + + /* Scan for readable socket. */ + if ((key_has_pending_read(h) || key_has_pending_accept(h)) + && PJ_FD_ISSET(h->fd, &rfdset)) + { + event[counter].key = h; + event[counter].event_type = READABLE_EVENT; + ++counter; + } + +#if PJ_HAS_TCP + if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { + event[counter].key = h; + event[counter].event_type = EXCEPTION_EVENT; + ++counter; + } +#endif + } + + pj_lock_release(ioqueue->lock); + + count = counter; + + /* Now process all events. The dispatch functions will take care + * of locking in each of the key + */ + for (counter=0; counter<count; ++counter) { + switch (event[counter].event_type) { + case READABLE_EVENT: + ioqueue_dispatch_read_event(ioqueue, event[counter].key); + break; + case WRITEABLE_EVENT: + ioqueue_dispatch_write_event(ioqueue, event[counter].key); + break; + case EXCEPTION_EVENT: + ioqueue_dispatch_exception_event(ioqueue, event[counter].key); + break; + case NO_EVENT: + pj_assert(!"Invalid event!"); + break; + } + } return count; } |