summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2006-03-22 11:49:19 +0000
committerBenny Prijono <bennylp@teluu.com>2006-03-22 11:49:19 +0000
commitd494bf04e608a604eaa96bd65b9daa72a1d98252 (patch)
tree7c44cbf3f7f6858673ad5c7601ece5005ad835c2
parentac843019cc56478c4423e0181e2ec1452d84e75b (diff)
Fixed bug in ioqueue with IO Completion Port backend, where events may still be called after key is unregistered
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@349 74dad513-b988-da41-8d7b-12977e46ad98
-rw-r--r--pjlib/build/pjlib.dsp7
-rw-r--r--pjlib/build/pjlib_test.dsp10
-rw-r--r--pjlib/include/pj/ioqueue.h5
-rw-r--r--pjlib/src/pj/ioqueue_winnt.c196
-rw-r--r--pjlib/src/pjlib-test/ioq_udp.c174
-rw-r--r--pjlib/src/pjlib-test/main_win32.c2
6 files changed, 332 insertions, 62 deletions
diff --git a/pjlib/build/pjlib.dsp b/pjlib/build/pjlib.dsp
index b2745653..f336459f 100644
--- a/pjlib/build/pjlib.dsp
+++ b/pjlib/build/pjlib.dsp
@@ -238,6 +238,13 @@ SOURCE=..\src\pj\ioqueue_select.c
# Begin Source File
SOURCE=..\src\pj\ioqueue_winnt.c
+
+!IF "$(CFG)" == "pjlib - Win32 Release"
+
+!ELSEIF "$(CFG)" == "pjlib - Win32 Debug"
+
+!ENDIF
+
# End Source File
# Begin Source File
diff --git a/pjlib/build/pjlib_test.dsp b/pjlib/build/pjlib_test.dsp
index e8ce23e6..15bf0d2c 100644
--- a/pjlib/build/pjlib_test.dsp
+++ b/pjlib/build/pjlib_test.dsp
@@ -128,15 +128,6 @@ SOURCE="..\src\pjlib-test\list.c"
# Begin Source File
SOURCE="..\src\pjlib-test\main.c"
-
-!IF "$(CFG)" == "pjlib_test - Win32 Release"
-
-!ELSEIF "$(CFG)" == "pjlib_test - Win32 Debug"
-
-# PROP Exclude_From_Build 1
-
-!ENDIF
-
# End Source File
# Begin Source File
@@ -146,6 +137,7 @@ SOURCE="..\src\pjlib-test\main_mod.c"
# Begin Source File
SOURCE="..\src\pjlib-test\main_win32.c"
+# PROP Exclude_From_Build 1
# End Source File
# Begin Source File
diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h
index 86a5309f..bd8dafc4 100644
--- a/pjlib/include/pj/ioqueue.h
+++ b/pjlib/include/pj/ioqueue.h
@@ -327,6 +327,11 @@ PJ_DECL(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
* Note that asynchronous connect operation will automatically be
* cancelled during the unregistration.
*
+ * Also note that when I/O Completion Port backend is used, application
+ * MUST close the handle immediately after unregistering the key. This is
+ * because there is no unregistering API for IOCP. The only way to
+ * unregister the handle from IOCP is to close the handle.
+ *
* @param key The key that was previously obtained from registration.
*
* @return PJ_SUCCESS on success or the error code.
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c
index 79e46183..40dd31bb 100644
--- a/pjlib/src/pj/ioqueue_winnt.c
+++ b/pjlib/src/pj/ioqueue_winnt.c
@@ -99,6 +99,8 @@ enum handle_type
HND_IS_SOCKET,
};
+enum { POST_QUIT_LEN = 0xFFFFDEADUL };
+
/*
* Structure for individual socket.
*/
@@ -112,6 +114,7 @@ struct pj_ioqueue_key_t
int connecting;
#endif
pj_ioqueue_callback cb;
+ pj_bool_t has_quit_signal;
};
/*
@@ -392,37 +395,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
return PJ_SUCCESS;
}
-/*
- * pj_ioqueue_unregister()
- */
-PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
-{
- PJ_ASSERT_RETURN(key, PJ_EINVAL);
-
-#if PJ_HAS_TCP
- if (key->connecting) {
- unsigned pos;
- pj_ioqueue_t *ioqueue;
-
- ioqueue = key->ioqueue;
-
- /* Erase from connecting_handles */
- pj_lock_acquire(ioqueue->lock);
- for (pos=0; pos < ioqueue->connecting_count; ++pos) {
- if (ioqueue->connecting_keys[pos] == key) {
- erase_connecting_socket(ioqueue, pos);
- break;
- }
- }
- key->connecting = 0;
- pj_lock_release(ioqueue->lock);
- }
-#endif
- if (key->hnd_type == HND_IS_FILE) {
- CloseHandle(key->hnd);
- }
- return PJ_SUCCESS;
-}
/*
* pj_ioqueue_get_user_data()
@@ -449,34 +421,25 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
return PJ_SUCCESS;
}
+
+
/*
- * pj_ioqueue_poll()
- *
- * Poll for events.
+ * Internal function to poll the I/O Completion Port, execute callback,
+ * and return the key and bytes transfered of the last operation.
*/
-PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
+static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
+ pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key )
{
- DWORD dwMsec, dwBytesTransfered, dwKey;
+ DWORD dwBytesTransfered, dwKey;
generic_overlapped *pOv;
pj_ioqueue_key_t *key;
- int connect_count;
pj_ssize_t size_status = -1;
- BOOL rcGetQueued;;
-
- PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
-
- /* Check the connecting array. */
-#if PJ_HAS_TCP
- connect_count = check_connecting(ioqueue);
-#endif
-
- /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
- dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
+ BOOL rcGetQueued;
/* Poll for completion status. */
- rcGetQueued = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered,
+ rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered,
&dwKey, (OVERLAPPED**)&pOv,
- dwMsec);
+ dwTimeout);
/* The return value is:
* - nonzero if event was dequeued.
@@ -487,6 +450,25 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
/* Event was dequeued for either successfull or failed I/O */
key = (pj_ioqueue_key_t*)dwKey;
size_status = dwBytesTransfered;
+
+ /* Report to caller regardless */
+ if (p_bytes)
+ *p_bytes = size_status;
+ if (p_key)
+ *p_key = key;
+
+ /* If size_status is POST_QUIT_LEN, mark the key as quitting */
+ if (size_status == POST_QUIT_LEN) {
+ key->has_quit_signal = 1;
+ return PJ_TRUE;
+ }
+
+ /* We shouldn't call callbacks if key is quitting.
+ * But this should have been taken care by unregister function
+ * (the unregister function should have cleared out the callbacks)
+ */
+
+ /* Carry out the callback */
switch (pOv->operation) {
case PJ_IOQUEUE_OP_READ:
case PJ_IOQUEUE_OP_RECV:
@@ -522,11 +504,121 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
pj_assert(0);
break;
}
- return connect_count+1;
+ return PJ_TRUE;
}
/* No event was queued. */
- return connect_count;
+ return PJ_FALSE;
+}
+
+/*
+ * pj_ioqueue_unregister()
+ */
+PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
+{
+ pj_ssize_t polled_len;
+ pj_ioqueue_key_t *polled_key;
+ generic_overlapped ov;
+ BOOL rc;
+
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+#if PJ_HAS_TCP
+ if (key->connecting) {
+ unsigned pos;
+ pj_ioqueue_t *ioqueue;
+
+ ioqueue = key->ioqueue;
+
+ /* Erase from connecting_handles */
+ pj_lock_acquire(ioqueue->lock);
+ for (pos=0; pos < ioqueue->connecting_count; ++pos) {
+ if (ioqueue->connecting_keys[pos] == key) {
+ erase_connecting_socket(ioqueue, pos);
+ break;
+ }
+ }
+ key->connecting = 0;
+ pj_lock_release(ioqueue->lock);
+ }
+#endif
+
+
+ /* Unregistering handle from IOCP is pretty tricky.
+ *
+ * Even after the socket has been closed, GetQueuedCompletionStatus
+ * may still return events for the handle. This will likely to
+ * cause crash in pjlib, because the key associated with the handle
+ * most likely will have been destroyed.
+ *
+ * The solution is to poll the IOCP until we're sure that there are
+ * no further events for the handle.
+ */
+
+ /* Clear up callbacks for the key.
+ * We don't want the callback to be called for this key.
+ */
+ key->cb.on_read_complete = NULL;
+ key->cb.on_write_complete = NULL;
+ key->cb.on_accept_complete = NULL;
+ key->cb.on_connect_complete = NULL;
+
+ /* Init overlapped struct */
+ pj_memset(&ov, 0, sizeof(ov));
+ ov.operation = PJ_IOQUEUE_OP_READ;
+
+ /* Post queued completion status with a special length. */
+ rc = PostQueuedCompletionStatus( key->ioqueue->iocp, (DWORD)POST_QUIT_LEN,
+ (DWORD)key, &ov.overlapped);
+
+ /* Poll IOCP until has_quit_signal is set in the key.
+ * The has_quit_signal flag is set in poll_iocp() when POST_QUIT_LEN
+ * is detected. We need to have this flag because POST_QUIT_LEN may be
+ * detected by other threads.
+ */
+ do {
+ polled_len = 0;
+ polled_key = NULL;
+
+ rc = poll_iocp(key->ioqueue->iocp, 0, &polled_len, &polled_key);
+
+ } while (rc && !key->has_quit_signal);
+
+
+ /* Close handle if this is a file. */
+ if (key->hnd_type == HND_IS_FILE) {
+ CloseHandle(key->hnd);
+ }
+
+ return PJ_SUCCESS;
+}
+
+/*
+ * pj_ioqueue_poll()
+ *
+ * Poll for events.
+ */
+PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
+{
+ DWORD dwMsec;
+ int connect_count = 0;
+ pj_bool_t has_event;
+
+ PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
+
+ /* Check the connecting array. */
+#if PJ_HAS_TCP
+ connect_count = check_connecting(ioqueue);
+#endif
+
+ /* Calculate miliseconds timeout for GetQueuedCompletionStatus */
+ dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
+
+ /* Poll for completion status. */
+ has_event = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
+
+ /* Return number of events. */
+ return connect_count + has_event;
}
/*
diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c
index 5f933afa..b81764f2 100644
--- a/pjlib/src/pjlib-test/ioq_udp.c
+++ b/pjlib/src/pjlib-test/ioq_udp.c
@@ -313,6 +313,173 @@ on_error:
}
+
+static void on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+{
+ unsigned *p_packet_cnt = pj_ioqueue_get_user_data(key);
+
+ PJ_UNUSED_ARG(op_key);
+ PJ_UNUSED_ARG(bytes_read);
+
+ (*p_packet_cnt)++;
+}
+
+/*
+ * unregister_test()
+ * Check if callback is still called after socket has been unregistered or
+ * closed.
+ */
+static int unregister_test(void)
+{
+ enum { RPORT = 50000, SPORT = 50001 };
+ pj_pool_t *pool;
+ pj_ioqueue_t *ioqueue;
+ pj_sock_t ssock;
+ pj_sock_t rsock;
+ int addrlen;
+ pj_sockaddr_in addr;
+ pj_ioqueue_key_t *key;
+ pj_ioqueue_op_key_t opkey;
+ pj_ioqueue_callback cb;
+ unsigned packet_cnt;
+ char sendbuf[10], recvbuf[10];
+ pj_ssize_t bytes;
+ pj_time_val timeout;
+ pj_status_t status;
+
+ pool = pj_pool_create(mem, "test", 4000, 4000, NULL);
+ if (!pool) {
+ app_perror("Unable to create pool", PJ_ENOMEM);
+ return -100;
+ }
+
+ status = pj_ioqueue_create(pool, 16, &ioqueue);
+ if (status != PJ_SUCCESS) {
+ app_perror("Error creating ioqueue", status);
+ return -110;
+ }
+
+ /* Create sender socket */
+ status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, SPORT, &ssock);
+ if (status != PJ_SUCCESS) {
+ app_perror("Error initializing socket", status);
+ return -120;
+ }
+
+ /* Create receiver socket. */
+ status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, RPORT, &rsock);
+ if (status != PJ_SUCCESS) {
+ app_perror("Error initializing socket", status);
+ return -130;
+ }
+
+ /* Register rsock to ioqueue. */
+ pj_memset(&cb, 0, sizeof(cb));
+ cb.on_read_complete = &on_read_complete;
+ packet_cnt = 0;
+ status = pj_ioqueue_register_sock(pool, ioqueue, rsock, &packet_cnt,
+ &cb, &key);
+ if (status != PJ_SUCCESS) {
+ app_perror("Error registering to ioqueue", status);
+ return -140;
+ }
+
+ /* Init operation key. */
+ pj_ioqueue_op_key_init(&opkey, sizeof(opkey));
+
+ /* Start reading. */
+ bytes = sizeof(recvbuf);
+ status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0);
+ if (status != PJ_EPENDING) {
+ app_perror("Expecting PJ_EPENDING, but got this", status);
+ return -150;
+ }
+
+ /* Init destination address. */
+ addrlen = sizeof(addr);
+ status = pj_sock_getsockname(rsock, &addr, &addrlen);
+ if (status != PJ_SUCCESS) {
+ app_perror("getsockname error", status);
+ return -160;
+ }
+
+ /* Override address with 127.0.0.1, since getsockname will return
+ * zero in the address field.
+ */
+ addr.sin_addr = pj_inet_addr2("127.0.0.1");
+
+ /* Init buffer to send */
+ pj_ansi_strcpy(sendbuf, "Hello0123");
+
+ /* Send one packet. */
+ bytes = sizeof(sendbuf);
+ status = pj_sock_sendto(ssock, sendbuf, &bytes, 0,
+ &addr, sizeof(addr));
+
+ if (status != PJ_SUCCESS) {
+ app_perror("sendto error", status);
+ return -170;
+ }
+
+ /* Check if packet is received. */
+ timeout.sec = 1; timeout.msec = 0;
+ pj_ioqueue_poll(ioqueue, &timeout);
+
+ if (packet_cnt != 1) {
+ return -180;
+ }
+
+ /* Just to make sure things are settled.. */
+ pj_thread_sleep(100);
+
+ /* Start reading again. */
+ bytes = sizeof(recvbuf);
+ status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0);
+ if (status != PJ_EPENDING) {
+ app_perror("Expecting PJ_EPENDING, but got this", status);
+ return -190;
+ }
+
+ /* Reset packet counter */
+ packet_cnt = 0;
+
+ /* Send one packet. */
+ bytes = sizeof(sendbuf);
+ status = pj_sock_sendto(ssock, sendbuf, &bytes, 0,
+ &addr, sizeof(addr));
+
+ if (status != PJ_SUCCESS) {
+ app_perror("sendto error", status);
+ return -200;
+ }
+
+ /* Now unregister and close socket. */
+ pj_ioqueue_unregister(key);
+ pj_sock_close(rsock);
+
+ /* Poll ioqueue. */
+ timeout.sec = 1; timeout.msec = 0;
+ pj_ioqueue_poll(ioqueue, &timeout);
+
+ /* Must NOT receive any packets after socket is closed! */
+ if (packet_cnt > 0) {
+ PJ_LOG(3,(THIS_FILE, "....errror: not expecting to receive packet "
+ "after socket has been closed"));
+ return -210;
+ }
+
+ /* Success */
+ pj_sock_close(ssock);
+ pj_ioqueue_destroy(ioqueue);
+
+ pj_pool_release(pool);
+
+ return 0;
+}
+
+
/*
* Testing with many handles.
* This will just test registering PJ_IOQUEUE_MAX_HANDLES count
@@ -625,6 +792,13 @@ int udp_ioqueue_test()
}
PJ_LOG(3, (THIS_FILE, "....compliance test ok"));
+
+ PJ_LOG(3, (THIS_FILE, "...unregister test (%s)", pj_ioqueue_name()));
+ if ((status=unregister_test()) != 0) {
+ return status;
+ }
+ PJ_LOG(3, (THIS_FILE, "....unregister test ok"));
+
if ((status=many_handles_test()) != 0) {
return status;
}
diff --git a/pjlib/src/pjlib-test/main_win32.c b/pjlib/src/pjlib-test/main_win32.c
index 07d91870..6024a953 100644
--- a/pjlib/src/pjlib-test/main_win32.c
+++ b/pjlib/src/pjlib-test/main_win32.c
@@ -55,7 +55,7 @@ static void write_log(int level, const char *data, int len)
PJ_UNUSED_ARG(level);
PJ_UNUSED_ARG(len);
SendMessage(hwndLog, EM_REPLACESEL, FALSE,
- (LPARAM)PJ_STRING_TO_NATIVE(data,wdata));
+ (LPARAM)PJ_STRING_TO_NATIVE(data,wdata,256));
}