summaryrefslogtreecommitdiff
path: root/pjlib/src/pj/ioqueue_winnt.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjlib/src/pj/ioqueue_winnt.c')
-rw-r--r--pjlib/src/pj/ioqueue_winnt.c1443
1 files changed, 1443 insertions, 0 deletions
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c
new file mode 100644
index 0000000..fbe8ab8
--- /dev/null
+++ b/pjlib/src/pj/ioqueue_winnt.c
@@ -0,0 +1,1443 @@
+/* $Id: ioqueue_winnt.c 3553 2011-05-05 06:14:19Z nanang $ */
+/*
+ * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
+ * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#include <pj/ioqueue.h>
+#include <pj/os.h>
+#include <pj/lock.h>
+#include <pj/pool.h>
+#include <pj/string.h>
+#include <pj/sock.h>
+#include <pj/array.h>
+#include <pj/log.h>
+#include <pj/assert.h>
+#include <pj/errno.h>
+#include <pj/compat/socket.h>
+
+
+#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
+# include <winsock2.h>
+#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
+# include <winsock.h>
+#endif
+
+#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
+# include <mswsock.h>
+#endif
+
+
+/* The address specified in AcceptEx() must be 16 more than the size of
+ * SOCKADDR (source: MSDN).
+ */
+#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16)
+
+typedef struct generic_overlapped
+{
+ WSAOVERLAPPED overlapped;
+ pj_ioqueue_operation_e operation;
+} generic_overlapped;
+
+/*
+ * OVERLAPPPED structure for send and receive.
+ */
+typedef struct ioqueue_overlapped
+{
+ WSAOVERLAPPED overlapped;
+ pj_ioqueue_operation_e operation;
+ WSABUF wsabuf;
+ pj_sockaddr_in dummy_addr;
+ int dummy_addrlen;
+} ioqueue_overlapped;
+
+#if PJ_HAS_TCP
+/*
+ * OVERLAP structure for accept.
+ */
+typedef struct ioqueue_accept_rec
+{
+ WSAOVERLAPPED overlapped;
+ pj_ioqueue_operation_e operation;
+ pj_sock_t newsock;
+ pj_sock_t *newsock_ptr;
+ int *addrlen;
+ void *remote;
+ void *local;
+ char accept_buf[2 * ACCEPT_ADDR_LEN];
+} ioqueue_accept_rec;
+#endif
+
+/*
+ * Structure to hold pending operation key.
+ */
+union operation_key
+{
+ generic_overlapped generic;
+ ioqueue_overlapped overlapped;
+#if PJ_HAS_TCP
+ ioqueue_accept_rec accept;
+#endif
+};
+
+/* Type of handle in the key. */
+enum handle_type
+{
+ HND_IS_UNKNOWN,
+ HND_IS_FILE,
+ HND_IS_SOCKET,
+};
+
+enum { POST_QUIT_LEN = 0xFFFFDEADUL };
+
+/*
+ * Structure for individual socket.
+ */
+struct pj_ioqueue_key_t
+{
+ PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
+
+ pj_ioqueue_t *ioqueue;
+ HANDLE hnd;
+ void *user_data;
+ enum handle_type hnd_type;
+ pj_ioqueue_callback cb;
+ pj_bool_t allow_concurrent;
+
+#if PJ_HAS_TCP
+ int connecting;
+#endif
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ pj_atomic_t *ref_count;
+ pj_bool_t closing;
+ pj_time_val free_time;
+ pj_mutex_t *mutex;
+#endif
+
+};
+
+/*
+ * IO Queue structure.
+ */
+struct pj_ioqueue_t
+{
+ HANDLE iocp;
+ pj_lock_t *lock;
+ pj_bool_t auto_delete_lock;
+ pj_bool_t default_concurrency;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ pj_ioqueue_key_t active_list;
+ pj_ioqueue_key_t free_list;
+ pj_ioqueue_key_t closing_list;
+#endif
+
+ /* These are to keep track of connecting sockets */
+#if PJ_HAS_TCP
+ unsigned event_count;
+ HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
+ unsigned connecting_count;
+ HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
+ pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
+#endif
+};
+
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Prototype */
+static void scan_closing_keys(pj_ioqueue_t *ioqueue);
+#endif
+
+
+#if PJ_HAS_TCP
+/*
+ * Process the socket when the overlapped accept() completed.
+ */
+static void ioqueue_on_accept_complete(pj_ioqueue_key_t *key,
+ ioqueue_accept_rec *accept_overlapped)
+{
+ struct sockaddr *local;
+ struct sockaddr *remote;
+ int locallen, remotelen;
+ pj_status_t status;
+
+ PJ_CHECK_STACK();
+
+ /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
+ * addresses can be obtained with getsockname() and getpeername().
+ */
+ status = setsockopt(accept_overlapped->newsock, SOL_SOCKET,
+ SO_UPDATE_ACCEPT_CONTEXT,
+ (char*)&key->hnd,
+ sizeof(SOCKET));
+ /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
+ * So ignore the error status.
+ */
+
+ /* Operation complete immediately. */
+ if (accept_overlapped->addrlen) {
+ GetAcceptExSockaddrs( accept_overlapped->accept_buf,
+ 0,
+ ACCEPT_ADDR_LEN,
+ ACCEPT_ADDR_LEN,
+ &local,
+ &locallen,
+ &remote,
+ &remotelen);
+ if (*accept_overlapped->addrlen >= locallen) {
+ if (accept_overlapped->local)
+ pj_memcpy(accept_overlapped->local, local, locallen);
+ if (accept_overlapped->remote)
+ pj_memcpy(accept_overlapped->remote, remote, locallen);
+ } else {
+ if (accept_overlapped->local)
+ pj_bzero(accept_overlapped->local,
+ *accept_overlapped->addrlen);
+ if (accept_overlapped->remote)
+ pj_bzero(accept_overlapped->remote,
+ *accept_overlapped->addrlen);
+ }
+
+ *accept_overlapped->addrlen = locallen;
+ }
+ if (accept_overlapped->newsock_ptr)
+ *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
+ accept_overlapped->operation = 0;
+}
+
+static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
+{
+ pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
+ HANDLE hEvent = ioqueue->connecting_handles[pos];
+
+ /* Remove key from array of connecting handles. */
+ pj_array_erase(ioqueue->connecting_keys, sizeof(key),
+ ioqueue->connecting_count, pos);
+ pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
+ ioqueue->connecting_count, pos);
+ --ioqueue->connecting_count;
+
+ /* Disassociate the socket from the event. */
+ WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
+
+ /* Put event object to pool. */
+ if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
+ ioqueue->event_pool[ioqueue->event_count++] = hEvent;
+ } else {
+ /* Shouldn't happen. There should be no more pending connections
+ * than max.
+ */
+ pj_assert(0);
+ CloseHandle(hEvent);
+ }
+
+}
+
+/*
+ * Poll for the completion of non-blocking connect().
+ * If there's a completion, the function return the key of the completed
+ * socket, and 'result' argument contains the connect() result. If connect()
+ * succeeded, 'result' will have value zero, otherwise will have the error
+ * code.
+ */
+static int check_connecting( pj_ioqueue_t *ioqueue )
+{
+ if (ioqueue->connecting_count) {
+ int i, count;
+ struct
+ {
+ pj_ioqueue_key_t *key;
+ pj_status_t status;
+ } events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1];
+
+ pj_lock_acquire(ioqueue->lock);
+ for (count=0; count<PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL-1; ++count) {
+ DWORD result;
+
+ result = WaitForMultipleObjects(ioqueue->connecting_count,
+ ioqueue->connecting_handles,
+ FALSE, 0);
+ if (result >= WAIT_OBJECT_0 &&
+ result < WAIT_OBJECT_0+ioqueue->connecting_count)
+ {
+ WSANETWORKEVENTS net_events;
+
+ /* Got completed connect(). */
+ unsigned pos = result - WAIT_OBJECT_0;
+ events[count].key = ioqueue->connecting_keys[pos];
+
+ /* See whether connect has succeeded. */
+ WSAEnumNetworkEvents((pj_sock_t)events[count].key->hnd,
+ ioqueue->connecting_handles[pos],
+ &net_events);
+ events[count].status =
+ PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]);
+
+ /* Erase socket from pending connect. */
+ erase_connecting_socket(ioqueue, pos);
+ } else {
+ /* No more events */
+ break;
+ }
+ }
+ pj_lock_release(ioqueue->lock);
+
+ /* Call callbacks. */
+ for (i=0; i<count; ++i) {
+ if (events[i].key->cb.on_connect_complete) {
+ events[i].key->cb.on_connect_complete(events[i].key,
+ events[i].status);
+ }
+ }
+
+ return count;
+ }
+
+ return 0;
+
+}
+#endif
+
+/*
+ * pj_ioqueue_name()
+ */
+PJ_DEF(const char*) pj_ioqueue_name(void)
+{
+ return "iocp";
+}
+
+/*
+ * pj_ioqueue_create()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
+ pj_size_t max_fd,
+ pj_ioqueue_t **p_ioqueue)
+{
+ pj_ioqueue_t *ioqueue;
+ unsigned i;
+ pj_status_t rc;
+
+ PJ_UNUSED_ARG(max_fd);
+ PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL);
+
+ rc = sizeof(union operation_key);
+
+ /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
+ PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
+ sizeof(union operation_key), PJ_EBUG);
+
+ /* Create IOCP */
+ ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
+ ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+ if (ioqueue->iocp == NULL)
+ return PJ_RETURN_OS_ERROR(GetLastError());
+
+ /* Create IOCP mutex */
+ rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock);
+ if (rc != PJ_SUCCESS) {
+ CloseHandle(ioqueue->iocp);
+ return rc;
+ }
+
+ ioqueue->auto_delete_lock = PJ_TRUE;
+ ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /*
+ * Create and initialize key pools.
+ */
+ pj_list_init(&ioqueue->active_list);
+ pj_list_init(&ioqueue->free_list);
+ pj_list_init(&ioqueue->closing_list);
+
+ /* Preallocate keys according to max_fd setting, and put them
+ * in free_list.
+ */
+ for (i=0; i<max_fd; ++i) {
+ pj_ioqueue_key_t *key;
+
+ key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
+
+ rc = pj_atomic_create(pool, 0, &key->ref_count);
+ if (rc != PJ_SUCCESS) {
+ key = ioqueue->free_list.next;
+ while (key != &ioqueue->free_list) {
+ pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+ CloseHandle(ioqueue->iocp);
+ return rc;
+ }
+
+ rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex);
+ if (rc != PJ_SUCCESS) {
+ pj_atomic_destroy(key->ref_count);
+ key = ioqueue->free_list.next;
+ while (key != &ioqueue->free_list) {
+ pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+ CloseHandle(ioqueue->iocp);
+ return rc;
+ }
+
+ pj_list_push_back(&ioqueue->free_list, key);
+ }
+#endif
+
+ *p_ioqueue = ioqueue;
+
+ PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
+ return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_destroy()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
+{
+#if PJ_HAS_TCP
+ unsigned i;
+#endif
+ pj_ioqueue_key_t *key;
+
+ PJ_CHECK_STACK();
+ PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+
+ pj_lock_acquire(ioqueue->lock);
+
+#if PJ_HAS_TCP
+ /* Destroy events in the pool */
+ for (i=0; i<ioqueue->event_count; ++i) {
+ CloseHandle(ioqueue->event_pool[i]);
+ }
+ ioqueue->event_count = 0;
+#endif
+
+ if (CloseHandle(ioqueue->iocp) != TRUE)
+ return PJ_RETURN_OS_ERROR(GetLastError());
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Destroy reference counters */
+ key = ioqueue->active_list.next;
+ while (key != &ioqueue->active_list) {
+ pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+
+ key = ioqueue->closing_list.next;
+ while (key != &ioqueue->closing_list) {
+ pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+
+ key = ioqueue->free_list.next;
+ while (key != &ioqueue->free_list) {
+ pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+#endif
+
+ if (ioqueue->auto_delete_lock)
+ pj_lock_destroy(ioqueue->lock);
+
+ return PJ_SUCCESS;
+}
+
+
+PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
+ pj_bool_t allow)
+{
+ PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
+ ioqueue->default_concurrency = allow;
+ return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_set_lock()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
+ pj_lock_t *lock,
+ pj_bool_t auto_delete )
+{
+ PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);
+
+ if (ioqueue->auto_delete_lock) {
+ pj_lock_destroy(ioqueue->lock);
+ }
+
+ ioqueue->lock = lock;
+ ioqueue->auto_delete_lock = auto_delete;
+
+ return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_register_sock()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
+ pj_ioqueue_t *ioqueue,
+ pj_sock_t sock,
+ void *user_data,
+ const pj_ioqueue_callback *cb,
+ pj_ioqueue_key_t **key )
+{
+ HANDLE hioq;
+ pj_ioqueue_key_t *rec;
+ u_long value;
+ int rc;
+
+ PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
+
+ pj_lock_acquire(ioqueue->lock);
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Scan closing list first to release unused keys.
+ * Must do this with lock acquired.
+ */
+ scan_closing_keys(ioqueue);
+
+ /* If safe unregistration is used, then get the key record from
+ * the free list.
+ */
+ if (pj_list_empty(&ioqueue->free_list)) {
+ pj_lock_release(ioqueue->lock);
+ return PJ_ETOOMANY;
+ }
+
+ rec = ioqueue->free_list.next;
+ pj_list_erase(rec);
+
+ /* Set initial reference count to 1 */
+ pj_assert(pj_atomic_get(rec->ref_count) == 0);
+ pj_atomic_inc(rec->ref_count);
+
+ rec->closing = 0;
+
+#else
+ rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+#endif
+
+ /* Build the key for this socket. */
+ rec->ioqueue = ioqueue;
+ rec->hnd = (HANDLE)sock;
+ rec->hnd_type = HND_IS_SOCKET;
+ rec->user_data = user_data;
+ pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
+
+ /* Set concurrency for this handle */
+ rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency);
+ if (rc != PJ_SUCCESS) {
+ pj_lock_release(ioqueue->lock);
+ return rc;
+ }
+
+#if PJ_HAS_TCP
+ rec->connecting = 0;
+#endif
+
+ /* Set socket to nonblocking. */
+ value = 1;
+ rc = ioctlsocket(sock, FIONBIO, &value);
+ if (rc != 0) {
+ pj_lock_release(ioqueue->lock);
+ return PJ_RETURN_OS_ERROR(WSAGetLastError());
+ }
+
+ /* Associate with IOCP */
+ hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
+ if (!hioq) {
+ pj_lock_release(ioqueue->lock);
+ return PJ_RETURN_OS_ERROR(GetLastError());
+ }
+
+ *key = rec;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ pj_list_push_back(&ioqueue->active_list, rec);
+#endif
+
+ pj_lock_release(ioqueue->lock);
+
+ return PJ_SUCCESS;
+}
+
+
+/*
+ * pj_ioqueue_get_user_data()
+ */
+PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
+{
+ PJ_ASSERT_RETURN(key, NULL);
+ return key->user_data;
+}
+
+/*
+ * pj_ioqueue_set_user_data()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
+ void *user_data,
+ void **old_data )
+{
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ if (old_data)
+ *old_data = key->user_data;
+
+ key->user_data = user_data;
+ return PJ_SUCCESS;
+}
+
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Decrement the key's reference counter, and when the counter reach zero,
+ * destroy the key.
+ */
+static void decrement_counter(pj_ioqueue_key_t *key)
+{
+ if (pj_atomic_dec_and_get(key->ref_count) == 0) {
+
+ pj_lock_acquire(key->ioqueue->lock);
+
+ pj_assert(key->closing == 1);
+ pj_gettickcount(&key->free_time);
+ key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
+ pj_time_val_normalize(&key->free_time);
+
+ pj_list_erase(key);
+ pj_list_push_back(&key->ioqueue->closing_list, key);
+
+ pj_lock_release(key->ioqueue->lock);
+ }
+}
+#endif
+
+/*
+ * Poll the I/O Completion Port, execute callback,
+ * and return the key and bytes transfered of the last operation.
+ */
+static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
+ pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
+{
+ DWORD dwBytesTransfered, dwKey;
+ generic_overlapped *pOv;
+ pj_ioqueue_key_t *key;
+ pj_ssize_t size_status = -1;
+ BOOL rcGetQueued;
+
+ /* Poll for completion status. */
+ rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
+ &dwKey, (OVERLAPPED**)&pOv,
+ dwTimeout);
+
+ /* The return value is:
+ * - nonzero if event was dequeued.
+ * - zero and pOv==NULL if no event was dequeued.
+ * - zero and pOv!=NULL if event for failed I/O was dequeued.
+ */
+ if (pOv) {
+ pj_bool_t has_lock;
+
+ /* Event was dequeued for either successfull or failed I/O */
+ key = (pj_ioqueue_key_t*)dwKey;
+ size_status = dwBytesTransfered;
+
+ /* Report to caller regardless */
+ if (p_bytes)
+ *p_bytes = size_status;
+ if (p_key)
+ *p_key = key;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* We shouldn't call callbacks if key is quitting. */
+ if (key->closing)
+ return PJ_TRUE;
+
+ /* If concurrency is disabled, lock the key
+ * (and save the lock status to local var since app may change
+ * concurrency setting while in the callback) */
+ if (key->allow_concurrent == PJ_FALSE) {
+ pj_mutex_lock(key->mutex);
+ has_lock = PJ_TRUE;
+ } else {
+ has_lock = PJ_FALSE;
+ }
+
+ /* Now that we get the lock, check again that key is not closing */
+ if (key->closing) {
+ if (has_lock) {
+ pj_mutex_unlock(key->mutex);
+ }
+ return PJ_TRUE;
+ }
+
+ /* Increment reference counter to prevent this key from being
+ * deleted
+ */
+ pj_atomic_inc(key->ref_count);
+#else
+ PJ_UNUSED_ARG(has_lock);
+#endif
+
+ /* Carry out the callback */
+ switch (pOv->operation) {
+ case PJ_IOQUEUE_OP_READ:
+ case PJ_IOQUEUE_OP_RECV:
+ case PJ_IOQUEUE_OP_RECV_FROM:
+ pOv->operation = 0;
+ if (key->cb.on_read_complete)
+ key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
+ size_status);
+ break;
+ case PJ_IOQUEUE_OP_WRITE:
+ case PJ_IOQUEUE_OP_SEND:
+ case PJ_IOQUEUE_OP_SEND_TO:
+ pOv->operation = 0;
+ if (key->cb.on_write_complete)
+ key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
+ size_status);
+ break;
+#if PJ_HAS_TCP
+ case PJ_IOQUEUE_OP_ACCEPT:
+ /* special case for accept. */
+ ioqueue_on_accept_complete(key, (ioqueue_accept_rec*)pOv);
+ if (key->cb.on_accept_complete) {
+ ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv;
+ pj_status_t status = PJ_SUCCESS;
+ pj_sock_t newsock;
+
+ newsock = accept_rec->newsock;
+ accept_rec->newsock = PJ_INVALID_SOCKET;
+
+ if (newsock == PJ_INVALID_SOCKET) {
+ int dwError = WSAGetLastError();
+ if (dwError == 0) dwError = OSERR_ENOTCONN;
+ status = PJ_RETURN_OS_ERROR(dwError);
+ }
+
+ key->cb.on_accept_complete(key, (pj_ioqueue_op_key_t*)pOv,
+ newsock, status);
+
+ }
+ break;
+ case PJ_IOQUEUE_OP_CONNECT:
+#endif
+ case PJ_IOQUEUE_OP_NONE:
+ pj_assert(0);
+ break;
+ }
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ decrement_counter(key);
+ if (has_lock)
+ pj_mutex_unlock(key->mutex);
+#endif
+
+ return PJ_TRUE;
+ }
+
+ /* No event was queued. */
+ return PJ_FALSE;
+}
+
+/*
+ * pj_ioqueue_unregister()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
+{
+ unsigned i;
+ pj_bool_t has_lock;
+ enum { RETRY = 10 };
+
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+#if PJ_HAS_TCP
+ if (key->connecting) {
+ unsigned pos;
+ pj_ioqueue_t *ioqueue;
+
+ ioqueue = key->ioqueue;
+
+ /* Erase from connecting_handles */
+ pj_lock_acquire(ioqueue->lock);
+ for (pos=0; pos < ioqueue->connecting_count; ++pos) {
+ if (ioqueue->connecting_keys[pos] == key) {
+ erase_connecting_socket(ioqueue, pos);
+ break;
+ }
+ }
+ key->connecting = 0;
+ pj_lock_release(ioqueue->lock);
+ }
+#endif
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Mark key as closing before closing handle. */
+ key->closing = 1;
+
+ /* If concurrency is disabled, wait until the key has finished
+ * processing the callback
+ */
+ if (key->allow_concurrent == PJ_FALSE) {
+ pj_mutex_lock(key->mutex);
+ has_lock = PJ_TRUE;
+ } else {
+ has_lock = PJ_FALSE;
+ }
+#else
+ PJ_UNUSED_ARG(has_lock);
+#endif
+
+ /* Close handle (the only way to disassociate handle from IOCP).
+ * We also need to close handle to make sure that no further events
+ * will come to the handle.
+ */
+ /* Update 2008/07/18 (http://trac.pjsip.org/repos/ticket/575):
+ * - It seems that CloseHandle() in itself does not actually close
+ * the socket (i.e. it will still appear in "netstat" output). Also
+ * if we only use CloseHandle(), an "Invalid Handle" exception will
+ * be raised in WSACleanup().
+ * - MSDN documentation says that CloseHandle() must be called after
+ * closesocket() call (see
+ * http://msdn.microsoft.com/en-us/library/ms724211(VS.85).aspx).
+ * But turns out that this will raise "Invalid Handle" exception
+ * in debug mode.
+ * So because of this, we replaced CloseHandle() with closesocket()
+ * instead. These was tested on WinXP SP2.
+ */
+ //CloseHandle(key->hnd);
+ pj_sock_close((pj_sock_t)key->hnd);
+
+ /* Reset callbacks */
+ key->cb.on_accept_complete = NULL;
+ key->cb.on_connect_complete = NULL;
+ key->cb.on_read_complete = NULL;
+ key->cb.on_write_complete = NULL;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Even after handle is closed, I suspect that IOCP may still try to
+ * do something with the handle, causing memory corruption when pool
+ * debugging is enabled.
+ *
+ * Forcing context switch seems to have fixed that, but this is quite
+ * an ugly solution..
+ *
+ * Update 2008/02/13:
+ * This should not happen if concurrency is disallowed for the key.
+ * So at least application has a solution for this (i.e. by disallowing
+ * concurrency in the key).
+ */
+ //This will loop forever if unregistration is done on the callback.
+ //Doing this with RETRY I think should solve the IOCP setting the
+ //socket signalled, without causing the deadlock.
+ //while (pj_atomic_get(key->ref_count) != 1)
+ // pj_thread_sleep(0);
+ for (i=0; pj_atomic_get(key->ref_count) != 1 && i<RETRY; ++i)
+ pj_thread_sleep(0);
+
+ /* Decrement reference counter to destroy the key. */
+ decrement_counter(key);
+
+ if (has_lock)
+ pj_mutex_unlock(key->mutex);
+#endif
+
+ return PJ_SUCCESS;
+}
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Scan the closing list, and put pending closing keys to free list.
+ * Must do this with ioqueue mutex held.
+ */
+static void scan_closing_keys(pj_ioqueue_t *ioqueue)
+{
+ if (!pj_list_empty(&ioqueue->closing_list)) {
+ pj_time_val now;
+ pj_ioqueue_key_t *key;
+
+ pj_gettickcount(&now);
+
+ /* Move closing keys to free list when they've finished the closing
+ * idle time.
+ */
+ key = ioqueue->closing_list.next;
+ while (key != &ioqueue->closing_list) {
+ pj_ioqueue_key_t *next = key->next;
+
+ pj_assert(key->closing != 0);
+
+ if (PJ_TIME_VAL_GTE(now, key->free_time)) {
+ pj_list_erase(key);
+ pj_list_push_back(&ioqueue->free_list, key);
+ }
+ key = next;
+ }
+ }
+}
+#endif
+
+/*
+ * pj_ioqueue_poll()
+ *
+ * Poll for events.
+ */
+PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
+{
+ DWORD dwMsec;
+#if PJ_HAS_TCP
+ int connect_count = 0;
+#endif
+ int event_count = 0;
+
+ PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
+
+ /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
+ dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
+
+ /* Poll for completion status. */
+ event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
+
+#if PJ_HAS_TCP
+ /* Check the connecting array, only when there's no activity. */
+ if (event_count == 0) {
+ connect_count = check_connecting(ioqueue);
+ if (connect_count > 0)
+ event_count += connect_count;
+ }
+#endif
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check the closing keys only when there's no activity and when there are
+ * pending closing keys.
+ */
+ if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
+ pj_lock_acquire(ioqueue->lock);
+ scan_closing_keys(ioqueue);
+ pj_lock_release(ioqueue->lock);
+ }
+#endif
+
+ /* Return number of events. */
+ return event_count;
+}
+
+/*
+ * pj_ioqueue_recv()
+ *
+ * Initiate overlapped WSARecv() operation.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ void *buffer,
+ pj_ssize_t *length,
+ pj_uint32_t flags )
+{
+ /*
+ * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
+ * addrlen here. But unfortunately it generates EINVAL... :-(
+ * -bennylp
+ */
+ int rc;
+ DWORD bytesRead;
+ DWORD dwFlags = 0;
+ union operation_key *op_key_rec;
+
+ PJ_CHECK_STACK();
+ PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
+ op_key_rec = (union operation_key*)op_key->internal__;
+ op_key_rec->overlapped.wsabuf.buf = buffer;
+ op_key_rec->overlapped.wsabuf.len = *length;
+
+ dwFlags = flags;
+
+ /* Try non-overlapped received first to see if data is
+ * immediately available.
+ */
+ if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
+ rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesRead, &dwFlags, NULL, NULL);
+ if (rc == 0) {
+ *length = bytesRead;
+ return PJ_SUCCESS;
+ } else {
+ DWORD dwError = WSAGetLastError();
+ if (dwError != WSAEWOULDBLOCK) {
+ *length = -1;
+ return PJ_RETURN_OS_ERROR(dwError);
+ }
+ }
+ }
+
+ dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
+
+ /*
+ * No immediate data available.
+ * Register overlapped Recv() operation.
+ */
+ pj_bzero( &op_key_rec->overlapped.overlapped,
+ sizeof(op_key_rec->overlapped.overlapped));
+ op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
+
+ rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesRead, &dwFlags,
+ &op_key_rec->overlapped.overlapped, NULL);
+ if (rc == SOCKET_ERROR) {
+ DWORD dwStatus = WSAGetLastError();
+ if (dwStatus!=WSA_IO_PENDING) {
+ *length = -1;
+ return PJ_STATUS_FROM_OS(dwStatus);
+ }
+ }
+
+ /* Pending operation has been scheduled. */
+ return PJ_EPENDING;
+}
+
+/*
+ * pj_ioqueue_recvfrom()
+ *
+ * Initiate overlapped RecvFrom() operation.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ void *buffer,
+ pj_ssize_t *length,
+ pj_uint32_t flags,
+ pj_sockaddr_t *addr,
+ int *addrlen)
+{
+ int rc;
+ DWORD bytesRead;
+ DWORD dwFlags = 0;
+ union operation_key *op_key_rec;
+
+ PJ_CHECK_STACK();
+ PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
+ op_key_rec = (union operation_key*)op_key->internal__;
+ op_key_rec->overlapped.wsabuf.buf = buffer;
+ op_key_rec->overlapped.wsabuf.len = *length;
+
+ dwFlags = flags;
+
+ /* Try non-overlapped received first to see if data is
+ * immediately available.
+ */
+ if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
+ rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
+ if (rc == 0) {
+ *length = bytesRead;
+ return PJ_SUCCESS;
+ } else {
+ DWORD dwError = WSAGetLastError();
+ if (dwError != WSAEWOULDBLOCK) {
+ *length = -1;
+ return PJ_RETURN_OS_ERROR(dwError);
+ }
+ }
+ }
+
+ dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
+
+ /*
+ * No immediate data available.
+ * Register overlapped Recv() operation.
+ */
+ pj_bzero( &op_key_rec->overlapped.overlapped,
+ sizeof(op_key_rec->overlapped.overlapped));
+ op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
+
+ rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesRead, &dwFlags, addr, addrlen,
+ &op_key_rec->overlapped.overlapped, NULL);
+ if (rc == SOCKET_ERROR) {
+ DWORD dwStatus = WSAGetLastError();
+ if (dwStatus!=WSA_IO_PENDING) {
+ *length = -1;
+ return PJ_STATUS_FROM_OS(dwStatus);
+ }
+ }
+
+ /* Pending operation has been scheduled. */
+ return PJ_EPENDING;
+}
+
+/*
+ * pj_ioqueue_send()
+ *
+ * Initiate overlapped Send operation.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ const void *data,
+ pj_ssize_t *length,
+ pj_uint32_t flags )
+{
+ return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0);
+}
+
+
+/*
+ * pj_ioqueue_sendto()
+ *
+ * Initiate overlapped SendTo operation.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ const void *data,
+ pj_ssize_t *length,
+ pj_uint32_t flags,
+ const pj_sockaddr_t *addr,
+ int addrlen)
+{
+ int rc;
+ DWORD bytesWritten;
+ DWORD dwFlags;
+ union operation_key *op_key_rec;
+
+ PJ_CHECK_STACK();
+ PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
+ op_key_rec = (union operation_key*)op_key->internal__;
+
+ /*
+ * First try blocking write.
+ */
+ op_key_rec->overlapped.wsabuf.buf = (void*)data;
+ op_key_rec->overlapped.wsabuf.len = *length;
+
+ dwFlags = flags;
+
+ if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
+ rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesWritten, dwFlags, addr, addrlen,
+ NULL, NULL);
+ if (rc == 0) {
+ *length = bytesWritten;
+ return PJ_SUCCESS;
+ } else {
+ DWORD dwStatus = WSAGetLastError();
+ if (dwStatus != WSAEWOULDBLOCK) {
+ *length = -1;
+ return PJ_RETURN_OS_ERROR(dwStatus);
+ }
+ }
+ }
+
+ dwFlags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
+
+ /*
+ * Data can't be sent immediately.
+ * Schedule asynchronous WSASend().
+ */
+ pj_bzero( &op_key_rec->overlapped.overlapped,
+ sizeof(op_key_rec->overlapped.overlapped));
+ op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
+
+ rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
+ &bytesWritten, dwFlags, addr, addrlen,
+ &op_key_rec->overlapped.overlapped, NULL);
+ if (rc == SOCKET_ERROR) {
+ DWORD dwStatus = WSAGetLastError();
+ if (dwStatus!=WSA_IO_PENDING)
+ return PJ_STATUS_FROM_OS(dwStatus);
+ }
+
+ /* Asynchronous operation successfully submitted. */
+ return PJ_EPENDING;
+}
+
+#if PJ_HAS_TCP
+
+/*
+ * pj_ioqueue_accept()
+ *
+ * Initiate overlapped accept() operation.
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t *new_sock,
+ pj_sockaddr_t *local,
+ pj_sockaddr_t *remote,
+ int *addrlen)
+{
+ BOOL rc;
+ DWORD bytesReceived;
+ pj_status_t status;
+ union operation_key *op_key_rec;
+ SOCKET sock;
+
+ PJ_CHECK_STACK();
+ PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
+ /*
+ * See if there is a new connection immediately available.
+ */
+ sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
+ if (sock != INVALID_SOCKET) {
+ /* Yes! New socket is available! */
+ if (local && addrlen) {
+ int status;
+
+ /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
+ * addresses can be obtained with getsockname() and getpeername().
+ */
+ status = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
+ (char*)&key->hnd, sizeof(SOCKET));
+ /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
+ * So ignore the error status.
+ */
+
+ status = getsockname(sock, local, addrlen);
+ if (status != 0) {
+ DWORD dwError = WSAGetLastError();
+ closesocket(sock);
+ return PJ_RETURN_OS_ERROR(dwError);
+ }
+ }
+
+ *new_sock = sock;
+ return PJ_SUCCESS;
+
+ } else {
+ DWORD dwError = WSAGetLastError();
+ if (dwError != WSAEWOULDBLOCK) {
+ return PJ_RETURN_OS_ERROR(dwError);
+ }
+ }
+
+ /*
+ * No connection is immediately available.
+ * Must schedule an asynchronous operation.
+ */
+ op_key_rec = (union operation_key*)op_key->internal__;
+
+ status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0,
+ &op_key_rec->accept.newsock);
+ if (status != PJ_SUCCESS)
+ return status;
+
+ op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
+ op_key_rec->accept.addrlen = addrlen;
+ op_key_rec->accept.local = local;
+ op_key_rec->accept.remote = remote;
+ op_key_rec->accept.newsock_ptr = new_sock;
+ pj_bzero( &op_key_rec->accept.overlapped,
+ sizeof(op_key_rec->accept.overlapped));
+
+ rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock,
+ op_key_rec->accept.accept_buf,
+ 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
+ &bytesReceived,
+ &op_key_rec->accept.overlapped );
+
+ if (rc == TRUE) {
+ ioqueue_on_accept_complete(key, &op_key_rec->accept);
+ return PJ_SUCCESS;
+ } else {
+ DWORD dwStatus = WSAGetLastError();
+ if (dwStatus!=WSA_IO_PENDING)
+ return PJ_STATUS_FROM_OS(dwStatus);
+ }
+
+ /* Asynchronous Accept() has been submitted. */
+ return PJ_EPENDING;
+}
+
+
+/*
+ * pj_ioqueue_connect()
+ *
+ * Initiate overlapped connect() operation (well, it's non-blocking actually,
+ * since there's no overlapped version of connect()).
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
+ const pj_sockaddr_t *addr,
+ int addrlen )
+{
+ HANDLE hEvent;
+ pj_ioqueue_t *ioqueue;
+
+ PJ_CHECK_STACK();
+ PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
+ /* Initiate connect() */
+ if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
+ DWORD dwStatus;
+ dwStatus = WSAGetLastError();
+ if (dwStatus != WSAEWOULDBLOCK) {
+ return PJ_RETURN_OS_ERROR(dwStatus);
+ }
+ } else {
+ /* Connect has completed immediately! */
+ return PJ_SUCCESS;
+ }
+
+ ioqueue = key->ioqueue;
+
+ /* Add to the array of connecting socket to be polled */
+ pj_lock_acquire(ioqueue->lock);
+
+ if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
+ pj_lock_release(ioqueue->lock);
+ return PJ_ETOOMANYCONN;
+ }
+
+ /* Get or create event object. */
+ if (ioqueue->event_count) {
+ hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
+ --ioqueue->event_count;
+ } else {
+ hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+ if (hEvent == NULL) {
+ DWORD dwStatus = GetLastError();
+ pj_lock_release(ioqueue->lock);
+ return PJ_STATUS_FROM_OS(dwStatus);
+ }
+ }
+
+ /* Mark key as connecting.
+ * We can't use array index since key can be removed dynamically.
+ */
+ key->connecting = 1;
+
+ /* Associate socket events to the event object. */
+ if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
+ CloseHandle(hEvent);
+ pj_lock_release(ioqueue->lock);
+ return PJ_RETURN_OS_ERROR(WSAGetLastError());
+ }
+
+ /* Add to array. */
+ ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
+ ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
+ ioqueue->connecting_count++;
+
+ pj_lock_release(ioqueue->lock);
+
+ return PJ_EPENDING;
+}
+#endif /* #if PJ_HAS_TCP */
+
+
+PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
+ pj_size_t size )
+{
+ pj_bzero(op_key, size);
+}
+
+PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key )
+{
+ BOOL rc;
+ DWORD bytesTransfered;
+
+ rc = GetOverlappedResult( key->hnd, (LPOVERLAPPED)op_key,
+ &bytesTransfered, FALSE );
+
+ if (rc == FALSE) {
+ return GetLastError()==ERROR_IO_INCOMPLETE;
+ }
+
+ return FALSE;
+}
+
+
+PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_status )
+{
+ BOOL rc;
+
+ rc = PostQueuedCompletionStatus(key->ioqueue->iocp, bytes_status,
+ (long)key, (OVERLAPPED*)op_key );
+ if (rc == FALSE) {
+ return PJ_RETURN_OS_ERROR(GetLastError());
+ }
+
+ return PJ_SUCCESS;
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
+ pj_bool_t allow)
+{
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
+ * disabled.
+ */
+ PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
+
+ key->allow_concurrent = allow;
+ return PJ_SUCCESS;
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
+{
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ return pj_mutex_lock(key->mutex);
+#else
+ PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
+#endif
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
+{
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ return pj_mutex_unlock(key->mutex);
+#else
+ PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
+#endif
+}
+