From 7c7300624eb867fa7c1ea52b9c636889aac60e80 Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Sun, 6 Nov 2005 09:37:47 +0000 Subject: Changed ioqueue to allow simultaneous operations on the same key git-svn-id: http://svn.pjsip.org/repos/pjproject/main@11 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/src/pj/ioqueue_winnt.c | 834 ++++++++++++++++++++++--------------------- 1 file changed, 433 insertions(+), 401 deletions(-) (limited to 'pjlib/src/pj/ioqueue_winnt.c') diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index dbf883a4..afb75c54 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -22,17 +22,28 @@ # include #endif - -#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+20) + +/* The address specified in AcceptEx() must be 16 more than the size of + * SOCKADDR (source: MSDN). + */ +#define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16) + +typedef struct generic_overlapped +{ + WSAOVERLAPPED overlapped; + pj_ioqueue_operation_e operation; +} generic_overlapped; /* * OVERLAP structure for send and receive. */ typedef struct ioqueue_overlapped { - WSAOVERLAPPED overlapped; + WSAOVERLAPPED overlapped; pj_ioqueue_operation_e operation; - WSABUF wsabuf; + WSABUF wsabuf; + pj_sockaddr_in dummy_addr; + int dummy_addrlen; } ioqueue_overlapped; #if PJ_HAS_TCP @@ -41,7 +52,7 @@ typedef struct ioqueue_overlapped */ typedef struct ioqueue_accept_rec { - WSAOVERLAPPED overlapped; + WSAOVERLAPPED overlapped; pj_ioqueue_operation_e operation; pj_sock_t newsock; pj_sock_t *newsock_ptr; @@ -51,19 +62,29 @@ typedef struct ioqueue_accept_rec char accept_buf[2 * ACCEPT_ADDR_LEN]; } ioqueue_accept_rec; #endif + +/* + * Structure to hold pending operation key. + */ +union operation_key +{ + generic_overlapped generic; + ioqueue_overlapped overlapped; +#if PJ_HAS_TCP + ioqueue_accept_rec accept; +#endif +}; /* * Structure for individual socket. */ struct pj_ioqueue_key_t -{ +{ + pj_ioqueue_t *ioqueue; HANDLE hnd; void *user_data; - ioqueue_overlapped recv_overlapped; - ioqueue_overlapped send_overlapped; #if PJ_HAS_TCP int connecting; - ioqueue_accept_rec accept_overlapped; #endif pj_ioqueue_callback cb; }; @@ -106,9 +127,14 @@ static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped) &local, &locallen, &remote, - &remotelen); - pj_memcpy(accept_overlapped->local, local, locallen); - pj_memcpy(accept_overlapped->remote, remote, locallen); + &remotelen); + if (*accept_overlapped->addrlen > locallen) { + pj_memcpy(accept_overlapped->local, local, locallen); + pj_memcpy(accept_overlapped->remote, remote, locallen); + } else { + pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen); + pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen); + } *accept_overlapped->addrlen = locallen; if (accept_overlapped->newsock_ptr) *accept_overlapped->newsock_ptr = accept_overlapped->newsock; @@ -120,7 +146,6 @@ static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos) { pj_ioqueue_key_t *key = ioqueue->connecting_keys[pos]; HANDLE hEvent = ioqueue->connecting_handles[pos]; - unsigned long optval; /* Remove key from array of connecting handles. */ pj_array_erase(ioqueue->connecting_keys, sizeof(key), @@ -143,12 +168,6 @@ static void erase_connecting_socket( pj_ioqueue_t *ioqueue, unsigned pos) CloseHandle(hEvent); } - /* Set socket to blocking again. */ - optval = 0; - if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { - DWORD dwStatus; - dwStatus = WSAGetLastError(); - } } /* @@ -183,7 +202,8 @@ static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue, WSAEnumNetworkEvents((pj_sock_t)key->hnd, ioqueue->connecting_handles[pos], &net_events); - *connect_err = net_events.iErrorCode[FD_CONNECT_BIT]; + *connect_err = + PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]); /* Erase socket from pending connect. */ erase_connecting_socket(ioqueue, pos); @@ -194,95 +214,121 @@ static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue, } #endif - +/* + * pj_ioqueue_create() + */ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, - int max_threads, - pj_ioqueue_t **ioqueue) + pj_ioqueue_t **p_ioqueue) { - pj_ioqueue_t *ioq; + pj_ioqueue_t *ioqueue; pj_status_t rc; PJ_UNUSED_ARG(max_fd); - PJ_ASSERT_RETURN(pool && ioqueue, PJ_EINVAL); - - ioq = pj_pool_zalloc(pool, sizeof(*ioq)); - ioq->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, max_threads); - if (ioq->iocp == NULL) + PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL); + + rc = sizeof(union operation_key); + + /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */ + PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= + sizeof(union operation_key), PJ_EBUG); + + ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); + ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + if (ioqueue->iocp == NULL) return PJ_RETURN_OS_ERROR(GetLastError()); - rc = pj_lock_create_simple_mutex(pool, NULL, &ioq->lock); + rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); if (rc != PJ_SUCCESS) { - CloseHandle(ioq->iocp); + CloseHandle(ioqueue->iocp); return rc; } - ioq->auto_delete_lock = PJ_TRUE; + ioqueue->auto_delete_lock = PJ_TRUE; - *ioqueue = ioq; + *p_ioqueue = ioqueue; - PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioq)); + PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue)); return PJ_SUCCESS; } - -PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioque ) + +/* + * pj_ioqueue_destroy() + */ +PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) { unsigned i; PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(ioque, PJ_EINVAL); + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); /* Destroy events in the pool */ - for (i=0; ievent_count; ++i) { - CloseHandle(ioque->event_pool[i]); + for (i=0; ievent_count; ++i) { + CloseHandle(ioqueue->event_pool[i]); } - ioque->event_count = 0; - - if (ioque->auto_delete_lock) - pj_lock_destroy(ioque->lock); - - if (CloseHandle(ioque->iocp) == TRUE) - return PJ_SUCCESS; - else - return PJ_RETURN_OS_ERROR(GetLastError()); + ioqueue->event_count = 0; + + if (CloseHandle(ioqueue->iocp) != TRUE) + return PJ_RETURN_OS_ERROR(GetLastError()); + + if (ioqueue->auto_delete_lock) + pj_lock_destroy(ioqueue->lock); + + return PJ_SUCCESS; } - -PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, + +/* + * pj_ioqueue_set_lock() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, pj_lock_t *lock, pj_bool_t auto_delete ) { - PJ_ASSERT_RETURN(ioque && lock, PJ_EINVAL); + PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL); - if (ioque->auto_delete_lock) { - pj_lock_destroy(ioque->lock); + if (ioqueue->auto_delete_lock) { + pj_lock_destroy(ioqueue->lock); } - ioque->lock = lock; - ioque->auto_delete_lock = auto_delete; + ioqueue->lock = lock; + ioqueue->auto_delete_lock = auto_delete; return PJ_SUCCESS; } - + +/* + * pj_ioqueue_register_sock() + */ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, - pj_ioqueue_t *ioque, - pj_sock_t hnd, + pj_ioqueue_t *ioqueue, + pj_sock_t sock, void *user_data, const pj_ioqueue_callback *cb, pj_ioqueue_key_t **key ) { HANDLE hioq; - pj_ioqueue_key_t *rec; - - PJ_ASSERT_RETURN(pool && ioque && cb && key, PJ_EINVAL); - - rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); - rec->hnd = (HANDLE)hnd; + pj_ioqueue_key_t *rec; + u_long value; + int rc; + + PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); + + /* Build the key for this socket. */ + rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); + rec->ioqueue = ioqueue; + rec->hnd = (HANDLE)sock; rec->user_data = user_data; pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); -#if PJ_HAS_TCP - rec->accept_overlapped.newsock = PJ_INVALID_SOCKET; -#endif - hioq = CreateIoCompletionPort((HANDLE)hnd, ioque->iocp, (DWORD)rec, 0); + + /* Set socket to nonblocking. */ + value = 1; + rc = ioctlsocket(sock, FIONBIO, &value); + if (rc != 0) { + return PJ_RETURN_OS_ERROR(WSAGetLastError()); + } + + /* Associate with IOCP */ + hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); if (!hioq) { return PJ_RETURN_OS_ERROR(GetLastError()); } @@ -291,58 +337,78 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, return PJ_SUCCESS; } - - -PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key ) +/* + * pj_ioqueue_unregister() + */ +PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) { - PJ_ASSERT_RETURN(ioque && key, PJ_EINVAL); + PJ_ASSERT_RETURN(key, PJ_EINVAL); #if PJ_HAS_TCP if (key->connecting) { unsigned pos; + pj_ioqueue_t *ioqueue; + + ioqueue = key->ioqueue; /* Erase from connecting_handles */ - pj_lock_acquire(ioque->lock); - for (pos=0; pos < ioque->connecting_count; ++pos) { - if (ioque->connecting_keys[pos] == key) { - erase_connecting_socket(ioque, pos); - if (key->accept_overlapped.newsock_ptr) { - /* ??? shouldn't it be newsock instead of newsock_ptr??? */ - closesocket(*key->accept_overlapped.newsock_ptr); - } + pj_lock_acquire(ioqueue->lock); + for (pos=0; pos < ioqueue->connecting_count; ++pos) { + if (ioqueue->connecting_keys[pos] == key) { + erase_connecting_socket(ioqueue, pos); break; } } - pj_lock_release(ioque->lock); - key->connecting = 0; + key->connecting = 0; + pj_lock_release(ioqueue->lock); } #endif return PJ_SUCCESS; } - + +/* + * pj_ioqueue_get_user_data() + */ PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) { PJ_ASSERT_RETURN(key, NULL); return key->user_data; } - -/* + +/* + * pj_ioqueue_set_user_data() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, + void *user_data, + void **old_data ) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + if (old_data) + *old_data = key->user_data; + + key->user_data = user_data; + return PJ_SUCCESS; +} + +/* + * pj_ioqueue_poll() + * * Poll for events. */ -PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) +PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { DWORD dwMsec, dwBytesTransfered, dwKey; - ioqueue_overlapped *ov; + generic_overlapped *pOv; pj_ioqueue_key_t *key; pj_ssize_t size_status; BOOL rc; - PJ_ASSERT_RETURN(ioque, -PJ_EINVAL); + PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); /* Check the connecting array. */ #if PJ_HAS_TCP - key = check_connecting(ioque, &size_status); + key = check_connecting(ioqueue, &size_status); if (key != NULL) { key->cb.on_connect_complete(key, (int)size_status); return 1; @@ -353,40 +419,46 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; /* Poll for completion status. */ - rc = GetQueuedCompletionStatus(ioque->iocp, &dwBytesTransfered, &dwKey, - (OVERLAPPED**)&ov, dwMsec); + rc = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered, &dwKey, + (OVERLAPPED**)&pOv, dwMsec); /* The return value is: * - nonzero if event was dequeued. - * - zero and ov==NULL if no event was dequeued. - * - zero and ov!=NULL if event for failed I/O was dequeued. + * - zero and pOv==NULL if no event was dequeued. + * - zero and pOv!=NULL if event for failed I/O was dequeued. */ - if (ov) { + if (pOv) { /* Event was dequeued for either successfull or failed I/O */ key = (pj_ioqueue_key_t*)dwKey; size_status = dwBytesTransfered; - switch (ov->operation) { + switch (pOv->operation) { case PJ_IOQUEUE_OP_READ: case PJ_IOQUEUE_OP_RECV: case PJ_IOQUEUE_OP_RECV_FROM: - key->recv_overlapped.operation = 0; + pOv->operation = 0; if (key->cb.on_read_complete) - key->cb.on_read_complete(key, size_status); + key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv, + size_status); break; case PJ_IOQUEUE_OP_WRITE: case PJ_IOQUEUE_OP_SEND: case PJ_IOQUEUE_OP_SEND_TO: - key->send_overlapped.operation = 0; + pOv->operation = 0; if (key->cb.on_write_complete) - key->cb.on_write_complete(key, size_status); + key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv, + size_status); break; #if PJ_HAS_TCP case PJ_IOQUEUE_OP_ACCEPT: /* special case for accept. */ - ioqueue_on_accept_complete((ioqueue_accept_rec*)ov); - if (key->cb.on_accept_complete) - key->cb.on_accept_complete(key, key->accept_overlapped.newsock, - 0); + ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv); + if (key->cb.on_accept_complete) { + ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv; + key->cb.on_accept_complete(key, + (pj_ioqueue_op_key_t*)pOv, + accept_rec->newsock, + PJ_SUCCESS); + } break; case PJ_IOQUEUE_OP_CONNECT: #endif @@ -398,9 +470,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) } if (GetLastError()==WAIT_TIMEOUT) { - /* Check the connecting array. */ + /* Check the connecting array (again). */ #if PJ_HAS_TCP - key = check_connecting(ioque, &size_status); + key = check_connecting(ioqueue, &size_status); if (key != NULL) { key->cb.on_connect_complete(key, (int)size_status); return 1; @@ -411,96 +483,73 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioque, const pj_time_val *timeout) return -1; } -/* - * pj_ioqueue_read() - * - * Initiate overlapped ReadFile operation. - */ -PJ_DEF(pj_status_t) pj_ioqueue_read( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - void *buffer, - pj_size_t buflen) -{ - BOOL rc; - DWORD bytesRead; - - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioque); - - if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this descriptor"); - return PJ_EBUSY; - } - - pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); - key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ; - - rc = ReadFile(key->hnd, buffer, buflen, &bytesRead, - &key->recv_overlapped.overlapped); - if (rc == FALSE) { - DWORD dwStatus = GetLastError(); - if (dwStatus==ERROR_IO_PENDING) - return PJ_EPENDING; - else - return PJ_STATUS_FROM_OS(dwStatus); - } else { - /* - * This is workaround to a probable bug in Win2000 (probably NT too). - * Even if 'rc' is TRUE, which indicates operation has completed, - * GetQueuedCompletionStatus still will return the key. - * So as work around, we always return PJ_EPENDING here. - */ - return PJ_EPENDING; - } -} - /* * pj_ioqueue_recv() * * Initiate overlapped WSARecv() operation. */ -PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, void *buffer, - pj_size_t buflen, + pj_ssize_t *length, unsigned flags ) -{ - int rc; - DWORD bytesRead; - DWORD dwFlags = 0; - - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioque); - - if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this socket"); - return PJ_EBUSY; - } - - pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); - key->recv_overlapped.operation = PJ_IOQUEUE_OP_READ; - - key->recv_overlapped.wsabuf.buf = buffer; - key->recv_overlapped.wsabuf.len = buflen; - - dwFlags = flags; - - rc = WSARecv((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1, - &bytesRead, &dwFlags, - &key->recv_overlapped.overlapped, NULL); - if (rc == SOCKET_ERROR) { - DWORD dwStatus = WSAGetLastError(); - if (dwStatus==WSA_IO_PENDING) - return PJ_EPENDING; - else - return PJ_STATUS_FROM_OS(dwStatus); - } else { - /* Must always return pending status. - * See comments on pj_ioqueue_read - * return bytesRead; - */ - return PJ_EPENDING; - } +{ + /* + * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and + * addrlen here. But unfortunately it generates EINVAL... :-( + * -bennylp + */ + int rc; + DWORD bytesRead; + DWORD dwFlags = 0; + union operation_key *op_key_rec; + + PJ_CHECK_STACK(); + PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); + + op_key_rec = (union operation_key*)op_key->internal__; + op_key_rec->overlapped.wsabuf.buf = buffer; + op_key_rec->overlapped.wsabuf.len = *length; + + dwFlags = flags; + + /* Try non-overlapped received first to see if data is + * immediately available. + */ + rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesRead, &dwFlags, NULL, NULL); + if (rc == 0) { + *length = bytesRead; + return PJ_SUCCESS; + } else { + DWORD dwError = WSAGetLastError(); + if (dwError != WSAEWOULDBLOCK) { + *length = -1; + return PJ_RETURN_OS_ERROR(dwError); + } + } + + /* + * No immediate data available. + * Register overlapped Recv() operation. + */ + pj_memset(&op_key_rec->overlapped.overlapped, 0, + sizeof(op_key_rec->overlapped.overlapped)); + op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; + + rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesRead, &dwFlags, + &op_key_rec->overlapped.overlapped, NULL); + if (rc == SOCKET_ERROR) { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus!=WSA_IO_PENDING) { + *length = -1; + return PJ_STATUS_FROM_OS(dwStatus); + } + } + + /* Pending operation has been scheduled. */ + return PJ_EPENDING; } /* @@ -508,136 +557,79 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_t *ioque, * * Initiate overlapped RecvFrom() operation. */ -PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, void *buffer, - pj_size_t buflen, + pj_ssize_t *length, unsigned flags, pj_sockaddr_t *addr, int *addrlen) { - BOOL rc; - DWORD bytesRead; - DWORD dwFlags; - - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioque); - - if (key->recv_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this socket"); - return PJ_EBUSY; - } - - pj_memset(&key->recv_overlapped, 0, sizeof(key->recv_overlapped)); - key->recv_overlapped.operation = PJ_IOQUEUE_OP_RECV_FROM; - key->recv_overlapped.wsabuf.buf = buffer; - key->recv_overlapped.wsabuf.len = buflen; - dwFlags = flags; - rc = WSARecvFrom((SOCKET)key->hnd, &key->recv_overlapped.wsabuf, 1, - &bytesRead, &dwFlags, - addr, addrlen, - &key->recv_overlapped.overlapped, NULL); - if (rc == SOCKET_ERROR) { - DWORD dwStatus = WSAGetLastError(); - if (dwStatus==WSA_IO_PENDING) - return PJ_EPENDING; - else - return PJ_STATUS_FROM_OS(dwStatus); - } else { - /* Must always return pending status. - * See comments on pj_ioqueue_read - * return bytesRead; - */ - return PJ_EPENDING; - } -} - -/* - * pj_ioqueue_write() - * - * Initiate overlapped WriteFile() operation. - */ -PJ_DEF(pj_status_t) pj_ioqueue_write( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, - const void *data, - pj_size_t datalen) -{ - BOOL rc; - DWORD bytesWritten; - - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioque); - - if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this descriptor"); - return PJ_EBUSY; - } - - pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); - key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE; - rc = WriteFile(key->hnd, data, datalen, &bytesWritten, - &key->send_overlapped.overlapped); - - if (rc == FALSE) { - DWORD dwStatus = GetLastError(); - if (dwStatus==ERROR_IO_PENDING) - return PJ_EPENDING; - else - return PJ_STATUS_FROM_OS(dwStatus); - } else { - /* Must always return pending status. - * See comments on pj_ioqueue_read - * return bytesWritten; - */ - return PJ_EPENDING; - } + int rc; + DWORD bytesRead; + DWORD dwFlags = 0; + union operation_key *op_key_rec; + + PJ_CHECK_STACK(); + PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); + + op_key_rec = (union operation_key*)op_key->internal__; + op_key_rec->overlapped.wsabuf.buf = buffer; + op_key_rec->overlapped.wsabuf.len = *length; + + dwFlags = flags; + + /* Try non-overlapped received first to see if data is + * immediately available. + */ + rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesRead, &dwFlags, addr, addrlen, NULL, NULL); + if (rc == 0) { + *length = bytesRead; + return PJ_SUCCESS; + } else { + DWORD dwError = WSAGetLastError(); + if (dwError != WSAEWOULDBLOCK) { + *length = -1; + return PJ_RETURN_OS_ERROR(dwError); + } + } + + /* + * No immediate data available. + * Register overlapped Recv() operation. + */ + pj_memset(&op_key_rec->overlapped.overlapped, 0, + sizeof(op_key_rec->overlapped.overlapped)); + op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; + + rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesRead, &dwFlags, addr, addrlen, + &op_key_rec->overlapped.overlapped, NULL); + if (rc == SOCKET_ERROR) { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus!=WSA_IO_PENDING) { + *length = -1; + return PJ_STATUS_FROM_OS(dwStatus); + } + } + + /* Pending operation has been scheduled. */ + return PJ_EPENDING; } - /* * pj_ioqueue_send() * * Initiate overlapped Send operation. */ -PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, const void *data, - pj_size_t datalen, + pj_ssize_t *length, unsigned flags ) { - int rc; - DWORD bytesWritten; - DWORD dwFlags; - - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioque); - - if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this socket"); - return PJ_EBUSY; - } - - pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); - key->send_overlapped.operation = PJ_IOQUEUE_OP_WRITE; - key->send_overlapped.wsabuf.buf = (void*)data; - key->send_overlapped.wsabuf.len = datalen; - dwFlags = flags; - rc = WSASend((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1, - &bytesWritten, dwFlags, - &key->send_overlapped.overlapped, NULL); - if (rc == SOCKET_ERROR) { - DWORD dwStatus = WSAGetLastError(); - if (dwStatus==WSA_IO_PENDING) - return PJ_EPENDING; - else - return PJ_STATUS_FROM_OS(dwStatus); - } else { - /* Must always return pending status. - * See comments on pj_ioqueue_read - * return bytesRead; - */ - return PJ_EPENDING; - } + return pj_ioqueue_sendto(key, op_key, data, length, flags, NULL, 0); } @@ -646,46 +638,65 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_t *ioque, * * Initiate overlapped SendTo operation. */ -PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, const void *data, - pj_size_t datalen, + pj_ssize_t *length, unsigned flags, const pj_sockaddr_t *addr, int addrlen) -{ - BOOL rc; - DWORD bytesSent; - DWORD dwFlags; - - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioque); - - if (key->send_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this socket"); - return PJ_EBUSY; - } - - pj_memset(&key->send_overlapped, 0, sizeof(key->send_overlapped)); - key->send_overlapped.operation = PJ_IOQUEUE_OP_SEND_TO; - key->send_overlapped.wsabuf.buf = (char*)data; - key->send_overlapped.wsabuf.len = datalen; - dwFlags = flags; - rc = WSASendTo((SOCKET)key->hnd, &key->send_overlapped.wsabuf, 1, - &bytesSent, dwFlags, addr, - addrlen, &key->send_overlapped.overlapped, NULL); - if (rc == SOCKET_ERROR) { - DWORD dwStatus = WSAGetLastError(); - if (dwStatus==WSA_IO_PENDING) - return PJ_EPENDING; - else - return PJ_STATUS_FROM_OS(dwStatus); - } else { - // Must always return pending status. - // See comments on pj_ioqueue_read - // return bytesSent; - return PJ_EPENDING; - } +{ + int rc; + DWORD bytesWritten; + DWORD dwFlags; + union operation_key *op_key_rec; + + PJ_CHECK_STACK(); + PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); + + op_key_rec = (union operation_key*)op_key->internal__; + + dwFlags = flags; + + /* + * First try blocking write. + */ + op_key_rec->overlapped.wsabuf.buf = (void*)data; + op_key_rec->overlapped.wsabuf.len = *length; + + rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesWritten, dwFlags, addr, addrlen, + NULL, NULL); + if (rc == 0) { + *length = bytesWritten; + return PJ_SUCCESS; + } else { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus != WSAEWOULDBLOCK) { + *length = -1; + return PJ_RETURN_OS_ERROR(dwStatus); + } + } + + /* + * Data can't be sent immediately. + * Schedule asynchronous WSASend(). + */ + pj_memset(&op_key_rec->overlapped.overlapped, 0, + sizeof(op_key_rec->overlapped.overlapped)); + op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND; + + rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesWritten, dwFlags, addr, addrlen, + &op_key_rec->overlapped.overlapped, NULL); + if (rc == SOCKET_ERROR) { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus!=WSA_IO_PENDING) + return PJ_STATUS_FROM_OS(dwStatus); + } + + /* Asynchronous operation successfully submitted. */ + return PJ_EPENDING; } #if PJ_HAS_TCP @@ -695,59 +706,93 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_t *ioque, * * Initiate overlapped accept() operation. */ -PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, - pj_sock_t *new_sock, - pj_sockaddr_t *local, - pj_sockaddr_t *remote, - int *addrlen) +PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_sock_t *new_sock, + pj_sockaddr_t *local, + pj_sockaddr_t *remote, + int *addrlen) { - BOOL rc; + BOOL rc; DWORD bytesReceived; - pj_status_t status; + pj_status_t status; + union operation_key *op_key_rec; + SOCKET sock; PJ_CHECK_STACK(); - PJ_UNUSED_ARG(ioqueue); - - if (key->accept_overlapped.operation != PJ_IOQUEUE_OP_NONE) { - pj_assert(!"Operation already pending for this socket"); - return PJ_EBUSY; - } - - if (key->accept_overlapped.newsock == PJ_INVALID_SOCKET) { - pj_sock_t sock; - status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &sock); - if (status != PJ_SUCCESS) - return status; - - key->accept_overlapped.newsock = sock; - } - key->accept_overlapped.operation = PJ_IOQUEUE_OP_ACCEPT; - key->accept_overlapped.addrlen = addrlen; - key->accept_overlapped.local = local; - key->accept_overlapped.remote = remote; - key->accept_overlapped.newsock_ptr = new_sock; - pj_memset(&key->accept_overlapped.overlapped, 0, - sizeof(key->accept_overlapped.overlapped)); - - rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)key->accept_overlapped.newsock, - key->accept_overlapped.accept_buf, + PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); + + /* + * See if there is a new connection immediately available. + */ + sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0); + if (sock != INVALID_SOCKET) { + /* Yes! New socket is available! */ + int status; + + status = getsockname(sock, local, addrlen); + if (status != 0) { + DWORD dwError = WSAGetLastError(); + closesocket(sock); + return PJ_RETURN_OS_ERROR(dwError); + } + + *new_sock = sock; + return PJ_SUCCESS; + + } else { + DWORD dwError = WSAGetLastError(); + if (dwError != WSAEWOULDBLOCK) { + return PJ_RETURN_OS_ERROR(dwError); + } + } + + /* + * No connection is immediately available. + * Must schedule an asynchronous operation. + */ + op_key_rec = (union operation_key*)op_key->internal__; + + status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, + &op_key_rec->accept.newsock); + if (status != PJ_SUCCESS) + return status; + + /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket + * addresses can be obtained with getsockname() and getpeername(). + */ + status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + (char*)&key->hnd, sizeof(SOCKET)); + /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later. + * So ignore the error status. + */ + + op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT; + op_key_rec->accept.addrlen = addrlen; + op_key_rec->accept.local = local; + op_key_rec->accept.remote = remote; + op_key_rec->accept.newsock_ptr = new_sock; + pj_memset(&op_key_rec->accept.overlapped, 0, + sizeof(op_key_rec->accept.overlapped)); + + rc = AcceptEx( (SOCKET)key->hnd, (SOCKET)op_key_rec->accept.newsock, + op_key_rec->accept.accept_buf, 0, ACCEPT_ADDR_LEN, ACCEPT_ADDR_LEN, &bytesReceived, - &key->accept_overlapped.overlapped); + &op_key_rec->accept.overlapped ); if (rc == TRUE) { - ioqueue_on_accept_complete(&key->accept_overlapped); - if (key->cb.on_accept_complete) - key->cb.on_accept_complete(key, key->accept_overlapped.newsock, 0); + ioqueue_on_accept_complete(&op_key_rec->accept); return PJ_SUCCESS; } else { DWORD dwStatus = WSAGetLastError(); - if (dwStatus==WSA_IO_PENDING) - return PJ_EPENDING; - else + if (dwStatus!=WSA_IO_PENDING) return PJ_STATUS_FROM_OS(dwStatus); - } + } + + /* Asynchronous Accept() has been submitted. */ + return PJ_EPENDING; } @@ -757,42 +802,29 @@ PJ_DEF(int) pj_ioqueue_accept( pj_ioqueue_t *ioqueue, * Initiate overlapped connect() operation (well, it's non-blocking actually, * since there's no overlapped version of connect()). */ -PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *key, +PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, const pj_sockaddr_t *addr, int addrlen ) { - unsigned long optval = 1; - HANDLE hEvent; + HANDLE hEvent; + pj_ioqueue_t *ioqueue; PJ_CHECK_STACK(); - - /* Set socket to non-blocking. */ - if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { - return PJ_RETURN_OS_ERROR(WSAGetLastError()); - } + PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); /* Initiate connect() */ if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) { DWORD dwStatus; dwStatus = WSAGetLastError(); - if (dwStatus != WSAEWOULDBLOCK) { - /* Permanent error */ + if (dwStatus != WSAEWOULDBLOCK) { return PJ_RETURN_OS_ERROR(dwStatus); - } else { - /* Pending operation. This is what we're looking for. */ } } else { /* Connect has completed immediately! */ - /* Restore to blocking mode. */ - optval = 0; - if (ioctlsocket((pj_sock_t)key->hnd, FIONBIO, &optval) != 0) { - return PJ_RETURN_OS_ERROR(WSAGetLastError()); - } - - key->cb.on_connect_complete(key, 0); return PJ_SUCCESS; } + + ioqueue = key->ioqueue; /* Add to the array of connecting socket to be polled */ pj_lock_acquire(ioqueue->lock); -- cgit v1.2.3