Diffstat (limited to 'pjlib/src/pj/ioqueue_select.c')
-rw-r--r--  pjlib/src/pj/ioqueue_select.c | 1235
1 file changed, 723 insertions(+), 512 deletions(-)
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index 37d6f661..367ffb5e 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -33,23 +33,33 @@
*
*/
#define THIS_FILE "ioq_select"
+
+/*
+ * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
+ * the correct error code.
+ */
+#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
+# error "Error reporting must be enabled for this function to work!"
+#endif
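
Note (illustrative, not part of the patch): this guard matters because the
new code below compares pj_status_t values against specific OS errors, e.g.
PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL) in the recv/send fast paths. A
minimal sketch of the identity being checked, assuming the usual pjlib
mapping where both macros add the same fixed offset (PJ_ERRNO_START_SYS) to
the native error code:

    #include <pjlib.h>

    static void error_mapping_sketch(void)
    {
        /* With error reporting enabled, both expressions evaluate to
         * 100 + PJ_ERRNO_START_SYS, so the #if above passes.
         */
        pj_status_t a = PJ_RETURN_OS_ERROR(100);
        pj_status_t b = PJ_STATUS_FROM_OS(100);
        pj_assert(a == b);
    }

If the mapping were compiled out, such comparisons would never match and
pending operations would never be scheduled.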
+
+/**
+ * Get the number of descriptors in the set. This is defined in sock_select.c
+ * This function will only return the number of sockets set by the PJ_FD_SET
+ * operation. When the set is modified by other means (such as by select()),
+ * the count will not be reflected here.
+ *
+ * That's why we don't export this function in the header file, to avoid
+ * misunderstanding.
+ *
+ * @param fdsetp The descriptor set.
+ *
+ * @return Number of descriptors in the set.
+ */
+PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
+
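
Note (illustrative, not part of the patch): the caveat above is why
pj_ioqueue_poll() below calls PJ_FD_COUNT() only on the ioqueue's master
fd_sets, never on the copies that select() has modified. A sketch of the
pitfall, assuming pj_fd_set_t carries an explicit counter maintained only
by PJ_FD_SET()/PJ_FD_CLR(), as the comment implies:

    static void fd_count_caveat(pj_sock_t sock1, pj_sock_t sock2)
    {
        pj_fd_set_t fds;
        pj_time_val timeout = { 0, 0 };

        PJ_FD_ZERO(&fds);
        PJ_FD_SET(sock1, &fds);
        PJ_FD_SET(sock2, &fds);
        /* PJ_FD_COUNT(&fds) == 2: both fds were added via PJ_FD_SET(). */

        pj_sock_select(FD_SETSIZE, &fds, NULL, NULL, &timeout);
        /* select() clears non-ready fds from the set in place, yet
         * PJ_FD_COUNT(&fds) may still report 2: the counter tracks only
         * PJ_FD_SET()/PJ_FD_CLR() calls, not in-place modification.
         */
    }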
-#define PJ_IOQUEUE_IS_READ_OP(op) ((op & PJ_IOQUEUE_OP_READ) || \
- (op & PJ_IOQUEUE_OP_RECV) || \
- (op & PJ_IOQUEUE_OP_RECV_FROM))
-#define PJ_IOQUEUE_IS_WRITE_OP(op) ((op & PJ_IOQUEUE_OP_WRITE) || \
- (op & PJ_IOQUEUE_OP_SEND) || \
- (op & PJ_IOQUEUE_OP_SEND_TO))
-#if PJ_HAS_TCP
-# define PJ_IOQUEUE_IS_ACCEPT_OP(op) (op & PJ_IOQUEUE_OP_ACCEPT)
-# define PJ_IOQUEUE_IS_CONNECT_OP(op) (op & PJ_IOQUEUE_OP_CONNECT)
-#else
-# define PJ_IOQUEUE_IS_ACCEPT_OP(op) 0
-# define PJ_IOQUEUE_IS_CONNECT_OP(op) 0
-#endif
-
/*
* During debugging build, VALIDATE_FD_SET is set.
* This will check the validity of the fd_sets.
@@ -59,31 +69,77 @@
#else
# define VALIDATE_FD_SET 0
#endif
+
+struct generic_operation
+{
+ PJ_DECL_LIST_MEMBER(struct generic_operation);
+ pj_ioqueue_operation_e op;
+};
+
+struct read_operation
+{
+ PJ_DECL_LIST_MEMBER(struct read_operation);
+ pj_ioqueue_operation_e op;
+
+ void *buf;
+ pj_size_t size;
+ unsigned flags;
+ pj_sockaddr_t *rmt_addr;
+ int *rmt_addrlen;
+};
+
+struct write_operation
+{
+ PJ_DECL_LIST_MEMBER(struct write_operation);
+ pj_ioqueue_operation_e op;
+
+ char *buf;
+ pj_size_t size;
+ pj_ssize_t written;
+ unsigned flags;
+ pj_sockaddr_in rmt_addr;
+ int rmt_addrlen;
+};
+
+#if PJ_HAS_TCP
+struct accept_operation
+{
+ PJ_DECL_LIST_MEMBER(struct accept_operation);
+ pj_ioqueue_operation_e op;
+
+ pj_sock_t *accept_fd;
+ pj_sockaddr_t *local_addr;
+ pj_sockaddr_t *rmt_addr;
+ int *addrlen;
+};
+#endif
+
+union operation_key
+{
+ struct generic_operation generic;
+ struct read_operation read;
+ struct write_operation write;
+#if PJ_HAS_TCP
+ struct accept_operation accept;
+#endif
+};
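
Note (illustrative, not part of the patch): these per-operation structs are
the new home for pending-operation state, which previously lived inside
pj_ioqueue_key_t itself. The caller-supplied, opaque pj_ioqueue_op_key_t is
simply cast to the appropriate struct and linked into the key's pending
list; the size assertion in pj_ioqueue_create() below guarantees the blob is
large enough. The recurring pattern, echoing pj_ioqueue_recv() further down
(names taken from that function):

    struct read_operation *read_op = (struct read_operation*) op_key;

    read_op->op   = PJ_IOQUEUE_OP_RECV;
    read_op->buf  = buffer;
    read_op->size = *length;
    pj_list_insert_before(&key->read_list, read_op);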
/*
* This describes each key.
*/
struct pj_ioqueue_key_t
{
- PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t)
+ PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
+ pj_ioqueue_t *ioqueue;
pj_sock_t fd;
- pj_ioqueue_operation_e op;
void *user_data;
- pj_ioqueue_callback cb;
-
- void *rd_buf;
- unsigned rd_flags;
- pj_size_t rd_buflen;
- void *wr_buf;
- pj_size_t wr_buflen;
-
- pj_sockaddr_t *rmt_addr;
- int *rmt_addrlen;
-
- pj_sockaddr_t *local_addr;
- int *local_addrlen;
-
- pj_sock_t *accept_fd;
+ pj_ioqueue_callback cb;
+ int connecting;
+ struct read_operation read_list;
+ struct write_operation write_list;
+#if PJ_HAS_TCP
+ struct accept_operation accept_list;
+#endif
};
/*
@@ -94,7 +150,7 @@ struct pj_ioqueue_t
pj_lock_t *lock;
pj_bool_t auto_delete_lock;
unsigned max, count;
- pj_ioqueue_key_t hlist;
+ pj_ioqueue_key_t key_list;
pj_fd_set_t rfdset;
pj_fd_set_t wfdset;
#if PJ_HAS_TCP
@@ -109,38 +165,39 @@ struct pj_ioqueue_t
*/
PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_size_t max_fd,
- int max_threads,
pj_ioqueue_t **p_ioqueue)
{
- pj_ioqueue_t *ioque;
+ pj_ioqueue_t *ioqueue;
pj_status_t rc;
-
- PJ_UNUSED_ARG(max_threads);
-
- if (max_fd > PJ_IOQUEUE_MAX_HANDLES) {
- pj_assert(!"max_fd too large");
- return PJ_EINVAL;
- }
-
- ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
- ioque->max = max_fd;
- ioque->count = 0;
- PJ_FD_ZERO(&ioque->rfdset);
- PJ_FD_ZERO(&ioque->wfdset);
+
+ /* Check that arguments are valid. */
+ PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
+ max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
+ PJ_EINVAL);
+
+ /* Check that size of pj_ioqueue_op_key_t is sufficient */
+ PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
+ sizeof(union operation_key), PJ_EBUG);
+
+ ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
+ ioqueue->max = max_fd;
+ ioqueue->count = 0;
+ PJ_FD_ZERO(&ioqueue->rfdset);
+ PJ_FD_ZERO(&ioqueue->wfdset);
#if PJ_HAS_TCP
- PJ_FD_ZERO(&ioque->xfdset);
+ PJ_FD_ZERO(&ioqueue->xfdset);
#endif
- pj_list_init(&ioque->hlist);
+ pj_list_init(&ioqueue->key_list);
- rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock);
+ rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock);
if (rc != PJ_SUCCESS)
return rc;
- ioque->auto_delete_lock = PJ_TRUE;
+ ioqueue->auto_delete_lock = PJ_TRUE;
- PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque));
+ PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
- *p_ioqueue = ioque;
+ *p_ioqueue = ioqueue;
return PJ_SUCCESS;
}
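
Note (illustrative, not part of the patch): a minimal usage sketch against
the new pj_ioqueue_create() signature (the max_threads argument is gone):

    pj_caching_pool cp;
    pj_pool_t *pool;
    pj_ioqueue_t *ioq;
    pj_status_t status;

    pj_init();
    pj_caching_pool_init(&cp, NULL, 0);
    pool = pj_pool_create(&cp.factory, "app", 4000, 4000, NULL);

    status = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioq);
    if (status != PJ_SUCCESS) {
        /* handle error */
    }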
@@ -149,46 +206,28 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
*
* Destroy ioqueue.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque)
+PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
pj_status_t rc = PJ_SUCCESS;
- PJ_ASSERT_RETURN(ioque, PJ_EINVAL);
+ PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+
+ pj_lock_acquire(ioqueue->lock);
- if (ioque->auto_delete_lock)
- rc = pj_lock_destroy(ioque->lock);
+ if (ioqueue->auto_delete_lock)
+ rc = pj_lock_destroy(ioqueue->lock);
return rc;
}
/*
- * pj_ioqueue_set_lock()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,
- pj_lock_t *lock,
- pj_bool_t auto_delete )
-{
- PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);
-
- if (ioque->auto_delete_lock) {
- pj_lock_destroy(ioque->lock);
- }
-
- ioque->lock = lock;
- ioque->auto_delete_lock = auto_delete;
-
- return PJ_SUCCESS;
-}
-
-
-/*
* pj_ioqueue_register_sock()
*
* Register a handle to ioqueue.
*/
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
- pj_ioqueue_t *ioque,
+ pj_ioqueue_t *ioqueue,
pj_sock_t sock,
void *user_data,
const pj_ioqueue_callback *cb,
@@ -198,12 +237,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
pj_uint32_t value;
pj_status_t rc = PJ_SUCCESS;
- PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET &&
+ PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
cb && p_key, PJ_EINVAL);
- pj_lock_acquire(ioque->lock);
+ pj_lock_acquire(ioqueue->lock);
- if (ioque->count >= ioque->max) {
+ if (ioqueue->count >= ioqueue->max) {
rc = PJ_ETOOMANY;
goto on_return;
}
@@ -211,7 +250,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
/* Set socket to nonblocking. */
value = 1;
#ifdef PJ_WIN32
- if (ioctlsocket(sock, FIONBIO, (unsigned long*)&value)) {
+ if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) {
#else
if (ioctl(sock, FIONBIO, &value)) {
#endif
@@ -220,20 +259,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
}
/* Create key. */
- key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+ key->ioqueue = ioqueue;
key->fd = sock;
- key->user_data = user_data;
-
+ key->user_data = user_data;
+ pj_list_init(&key->read_list);
+ pj_list_init(&key->write_list);
+#if PJ_HAS_TCP
+ pj_list_init(&key->accept_list);
+#endif
+
/* Save callback. */
pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
/* Register */
- pj_list_insert_before(&ioque->hlist, key);
- ++ioque->count;
+ pj_list_insert_before(&ioqueue->key_list, key);
+ ++ioqueue->count;
-on_return:
+on_return:
+ /* On error, socket may be left in non-blocking mode. */
*p_key = key;
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
return rc;
}
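
Note (illustrative, not part of the patch): registration is unchanged in
shape, but the completion callbacks now receive the pj_ioqueue_op_key_t that
initiated the operation. A sketch using the signatures as they appear in the
poll loop below (sock is a pj_sock_t created elsewhere):

    static void on_read_complete(pj_ioqueue_key_t *key,
                                 pj_ioqueue_op_key_t *op_key,
                                 pj_ssize_t bytes_read)
    {
        /* bytes_read < 0 carries -pj_status_t, per the poll loop below */
    }

    static pj_status_t register_one(pj_pool_t *pool, pj_ioqueue_t *ioq,
                                    pj_sock_t sock, pj_ioqueue_key_t **key)
    {
        pj_ioqueue_callback cb;

        pj_memset(&cb, 0, sizeof(cb));
        cb.on_read_complete = &on_read_complete;
        return pj_ioqueue_register_sock(pool, ioq, sock, NULL, &cb, key);
    }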
@@ -243,23 +289,26 @@ on_return:
*
* Unregister handle from ioqueue.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key)
-{
- PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
+{
+ pj_ioqueue_t *ioqueue;
+
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ ioqueue = key->ioqueue;
- pj_lock_acquire(ioque->lock);
+ pj_lock_acquire(ioqueue->lock);
- pj_assert(ioque->count > 0);
- --ioque->count;
+ pj_assert(ioqueue->count > 0);
+ --ioqueue->count;
pj_list_erase(key);
- PJ_FD_CLR(key->fd, &ioque->rfdset);
- PJ_FD_CLR(key->fd, &ioque->wfdset);
+ PJ_FD_CLR(key->fd, &ioqueue->rfdset);
+ PJ_FD_CLR(key->fd, &ioqueue->wfdset);
#if PJ_HAS_TCP
- PJ_FD_CLR(key->fd, &ioque->xfdset);
+ PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif
-
- pj_lock_release(ioque->lock);
+
+ pj_lock_release(ioqueue->lock);
return PJ_SUCCESS;
}
@@ -274,25 +323,40 @@ PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
return key->user_data;
}
+
+/*
+ * pj_ioqueue_set_user_data()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+ void *user_data,
+ void **old_data)
+{
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ if (old_data)
+ *old_data = key->user_data;
+ key->user_data = user_data;
+
+ return PJ_SUCCESS;
+}
+
/* This supposed to check whether the fd_set values are consistent
* with the operation currently set in each key.
*/
#if VALIDATE_FD_SET
-static void validate_sets(const pj_ioqueue_t *ioque,
+static void validate_sets(const pj_ioqueue_t *ioqueue,
const pj_fd_set_t *rfdset,
const pj_fd_set_t *wfdset,
const pj_fd_set_t *xfdset)
{
pj_ioqueue_key_t *key;
- key = ioque->hlist.next;
- while (key != &ioque->hlist) {
- if ((key->op & PJ_IOQUEUE_OP_READ)
- || (key->op & PJ_IOQUEUE_OP_RECV)
- || (key->op & PJ_IOQUEUE_OP_RECV_FROM)
+ key = ioqueue->key_list.next;
+ while (key != &ioqueue->key_list) {
+ if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
- || (key->op & PJ_IOQUEUE_OP_ACCEPT)
+ || !pj_list_empty(&key->accept_list)
#endif
)
{
@@ -301,11 +365,9 @@ static void validate_sets(const pj_ioqueue_t *ioque,
else {
pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
}
- if ((key->op & PJ_IOQUEUE_OP_WRITE)
- || (key->op & PJ_IOQUEUE_OP_SEND)
- || (key->op & PJ_IOQUEUE_OP_SEND_TO)
+ if (!pj_list_empty(&key->write_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
- || (key->op & PJ_IOQUEUE_OP_CONNECT)
+ || key->connecting
#endif
)
{
@@ -315,7 +377,7 @@ static void validate_sets(const pj_ioqueue_t *ioque,
pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
}
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
- if (key->op & PJ_IOQUEUE_OP_CONNECT)
+ if (key->connecting)
{
pj_assert(PJ_FD_ISSET(key->fd, xfdset));
}
@@ -347,124 +409,263 @@ static void validate_sets(const pj_ioqueue_t *ioque,
 * - to guarantee preemptiveness etc., the poll function must strictly
 *   work on a copy of the ioqueue's fd_set (not the original one).
*/
-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
+PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
pj_fd_set_t rfdset, wfdset, xfdset;
int count;
pj_ioqueue_key_t *h;
+
+ PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
/* Lock ioqueue before making fd_set copies */
- pj_lock_acquire(ioque->lock);
-
- if (PJ_FD_COUNT(&ioque->rfdset)==0 &&
- PJ_FD_COUNT(&ioque->wfdset)==0 &&
- PJ_FD_COUNT(&ioque->xfdset)==0)
+ pj_lock_acquire(ioqueue->lock);
+
+ /* We will only do select() when there are sockets to be polled.
+ * Otherwise select() would return an error.
+ */
+ if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
+ PJ_FD_COUNT(&ioqueue->wfdset)==0 &&
+ PJ_FD_COUNT(&ioqueue->xfdset)==0)
{
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
if (timeout)
pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
return 0;
}
/* Copy ioqueue's pj_fd_set_t to local variables. */
- pj_memcpy(&rfdset, &ioque->rfdset, sizeof(pj_fd_set_t));
- pj_memcpy(&wfdset, &ioque->wfdset, sizeof(pj_fd_set_t));
+ pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
+ pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
#if PJ_HAS_TCP
- pj_memcpy(&xfdset, &ioque->xfdset, sizeof(pj_fd_set_t));
+ pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
#else
PJ_FD_ZERO(&xfdset);
#endif
#if VALIDATE_FD_SET
- validate_sets(ioque, &rfdset, &wfdset, &xfdset);
+ validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
#endif
/* Unlock ioqueue before select(). */
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout);
if (count <= 0)
return count;
- /* Lock ioqueue again before scanning for signalled sockets. */
- pj_lock_acquire(ioque->lock);
-
-#if PJ_HAS_TCP
- /* Scan for exception socket */
- h = ioque->hlist.next;
-do_except_scan:
- for ( ; h!=&ioque->hlist; h = h->next) {
- if ((h->op & PJ_IOQUEUE_OP_CONNECT) && PJ_FD_ISSET(h->fd, &xfdset))
- break;
- }
- if (h != &ioque->hlist) {
- /* 'connect()' should be the only operation. */
- pj_assert((h->op == PJ_IOQUEUE_OP_CONNECT));
-
- /* Clear operation. */
- h->op &= ~(PJ_IOQUEUE_OP_CONNECT);
- PJ_FD_CLR(h->fd, &ioque->wfdset);
- PJ_FD_CLR(h->fd, &ioque->xfdset);
- PJ_FD_CLR(h->fd, &wfdset);
- PJ_FD_CLR(h->fd, &xfdset);
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, -1);
-
- /* Re-scan exception list. */
- goto do_except_scan;
- }
-#endif /* PJ_HAS_TCP */
+ /* Lock ioqueue again before scanning for signalled sockets.
+ * We must strictly use recursive mutex since application may invoke
+ * the ioqueue again inside the callback.
+ */
+ pj_lock_acquire(ioqueue->lock);
+ /* Scan for writable sockets first to handle piggy-back data
+ * coming with accept().
+ */
+ h = ioqueue->key_list.next;
+do_writable_scan:
+ for ( ; h!=&ioqueue->key_list; h = h->next) {
+ if ( (!pj_list_empty(&h->write_list) || h->connecting)
+ && PJ_FD_ISSET(h->fd, &wfdset))
+ {
+ break;
+ }
+ }
+ if (h != &ioqueue->key_list) {
+ pj_assert(!pj_list_empty(&h->write_list) || h->connecting);
+
+#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
+ if (h->connecting) {
+ /* Completion of connect() operation */
+ pj_ssize_t bytes_transfered;
+
+#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
+ /* from connect(2):
+ * On Linux, use getsockopt to read the SO_ERROR option at
+ * level SOL_SOCKET to determine whether connect() completed
+ * successfully (if SO_ERROR is zero).
+ */
+ int value;
+ socklen_t vallen = sizeof(value);
+ int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
+ &value, &vallen);
+ if (gs_rc != 0) {
+ /* Argh!! What to do now???
+ * Just indicate that the socket is connected. The
+ * application will get error as soon as it tries to use
+ * the socket to send/receive.
+ */
+ bytes_transfered = 0;
+ } else {
+ bytes_transfered = value;
+ }
+#elif defined(PJ_WIN32) && PJ_WIN32!=0
+ bytes_transfered = 0; /* success */
+#else
+ /* Excellent information on D.J. Bernstein's page:
+ * http://cr.yp.to/docs/connect.html
+ *
+ * Seems like the most portable way of detecting connect()
+ * failure is to call getpeername(). If socket is connected,
+ * getpeername() will return 0. If the socket is not connected,
+ * it will return ENOTCONN, and read(fd, &ch, 1) will produce
+ * the right errno through error slippage. This is a combination
+ * of suggestions from Douglas C. Schmidt and Ken Keys.
+ */
+ int gp_rc;
+ struct sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+
+ gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
+ bytes_transfered = gp_rc;
+#endif
+
+ /* Clear operation. */
+ h->connecting = 0;
+ PJ_FD_CLR(h->fd, &ioqueue->wfdset);
+ PJ_FD_CLR(h->fd, &ioqueue->xfdset);
+
+ /* Call callback. */
+ if (h->cb.on_connect_complete)
+ (*h->cb.on_connect_complete)(h, bytes_transfered);
+
+ /* Re-scan writable sockets. */
+ goto do_writable_scan;
+
+ } else
+#endif /* PJ_HAS_TCP */
+ {
+ /* Socket is writable. */
+ struct write_operation *write_op;
+ pj_ssize_t sent;
+ pj_status_t send_rc;
+
+ /* Get the first in the queue. */
+ write_op = h->write_list.next;
+
+ /* Send the data. */
+ sent = write_op->size - write_op->written;
+ if (write_op->op == PJ_IOQUEUE_OP_SEND) {
+ send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
+ &sent, write_op->flags);
+ } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
+ send_rc = pj_sock_sendto(h->fd,
+ write_op->buf+write_op->written,
+ &sent, write_op->flags,
+ &write_op->rmt_addr,
+ write_op->rmt_addrlen);
+ } else {
+ pj_assert(!"Invalid operation type!");
+ send_rc = PJ_EBUG;
+ }
+
+ if (send_rc == PJ_SUCCESS) {
+ write_op->written += sent;
+ } else {
+ pj_assert(send_rc > 0);
+ write_op->written = -send_rc;
+ }
+
+ /* In any case we don't need to process this descriptor again. */
+ PJ_FD_CLR(h->fd, &wfdset);
+
+ /* Are we finished with this buffer? */
+ if (send_rc!=PJ_SUCCESS ||
+ write_op->written == (pj_ssize_t)write_op->size)
+ {
+ pj_list_erase(write_op);
+
+ /* Clear operation if there's no more data to send. */
+ if (pj_list_empty(&h->write_list))
+ PJ_FD_CLR(h->fd, &ioqueue->wfdset);
+
+ /* Call callback. */
+ if (h->cb.on_write_complete) {
+ (*h->cb.on_write_complete)(h,
+ (pj_ioqueue_op_key_t*)write_op,
+ write_op->written);
+ }
+ }
+
+ /* Re-scan writable sockets. */
+ goto do_writable_scan;
+ }
+ }
+
/* Scan for readable socket. */
- h = ioque->hlist.next;
+ h = ioqueue->key_list.next;
do_readable_scan:
- for ( ; h!=&ioque->hlist; h = h->next) {
- if ((PJ_IOQUEUE_IS_READ_OP(h->op) || PJ_IOQUEUE_IS_ACCEPT_OP(h->op)) &&
- PJ_FD_ISSET(h->fd, &rfdset))
+ for ( ; h!=&ioqueue->key_list; h = h->next) {
+ if ((!pj_list_empty(&h->read_list)
+#if PJ_HAS_TCP
+ || !pj_list_empty(&h->accept_list)
+#endif
+ ) && PJ_FD_ISSET(h->fd, &rfdset))
{
break;
}
}
- if (h != &ioque->hlist) {
+ if (h != &ioqueue->key_list) {
pj_status_t rc;
- pj_assert(PJ_IOQUEUE_IS_READ_OP(h->op) ||
- PJ_IOQUEUE_IS_ACCEPT_OP(h->op));
+#if PJ_HAS_TCP
+ pj_assert(!pj_list_empty(&h->read_list) ||
+ !pj_list_empty(&h->accept_list));
+#else
+ pj_assert(!pj_list_empty(&h->read_list));
+#endif
# if PJ_HAS_TCP
- if ((h->op & PJ_IOQUEUE_OP_ACCEPT)) {
- /* accept() must be the only operation specified on server socket */
- pj_assert(h->op == PJ_IOQUEUE_OP_ACCEPT);
-
- rc=pj_sock_accept(h->fd, h->accept_fd, h->rmt_addr, h->rmt_addrlen);
- if (rc==0 && h->local_addr) {
- rc = pj_sock_getsockname(*h->accept_fd, h->local_addr,
- h->local_addrlen);
+ if (!pj_list_empty(&h->accept_list)) {
+
+ struct accept_operation *accept_op;
+
+ /* Get one accept operation from the list. */
+ accept_op = h->accept_list.next;
+ pj_list_erase(accept_op);
+
+ rc=pj_sock_accept(h->fd, accept_op->accept_fd,
+ accept_op->rmt_addr, accept_op->addrlen);
+ if (rc==PJ_SUCCESS && accept_op->local_addr) {
+ rc = pj_sock_getsockname(*accept_op->accept_fd,
+ accept_op->local_addr,
+ accept_op->addrlen);
}
- h->op &= ~(PJ_IOQUEUE_OP_ACCEPT);
- PJ_FD_CLR(h->fd, &ioque->rfdset);
+ /* Clear bit in fdset if there is no more pending accept */
+ if (pj_list_empty(&h->accept_list))
+ PJ_FD_CLR(h->fd, &ioqueue->rfdset);
/* Call callback. */
if (h->cb.on_accept_complete)
- (*h->cb.on_accept_complete)(h, *h->accept_fd, rc);
+ (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op,
+ *accept_op->accept_fd, rc);
/* Re-scan readable sockets. */
goto do_readable_scan;
- }
+ }
else {
-# endif
- pj_ssize_t bytes_read = h->rd_buflen;
-
- if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) {
- rc = pj_sock_recvfrom(h->fd, h->rd_buf, &bytes_read, 0,
- h->rmt_addr, h->rmt_addrlen);
- } else if ((h->op & PJ_IOQUEUE_OP_RECV)) {
- rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
- } else {
+# endif
+ struct read_operation *read_op;
+ pj_ssize_t bytes_read;
+
+ pj_assert(!pj_list_empty(&h->read_list));
+
+ /* Get one pending read operation from the list. */
+ read_op = h->read_list.next;
+ pj_list_erase(read_op);
+
+ bytes_read = read_op->size;
+
+ if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
+ rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0,
+ read_op->rmt_addr,
+ read_op->rmt_addrlen);
+ } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
+ rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
+ } else {
+ pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
/*
* User has specified pj_ioqueue_read().
* On Win32, we should do ReadFile(). But because we got
@@ -478,9 +679,10 @@ do_readable_scan:
* that error is easier to catch.
*/
# if defined(PJ_WIN32) && PJ_WIN32 != 0
- rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0);
-# elif (defined(PJ_LINUX) && PJ_LINUX != 0) || \
- (defined(PJ_SUNOS) && PJ_SUNOS != 0)
+ rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
+ //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
+ // &bytes_read, NULL);
+# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
- bytes_read = read(h->fd, h->rd_buf, bytes_read);
+ bytes_read = read(h->fd, read_op->buf, bytes_read);
rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
@@ -503,124 +705,61 @@ do_readable_scan:
*/
if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
- PJ_LOG(4,(THIS_FILE,
- "Ignored ICMP port unreach. on key=%p", h));
+ //PJ_LOG(4,(THIS_FILE,
+ // "Ignored ICMP port unreach. on key=%p", h));
}
# endif
/* In any case we would report this to caller. */
bytes_read = -rc;
}
-
- h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV |
- PJ_IOQUEUE_OP_RECV_FROM);
- PJ_FD_CLR(h->fd, &ioque->rfdset);
+
+ /* Clear fdset if there is no pending read. */
+ if (pj_list_empty(&h->read_list))
+ PJ_FD_CLR(h->fd, &ioqueue->rfdset);
+
+ /* In any case clear from temporary set. */
PJ_FD_CLR(h->fd, &rfdset);
/* Call callback. */
if (h->cb.on_read_complete)
- (*h->cb.on_read_complete)(h, bytes_read);
+ (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op,
+ bytes_read);
/* Re-scan readable sockets. */
goto do_readable_scan;
}
}
-
- /* Scan for writable socket */
- h = ioque->hlist.next;
-do_writable_scan:
- for ( ; h!=&ioque->hlist; h = h->next) {
- if ((PJ_IOQUEUE_IS_WRITE_OP(h->op) || PJ_IOQUEUE_IS_CONNECT_OP(h->op))
- && PJ_FD_ISSET(h->fd, &wfdset))
- {
- break;
- }
- }
- if (h != &ioque->hlist) {
- pj_assert(PJ_IOQUEUE_IS_WRITE_OP(h->op) ||
- PJ_IOQUEUE_IS_CONNECT_OP(h->op));
-
-#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
- if ((h->op & PJ_IOQUEUE_OP_CONNECT)) {
- /* Completion of connect() operation */
- pj_ssize_t bytes_transfered;
-
-#if (defined(PJ_LINUX) && PJ_LINUX!=0) || \
- (defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0)
- /* from connect(2):
- * On Linux, use getsockopt to read the SO_ERROR option at
- * level SOL_SOCKET to determine whether connect() completed
- * successfully (if SO_ERROR is zero).
- */
- int value;
- socklen_t vallen = sizeof(value);
- int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
- &value, &vallen);
- if (gs_rc != 0) {
- /* Argh!! What to do now???
- * Just indicate that the socket is connected. The
- * application will get error as soon as it tries to use
- * the socket to send/receive.
- */
- bytes_transfered = 0;
- } else {
- bytes_transfered = value;
- }
-#elif defined(PJ_WIN32) && PJ_WIN32!=0
- bytes_transfered = 0; /* success */
-#else
- /* Excellent information in D.J. Bernstein page:
- * http://cr.yp.to/docs/connect.html
- *
- * Seems like the most portable way of detecting connect()
- * failure is to call getpeername(). If socket is connected,
- * getpeername() will return 0. If the socket is not connected,
- * it will return ENOTCONN, and read(fd, &ch, 1) will produce
- * the right errno through error slippage. This is a combination
- * of suggestions from Douglas C. Schmidt and Ken Keys.
- */
- int gp_rc;
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof(addr);
-
- gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
- bytes_transfered = gp_rc;
-#endif
-
- /* Clear operation. */
- h->op &= (~PJ_IOQUEUE_OP_CONNECT);
- PJ_FD_CLR(h->fd, &ioque->wfdset);
- PJ_FD_CLR(h->fd, &ioque->xfdset);
-
- /* Call callback. */
- if (h->cb.on_connect_complete)
- (*h->cb.on_connect_complete)(h, bytes_transfered);
-
- /* Re-scan writable sockets. */
- goto do_writable_scan;
-
- } else
-#endif /* PJ_HAS_TCP */
- {
- /* Completion of write(), send(), or sendto() operation. */
-
- /* Clear operation. */
- h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND |
- PJ_IOQUEUE_OP_SEND_TO);
- PJ_FD_CLR(h->fd, &ioque->wfdset);
- PJ_FD_CLR(h->fd, &wfdset);
-
- /* Call callback. */
- /* All data must have been sent? */
- if (h->cb.on_write_complete)
- (*h->cb.on_write_complete)(h, h->wr_buflen);
-
- /* Re-scan writable sockets. */
- goto do_writable_scan;
- }
- }
-
+
+#if PJ_HAS_TCP
+ /* Scan for exception socket for TCP connection error. */
+ h = ioqueue->key_list.next;
+do_except_scan:
+ for ( ; h!=&ioqueue->key_list; h = h->next) {
+ if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset))
+ break;
+ }
+ if (h != &ioqueue->key_list) {
+
+ pj_assert(h->connecting);
+
+ /* Clear operation. */
+ h->connecting = 0;
+ PJ_FD_CLR(h->fd, &ioqueue->wfdset);
+ PJ_FD_CLR(h->fd, &ioqueue->xfdset);
+ PJ_FD_CLR(h->fd, &wfdset);
+ PJ_FD_CLR(h->fd, &xfdset);
+
+ /* Call callback. */
+ if (h->cb.on_connect_complete)
+ (*h->cb.on_connect_complete)(h, -1);
+
+ /* Re-scan exception list. */
+ goto do_except_scan;
+ }
+#endif /* PJ_HAS_TCP */
+
/* Shouldn't happen. */
 /* For some strange reason, on WinXP select() can return 1 while no
 * pj_fd_set_t is signaled. */
@@ -628,75 +767,63 @@ do_writable_scan:
//count = 0;
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
return count;
}
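
Note (illustrative, not part of the patch): the portable fallback branch in
the connect-completion code above relies on the getpeername() trick from the
cited cr.yp.to page. A self-contained sketch of the same technique in plain
POSIX C, outside pjlib:

    #include <sys/socket.h>
    #include <unistd.h>
    #include <errno.h>

    /* Returns 0 if the non-blocking connect() on fd completed, or an
     * errno value describing why it failed.
     */
    static int check_connect_result(int fd)
    {
        struct sockaddr_storage addr;
        socklen_t addrlen = sizeof(addr);
        char ch;

        if (getpeername(fd, (struct sockaddr*)&addr, &addrlen) == 0)
            return 0;                   /* connected */

        /* Not connected: a one-byte read() surfaces the real error
         * through "error slippage".
         */
        if (read(fd, &ch, 1) < 0)
            return errno;
        return ENOTCONN;
    }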
/*
- * pj_ioqueue_read()
- *
- * Start asynchronous read from the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- void *buffer,
- pj_size_t buflen)
-{
- PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for reading before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
- PJ_EBUSY);
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_READ;
- key->rd_flags = 0;
- key->rd_buf = buffer;
- key->rd_buflen = buflen;
- PJ_FD_SET(key->fd, &ioque->rfdset);
-
- pj_lock_release(ioque->lock);
- return PJ_EPENDING;
-}
-
-
-/*
* pj_ioqueue_recv()
*
* Start asynchronous recv() from the socket.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
void *buffer,
- pj_size_t buflen,
+ pj_ssize_t *length,
unsigned flags )
-{
- PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for reading before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
- PJ_EBUSY);
-
- pj_lock_acquire(ioque->lock);
+{
+ pj_status_t status;
+ pj_ssize_t size;
+ struct read_operation *read_op;
+ pj_ioqueue_t *ioqueue;
- key->op |= PJ_IOQUEUE_OP_RECV;
- key->rd_buf = buffer;
- key->rd_buflen = buflen;
- key->rd_flags = flags;
- PJ_FD_SET(key->fd, &ioque->rfdset);
+ PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ /* Try to see if there's data immediately available.
+ */
+ size = *length;
+ status = pj_sock_recv(key->fd, buffer, &size, flags);
+ if (status == PJ_SUCCESS) {
+ /* Yes! Data is available! */
+ *length = size;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+ return status;
+ }
+
+ /*
+ * No data is immediately available.
+ * Must schedule asynchronous operation to the ioqueue.
+ */
+ ioqueue = key->ioqueue;
+ pj_lock_acquire(ioqueue->lock);
+
+ read_op = (struct read_operation*)op_key;
+
+ read_op->op = PJ_IOQUEUE_OP_RECV;
+ read_op->buf = buffer;
+ read_op->size = *length;
+ read_op->flags = flags;
+
+ pj_list_insert_before(&key->read_list, read_op);
+ PJ_FD_SET(key->fd, &ioqueue->rfdset);
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
}
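
Note (illustrative, not part of the patch): the new recv contract has two
success-like outcomes, and both the op_key and the buffer must stay valid
while the operation is pending. A usage sketch (key comes from
pj_ioqueue_register_sock() above):

    char buf[512];
    pj_ssize_t len = sizeof(buf);
    pj_ioqueue_op_key_t op_key;     /* must outlive the pending operation */
    pj_status_t status;

    status = pj_ioqueue_recv(key, &op_key, buf, &len, 0);
    if (status == PJ_SUCCESS) {
        /* len bytes are already in buf; no callback will fire. */
    } else if (status == PJ_EPENDING) {
        /* on_read_complete() will be invoked from pj_ioqueue_poll(). */
    } else {
        /* immediate failure */
    }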
@@ -705,80 +832,60 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,
*
* Start asynchronous recvfrom() from the socket.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
void *buffer,
- pj_size_t buflen,
+ pj_ssize_t *length,
unsigned flags,
pj_sockaddr_t *addr,
int *addrlen)
{
- PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for reading before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV) == 0 &&
- (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0),
- PJ_EBUSY);
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_RECV_FROM;
- key->rd_buf = buffer;
- key->rd_buflen = buflen;
- key->rd_flags = flags;
- key->rmt_addr = addr;
- key->rmt_addrlen = addrlen;
- PJ_FD_SET(key->fd, &ioque->rfdset);
-
- pj_lock_release(ioque->lock);
- return PJ_EPENDING;
-}
-
-/*
- * pj_ioqueue_write()
- *
- * Start asynchronous write() to the descriptor.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- const void *data,
- pj_size_t datalen)
-{
- pj_status_t rc;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for writing before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
- PJ_EBUSY);
-
- sent = datalen;
- /* sent would be -1 after pj_sock_send() if it returns error. */
- rc = pj_sock_send(key->fd, data, &sent, 0);
- if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
- return rc;
- }
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_WRITE;
- key->wr_buf = NULL;
- key->wr_buflen = datalen;
- PJ_FD_SET(key->fd, &ioque->wfdset);
-
- pj_lock_release(ioque->lock);
-
- return PJ_EPENDING;
+ pj_status_t status;
+ pj_ssize_t size;
+ struct read_operation *read_op;
+ pj_ioqueue_t *ioqueue;
+
+ PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ /* Try to see if there's data immediately available.
+ */
+ size = *length;
+ status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
+ addr, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Yes! Data is available! */
+ *length = size;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+ return status;
+ }
+
+ /*
+ * No data is immediately available.
+ * Must schedule asynchronous operation to the ioqueue.
+ */
+ ioqueue = key->ioqueue;
+ pj_lock_acquire(ioqueue->lock);
+
+ read_op = (struct read_operation*)op_key;
+
+ read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
+ read_op->buf = buffer;
+ read_op->size = *length;
+ read_op->flags = flags;
+ read_op->rmt_addr = addr;
+ read_op->rmt_addrlen = addrlen;
+
+ pj_list_insert_before(&key->read_list, read_op);
+ PJ_FD_SET(key->fd, &ioqueue->rfdset);
+
+ pj_lock_release(ioqueue->lock);
+ return PJ_EPENDING;
}
/*
@@ -786,41 +893,71 @@ PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
*
* Start asynchronous send() to the descriptor.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
const void *data,
- pj_size_t datalen,
+ pj_ssize_t *length,
unsigned flags)
-{
- pj_status_t rc;
+{
+ pj_ioqueue_t *ioqueue;
+ struct write_operation *write_op;
+ pj_status_t status;
pj_ssize_t sent;
- PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
+ PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for writing before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
- PJ_EBUSY);
-
- sent = datalen;
- /* sent would be -1 after pj_sock_send() if it returns error. */
- rc = pj_sock_send(key->fd, data, &sent, flags);
- if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
- return rc;
+
+ /* Fast track:
+ * Try to send data immediately, only if there's no pending write!
+ * Note:
+ * We are speculating that the list is empty here without properly
+ * acquiring ioqueue's mutex first. This is intentional, to maximize
+ * performance via parallelism.
+ *
+ * This should be safe, because:
+ * - by convention, we require caller to make sure that the
+ * key is not unregistered while other threads are invoking
+ * an operation on the same key.
+ * - pj_list_empty() is safe to be invoked by multiple threads,
+ * even when other threads are modifying the list.
+ */
+ if (pj_list_empty(&key->write_list)) {
+ /*
+ * See if data can be sent immediately.
+ */
+ sent = *length;
+ status = pj_sock_send(key->fd, data, &sent, flags);
+ if (status == PJ_SUCCESS) {
+ /* Success! */
+ *length = sent;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
}
+
+ /*
+ * Schedule asynchronous send.
+ */
+ ioqueue = key->ioqueue;
+ pj_lock_acquire(ioqueue->lock);
+
+ write_op = (struct write_operation*)op_key;
+ write_op->op = PJ_IOQUEUE_OP_SEND;
+ write_op->buf = (char*)data;
+ write_op->size = *length;
+ write_op->written = 0;
+ write_op->flags = flags;
+
+ pj_list_insert_before(&key->write_list, write_op);
+ PJ_FD_SET(key->fd, &ioqueue->wfdset);
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_SEND;
- key->wr_buf = NULL;
- key->wr_buflen = datalen;
- PJ_FD_SET(key->fd, &ioque->wfdset);
-
- pj_lock_release(ioque->lock);
+ pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
}
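
Note (illustrative, not part of the patch): when the fast path cannot
complete, only the data pointer is stored in the write_operation -- the
payload is not copied -- so the buffer, like the op_key, must remain valid
until on_write_complete() reports the operation done:

    static char pkt[1024];          /* outlives the pending operation */
    pj_ssize_t len = sizeof(pkt);
    pj_status_t status;

    status = pj_ioqueue_send(key, &op_key, pkt, &len, 0);
    if (status == PJ_EPENDING) {
        /* pj_ioqueue_poll() finishes the send, possibly over several
         * rounds (write_op->written tracks progress), then invokes
         * on_write_complete().
         */
    }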
@@ -831,75 +968,149 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
*
* Start asynchronous write() to the descriptor.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
const void *data,
- pj_size_t datalen,
+ pj_ssize_t *length,
unsigned flags,
const pj_sockaddr_t *addr,
int addrlen)
{
- pj_status_t rc;
- pj_ssize_t sent;
-
- PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL);
- PJ_CHECK_STACK();
-
- /* For consistency with other ioqueue implementation, we would reject
- * if descriptor has already been submitted for writing before.
- */
- PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND) == 0 &&
- (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0),
- PJ_EBUSY);
-
- sent = datalen;
- /* sent would be -1 after pj_sock_sendto() if it returns error. */
- rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
- if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) {
- return rc;
- }
-
- pj_lock_acquire(ioque->lock);
-
- key->op |= PJ_IOQUEUE_OP_SEND_TO;
- key->wr_buf = NULL;
- key->wr_buflen = datalen;
- PJ_FD_SET(key->fd, &ioque->wfdset);
-
- pj_lock_release(ioque->lock);
- return PJ_EPENDING;
+ pj_ioqueue_t *ioqueue;
+ struct write_operation *write_op;
+ pj_status_t status;
+ pj_ssize_t sent;
+
+ PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ /* Fast track:
+ * Try to send data immediately, only if there's no pending write!
+ * Note:
+ * We are speculating that the list is empty here without properly
+ * acquiring ioqueue's mutex first. This is intentional, to maximize
+ * performance via parallelism.
+ *
+ * This should be safe, because:
+ * - by convention, we require caller to make sure that the
+ * key is not unregistered while other threads are invoking
+ * an operation on the same key.
+ * - pj_list_empty() is safe to be invoked by multiple threads,
+ * even when other threads are modifying the list.
+ */
+ if (pj_list_empty(&key->write_list)) {
+ /*
+ * See if data can be sent immediately.
+ */
+ sent = *length;
+ status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Success! */
+ *length = sent;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
+ }
+
+ /*
+ * Check that address storage can hold the address parameter.
+ */
+ PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
+
+ /*
+ * Schedule asynchronous send.
+ */
+ ioqueue = key->ioqueue;
+ pj_lock_acquire(ioqueue->lock);
+
+ write_op = (struct write_operation*)op_key;
+ write_op->op = PJ_IOQUEUE_OP_SEND_TO;
+ write_op->buf = (char*)data;
+ write_op->size = *length;
+ write_op->written = 0;
+ write_op->flags = flags;
+ pj_memcpy(&write_op->rmt_addr, addr, addrlen);
+ write_op->rmt_addrlen = addrlen;
+
+ pj_list_insert_before(&key->write_list, write_op);
+ PJ_FD_SET(key->fd, &ioqueue->wfdset);
+
+ pj_lock_release(ioqueue->lock);
+
+ return PJ_EPENDING;
}
#if PJ_HAS_TCP
/*
* Initiate overlapped accept() operation.
*/
-PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
- pj_sock_t *new_sock,
- pj_sockaddr_t *local,
- pj_sockaddr_t *remote,
- int *addrlen)
-{
- /* check parameters. All must be specified! */
- pj_assert(ioqueue && key && new_sock);
-
- /* Server socket must have no other operation! */
- pj_assert(key->op == 0);
-
- pj_lock_acquire(ioqueue->lock);
-
- key->op = PJ_IOQUEUE_OP_ACCEPT;
- key->accept_fd = new_sock;
- key->rmt_addr = remote;
- key->rmt_addrlen = addrlen;
- key->local_addr = local;
- key->local_addrlen = addrlen; /* use same addr. as rmt_addrlen */
+PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t *new_sock,
+ pj_sockaddr_t *local,
+ pj_sockaddr_t *remote,
+ int *addrlen)
+{
+ pj_ioqueue_t *ioqueue;
+ struct accept_operation *accept_op;
+ pj_status_t status;
+ /* check parameters. All must be specified! */
+ PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+
+ /* Fast track:
+ * See if there's new connection available immediately.
+ */
+ if (pj_list_empty(&key->accept_list)) {
+ status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Yes! New connection is available! */
+ if (local && addrlen) {
+ status = pj_sock_getsockname(*new_sock, local, addrlen);
+ if (status != PJ_SUCCESS) {
+ pj_sock_close(*new_sock);
+ *new_sock = PJ_INVALID_SOCKET;
+ return status;
+ }
+ }
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
+ }
+
+ /*
+ * No connection is available immediately.
+ * Schedule accept() operation to be completed when there is incoming
+ * connection available.
+ */
+ ioqueue = key->ioqueue;
+ accept_op = (struct accept_operation*)op_key;
+
+ pj_lock_acquire(ioqueue->lock);
+
+ accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
+ accept_op->accept_fd = new_sock;
+ accept_op->rmt_addr = remote;
+ accept_op->addrlen = addrlen;
+ accept_op->local_addr = local;
+
+ pj_list_insert_before(&key->accept_list, accept_op);
PJ_FD_SET(key->fd, &ioqueue->rfdset);
+
+ pj_lock_release(ioqueue->lock);
- pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
}
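
Note (illustrative, not part of the patch): a usage sketch for the new
accept contract; note that this implementation uses the single addrlen both
for the remote address and, via pj_sock_getsockname(), for the local one:

    pj_sock_t newsock = PJ_INVALID_SOCKET;
    pj_sockaddr_in local, remote;
    int addrlen = sizeof(remote);
    pj_status_t status;

    status = pj_ioqueue_accept(key, &op_key, &newsock, &local, &remote,
                               &addrlen);
    if (status == PJ_SUCCESS) {
        /* a connection was already waiting in the backlog */
    } else if (status == PJ_EPENDING) {
        /* on_accept_complete() will deliver newsock later */
    }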
@@ -907,37 +1118,37 @@ PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
* Initiate overlapped connect() operation (well, it's non-blocking actually,
* since there's no overlapped version of connect()).
*/
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
const pj_sockaddr_t *addr,
int addrlen )
-{
- pj_status_t rc;
+{
+ pj_ioqueue_t *ioqueue;
+ pj_status_t status;
/* check parameters. All must be specified! */
- PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL);
+ PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
- /* Connecting socket must have no other operation! */
- PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY);
+ /* Check if socket has not been marked for connecting */
+ if (key->connecting != 0)
+ return PJ_EPENDING;
- rc = pj_sock_connect(key->fd, addr, addrlen);
- if (rc == PJ_SUCCESS) {
+ status = pj_sock_connect(key->fd, addr, addrlen);
+ if (status == PJ_SUCCESS) {
/* Connected! */
return PJ_SUCCESS;
} else {
- if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) ||
- rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))
- {
- /* Pending! */
+ if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
+ /* Pending! */
+ ioqueue = key->ioqueue;
pj_lock_acquire(ioqueue->lock);
- key->op = PJ_IOQUEUE_OP_CONNECT;
+ key->connecting = PJ_TRUE;
PJ_FD_SET(key->fd, &ioqueue->wfdset);
PJ_FD_SET(key->fd, &ioqueue->xfdset);
pj_lock_release(ioqueue->lock);
return PJ_EPENDING;
} else {
/* Error! */
- return rc;
+ return status;
}
}
}
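
Note (illustrative, not part of the patch): a sketch of the completion side
of connect(), as invoked by the poll loop above; the second parameter is
assumed here to be the pj_ssize_t value the poll loop passes -- 0 on
success, the SO_ERROR value where available, or -1 when the socket was
flagged in the exception fdset:

    static void on_connect_complete(pj_ioqueue_key_t *key,
                                    pj_ssize_t status)
    {
        if (status == 0) {
            /* connected; the key can now be used for send/recv */
        } else {
            /* connect failed; close the socket, then report or retry */
        }
    }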