From a08b589d09d5197f9a76d549a189e4686bd2ca8c Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Sun, 13 Nov 2005 19:40:44 +0000 Subject: Applying license to pjproject git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@49 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/src/pj/ioqueue_winnt.c | 1807 +++++++++++++++++++++--------------------- 1 file changed, 914 insertions(+), 893 deletions(-) (limited to 'pjlib/src/pj/ioqueue_winnt.c') diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index 777cb6be..7e3e31b9 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -1,896 +1,917 @@ -/* $Id$ - */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0 -# include -#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0 -# include -#endif - -#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0 -# include -#endif - - -/* The address specified in AcceptEx() must be 16 more than the size of - * SOCKADDR (source: MSDN). - */ -#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16) - -typedef struct generic_overlapped -{ - WSAOVERLAPPED overlapped; - pj_ioqueue_operation_e operation; -} generic_overlapped; - -/* - * OVERLAPPPED structure for send and receive. - */ -typedef struct ioqueue_overlapped -{ - WSAOVERLAPPED overlapped; - pj_ioqueue_operation_e operation; - WSABUF wsabuf; - pj_sockaddr_in dummy_addr; - int dummy_addrlen; -} ioqueue_overlapped; - -#if PJ_HAS_TCP -/* - * OVERLAP structure for accept. - */ -typedef struct ioqueue_accept_rec -{ - WSAOVERLAPPED overlapped; - pj_ioqueue_operation_e operation; - pj_sock_t newsock; - pj_sock_t *newsock_ptr; - int *addrlen; - void *remote; - void *local; - char accept_buf[2 * ACCEPT_ADDR_LEN]; -} ioqueue_accept_rec; -#endif - -/* - * Structure to hold pending operation key. - */ -union operation_key -{ - generic_overlapped generic; - ioqueue_overlapped overlapped; -#if PJ_HAS_TCP - ioqueue_accept_rec accept; -#endif -}; - -/* Type of handle in the key. */ -enum handle_type -{ - HND_IS_UNKNOWN, - HND_IS_FILE, - HND_IS_SOCKET, -}; - -/* - * Structure for individual socket. - */ -struct pj_ioqueue_key_t -{ - pj_ioqueue_t *ioqueue; - HANDLE hnd; - void *user_data; - enum handle_type hnd_type; -#if PJ_HAS_TCP - int connecting; -#endif - pj_ioqueue_callback cb; -}; - -/* - * IO Queue structure. - */ -struct pj_ioqueue_t -{ - HANDLE iocp; - pj_lock_t *lock; - pj_bool_t auto_delete_lock; - unsigned event_count; - HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1]; -#if PJ_HAS_TCP - unsigned connecting_count; - HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1]; - pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1]; -#endif -}; - - -#if PJ_HAS_TCP -/* - * Process the socket when the overlapped accept() completed. - */ -static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped) -{ - struct sockaddr *local; - struct sockaddr *remote; - int locallen, remotelen; - - PJ_CHECK_STACK(); - - /* Operation complete immediately. */ - GetAcceptExSockaddrs( accept_overlapped->accept_buf, - 0, - ACCEPT_ADDR_LEN, - ACCEPT_ADDR_LEN, - &local, - &locallen, - &remote, - &remotelen); - if (*accept_overlapped->addrlen > locallen) { - pj_memcpy(accept_overlapped->local, local, locallen); - pj_memcpy(accept_overlapped->remote, remote, locallen); - } else { - pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen); - pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen); - } - *accept_overlapped->addrlen = locallen; - if (accept_overlapped->newsock_ptr) - *accept_overlapped->newsock_ptr = accept_overlapped->newsock; - accept_overlapped->operation = 0; - accept_overlapped->newsock = PJ_INVALID_SOCKET; -} - -static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos) -{ - pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos]; - HANDLE hEvent = ioqueue->connecting_handles[pos]; - - /* Remove key from array of connecting handles. */ - pj_array_erase(ioqueue->connecting_keys, sizeof(key), - ioqueue->connecting_count, pos); - pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE), - ioqueue->connecting_count, pos); - --ioqueue->connecting_count; - - /* Disassociate the socket from the event. */ - WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0); - - /* Put event object to pool. */ - if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) { - ioqueue->event_pool[ioqueue->event_count++] = hEvent; - } else { - /* Shouldn't happen. There should be no more pending connections - * than max. - */ - pj_assert(0); - CloseHandle(hEvent); - } - -} - -/* - * Poll for the completion of non-blocking connect(). - * If there's a completion, the function return the key of the completed - * socket, and 'result' argument contains the connect() result. If connect() - * succeeded, 'result' will have value zero, otherwise will have the error - * code. - */ -static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue, - pj_ssize_t *connect_err ) -{ - pj_ioqueue_key_t *key = NULL; - - if (ioqueue->connecting_count) { - DWORD result; - - pj_lock_acquire(ioqueue->lock); - result = WaitForMultipleObjects(ioqueue->connecting_count, - ioqueue->connecting_handles, - FALSE, 0); - if (result >= WAIT_OBJECT_0 && - result < WAIT_OBJECT_0+ioqueue->connecting_count) - { - WSANETWORKEVENTS net_events; - - /* Got completed connect(). */ - unsigned pos = result - WAIT_OBJECT_0; - key = ioqueue->connecting_keys[pos]; - - /* See whether connect has succeeded. */ - WSAEnumNetworkEvents((pj_sock_t)key->hnd, - ioqueue->connecting_handles[pos], - &net_events); - *connect_err = - PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]); - - /* Erase socket from pending connect. */ - erase_connecting_socket(ioqueue, pos); - } - pj_lock_release(ioqueue->lock); - } - return key; -} -#endif - -/* - * pj_ioqueue_name() - */ -PJ_DEF(const char*) pj_ioqueue_name(void) -{ - return "iocp"; -} - -/* - * pj_ioqueue_create() - */ -PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, - pj_size_t max_fd, - pj_ioqueue_t **p_ioqueue) -{ - pj_ioqueue_t *ioqueue; - pj_status_t rc; - - PJ_UNUSED_ARG(max_fd); - PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL); - - rc = sizeof(union operation_key); - - /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */ - PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= - sizeof(union operation_key), PJ_EBUG); - - ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); - ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); - if (ioqueue->iocp == NULL) - return PJ_RETURN_OS_ERROR(GetLastError()); - - rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); - if (rc != PJ_SUCCESS) { - CloseHandle(ioqueue->iocp); - return rc; - } - - ioqueue->auto_delete_lock = PJ_TRUE; - - *p_ioqueue = ioqueue; - - PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue)); - return PJ_SUCCESS; -} - -/* - * pj_ioqueue_destroy() - */ -PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) -{ - unsigned i; - - PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); - - /* Destroy events in the pool */ - for (i=0; ievent_count; ++i) { - CloseHandle(ioqueue->event_pool[i]); - } - ioqueue->event_count = 0; - - if (CloseHandle(ioqueue->iocp) != TRUE) - return PJ_RETURN_OS_ERROR(GetLastError()); - - if (ioqueue->auto_delete_lock) - pj_lock_destroy(ioqueue->lock); - - return PJ_SUCCESS; -} - -/* - * pj_ioqueue_set_lock() - */ -PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, - pj_lock_t *lock, - pj_bool_t auto_delete ) -{ - PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL); - - if (ioqueue->auto_delete_lock) { - pj_lock_destroy(ioqueue->lock); - } - - ioqueue->lock = lock; - ioqueue->auto_delete_lock = auto_delete; - - return PJ_SUCCESS; -} - -/* - * pj_ioqueue_register_sock() - */ -PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, - pj_ioqueue_t *ioqueue, - pj_sock_t sock, - void *user_data, - const pj_ioqueue_callback *cb, - pj_ioqueue_key_t **key ) -{ - HANDLE hioq; - pj_ioqueue_key_t *rec; - u_long value; - int rc; - - PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); - - /* Build the key for this socket. */ - rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); - rec->ioqueue = ioqueue; - rec->hnd = (HANDLE)sock; - rec->hnd_type = HND_IS_SOCKET; - rec->user_data = user_data; - pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); - - /* Set socket to nonblocking. */ - value = 1; - rc = ioctlsocket(sock, FIONBIO, &value); - if (rc != 0) { - return PJ_RETURN_OS_ERROR(WSAGetLastError()); - } - - /* Associate with IOCP */ - hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); - if (!hioq) { - return PJ_RETURN_OS_ERROR(GetLastError()); - } - - *key = rec; - return PJ_SUCCESS; -} - -/* - * pj_ioqueue_unregister() - */ -PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) -{ - PJ_ASSERT_RETURN(key, PJ_EINVAL); - -#if PJ_HAS_TCP - if (key->connecting) { - unsigned pos; - pj_ioqueue_t *ioqueue; - - ioqueue = key->ioqueue; - - /* Erase from connecting_handles */ - pj_lock_acquire(ioqueue->lock); - for (pos=0; pos < ioqueue->connecting_count; ++pos) { - if (ioqueue->connecting_keys[pos] == key) { - erase_connecting_socket(ioqueue, pos); - break; - } - } - key->connecting = 0; - pj_lock_release(ioqueue->lock); - } -#endif - if (key->hnd_type == HND_IS_FILE) { - CloseHandle(key->hnd); - } - return PJ_SUCCESS; -} - -/* - * pj_ioqueue_get_user_data() - */ -PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) -{ - PJ_ASSERT_RETURN(key, NULL); - return key->user_data; -} - -/* - * pj_ioqueue_set_user_data() - */ -PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, - void *user_data, - void **old_data ) -{ - PJ_ASSERT_RETURN(key, PJ_EINVAL); - - if (old_data) - *old_data = key->user_data; - - key->user_data = user_data; - return PJ_SUCCESS; -} - -/* - * pj_ioqueue_poll() - * - * Poll for events. - */ -PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) -{ - DWORD dwMsec, dwBytesTransfered, dwKey; - generic_overlapped *pOv; - pj_ioqueue_key_t *key; - pj_ssize_t size_status; - BOOL rc; - - PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); - - /* Check the connecting array. */ -#if PJ_HAS_TCP - key = check_connecting(ioqueue, &size_status); - if (key != NULL) { - key->cb.on_connect_complete(key, (int)size_status); - return 1; - } -#endif - - /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ - dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; - - /* Poll for completion status. */ - rc = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered, &dwKey, - (OVERLAPPED**)&pOv, dwMsec); - - /* The return value is: - * - nonzero if event was dequeued. - * - zero and pOv==NULL if no event was dequeued. - * - zero and pOv!=NULL if event for failed I/O was dequeued. - */ - if (pOv) { - /* Event was dequeued for either successfull or failed I/O */ - key = (pj_ioqueue_key_t*)dwKey; - size_status = dwBytesTransfered; - switch (pOv->operation) { - case PJ_IOQUEUE_OP_READ: - case PJ_IOQUEUE_OP_RECV: - case PJ_IOQUEUE_OP_RECV_FROM: - pOv->operation = 0; - if (key->cb.on_read_complete) - key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv, - size_status); - break; - case PJ_IOQUEUE_OP_WRITE: - case PJ_IOQUEUE_OP_SEND: - case PJ_IOQUEUE_OP_SEND_TO: - pOv->operation = 0; - if (key->cb.on_write_complete) - key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv, - size_status); - break; -#if PJ_HAS_TCP - case PJ_IOQUEUE_OP_ACCEPT: - /* special case for accept. */ - ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv); - if (key->cb.on_accept_complete) { - ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv; - key->cb.on_accept_complete(key, - (pj_ioqueue_op_key_t*)pOv, - accept_rec->newsock, - PJ_SUCCESS); - } - break; - case PJ_IOQUEUE_OP_CONNECT: -#endif - case PJ_IOQUEUE_OP_NONE: - pj_assert(0); - break; - } - return 1; - } - - if (GetLastError()==WAIT_TIMEOUT) { - /* Check the connecting array (again). */ -#if PJ_HAS_TCP - key = check_connecting(ioqueue, &size_status); - if (key != NULL) { - key->cb.on_connect_complete(key, (int)size_status); - return 1; - } -#endif - return 0; - } - return -1; -} - -/* - * pj_ioqueue_recv() - * - * Initiate overlapped WSARecv() operation. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - void *buffer, - pj_ssize_t *length, - unsigned flags ) -{ - /* - * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and - * addrlen here. But unfortunately it generates EINVAL... :-( - * -bennylp - */ - int rc; - DWORD bytesRead; - DWORD dwFlags = 0; - union operation_key *op_key_rec; - - PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); - - op_key_rec = (union operation_key*)op_key->internal__; - op_key_rec->overlapped.wsabuf.buf = buffer; - op_key_rec->overlapped.wsabuf.len = *length; - - dwFlags = flags; - - /* Try non-overlapped received first to see if data is - * immediately available. - */ - rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, - &bytesRead, &dwFlags, NULL, NULL); - if (rc == 0) { - *length = bytesRead; - return PJ_SUCCESS; - } else { - DWORD dwError = WSAGetLastError(); - if (dwError != WSAEWOULDBLOCK) { - *length = -1; - return PJ_RETURN_OS_ERROR(dwError); - } - } - - /* - * No immediate data available. - * Register overlapped Recv() operation. - */ - pj_memset(&op_key_rec->overlapped.overlapped, 0, - sizeof(op_key_rec->overlapped.overlapped)); - op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; - - rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, - &bytesRead, &dwFlags, - &op_key_rec->overlapped.overlapped, NULL); - if (rc == SOCKET_ERROR) { - DWORD dwStatus = WSAGetLastError(); - if (dwStatus!=WSA_IO_PENDING) { - *length = -1; - return PJ_STATUS_FROM_OS(dwStatus); - } - } - - /* Pending operation has been scheduled. */ - return PJ_EPENDING; -} - -/* - * pj_ioqueue_recvfrom() - * - * Initiate overlapped RecvFrom() operation. - */ -PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - void *buffer, - pj_ssize_t *length, - unsigned flags, - pj_sockaddr_t *addr, - int *addrlen) -{ - int rc; - DWORD bytesRead; - DWORD dwFlags = 0; - union operation_key *op_key_rec; - - PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); - - op_key_rec = (union operation_key*)op_key->internal__; - op_key_rec->overlapped.wsabuf.buf = buffer; - op_key_rec->overlapped.wsabuf.len = *length; - - dwFlags = flags; - - /* Try non-overlapped received first to see if data is - * immediately available. - */ - rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, - &bytesRead, &dwFlags, addr, addrlen, NULL, NULL); - if (rc == 0) { - *length = bytesRead; - return PJ_SUCCESS; - } else { - DWORD dwError = WSAGetLastError(); - if (dwError != WSAEWOULDBLOCK) { - *length = -1; - return PJ_RETURN_OS_ERROR(dwError); - } - } - - /* - * No immediate data available. - * Register overlapped Recv() operation. - */ - pj_memset(&op_key_rec->overlapped.overlapped, 0, - sizeof(op_key_rec->overlapped.overlapped)); - op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; - - rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, - &bytesRead, &dwFlags, addr, addrlen, - &op_key_rec->overlapped.overlapped, NULL); - if (rc == SOCKET_ERROR) { - DWORD dwStatus = WSAGetLastError(); - if (dwStatus!=WSA_IO_PENDING) { - *length = -1; - return PJ_STATUS_FROM_OS(dwStatus); - } - } - - /* Pending operation has been scheduled. */ - return PJ_EPENDING; -} - -/* - * pj_ioqueue_send() - * - * Initiate overlapped Send operation. - */ -PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - const void *data, - pj_ssize_t *length, - unsigned flags ) -{ - return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0); -} - - -/* - * pj_ioqueue_sendto() - * - * Initiate overlapped SendTo operation. - */ -PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - const void *data, - pj_ssize_t *length, - unsigned flags, - const pj_sockaddr_t *addr, - int addrlen) -{ - int rc; - DWORD bytesWritten; - DWORD dwFlags; - union operation_key *op_key_rec; - - PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); - - op_key_rec = (union operation_key*)op_key->internal__; - - dwFlags = flags; - - /* - * First try blocking write. - */ - op_key_rec->overlapped.wsabuf.buf = (void*)data; - op_key_rec->overlapped.wsabuf.len = *length; - - rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, - &bytesWritten, dwFlags, addr, addrlen, - NULL, NULL); - if (rc == 0) { - *length = bytesWritten; - return PJ_SUCCESS; - } else { - DWORD dwStatus = WSAGetLastError(); - if (dwStatus != WSAEWOULDBLOCK) { - *length = -1; - return PJ_RETURN_OS_ERROR(dwStatus); - } - } - - /* - * Data can't be sent immediately. - * Schedule asynchronous WSASend(). - */ - pj_memset(&op_key_rec->overlapped.overlapped, 0, - sizeof(op_key_rec->overlapped.overlapped)); - op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND; - - rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, - &bytesWritten, dwFlags, addr, addrlen, - &op_key_rec->overlapped.overlapped, NULL); - if (rc == SOCKET_ERROR) { - DWORD dwStatus = WSAGetLastError(); - if (dwStatus!=WSA_IO_PENDING) - return PJ_STATUS_FROM_OS(dwStatus); - } - - /* Asynchronous operation successfully submitted. */ - return PJ_EPENDING; -} - -#if PJ_HAS_TCP - -/* - * pj_ioqueue_accept() - * - * Initiate overlapped accept() operation. - */ -PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_sock_t *new_sock, - pj_sockaddr_t *local, - pj_sockaddr_t *remote, - int *addrlen) -{ - BOOL rc; - DWORD bytesReceived; - pj_status_t status; - union operation_key *op_key_rec; - SOCKET sock; - - PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); - - /* - * See if there is a new connection immediately available. - */ - sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0); - if (sock != INVALID_SOCKET) { - /* Yes! New socket is available! */ - int status; - - status = getsockname(sock, local, addrlen); - if (status != 0) { - DWORD dwError = WSAGetLastError(); - closesocket(sock); - return PJ_RETURN_OS_ERROR(dwError); - } - - *new_sock = sock; - return PJ_SUCCESS; - - } else { - DWORD dwError = WSAGetLastError(); - if (dwError != WSAEWOULDBLOCK) { - return PJ_RETURN_OS_ERROR(dwError); - } - } - - /* - * No connection is immediately available. - * Must schedule an asynchronous operation. - */ - op_key_rec = (union operation_key*)op_key->internal__; - - status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, - &op_key_rec->accept.newsock); - if (status != PJ_SUCCESS) - return status; - - /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket - * addresses can be obtained with getsockname() and getpeername(). - */ - status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET, - SO_UPDATE_ACCEPT_CONTEXT, - (char*)&key->hnd, sizeof(SOCKET)); - /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later. - * So ignore the error status. - */ - - op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT; - op_key_rec->accept.addrlen = addrlen; - op_key_rec->accept.local = local; - op_key_rec->accept.remote = remote; - op_key_rec->accept.newsock_ptr = new_sock; - pj_memset(&op_key_rec->accept.overlapped, 0, - sizeof(op_key_rec->accept.overlapped)); - - rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock, - op_key_rec->accept.accept_buf, - 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN, - &bytesReceived, - &op_key_rec->accept.overlapped ); - - if (rc == TRUE) { - ioqueue_on_accept_complete(&op_key_rec->accept); - return PJ_SUCCESS; - } else { - DWORD dwStatus = WSAGetLastError(); - if (dwStatus!=WSA_IO_PENDING) - return PJ_STATUS_FROM_OS(dwStatus); - } - - /* Asynchronous Accept() has been submitted. */ - return PJ_EPENDING; -} - - -/* - * pj_ioqueue_connect() - * - * Initiate overlapped connect() operation (well, it's non-blocking actually, - * since there's no overlapped version of connect()). - */ -PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, - const pj_sockaddr_t *addr, - int addrlen ) -{ - HANDLE hEvent; - pj_ioqueue_t *ioqueue; - - PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); - - /* Initiate connect() */ - if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) { - DWORD dwStatus; - dwStatus = WSAGetLastError(); - if (dwStatus != WSAEWOULDBLOCK) { - return PJ_RETURN_OS_ERROR(dwStatus); - } - } else { - /* Connect has completed immediately! */ - return PJ_SUCCESS; - } - - ioqueue = key->ioqueue; - - /* Add to the array of connecting socket to be polled */ - pj_lock_acquire(ioqueue->lock); - - if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) { - pj_lock_release(ioqueue->lock); - return PJ_ETOOMANYCONN; - } - - /* Get or create event object. */ - if (ioqueue->event_count) { - hEvent = ioqueue->event_pool[ioqueue->event_count - 1]; - --ioqueue->event_count; - } else { - hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); - if (hEvent == NULL) { - DWORD dwStatus = GetLastError(); - pj_lock_release(ioqueue->lock); - return PJ_STATUS_FROM_OS(dwStatus); - } - } - - /* Mark key as connecting. - * We can't use array index since key can be removed dynamically. - */ - key->connecting = 1; - - /* Associate socket events to the event object. */ - if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) { - CloseHandle(hEvent); - pj_lock_release(ioqueue->lock); - return PJ_RETURN_OS_ERROR(WSAGetLastError()); - } - - /* Add to array. */ - ioqueue->connecting_keys[ ioqueue->connecting_count ] = key; - ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent; - ioqueue->connecting_count++; - - pj_lock_release(ioqueue->lock); - - return PJ_EPENDING; -} -#endif /* #if PJ_HAS_TCP */ - +/* $Id$ + */ +/* + * PJLIB - PJ Foundation Library + * (C)2003-2005 Benny Prijono + * + * Author: + * Benny Prijono + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#if defined(PJ_HAS_WINSOCK2_H) && PJ_HAS_WINSOCK2_H != 0 +# include +#elif defined(PJ_HAS_WINSOCK_H) && PJ_HAS_WINSOCK_H != 0 +# include +#endif + +#if defined(PJ_HAS_MSWSOCK_H) && PJ_HAS_MSWSOCK_H != 0 +# include +#endif + + +/* The address specified in AcceptEx() must be 16 more than the size of + * SOCKADDR (source: MSDN). + */ +#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16) + +typedef struct generic_overlapped +{ + WSAOVERLAPPED overlapped; + pj_ioqueue_operation_e operation; +} generic_overlapped; + +/* + * OVERLAPPPED structure for send and receive. + */ +typedef struct ioqueue_overlapped +{ + WSAOVERLAPPED overlapped; + pj_ioqueue_operation_e operation; + WSABUF wsabuf; + pj_sockaddr_in dummy_addr; + int dummy_addrlen; +} ioqueue_overlapped; + +#if PJ_HAS_TCP +/* + * OVERLAP structure for accept. + */ +typedef struct ioqueue_accept_rec +{ + WSAOVERLAPPED overlapped; + pj_ioqueue_operation_e operation; + pj_sock_t newsock; + pj_sock_t *newsock_ptr; + int *addrlen; + void *remote; + void *local; + char accept_buf[2 * ACCEPT_ADDR_LEN]; +} ioqueue_accept_rec; +#endif + +/* + * Structure to hold pending operation key. + */ +union operation_key +{ + generic_overlapped generic; + ioqueue_overlapped overlapped; +#if PJ_HAS_TCP + ioqueue_accept_rec accept; +#endif +}; + +/* Type of handle in the key. */ +enum handle_type +{ + HND_IS_UNKNOWN, + HND_IS_FILE, + HND_IS_SOCKET, +}; + +/* + * Structure for individual socket. + */ +struct pj_ioqueue_key_t +{ + pj_ioqueue_t *ioqueue; + HANDLE hnd; + void *user_data; + enum handle_type hnd_type; +#if PJ_HAS_TCP + int connecting; +#endif + pj_ioqueue_callback cb; +}; + +/* + * IO Queue structure. + */ +struct pj_ioqueue_t +{ + HANDLE iocp; + pj_lock_t *lock; + pj_bool_t auto_delete_lock; + unsigned event_count; + HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1]; +#if PJ_HAS_TCP + unsigned connecting_count; + HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1]; + pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1]; +#endif +}; + + +#if PJ_HAS_TCP +/* + * Process the socket when the overlapped accept() completed. + */ +static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped) +{ + struct sockaddr *local; + struct sockaddr *remote; + int locallen, remotelen; + + PJ_CHECK_STACK(); + + /* Operation complete immediately. */ + GetAcceptExSockaddrs( accept_overlapped->accept_buf, + 0, + ACCEPT_ADDR_LEN, + ACCEPT_ADDR_LEN, + &local, + &locallen, + &remote, + &remotelen); + if (*accept_overlapped->addrlen > locallen) { + pj_memcpy(accept_overlapped->local, local, locallen); + pj_memcpy(accept_overlapped->remote, remote, locallen); + } else { + pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen); + pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen); + } + *accept_overlapped->addrlen = locallen; + if (accept_overlapped->newsock_ptr) + *accept_overlapped->newsock_ptr = accept_overlapped->newsock; + accept_overlapped->operation = 0; + accept_overlapped->newsock = PJ_INVALID_SOCKET; +} + +static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos) +{ + pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos]; + HANDLE hEvent = ioqueue->connecting_handles[pos]; + + /* Remove key from array of connecting handles. */ + pj_array_erase(ioqueue->connecting_keys, sizeof(key), + ioqueue->connecting_count, pos); + pj_array_erase(ioqueue->connecting_handles, sizeof(HANDLE), + ioqueue->connecting_count, pos); + --ioqueue->connecting_count; + + /* Disassociate the socket from the event. */ + WSAEventSelect((pj_sock_t)key->hnd, hEvent, 0); + + /* Put event object to pool. */ + if (ioqueue->event_count < MAXIMUM_WAIT_OBJECTS) { + ioqueue->event_pool[ioqueue->event_count++] = hEvent; + } else { + /* Shouldn't happen. There should be no more pending connections + * than max. + */ + pj_assert(0); + CloseHandle(hEvent); + } + +} + +/* + * Poll for the completion of non-blocking connect(). + * If there's a completion, the function return the key of the completed + * socket, and 'result' argument contains the connect() result. If connect() + * succeeded, 'result' will have value zero, otherwise will have the error + * code. + */ +static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue, + pj_ssize_t *connect_err ) +{ + pj_ioqueue_key_t *key = NULL; + + if (ioqueue->connecting_count) { + DWORD result; + + pj_lock_acquire(ioqueue->lock); + result = WaitForMultipleObjects(ioqueue->connecting_count, + ioqueue->connecting_handles, + FALSE, 0); + if (result >= WAIT_OBJECT_0 && + result < WAIT_OBJECT_0+ioqueue->connecting_count) + { + WSANETWORKEVENTS net_events; + + /* Got completed connect(). */ + unsigned pos = result - WAIT_OBJECT_0; + key = ioqueue->connecting_keys[pos]; + + /* See whether connect has succeeded. */ + WSAEnumNetworkEvents((pj_sock_t)key->hnd, + ioqueue->connecting_handles[pos], + &net_events); + *connect_err = + PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]); + + /* Erase socket from pending connect. */ + erase_connecting_socket(ioqueue, pos); + } + pj_lock_release(ioqueue->lock); + } + return key; +} +#endif + +/* + * pj_ioqueue_name() + */ +PJ_DEF(const char*) pj_ioqueue_name(void) +{ + return "iocp"; +} + +/* + * pj_ioqueue_create() + */ +PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, + pj_size_t max_fd, + pj_ioqueue_t **p_ioqueue) +{ + pj_ioqueue_t *ioqueue; + pj_status_t rc; + + PJ_UNUSED_ARG(max_fd); + PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL); + + rc = sizeof(union operation_key); + + /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */ + PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= + sizeof(union operation_key), PJ_EBUG); + + ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); + ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + if (ioqueue->iocp == NULL) + return PJ_RETURN_OS_ERROR(GetLastError()); + + rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); + if (rc != PJ_SUCCESS) { + CloseHandle(ioqueue->iocp); + return rc; + } + + ioqueue->auto_delete_lock = PJ_TRUE; + + *p_ioqueue = ioqueue; + + PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue)); + return PJ_SUCCESS; +} + +/* + * pj_ioqueue_destroy() + */ +PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) +{ + unsigned i; + + PJ_CHECK_STACK(); + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); + + /* Destroy events in the pool */ + for (i=0; ievent_count; ++i) { + CloseHandle(ioqueue->event_pool[i]); + } + ioqueue->event_count = 0; + + if (CloseHandle(ioqueue->iocp) != TRUE) + return PJ_RETURN_OS_ERROR(GetLastError()); + + if (ioqueue->auto_delete_lock) + pj_lock_destroy(ioqueue->lock); + + return PJ_SUCCESS; +} + +/* + * pj_ioqueue_set_lock() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, + pj_lock_t *lock, + pj_bool_t auto_delete ) +{ + PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL); + + if (ioqueue->auto_delete_lock) { + pj_lock_destroy(ioqueue->lock); + } + + ioqueue->lock = lock; + ioqueue->auto_delete_lock = auto_delete; + + return PJ_SUCCESS; +} + +/* + * pj_ioqueue_register_sock() + */ +PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, + pj_ioqueue_t *ioqueue, + pj_sock_t sock, + void *user_data, + const pj_ioqueue_callback *cb, + pj_ioqueue_key_t **key ) +{ + HANDLE hioq; + pj_ioqueue_key_t *rec; + u_long value; + int rc; + + PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); + + /* Build the key for this socket. */ + rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); + rec->ioqueue = ioqueue; + rec->hnd = (HANDLE)sock; + rec->hnd_type = HND_IS_SOCKET; + rec->user_data = user_data; + pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); + + /* Set socket to nonblocking. */ + value = 1; + rc = ioctlsocket(sock, FIONBIO, &value); + if (rc != 0) { + return PJ_RETURN_OS_ERROR(WSAGetLastError()); + } + + /* Associate with IOCP */ + hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); + if (!hioq) { + return PJ_RETURN_OS_ERROR(GetLastError()); + } + + *key = rec; + return PJ_SUCCESS; +} + +/* + * pj_ioqueue_unregister() + */ +PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + +#if PJ_HAS_TCP + if (key->connecting) { + unsigned pos; + pj_ioqueue_t *ioqueue; + + ioqueue = key->ioqueue; + + /* Erase from connecting_handles */ + pj_lock_acquire(ioqueue->lock); + for (pos=0; pos < ioqueue->connecting_count; ++pos) { + if (ioqueue->connecting_keys[pos] == key) { + erase_connecting_socket(ioqueue, pos); + break; + } + } + key->connecting = 0; + pj_lock_release(ioqueue->lock); + } +#endif + if (key->hnd_type == HND_IS_FILE) { + CloseHandle(key->hnd); + } + return PJ_SUCCESS; +} + +/* + * pj_ioqueue_get_user_data() + */ +PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) +{ + PJ_ASSERT_RETURN(key, NULL); + return key->user_data; +} + +/* + * pj_ioqueue_set_user_data() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, + void *user_data, + void **old_data ) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + if (old_data) + *old_data = key->user_data; + + key->user_data = user_data; + return PJ_SUCCESS; +} + +/* + * pj_ioqueue_poll() + * + * Poll for events. + */ +PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) +{ + DWORD dwMsec, dwBytesTransfered, dwKey; + generic_overlapped *pOv; + pj_ioqueue_key_t *key; + pj_ssize_t size_status; + BOOL rc; + + PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); + + /* Check the connecting array. */ +#if PJ_HAS_TCP + key = check_connecting(ioqueue, &size_status); + if (key != NULL) { + key->cb.on_connect_complete(key, (int)size_status); + return 1; + } +#endif + + /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ + dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; + + /* Poll for completion status. */ + rc = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered, &dwKey, + (OVERLAPPED**)&pOv, dwMsec); + + /* The return value is: + * - nonzero if event was dequeued. + * - zero and pOv==NULL if no event was dequeued. + * - zero and pOv!=NULL if event for failed I/O was dequeued. + */ + if (pOv) { + /* Event was dequeued for either successfull or failed I/O */ + key = (pj_ioqueue_key_t*)dwKey; + size_status = dwBytesTransfered; + switch (pOv->operation) { + case PJ_IOQUEUE_OP_READ: + case PJ_IOQUEUE_OP_RECV: + case PJ_IOQUEUE_OP_RECV_FROM: + pOv->operation = 0; + if (key->cb.on_read_complete) + key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv, + size_status); + break; + case PJ_IOQUEUE_OP_WRITE: + case PJ_IOQUEUE_OP_SEND: + case PJ_IOQUEUE_OP_SEND_TO: + pOv->operation = 0; + if (key->cb.on_write_complete) + key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv, + size_status); + break; +#if PJ_HAS_TCP + case PJ_IOQUEUE_OP_ACCEPT: + /* special case for accept. */ + ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv); + if (key->cb.on_accept_complete) { + ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv; + key->cb.on_accept_complete(key, + (pj_ioqueue_op_key_t*)pOv, + accept_rec->newsock, + PJ_SUCCESS); + } + break; + case PJ_IOQUEUE_OP_CONNECT: +#endif + case PJ_IOQUEUE_OP_NONE: + pj_assert(0); + break; + } + return 1; + } + + if (GetLastError()==WAIT_TIMEOUT) { + /* Check the connecting array (again). */ +#if PJ_HAS_TCP + key = check_connecting(ioqueue, &size_status); + if (key != NULL) { + key->cb.on_connect_complete(key, (int)size_status); + return 1; + } +#endif + return 0; + } + return -1; +} + +/* + * pj_ioqueue_recv() + * + * Initiate overlapped WSARecv() operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + void *buffer, + pj_ssize_t *length, + unsigned flags ) +{ + /* + * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and + * addrlen here. But unfortunately it generates EINVAL... :-( + * -bennylp + */ + int rc; + DWORD bytesRead; + DWORD dwFlags = 0; + union operation_key *op_key_rec; + + PJ_CHECK_STACK(); + PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); + + op_key_rec = (union operation_key*)op_key->internal__; + op_key_rec->overlapped.wsabuf.buf = buffer; + op_key_rec->overlapped.wsabuf.len = *length; + + dwFlags = flags; + + /* Try non-overlapped received first to see if data is + * immediately available. + */ + rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesRead, &dwFlags, NULL, NULL); + if (rc == 0) { + *length = bytesRead; + return PJ_SUCCESS; + } else { + DWORD dwError = WSAGetLastError(); + if (dwError != WSAEWOULDBLOCK) { + *length = -1; + return PJ_RETURN_OS_ERROR(dwError); + } + } + + /* + * No immediate data available. + * Register overlapped Recv() operation. + */ + pj_memset(&op_key_rec->overlapped.overlapped, 0, + sizeof(op_key_rec->overlapped.overlapped)); + op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; + + rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesRead, &dwFlags, + &op_key_rec->overlapped.overlapped, NULL); + if (rc == SOCKET_ERROR) { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus!=WSA_IO_PENDING) { + *length = -1; + return PJ_STATUS_FROM_OS(dwStatus); + } + } + + /* Pending operation has been scheduled. */ + return PJ_EPENDING; +} + +/* + * pj_ioqueue_recvfrom() + * + * Initiate overlapped RecvFrom() operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + void *buffer, + pj_ssize_t *length, + unsigned flags, + pj_sockaddr_t *addr, + int *addrlen) +{ + int rc; + DWORD bytesRead; + DWORD dwFlags = 0; + union operation_key *op_key_rec; + + PJ_CHECK_STACK(); + PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); + + op_key_rec = (union operation_key*)op_key->internal__; + op_key_rec->overlapped.wsabuf.buf = buffer; + op_key_rec->overlapped.wsabuf.len = *length; + + dwFlags = flags; + + /* Try non-overlapped received first to see if data is + * immediately available. + */ + rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesRead, &dwFlags, addr, addrlen, NULL, NULL); + if (rc == 0) { + *length = bytesRead; + return PJ_SUCCESS; + } else { + DWORD dwError = WSAGetLastError(); + if (dwError != WSAEWOULDBLOCK) { + *length = -1; + return PJ_RETURN_OS_ERROR(dwError); + } + } + + /* + * No immediate data available. + * Register overlapped Recv() operation. + */ + pj_memset(&op_key_rec->overlapped.overlapped, 0, + sizeof(op_key_rec->overlapped.overlapped)); + op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; + + rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesRead, &dwFlags, addr, addrlen, + &op_key_rec->overlapped.overlapped, NULL); + if (rc == SOCKET_ERROR) { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus!=WSA_IO_PENDING) { + *length = -1; + return PJ_STATUS_FROM_OS(dwStatus); + } + } + + /* Pending operation has been scheduled. */ + return PJ_EPENDING; +} + +/* + * pj_ioqueue_send() + * + * Initiate overlapped Send operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + const void *data, + pj_ssize_t *length, + unsigned flags ) +{ + return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0); +} + + +/* + * pj_ioqueue_sendto() + * + * Initiate overlapped SendTo operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + const void *data, + pj_ssize_t *length, + unsigned flags, + const pj_sockaddr_t *addr, + int addrlen) +{ + int rc; + DWORD bytesWritten; + DWORD dwFlags; + union operation_key *op_key_rec; + + PJ_CHECK_STACK(); + PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); + + op_key_rec = (union operation_key*)op_key->internal__; + + dwFlags = flags; + + /* + * First try blocking write. + */ + op_key_rec->overlapped.wsabuf.buf = (void*)data; + op_key_rec->overlapped.wsabuf.len = *length; + + rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesWritten, dwFlags, addr, addrlen, + NULL, NULL); + if (rc == 0) { + *length = bytesWritten; + return PJ_SUCCESS; + } else { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus != WSAEWOULDBLOCK) { + *length = -1; + return PJ_RETURN_OS_ERROR(dwStatus); + } + } + + /* + * Data can't be sent immediately. + * Schedule asynchronous WSASend(). + */ + pj_memset(&op_key_rec->overlapped.overlapped, 0, + sizeof(op_key_rec->overlapped.overlapped)); + op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND; + + rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesWritten, dwFlags, addr, addrlen, + &op_key_rec->overlapped.overlapped, NULL); + if (rc == SOCKET_ERROR) { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus!=WSA_IO_PENDING) + return PJ_STATUS_FROM_OS(dwStatus); + } + + /* Asynchronous operation successfully submitted. */ + return PJ_EPENDING; +} + +#if PJ_HAS_TCP + +/* + * pj_ioqueue_accept() + * + * Initiate overlapped accept() operation. + */ +PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_sock_t *new_sock, + pj_sockaddr_t *local, + pj_sockaddr_t *remote, + int *addrlen) +{ + BOOL rc; + DWORD bytesReceived; + pj_status_t status; + union operation_key *op_key_rec; + SOCKET sock; + + PJ_CHECK_STACK(); + PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); + + /* + * See if there is a new connection immediately available. + */ + sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0); + if (sock != INVALID_SOCKET) { + /* Yes! New socket is available! */ + int status; + + status = getsockname(sock, local, addrlen); + if (status != 0) { + DWORD dwError = WSAGetLastError(); + closesocket(sock); + return PJ_RETURN_OS_ERROR(dwError); + } + + *new_sock = sock; + return PJ_SUCCESS; + + } else { + DWORD dwError = WSAGetLastError(); + if (dwError != WSAEWOULDBLOCK) { + return PJ_RETURN_OS_ERROR(dwError); + } + } + + /* + * No connection is immediately available. + * Must schedule an asynchronous operation. + */ + op_key_rec = (union operation_key*)op_key->internal__; + + status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, + &op_key_rec->accept.newsock); + if (status != PJ_SUCCESS) + return status; + + /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket + * addresses can be obtained with getsockname() and getpeername(). + */ + status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + (char*)&key->hnd, sizeof(SOCKET)); + /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later. + * So ignore the error status. + */ + + op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT; + op_key_rec->accept.addrlen = addrlen; + op_key_rec->accept.local = local; + op_key_rec->accept.remote = remote; + op_key_rec->accept.newsock_ptr = new_sock; + pj_memset(&op_key_rec->accept.overlapped, 0, + sizeof(op_key_rec->accept.overlapped)); + + rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock, + op_key_rec->accept.accept_buf, + 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN, + &bytesReceived, + &op_key_rec->accept.overlapped ); + + if (rc == TRUE) { + ioqueue_on_accept_complete(&op_key_rec->accept); + return PJ_SUCCESS; + } else { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus!=WSA_IO_PENDING) + return PJ_STATUS_FROM_OS(dwStatus); + } + + /* Asynchronous Accept() has been submitted. */ + return PJ_EPENDING; +} + + +/* + * pj_ioqueue_connect() + * + * Initiate overlapped connect() operation (well, it's non-blocking actually, + * since there's no overlapped version of connect()). + */ +PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, + const pj_sockaddr_t *addr, + int addrlen ) +{ + HANDLE hEvent; + pj_ioqueue_t *ioqueue; + + PJ_CHECK_STACK(); + PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); + + /* Initiate connect() */ + if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) { + DWORD dwStatus; + dwStatus = WSAGetLastError(); + if (dwStatus != WSAEWOULDBLOCK) { + return PJ_RETURN_OS_ERROR(dwStatus); + } + } else { + /* Connect has completed immediately! */ + return PJ_SUCCESS; + } + + ioqueue = key->ioqueue; + + /* Add to the array of connecting socket to be polled */ + pj_lock_acquire(ioqueue->lock); + + if (ioqueue->connecting_count >= MAXIMUM_WAIT_OBJECTS) { + pj_lock_release(ioqueue->lock); + return PJ_ETOOMANYCONN; + } + + /* Get or create event object. */ + if (ioqueue->event_count) { + hEvent = ioqueue->event_pool[ioqueue->event_count - 1]; + --ioqueue->event_count; + } else { + hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + if (hEvent == NULL) { + DWORD dwStatus = GetLastError(); + pj_lock_release(ioqueue->lock); + return PJ_STATUS_FROM_OS(dwStatus); + } + } + + /* Mark key as connecting. + * We can't use array index since key can be removed dynamically. + */ + key->connecting = 1; + + /* Associate socket events to the event object. */ + if (WSAEventSelect((pj_sock_t)key->hnd, hEvent, FD_CONNECT) != 0) { + CloseHandle(hEvent); + pj_lock_release(ioqueue->lock); + return PJ_RETURN_OS_ERROR(WSAGetLastError()); + } + + /* Add to array. */ + ioqueue->connecting_keys[ ioqueue->connecting_count ] = key; + ioqueue->connecting_handles[ ioqueue->connecting_count ] = hEvent; + ioqueue->connecting_count++; + + pj_lock_release(ioqueue->lock); + + return PJ_EPENDING; +} +#endif /* #if PJ_HAS_TCP */ + PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key, pj_size_t size ) -- cgit v1.2.3