summaryrefslogtreecommitdiff
path: root/pjlib/src/pj/ioqueue_select.c
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2005-11-06 13:32:11 +0000
committerBenny Prijono <bennylp@teluu.com>2005-11-06 13:32:11 +0000
commit97611d9f0a7809a759a0a0603f6d45f5822ad170 (patch)
treecfef74513d998e880d13bb0da8d1f465559c3861 /pjlib/src/pj/ioqueue_select.c
parent7c7300624eb867fa7c1ea52b9c636889aac60e80 (diff)
Put common ioqueue functionalities in separate file to be used by both select() and epoll
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@12 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj/ioqueue_select.c')
-rw-r--r--pjlib/src/pj/ioqueue_select.c933
1 files changed, 135 insertions, 798 deletions
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index 367ffb5e..24e68564 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -20,6 +20,11 @@
#include <pj/compat/socket.h>
#include <pj/sock_select.h>
#include <pj/errno.h>
+
+/*
+ * Include declaration from common abstraction.
+ */
+#include "ioqueue_common_abs.h"
/*
* ISSUES with ioqueue_select()
@@ -58,8 +63,6 @@
PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
-
-
/*
* During debugging build, VALIDATE_FD_SET is set.
* This will check the validity of the fd_sets.
@@ -70,76 +73,12 @@ PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
# define VALIDATE_FD_SET 0
#endif
-struct generic_operation
-{
- PJ_DECL_LIST_MEMBER(struct generic_operation);
- pj_ioqueue_operation_e op;
-};
-
-struct read_operation
-{
- PJ_DECL_LIST_MEMBER(struct read_operation);
- pj_ioqueue_operation_e op;
-
- void *buf;
- pj_size_t size;
- unsigned flags;
- pj_sockaddr_t *rmt_addr;
- int *rmt_addrlen;
-};
-
-struct write_operation
-{
- PJ_DECL_LIST_MEMBER(struct write_operation);
- pj_ioqueue_operation_e op;
-
- char *buf;
- pj_size_t size;
- pj_ssize_t written;
- unsigned flags;
- pj_sockaddr_in rmt_addr;
- int rmt_addrlen;
-};
-
-#if PJ_HAS_TCP
-struct accept_operation
-{
- PJ_DECL_LIST_MEMBER(struct accept_operation);
- pj_ioqueue_operation_e op;
-
- pj_sock_t *accept_fd;
- pj_sockaddr_t *local_addr;
- pj_sockaddr_t *rmt_addr;
- int *addrlen;
-};
-#endif
-
-union operation_key
-{
- struct generic_operation generic;
- struct read_operation read;
- struct write_operation write;
-#if PJ_HAS_TCP
- struct accept_operation accept;
-#endif
-};
-
/*
* This describes each key.
*/
struct pj_ioqueue_key_t
-{
- PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
- pj_ioqueue_t *ioqueue;
- pj_sock_t fd;
- void *user_data;
- pj_ioqueue_callback cb;
- int connecting;
- struct read_operation read_list;
- struct write_operation write_list;
-#if PJ_HAS_TCP
- struct accept_operation accept_list;
-#endif
+{
+ DECLARE_COMMON_KEY
};
/*
@@ -147,8 +86,8 @@ struct pj_ioqueue_key_t
*/
struct pj_ioqueue_t
{
- pj_lock_t *lock;
- pj_bool_t auto_delete_lock;
+ DECLARE_COMMON_IOQUEUE
+
unsigned max, count;
pj_ioqueue_key_t key_list;
pj_fd_set_t rfdset;
@@ -157,6 +96,11 @@ struct pj_ioqueue_t
pj_fd_set_t xfdset;
#endif
};
+
+/* Include implementation for common abstraction after we declare
+ * pj_ioqueue_key_t and pj_ioqueue_t.
+ */
+#include "ioqueue_common_abs.c"
/*
* pj_ioqueue_create()
@@ -167,7 +111,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_size_t max_fd,
pj_ioqueue_t **p_ioqueue)
{
- pj_ioqueue_t *ioqueue;
+ pj_ioqueue_t *ioqueue;
+ pj_lock_t *lock;
pj_status_t rc;
/* Check that arguments are valid. */
@@ -179,7 +124,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
sizeof(union operation_key), PJ_EBUG);
- ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+ ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+
+ ioqueue_init(ioqueue);
+
ioqueue->max = max_fd;
ioqueue->count = 0;
PJ_FD_ZERO(&ioqueue->rfdset);
@@ -189,11 +137,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
#endif
pj_list_init(&ioqueue->key_list);
- rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock);
+ rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
if (rc != PJ_SUCCESS)
return rc;
- ioqueue->auto_delete_lock = PJ_TRUE;
+ rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
+ if (rc != PJ_SUCCESS)
+ return rc;
PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
@@ -208,16 +158,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
*/
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
- pj_status_t rc = PJ_SUCCESS;
-
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
pj_lock_acquire(ioqueue->lock);
-
- if (ioqueue->auto_delete_lock)
- rc = pj_lock_destroy(ioqueue->lock);
-
- return rc;
+ return ioqueue_destroy(ioqueue);
}
@@ -260,17 +204,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
/* Create key. */
key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
- key->ioqueue = ioqueue;
- key->fd = sock;
- key->user_data = user_data;
- pj_list_init(&key->read_list);
- pj_list_init(&key->write_list);
-#if PJ_HAS_TCP
- pj_list_init(&key->accept_list);
-#endif
-
- /* Save callback. */
- pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
+ rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
+ if (rc != PJ_SUCCESS)
+ return rc;
/* Register */
pj_list_insert_before(&ioqueue->key_list, key);
@@ -296,7 +232,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
PJ_ASSERT_RETURN(key, PJ_EINVAL);
ioqueue = key->ioqueue;
-
+
pj_lock_acquire(ioqueue->lock);
pj_assert(ioqueue->count > 0);
@@ -308,39 +244,20 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif
- pj_lock_release(ioqueue->lock);
- return PJ_SUCCESS;
-}
+ /* ioqueue_destroy may try to acquire key's mutex.
+ * Since normally the order of locking is to lock key's mutex first
+ * then ioqueue's mutex, ioqueue_destroy may deadlock unless we
+ * release ioqueue's mutex first.
+ */
+ pj_lock_release(ioqueue->lock);
+
+ /* Destroy the key. */
+ ioqueue_destroy_key(key);
-/*
- * pj_ioqueue_get_user_data()
- *
- * Obtain value associated with a key.
- */
-PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
-{
- PJ_ASSERT_RETURN(key != NULL, NULL);
- return key->user_data;
+ return PJ_SUCCESS;
}
-/*
- * pj_ioqueue_set_user_data()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
- void *user_data,
- void **old_data)
-{
- PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
- if (old_data)
- *old_data = key->user_data;
- key->user_data = user_data;
-
- return PJ_SUCCESS;
-}
-
-
/* This supposed to check whether the fd_set values are consistent
* with the operation currently set in each key.
*/
@@ -390,7 +307,54 @@ static void validate_sets(const pj_ioqueue_t *ioqueue,
}
}
#endif /* VALIDATE_FD_SET */
+
+/* ioqueue_remove_from_set()
+ * This function is called from ioqueue_dispatch_event() to instruct
+ * the ioqueue to remove the specified descriptor from ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
+ pj_sock_t fd,
+ enum ioqueue_event_type event_type)
+{
+ pj_lock_acquire(ioqueue->lock);
+
+ if (event_type == READABLE_EVENT)
+ PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
+ else if (event_type == WRITEABLE_EVENT)
+ PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
+ else if (event_type == EXCEPTION_EVENT)
+ PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
+ else
+ pj_assert(0);
+
+ pj_lock_release(ioqueue->lock);
+}
+
+/*
+ * ioqueue_add_to_set()
+ * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
+ * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
+ * set for the specified event.
+ */
+static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
+ pj_sock_t fd,
+ enum ioqueue_event_type event_type )
+{
+ pj_lock_acquire(ioqueue->lock);
+
+ if (event_type == READABLE_EVENT)
+ PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
+ else if (event_type == WRITEABLE_EVENT)
+ PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
+ else if (event_type == EXCEPTION_EVENT)
+ PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
+ else
+ pj_assert(0);
+
+ pj_lock_release(ioqueue->lock);
+}
/*
* pj_ioqueue_poll()
@@ -412,8 +376,13 @@ static void validate_sets(const pj_ioqueue_t *ioqueue,
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
pj_fd_set_t rfdset, wfdset, xfdset;
- int count;
+ int count, counter;
pj_ioqueue_key_t *h;
+ struct event
+ {
+ pj_ioqueue_key_t *key;
+ enum event_type event_type;
+ } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
@@ -453,704 +422,72 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
if (count <= 0)
return count;
+ else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
+ count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
- /* Lock ioqueue again before scanning for signalled sockets.
- * We must strictly use recursive mutex since application may invoke
- * the ioqueue again inside the callback.
+ /* Scan descriptor sets for event and add the events in the event
+ * array to be processed later in this function. We do this so that
+ * events can be processed in parallel without holding ioqueue lock.
*/
pj_lock_acquire(ioqueue->lock);
+
+ counter = 0;
/* Scan for writable sockets first to handle piggy-back data
* coming with accept().
*/
h = ioqueue->key_list.next;
-do_writable_scan:
- for ( ; h!=&ioqueue->key_list; h = h->next) {
- if ( (!pj_list_empty(&h->write_list) || h->connecting)
+ for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
+ if ( (key_has_pending_write(h) || key_has_pending_connect(h))
&& PJ_FD_ISSET(h->fd, &wfdset))
{
- break;
+ event[counter].key = h;
+ event[counter].event_type = WRITEABLE_EVENT;
+ ++counter;
}
- }
- if (h != &ioqueue->key_list) {
- pj_assert(!pj_list_empty(&h->write_list) || h->connecting);
-
-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
- if (h->connecting) {
- /* Completion of connect() operation */
- pj_ssize_t bytes_transfered;
-
-#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
- /* from connect(2):
- * On Linux, use getsockopt to read the SO_ERROR option at
- * level SOL_SOCKET to determine whether connect() completed
- * successfully (if SO_ERROR is zero).
- */
- int value;
- socklen_t vallen = sizeof(value);
- int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
- &value, &vallen);
- if (gs_rc != 0) {
- /* Argh!! What to do now???
- * Just indicate that the socket is connected. The
- * application will get error as soon as it tries to use
- * the socket to send/receive.
- */
- bytes_transfered = 0;
- } else {
- bytes_transfered = value;
- }
-#elif defined(PJ_WIN32) && PJ_WIN32!=0
- bytes_transfered = 0; /* success */
-#else
- /* Excellent information in D.J. Bernstein page:
- * http://cr.yp.to/docs/connect.html
- *
- * Seems like the most portable way of detecting connect()
- * failure is to call getpeername(). If socket is connected,
- * getpeername() will return 0. If the socket is not connected,
- * it will return ENOTCONN, and read(fd, &ch, 1) will produce
- * the right errno through error slippage. This is a combination
- * of suggestions from Douglas C. Schmidt and Ken Keys.
- */
- int gp_rc;
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof(addr);
-
- gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
- bytes_transfered = gp_rc;
-#endif
-
- /* Clear operation. */
- h->connecting = 0;
- PJ_FD_CLR(h->fd, &ioqueue->wfdset);
- PJ_FD_CLR(h->fd, &ioqueue->xfdset);
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, bytes_transfered);
-
- /* Re-scan writable sockets. */
- goto do_writable_scan;
-
- } else
-#endif /* PJ_HAS_TCP */
- {
- /* Socket is writable. */
- struct write_operation *write_op;
- pj_ssize_t sent;
- pj_status_t send_rc;
-
- /* Get the first in the queue. */
- write_op = h->write_list.next;
-
- /* Send the data. */
- sent = write_op->size - write_op->written;
- if (write_op->op == PJ_IOQUEUE_OP_SEND) {
- send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
- &sent, write_op->flags);
- } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
- send_rc = pj_sock_sendto(h->fd,
- write_op->buf+write_op->written,
- &sent, write_op->flags,
- &write_op->rmt_addr,
- write_op->rmt_addrlen);
- } else {
- pj_assert(!"Invalid operation type!");
- send_rc = PJ_EBUG;
- }
- if (send_rc == PJ_SUCCESS) {
- write_op->written += sent;
- } else {
- pj_assert(send_rc > 0);
- write_op->written = -send_rc;
- }
-
- /* In any case we don't need to process this descriptor again. */
- PJ_FD_CLR(h->fd, &wfdset);
-
- /* Are we finished with this buffer? */
- if (send_rc!=PJ_SUCCESS ||
- write_op->written == (pj_ssize_t)write_op->size)
- {
- pj_list_erase(write_op);
-
- /* Clear operation if there's no more data to send. */
- if (pj_list_empty(&h->write_list))
- PJ_FD_CLR(h->fd, &ioqueue->wfdset);
-
- /* Call callback. */
- if (h->cb.on_write_complete) {
- (*h->cb.on_write_complete)(h,
- (pj_ioqueue_op_key_t*)write_op,
- write_op->written);
- }
- }
-
- /* Re-scan writable sockets. */
- goto do_writable_scan;
- }
- }
+ /* Scan for readable socket. */
+ if ((key_has_pending_read(h) || key_has_pending_accept(h))
+ && PJ_FD_ISSET(h->fd, &rfdset))
+ {
+ event[counter].key = h;
+ event[counter].event_type = READABLE_EVENT;
+ ++counter; }
- /* Scan for readable socket. */
- h = ioqueue->key_list.next;
-do_readable_scan:
- for ( ; h!=&ioqueue->key_list; h = h->next) {
- if ((!pj_list_empty(&h->read_list)
#if PJ_HAS_TCP
- || !pj_list_empty(&h->accept_list)
-#endif
- ) && PJ_FD_ISSET(h->fd, &rfdset))
- {
- break;
- }
- }
- if (h != &ioqueue->key_list) {
- pj_status_t rc;
-
-#if PJ_HAS_TCP
- pj_assert(!pj_list_empty(&h->read_list) ||
- !pj_list_empty(&h->accept_list));
-#else
- pj_assert(!pj_list_empty(&h->read_list));
+ if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {
+ event[counter].key = h;
+ event[counter].event_type = EXCEPTION_EVENT;
+ ++counter;
+ }
#endif
-
-# if PJ_HAS_TCP
- if (!pj_list_empty(&h->accept_list)) {
-
- struct accept_operation *accept_op;
-
- /* Get one accept operation from the list. */
- accept_op = h->accept_list.next;
- pj_list_erase(accept_op);
-
- rc=pj_sock_accept(h->fd, accept_op->accept_fd,
- accept_op->rmt_addr, accept_op->addrlen);
- if (rc==PJ_SUCCESS && accept_op->local_addr) {
- rc = pj_sock_getsockname(*accept_op->accept_fd,
- accept_op->local_addr,
- accept_op->addrlen);
- }
-
- /* Clear bit in fdset if there is no more pending accept */
- if (pj_list_empty(&h->accept_list))
- PJ_FD_CLR(h->fd, &ioqueue->rfdset);
-
- /* Call callback. */
- if (h->cb.on_accept_complete)
- (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op,
- *accept_op->accept_fd, rc);
-
- /* Re-scan readable sockets. */
- goto do_readable_scan;
- }
- else {
-# endif
- struct read_operation *read_op;
- pj_ssize_t bytes_read;
-
- pj_assert(!pj_list_empty(&h->read_list));
-
- /* Get one pending read operation from the list. */
- read_op = h->read_list.next;
- pj_list_erase(read_op);
-
- bytes_read = read_op->size;
-
- if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
- rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
- read_op->rmt_addr,
- read_op->rmt_addrlen);
- } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
- rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
- } else {
- pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
- /*
- * User has specified pj_ioqueue_read().
- * On Win32, we should do ReadFile(). But because we got
- * here because of select() anyway, user must have put a
- * socket descriptor on h->fd, which in this case we can
- * just call pj_sock_recv() instead of ReadFile().
- * On Unix, user may put a file in h->fd, so we'll have
- * to call read() here.
- * This may not compile on systems which doesn't have
- * read(). That's why we only specify PJ_LINUX here so
- * that error is easier to catch.
- */
-# if defined(PJ_WIN32) && PJ_WIN32 != 0
- rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
- //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
- // &bytes_read, NULL);
-# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
- bytes_read = read(h->fd, h->rd_buf, bytes_read);
- rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
-# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
- bytes_read = sys_read(h->fd, h->rd_buf, bytes_read);
- rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
-# else
-# error "Implement read() for this platform!"
-# endif
- }
-
- if (rc != PJ_SUCCESS) {
-# if defined(PJ_WIN32) && PJ_WIN32 != 0
- /* On Win32, for UDP, WSAECONNRESET on the receive side
- * indicates that previous sending has triggered ICMP Port
- * Unreachable message.
- * But we wouldn't know at this point which one of previous
- * key that has triggered the error, since UDP socket can
- * be shared!
- * So we'll just ignore it!
- */
-
- if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
- //PJ_LOG(4,(THIS_FILE,
- // "Ignored ICMP port unreach. on key=%p", h));
- }
-# endif
-
- /* In any case we would report this to caller. */
- bytes_read = -rc;
- }
-
- /* Clear fdset if there is no pending read. */
- if (pj_list_empty(&h->read_list))
- PJ_FD_CLR(h->fd, &ioqueue->rfdset);
-
- /* In any case clear from temporary set. */
- PJ_FD_CLR(h->fd, &rfdset);
-
- /* Call callback. */
- if (h->cb.on_read_complete)
- (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op,
- bytes_read);
-
- /* Re-scan readable sockets. */
- goto do_readable_scan;
-
- }
- }
-
-#if PJ_HAS_TCP
- /* Scan for exception socket for TCP connection error. */
- h = ioqueue->key_list.next;
-do_except_scan:
- for ( ; h!=&ioqueue->key_list; h = h->next) {
- if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset))
- break;
- }
- if (h != &ioqueue->key_list) {
-
- pj_assert(h->connecting);
-
- /* Clear operation. */
- h->connecting = 0;
- PJ_FD_CLR(h->fd, &ioqueue->wfdset);
- PJ_FD_CLR(h->fd, &ioqueue->xfdset);
- PJ_FD_CLR(h->fd, &wfdset);
- PJ_FD_CLR(h->fd, &xfdset);
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, -1);
-
- /* Re-scan exception list. */
- goto do_except_scan;
- }
-#endif /* PJ_HAS_TCP */
-
- /* Shouldn't happen. */
- /* For strange reason on WinXP select() can return 1 while there is no
- * pj_fd_set_t signaled. */
- /* pj_assert(0); */
-
- //count = 0;
-
- pj_lock_release(ioqueue->lock);
- return count;
-}
-
-/*
- * pj_ioqueue_recv()
- *
- * Start asynchronous recv() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- void *buffer,
- pj_ssize_t *length,
- unsigned flags )
-{
- pj_status_t status;
- pj_ssize_t size;
- struct read_operation *read_op;
- pj_ioqueue_t *ioqueue;
-
- PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Try to see if there's data immediately available.
- */
- size = *length;
- status = pj_sock_recv(key->fd, buffer, &size, flags);
- if (status == PJ_SUCCESS) {
- /* Yes! Data is available! */
- *length = size;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
- return status;
- }
-
- /*
- * No data is immediately available.
- * Must schedule asynchronous operation to the ioqueue.
- */
- ioqueue = key->ioqueue;
- pj_lock_acquire(ioqueue->lock);
-
- read_op = (struct read_operation*)op_key;
-
- read_op->op = PJ_IOQUEUE_OP_RECV;
- read_op->buf = buffer;
- read_op->size = *length;
- read_op->flags = flags;
-
- pj_list_insert_before(&key->read_list, read_op);
- PJ_FD_SET(key->fd, &ioqueue->rfdset);
-
- pj_lock_release(ioqueue->lock);
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_recvfrom()
- *
- * Start asynchronous recvfrom() from the socket.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- void *buffer,
- pj_ssize_t *length,
- unsigned flags,
- pj_sockaddr_t *addr,
- int *addrlen)
-{
- pj_status_t status;
- pj_ssize_t size;
- struct read_operation *read_op;
- pj_ioqueue_t *ioqueue;
-
- PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Try to see if there's data immediately available.
- */
- size = *length;
- status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
- addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Yes! Data is available! */
- *length = size;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
- return status;
}
- /*
- * No data is immediately available.
- * Must schedule asynchronous operation to the ioqueue.
- */
- ioqueue = key->ioqueue;
- pj_lock_acquire(ioqueue->lock);
-
- read_op = (struct read_operation*)op_key;
-
- read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
- read_op->buf = buffer;
- read_op->size = *length;
- read_op->flags = flags;
- read_op->rmt_addr = addr;
- read_op->rmt_addrlen = addrlen;
-
- pj_list_insert_before(&key->read_list, read_op);
- PJ_FD_SET(key->fd, &ioqueue->rfdset);
-
pj_lock_release(ioqueue->lock);
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_send()
- *
- * Start asynchronous send() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- const void *data,
- pj_ssize_t *length,
- unsigned flags)
-{
- pj_ioqueue_t *ioqueue;
- struct write_operation *write_op;
- pj_status_t status;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
- PJ_CHECK_STACK();
- /* Fast track:
- * Try to send data immediately, only if there's no pending write!
- * Note:
- * We are speculating that the list is empty here without properly
- * acquiring ioqueue's mutex first. This is intentional, to maximize
- * performance via parallelism.
- *
- * This should be safe, because:
- * - by convention, we require caller to make sure that the
- * key is not unregistered while other threads are invoking
- * an operation on the same key.
- * - pj_list_empty() is safe to be invoked by multiple threads,
- * even when other threads are modifying the list.
- */
- if (pj_list_empty(&key->write_list)) {
- /*
- * See if data can be sent immediately.
- */
- sent = *length;
- status = pj_sock_send(key->fd, data, &sent, flags);
- if (status == PJ_SUCCESS) {
- /* Success! */
- *length = sent;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
+ count = counter;
- /*
- * Schedule asynchronous send.
+ /* Now process all events. The dispatch functions will take care
+ * of locking in each of the key
*/
- ioqueue = key->ioqueue;
- pj_lock_acquire(ioqueue->lock);
-
- write_op = (struct write_operation*)op_key;
- write_op->op = PJ_IOQUEUE_OP_SEND;
- write_op->buf = NULL;
- write_op->size = *length;
- write_op->written = 0;
- write_op->flags = flags;
-
- pj_list_insert_before(&key->write_list, write_op);
- PJ_FD_SET(key->fd, &ioqueue->wfdset);
-
- pj_lock_release(ioqueue->lock);
-
- return PJ_EPENDING;
-}
-
-
-/*
- * pj_ioqueue_sendto()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- const void *data,
- pj_ssize_t *length,
- unsigned flags,
- const pj_sockaddr_t *addr,
- int addrlen)
-{
- pj_ioqueue_t *ioqueue;
- struct write_operation *write_op;
- pj_status_t status;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* Fast track:
- * Try to send data immediately, only if there's no pending write!
- * Note:
- * We are speculating that the list is empty here without properly
- * acquiring ioqueue's mutex first. This is intentional, to maximize
- * performance via parallelism.
- *
- * This should be safe, because:
- * - by convention, we require caller to make sure that the
- * key is not unregistered while other threads are invoking
- * an operation on the same key.
- * - pj_list_empty() is safe to be invoked by multiple threads,
- * even when other threads are modifying the list.
- */
- if (pj_list_empty(&key->write_list)) {
- /*
- * See if data can be sent immediately.
- */
- sent = *length;
- status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Success! */
- *length = sent;
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
+ for (counter=0; counter<count; ++counter) {
+ switch (event[counter].event_type) {
+ case READABLE_EVENT:
+ ioqueue_dispatch_read_event(ioqueue, event[counter].key);
+ break;
+ case WRITEABLE_EVENT:
+ ioqueue_dispatch_write_event(ioqueue, event[counter].key);
+ break;
+ case EXCEPTION_EVENT:
+ ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
+ break;
+ case NO_EVENT:
+ default:
+ pj_assert(!"Invalid event!");
+ break;
}
}
-
- /*
- * Check that address storage can hold the address parameter.
- */
- PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
-
- /*
- * Schedule asynchronous send.
- */
- ioqueue = key->ioqueue;
- pj_lock_acquire(ioqueue->lock);
-
- write_op = (struct write_operation*)op_key;
- write_op->op = PJ_IOQUEUE_OP_SEND_TO;
- write_op->buf = NULL;
- write_op->size = *length;
- write_op->written = 0;
- write_op->flags = flags;
- pj_memcpy(&write_op->rmt_addr, addr, addrlen);
- write_op->rmt_addrlen = addrlen;
-
- pj_list_insert_before(&key->write_list, write_op);
- PJ_FD_SET(key->fd, &ioqueue->wfdset);
-
- pj_lock_release(ioqueue->lock);
-
- return PJ_EPENDING;
-}
-#if PJ_HAS_TCP
-/*
- * Initiate overlapped accept() operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_sock_t *new_sock,
- pj_sockaddr_t *local,
- pj_sockaddr_t *remote,
- int *addrlen)
-{
- pj_ioqueue_t *ioqueue;
- struct accept_operation *accept_op;
- pj_status_t status;
-
- /* check parameters. All must be specified! */
- PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
-
- /* Fast track:
- * See if there's new connection available immediately.
- */
- if (pj_list_empty(&key->accept_list)) {
- status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
- if (status == PJ_SUCCESS) {
- /* Yes! New connection is available! */
- if (local && addrlen) {
- status = pj_sock_getsockname(*new_sock, local, addrlen);
- if (status != PJ_SUCCESS) {
- pj_sock_close(*new_sock);
- *new_sock = PJ_INVALID_SOCKET;
- return status;
- }
- }
- return PJ_SUCCESS;
- } else {
- /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
- * the error to caller.
- */
- if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
- return status;
- }
- }
- }
-
- /*
- * No connection is available immediately.
- * Schedule accept() operation to be completed when there is incoming
- * connection available.
- */
- ioqueue = key->ioqueue;
- accept_op = (struct accept_operation*)op_key;
-
- pj_lock_acquire(ioqueue->lock);
-
- accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
- accept_op->accept_fd = new_sock;
- accept_op->rmt_addr = remote;
- accept_op->addrlen= addrlen;
- accept_op->local_addr = local;
-
- pj_list_insert_before(&key->accept_list, accept_op);
- PJ_FD_SET(key->fd, &ioqueue->rfdset);
-
- pj_lock_release(ioqueue->lock);
-
- return PJ_EPENDING;
-}
-
-/*
- * Initiate overlapped connect() operation (well, it's non-blocking actually,
- * since there's no overlapped version of connect()).
- */
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
- const pj_sockaddr_t *addr,
- int addrlen )
-{
- pj_ioqueue_t *ioqueue;
- pj_status_t status;
-
- /* check parameters. All must be specified! */
- PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
-
- /* Check if socket has not been marked for connecting */
- if (key->connecting != 0)
- return PJ_EPENDING;
-
- status = pj_sock_connect(key->fd, addr, addrlen);
- if (status == PJ_SUCCESS) {
- /* Connected! */
- return PJ_SUCCESS;
- } else {
- if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
- /* Pending! */
- ioqueue = key->ioqueue;
- pj_lock_acquire(ioqueue->lock);
- key->connecting = PJ_TRUE;
- PJ_FD_SET(key->fd, &ioqueue->wfdset);
- PJ_FD_SET(key->fd, &ioqueue->xfdset);
- pj_lock_release(ioqueue->lock);
- return PJ_EPENDING;
- } else {
- /* Error! */
- return status;
- }
- }
+ return count;
}
-#endif /* PJ_HAS_TCP */