diff options
Diffstat (limited to 'pjlib/src/pj/ioqueue_select.c')
-rw-r--r-- | pjlib/src/pj/ioqueue_select.c | 933 |
1 files changed, 135 insertions, 798 deletions
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c index 367ffb5e..24e68564 100644 --- a/pjlib/src/pj/ioqueue_select.c +++ b/pjlib/src/pj/ioqueue_select.c @@ -20,6 +20,11 @@ #include <pj/compat/socket.h> #include <pj/sock_select.h> #include <pj/errno.h> +
+/*
+ * Include declaration from common abstraction.
+ */
+#include "ioqueue_common_abs.h"
/* * ISSUES with ioqueue_select() @@ -58,8 +63,6 @@ PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
- - /* * During debugging build, VALIDATE_FD_SET is set. * This will check the validity of the fd_sets. @@ -70,76 +73,12 @@ PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp); # define VALIDATE_FD_SET 0 #endif -struct generic_operation
-{
- PJ_DECL_LIST_MEMBER(struct generic_operation);
- pj_ioqueue_operation_e op;
-};
-
-struct read_operation
-{
- PJ_DECL_LIST_MEMBER(struct read_operation);
- pj_ioqueue_operation_e op;
-
- void *buf;
- pj_size_t size;
- unsigned flags;
- pj_sockaddr_t *rmt_addr;
- int *rmt_addrlen;
-};
-
-struct write_operation
-{
- PJ_DECL_LIST_MEMBER(struct write_operation);
- pj_ioqueue_operation_e op;
-
- char *buf;
- pj_size_t size;
- pj_ssize_t written;
- unsigned flags;
- pj_sockaddr_in rmt_addr;
- int rmt_addrlen;
-};
-
-#if PJ_HAS_TCP
-struct accept_operation
-{
- PJ_DECL_LIST_MEMBER(struct accept_operation);
- pj_ioqueue_operation_e op;
-
- pj_sock_t *accept_fd;
- pj_sockaddr_t *local_addr;
- pj_sockaddr_t *rmt_addr;
- int *addrlen;
-};
-#endif
-
-union operation_key
-{
- struct generic_operation generic;
- struct read_operation read;
- struct write_operation write;
-#if PJ_HAS_TCP
- struct accept_operation accept;
-#endif
-};
- /* * This describes each key. */ struct pj_ioqueue_key_t -{ - PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
- pj_ioqueue_t *ioqueue; - pj_sock_t fd; - void *user_data; - pj_ioqueue_callback cb;
- int connecting;
- struct read_operation read_list;
- struct write_operation write_list;
-#if PJ_HAS_TCP
- struct accept_operation accept_list;
-#endif +{
+ DECLARE_COMMON_KEY }; /* @@ -147,8 +86,8 @@ struct pj_ioqueue_key_t */ struct pj_ioqueue_t { - pj_lock_t *lock; - pj_bool_t auto_delete_lock; + DECLARE_COMMON_IOQUEUE
+ unsigned max, count; pj_ioqueue_key_t key_list; pj_fd_set_t rfdset; @@ -157,6 +96,11 @@ struct pj_ioqueue_t pj_fd_set_t xfdset; #endif }; +
+/* Include implementation for common abstraction after we declare
+ * pj_ioqueue_key_t and pj_ioqueue_t.
+ */
+#include "ioqueue_common_abs.c"
/* * pj_ioqueue_create() @@ -167,7 +111,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, pj_ioqueue_t **p_ioqueue) { - pj_ioqueue_t *ioqueue; + pj_ioqueue_t *ioqueue;
+ pj_lock_t *lock; pj_status_t rc; /* Check that arguments are valid. */
@@ -179,7 +124,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
sizeof(union operation_key), PJ_EBUG);
- ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); + ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+
+ ioqueue_init(ioqueue);
+ ioqueue->max = max_fd; ioqueue->count = 0; PJ_FD_ZERO(&ioqueue->rfdset); @@ -189,11 +137,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, #endif pj_list_init(&ioqueue->key_list); - rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock); + rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); if (rc != PJ_SUCCESS) return rc; - ioqueue->auto_delete_lock = PJ_TRUE; + rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
+ if (rc != PJ_SUCCESS)
+ return rc; PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); @@ -208,16 +158,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, */ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) { - pj_status_t rc = PJ_SUCCESS; - PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); pj_lock_acquire(ioqueue->lock);
- - if (ioqueue->auto_delete_lock) - rc = pj_lock_destroy(ioqueue->lock); - - return rc; + return ioqueue_destroy(ioqueue); } @@ -260,17 +204,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, /* Create key. */ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
- key->ioqueue = ioqueue; - key->fd = sock; - key->user_data = user_data;
- pj_list_init(&key->read_list);
- pj_list_init(&key->write_list);
-#if PJ_HAS_TCP
- pj_list_init(&key->accept_list);
-#endif -
- /* Save callback. */ - pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); + rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
+ if (rc != PJ_SUCCESS)
+ return rc;
/* Register */ pj_list_insert_before(&ioqueue->key_list, key); @@ -296,7 +232,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) PJ_ASSERT_RETURN(key, PJ_EINVAL); ioqueue = key->ioqueue;
- +
pj_lock_acquire(ioqueue->lock); pj_assert(ioqueue->count > 0); @@ -308,39 +244,20 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) PJ_FD_CLR(key->fd, &ioqueue->xfdset); #endif - pj_lock_release(ioqueue->lock); - return PJ_SUCCESS; -} + /* ioqueue_destroy may try to acquire key's mutex.
+ * Since normally the order of locking is to lock key's mutex first
+ * then ioqueue's mutex, ioqueue_destroy may deadlock unless we
+ * release ioqueue's mutex first.
+ */
+ pj_lock_release(ioqueue->lock);
+
+ /* Destroy the key. */
+ ioqueue_destroy_key(key);
-/* - * pj_ioqueue_get_user_data() - * - * Obtain value associated with a key. - */ -PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) -{ - PJ_ASSERT_RETURN(key != NULL, NULL); - return key->user_data; + return PJ_SUCCESS; } -/*
- * pj_ioqueue_set_user_data()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
- void *user_data,
- void **old_data)
-{
- PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
- if (old_data)
- *old_data = key->user_data;
- key->user_data = user_data;
-
- return PJ_SUCCESS;
-}
-
- /* This supposed to check whether the fd_set values are consistent * with the operation currently set in each key. */ @@ -390,7 +307,54 @@ static void validate_sets(const pj_ioqueue_t *ioqueue, } } #endif /* VALIDATE_FD_SET */ +
+/* ioqueue_remove_from_set()
+ * This function is called from ioqueue_dispatch_event() to instruct
+ * the ioqueue to remove the specified descriptor from ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
+ pj_sock_t fd,
+ enum ioqueue_event_type event_type)
+{
+ pj_lock_acquire(ioqueue->lock);
+
+ if (event_type == READABLE_EVENT)
+ PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
+ else if (event_type == WRITEABLE_EVENT)
+ PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
+ else if (event_type == EXCEPTION_EVENT)
+ PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
+ else
+ pj_assert(0);
+
+ pj_lock_release(ioqueue->lock);
+}
+
+/*
+ * ioqueue_add_to_set()
+ * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
+ * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
+ pj_sock_t fd,
+ enum ioqueue_event_type event_type )
+{
+ pj_lock_acquire(ioqueue->lock);
+
+ if (event_type == READABLE_EVENT)
+ PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
+ else if (event_type == WRITEABLE_EVENT)
+ PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
+ else if (event_type == EXCEPTION_EVENT)
+ PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
+ else
+ pj_assert(0);
+
+ pj_lock_release(ioqueue->lock);
+}
/* * pj_ioqueue_poll() @@ -412,8 +376,13 @@ static void validate_sets(const pj_ioqueue_t *ioqueue, PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { pj_fd_set_t rfdset, wfdset, xfdset; - int count; + int count, counter; pj_ioqueue_key_t *h; + struct event
+ {
+ pj_ioqueue_key_t *key;
+ enum event_type event_type;
+ } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
@@ -453,704 +422,72 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) if (count <= 0) return count; + else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
+ count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
- /* Lock ioqueue again before scanning for signalled sockets.
- * We must strictly use recursive mutex since application may invoke
- * the ioqueue again inside the callback.
+ /* Scan descriptor sets for event and add the events in the event
+ * array to be processed later in this function. We do this so that
+ * events can be processed in parallel without holding ioqueue lock.
*/ pj_lock_acquire(ioqueue->lock); +
+ counter = 0;
/* Scan for writable sockets first to handle piggy-back data
* coming with accept().
*/
h = ioqueue->key_list.next;
-do_writable_scan:
- for ( ; h!=&ioqueue->key_list; h = h->next) {
- if ( (!pj_list_empty(&h->write_list) || h->connecting)
+ for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
+ if ( (key_has_pending_write(h) || key_has_pending_connect(h))
&& PJ_FD_ISSET(h->fd, &wfdset))
{
- break;
+ event[counter].key = h;
+ event[counter].event_type = WRITEABLE_EVENT;
+ ++counter;
}
- }
- if (h != &ioqueue->key_list) {
- pj_assert(!pj_list_empty(&h->write_list) || h->connecting);
-
-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
- if (h->connecting) {
- /* Completion of connect() operation */
- pj_ssize_t bytes_transfered;
-
-#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
- /* from connect(2):
- * On Linux, use getsockopt to read the SO_ERROR option at
- * level SOL_SOCKET to determine whether connect() completed
- * successfully (if SO_ERROR is zero).
- */
- int value;
- socklen_t vallen = sizeof(value);
- int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
- &value, &vallen);
- if (gs_rc != 0) {
- /* Argh!! What to do now???
- * Just indicate that the socket is connected. The
- * application will get error as soon as it tries to use
- * the socket to send/receive.
- */
- bytes_transfered = 0;
- } else {
- bytes_transfered = value;
- }
-#elif defined(PJ_WIN32) && PJ_WIN32!=0
- bytes_transfered = 0; /* success */
-#else
- /* Excellent information in D.J. Bernstein page:
- * http://cr.yp.to/docs/connect.html
- *
- * Seems like the most portable way of detecting connect()
- * failure is to call getpeername(). If socket is connected,
- * getpeername() will return 0. If the socket is not connected,
- * it will return ENOTCONN, and read(fd, &ch, 1) will produce
- * the right errno through error slippage. This is a combination
- * of suggestions from Douglas C. Schmidt and Ken Keys.
- */
- int gp_rc;
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof(addr);
-
- gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
- bytes_transfered = gp_rc;
-#endif
-
- /* Clear operation. */
- h->connecting = 0;
- PJ_FD_CLR(h->fd, &ioqueue->wfdset);
- PJ_FD_CLR(h->fd, &ioqueue->xfdset);
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, bytes_transfered);
-
- /* Re-scan writable sockets. */
- goto do_writable_scan;
-
- } else
-#endif /* PJ_HAS_TCP */
- {
- /* Socket is writable. */
- struct write_operation *write_op;
- pj_ssize_t sent;
- pj_status_t send_rc;
-
- /* Get the first in the queue. */
- write_op = h->write_list.next;
-
- /* Send the data. */
- sent = write_op->size - write_op->written;
- if (write_op->op == PJ_IOQUEUE_OP_SEND) {
- send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
- &sent, write_op->flags);
- } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
- send_rc = pj_sock_sendto(h->fd,
- write_op->buf+write_op->written,
- &sent, write_op->flags,
- &write_op->rmt_addr,
- write_op->rmt_addrlen);
- } else {
- pj_assert(!"Invalid operation type!");
- send_rc = PJ_EBUG;
- }
- if (send_rc == PJ_SUCCESS) {
- write_op->written += sent;
- } else {
- pj_assert(send_rc > 0);
- write_op->written = -send_rc;
- }
-
- /* In any case we don't need to process this descriptor again. */
- PJ_FD_CLR(h->fd, &wfdset);
-
- /* Are we finished with this buffer? */
- if (send_rc!=PJ_SUCCESS ||
- write_op->written == (pj_ssize_t)write_op->size)
- {
- pj_list_erase(write_op);
-
- /* Clear operation if there's no more data to send. */
- if (pj_list_empty(&h->write_list))
- PJ_FD_CLR(h->fd, &ioqueue->wfdset);
-
- /* Call callback. */
- if (h->cb.on_write_complete) {
- (*h->cb.on_write_complete)(h,
- (pj_ioqueue_op_key_t*)write_op,
- write_op->written);
- }
- }
-
- /* Re-scan writable sockets. */
- goto do_writable_scan;
- }
- }
+ /* Scan for readable socket. */
+ if ((key_has_pending_read(h) || key_has_pending_accept(h))
+ && PJ_FD_ISSET(h->fd, &rfdset))
+ {
+ event[counter].key = h;
+ event[counter].event_type = READABLE_EVENT;
+ ++counter; }
- /* Scan for readable socket. */ - h = ioqueue->key_list.next; -do_readable_scan: - for ( ; h!=&ioqueue->key_list; h = h->next) { - if ((!pj_list_empty(&h->read_list)
#if PJ_HAS_TCP
- || !pj_list_empty(&h->accept_list)
-#endif
- ) && PJ_FD_ISSET(h->fd, &rfdset)) - { - break; - } - } - if (h != &ioqueue->key_list) { - pj_status_t rc; - -#if PJ_HAS_TCP
- pj_assert(!pj_list_empty(&h->read_list) ||
- !pj_list_empty(&h->accept_list));
-#else
- pj_assert(!pj_list_empty(&h->read_list)); + if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {
+ event[counter].key = h;
+ event[counter].event_type = EXCEPTION_EVENT;
+ ++counter;
+ }
#endif
- -# if PJ_HAS_TCP - if (!pj_list_empty(&h->accept_list)) {
-
- struct accept_operation *accept_op; -
- /* Get one accept operation from the list. */ - accept_op = h->accept_list.next;
- pj_list_erase(accept_op); - - rc=pj_sock_accept(h->fd, accept_op->accept_fd,
- accept_op->rmt_addr, accept_op->addrlen); - if (rc==PJ_SUCCESS && accept_op->local_addr) { - rc = pj_sock_getsockname(*accept_op->accept_fd,
- accept_op->local_addr, - accept_op->addrlen); - } - - /* Clear bit in fdset if there is no more pending accept */
- if (pj_list_empty(&h->accept_list))
- PJ_FD_CLR(h->fd, &ioqueue->rfdset); - - /* Call callback. */ - if (h->cb.on_accept_complete) - (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op,
- *accept_op->accept_fd, rc); - - /* Re-scan readable sockets. */ - goto do_readable_scan; - } - else { -# endif
- struct read_operation *read_op;
- pj_ssize_t bytes_read;
-
- pj_assert(!pj_list_empty(&h->read_list));
-
- /* Get one pending read operation from the list. */
- read_op = h->read_list.next;
- pj_list_erase(read_op);
- - bytes_read = read_op->size; - - if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { - rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0, - read_op->rmt_addr,
- read_op->rmt_addrlen); - } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { - rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); - } else {
- pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); - /* - * User has specified pj_ioqueue_read(). - * On Win32, we should do ReadFile(). But because we got - * here because of select() anyway, user must have put a - * socket descriptor on h->fd, which in this case we can - * just call pj_sock_recv() instead of ReadFile(). - * On Unix, user may put a file in h->fd, so we'll have - * to call read() here. - * This may not compile on systems which doesn't have - * read(). That's why we only specify PJ_LINUX here so - * that error is easier to catch. - */ -# if defined(PJ_WIN32) && PJ_WIN32 != 0 - rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
- //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
- // &bytes_read, NULL);
-# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) - bytes_read = read(h->fd, h->rd_buf, bytes_read); - rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); -# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 - bytes_read = sys_read(h->fd, h->rd_buf, bytes_read); - rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read; -# else -# error "Implement read() for this platform!" -# endif - } - - if (rc != PJ_SUCCESS) { -# if defined(PJ_WIN32) && PJ_WIN32 != 0 - /* On Win32, for UDP, WSAECONNRESET on the receive side - * indicates that previous sending has triggered ICMP Port - * Unreachable message. - * But we wouldn't know at this point which one of previous - * key that has triggered the error, since UDP socket can - * be shared! - * So we'll just ignore it! - */ - - if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { - //PJ_LOG(4,(THIS_FILE, - // "Ignored ICMP port unreach. on key=%p", h)); - } -# endif - - /* In any case we would report this to caller. */ - bytes_read = -rc; - } -
- /* Clear fdset if there is no pending read. */
- if (pj_list_empty(&h->read_list)) - PJ_FD_CLR(h->fd, &ioqueue->rfdset);
-
- /* In any case clear from temporary set. */ - PJ_FD_CLR(h->fd, &rfdset); - - /* Call callback. */ - if (h->cb.on_read_complete) - (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op,
- bytes_read); - - /* Re-scan readable sockets. */ - goto do_readable_scan; - - } - } -
-#if PJ_HAS_TCP
- /* Scan for exception socket for TCP connection error. */
- h = ioqueue->key_list.next;
-do_except_scan:
- for ( ; h!=&ioqueue->key_list; h = h->next) {
- if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset))
- break;
- }
- if (h != &ioqueue->key_list) {
-
- pj_assert(h->connecting);
-
- /* Clear operation. */
- h->connecting = 0;
- PJ_FD_CLR(h->fd, &ioqueue->wfdset);
- PJ_FD_CLR(h->fd, &ioqueue->xfdset);
- PJ_FD_CLR(h->fd, &wfdset);
- PJ_FD_CLR(h->fd, &xfdset);
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, -1);
-
- /* Re-scan exception list. */
- goto do_except_scan;
- }
-#endif /* PJ_HAS_TCP */
-
- /* Shouldn't happen. */ - /* For strange reason on WinXP select() can return 1 while there is no - * pj_fd_set_t signaled. */ - /* pj_assert(0); */ - - //count = 0; - - pj_lock_release(ioqueue->lock); - return count; -} - -/* - * pj_ioqueue_recv() - * - * Start asynchronous recv() from the socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key, - void *buffer, - pj_ssize_t *length, - unsigned flags ) -{
- pj_status_t status;
- pj_ssize_t size;
- struct read_operation *read_op;
- pj_ioqueue_t *ioqueue;
- - PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); - PJ_CHECK_STACK(); -
- /* Try to see if there's data immediately available.
- */
- size = *length;
- status = pj_sock_recv(key->fd, buffer, &size, flags);
- if (status == PJ_SUCCESS) {
- /* Yes! Data is available! */
- *length = size;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
- return status;
- } -
- /*
- * No data is immediately available.
- * Must schedule asynchronous operation to the ioqueue.
- */
- ioqueue = key->ioqueue; - pj_lock_acquire(ioqueue->lock); -
- read_op = (struct read_operation*)op_key;
- - read_op->op = PJ_IOQUEUE_OP_RECV; - read_op->buf = buffer; - read_op->size = *length; - read_op->flags = flags;
-
- pj_list_insert_before(&key->read_list, read_op); - PJ_FD_SET(key->fd, &ioqueue->rfdset); - - pj_lock_release(ioqueue->lock); - return PJ_EPENDING; -} - -/* - * pj_ioqueue_recvfrom() - * - * Start asynchronous recvfrom() from the socket. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key, - void *buffer, - pj_ssize_t *length, - unsigned flags, - pj_sockaddr_t *addr, - int *addrlen) -{ - pj_status_t status;
- pj_ssize_t size;
- struct read_operation *read_op;
- pj_ioqueue_t *ioqueue;
-
- PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Try to see if there's data immediately available.
- */
- size = *length;
- status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
- addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Yes! Data is available! */
- *length = size;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
- return status;
}
- /*
- * No data is immediately available.
- * Must schedule asynchronous operation to the ioqueue.
- */
- ioqueue = key->ioqueue;
- pj_lock_acquire(ioqueue->lock);
-
- read_op = (struct read_operation*)op_key;
-
- read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
- read_op->buf = buffer;
- read_op->size = *length;
- read_op->flags = flags;
- read_op->rmt_addr = addr;
- read_op->rmt_addrlen = addrlen;
-
- pj_list_insert_before(&key->read_list, read_op);
- PJ_FD_SET(key->fd, &ioqueue->rfdset);
-
pj_lock_release(ioqueue->lock);
- return PJ_EPENDING;
-} - -/* - * pj_ioqueue_send() - * - * Start asynchronous send() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key, - const void *data, - pj_ssize_t *length, - unsigned flags) -{
- pj_ioqueue_t *ioqueue;
- struct write_operation *write_op; - pj_status_t status;
- pj_ssize_t sent; - - PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); - PJ_CHECK_STACK(); - /* Fast track:
- * Try to send data immediately, only if there's no pending write!
- * Note:
- * We are speculating that the list is empty here without properly
- * acquiring ioqueue's mutex first. This is intentional, to maximize
- * performance via parallelism.
- *
- * This should be safe, because:
- * - by convention, we require caller to make sure that the
- * key is not unregistered while other threads are invoking
- * an operation on the same key.
- * - pj_list_empty() is safe to be invoked by multiple threads,
- * even when other threads are modifying the list.
- */
- if (pj_list_empty(&key->write_list)) {
- /*
- * See if data can be sent immediately.
- */ - sent = *length; - status = pj_sock_send(key->fd, data, &sent, flags); - if (status == PJ_SUCCESS) {
- /* Success! */
- *length = sent;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- } + count = counter;
- /*
- * Schedule asynchronous send.
+ /* Now process all events. The dispatch functions will take care
+ * of locking in each of the key
*/
- ioqueue = key->ioqueue;
- pj_lock_acquire(ioqueue->lock); -
- write_op = (struct write_operation*)op_key; - write_op->op = PJ_IOQUEUE_OP_SEND; - write_op->buf = NULL;
- write_op->size = *length;
- write_op->written = 0;
- write_op->flags = flags;
-
- pj_list_insert_before(&key->write_list, write_op); - PJ_FD_SET(key->fd, &ioqueue->wfdset); - - pj_lock_release(ioqueue->lock); - - return PJ_EPENDING; -} - - -/* - * pj_ioqueue_sendto() - * - * Start asynchronous write() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key, - const void *data, - pj_ssize_t *length, - unsigned flags, - const pj_sockaddr_t *addr, - int addrlen) -{ - pj_ioqueue_t *ioqueue;
- struct write_operation *write_op;
- pj_status_t status;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Fast track:
- * Try to send data immediately, only if there's no pending write!
- * Note:
- * We are speculating that the list is empty here without properly
- * acquiring ioqueue's mutex first. This is intentional, to maximize
- * performance via parallelism.
- *
- * This should be safe, because:
- * - by convention, we require caller to make sure that the
- * key is not unregistered while other threads are invoking
- * an operation on the same key.
- * - pj_list_empty() is safe to be invoked by multiple threads,
- * even when other threads are modifying the list.
- */
- if (pj_list_empty(&key->write_list)) {
- /*
- * See if data can be sent immediately.
- */
- sent = *length;
- status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Success! */
- *length = sent;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
+ for (counter=0; counter<count; ++counter) {
+ switch (event[counter].event_type) {
+ case READABLE_EVENT:
+ ioqueue_dispatch_read_event(ioqueue, event[counter].key);
+ break;
+ case WRITEABLE_EVENT:
+ ioqueue_dispatch_write_event(ioqueue, event[counter].key);
+ break;
+ case EXCEPTION_EVENT:
+ ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
+ break;
+ case NO_EVENT:
+ default:
+ pj_assert(!"Invalid event!");
+ break;
}
}
-
- /*
- * Check that address storage can hold the address parameter.
- */
- PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
-
- /*
- * Schedule asynchronous send.
- */
- ioqueue = key->ioqueue;
- pj_lock_acquire(ioqueue->lock);
-
- write_op = (struct write_operation*)op_key;
- write_op->op = PJ_IOQUEUE_OP_SEND_TO;
- write_op->buf = NULL;
- write_op->size = *length;
- write_op->written = 0;
- write_op->flags = flags;
- pj_memcpy(&write_op->rmt_addr, addr, addrlen);
- write_op->rmt_addrlen = addrlen;
-
- pj_list_insert_before(&key->write_list, write_op);
- PJ_FD_SET(key->fd, &ioqueue->wfdset);
-
- pj_lock_release(ioqueue->lock);
-
- return PJ_EPENDING;
-} -#if PJ_HAS_TCP -/* - * Initiate overlapped accept() operation. - */ -PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key, - pj_sock_t *new_sock, - pj_sockaddr_t *local, - pj_sockaddr_t *remote, - int *addrlen) -{
- pj_ioqueue_t *ioqueue;
- struct accept_operation *accept_op;
- pj_status_t status;
- - /* check parameters. All must be specified! */ - PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); - - /* Fast track:
- * See if there's new connection available immediately.
- */
- if (pj_list_empty(&key->accept_list)) {
- status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
- if (status == PJ_SUCCESS) {
- /* Yes! New connection is available! */
- if (local && addrlen) {
- status = pj_sock_getsockname(*new_sock, local, addrlen);
- if (status != PJ_SUCCESS) {
- pj_sock_close(*new_sock);
- *new_sock = PJ_INVALID_SOCKET;
- return status;
- }
- }
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * No connection is available immediately.
- * Schedule accept() operation to be completed when there is incoming
- * connection available.
- */
- ioqueue = key->ioqueue;
- accept_op = (struct accept_operation*)op_key;
- - pj_lock_acquire(ioqueue->lock);
- - accept_op->op = PJ_IOQUEUE_OP_ACCEPT; - accept_op->accept_fd = new_sock; - accept_op->rmt_addr = remote; - accept_op->addrlen= addrlen; - accept_op->local_addr = local; -
- pj_list_insert_before(&key->accept_list, accept_op); - PJ_FD_SET(key->fd, &ioqueue->rfdset); -
- pj_lock_release(ioqueue->lock);
- - return PJ_EPENDING; -} - -/* - * Initiate overlapped connect() operation (well, it's non-blocking actually, - * since there's no overlapped version of connect()). - */ -PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, - const pj_sockaddr_t *addr, - int addrlen ) -{
- pj_ioqueue_t *ioqueue; - pj_status_t status; - - /* check parameters. All must be specified! */ - PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); - - /* Check if socket has not been marked for connecting */ - if (key->connecting != 0)
- return PJ_EPENDING; - - status = pj_sock_connect(key->fd, addr, addrlen); - if (status == PJ_SUCCESS) { - /* Connected! */ - return PJ_SUCCESS; - } else { - if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { - /* Pending! */
- ioqueue = key->ioqueue; - pj_lock_acquire(ioqueue->lock); - key->connecting = PJ_TRUE; - PJ_FD_SET(key->fd, &ioqueue->wfdset); - PJ_FD_SET(key->fd, &ioqueue->xfdset); - pj_lock_release(ioqueue->lock); - return PJ_EPENDING; - } else { - /* Error! */ - return status; - } - } + return count; } -#endif /* PJ_HAS_TCP */ |