diff options
Diffstat (limited to 'pjlib/src/pj/ioqueue_winnt.c')
-rw-r--r-- | pjlib/src/pj/ioqueue_winnt.c | 1704 |
1 files changed, 852 insertions, 852 deletions
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index 93116c9d..2a8fd7e8 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -1,852 +1,852 @@ -/* $Header: /pjproject-0.3/pjlib/src/pj/ioqueue_winnt.c 11 10/29/05 11:31a Bennylp $ */
-/* $Log: /pjproject-0.3/pjlib/src/pj/ioqueue_winnt.c $
- *
- * 11 10/29/05 11:31a Bennylp
- * Changed accept and lock.
- *
- * 10 10/14/05 12:26a Bennylp
- * Finished error code framework, some fixes in ioqueue, etc. Pretty
- * major.
- *
- * 9 9/17/05 10:37a Bennylp
- * Major reorganization towards version 0.3.
- *
- */
-#include <pj/ioqueue.h>
-#include <pj/os.h>
-#include <pj/lock.h>
-#include <pj/pool.h>
-#include <pj/string.h>
-#include <pj/sock.h>
-#include <pj/array.h>
-#include <pj/log.h>
-#include <pj/assert.h>
-#include <pj/errno.h>
-
-
-#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0
-# include <winsock2.h>
-#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0
-# include <winsock.h>
-#endif
-
-#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0
-# include <mswsock.h>
-#endif
-
-
-#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+20)
-
-/*
- * OVERLAP structure for send and receive.
- */
-typedef struct ioqueue_overlapped
-{
- WSAOVERLAPPED overlapped;
- pj_ioqueue_operation_e operation;
- WSABUF wsabuf;
-} ioqueue_overlapped;
-
-#if PJ_HAS_TCP
-/*
- * OVERLAP structure for accept.
- */
-typedef struct ioqueue_accept_rec
-{
- WSAOVERLAPPED overlapped;
- pj_ioqueue_operation_e operation;
- pj_sock_t newsock;
- pj_sock_t *newsock_ptr;
- int *addrlen;
- void *remote;
- void *local;
- char accept_buf[2 * ACCEPT_ADDR_LEN];
-} ioqueue_accept_rec;
-#endif
-
-/*
- * Structure for individual socket.
- */
-struct pj_ioqueue_key_t
-{
- HANDLE hnd;
- void *user_data;
- ioqueue_overlapped recv_overlapped;
- ioqueue_overlapped send_overlapped;
-#if PJ_HAS_TCP
- int connecting;
- ioqueue_accept_rec accept_overlapped;
-#endif
- pj_ioqueue_callback cb;
-};
-
-/*
- * IO Queue structure.
- */
-struct pj_ioqueue_t
-{
- HANDLE iocp;
- pj_lock_t *lock;
- pj_bool_t auto_delete_lock;
- unsigned event_count;
- HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
-#if PJ_HAS_TCP
- unsigned connecting_count;
- HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
- pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
-#endif
-};
-
-
-#if PJ_HAS_TCP
-/*
- * Process the socket when the overlapped accept() completed.
- */
-static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped)
-{
- struct sockaddr *local;
- struct sockaddr *remote;
- int locallen, remotelen;
-
- PJ_CHECK_STACK();
-
- /* Operation complete immediately. */
- GetAcceptExSockaddrs( accept_overlapped->accept_buf,
- 0,
- ACCEPT_ADDR_LEN,
- ACCEPT_ADDR_LEN,
- &local,
- &locallen,
- &remote,
- &remotelen);
- pj_memcpy(accept_overlapped->local, local, locallen);
- pj_memcpy(accept_overlapped->remote, remote, locallen);
- *accept_overlapped->addrlen = locallen;
- if (accept_overlapped->newsock_ptr)
- *accept_overlapped->newsock_ptr = accept_overlapped->newsock;
- accept_overlapped->operation = 0;
- accept_overlapped->newsock = PJ_INVALID_SOCKET;
-}
-
-static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos)
-{
- pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos];
- HANDLE hEvent = ioqueue->connecting_handles[pos];
- unsigned long optval;
-
- /* Remove key from array of connecting handles. */
- pj_array_erase(ioqueue->connecting_keys, sizeof(key),
- ioqueue->connecting_count, pos);
- pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE),
- ioqueue->connecting_count, pos);
- --ioqueue->connecting_count;
-
- /* Disassociate the socket from the event. */
- WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0);
-
- /* Put event object to pool. */
- if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) {
- ioqueue->event_pool[ioqueue->event_count++] = hEvent;
- } else {
- /* Shouldn't happen. There should be no more pending connections
- * than max.
- */
- pj_assert(0);
- CloseHandle(hEvent);
- }
-
- /* Set socket to blocking again. */
- optval = 0;
- if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) {
- DWORD dwStatus;
- dwStatus = WSAGetLastError();
- }
-}
-
-/*
- * Poll for the completion of non-blocking connect().
- * If there's a completion, the function return the key of the completed
- * socket, and 'result' argument contains the connect() result. If connect()
- * succeeded, 'result' will have value zero, otherwise will have the error
- * code.
- */
-static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue,
- pj_ssize_t *connect_err )
-{
- pj_ioqueue_key_t *key = NULL;
-
- if (ioqueue->connecting_count) {
- DWORD result;
-
- pj_lock_acquire(ioqueue->lock);
- result = WaitForMultipleObjects(ioqueue->connecting_count,
- ioqueue->connecting_handles,
- FALSE, 0);
- if (result >= WAIT_OBJECT_0 &&
- result < WAIT_OBJECT_0+ioqueue->connecting_count)
- {
- WSANETWORKEVENTS net_events;
-
- /* Got completed connect(). */
- unsigned pos = result - WAIT_OBJECT_0;
- key = ioqueue->connecting_keys[pos];
-
- /* See whether connect has succeeded. */
- WSAEnumNetworkEvents((pj_sock_t)key->hnd,
- ioqueue->connecting_handles[pos],
- &net_events);
- *connect_err = net_events.iErrorCode[FD_CONNECT_BIT];
-
- /* Erase socket from pending connect. */
- erase_connecting_socket(ioqueue, pos);
- }
- pj_lock_release(ioqueue->lock);
- }
- return key;
-}
-#endif
-
-
-PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
- pj_size_t max_fd,
- int max_threads,
- pj_ioqueue_t **ioqueue)
-{
- pj_ioqueue_t *ioq;
- pj_status_t rc;
-
- PJ_UNUSED_ARG(max_fd);
- PJ_ASSERT_RETURN(pool && ioqueue, PJ_EINVAL);
-
- ioq = pj_pool_zalloc(pool, sizeof(*ioq));
- ioq->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, max_threads);
- if (ioq->iocp == NULL)
- return PJ_RETURN_OS_ERROR(GetLastError());
-
- rc = pj_lock_create_simple_mutex(pool, NULL, &ioq->lock);
- if (rc != PJ_SUCCESS) {
- CloseHandle(ioq->iocp);
- return rc;
- }
-
- ioq->auto_delete_lock = PJ_TRUE;
-
- *ioqueue = ioq;
-
- PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioq));
- return PJ_SUCCESS;
-}
-
-PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioque )
-{
- unsigned i;
-
- PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(ioque, PJ_EINVAL);
-
- /* Destroy events in the pool */
- for (i=0; i<ioque->event_count; ++i) {
- CloseHandle(ioque->event_pool[i]);
- }
- ioque->event_count = 0;
-
- if (ioque->auto_delete_lock)
- pj_lock_destroy(ioque->lock);
-
- if (CloseHandle(ioque->iocp) == TRUE)
- return PJ_SUCCESS;
- else
- return PJ_RETURN_OS_ERROR(GetLastError());
-}
-
-PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,
- pj_lock_t *lock,
- pj_bool_t auto_delete )
-{
- PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL);
-
- if (ioque->auto_delete_lock) {
- pj_lock_destroy(ioque->lock);
- }
-
- ioque->lock = lock;
- ioque->auto_delete_lock = auto_delete;
-
- return PJ_SUCCESS;
-}
-
-PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
- pj_ioqueue_t *ioque,
- pj_sock_t hnd,
- void *user_data,
- const pj_ioqueue_callback *cb,
- pj_ioqueue_key_t **key )
-{
- HANDLE hioq;
- pj_ioqueue_key_t *rec;
-
- PJ_ASSERT_RETURN(pool && ioque && cb && key, PJ_EINVAL);
-
- rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
- rec->hnd = (HANDLE)hnd;
- rec->user_data = user_data;
- pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
-#if PJ_HAS_TCP
- rec->accept_overlapped.newsock = PJ_INVALID_SOCKET;
-#endif
- hioq = CreateIoCompletionPort((HANDLE)hnd, ioque->iocp, (DWORD)rec, 0);
- if (!hioq) {
- return PJ_RETURN_OS_ERROR(GetLastError());
- }
-
- *key = rec;
- return PJ_SUCCESS;
-}
-
-
-
-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key )
-{
- PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL);
-
-#if PJ_HAS_TCP
- if (key->connecting) {
- unsigned pos;
-
- /* Erase from connecting_handles */
- pj_lock_acquire(ioque->lock);
- for (pos=0; pos < ioque->connecting_count; ++pos) {
- if (ioque->connecting_keys[pos] == key) {
- erase_connecting_socket(ioque, pos);
- if (key->accept_overlapped.newsock_ptr) {
- /* ??? shouldn't it be newsock instead of newsock_ptr??? */
- closesocket(*key->accept_overlapped.newsock_ptr);
- }
- break;
- }
- }
- pj_lock_release(ioque->lock);
- key->connecting = 0;
- }
-#endif
- return PJ_SUCCESS;
-}
-
-PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
-{
- PJ_ASSERT_RETURN(key, NULL);
- return key->user_data;
-}
-
-/*
- * Poll for events.
- */
-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout)
-{
- DWORD dwMsec, dwBytesTransfered, dwKey;
- ioqueue_overlapped *ov;
- pj_ioqueue_key_t *key;
- pj_ssize_t size_status;
- BOOL rc;
-
- PJ_ASSERT_RETURN(ioque, -PJ_EINVAL);
-
- /* Check the connecting array. */
-#if PJ_HAS_TCP
- key = check_connecting(ioque, &size_status);
- if (key != NULL) {
- key->cb.on_connect_complete(key, (int)size_status);
- return 1;
- }
-#endif
-
- /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
- dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
-
- /* Poll for completion status. */
- rc = GetQueuedCompletionStatus(ioque->iocp, &dwBytesTransfered, &dwKey,
- (OVERLAPPED**)&ov, dwMsec);
-
- /* The return value is:
- * - nonzero if event was dequeued.
- * - zero and ov==NULL if no event was dequeued.
- * - zero and ov!=NULL if event for failed I/O was dequeued.
- */
- if (ov) {
- /* Event was dequeued for either successfull or failed I/O */
- key = (pj_ioqueue_key_t*)dwKey;
- size_status = dwBytesTransfered;
- switch (ov->operation) {
- case PJ_IOQUEUE_OP_READ:
- case PJ_IOQUEUE_OP_RECV:
- case PJ_IOQUEUE_OP_RECV_FROM:
- key->recv_overlapped.operation = 0;
- if (key->cb.on_read_complete)
- key->cb.on_read_complete(key, size_status);
- break;
- case PJ_IOQUEUE_OP_WRITE:
- case PJ_IOQUEUE_OP_SEND:
- case PJ_IOQUEUE_OP_SEND_TO:
- key->send_overlapped.operation = 0;
- if (key->cb.on_write_complete)
- key->cb.on_write_complete(key, size_status);
- break;
-#if PJ_HAS_TCP
- case PJ_IOQUEUE_OP_ACCEPT:
- /* special case for accept. */
- ioqueue_on_accept_complete((ioqueue_accept_rec*)ov);
- if (key->cb.on_accept_complete)
- key->cb.on_accept_complete(key, key->accept_overlapped.newsock,
- 0);
- break;
- case PJ_IOQUEUE_OP_CONNECT:
-#endif
- case PJ_IOQUEUE_OP_NONE:
- pj_assert(0);
- break;
- }
- return 1;
- }
-
- if (GetLastError()==WAIT_TIMEOUT) {
- /* Check the connecting array. */
-#if PJ_HAS_TCP
- key = check_connecting(ioque, &size_status);
- if (key != NULL) {
- key->cb.on_connect_complete(key, (int)size_status);
- return 1;
- }
-#endif
- return 0;
- }
- return -1;
-}
-
-/*
- * pj_ioqueue_read()
- *
- * Initiate overlapped ReadFile operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- void *buffer,
- pj_size_t buflen)
-{
- BOOL rc;
- DWORD bytesRead;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioque);
-
- if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this descriptor");
- return PJ_EBUSY;
- }
-
- pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped));
- key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ;
-
- rc = ReadFile(key->hnd, buffer, buflen, &bytesRead,
- &key->recv_overlapped.overlapped);
- if (rc == FALSE) {
- DWORD dwStatus = GetLastError();
- if (dwStatus==ERROR_IO_PENDING)
- return PJ_EPENDING;
- else
- return PJ_STATUS_FROM_OS(dwStatus);
- } else {
- /*
- * This is workaround to a probable bug in Win2000 (probably NT too).
- * Even if 'rc' is TRUE, which indicates operation has completed,
- * GetQueuedCompletionStatus still will return the key.
- * So as work around, we always return PJ_EPENDING here.
- */
- return PJ_EPENDING;
- }
-}
-
-/*
- * pj_ioqueue_recv()
- *
- * Initiate overlapped WSARecv() operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- void *buffer,
- pj_size_t buflen,
- unsigned flags )
-{
- int rc;
- DWORD bytesRead;
- DWORD dwFlags = 0;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioque);
-
- if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this socket");
- return PJ_EBUSY;
- }
-
- pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped));
- key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ;
-
- key->recv_overlapped.wsabuf.buf = buffer;
- key->recv_overlapped.wsabuf.len = buflen;
-
- dwFlags = flags;
-
- rc = WSARecv((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1,
- &bytesRead, &dwFlags,
- &key->recv_overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus==WSA_IO_PENDING)
- return PJ_EPENDING;
- else
- return PJ_STATUS_FROM_OS(dwStatus);
- } else {
- /* Must always return pending status.
- * See comments on pj_ioqueue_read
- * return bytesRead;
- */
- return PJ_EPENDING;
- }
-}
-
-/*
- * pj_ioqueue_recvfrom()
- *
- * Initiate overlapped RecvFrom() operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- void *buffer,
- pj_size_t buflen,
- unsigned flags,
- pj_sockaddr_t *addr,
- int *addrlen)
-{
- BOOL rc;
- DWORD bytesRead;
- DWORD dwFlags;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioque);
-
- if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this socket");
- return PJ_EBUSY;
- }
-
- pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped));
- key->recv_overlapped.operation = PJ_IOQUEUE_OP_RECV_FROM;
- key->recv_overlapped.wsabuf.buf = buffer;
- key->recv_overlapped.wsabuf.len = buflen;
- dwFlags = flags;
- rc = WSARecvFrom((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1,
- &bytesRead, &dwFlags,
- addr, addrlen,
- &key->recv_overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus==WSA_IO_PENDING)
- return PJ_EPENDING;
- else
- return PJ_STATUS_FROM_OS(dwStatus);
- } else {
- /* Must always return pending status.
- * See comments on pj_ioqueue_read
- * return bytesRead;
- */
- return PJ_EPENDING;
- }
-}
-
-/*
- * pj_ioqueue_write()
- *
- * Initiate overlapped WriteFile() operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- const void *data,
- pj_size_t datalen)
-{
- BOOL rc;
- DWORD bytesWritten;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioque);
-
- if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this descriptor");
- return PJ_EBUSY;
- }
-
- pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped));
- key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE;
- rc = WriteFile(key->hnd, data, datalen, &bytesWritten,
- &key->send_overlapped.overlapped);
-
- if (rc == FALSE) {
- DWORD dwStatus = GetLastError();
- if (dwStatus==ERROR_IO_PENDING)
- return PJ_EPENDING;
- else
- return PJ_STATUS_FROM_OS(dwStatus);
- } else {
- /* Must always return pending status.
- * See comments on pj_ioqueue_read
- * return bytesWritten;
- */
- return PJ_EPENDING;
- }
-}
-
-
-/*
- * pj_ioqueue_send()
- *
- * Initiate overlapped Send operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- const void *data,
- pj_size_t datalen,
- unsigned flags )
-{
- int rc;
- DWORD bytesWritten;
- DWORD dwFlags;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioque);
-
- if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this socket");
- return PJ_EBUSY;
- }
-
- pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped));
- key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE;
- key->send_overlapped.wsabuf.buf = (void*)data;
- key->send_overlapped.wsabuf.len = datalen;
- dwFlags = flags;
- rc = WSASend((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1,
- &bytesWritten, dwFlags,
- &key->send_overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus==WSA_IO_PENDING)
- return PJ_EPENDING;
- else
- return PJ_STATUS_FROM_OS(dwStatus);
- } else {
- /* Must always return pending status.
- * See comments on pj_ioqueue_read
- * return bytesRead;
- */
- return PJ_EPENDING;
- }
-}
-
-
-/*
- * pj_ioqueue_sendto()
- *
- * Initiate overlapped SendTo operation.
- */
-PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque,
- pj_ioqueue_key_t *key,
- const void *data,
- pj_size_t datalen,
- unsigned flags,
- const pj_sockaddr_t *addr,
- int addrlen)
-{
- BOOL rc;
- DWORD bytesSent;
- DWORD dwFlags;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioque);
-
- if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this socket");
- return PJ_EBUSY;
- }
-
- pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped));
- key->send_overlapped.operation = PJ_IOQUEUE_OP_SEND_TO;
- key->send_overlapped.wsabuf.buf = (char*)data;
- key->send_overlapped.wsabuf.len = datalen;
- dwFlags = flags;
- rc = WSASendTo((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1,
- &bytesSent, dwFlags, addr,
- addrlen, &key->send_overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus==WSA_IO_PENDING)
- return PJ_EPENDING;
- else
- return PJ_STATUS_FROM_OS(dwStatus);
- } else {
- // Must always return pending status.
- // See comments on pj_ioqueue_read
- // return bytesSent;
- return PJ_EPENDING;
- }
-}
-
-#if PJ_HAS_TCP
-
-/*
- * pj_ioqueue_accept()
- *
- * Initiate overlapped accept() operation.
- */
-PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
- pj_sock_t *new_sock,
- pj_sockaddr_t *local,
- pj_sockaddr_t *remote,
- int *addrlen)
-{
- BOOL rc;
- DWORD bytesReceived;
- pj_status_t status;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(ioqueue);
-
- if (key->accept_overlapped.operation != PJ_IOQUEUE_OP_NONE) {
- pj_assert(!"Operation already pending for this socket");
- return PJ_EBUSY;
- }
-
- if (key->accept_overlapped.newsock == PJ_INVALID_SOCKET) {
- pj_sock_t sock;
- status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock);
- if (status != PJ_SUCCESS)
- return status;
-
- key->accept_overlapped.newsock = sock;
- }
- key->accept_overlapped.operation = PJ_IOQUEUE_OP_ACCEPT;
- key->accept_overlapped.addrlen = addrlen;
- key->accept_overlapped.local = local;
- key->accept_overlapped.remote = remote;
- key->accept_overlapped.newsock_ptr = new_sock;
- pj_memset(&key->accept_overlapped.overlapped, 0,
- sizeof(key->accept_overlapped.overlapped));
-
- rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)key->accept_overlapped.newsock,
- key->accept_overlapped.accept_buf,
- 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN,
- &bytesReceived,
- &key->accept_overlapped.overlapped);
-
- if (rc == TRUE) {
- ioqueue_on_accept_complete(&key->accept_overlapped);
- if (key->cb.on_accept_complete)
- key->cb.on_accept_complete(key, key->accept_overlapped.newsock, 0);
- return PJ_SUCCESS;
- } else {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus==WSA_IO_PENDING)
- return PJ_EPENDING;
- else
- return PJ_STATUS_FROM_OS(dwStatus);
- }
-}
-
-
-/*
- * pj_ioqueue_connect()
- *
- * Initiate overlapped connect() operation (well, it's non-blocking actually,
- * since there's no overlapped version of connect()).
- */
-PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *key,
- const pj_sockaddr_t *addr,
- int addrlen )
-{
- unsigned long optval = 1;
- HANDLE hEvent;
-
- PJ_CHECK_STACK();
-
- /* Set socket to non-blocking. */
- if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) {
- return PJ_RETURN_OS_ERROR(WSAGetLastError());
- }
-
- /* Initiate connect() */
- if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
- DWORD dwStatus;
- dwStatus = WSAGetLastError();
- if (dwStatus != WSAEWOULDBLOCK) {
- /* Permanent error */
- return PJ_RETURN_OS_ERROR(dwStatus);
- } else {
- /* Pending operation. This is what we're looking for. */
- }
- } else {
- /* Connect has completed immediately! */
- /* Restore to blocking mode. */
- optval = 0;
- if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) {
- return PJ_RETURN_OS_ERROR(WSAGetLastError());
- }
-
- key->cb.on_connect_complete(key, 0);
- return PJ_SUCCESS;
- }
-
- /* Add to the array of connecting socket to be polled */
- pj_lock_acquire(ioqueue->lock);
-
- if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) {
- pj_lock_release(ioqueue->lock);
- return PJ_ETOOMANYCONN;
- }
-
- /* Get or create event object. */
- if (ioqueue->event_count) {
- hEvent = ioqueue->event_pool[ioqueue->event_count - 1];
- --ioqueue->event_count;
- } else {
- hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
- if (hEvent == NULL) {
- DWORD dwStatus = GetLastError();
- pj_lock_release(ioqueue->lock);
- return PJ_STATUS_FROM_OS(dwStatus);
- }
- }
-
- /* Mark key as connecting.
- * We can't use array index since key can be removed dynamically.
- */
- key->connecting = 1;
-
- /* Associate socket events to the event object. */
- if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) {
- CloseHandle(hEvent);
- pj_lock_release(ioqueue->lock);
- return PJ_RETURN_OS_ERROR(WSAGetLastError());
- }
-
- /* Add to array. */
- ioqueue->connecting_keys[ ioqueue->connecting_count ] = key;
- ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent;
- ioqueue->connecting_count++;
-
- pj_lock_release(ioqueue->lock);
-
- return PJ_EPENDING;
-}
-#endif /* #if PJ_HAS_TCP */
-
+/* $Header: /pjproject-0.3/pjlib/src/pj/ioqueue_winnt.c 11 10/29/05 11:31a Bennylp $ */ +/* $Log: /pjproject-0.3/pjlib/src/pj/ioqueue_winnt.c $ + * + * 11 10/29/05 11:31a Bennylp + * Changed accept and lock. + * + * 10 10/14/05 12:26a Bennylp + * Finished error code framework, some fixes in ioqueue, etc. Pretty + * major. + * + * 9 9/17/05 10:37a Bennylp + * Major reorganization towards version 0.3. + * + */ +#include <pj/ioqueue.h> +#include <pj/os.h> +#include <pj/lock.h> +#include <pj/pool.h> +#include <pj/string.h> +#include <pj/sock.h> +#include <pj/array.h> +#include <pj/log.h> +#include <pj/assert.h> +#include <pj/errno.h> + + +#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0 +# include <winsock2.h> +#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0 +# include <winsock.h> +#endif + +#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0 +# include <mswsock.h> +#endif + + +#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+20) + +/* + * OVERLAP structure for send and receive. + */ +typedef struct ioqueue_overlapped +{ + WSAOVERLAPPED overlapped; + pj_ioqueue_operation_e operation; + WSABUF wsabuf; +} ioqueue_overlapped; + +#if PJ_HAS_TCP +/* + * OVERLAP structure for accept. + */ +typedef struct ioqueue_accept_rec +{ + WSAOVERLAPPED overlapped; + pj_ioqueue_operation_e operation; + pj_sock_t newsock; + pj_sock_t *newsock_ptr; + int *addrlen; + void *remote; + void *local; + char accept_buf[2 * ACCEPT_ADDR_LEN]; +} ioqueue_accept_rec; +#endif + +/* + * Structure for individual socket. + */ +struct pj_ioqueue_key_t +{ + HANDLE hnd; + void *user_data; + ioqueue_overlapped recv_overlapped; + ioqueue_overlapped send_overlapped; +#if PJ_HAS_TCP + int connecting; + ioqueue_accept_rec accept_overlapped; +#endif + pj_ioqueue_callback cb; +}; + +/* + * IO Queue structure. + */ +struct pj_ioqueue_t +{ + HANDLE iocp; + pj_lock_t *lock; + pj_bool_t auto_delete_lock; + unsigned event_count; + HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1]; +#if PJ_HAS_TCP + unsigned connecting_count; + HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1]; + pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1]; +#endif +}; + + +#if PJ_HAS_TCP +/* + * Process the socket when the overlapped accept() completed. + */ +static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped) +{ + struct sockaddr *local; + struct sockaddr *remote; + int locallen, remotelen; + + PJ_CHECK_STACK(); + + /* Operation complete immediately. */ + GetAcceptExSockaddrs( accept_overlapped->accept_buf, + 0, + ACCEPT_ADDR_LEN, + ACCEPT_ADDR_LEN, + &local, + &locallen, + &remote, + &remotelen); + pj_memcpy(accept_overlapped->local, local, locallen); + pj_memcpy(accept_overlapped->remote, remote, locallen); + *accept_overlapped->addrlen = locallen; + if (accept_overlapped->newsock_ptr) + *accept_overlapped->newsock_ptr = accept_overlapped->newsock; + accept_overlapped->operation = 0; + accept_overlapped->newsock = PJ_INVALID_SOCKET; +} + +static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos) +{ + pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos]; + HANDLE hEvent = ioqueue->connecting_handles[pos]; + unsigned long optval; + + /* Remove key from array of connecting handles. */ + pj_array_erase(ioqueue->connecting_keys, sizeof(key), + ioqueue->connecting_count, pos); + pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE), + ioqueue->connecting_count, pos); + --ioqueue->connecting_count; + + /* Disassociate the socket from the event. */ + WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0); + + /* Put event object to pool. */ + if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) { + ioqueue->event_pool[ioqueue->event_count++] = hEvent; + } else { + /* Shouldn't happen. There should be no more pending connections + * than max. + */ + pj_assert(0); + CloseHandle(hEvent); + } + + /* Set socket to blocking again. */ + optval = 0; + if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { + DWORD dwStatus; + dwStatus = WSAGetLastError(); + } +} + +/* + * Poll for the completion of non-blocking connect(). + * If there's a completion, the function return the key of the completed + * socket, and 'result' argument contains the connect() result. If connect() + * succeeded, 'result' will have value zero, otherwise will have the error + * code. + */ +static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue, + pj_ssize_t *connect_err ) +{ + pj_ioqueue_key_t *key = NULL; + + if (ioqueue->connecting_count) { + DWORD result; + + pj_lock_acquire(ioqueue->lock); + result = WaitForMultipleObjects(ioqueue->connecting_count, + ioqueue->connecting_handles, + FALSE, 0); + if (result >= WAIT_OBJECT_0 && + result < WAIT_OBJECT_0+ioqueue->connecting_count) + { + WSANETWORKEVENTS net_events; + + /* Got completed connect(). */ + unsigned pos = result - WAIT_OBJECT_0; + key = ioqueue->connecting_keys[pos]; + + /* See whether connect has succeeded. */ + WSAEnumNetworkEvents((pj_sock_t)key->hnd, + ioqueue->connecting_handles[pos], + &net_events); + *connect_err = net_events.iErrorCode[FD_CONNECT_BIT]; + + /* Erase socket from pending connect. */ + erase_connecting_socket(ioqueue, pos); + } + pj_lock_release(ioqueue->lock); + } + return key; +} +#endif + + +PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, + pj_size_t max_fd, + int max_threads, + pj_ioqueue_t **ioqueue) +{ + pj_ioqueue_t *ioq; + pj_status_t rc; + + PJ_UNUSED_ARG(max_fd); + PJ_ASSERT_RETURN(pool && ioqueue, PJ_EINVAL); + + ioq = pj_pool_zalloc(pool, sizeof(*ioq)); + ioq->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, max_threads); + if (ioq->iocp == NULL) + return PJ_RETURN_OS_ERROR(GetLastError()); + + rc = pj_lock_create_simple_mutex(pool, NULL, &ioq->lock); + if (rc != PJ_SUCCESS) { + CloseHandle(ioq->iocp); + return rc; + } + + ioq->auto_delete_lock = PJ_TRUE; + + *ioqueue = ioq; + + PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioq)); + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioque ) +{ + unsigned i; + + PJ_CHECK_STACK(); + PJ_ASSERT_RETURN(ioque, PJ_EINVAL); + + /* Destroy events in the pool */ + for (i=0; i<ioque->event_count; ++i) { + CloseHandle(ioque->event_pool[i]); + } + ioque->event_count = 0; + + if (ioque->auto_delete_lock) + pj_lock_destroy(ioque->lock); + + if (CloseHandle(ioque->iocp) == TRUE) + return PJ_SUCCESS; + else + return PJ_RETURN_OS_ERROR(GetLastError()); +} + +PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, + pj_lock_t *lock, + pj_bool_t auto_delete ) +{ + PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL); + + if (ioque->auto_delete_lock) { + pj_lock_destroy(ioque->lock); + } + + ioque->lock = lock; + ioque->auto_delete_lock = auto_delete; + + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, + pj_ioqueue_t *ioque, + pj_sock_t hnd, + void *user_data, + const pj_ioqueue_callback *cb, + pj_ioqueue_key_t **key ) +{ + HANDLE hioq; + pj_ioqueue_key_t *rec; + + PJ_ASSERT_RETURN(pool && ioque && cb && key, PJ_EINVAL); + + rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); + rec->hnd = (HANDLE)hnd; + rec->user_data = user_data; + pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); +#if PJ_HAS_TCP + rec->accept_overlapped.newsock = PJ_INVALID_SOCKET; +#endif + hioq = CreateIoCompletionPort((HANDLE)hnd, ioque->iocp, (DWORD)rec, 0); + if (!hioq) { + return PJ_RETURN_OS_ERROR(GetLastError()); + } + + *key = rec; + return PJ_SUCCESS; +} + + + +PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, + pj_ioqueue_key_t *key ) +{ + PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); + +#if PJ_HAS_TCP + if (key->connecting) { + unsigned pos; + + /* Erase from connecting_handles */ + pj_lock_acquire(ioque->lock); + for (pos=0; pos < ioque->connecting_count; ++pos) { + if (ioque->connecting_keys[pos] == key) { + erase_connecting_socket(ioque, pos); + if (key->accept_overlapped.newsock_ptr) { + /* ??? shouldn't it be newsock instead of newsock_ptr??? */ + closesocket(*key->accept_overlapped.newsock_ptr); + } + break; + } + } + pj_lock_release(ioque->lock); + key->connecting = 0; + } +#endif + return PJ_SUCCESS; +} + +PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) +{ + PJ_ASSERT_RETURN(key, NULL); + return key->user_data; +} + +/* + * Poll for events. + */ +PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) +{ + DWORD dwMsec, dwBytesTransfered, dwKey; + ioqueue_overlapped *ov; + pj_ioqueue_key_t *key; + pj_ssize_t size_status; + BOOL rc; + + PJ_ASSERT_RETURN(ioque, -PJ_EINVAL); + + /* Check the connecting array. */ +#if PJ_HAS_TCP + key = check_connecting(ioque, &size_status); + if (key != NULL) { + key->cb.on_connect_complete(key, (int)size_status); + return 1; + } +#endif + + /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ + dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; + + /* Poll for completion status. */ + rc = GetQueuedCompletionStatus(ioque->iocp, &dwBytesTransfered, &dwKey, + (OVERLAPPED**)&ov, dwMsec); + + /* The return value is: + * - nonzero if event was dequeued. + * - zero and ov==NULL if no event was dequeued. + * - zero and ov!=NULL if event for failed I/O was dequeued. + */ + if (ov) { + /* Event was dequeued for either successfull or failed I/O */ + key = (pj_ioqueue_key_t*)dwKey; + size_status = dwBytesTransfered; + switch (ov->operation) { + case PJ_IOQUEUE_OP_READ: + case PJ_IOQUEUE_OP_RECV: + case PJ_IOQUEUE_OP_RECV_FROM: + key->recv_overlapped.operation = 0; + if (key->cb.on_read_complete) + key->cb.on_read_complete(key, size_status); + break; + case PJ_IOQUEUE_OP_WRITE: + case PJ_IOQUEUE_OP_SEND: + case PJ_IOQUEUE_OP_SEND_TO: + key->send_overlapped.operation = 0; + if (key->cb.on_write_complete) + key->cb.on_write_complete(key, size_status); + break; +#if PJ_HAS_TCP + case PJ_IOQUEUE_OP_ACCEPT: + /* special case for accept. */ + ioqueue_on_accept_complete((ioqueue_accept_rec*)ov); + if (key->cb.on_accept_complete) + key->cb.on_accept_complete(key, key->accept_overlapped.newsock, + 0); + break; + case PJ_IOQUEUE_OP_CONNECT: +#endif + case PJ_IOQUEUE_OP_NONE: + pj_assert(0); + break; + } + return 1; + } + + if (GetLastError()==WAIT_TIMEOUT) { + /* Check the connecting array. */ +#if PJ_HAS_TCP + key = check_connecting(ioque, &size_status); + if (key != NULL) { + key->cb.on_connect_complete(key, (int)size_status); + return 1; + } +#endif + return 0; + } + return -1; +} + +/* + * pj_ioqueue_read() + * + * Initiate overlapped ReadFile operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, + pj_ioqueue_key_t *key, + void *buffer, + pj_size_t buflen) +{ + BOOL rc; + DWORD bytesRead; + + PJ_CHECK_STACK(); + PJ_UNUSED_ARG(ioque); + + if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { + pj_assert(!"Operation already pending for this descriptor"); + return PJ_EBUSY; + } + + pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); + key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ; + + rc = ReadFile(key->hnd, buffer, buflen, &bytesRead, + &key->recv_overlapped.overlapped); + if (rc == FALSE) { + DWORD dwStatus = GetLastError(); + if (dwStatus==ERROR_IO_PENDING) + return PJ_EPENDING; + else + return PJ_STATUS_FROM_OS(dwStatus); + } else { + /* + * This is workaround to a probable bug in Win2000 (probably NT too). + * Even if 'rc' is TRUE, which indicates operation has completed, + * GetQueuedCompletionStatus still will return the key. + * So as work around, we always return PJ_EPENDING here. + */ + return PJ_EPENDING; + } +} + +/* + * pj_ioqueue_recv() + * + * Initiate overlapped WSARecv() operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, + pj_ioqueue_key_t *key, + void *buffer, + pj_size_t buflen, + unsigned flags ) +{ + int rc; + DWORD bytesRead; + DWORD dwFlags = 0; + + PJ_CHECK_STACK(); + PJ_UNUSED_ARG(ioque); + + if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { + pj_assert(!"Operation already pending for this socket"); + return PJ_EBUSY; + } + + pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); + key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ; + + key->recv_overlapped.wsabuf.buf = buffer; + key->recv_overlapped.wsabuf.len = buflen; + + dwFlags = flags; + + rc = WSARecv((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1, + &bytesRead, &dwFlags, + &key->recv_overlapped.overlapped, NULL); + if (rc == SOCKET_ERROR) { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus==WSA_IO_PENDING) + return PJ_EPENDING; + else + return PJ_STATUS_FROM_OS(dwStatus); + } else { + /* Must always return pending status. + * See comments on pj_ioqueue_read + * return bytesRead; + */ + return PJ_EPENDING; + } +} + +/* + * pj_ioqueue_recvfrom() + * + * Initiate overlapped RecvFrom() operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, + pj_ioqueue_key_t *key, + void *buffer, + pj_size_t buflen, + unsigned flags, + pj_sockaddr_t *addr, + int *addrlen) +{ + BOOL rc; + DWORD bytesRead; + DWORD dwFlags; + + PJ_CHECK_STACK(); + PJ_UNUSED_ARG(ioque); + + if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { + pj_assert(!"Operation already pending for this socket"); + return PJ_EBUSY; + } + + pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); + key->recv_overlapped.operation = PJ_IOQUEUE_OP_RECV_FROM; + key->recv_overlapped.wsabuf.buf = buffer; + key->recv_overlapped.wsabuf.len = buflen; + dwFlags = flags; + rc = WSARecvFrom((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1, + &bytesRead, &dwFlags, + addr, addrlen, + &key->recv_overlapped.overlapped, NULL); + if (rc == SOCKET_ERROR) { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus==WSA_IO_PENDING) + return PJ_EPENDING; + else + return PJ_STATUS_FROM_OS(dwStatus); + } else { + /* Must always return pending status. + * See comments on pj_ioqueue_read + * return bytesRead; + */ + return PJ_EPENDING; + } +} + +/* + * pj_ioqueue_write() + * + * Initiate overlapped WriteFile() operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, + pj_ioqueue_key_t *key, + const void *data, + pj_size_t datalen) +{ + BOOL rc; + DWORD bytesWritten; + + PJ_CHECK_STACK(); + PJ_UNUSED_ARG(ioque); + + if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { + pj_assert(!"Operation already pending for this descriptor"); + return PJ_EBUSY; + } + + pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); + key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE; + rc = WriteFile(key->hnd, data, datalen, &bytesWritten, + &key->send_overlapped.overlapped); + + if (rc == FALSE) { + DWORD dwStatus = GetLastError(); + if (dwStatus==ERROR_IO_PENDING) + return PJ_EPENDING; + else + return PJ_STATUS_FROM_OS(dwStatus); + } else { + /* Must always return pending status. + * See comments on pj_ioqueue_read + * return bytesWritten; + */ + return PJ_EPENDING; + } +} + + +/* + * pj_ioqueue_send() + * + * Initiate overlapped Send operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, + pj_ioqueue_key_t *key, + const void *data, + pj_size_t datalen, + unsigned flags ) +{ + int rc; + DWORD bytesWritten; + DWORD dwFlags; + + PJ_CHECK_STACK(); + PJ_UNUSED_ARG(ioque); + + if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { + pj_assert(!"Operation already pending for this socket"); + return PJ_EBUSY; + } + + pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); + key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE; + key->send_overlapped.wsabuf.buf = (void*)data; + key->send_overlapped.wsabuf.len = datalen; + dwFlags = flags; + rc = WSASend((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1, + &bytesWritten, dwFlags, + &key->send_overlapped.overlapped, NULL); + if (rc == SOCKET_ERROR) { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus==WSA_IO_PENDING) + return PJ_EPENDING; + else + return PJ_STATUS_FROM_OS(dwStatus); + } else { + /* Must always return pending status. + * See comments on pj_ioqueue_read + * return bytesRead; + */ + return PJ_EPENDING; + } +} + + +/* + * pj_ioqueue_sendto() + * + * Initiate overlapped SendTo operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, + pj_ioqueue_key_t *key, + const void *data, + pj_size_t datalen, + unsigned flags, + const pj_sockaddr_t *addr, + int addrlen) +{ + BOOL rc; + DWORD bytesSent; + DWORD dwFlags; + + PJ_CHECK_STACK(); + PJ_UNUSED_ARG(ioque); + + if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { + pj_assert(!"Operation already pending for this socket"); + return PJ_EBUSY; + } + + pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); + key->send_overlapped.operation = PJ_IOQUEUE_OP_SEND_TO; + key->send_overlapped.wsabuf.buf = (char*)data; + key->send_overlapped.wsabuf.len = datalen; + dwFlags = flags; + rc = WSASendTo((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1, + &bytesSent, dwFlags, addr, + addrlen, &key->send_overlapped.overlapped, NULL); + if (rc == SOCKET_ERROR) { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus==WSA_IO_PENDING) + return PJ_EPENDING; + else + return PJ_STATUS_FROM_OS(dwStatus); + } else { + // Must always return pending status. + // See comments on pj_ioqueue_read + // return bytesSent; + return PJ_EPENDING; + } +} + +#if PJ_HAS_TCP + +/* + * pj_ioqueue_accept() + * + * Initiate overlapped accept() operation. + */ +PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *key, + pj_sock_t *new_sock, + pj_sockaddr_t *local, + pj_sockaddr_t *remote, + int *addrlen) +{ + BOOL rc; + DWORD bytesReceived; + pj_status_t status; + + PJ_CHECK_STACK(); + PJ_UNUSED_ARG(ioqueue); + + if (key->accept_overlapped.operation != PJ_IOQUEUE_OP_NONE) { + pj_assert(!"Operation already pending for this socket"); + return PJ_EBUSY; + } + + if (key->accept_overlapped.newsock == PJ_INVALID_SOCKET) { + pj_sock_t sock; + status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock); + if (status != PJ_SUCCESS) + return status; + + key->accept_overlapped.newsock = sock; + } + key->accept_overlapped.operation = PJ_IOQUEUE_OP_ACCEPT; + key->accept_overlapped.addrlen = addrlen; + key->accept_overlapped.local = local; + key->accept_overlapped.remote = remote; + key->accept_overlapped.newsock_ptr = new_sock; + pj_memset(&key->accept_overlapped.overlapped, 0, + sizeof(key->accept_overlapped.overlapped)); + + rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)key->accept_overlapped.newsock, + key->accept_overlapped.accept_buf, + 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN, + &bytesReceived, + &key->accept_overlapped.overlapped); + + if (rc == TRUE) { + ioqueue_on_accept_complete(&key->accept_overlapped); + if (key->cb.on_accept_complete) + key->cb.on_accept_complete(key, key->accept_overlapped.newsock, 0); + return PJ_SUCCESS; + } else { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus==WSA_IO_PENDING) + return PJ_EPENDING; + else + return PJ_STATUS_FROM_OS(dwStatus); + } +} + + +/* + * pj_ioqueue_connect() + * + * Initiate overlapped connect() operation (well, it's non-blocking actually, + * since there's no overlapped version of connect()). + */ +PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *key, + const pj_sockaddr_t *addr, + int addrlen ) +{ + unsigned long optval = 1; + HANDLE hEvent; + + PJ_CHECK_STACK(); + + /* Set socket to non-blocking. */ + if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { + return PJ_RETURN_OS_ERROR(WSAGetLastError()); + } + + /* Initiate connect() */ + if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) { + DWORD dwStatus; + dwStatus = WSAGetLastError(); + if (dwStatus != WSAEWOULDBLOCK) { + /* Permanent error */ + return PJ_RETURN_OS_ERROR(dwStatus); + } else { + /* Pending operation. This is what we're looking for. */ + } + } else { + /* Connect has completed immediately! */ + /* Restore to blocking mode. */ + optval = 0; + if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { + return PJ_RETURN_OS_ERROR(WSAGetLastError()); + } + + key->cb.on_connect_complete(key, 0); + return PJ_SUCCESS; + } + + /* Add to the array of connecting socket to be polled */ + pj_lock_acquire(ioqueue->lock); + + if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) { + pj_lock_release(ioqueue->lock); + return PJ_ETOOMANYCONN; + } + + /* Get or create event object. */ + if (ioqueue->event_count) { + hEvent = ioqueue->event_pool[ioqueue->event_count - 1]; + --ioqueue->event_count; + } else { + hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + if (hEvent == NULL) { + DWORD dwStatus = GetLastError(); + pj_lock_release(ioqueue->lock); + return PJ_STATUS_FROM_OS(dwStatus); + } + } + + /* Mark key as connecting. + * We can't use array index since key can be removed dynamically. + */ + key->connecting = 1; + + /* Associate socket events to the event object. */ + if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) { + CloseHandle(hEvent); + pj_lock_release(ioqueue->lock); + return PJ_RETURN_OS_ERROR(WSAGetLastError()); + } + + /* Add to array. */ + ioqueue->connecting_keys[ ioqueue->connecting_count ] = key; + ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent; + ioqueue->connecting_count++; + + pj_lock_release(ioqueue->lock); + + return PJ_EPENDING; +} +#endif /* #if PJ_HAS_TCP */ + |