diff options
Diffstat (limited to 'pjlib/src/pj/ioqueue_winnt.c')
-rw-r--r-- | pjlib/src/pj/ioqueue_winnt.c | 676 |
1 files changed, 342 insertions, 334 deletions
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index 32a74666..7944953f 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -22,28 +22,28 @@ # include <mswsock.h> #endif -
-/* The address specified in AcceptEx() must be 16 more than the size of
- * SOCKADDR (source: MSDN).
+ +/* The address specified in AcceptEx() must be 16 more than the size of + * SOCKADDR (source: MSDN). */ #define ACCEPT_ADDR_LEN (sizeof(pj_sockaddr_in)+16) -
-typedef struct generic_overlapped
-{
- WSAOVERLAPPED overlapped;
- pj_ioqueue_operation_e operation;
-} generic_overlapped;
-
+ +typedef struct generic_overlapped +{ + WSAOVERLAPPED overlapped; + pj_ioqueue_operation_e operation; +} generic_overlapped; + /* * OVERLAPPPED structure for send and receive. */ typedef struct ioqueue_overlapped { - WSAOVERLAPPED overlapped;
+ WSAOVERLAPPED overlapped; pj_ioqueue_operation_e operation; - WSABUF wsabuf;
- pj_sockaddr_in dummy_addr;
- int dummy_addrlen;
+ WSABUF wsabuf; + pj_sockaddr_in dummy_addr; + int dummy_addrlen; } ioqueue_overlapped; #if PJ_HAS_TCP @@ -52,7 +52,7 @@ typedef struct ioqueue_overlapped */ typedef struct ioqueue_accept_rec { - WSAOVERLAPPED overlapped;
+ WSAOVERLAPPED overlapped; pj_ioqueue_operation_e operation; pj_sock_t newsock; pj_sock_t *newsock_ptr; @@ -62,35 +62,35 @@ typedef struct ioqueue_accept_rec char accept_buf[2 * ACCEPT_ADDR_LEN]; } ioqueue_accept_rec; #endif -
-/*
- * Structure to hold pending operation key.
- */
-union operation_key
-{
- generic_overlapped generic;
- ioqueue_overlapped overlapped;
-#if PJ_HAS_TCP
- ioqueue_accept_rec accept;
-#endif
-};
-
-/* Type of handle in the key. */
-enum handle_type
-{
- HND_IS_UNKNOWN,
- HND_IS_FILE,
- HND_IS_SOCKET,
-};
+ +/* + * Structure to hold pending operation key. + */ +union operation_key +{ + generic_overlapped generic; + ioqueue_overlapped overlapped; +#if PJ_HAS_TCP + ioqueue_accept_rec accept; +#endif +}; + +/* Type of handle in the key. */ +enum handle_type +{ + HND_IS_UNKNOWN, + HND_IS_FILE, + HND_IS_SOCKET, +}; /* * Structure for individual socket. */ struct pj_ioqueue_key_t -{
+{ pj_ioqueue_t *ioqueue; HANDLE hnd; - void *user_data;
+ void *user_data; enum handle_type hnd_type; #if PJ_HAS_TCP int connecting; @@ -136,13 +136,13 @@ static void ioqueue_on_accept_complete(ioqueue_accept_rec *accept_overlapped) &local, &locallen, &remote, - &remotelen);
+ &remotelen); if (*accept_overlapped->addrlen > locallen) { pj_memcpy(accept_overlapped->local, local, locallen); - pj_memcpy(accept_overlapped->remote, remote, locallen);
- } else {
- pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen);
- pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen);
+ pj_memcpy(accept_overlapped->remote, remote, locallen); + } else { + pj_memset(accept_overlapped->local, 0, *accept_overlapped->addrlen); + pj_memset(accept_overlapped->remote, 0, *accept_overlapped->addrlen); } *accept_overlapped->addrlen = locallen; if (accept_overlapped->newsock_ptr) @@ -211,7 +211,7 @@ static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue, WSAEnumNetworkEvents((pj_sock_t)key->hnd, ioqueue->connecting_handles[pos], &net_events); - *connect_err =
+ *connect_err = PJ_STATUS_FROM_OS(net_events.iErrorCode[FD_CONNECT_BIT]); /* Erase socket from pending connect. */ @@ -223,8 +223,16 @@ static pj_ioqueue_key_t *check_connecting( pj_ioqueue_t *ioqueue, } #endif -/*
- * pj_ioqueue_create()
+/* + * pj_ioqueue_name() + */ +PJ_DEF(const char*) pj_ioqueue_name(void) +{ + return "iocp"; +} + +/* + * pj_ioqueue_create() */ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_size_t max_fd, @@ -235,12 +243,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, PJ_UNUSED_ARG(max_fd); PJ_ASSERT_RETURN(pool && p_ioqueue, PJ_EINVAL); -
- rc = sizeof(union operation_key);
-
- /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */
- PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
- sizeof(union operation_key), PJ_EBUG);
+ + rc = sizeof(union operation_key); + + /* Check that sizeof(pj_ioqueue_op_key_t) makes sense. */ + PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= + sizeof(union operation_key), PJ_EBUG); ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); @@ -260,9 +268,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue)); return PJ_SUCCESS; } -
-/*
- * pj_ioqueue_destroy()
+ +/* + * pj_ioqueue_destroy() */ PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) { @@ -278,16 +286,16 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) ioqueue->event_count = 0; if (CloseHandle(ioqueue->iocp) != TRUE) - return PJ_RETURN_OS_ERROR(GetLastError());
-
- if (ioqueue->auto_delete_lock)
- pj_lock_destroy(ioqueue->lock);
-
+ return PJ_RETURN_OS_ERROR(GetLastError()); + + if (ioqueue->auto_delete_lock) + pj_lock_destroy(ioqueue->lock); + return PJ_SUCCESS; } -
-/*
- * pj_ioqueue_set_lock()
+ +/* + * pj_ioqueue_set_lock() */ PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, pj_lock_t *lock, @@ -304,9 +312,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, return PJ_SUCCESS; } -
-/*
- * pj_ioqueue_register_sock()
+ +/* + * pj_ioqueue_register_sock() */ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, pj_ioqueue_t *ioqueue, @@ -316,27 +324,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, pj_ioqueue_key_t **key ) { HANDLE hioq; - pj_ioqueue_key_t *rec;
+ pj_ioqueue_key_t *rec; u_long value; - int rc;
+ int rc; PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); -
+ /* Build the key for this socket. */ - rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+ rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); rec->ioqueue = ioqueue; - rec->hnd = (HANDLE)sock;
+ rec->hnd = (HANDLE)sock; rec->hnd_type = HND_IS_SOCKET; rec->user_data = user_data; pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); -
- /* Set socket to nonblocking. */
- value = 1;
- rc = ioctlsocket(sock, FIONBIO, &value);
- if (rc != 0) {
- return PJ_RETURN_OS_ERROR(WSAGetLastError());
- }
-
+ + /* Set socket to nonblocking. */ + value = 1; + rc = ioctlsocket(sock, FIONBIO, &value); + if (rc != 0) { + return PJ_RETURN_OS_ERROR(WSAGetLastError()); + } + /* Associate with IOCP */ hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); if (!hioq) { @@ -346,9 +354,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, *key = rec; return PJ_SUCCESS; } -
+ /* - * pj_ioqueue_unregister()
+ * pj_ioqueue_unregister() */ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) { @@ -357,9 +365,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) #if PJ_HAS_TCP if (key->connecting) { unsigned pos; - pj_ioqueue_t *ioqueue;
-
- ioqueue = key->ioqueue;
+ pj_ioqueue_t *ioqueue; + + ioqueue = key->ioqueue; /* Erase from connecting_handles */ pj_lock_acquire(ioqueue->lock); @@ -369,43 +377,43 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) break; } } - key->connecting = 0;
+ key->connecting = 0; pj_lock_release(ioqueue->lock); } -#endif
- if (key->hnd_type == HND_IS_FILE) {
- CloseHandle(key->hnd);
+#endif + if (key->hnd_type == HND_IS_FILE) { + CloseHandle(key->hnd); } return PJ_SUCCESS; } -
-/*
- * pj_ioqueue_get_user_data()
+ +/* + * pj_ioqueue_get_user_data() */ PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ) { PJ_ASSERT_RETURN(key, NULL); return key->user_data; } -
-/*
- * pj_ioqueue_set_user_data()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
- void *user_data,
- void **old_data )
-{
- PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
- if (old_data)
- *old_data = key->user_data;
-
- key->user_data = user_data;
- return PJ_SUCCESS;
-}
-
-/*
- * pj_ioqueue_poll()
+ +/* + * pj_ioqueue_set_user_data() + */ +PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, + void *user_data, + void **old_data ) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + if (old_data) + *old_data = key->user_data; + + key->user_data = user_data; + return PJ_SUCCESS; +} + +/* + * pj_ioqueue_poll() * * Poll for events. */ @@ -450,7 +458,7 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) case PJ_IOQUEUE_OP_RECV_FROM: pOv->operation = 0; if (key->cb.on_read_complete) - key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv,
+ key->cb.on_read_complete(key, (pj_ioqueue_op_key_t*)pOv, size_status); break; case PJ_IOQUEUE_OP_WRITE: @@ -458,19 +466,19 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) case PJ_IOQUEUE_OP_SEND_TO: pOv->operation = 0; if (key->cb.on_write_complete) - key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv,
+ key->cb.on_write_complete(key, (pj_ioqueue_op_key_t*)pOv, size_status); break; #if PJ_HAS_TCP case PJ_IOQUEUE_OP_ACCEPT: /* special case for accept. */ ioqueue_on_accept_complete((ioqueue_accept_rec*)pOv); - if (key->cb.on_accept_complete) {
+ if (key->cb.on_accept_complete) { ioqueue_accept_rec *accept_rec = (ioqueue_accept_rec*)pOv; - key->cb.on_accept_complete(key,
- (pj_ioqueue_op_key_t*)pOv,
+ key->cb.on_accept_complete(key, + (pj_ioqueue_op_key_t*)pOv, accept_rec->newsock, - PJ_SUCCESS);
+ PJ_SUCCESS); } break; case PJ_IOQUEUE_OP_CONNECT: @@ -495,74 +503,74 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) } return -1; } -
+ /* * pj_ioqueue_recv() * * Initiate overlapped WSARecv() operation. */ -PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, void *buffer, pj_ssize_t *length, unsigned flags ) -{
- /*
- * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and
- * addrlen here. But unfortunately it generates EINVAL... :-(
- * -bennylp
- */
- int rc;
- DWORD bytesRead;
- DWORD dwFlags = 0;
- union operation_key *op_key_rec;
-
- PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
-
- op_key_rec = (union operation_key*)op_key->internal__;
- op_key_rec->overlapped.wsabuf.buf = buffer;
- op_key_rec->overlapped.wsabuf.len = *length;
-
- dwFlags = flags;
-
- /* Try non-overlapped received first to see if data is
- * immediately available.
- */
- rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
- &bytesRead, &dwFlags, NULL, NULL);
- if (rc == 0) {
- *length = bytesRead;
- return PJ_SUCCESS;
- } else {
- DWORD dwError = WSAGetLastError();
- if (dwError != WSAEWOULDBLOCK) {
- *length = -1;
- return PJ_RETURN_OS_ERROR(dwError);
- }
- }
-
- /*
- * No immediate data available.
- * Register overlapped Recv() operation.
- */
- pj_memset(&op_key_rec->overlapped.overlapped, 0,
- sizeof(op_key_rec->overlapped.overlapped));
- op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
-
- rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
- &bytesRead, &dwFlags,
- &op_key_rec->overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus!=WSA_IO_PENDING) {
- *length = -1;
- return PJ_STATUS_FROM_OS(dwStatus);
- }
- }
-
- /* Pending operation has been scheduled. */
- return PJ_EPENDING;
+{ + /* + * Ideally we should just call pj_ioqueue_recvfrom() with NULL addr and + * addrlen here. But unfortunately it generates EINVAL... :-( + * -bennylp + */ + int rc; + DWORD bytesRead; + DWORD dwFlags = 0; + union operation_key *op_key_rec; + + PJ_CHECK_STACK(); + PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); + + op_key_rec = (union operation_key*)op_key->internal__; + op_key_rec->overlapped.wsabuf.buf = buffer; + op_key_rec->overlapped.wsabuf.len = *length; + + dwFlags = flags; + + /* Try non-overlapped received first to see if data is + * immediately available. + */ + rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesRead, &dwFlags, NULL, NULL); + if (rc == 0) { + *length = bytesRead; + return PJ_SUCCESS; + } else { + DWORD dwError = WSAGetLastError(); + if (dwError != WSAEWOULDBLOCK) { + *length = -1; + return PJ_RETURN_OS_ERROR(dwError); + } + } + + /* + * No immediate data available. + * Register overlapped Recv() operation. + */ + pj_memset(&op_key_rec->overlapped.overlapped, 0, + sizeof(op_key_rec->overlapped.overlapped)); + op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; + + rc = WSARecv((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesRead, &dwFlags, + &op_key_rec->overlapped.overlapped, NULL); + if (rc == SOCKET_ERROR) { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus!=WSA_IO_PENDING) { + *length = -1; + return PJ_STATUS_FROM_OS(dwStatus); + } + } + + /* Pending operation has been scheduled. */ + return PJ_EPENDING; } /* @@ -570,7 +578,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, * * Initiate overlapped RecvFrom() operation. */ -PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, void *buffer, pj_ssize_t *length, @@ -578,57 +586,57 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, pj_sockaddr_t *addr, int *addrlen) { - int rc;
- DWORD bytesRead;
- DWORD dwFlags = 0;
- union operation_key *op_key_rec;
-
- PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
-
- op_key_rec = (union operation_key*)op_key->internal__;
- op_key_rec->overlapped.wsabuf.buf = buffer;
- op_key_rec->overlapped.wsabuf.len = *length;
-
- dwFlags = flags;
-
- /* Try non-overlapped received first to see if data is
- * immediately available.
- */
- rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
- &bytesRead, &dwFlags, addr, addrlen, NULL, NULL);
- if (rc == 0) {
- *length = bytesRead;
- return PJ_SUCCESS;
- } else {
- DWORD dwError = WSAGetLastError();
- if (dwError != WSAEWOULDBLOCK) {
- *length = -1;
- return PJ_RETURN_OS_ERROR(dwError);
- }
- }
-
- /*
- * No immediate data available.
- * Register overlapped Recv() operation.
- */
- pj_memset(&op_key_rec->overlapped.overlapped, 0,
- sizeof(op_key_rec->overlapped.overlapped));
- op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV;
-
- rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
- &bytesRead, &dwFlags, addr, addrlen,
- &op_key_rec->overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus!=WSA_IO_PENDING) {
- *length = -1;
- return PJ_STATUS_FROM_OS(dwStatus);
- }
- }
-
- /* Pending operation has been scheduled. */
- return PJ_EPENDING;
+ int rc; + DWORD bytesRead; + DWORD dwFlags = 0; + union operation_key *op_key_rec; + + PJ_CHECK_STACK(); + PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); + + op_key_rec = (union operation_key*)op_key->internal__; + op_key_rec->overlapped.wsabuf.buf = buffer; + op_key_rec->overlapped.wsabuf.len = *length; + + dwFlags = flags; + + /* Try non-overlapped received first to see if data is + * immediately available. + */ + rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesRead, &dwFlags, addr, addrlen, NULL, NULL); + if (rc == 0) { + *length = bytesRead; + return PJ_SUCCESS; + } else { + DWORD dwError = WSAGetLastError(); + if (dwError != WSAEWOULDBLOCK) { + *length = -1; + return PJ_RETURN_OS_ERROR(dwError); + } + } + + /* + * No immediate data available. + * Register overlapped Recv() operation. + */ + pj_memset(&op_key_rec->overlapped.overlapped, 0, + sizeof(op_key_rec->overlapped.overlapped)); + op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_RECV; + + rc = WSARecvFrom((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesRead, &dwFlags, addr, addrlen, + &op_key_rec->overlapped.overlapped, NULL); + if (rc == SOCKET_ERROR) { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus!=WSA_IO_PENDING) { + *length = -1; + return PJ_STATUS_FROM_OS(dwStatus); + } + } + + /* Pending operation has been scheduled. */ + return PJ_EPENDING; } /* @@ -636,7 +644,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, * * Initiate overlapped Send operation. */ -PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, const void *data, pj_ssize_t *length, @@ -651,64 +659,64 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, * * Initiate overlapped SendTo operation. */ -PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, const void *data, pj_ssize_t *length, unsigned flags, const pj_sockaddr_t *addr, int addrlen) -{
- int rc;
- DWORD bytesWritten;
- DWORD dwFlags;
- union operation_key *op_key_rec;
-
- PJ_CHECK_STACK();
- PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
-
- op_key_rec = (union operation_key*)op_key->internal__;
-
- dwFlags = flags;
-
- /*
- * First try blocking write.
- */
- op_key_rec->overlapped.wsabuf.buf = (void*)data;
- op_key_rec->overlapped.wsabuf.len = *length;
-
- rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
- &bytesWritten, dwFlags, addr, addrlen,
- NULL, NULL);
- if (rc == 0) {
- *length = bytesWritten;
- return PJ_SUCCESS;
- } else {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus != WSAEWOULDBLOCK) {
- *length = -1;
- return PJ_RETURN_OS_ERROR(dwStatus);
- }
- }
-
- /*
- * Data can't be sent immediately.
- * Schedule asynchronous WSASend().
- */
- pj_memset(&op_key_rec->overlapped.overlapped, 0,
- sizeof(op_key_rec->overlapped.overlapped));
- op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND;
-
- rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1,
- &bytesWritten, dwFlags, addr, addrlen,
- &op_key_rec->overlapped.overlapped, NULL);
- if (rc == SOCKET_ERROR) {
- DWORD dwStatus = WSAGetLastError();
- if (dwStatus!=WSA_IO_PENDING)
- return PJ_STATUS_FROM_OS(dwStatus);
- }
-
- /* Asynchronous operation successfully submitted. */
+{ + int rc; + DWORD bytesWritten; + DWORD dwFlags; + union operation_key *op_key_rec; + + PJ_CHECK_STACK(); + PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); + + op_key_rec = (union operation_key*)op_key->internal__; + + dwFlags = flags; + + /* + * First try blocking write. + */ + op_key_rec->overlapped.wsabuf.buf = (void*)data; + op_key_rec->overlapped.wsabuf.len = *length; + + rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesWritten, dwFlags, addr, addrlen, + NULL, NULL); + if (rc == 0) { + *length = bytesWritten; + return PJ_SUCCESS; + } else { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus != WSAEWOULDBLOCK) { + *length = -1; + return PJ_RETURN_OS_ERROR(dwStatus); + } + } + + /* + * Data can't be sent immediately. + * Schedule asynchronous WSASend(). + */ + pj_memset(&op_key_rec->overlapped.overlapped, 0, + sizeof(op_key_rec->overlapped.overlapped)); + op_key_rec->overlapped.operation = PJ_IOQUEUE_OP_SEND; + + rc = WSASendTo((SOCKET)key->hnd, &op_key_rec->overlapped.wsabuf, 1, + &bytesWritten, dwFlags, addr, addrlen, + &op_key_rec->overlapped.overlapped, NULL); + if (rc == SOCKET_ERROR) { + DWORD dwStatus = WSAGetLastError(); + if (dwStatus!=WSA_IO_PENDING) + return PJ_STATUS_FROM_OS(dwStatus); + } + + /* Asynchronous operation successfully submitted. */ return PJ_EPENDING; } @@ -719,69 +727,69 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, * * Initiate overlapped accept() operation. */ -PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
+PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_sock_t *new_sock, pj_sockaddr_t *local, pj_sockaddr_t *remote, int *addrlen) { - BOOL rc;
+ BOOL rc; DWORD bytesReceived; - pj_status_t status;
- union operation_key *op_key_rec;
+ pj_status_t status; + union operation_key *op_key_rec; SOCKET sock; PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); -
- /*
- * See if there is a new connection immediately available.
- */
- sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0);
- if (sock != INVALID_SOCKET) {
- /* Yes! New socket is available! */
- int status;
-
- status = getsockname(sock, local, addrlen);
- if (status != 0) {
- DWORD dwError = WSAGetLastError();
- closesocket(sock);
- return PJ_RETURN_OS_ERROR(dwError);
- }
-
- *new_sock = sock;
- return PJ_SUCCESS;
-
- } else {
- DWORD dwError = WSAGetLastError();
- if (dwError != WSAEWOULDBLOCK) {
- return PJ_RETURN_OS_ERROR(dwError);
- }
- }
-
- /*
- * No connection is immediately available.
- * Must schedule an asynchronous operation.
- */
- op_key_rec = (union operation_key*)op_key->internal__;
+ + /* + * See if there is a new connection immediately available. + */ + sock = WSAAccept((SOCKET)key->hnd, remote, addrlen, NULL, 0); + if (sock != INVALID_SOCKET) { + /* Yes! New socket is available! */ + int status; + + status = getsockname(sock, local, addrlen); + if (status != 0) { + DWORD dwError = WSAGetLastError(); + closesocket(sock); + return PJ_RETURN_OS_ERROR(dwError); + } + + *new_sock = sock; + return PJ_SUCCESS; + + } else { + DWORD dwError = WSAGetLastError(); + if (dwError != WSAEWOULDBLOCK) { + return PJ_RETURN_OS_ERROR(dwError); + } + } + + /* + * No connection is immediately available. + * Must schedule an asynchronous operation. + */ + op_key_rec = (union operation_key*)op_key->internal__; - status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0,
+ status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &op_key_rec->accept.newsock); if (status != PJ_SUCCESS) return status; -
- /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket
- * addresses can be obtained with getsockname() and getpeername().
- */
- status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET,
- SO_UPDATE_ACCEPT_CONTEXT,
- (char*)&key->hnd, sizeof(SOCKET));
- /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later.
- * So ignore the error status.
- */
-
- op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT;
+ + /* On WinXP or later, use SO_UPDATE_ACCEPT_CONTEXT so that socket + * addresses can be obtained with getsockname() and getpeername(). + */ + status = setsockopt(op_key_rec->accept.newsock, SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + (char*)&key->hnd, sizeof(SOCKET)); + /* SO_UPDATE_ACCEPT_CONTEXT is for WinXP or later. + * So ignore the error status. + */ + + op_key_rec->accept.operation = PJ_IOQUEUE_OP_ACCEPT; op_key_rec->accept.addrlen = addrlen; op_key_rec->accept.local = local; op_key_rec->accept.remote = remote; @@ -802,9 +810,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, DWORD dwStatus = WSAGetLastError(); if (dwStatus!=WSA_IO_PENDING) return PJ_STATUS_FROM_OS(dwStatus); - }
-
- /* Asynchronous Accept() has been submitted. */
+ } + + /* Asynchronous Accept() has been submitted. */ return PJ_EPENDING; } @@ -819,11 +827,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, const pj_sockaddr_t *addr, int addrlen ) { - HANDLE hEvent;
+ HANDLE hEvent; pj_ioqueue_t *ioqueue; PJ_CHECK_STACK(); - PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
+ PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); /* Initiate connect() */ if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) { @@ -836,8 +844,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, /* Connect has completed immediately! */ return PJ_SUCCESS; } -
- ioqueue = key->ioqueue;
+ + ioqueue = key->ioqueue; /* Add to the array of connecting socket to be polled */ pj_lock_acquire(ioqueue->lock); |