From d494bf04e608a604eaa96bd65b9daa72a1d98252 Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Wed, 22 Mar 2006 11:49:19 +0000 Subject: Fixed bug in ioqueue with IO Completion Port backend, where events may still be called after key is unregistered git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@349 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/build/pjlib.dsp | 7 ++ pjlib/build/pjlib_test.dsp | 10 +- pjlib/include/pj/ioqueue.h | 5 + pjlib/src/pj/ioqueue_winnt.c | 196 ++++++++++++++++++++++++++++---------- pjlib/src/pjlib-test/ioq_udp.c | 174 +++++++++++++++++++++++++++++++++ pjlib/src/pjlib-test/main_win32.c | 2 +- 6 files changed, 332 insertions(+), 62 deletions(-) diff --git a/pjlib/build/pjlib.dsp b/pjlib/build/pjlib.dsp index b2745653..f336459f 100644 --- a/pjlib/build/pjlib.dsp +++ b/pjlib/build/pjlib.dsp @@ -238,6 +238,13 @@ SOURCE=..\src\pj\ioqueue_select.c # Begin Source File SOURCE=..\src\pj\ioqueue_winnt.c + +!IF "$(CFG)" == "pjlib - Win32 Release" + +!ELSEIF "$(CFG)" == "pjlib - Win32 Debug" + +!ENDIF + # End Source File # Begin Source File diff --git a/pjlib/build/pjlib_test.dsp b/pjlib/build/pjlib_test.dsp index e8ce23e6..15bf0d2c 100644 --- a/pjlib/build/pjlib_test.dsp +++ b/pjlib/build/pjlib_test.dsp @@ -128,15 +128,6 @@ SOURCE="..\src\pjlib-test\list.c" # Begin Source File SOURCE="..\src\pjlib-test\main.c" - -!IF "$(CFG)" == "pjlib_test - Win32 Release" - -!ELSEIF "$(CFG)" == "pjlib_test - Win32 Debug" - -# PROP Exclude_From_Build 1 - -!ENDIF - # End Source File # Begin Source File @@ -146,6 +137,7 @@ SOURCE="..\src\pjlib-test\main_mod.c" # Begin Source File SOURCE="..\src\pjlib-test\main_win32.c" +# PROP Exclude_From_Build 1 # End Source File # Begin Source File diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h index 86a5309f..bd8dafc4 100644 --- a/pjlib/include/pj/ioqueue.h +++ b/pjlib/include/pj/ioqueue.h @@ -327,6 +327,11 @@ PJ_DECL(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, * Note that asynchronous connect operation will automatically be * cancelled during the unregistration. * + * Also note that when I/O Completion Port backend is used, application + * MUST close the handle immediately after unregistering the key. This is + * because there is no unregistering API for IOCP. The only way to + * unregister the handle from IOCP is to close the handle. + * * @param key The key that was previously obtained from registration. * * @return PJ_SUCCESS on success or the error code. diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index 79e46183..40dd31bb 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -99,6 +99,8 @@ enum handle_type HND_IS_SOCKET, }; +enum { POST_QUIT_LEN = 0xFFFFDEADUL }; + /* * Structure for individual socket. */ @@ -112,6 +114,7 @@ struct pj_ioqueue_key_t int connecting; #endif pj_ioqueue_callback cb; + pj_bool_t has_quit_signal; }; /* @@ -392,37 +395,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, return PJ_SUCCESS; } -/* - * pj_ioqueue_unregister() - */ -PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) -{ - PJ_ASSERT_RETURN(key, PJ_EINVAL); - -#if PJ_HAS_TCP - if (key->connecting) { - unsigned pos; - pj_ioqueue_t *ioqueue; - - ioqueue = key->ioqueue; - - /* Erase from connecting_handles */ - pj_lock_acquire(ioqueue->lock); - for (pos=0; pos < ioqueue->connecting_count; ++pos) { - if (ioqueue->connecting_keys[pos] == key) { - erase_connecting_socket(ioqueue, pos); - break; - } - } - key->connecting = 0; - pj_lock_release(ioqueue->lock); - } -#endif - if (key->hnd_type == HND_IS_FILE) { - CloseHandle(key->hnd); - } - return PJ_SUCCESS; -} /* * pj_ioqueue_get_user_data() @@ -449,34 +421,25 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, return PJ_SUCCESS; } + + /* - * pj_ioqueue_poll() - * - * Poll for events. + * Internal function to poll the I/O Completion Port, execute callback, + * and return the key and bytes transfered of the last operation. */ -PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) +static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, + pj_ssize_t *p_bytes, pj_ioqueue_key_t **p_key ) { - DWORD dwMsec, dwBytesTransfered, dwKey; + DWORD dwBytesTransfered, dwKey; generic_overlapped *pOv; pj_ioqueue_key_t *key; - int connect_count; pj_ssize_t size_status = -1; - BOOL rcGetQueued;; - - PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); - - /* Check the connecting array. */ -#if PJ_HAS_TCP - connect_count = check_connecting(ioqueue); -#endif - - /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ - dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; + BOOL rcGetQueued; /* Poll for completion status. */ - rcGetQueued = GetQueuedCompletionStatus(ioqueue->iocp, &dwBytesTransfered, + rcGetQueued = GetQueuedCompletionStatus(hIocp, &dwBytesTransfered, &dwKey, (OVERLAPPED**)&pOv, - dwMsec); + dwTimeout); /* The return value is: * - nonzero if event was dequeued. @@ -487,6 +450,25 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) /* Event was dequeued for either successfull or failed I/O */ key = (pj_ioqueue_key_t*)dwKey; size_status = dwBytesTransfered; + + /* Report to caller regardless */ + if (p_bytes) + *p_bytes = size_status; + if (p_key) + *p_key = key; + + /* If size_status is POST_QUIT_LEN, mark the key as quitting */ + if (size_status == POST_QUIT_LEN) { + key->has_quit_signal = 1; + return PJ_TRUE; + } + + /* We shouldn't call callbacks if key is quitting. + * But this should have been taken care by unregister function + * (the unregister function should have cleared out the callbacks) + */ + + /* Carry out the callback */ switch (pOv->operation) { case PJ_IOQUEUE_OP_READ: case PJ_IOQUEUE_OP_RECV: @@ -522,11 +504,121 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) pj_assert(0); break; } - return connect_count+1; + return PJ_TRUE; } /* No event was queued. */ - return connect_count; + return PJ_FALSE; +} + +/* + * pj_ioqueue_unregister() + */ +PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) +{ + pj_ssize_t polled_len; + pj_ioqueue_key_t *polled_key; + generic_overlapped ov; + BOOL rc; + + PJ_ASSERT_RETURN(key, PJ_EINVAL); + +#if PJ_HAS_TCP + if (key->connecting) { + unsigned pos; + pj_ioqueue_t *ioqueue; + + ioqueue = key->ioqueue; + + /* Erase from connecting_handles */ + pj_lock_acquire(ioqueue->lock); + for (pos=0; pos < ioqueue->connecting_count; ++pos) { + if (ioqueue->connecting_keys[pos] == key) { + erase_connecting_socket(ioqueue, pos); + break; + } + } + key->connecting = 0; + pj_lock_release(ioqueue->lock); + } +#endif + + + /* Unregistering handle from IOCP is pretty tricky. + * + * Even after the socket has been closed, GetQueuedCompletionStatus + * may still return events for the handle. This will likely to + * cause crash in pjlib, because the key associated with the handle + * most likely will have been destroyed. + * + * The solution is to poll the IOCP until we're sure that there are + * no further events for the handle. + */ + + /* Clear up callbacks for the key. + * We don't want the callback to be called for this key. + */ + key->cb.on_read_complete = NULL; + key->cb.on_write_complete = NULL; + key->cb.on_accept_complete = NULL; + key->cb.on_connect_complete = NULL; + + /* Init overlapped struct */ + pj_memset(&ov, 0, sizeof(ov)); + ov.operation = PJ_IOQUEUE_OP_READ; + + /* Post queued completion status with a special length. */ + rc = PostQueuedCompletionStatus( key->ioqueue->iocp, (DWORD)POST_QUIT_LEN, + (DWORD)key, &ov.overlapped); + + /* Poll IOCP until has_quit_signal is set in the key. + * The has_quit_signal flag is set in poll_iocp() when POST_QUIT_LEN + * is detected. We need to have this flag because POST_QUIT_LEN may be + * detected by other threads. + */ + do { + polled_len = 0; + polled_key = NULL; + + rc = poll_iocp(key->ioqueue->iocp, 0, &polled_len, &polled_key); + + } while (rc && !key->has_quit_signal); + + + /* Close handle if this is a file. */ + if (key->hnd_type == HND_IS_FILE) { + CloseHandle(key->hnd); + } + + return PJ_SUCCESS; +} + +/* + * pj_ioqueue_poll() + * + * Poll for events. + */ +PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) +{ + DWORD dwMsec; + int connect_count = 0; + pj_bool_t has_event; + + PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); + + /* Check the connecting array. */ +#if PJ_HAS_TCP + connect_count = check_connecting(ioqueue); +#endif + + /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ + dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; + + /* Poll for completion status. */ + has_event = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); + + /* Return number of events. */ + return connect_count + has_event; } /* diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c index 5f933afa..b81764f2 100644 --- a/pjlib/src/pjlib-test/ioq_udp.c +++ b/pjlib/src/pjlib-test/ioq_udp.c @@ -313,6 +313,173 @@ on_error: } + +static void on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) +{ + unsigned *p_packet_cnt = pj_ioqueue_get_user_data(key); + + PJ_UNUSED_ARG(op_key); + PJ_UNUSED_ARG(bytes_read); + + (*p_packet_cnt)++; +} + +/* + * unregister_test() + * Check if callback is still called after socket has been unregistered or + * closed. + */ +static int unregister_test(void) +{ + enum { RPORT = 50000, SPORT = 50001 }; + pj_pool_t *pool; + pj_ioqueue_t *ioqueue; + pj_sock_t ssock; + pj_sock_t rsock; + int addrlen; + pj_sockaddr_in addr; + pj_ioqueue_key_t *key; + pj_ioqueue_op_key_t opkey; + pj_ioqueue_callback cb; + unsigned packet_cnt; + char sendbuf[10], recvbuf[10]; + pj_ssize_t bytes; + pj_time_val timeout; + pj_status_t status; + + pool = pj_pool_create(mem, "test", 4000, 4000, NULL); + if (!pool) { + app_perror("Unable to create pool", PJ_ENOMEM); + return -100; + } + + status = pj_ioqueue_create(pool, 16, &ioqueue); + if (status != PJ_SUCCESS) { + app_perror("Error creating ioqueue", status); + return -110; + } + + /* Create sender socket */ + status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, SPORT, &ssock); + if (status != PJ_SUCCESS) { + app_perror("Error initializing socket", status); + return -120; + } + + /* Create receiver socket. */ + status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, RPORT, &rsock); + if (status != PJ_SUCCESS) { + app_perror("Error initializing socket", status); + return -130; + } + + /* Register rsock to ioqueue. */ + pj_memset(&cb, 0, sizeof(cb)); + cb.on_read_complete = &on_read_complete; + packet_cnt = 0; + status = pj_ioqueue_register_sock(pool, ioqueue, rsock, &packet_cnt, + &cb, &key); + if (status != PJ_SUCCESS) { + app_perror("Error registering to ioqueue", status); + return -140; + } + + /* Init operation key. */ + pj_ioqueue_op_key_init(&opkey, sizeof(opkey)); + + /* Start reading. */ + bytes = sizeof(recvbuf); + status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0); + if (status != PJ_EPENDING) { + app_perror("Expecting PJ_EPENDING, but got this", status); + return -150; + } + + /* Init destination address. */ + addrlen = sizeof(addr); + status = pj_sock_getsockname(rsock, &addr, &addrlen); + if (status != PJ_SUCCESS) { + app_perror("getsockname error", status); + return -160; + } + + /* Override address with 127.0.0.1, since getsockname will return + * zero in the address field. + */ + addr.sin_addr = pj_inet_addr2("127.0.0.1"); + + /* Init buffer to send */ + pj_ansi_strcpy(sendbuf, "Hello0123"); + + /* Send one packet. */ + bytes = sizeof(sendbuf); + status = pj_sock_sendto(ssock, sendbuf, &bytes, 0, + &addr, sizeof(addr)); + + if (status != PJ_SUCCESS) { + app_perror("sendto error", status); + return -170; + } + + /* Check if packet is received. */ + timeout.sec = 1; timeout.msec = 0; + pj_ioqueue_poll(ioqueue, &timeout); + + if (packet_cnt != 1) { + return -180; + } + + /* Just to make sure things are settled.. */ + pj_thread_sleep(100); + + /* Start reading again. */ + bytes = sizeof(recvbuf); + status = pj_ioqueue_recv( key, &opkey, recvbuf, &bytes, 0); + if (status != PJ_EPENDING) { + app_perror("Expecting PJ_EPENDING, but got this", status); + return -190; + } + + /* Reset packet counter */ + packet_cnt = 0; + + /* Send one packet. */ + bytes = sizeof(sendbuf); + status = pj_sock_sendto(ssock, sendbuf, &bytes, 0, + &addr, sizeof(addr)); + + if (status != PJ_SUCCESS) { + app_perror("sendto error", status); + return -200; + } + + /* Now unregister and close socket. */ + pj_ioqueue_unregister(key); + pj_sock_close(rsock); + + /* Poll ioqueue. */ + timeout.sec = 1; timeout.msec = 0; + pj_ioqueue_poll(ioqueue, &timeout); + + /* Must NOT receive any packets after socket is closed! */ + if (packet_cnt > 0) { + PJ_LOG(3,(THIS_FILE, "....errror: not expecting to receive packet " + "after socket has been closed")); + return -210; + } + + /* Success */ + pj_sock_close(ssock); + pj_ioqueue_destroy(ioqueue); + + pj_pool_release(pool); + + return 0; +} + + /* * Testing with many handles. * This will just test registering PJ_IOQUEUE_MAX_HANDLES count @@ -625,6 +792,13 @@ int udp_ioqueue_test() } PJ_LOG(3, (THIS_FILE, "....compliance test ok")); + + PJ_LOG(3, (THIS_FILE, "...unregister test (%s)", pj_ioqueue_name())); + if ((status=unregister_test()) != 0) { + return status; + } + PJ_LOG(3, (THIS_FILE, "....unregister test ok")); + if ((status=many_handles_test()) != 0) { return status; } diff --git a/pjlib/src/pjlib-test/main_win32.c b/pjlib/src/pjlib-test/main_win32.c index 07d91870..6024a953 100644 --- a/pjlib/src/pjlib-test/main_win32.c +++ b/pjlib/src/pjlib-test/main_win32.c @@ -55,7 +55,7 @@ static void write_log(int level, const char *data, int len) PJ_UNUSED_ARG(level); PJ_UNUSED_ARG(len); SendMessage(hwndLog, EM_REPLACESEL, FALSE, - (LPARAM)PJ_STRING_TO_NATIVE(data,wdata)); + (LPARAM)PJ_STRING_TO_NATIVE(data,wdata,256)); } -- cgit v1.2.3