diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-06 09:37:47 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-06 09:37:47 +0000 |
commit | 7c7300624eb867fa7c1ea52b9c636889aac60e80 (patch) | |
tree | 58460baa296e7eb6bd775d060f2a1e960717f565 /pjlib/src/pj | |
parent | 58aee2809c36f43a3b66dac7d9db5d13070114b9 (diff) |
Changed ioqueue to allow simultaneous operations on the same key
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@11 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj')
-rw-r--r-- | pjlib/src/pj/config.c | 2 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_select.c | 1235 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_winnt.c | 834 |
3 files changed, 1157 insertions, 914 deletions
diff --git a/pjlib/src/pj/config.c b/pjlib/src/pj/config.c index 962fbdcf..3354f133 100644 --- a/pjlib/src/pj/config.c +++ b/pjlib/src/pj/config.c @@ -4,7 +4,7 @@ #include <pj/log.h> static const char *id = "config.c"; -const char *PJ_VERSION = "0.3.0-pre1"; +const char *PJ_VERSION = "0.3.0-pre4"; PJ_DEF(void) pj_dump_config(void) { diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c index 37d6f661..367ffb5e 100644 --- a/pjlib/src/pj/ioqueue_select.c +++ b/pjlib/src/pj/ioqueue_select.c @@ -33,23 +33,33 @@ * */ #define THIS_FILE "ioq_select" +
+/*
+ * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
+ * the correct error code.
+ */
+#if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
+# error "Error reporting must be enabled for this function to work!"
+#endif
+
+/**
+ * Get the number of descriptors in the set. This is defined in sock_select.c
+ * This function will only return the number of sockets set from PJ_FD_SET
+ * operation. When the set is modified by other means (such as by select()),
+ * the count will not be reflected here.
+ *
+ * That's why don't export this function in the header file, to avoid
+ * misunderstanding.
+ *
+ * @param fdsetp The descriptor set.
+ *
+ * @return Number of descriptors in the set.
+ */
+PJ_DECL(pj_size_t) PJ_FD_COUNT(const pj_fd_set_t *fdsetp);
+
-#define PJ_IOQUEUE_IS_READ_OP(op) ((op & PJ_IOQUEUE_OP_READ) || \ - (op & PJ_IOQUEUE_OP_RECV) || \ - (op & PJ_IOQUEUE_OP_RECV_FROM)) -#define PJ_IOQUEUE_IS_WRITE_OP(op) ((op & PJ_IOQUEUE_OP_WRITE) || \ - (op & PJ_IOQUEUE_OP_SEND) || \ - (op & PJ_IOQUEUE_OP_SEND_TO)) -#if PJ_HAS_TCP -# define PJ_IOQUEUE_IS_ACCEPT_OP(op) (op & PJ_IOQUEUE_OP_ACCEPT) -# define PJ_IOQUEUE_IS_CONNECT_OP(op) (op & PJ_IOQUEUE_OP_CONNECT) -#else -# define PJ_IOQUEUE_IS_ACCEPT_OP(op) 0 -# define PJ_IOQUEUE_IS_CONNECT_OP(op) 0 -#endif - /* * During debugging build, VALIDATE_FD_SET is set. * This will check the validity of the fd_sets. @@ -59,31 +69,77 @@ #else # define VALIDATE_FD_SET 0 #endif +
+struct generic_operation
+{
+ PJ_DECL_LIST_MEMBER(struct generic_operation);
+ pj_ioqueue_operation_e op;
+};
+
+struct read_operation
+{
+ PJ_DECL_LIST_MEMBER(struct read_operation);
+ pj_ioqueue_operation_e op;
+
+ void *buf;
+ pj_size_t size;
+ unsigned flags;
+ pj_sockaddr_t *rmt_addr;
+ int *rmt_addrlen;
+};
+
+struct write_operation
+{
+ PJ_DECL_LIST_MEMBER(struct write_operation);
+ pj_ioqueue_operation_e op;
+
+ char *buf;
+ pj_size_t size;
+ pj_ssize_t written;
+ unsigned flags;
+ pj_sockaddr_in rmt_addr;
+ int rmt_addrlen;
+};
+
+#if PJ_HAS_TCP
+struct accept_operation
+{
+ PJ_DECL_LIST_MEMBER(struct accept_operation);
+ pj_ioqueue_operation_e op;
+
+ pj_sock_t *accept_fd;
+ pj_sockaddr_t *local_addr;
+ pj_sockaddr_t *rmt_addr;
+ int *addrlen;
+};
+#endif
+
+union operation_key
+{
+ struct generic_operation generic;
+ struct read_operation read;
+ struct write_operation write;
+#if PJ_HAS_TCP
+ struct accept_operation accept;
+#endif
+};
/* * This describes each key. */ struct pj_ioqueue_key_t { - PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t) + PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
+ pj_ioqueue_t *ioqueue; pj_sock_t fd; - pj_ioqueue_operation_e op; void *user_data; - pj_ioqueue_callback cb; - - void *rd_buf; - unsigned rd_flags; - pj_size_t rd_buflen; - void *wr_buf; - pj_size_t wr_buflen; - - pj_sockaddr_t *rmt_addr; - int *rmt_addrlen; - - pj_sockaddr_t *local_addr; - int *local_addrlen; - - pj_sock_t *accept_fd; + pj_ioqueue_callback cb;
+ int connecting;
+ struct read_operation read_list;
+ struct write_operation write_list;
+#if PJ_HAS_TCP
+ struct accept_operation accept_list;
+#endif }; /* @@ -94,7 +150,7 @@ struct pj_ioqueue_t pj_lock_t *lock; pj_bool_t auto_delete_lock; unsigned max, count; - pj_ioqueue_key_t hlist; + pj_ioqueue_key_t key_list; pj_fd_set_t rfdset; pj_fd_set_t wfdset; #if PJ_HAS_TCP @@ -109,38 +165,39 @@ struct pj_ioqueue_t */ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, - int max_threads, pj_ioqueue_t **p_ioqueue) { - pj_ioqueue_t *ioque; + pj_ioqueue_t *ioqueue; pj_status_t rc; - - PJ_UNUSED_ARG(max_threads); - - if (max_fd > PJ_IOQUEUE_MAX_HANDLES) { - pj_assert(!"max_fd too large"); - return PJ_EINVAL; - } - - ioque = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); - ioque->max = max_fd; - ioque->count = 0; - PJ_FD_ZERO(&ioque->rfdset); - PJ_FD_ZERO(&ioque->wfdset); +
+ /* Check that arguments are valid. */
+ PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
+ max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES,
+ PJ_EINVAL);
+
+ /* Check that size of pj_ioqueue_op_key_t is sufficient */
+ PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
+ sizeof(union operation_key), PJ_EBUG);
+
+ ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); + ioqueue->max = max_fd; + ioqueue->count = 0; + PJ_FD_ZERO(&ioqueue->rfdset); + PJ_FD_ZERO(&ioqueue->wfdset); #if PJ_HAS_TCP - PJ_FD_ZERO(&ioque->xfdset); + PJ_FD_ZERO(&ioqueue->xfdset); #endif - pj_list_init(&ioque->hlist); + pj_list_init(&ioqueue->key_list); - rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioque->lock); + rc = pj_lock_create_recursive_mutex(pool, "ioq%p", &ioqueue->lock); if (rc != PJ_SUCCESS) return rc; - ioque->auto_delete_lock = PJ_TRUE; + ioqueue->auto_delete_lock = PJ_TRUE; - PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioque)); + PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue)); - *p_ioqueue = ioque; + *p_ioqueue = ioqueue; return PJ_SUCCESS; } @@ -149,46 +206,28 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, * * Destroy ioqueue. */ -PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioque) +PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) { pj_status_t rc = PJ_SUCCESS; - PJ_ASSERT_RETURN(ioque, PJ_EINVAL); + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); +
+ pj_lock_acquire(ioqueue->lock);
- if (ioque->auto_delete_lock) - rc = pj_lock_destroy(ioque->lock); + if (ioqueue->auto_delete_lock) + rc = pj_lock_destroy(ioqueue->lock); return rc; } /* - * pj_ioqueue_set_lock() - */ -PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, - pj_lock_t *lock, - pj_bool_t auto_delete ) -{ - PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL); - - if (ioque->auto_delete_lock) { - pj_lock_destroy(ioque->lock); - } - - ioque->lock = lock; - ioque->auto_delete_lock = auto_delete; - - return PJ_SUCCESS; -} - - -/* * pj_ioqueue_register_sock() * * Register a handle to ioqueue. */ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, - pj_ioqueue_t *ioque, + pj_ioqueue_t *ioqueue, pj_sock_t sock, void *user_data, const pj_ioqueue_callback *cb, @@ -198,12 +237,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, pj_uint32_t value; pj_status_t rc = PJ_SUCCESS; - PJ_ASSERT_RETURN(pool && ioque && sock != PJ_INVALID_SOCKET && + PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && cb && p_key, PJ_EINVAL); - pj_lock_acquire(ioque->lock); + pj_lock_acquire(ioqueue->lock); - if (ioque->count >= ioque->max) { + if (ioqueue->count >= ioqueue->max) { rc = PJ_ETOOMANY; goto on_return; } @@ -211,7 +250,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, /* Set socket to nonblocking. */ value = 1; #ifdef PJ_WIN32 - if (ioctlsocket(sock, FIONBIO, (unsigned long*)&value)) { + if (ioctlsocket(sock, FIONBIO, (u_long*)&value)) { #else if (ioctl(sock, FIONBIO, &value)) { #endif @@ -220,20 +259,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, } /* Create key. */ - key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); + key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+ key->ioqueue = ioqueue; key->fd = sock; - key->user_data = user_data; - + key->user_data = user_data;
+ pj_list_init(&key->read_list);
+ pj_list_init(&key->write_list);
+#if PJ_HAS_TCP
+ pj_list_init(&key->accept_list);
+#endif +
/* Save callback. */ pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); /* Register */ - pj_list_insert_before(&ioque->hlist, key); - ++ioque->count; + pj_list_insert_before(&ioqueue->key_list, key); + ++ioqueue->count; -on_return: +on_return:
+ /* On error, socket may be left in non-blocking mode. */ *p_key = key; - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); return rc; } @@ -243,23 +289,26 @@ on_return: * * Unregister handle from ioqueue. */ -PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key) -{ - PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); +PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) +{
+ pj_ioqueue_t *ioqueue;
+ + PJ_ASSERT_RETURN(key, PJ_EINVAL); +
+ ioqueue = key->ioqueue;
- pj_lock_acquire(ioque->lock); + pj_lock_acquire(ioqueue->lock); - pj_assert(ioque->count > 0); - --ioque->count; + pj_assert(ioqueue->count > 0); + --ioqueue->count; pj_list_erase(key); - PJ_FD_CLR(key->fd, &ioque->rfdset); - PJ_FD_CLR(key->fd, &ioque->wfdset); + PJ_FD_CLR(key->fd, &ioqueue->rfdset); + PJ_FD_CLR(key->fd, &ioqueue->wfdset); #if PJ_HAS_TCP - PJ_FD_CLR(key->fd, &ioque->xfdset); + PJ_FD_CLR(key->fd, &ioqueue->xfdset); #endif - - pj_lock_release(ioque->lock); +
+ pj_lock_release(ioqueue->lock); return PJ_SUCCESS; } @@ -274,25 +323,40 @@ PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) return key->user_data; } +
+/*
+ * pj_ioqueue_set_user_data()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+ void *user_data,
+ void **old_data)
+{
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ if (old_data)
+ *old_data = key->user_data;
+ key->user_data = user_data;
+
+ return PJ_SUCCESS;
+}
+
/* This supposed to check whether the fd_set values are consistent * with the operation currently set in each key. */ #if VALIDATE_FD_SET -static void validate_sets(const pj_ioqueue_t *ioque, +static void validate_sets(const pj_ioqueue_t *ioqueue, const pj_fd_set_t *rfdset, const pj_fd_set_t *wfdset, const pj_fd_set_t *xfdset) { pj_ioqueue_key_t *key; - key = ioque->hlist.next; - while (key != &ioque->hlist) { - if ((key->op & PJ_IOQUEUE_OP_READ) - || (key->op & PJ_IOQUEUE_OP_RECV) - || (key->op & PJ_IOQUEUE_OP_RECV_FROM) + key = ioqueue->key_list.next; + while (key != &ioqueue->key_list) { + if (!pj_list_empty(&key->read_list) #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 - || (key->op & PJ_IOQUEUE_OP_ACCEPT) + || !pj_list_empty(&key->accept_list) #endif ) { @@ -301,11 +365,9 @@ static void validate_sets(const pj_ioqueue_t *ioque, else { pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0); } - if ((key->op & PJ_IOQUEUE_OP_WRITE) - || (key->op & PJ_IOQUEUE_OP_SEND) - || (key->op & PJ_IOQUEUE_OP_SEND_TO) + if (!pj_list_empty(&key->write_list) #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 - || (key->op & PJ_IOQUEUE_OP_CONNECT) + || key->connecting #endif ) { @@ -315,7 +377,7 @@ static void validate_sets(const pj_ioqueue_t *ioque, pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0); } #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 - if (key->op & PJ_IOQUEUE_OP_CONNECT) + if (key->connecting) { pj_assert(PJ_FD_ISSET(key->fd, xfdset)); } @@ -347,124 +409,263 @@ static void validate_sets(const pj_ioqueue_t *ioque, * - to guarantee preemptiveness etc, the poll function must strictly * work on fd_set copy of the ioqueue (not the original one). */ -PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) +PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { pj_fd_set_t rfdset, wfdset, xfdset; int count; pj_ioqueue_key_t *h; +
+ PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
/* Lock ioqueue before making fd_set copies */ - pj_lock_acquire(ioque->lock); - - if (PJ_FD_COUNT(&ioque->rfdset)==0 && - PJ_FD_COUNT(&ioque->wfdset)==0 && - PJ_FD_COUNT(&ioque->xfdset)==0) + pj_lock_acquire(ioqueue->lock); +
+ /* We will only do select() when there are sockets to be polled.
+ * Otherwise select() will return error.
+ */ + if (PJ_FD_COUNT(&ioqueue->rfdset)==0 && + PJ_FD_COUNT(&ioqueue->wfdset)==0 && + PJ_FD_COUNT(&ioqueue->xfdset)==0) { - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); if (timeout) pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); return 0; } /* Copy ioqueue's pj_fd_set_t to local variables. */ - pj_memcpy(&rfdset, &ioque->rfdset, sizeof(pj_fd_set_t)); - pj_memcpy(&wfdset, &ioque->wfdset, sizeof(pj_fd_set_t)); + pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t)); + pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t)); #if PJ_HAS_TCP - pj_memcpy(&xfdset, &ioque->xfdset, sizeof(pj_fd_set_t)); + pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t)); #else PJ_FD_ZERO(&xfdset); #endif #if VALIDATE_FD_SET - validate_sets(ioque, &rfdset, &wfdset, &xfdset); + validate_sets(ioqueue, &rfdset, &wfdset, &xfdset); #endif /* Unlock ioqueue before select(). */ - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); count = pj_sock_select(FD_SETSIZE, &rfdset, &wfdset, &xfdset, timeout); if (count <= 0) return count; - /* Lock ioqueue again before scanning for signalled sockets. */ - pj_lock_acquire(ioque->lock); - -#if PJ_HAS_TCP - /* Scan for exception socket */ - h = ioque->hlist.next; -do_except_scan: - for ( ; h!=&ioque->hlist; h = h->next) { - if ((h->op & PJ_IOQUEUE_OP_CONNECT) && PJ_FD_ISSET(h->fd, &xfdset)) - break; - } - if (h != &ioque->hlist) { - /* 'connect()' should be the only operation. */ - pj_assert((h->op == PJ_IOQUEUE_OP_CONNECT)); - - /* Clear operation. */ - h->op &= ~(PJ_IOQUEUE_OP_CONNECT); - PJ_FD_CLR(h->fd, &ioque->wfdset); - PJ_FD_CLR(h->fd, &ioque->xfdset); - PJ_FD_CLR(h->fd, &wfdset); - PJ_FD_CLR(h->fd, &xfdset); - - /* Call callback. */ - if (h->cb.on_connect_complete) - (*h->cb.on_connect_complete)(h, -1); - - /* Re-scan exception list. */ - goto do_except_scan; - } -#endif /* PJ_HAS_TCP */ + /* Lock ioqueue again before scanning for signalled sockets.
+ * We must strictly use recursive mutex since application may invoke
+ * the ioqueue again inside the callback.
+ */ + pj_lock_acquire(ioqueue->lock); + /* Scan for writable sockets first to handle piggy-back data
+ * coming with accept().
+ */
+ h = ioqueue->key_list.next;
+do_writable_scan:
+ for ( ; h!=&ioqueue->key_list; h = h->next) {
+ if ( (!pj_list_empty(&h->write_list) || h->connecting)
+ && PJ_FD_ISSET(h->fd, &wfdset))
+ {
+ break;
+ }
+ }
+ if (h != &ioqueue->key_list) {
+ pj_assert(!pj_list_empty(&h->write_list) || h->connecting);
+
+#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
+ if (h->connecting) {
+ /* Completion of connect() operation */
+ pj_ssize_t bytes_transfered;
+
+#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
+ /* from connect(2):
+ * On Linux, use getsockopt to read the SO_ERROR option at
+ * level SOL_SOCKET to determine whether connect() completed
+ * successfully (if SO_ERROR is zero).
+ */
+ int value;
+ socklen_t vallen = sizeof(value);
+ int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
+ &value, &vallen);
+ if (gs_rc != 0) {
+ /* Argh!! What to do now???
+ * Just indicate that the socket is connected. The
+ * application will get error as soon as it tries to use
+ * the socket to send/receive.
+ */
+ bytes_transfered = 0;
+ } else {
+ bytes_transfered = value;
+ }
+#elif defined(PJ_WIN32) && PJ_WIN32!=0
+ bytes_transfered = 0; /* success */
+#else
+ /* Excellent information in D.J. Bernstein page:
+ * http://cr.yp.to/docs/connect.html
+ *
+ * Seems like the most portable way of detecting connect()
+ * failure is to call getpeername(). If socket is connected,
+ * getpeername() will return 0. If the socket is not connected,
+ * it will return ENOTCONN, and read(fd, &ch, 1) will produce
+ * the right errno through error slippage. This is a combination
+ * of suggestions from Douglas C. Schmidt and Ken Keys.
+ */
+ int gp_rc;
+ struct sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+
+ gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
+ bytes_transfered = gp_rc;
+#endif
+
+ /* Clear operation. */
+ h->connecting = 0;
+ PJ_FD_CLR(h->fd, &ioqueue->wfdset);
+ PJ_FD_CLR(h->fd, &ioqueue->xfdset);
+
+ /* Call callback. */
+ if (h->cb.on_connect_complete)
+ (*h->cb.on_connect_complete)(h, bytes_transfered);
+
+ /* Re-scan writable sockets. */
+ goto do_writable_scan;
+
+ } else
+#endif /* PJ_HAS_TCP */
+ {
+ /* Socket is writable. */
+ struct write_operation *write_op;
+ pj_ssize_t sent;
+ pj_status_t send_rc;
+
+ /* Get the first in the queue. */
+ write_op = h->write_list.next;
+
+ /* Send the data. */
+ sent = write_op->size - write_op->written;
+ if (write_op->op == PJ_IOQUEUE_OP_SEND) {
+ send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
+ &sent, write_op->flags);
+ } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
+ send_rc = pj_sock_sendto(h->fd,
+ write_op->buf+write_op->written,
+ &sent, write_op->flags,
+ &write_op->rmt_addr,
+ write_op->rmt_addrlen);
+ } else {
+ pj_assert(!"Invalid operation type!");
+ send_rc = PJ_EBUG;
+ }
+
+ if (send_rc == PJ_SUCCESS) {
+ write_op->written += sent;
+ } else {
+ pj_assert(send_rc > 0);
+ write_op->written = -send_rc;
+ }
+
+ /* In any case we don't need to process this descriptor again. */
+ PJ_FD_CLR(h->fd, &wfdset);
+
+ /* Are we finished with this buffer? */
+ if (send_rc!=PJ_SUCCESS ||
+ write_op->written == (pj_ssize_t)write_op->size)
+ {
+ pj_list_erase(write_op);
+
+ /* Clear operation if there's no more data to send. */
+ if (pj_list_empty(&h->write_list))
+ PJ_FD_CLR(h->fd, &ioqueue->wfdset);
+
+ /* Call callback. */
+ if (h->cb.on_write_complete) {
+ (*h->cb.on_write_complete)(h,
+ (pj_ioqueue_op_key_t*)write_op,
+ write_op->written);
+ }
+ }
+
+ /* Re-scan writable sockets. */
+ goto do_writable_scan;
+ }
+ }
+
/* Scan for readable socket. */ - h = ioque->hlist.next; + h = ioqueue->key_list.next; do_readable_scan: - for ( ; h!=&ioque->hlist; h = h->next) { - if ((PJ_IOQUEUE_IS_READ_OP(h->op) || PJ_IOQUEUE_IS_ACCEPT_OP(h->op)) && - PJ_FD_ISSET(h->fd, &rfdset)) + for ( ; h!=&ioqueue->key_list; h = h->next) { + if ((!pj_list_empty(&h->read_list)
+#if PJ_HAS_TCP
+ || !pj_list_empty(&h->accept_list)
+#endif
+ ) && PJ_FD_ISSET(h->fd, &rfdset)) { break; } } - if (h != &ioque->hlist) { + if (h != &ioqueue->key_list) { pj_status_t rc; - pj_assert(PJ_IOQUEUE_IS_READ_OP(h->op) || - PJ_IOQUEUE_IS_ACCEPT_OP(h->op)); +#if PJ_HAS_TCP
+ pj_assert(!pj_list_empty(&h->read_list) ||
+ !pj_list_empty(&h->accept_list));
+#else
+ pj_assert(!pj_list_empty(&h->read_list)); +#endif
# if PJ_HAS_TCP - if ((h->op & PJ_IOQUEUE_OP_ACCEPT)) { - /* accept() must be the only operation specified on server socket */ - pj_assert(h->op == PJ_IOQUEUE_OP_ACCEPT); - - rc=pj_sock_accept(h->fd, h->accept_fd, h->rmt_addr, h->rmt_addrlen); - if (rc==0 && h->local_addr) { - rc = pj_sock_getsockname(*h->accept_fd, h->local_addr, - h->local_addrlen); + if (!pj_list_empty(&h->accept_list)) {
+
+ struct accept_operation *accept_op; +
+ /* Get one accept operation from the list. */ + accept_op = h->accept_list.next;
+ pj_list_erase(accept_op); + + rc=pj_sock_accept(h->fd, accept_op->accept_fd,
+ accept_op->rmt_addr, accept_op->addrlen); + if (rc==PJ_SUCCESS && accept_op->local_addr) { + rc = pj_sock_getsockname(*accept_op->accept_fd,
+ accept_op->local_addr, + accept_op->addrlen); } - h->op &= ~(PJ_IOQUEUE_OP_ACCEPT); - PJ_FD_CLR(h->fd, &ioque->rfdset); + /* Clear bit in fdset if there is no more pending accept */
+ if (pj_list_empty(&h->accept_list))
+ PJ_FD_CLR(h->fd, &ioqueue->rfdset); /* Call callback. */ if (h->cb.on_accept_complete) - (*h->cb.on_accept_complete)(h, *h->accept_fd, rc); + (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op,
+ *accept_op->accept_fd, rc); /* Re-scan readable sockets. */ goto do_readable_scan; - } + } else { -# endif - pj_ssize_t bytes_read = h->rd_buflen; - - if ((h->op & PJ_IOQUEUE_OP_RECV_FROM)) { - rc = pj_sock_recvfrom(h->fd, h->rd_buf, &bytes_read, 0, - h->rmt_addr, h->rmt_addrlen); - } else if ((h->op & PJ_IOQUEUE_OP_RECV)) { - rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); - } else { +# endif
+ struct read_operation *read_op;
+ pj_ssize_t bytes_read;
+
+ pj_assert(!pj_list_empty(&h->read_list));
+
+ /* Get one pending read operation from the list. */
+ read_op = h->read_list.next;
+ pj_list_erase(read_op);
+ + bytes_read = read_op->size; + + if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { + rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 0, + read_op->rmt_addr,
+ read_op->rmt_addrlen); + } else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) { + rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0); + } else {
+ pj_assert(read_op->op == PJ_IOQUEUE_OP_READ); /* * User has specified pj_ioqueue_read(). * On Win32, we should do ReadFile(). But because we got @@ -478,9 +679,10 @@ do_readable_scan: * that error is easier to catch. */ # if defined(PJ_WIN32) && PJ_WIN32 != 0 - rc = pj_sock_recv(h->fd, h->rd_buf, &bytes_read, 0); -# elif (defined(PJ_LINUX) && PJ_LINUX != 0) || \ - (defined(PJ_SUNOS) && PJ_SUNOS != 0) + rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 0);
+ //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
+ // &bytes_read, NULL);
+# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0) bytes_read = read(h->fd, h->rd_buf, bytes_read); rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error(); # elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0 @@ -503,124 +705,61 @@ do_readable_scan: */ if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) { - PJ_LOG(4,(THIS_FILE, - "Ignored ICMP port unreach. on key=%p", h)); + //PJ_LOG(4,(THIS_FILE, + // "Ignored ICMP port unreach. on key=%p", h)); } # endif /* In any case we would report this to caller. */ bytes_read = -rc; } - - h->op &= ~(PJ_IOQUEUE_OP_READ | PJ_IOQUEUE_OP_RECV | - PJ_IOQUEUE_OP_RECV_FROM); - PJ_FD_CLR(h->fd, &ioque->rfdset); +
+ /* Clear fdset if there is no pending read. */
+ if (pj_list_empty(&h->read_list)) + PJ_FD_CLR(h->fd, &ioqueue->rfdset);
+
+ /* In any case clear from temporary set. */ PJ_FD_CLR(h->fd, &rfdset); /* Call callback. */ if (h->cb.on_read_complete) - (*h->cb.on_read_complete)(h, bytes_read); + (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op,
+ bytes_read); /* Re-scan readable sockets. */ goto do_readable_scan; } } - - /* Scan for writable socket */ - h = ioque->hlist.next; -do_writable_scan: - for ( ; h!=&ioque->hlist; h = h->next) { - if ((PJ_IOQUEUE_IS_WRITE_OP(h->op) || PJ_IOQUEUE_IS_CONNECT_OP(h->op)) - && PJ_FD_ISSET(h->fd, &wfdset)) - { - break; - } - } - if (h != &ioque->hlist) { - pj_assert(PJ_IOQUEUE_IS_WRITE_OP(h->op) || - PJ_IOQUEUE_IS_CONNECT_OP(h->op)); - -#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 - if ((h->op & PJ_IOQUEUE_OP_CONNECT)) { - /* Completion of connect() operation */ - pj_ssize_t bytes_transfered; - -#if (defined(PJ_LINUX) && PJ_LINUX!=0) || \ - (defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL!=0) - /* from connect(2): - * On Linux, use getsockopt to read the SO_ERROR option at - * level SOL_SOCKET to determine whether connect() completed - * successfully (if SO_ERROR is zero). - */ - int value; - socklen_t vallen = sizeof(value); - int gs_rc = getsockopt(h->fd, SOL_SOCKET, SO_ERROR, - &value, &vallen); - if (gs_rc != 0) { - /* Argh!! What to do now??? - * Just indicate that the socket is connected. The - * application will get error as soon as it tries to use - * the socket to send/receive. - */ - bytes_transfered = 0; - } else { - bytes_transfered = value; - } -#elif defined(PJ_WIN32) && PJ_WIN32!=0 - bytes_transfered = 0; /* success */ -#else - /* Excellent information in D.J. Bernstein page: - * http://cr.yp.to/docs/connect.html - * - * Seems like the most portable way of detecting connect() - * failure is to call getpeername(). If socket is connected, - * getpeername() will return 0. If the socket is not connected, - * it will return ENOTCONN, and read(fd, &ch, 1) will produce - * the right errno through error slippage. This is a combination - * of suggestions from Douglas C. Schmidt and Ken Keys. - */ - int gp_rc; - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - - gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen); - bytes_transfered = gp_rc; -#endif - - /* Clear operation. */ - h->op &= (~PJ_IOQUEUE_OP_CONNECT); - PJ_FD_CLR(h->fd, &ioque->wfdset); - PJ_FD_CLR(h->fd, &ioque->xfdset); - - /* Call callback. */ - if (h->cb.on_connect_complete) - (*h->cb.on_connect_complete)(h, bytes_transfered); - - /* Re-scan writable sockets. */ - goto do_writable_scan; - - } else -#endif /* PJ_HAS_TCP */ - { - /* Completion of write(), send(), or sendto() operation. */ - - /* Clear operation. */ - h->op &= ~(PJ_IOQUEUE_OP_WRITE | PJ_IOQUEUE_OP_SEND | - PJ_IOQUEUE_OP_SEND_TO); - PJ_FD_CLR(h->fd, &ioque->wfdset); - PJ_FD_CLR(h->fd, &wfdset); - - /* Call callback. */ - /* All data must have been sent? */ - if (h->cb.on_write_complete) - (*h->cb.on_write_complete)(h, h->wr_buflen); - - /* Re-scan writable sockets. */ - goto do_writable_scan; - } - } - +
+#if PJ_HAS_TCP
+ /* Scan for exception socket for TCP connection error. */
+ h = ioqueue->key_list.next;
+do_except_scan:
+ for ( ; h!=&ioqueue->key_list; h = h->next) {
+ if (h->connecting && PJ_FD_ISSET(h->fd, &xfdset))
+ break;
+ }
+ if (h != &ioqueue->key_list) {
+
+ pj_assert(h->connecting);
+
+ /* Clear operation. */
+ h->connecting = 0;
+ PJ_FD_CLR(h->fd, &ioqueue->wfdset);
+ PJ_FD_CLR(h->fd, &ioqueue->xfdset);
+ PJ_FD_CLR(h->fd, &wfdset);
+ PJ_FD_CLR(h->fd, &xfdset);
+
+ /* Call callback. */
+ if (h->cb.on_connect_complete)
+ (*h->cb.on_connect_complete)(h, -1);
+
+ /* Re-scan exception list. */
+ goto do_except_scan;
+ }
+#endif /* PJ_HAS_TCP */
+
/* Shouldn't happen. */ /* For strange reason on WinXP select() can return 1 while there is no * pj_fd_set_t signaled. */ @@ -628,75 +767,63 @@ do_writable_scan: //count = 0; - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); return count; } /* - * pj_ioqueue_read() - * - * Start asynchronous read from the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen) -{ - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_READ; - key->rd_flags = 0; - key->rd_buf = buffer; - key->rd_buflen = buflen; - PJ_FD_SET(key->fd, &ioque->rfdset); - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - - -/* * pj_ioqueue_recv() * * Start asynchronous recv() from the socket. */ -PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, void *buffer, - pj_size_t buflen, + pj_ssize_t *length, unsigned flags ) -{ - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); +{
+ pj_status_t status;
+ pj_ssize_t size;
+ struct read_operation *read_op;
+ pj_ioqueue_t *ioqueue;
- key->op |= PJ_IOQUEUE_OP_RECV; - key->rd_buf = buffer; - key->rd_buflen = buflen; - key->rd_flags = flags; - PJ_FD_SET(key->fd, &ioque->rfdset); + PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); + PJ_CHECK_STACK(); +
+ /* Try to see if there's data immediately available.
+ */
+ size = *length;
+ status = pj_sock_recv(key->fd, buffer, &size, flags);
+ if (status == PJ_SUCCESS) {
+ /* Yes! Data is available! */
+ *length = size;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+ return status;
+ } +
+ /*
+ * No data is immediately available.
+ * Must schedule asynchronous operation to the ioqueue.
+ */
+ ioqueue = key->ioqueue; + pj_lock_acquire(ioqueue->lock); +
+ read_op = (struct read_operation*)op_key;
+ + read_op->op = PJ_IOQUEUE_OP_RECV; + read_op->buf = buffer; + read_op->size = *length; + read_op->flags = flags;
+
+ pj_list_insert_before(&key->read_list, read_op); + PJ_FD_SET(key->fd, &ioqueue->rfdset); - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); return PJ_EPENDING; } @@ -705,80 +832,60 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, * * Start asynchronous recvfrom() from the socket. */ -PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, void *buffer, - pj_size_t buflen, + pj_ssize_t *length, unsigned flags, pj_sockaddr_t *addr, int *addrlen) { - PJ_ASSERT_RETURN(ioque && key && buffer, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for reading before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_READ) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV) == 0 && - (key->op & PJ_IOQUEUE_OP_RECV_FROM) == 0), - PJ_EBUSY); - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_RECV_FROM; - key->rd_buf = buffer; - key->rd_buflen = buflen; - key->rd_flags = flags; - key->rmt_addr = addr; - key->rmt_addrlen = addrlen; - PJ_FD_SET(key->fd, &ioque->rfdset); - - pj_lock_release(ioque->lock); - return PJ_EPENDING; -} - -/* - * pj_ioqueue_write() - * - * Start asynchronous write() to the descriptor. - */ -PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen) -{ - pj_status_t rc; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_send() if it returns error. */ - rc = pj_sock_send(key->fd, data, &sent, 0); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_WRITE; - key->wr_buf = NULL; - key->wr_buflen = datalen; - PJ_FD_SET(key->fd, &ioque->wfdset); - - pj_lock_release(ioque->lock); - - return PJ_EPENDING; + pj_status_t status;
+ pj_ssize_t size;
+ struct read_operation *read_op;
+ pj_ioqueue_t *ioqueue;
+
+ PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ /* Try to see if there's data immediately available.
+ */
+ size = *length;
+ status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
+ addr, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Yes! Data is available! */
+ *length = size;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
+ return status;
+ }
+
+ /*
+ * No data is immediately available.
+ * Must schedule asynchronous operation to the ioqueue.
+ */
+ ioqueue = key->ioqueue;
+ pj_lock_acquire(ioqueue->lock);
+
+ read_op = (struct read_operation*)op_key;
+
+ read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
+ read_op->buf = buffer;
+ read_op->size = *length;
+ read_op->flags = flags;
+ read_op->rmt_addr = addr;
+ read_op->rmt_addrlen = addrlen;
+
+ pj_list_insert_before(&key->read_list, read_op);
+ PJ_FD_SET(key->fd, &ioqueue->rfdset);
+
+ pj_lock_release(ioqueue->lock);
+ return PJ_EPENDING;
} /* @@ -786,41 +893,71 @@ PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, * * Start asynchronous send() to the descriptor. */ -PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, const void *data, - pj_size_t datalen, + pj_ssize_t *length, unsigned flags) -{ - pj_status_t rc; +{
+ pj_ioqueue_t *ioqueue;
+ struct write_operation *write_op; + pj_status_t status;
pj_ssize_t sent; - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); + PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_send() if it returns error. */ - rc = pj_sock_send(key->fd, data, &sent, flags); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; +
+ /* Fast track:
+ * Try to send data immediately, only if there's no pending write!
+ * Note:
+ * We are speculating that the list is empty here without properly
+ * acquiring ioqueue's mutex first. This is intentional, to maximize
+ * performance via parallelism.
+ *
+ * This should be safe, because:
+ * - by convention, we require caller to make sure that the
+ * key is not unregistered while other threads are invoking
+ * an operation on the same key.
+ * - pj_list_empty() is safe to be invoked by multiple threads,
+ * even when other threads are modifying the list.
+ */
+ if (pj_list_empty(&key->write_list)) {
+ /*
+ * See if data can be sent immediately.
+ */ + sent = *length; + status = pj_sock_send(key->fd, data, &sent, flags); + if (status == PJ_SUCCESS) {
+ /* Success! */
+ *length = sent;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
} +
+ /*
+ * Schedule asynchronous send.
+ */
+ ioqueue = key->ioqueue;
+ pj_lock_acquire(ioqueue->lock); +
+ write_op = (struct write_operation*)op_key; + write_op->op = PJ_IOQUEUE_OP_SEND; + write_op->buf = NULL;
+ write_op->size = *length;
+ write_op->written = 0;
+ write_op->flags = flags;
+
+ pj_list_insert_before(&key->write_list, write_op); + PJ_FD_SET(key->fd, &ioqueue->wfdset); - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_SEND; - key->wr_buf = NULL; - key->wr_buflen = datalen; - PJ_FD_SET(key->fd, &ioque->wfdset); - - pj_lock_release(ioque->lock); + pj_lock_release(ioqueue->lock); return PJ_EPENDING; } @@ -831,75 +968,149 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, * * Start asynchronous write() to the descriptor. */ -PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, const void *data, - pj_size_t datalen, + pj_ssize_t *length, unsigned flags, const pj_sockaddr_t *addr, int addrlen) { - pj_status_t rc; - pj_ssize_t sent; - - PJ_ASSERT_RETURN(ioque && key && data, PJ_EINVAL); - PJ_CHECK_STACK(); - - /* For consistency with other ioqueue implementation, we would reject - * if descriptor has already been submitted for writing before. - */ - PJ_ASSERT_RETURN(((key->op & PJ_IOQUEUE_OP_WRITE) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND) == 0 && - (key->op & PJ_IOQUEUE_OP_SEND_TO) == 0), - PJ_EBUSY); - - sent = datalen; - /* sent would be -1 after pj_sock_sendto() if it returns error. */ - rc = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen); - if (rc != PJ_SUCCESS && rc != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) { - return rc; - } - - pj_lock_acquire(ioque->lock); - - key->op |= PJ_IOQUEUE_OP_SEND_TO; - key->wr_buf = NULL; - key->wr_buflen = datalen; - PJ_FD_SET(key->fd, &ioque->wfdset); - - pj_lock_release(ioque->lock); - return PJ_EPENDING; + pj_ioqueue_t *ioqueue;
+ struct write_operation *write_op;
+ pj_status_t status;
+ pj_ssize_t sent;
+
+ PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
+ PJ_CHECK_STACK();
+
+ /* Fast track:
+ * Try to send data immediately, only if there's no pending write!
+ * Note:
+ * We are speculating that the list is empty here without properly
+ * acquiring ioqueue's mutex first. This is intentional, to maximize
+ * performance via parallelism.
+ *
+ * This should be safe, because:
+ * - by convention, we require caller to make sure that the
+ * key is not unregistered while other threads are invoking
+ * an operation on the same key.
+ * - pj_list_empty() is safe to be invoked by multiple threads,
+ * even when other threads are modifying the list.
+ */
+ if (pj_list_empty(&key->write_list)) {
+ /*
+ * See if data can be sent immediately.
+ */
+ sent = *length;
+ status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Success! */
+ *length = sent;
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
+ }
+
+ /*
+ * Check that address storage can hold the address parameter.
+ */
+ PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);
+
+ /*
+ * Schedule asynchronous send.
+ */
+ ioqueue = key->ioqueue;
+ pj_lock_acquire(ioqueue->lock);
+
+ write_op = (struct write_operation*)op_key;
+ write_op->op = PJ_IOQUEUE_OP_SEND_TO;
+ write_op->buf = NULL;
+ write_op->size = *length;
+ write_op->written = 0;
+ write_op->flags = flags;
+ pj_memcpy(&write_op->rmt_addr, addr, addrlen);
+ write_op->rmt_addrlen = addrlen;
+
+ pj_list_insert_before(&key->write_list, write_op);
+ PJ_FD_SET(key->fd, &ioqueue->wfdset);
+
+ pj_lock_release(ioqueue->lock);
+
+ return PJ_EPENDING;
} #if PJ_HAS_TCP /* * Initiate overlapped accept() operation. */ -PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, - pj_sock_t *new_sock, - pj_sockaddr_t *local, - pj_sockaddr_t *remote, - int *addrlen) -{ - /* check parameters. All must be specified! */ - pj_assert(ioqueue && key && new_sock); - - /* Server socket must have no other operation! */ - pj_assert(key->op == 0); - - pj_lock_acquire(ioqueue->lock); - - key->op = PJ_IOQUEUE_OP_ACCEPT; - key->accept_fd = new_sock; - key->rmt_addr = remote; - key->rmt_addrlen = addrlen; - key->local_addr = local; - key->local_addrlen = addrlen; /* use same addr. as rmt_addrlen */ +PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, + pj_sock_t *new_sock, + pj_sockaddr_t *local, + pj_sockaddr_t *remote, + int *addrlen) +{
+ pj_ioqueue_t *ioqueue;
+ struct accept_operation *accept_op;
+ pj_status_t status;
+ /* check parameters. All must be specified! */ + PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); + + /* Fast track:
+ * See if there's new connection available immediately.
+ */
+ if (pj_list_empty(&key->accept_list)) {
+ status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
+ if (status == PJ_SUCCESS) {
+ /* Yes! New connection is available! */
+ if (local && addrlen) {
+ status = pj_sock_getsockname(*new_sock, local, addrlen);
+ if (status != PJ_SUCCESS) {
+ pj_sock_close(*new_sock);
+ *new_sock = PJ_INVALID_SOCKET;
+ return status;
+ }
+ }
+ return PJ_SUCCESS;
+ } else {
+ /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
+ * the error to caller.
+ */
+ if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
+ return status;
+ }
+ }
+ }
+
+ /*
+ * No connection is available immediately.
+ * Schedule accept() operation to be completed when there is incoming
+ * connection available.
+ */
+ ioqueue = key->ioqueue;
+ accept_op = (struct accept_operation*)op_key;
+ + pj_lock_acquire(ioqueue->lock);
+ + accept_op->op = PJ_IOQUEUE_OP_ACCEPT; + accept_op->accept_fd = new_sock; + accept_op->rmt_addr = remote; + accept_op->addrlen= addrlen; + accept_op->local_addr = local; +
+ pj_list_insert_before(&key->accept_list, accept_op); PJ_FD_SET(key->fd, &ioqueue->rfdset); +
+ pj_lock_release(ioqueue->lock);
- pj_lock_release(ioqueue->lock); return PJ_EPENDING; } @@ -907,37 +1118,37 @@ PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, * Initiate overlapped connect() operation (well, it's non-blocking actually, * since there's no overlapped version of connect()). */ -PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, const pj_sockaddr_t *addr, int addrlen ) -{ - pj_status_t rc; +{
+ pj_ioqueue_t *ioqueue; + pj_status_t status; /* check parameters. All must be specified! */ - PJ_ASSERT_RETURN(ioqueue && key && addr && addrlen, PJ_EINVAL); + PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); - /* Connecting socket must have no other operation! */ - PJ_ASSERT_RETURN(key->op == 0, PJ_EBUSY); + /* Check if socket has not been marked for connecting */ + if (key->connecting != 0)
+ return PJ_EPENDING; - rc = pj_sock_connect(key->fd, addr, addrlen); - if (rc == PJ_SUCCESS) { + status = pj_sock_connect(key->fd, addr, addrlen); + if (status == PJ_SUCCESS) { /* Connected! */ return PJ_SUCCESS; } else { - if (rc == PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) || - rc == PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) - { - /* Pending! */ + if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { + /* Pending! */
+ ioqueue = key->ioqueue; pj_lock_acquire(ioqueue->lock); - key->op = PJ_IOQUEUE_OP_CONNECT; + key->connecting = PJ_TRUE; PJ_FD_SET(key->fd, &ioqueue->wfdset); PJ_FD_SET(key->fd, &ioqueue->xfdset); pj_lock_release(ioqueue->lock); return PJ_EPENDING; } else { /* Error! */ - return rc; + return status; } } } diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index dbf883a4..afb75c54 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -22,17 +22,28 @@ # include <mswsock.h> #endif - -#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+20) +
+/* The address specified in AcceptEx() must be 16 more than the size of
+ * SOCKADDR (source: MSDN).
+ */ +#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16) +
+typedef struct generic_overlapped
+{
+ WSAOVERLAPPED overlapped;
+ pj_ioqueue_operation_e operation;
+} generic_overlapped;
/* * OVERLAP structure for send and receive. */ typedef struct ioqueue_overlapped { - WSAOVERLAPPED overlapped; + WSAOVERLAPPED overlapped;
pj_ioqueue_operation_e operation; - WSABUF wsabuf; + WSABUF wsabuf;
+ pj_sockaddr_in dummy_addr;
+ int dummy_addrlen;
} ioqueue_overlapped; #if PJ_HAS_TCP @@ -41,7 +52,7 @@ typedef struct ioqueue_overlapped */ typedef struct ioqueue_accept_rec { - WSAOVERLAPPED overlapped; + WSAOVERLAPPED overlapped;
pj_ioqueue_operation_e operation; pj_sock_t newsock; pj_sock_t *newsock_ptr; @@ -51,19 +62,29 @@ typedef struct ioqueue_accept_rec char accept_buf[2 * ACCEPT_ADDR_LEN]; } ioqueue_accept_rec; #endif +
+/*
+ * Structure to hold pending operation key.
+ */
+union operation_key
+{
+ generic_overlapped generic;
+ ioqueue_overlapped overlapped;
+#if PJ_HAS_TCP
+ ioqueue_accept_rec accept;
+#endif
+};
/* * Structure for individual socket. */ struct pj_ioqueue_key_t -{ +{
+ pj_ioqueue_t *ioqueue; HANDLE hnd; void *user_data; - ioqueue_overlapped recv_overlapped; - ioqueue_overlapped send_overlapped; #if PJ_HAS_TCP int connecting; - ioqueue_accept_rec accept_overlapped; #endif pj_ioqueue_callback cb; }; @@ -106,9 +127,14 @@ static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped) &local, &locallen, &remote, - &remotelen); - pj_memcpy(accept_overlapped->local, local, locallen); - pj_memcpy(accept_overlapped->remote, remote, locallen); + &remotelen);
+ if (*accept_overlapped->addrlen > locallen) { + pj_memcpy(accept_overlapped->local, local, locallen); + pj_memcpy(accept_overlapped->remote, remote, locallen);
+ } else {
+ pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen);
+ pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen);
+ } *accept_overlapped->addrlen = locallen; if (accept_overlapped->newsock_ptr) *accept_overlapped->newsock_ptr = accept_overlapped->newsock; @@ -120,7 +146,6 @@ static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos) { pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos]; HANDLE hEvent = ioqueue->connecting_handles[pos]; - unsigned long optval; /* Remove key from array of connecting handles. */ pj_array_erase(ioqueue->connecting_keys, sizeof(key), @@ -143,12 +168,6 @@ static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos) CloseHandle(hEvent); } - /* Set socket to blocking again. */ - optval = 0; - if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { - DWORD dwStatus; - dwStatus = WSAGetLastError(); - } } /* @@ -183,7 +202,8 @@ static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue, WSAEnumNetworkEvents((pj_sock_t)key->hnd, ioqueue->connecting_handles[pos], &net_events); - *connect_err = net_events.iErrorCode[FD_CONNECT_BIT]; + *connect_err =
+ PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]); /* Erase socket from pending connect. */ erase_connecting_socket(ioqueue, pos); @@ -194,95 +214,121 @@ static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue, } #endif - +/*
+ * pj_ioqueue_create()
+ */ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, - int max_threads, - pj_ioqueue_t **ioqueue) + pj_ioqueue_t **p_ioqueue) { - pj_ioqueue_t *ioq; + pj_ioqueue_t *ioqueue; pj_status_t rc; PJ_UNUSED_ARG(max_fd); - PJ_ASSERT_RETURN(pool && ioqueue, PJ_EINVAL); - - ioq = pj_pool_zalloc(pool, sizeof(*ioq)); - ioq->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, max_threads); - if (ioq->iocp == NULL) + PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL); +
+ rc = sizeof(union operation_key);
+
+ /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
+ PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
+ sizeof(union operation_key), PJ_EBUG);
+ + ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); + ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + if (ioqueue->iocp == NULL) return PJ_RETURN_OS_ERROR(GetLastError()); - rc = pj_lock_create_simple_mutex(pool, NULL, &ioq->lock); + rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); if (rc != PJ_SUCCESS) { - CloseHandle(ioq->iocp); + CloseHandle(ioqueue->iocp); return rc; } - ioq->auto_delete_lock = PJ_TRUE; + ioqueue->auto_delete_lock = PJ_TRUE; - *ioqueue = ioq; + *p_ioqueue = ioqueue; - PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioq)); + PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue)); return PJ_SUCCESS; } - -PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioque ) +
+/*
+ * pj_ioqueue_destroy()
+ */ +PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) { unsigned i; PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(ioque, PJ_EINVAL); + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); /* Destroy events in the pool */ - for (i=0; i<ioque->event_count; ++i) { - CloseHandle(ioque->event_pool[i]); + for (i=0; i<ioqueue->event_count; ++i) { + CloseHandle(ioqueue->event_pool[i]); } - ioque->event_count = 0; - - if (ioque->auto_delete_lock) - pj_lock_destroy(ioque->lock); - - if (CloseHandle(ioque->iocp) == TRUE) - return PJ_SUCCESS; - else - return PJ_RETURN_OS_ERROR(GetLastError()); + ioqueue->event_count = 0; + + if (CloseHandle(ioqueue->iocp) != TRUE) + return PJ_RETURN_OS_ERROR(GetLastError());
+
+ if (ioqueue->auto_delete_lock)
+ pj_lock_destroy(ioqueue->lock);
+
+ return PJ_SUCCESS; } - -PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, +
+/*
+ * pj_ioqueue_set_lock()
+ */ +PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, pj_lock_t *lock, pj_bool_t auto_delete ) { - PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL); + PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL); - if (ioque->auto_delete_lock) { - pj_lock_destroy(ioque->lock); + if (ioqueue->auto_delete_lock) { + pj_lock_destroy(ioqueue->lock); } - ioque->lock = lock; - ioque->auto_delete_lock = auto_delete; + ioqueue->lock = lock; + ioqueue->auto_delete_lock = auto_delete; return PJ_SUCCESS; } - +
+/*
+ * pj_ioqueue_register_sock()
+ */ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, - pj_ioqueue_t *ioque, - pj_sock_t hnd, + pj_ioqueue_t *ioqueue, + pj_sock_t sock, void *user_data, const pj_ioqueue_callback *cb, pj_ioqueue_key_t **key ) { HANDLE hioq; - pj_ioqueue_key_t *rec; - - PJ_ASSERT_RETURN(pool && ioque && cb && key, PJ_EINVAL); - - rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); - rec->hnd = (HANDLE)hnd; + pj_ioqueue_key_t *rec;
+ u_long value; + int rc;
+ + PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); +
+ /* Build the key for this socket. */ + rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+ rec->ioqueue = ioqueue; + rec->hnd = (HANDLE)sock; rec->user_data = user_data; pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); -#if PJ_HAS_TCP - rec->accept_overlapped.newsock = PJ_INVALID_SOCKET; -#endif - hioq = CreateIoCompletionPort((HANDLE)hnd, ioque->iocp, (DWORD)rec, 0); +
+ /* Set socket to nonblocking. */
+ value = 1;
+ rc = ioctlsocket(sock, FIONBIO, &value);
+ if (rc != 0) {
+ return PJ_RETURN_OS_ERROR(WSAGetLastError());
+ }
+
+ /* Associate with IOCP */ + hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); if (!hioq) { return PJ_RETURN_OS_ERROR(GetLastError()); } @@ -291,58 +337,78 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, return PJ_SUCCESS; } - - -PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key ) +/* + * pj_ioqueue_unregister()
+ */ +PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) { - PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); + PJ_ASSERT_RETURN(key, PJ_EINVAL); #if PJ_HAS_TCP if (key->connecting) { unsigned pos; + pj_ioqueue_t *ioqueue;
+
+ ioqueue = key->ioqueue;
/* Erase from connecting_handles */ - pj_lock_acquire(ioque->lock); - for (pos=0; pos < ioque->connecting_count; ++pos) { - if (ioque->connecting_keys[pos] == key) { - erase_connecting_socket(ioque, pos); - if (key->accept_overlapped.newsock_ptr) { - /* ??? shouldn't it be newsock instead of newsock_ptr??? */ - closesocket(*key->accept_overlapped.newsock_ptr); - } + pj_lock_acquire(ioqueue->lock); + for (pos=0; pos < ioqueue->connecting_count; ++pos) { + if (ioqueue->connecting_keys[pos] == key) { + erase_connecting_socket(ioqueue, pos); break; } } - pj_lock_release(ioque->lock); - key->connecting = 0; + key->connecting = 0;
+ pj_lock_release(ioqueue->lock); } #endif return PJ_SUCCESS; } - +
+/*
+ * pj_ioqueue_get_user_data()
+ */ PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) { PJ_ASSERT_RETURN(key, NULL); return key->user_data; } - -/* +
+/*
+ * pj_ioqueue_set_user_data()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+ void *user_data,
+ void **old_data )
+{
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ if (old_data)
+ *old_data = key->user_data;
+
+ key->user_data = user_data;
+ return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_poll()
+ * * Poll for events. */ -PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) +PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { DWORD dwMsec, dwBytesTransfered, dwKey; - ioqueue_overlapped *ov; + generic_overlapped *pOv; pj_ioqueue_key_t *key; pj_ssize_t size_status; BOOL rc; - PJ_ASSERT_RETURN(ioque, -PJ_EINVAL); + PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); /* Check the connecting array. */ #if PJ_HAS_TCP - key = check_connecting(ioque, &size_status); + key = check_connecting(ioqueue, &size_status); if (key != NULL) { key->cb.on_connect_complete(key, (int)size_status); return 1; @@ -353,40 +419,46 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; /* Poll for completion status. */ - rc = GetQueuedCompletionStatus(ioque->iocp, &dwBytesTransfered, &dwKey, - (OVERLAPPED**)&ov, dwMsec); + rc = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered, &dwKey, + (OVERLAPPED**)&pOv, dwMsec); /* The return value is: * - nonzero if event was dequeued. - * - zero and ov==NULL if no event was dequeued. - * - zero and ov!=NULL if event for failed I/O was dequeued. + * - zero and pOv==NULL if no event was dequeued. + * - zero and pOv!=NULL if event for failed I/O was dequeued. */ - if (ov) { + if (pOv) { /* Event was dequeued for either successfull or failed I/O */ key = (pj_ioqueue_key_t*)dwKey; size_status = dwBytesTransfered; - switch (ov->operation) { + switch (pOv->operation) { case PJ_IOQUEUE_OP_READ: case PJ_IOQUEUE_OP_RECV: case PJ_IOQUEUE_OP_RECV_FROM: - key->recv_overlapped.operation = 0; + pOv->operation = 0; if (key->cb.on_read_complete) - key->cb.on_read_complete(key, size_status); + key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
+ size_status); break; case PJ_IOQUEUE_OP_WRITE: case PJ_IOQUEUE_OP_SEND: case PJ_IOQUEUE_OP_SEND_TO: - key->send_overlapped.operation = 0; + pOv->operation = 0; if (key->cb.on_write_complete) - key->cb.on_write_complete(key, size_status); + key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
+ size_status); break; #if PJ_HAS_TCP case PJ_IOQUEUE_OP_ACCEPT: /* special case for accept. */ - ioqueue_on_accept_complete((ioqueue_accept_rec*)ov); - if (key->cb.on_accept_complete) - key->cb.on_accept_complete(key, key->accept_overlapped.newsock, - 0); + ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv); + if (key->cb.on_accept_complete) {
+ ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv; + key->cb.on_accept_complete(key,
+ (pj_ioqueue_op_key_t*)pOv,
+ accept_rec->newsock, + PJ_SUCCESS);
+ } break; case PJ_IOQUEUE_OP_CONNECT: #endif @@ -398,9 +470,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) } if (GetLastError()==WAIT_TIMEOUT) { - /* Check the connecting array. */ + /* Check the connecting array (again). */ #if PJ_HAS_TCP - key = check_connecting(ioque, &size_status); + key = check_connecting(ioqueue, &size_status); if (key != NULL) { key->cb.on_connect_complete(key, (int)size_status); return 1; @@ -412,95 +484,72 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) } /* - * pj_ioqueue_read() - * - * Initiate overlapped ReadFile operation. - */ -PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen) -{ - BOOL rc; - DWORD bytesRead; - - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioque); - - if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this descriptor"); - return PJ_EBUSY; - } - - pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); - key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ; - - rc = ReadFile(key->hnd, buffer, buflen, &bytesRead, - &key->recv_overlapped.overlapped); - if (rc == FALSE) { - DWORD dwStatus = GetLastError(); - if (dwStatus==ERROR_IO_PENDING) - return PJ_EPENDING; - else - return PJ_STATUS_FROM_OS(dwStatus); - } else { - /* - * This is workaround to a probable bug in Win2000 (probably NT too). - * Even if 'rc' is TRUE, which indicates operation has completed, - * GetQueuedCompletionStatus still will return the key. - * So as work around, we always return PJ_EPENDING here. - */ - return PJ_EPENDING; - } -} - -/* * pj_ioqueue_recv() * * Initiate overlapped WSARecv() operation. */ -PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, void *buffer, - pj_size_t buflen, + pj_ssize_t *length, unsigned flags ) -{ - int rc; - DWORD bytesRead; - DWORD dwFlags = 0; - - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioque); - - if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this socket"); - return PJ_EBUSY; - } - - pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); - key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ; - - key->recv_overlapped.wsabuf.buf = buffer; - key->recv_overlapped.wsabuf.len = buflen; - - dwFlags = flags; - - rc = WSARecv((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1, - &bytesRead, &dwFlags, - &key->recv_overlapped.overlapped, NULL); - if (rc == SOCKET_ERROR) { - DWORD dwStatus = WSAGetLastError(); - if (dwStatus==WSA_IO_PENDING) - return PJ_EPENDING; - else - return PJ_STATUS_FROM_OS(dwStatus); - } else { - /* Must always return pending status. - * See comments on pj_ioqueue_read - * return bytesRead; - */ - return PJ_EPENDING; - } +{
+ /*
+ * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
+ * addrlen here. But unfortunately it generates EINVAL... :-(
+ * -bennylp
+ */
+ int rc;
+ DWORD bytesRead;
+ DWORD dwFlags = 0;
+ union operation_key *op_key_rec;
+
+ PJ_CHECK_STACK();
+ PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
+
+ op_key_rec = (union operation_key*)op_key->internal__;
+ op_key_rec->overlapped.wsabuf.buf = buffer;
+ op_key_rec->overlapped.wsabuf.len = *length;
+
+ dwFlags = flags;
+
+ /* Try non-overlapped received first to see if data is
+ * immediately available.
+ */
+ rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesRead, &dwFlags, NULL, NULL);
+ if (rc == 0) {
+ *length = bytesRead;
+ return PJ_SUCCESS;
+ } else {
+ DWORD dwError = WSAGetLastError();
+ if (dwError != WSAEWOULDBLOCK) {
+ *length = -1;
+ return PJ_RETURN_OS_ERROR(dwError);
+ }
+ }
+
+ /*
+ * No immediate data available.
+ * Register overlapped Recv() operation.
+ */
+ pj_memset(&op_key_rec->overlapped.overlapped, 0,
+ sizeof(op_key_rec->overlapped.overlapped));
+ op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
+
+ rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesRead, &dwFlags,
+ &op_key_rec->overlapped.overlapped, NULL);
+ if (rc == SOCKET_ERROR) {
+ DWORD dwStatus = WSAGetLastError();
+ if (dwStatus!=WSA_IO_PENDING) {
+ *length = -1;
+ return PJ_STATUS_FROM_OS(dwStatus);
+ }
+ }
+
+ /* Pending operation has been scheduled. */
+ return PJ_EPENDING;
} /* @@ -508,136 +557,79 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, * * Initiate overlapped RecvFrom() operation. */ -PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, void *buffer, - pj_size_t buflen, + pj_ssize_t *length, unsigned flags, pj_sockaddr_t *addr, int *addrlen) { - BOOL rc; - DWORD bytesRead; - DWORD dwFlags; - - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioque); - - if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this socket"); - return PJ_EBUSY; - } - - pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); - key->recv_overlapped.operation = PJ_IOQUEUE_OP_RECV_FROM; - key->recv_overlapped.wsabuf.buf = buffer; - key->recv_overlapped.wsabuf.len = buflen; - dwFlags = flags; - rc = WSARecvFrom((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1, - &bytesRead, &dwFlags, - addr, addrlen, - &key->recv_overlapped.overlapped, NULL); - if (rc == SOCKET_ERROR) { - DWORD dwStatus = WSAGetLastError(); - if (dwStatus==WSA_IO_PENDING) - return PJ_EPENDING; - else - return PJ_STATUS_FROM_OS(dwStatus); - } else { - /* Must always return pending status. - * See comments on pj_ioqueue_read - * return bytesRead; - */ - return PJ_EPENDING; - } -} - -/* - * pj_ioqueue_write() - * - * Initiate overlapped WriteFile() operation. - */ -PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen) -{ - BOOL rc; - DWORD bytesWritten; - - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioque); - - if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this descriptor"); - return PJ_EBUSY; - } - - pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); - key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE; - rc = WriteFile(key->hnd, data, datalen, &bytesWritten, - &key->send_overlapped.overlapped); - - if (rc == FALSE) { - DWORD dwStatus = GetLastError(); - if (dwStatus==ERROR_IO_PENDING) - return PJ_EPENDING; - else - return PJ_STATUS_FROM_OS(dwStatus); - } else { - /* Must always return pending status. - * See comments on pj_ioqueue_read - * return bytesWritten; - */ - return PJ_EPENDING; - } + int rc;
+ DWORD bytesRead;
+ DWORD dwFlags = 0;
+ union operation_key *op_key_rec;
+
+ PJ_CHECK_STACK();
+ PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
+
+ op_key_rec = (union operation_key*)op_key->internal__;
+ op_key_rec->overlapped.wsabuf.buf = buffer;
+ op_key_rec->overlapped.wsabuf.len = *length;
+
+ dwFlags = flags;
+
+ /* Try non-overlapped received first to see if data is
+ * immediately available.
+ */
+ rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
+ if (rc == 0) {
+ *length = bytesRead;
+ return PJ_SUCCESS;
+ } else {
+ DWORD dwError = WSAGetLastError();
+ if (dwError != WSAEWOULDBLOCK) {
+ *length = -1;
+ return PJ_RETURN_OS_ERROR(dwError);
+ }
+ }
+
+ /*
+ * No immediate data available.
+ * Register overlapped Recv() operation.
+ */
+ pj_memset(&op_key_rec->overlapped.overlapped, 0,
+ sizeof(op_key_rec->overlapped.overlapped));
+ op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
+
+ rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesRead, &dwFlags, addr, addrlen,
+ &op_key_rec->overlapped.overlapped, NULL);
+ if (rc == SOCKET_ERROR) {
+ DWORD dwStatus = WSAGetLastError();
+ if (dwStatus!=WSA_IO_PENDING) {
+ *length = -1;
+ return PJ_STATUS_FROM_OS(dwStatus);
+ }
+ }
+
+ /* Pending operation has been scheduled. */
+ return PJ_EPENDING;
} - /* * pj_ioqueue_send() * * Initiate overlapped Send operation. */ -PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, const void *data, - pj_size_t datalen, + pj_ssize_t *length, unsigned flags ) { - int rc; - DWORD bytesWritten; - DWORD dwFlags; - - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioque); - - if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this socket"); - return PJ_EBUSY; - } - - pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); - key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE; - key->send_overlapped.wsabuf.buf = (void*)data; - key->send_overlapped.wsabuf.len = datalen; - dwFlags = flags; - rc = WSASend((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1, - &bytesWritten, dwFlags, - &key->send_overlapped.overlapped, NULL); - if (rc == SOCKET_ERROR) { - DWORD dwStatus = WSAGetLastError(); - if (dwStatus==WSA_IO_PENDING) - return PJ_EPENDING; - else - return PJ_STATUS_FROM_OS(dwStatus); - } else { - /* Must always return pending status. - * See comments on pj_ioqueue_read - * return bytesRead; - */ - return PJ_EPENDING; - } + return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0); } @@ -646,46 +638,65 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, * * Initiate overlapped SendTo operation. */ -PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, const void *data, - pj_size_t datalen, + pj_ssize_t *length, unsigned flags, const pj_sockaddr_t *addr, int addrlen) -{ - BOOL rc; - DWORD bytesSent; - DWORD dwFlags; - - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioque); - - if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this socket"); - return PJ_EBUSY; - } - - pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); - key->send_overlapped.operation = PJ_IOQUEUE_OP_SEND_TO; - key->send_overlapped.wsabuf.buf = (char*)data; - key->send_overlapped.wsabuf.len = datalen; - dwFlags = flags; - rc = WSASendTo((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1, - &bytesSent, dwFlags, addr, - addrlen, &key->send_overlapped.overlapped, NULL); - if (rc == SOCKET_ERROR) { - DWORD dwStatus = WSAGetLastError(); - if (dwStatus==WSA_IO_PENDING) - return PJ_EPENDING; - else - return PJ_STATUS_FROM_OS(dwStatus); - } else { - // Must always return pending status. - // See comments on pj_ioqueue_read - // return bytesSent; - return PJ_EPENDING; - } +{
+ int rc;
+ DWORD bytesWritten;
+ DWORD dwFlags;
+ union operation_key *op_key_rec;
+
+ PJ_CHECK_STACK();
+ PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
+
+ op_key_rec = (union operation_key*)op_key->internal__;
+
+ dwFlags = flags;
+
+ /*
+ * First try blocking write.
+ */
+ op_key_rec->overlapped.wsabuf.buf = (void*)data;
+ op_key_rec->overlapped.wsabuf.len = *length;
+
+ rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesWritten, dwFlags, addr, addrlen,
+ NULL, NULL);
+ if (rc == 0) {
+ *length = bytesWritten;
+ return PJ_SUCCESS;
+ } else {
+ DWORD dwStatus = WSAGetLastError();
+ if (dwStatus != WSAEWOULDBLOCK) {
+ *length = -1;
+ return PJ_RETURN_OS_ERROR(dwStatus);
+ }
+ }
+
+ /*
+ * Data can't be sent immediately.
+ * Schedule asynchronous WSASend().
+ */
+ pj_memset(&op_key_rec->overlapped.overlapped, 0,
+ sizeof(op_key_rec->overlapped.overlapped));
+ op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
+
+ rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesWritten, dwFlags, addr, addrlen,
+ &op_key_rec->overlapped.overlapped, NULL);
+ if (rc == SOCKET_ERROR) {
+ DWORD dwStatus = WSAGetLastError();
+ if (dwStatus!=WSA_IO_PENDING)
+ return PJ_STATUS_FROM_OS(dwStatus);
+ }
+
+ /* Asynchronous operation successfully submitted. */
+ return PJ_EPENDING; } #if PJ_HAS_TCP @@ -695,59 +706,93 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, * * Initiate overlapped accept() operation. */ -PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, - pj_sock_t *new_sock, - pj_sockaddr_t *local, - pj_sockaddr_t *remote, - int *addrlen) +PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key, + pj_sock_t *new_sock, + pj_sockaddr_t *local, + pj_sockaddr_t *remote, + int *addrlen) { - BOOL rc; + BOOL rc;
DWORD bytesReceived; - pj_status_t status; + pj_status_t status;
+ union operation_key *op_key_rec;
+ SOCKET sock; PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioqueue); - - if (key->accept_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this socket"); - return PJ_EBUSY; - } - - if (key->accept_overlapped.newsock == PJ_INVALID_SOCKET) { - pj_sock_t sock; - status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock); - if (status != PJ_SUCCESS) - return status; - - key->accept_overlapped.newsock = sock; - } - key->accept_overlapped.operation = PJ_IOQUEUE_OP_ACCEPT; - key->accept_overlapped.addrlen = addrlen; - key->accept_overlapped.local = local; - key->accept_overlapped.remote = remote; - key->accept_overlapped.newsock_ptr = new_sock; - pj_memset(&key->accept_overlapped.overlapped, 0, - sizeof(key->accept_overlapped.overlapped)); - - rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)key->accept_overlapped.newsock, - key->accept_overlapped.accept_buf, + PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); +
+ /*
+ * See if there is a new connection immediately available.
+ */
+ sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
+ if (sock != INVALID_SOCKET) {
+ /* Yes! New socket is available! */
+ int status;
+
+ status = getsockname(sock, local, addrlen);
+ if (status != 0) {
+ DWORD dwError = WSAGetLastError();
+ closesocket(sock);
+ return PJ_RETURN_OS_ERROR(dwError);
+ }
+
+ *new_sock = sock;
+ return PJ_SUCCESS;
+
+ } else {
+ DWORD dwError = WSAGetLastError();
+ if (dwError != WSAEWOULDBLOCK) {
+ return PJ_RETURN_OS_ERROR(dwError);
+ }
+ }
+
+ /*
+ * No connection is immediately available.
+ * Must schedule an asynchronous operation.
+ */
+ op_key_rec = (union operation_key*)op_key->internal__;
+ + status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0,
+ &op_key_rec->accept.newsock); + if (status != PJ_SUCCESS) + return status; +
+ /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
+ * addresses can be obtained with getsockname() and getpeername().
+ */
+ status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
+ SO_UPDATE_ACCEPT_CONTEXT,
+ (char*)&key->hnd, sizeof(SOCKET));
+ /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
+ * So ignore the error status.
+ */
+
+ op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
+ op_key_rec->accept.addrlen = addrlen; + op_key_rec->accept.local = local; + op_key_rec->accept.remote = remote; + op_key_rec->accept.newsock_ptr = new_sock; + pj_memset(&op_key_rec->accept.overlapped, 0, + sizeof(op_key_rec->accept.overlapped)); + + rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock, + op_key_rec->accept.accept_buf, 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN, &bytesReceived, - &key->accept_overlapped.overlapped); + &op_key_rec->accept.overlapped ); if (rc == TRUE) { - ioqueue_on_accept_complete(&key->accept_overlapped); - if (key->cb.on_accept_complete) - key->cb.on_accept_complete(key, key->accept_overlapped.newsock, 0); + ioqueue_on_accept_complete(&op_key_rec->accept); return PJ_SUCCESS; } else { DWORD dwStatus = WSAGetLastError(); - if (dwStatus==WSA_IO_PENDING) - return PJ_EPENDING; - else + if (dwStatus!=WSA_IO_PENDING) return PJ_STATUS_FROM_OS(dwStatus); - } + }
+
+ /* Asynchronous Accept() has been submitted. */
+ return PJ_EPENDING; } @@ -757,42 +802,29 @@ PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, * Initiate overlapped connect() operation (well, it's non-blocking actually, * since there's no overlapped version of connect()). */ -PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, const pj_sockaddr_t *addr, int addrlen ) { - unsigned long optval = 1; - HANDLE hEvent; + HANDLE hEvent;
+ pj_ioqueue_t *ioqueue; PJ_CHECK_STACK(); - - /* Set socket to non-blocking. */ - if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { - return PJ_RETURN_OS_ERROR(WSAGetLastError()); - } + PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
/* Initiate connect() */ if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) { DWORD dwStatus; dwStatus = WSAGetLastError(); - if (dwStatus != WSAEWOULDBLOCK) { - /* Permanent error */ + if (dwStatus != WSAEWOULDBLOCK) { return PJ_RETURN_OS_ERROR(dwStatus); - } else { - /* Pending operation. This is what we're looking for. */ } } else { /* Connect has completed immediately! */ - /* Restore to blocking mode. */ - optval = 0; - if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { - return PJ_RETURN_OS_ERROR(WSAGetLastError()); - } - - key->cb.on_connect_complete(key, 0); return PJ_SUCCESS; } +
+ ioqueue = key->ioqueue;
/* Add to the array of connecting socket to be polled */ pj_lock_acquire(ioqueue->lock); |