diff options
author | Benny Prijono <bennylp@teluu.com> | 2006-03-30 16:32:18 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2006-03-30 16:32:18 +0000 |
commit | 974fbe67d6d62efadd129cc81b9072faf3b2f029 (patch) | |
tree | 82a44cd7c10d447766280047e035928166833348 /pjlib/src/pj/ioqueue_winnt.c | |
parent | 3cf609b42e573adf8e7183070176a450a7b4959e (diff) |
Fixed race condition bug in ioqueue unregistration for select and Win32 IOCP backend
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@365 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj/ioqueue_winnt.c')
-rw-r--r-- | pjlib/src/pj/ioqueue_winnt.c | 312 |
1 files changed, 247 insertions, 65 deletions
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index 40dd31bb..4586c5ca 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -106,15 +106,24 @@ enum { POST_QUIT_LEN = 0xFFFFDEADUL }; */ struct pj_ioqueue_key_t { + PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); + pj_ioqueue_t *ioqueue; HANDLE hnd; void *user_data; enum handle_type hnd_type; + pj_ioqueue_callback cb; + #if PJ_HAS_TCP int connecting; #endif - pj_ioqueue_callback cb; - pj_bool_t has_quit_signal; + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + pj_atomic_t *ref_count; + pj_bool_t closing; + pj_time_val free_time; +#endif + }; /* @@ -125,9 +134,17 @@ struct pj_ioqueue_t HANDLE iocp; pj_lock_t *lock; pj_bool_t auto_delete_lock; + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + pj_ioqueue_key_t active_list; + pj_ioqueue_key_t free_list; + pj_ioqueue_key_t closing_list; +#endif + + /* These are to keep track of connecting sockets */ +#if PJ_HAS_TCP unsigned event_count; HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1]; -#if PJ_HAS_TCP unsigned connecting_count; HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1]; pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1]; @@ -279,6 +296,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_ioqueue_t **p_ioqueue) { pj_ioqueue_t *ioqueue; + unsigned i; pj_status_t rc; PJ_UNUSED_ARG(max_fd); @@ -290,11 +308,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= sizeof(union operation_key), PJ_EBUG); + /* Create IOCP */ ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (ioqueue->iocp == NULL) return PJ_RETURN_OS_ERROR(GetLastError()); + /* Create IOCP mutex */ rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); if (rc != PJ_SUCCESS) { CloseHandle(ioqueue->iocp); @@ -303,6 +323,38 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, ioqueue->auto_delete_lock = PJ_TRUE; +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* + * Create and initialize key pools. + */ + pj_list_init(&ioqueue->active_list); + pj_list_init(&ioqueue->free_list); + pj_list_init(&ioqueue->closing_list); + + /* Preallocate keys according to max_fd setting, and put them + * in free_list. + */ + for (i=0; i<max_fd; ++i) { + pj_ioqueue_key_t *key; + + key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t)); + + rc = pj_atomic_create(pool, 0, &key->ref_count); + if (rc != PJ_SUCCESS) { + key = ioqueue->free_list.next; + while (key != &ioqueue->free_list) { + pj_atomic_destroy(key->ref_count); + key = key->next; + } + CloseHandle(ioqueue->iocp); + return rc; + } + + pj_list_push_back(&ioqueue->free_list, key); + + } +#endif + *p_ioqueue = ioqueue; PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue)); @@ -315,19 +367,45 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) { unsigned i; + pj_ioqueue_key_t *key; PJ_CHECK_STACK(); PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); + pj_lock_acquire(ioqueue->lock); + +#if PJ_HAS_TCP /* Destroy events in the pool */ for (i=0; i<ioqueue->event_count; ++i) { CloseHandle(ioqueue->event_pool[i]); } ioqueue->event_count = 0; +#endif if (CloseHandle(ioqueue->iocp) != TRUE) return PJ_RETURN_OS_ERROR(GetLastError()); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Destroy reference counters */ + key = ioqueue->active_list.next; + while (key != &ioqueue->active_list) { + pj_atomic_destroy(key->ref_count); + key = key->next; + } + + key = ioqueue->closing_list.next; + while (key != &ioqueue->closing_list) { + pj_atomic_destroy(key->ref_count); + key = key->next; + } + + key = ioqueue->free_list.next; + while (key != &ioqueue->free_list) { + pj_atomic_destroy(key->ref_count); + key = key->next; + } +#endif + if (ioqueue->auto_delete_lock) pj_lock_destroy(ioqueue->lock); @@ -370,28 +448,64 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); - /* Build the key for this socket. */ + pj_lock_acquire(ioqueue->lock); + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* If safe unregistration is used, then get the key record from + * the free list. + */ + if (pj_list_empty(&ioqueue->free_list)) { + pj_lock_release(ioqueue->lock); + return PJ_ETOOMANY; + } + + rec = ioqueue->free_list.next; + pj_list_erase(rec); + + /* Set initial reference count to 1 */ + pj_assert(pj_atomic_get(rec->ref_count) == 0); + pj_atomic_inc(rec->ref_count); + + rec->closing = 0; + +#else rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); +#endif + + /* Build the key for this socket. */ rec->ioqueue = ioqueue; rec->hnd = (HANDLE)sock; rec->hnd_type = HND_IS_SOCKET; rec->user_data = user_data; pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); +#if PJ_HAS_TCP + rec->connecting = 0; +#endif + /* Set socket to nonblocking. */ value = 1; rc = ioctlsocket(sock, FIONBIO, &value); if (rc != 0) { + pj_lock_release(ioqueue->lock); return PJ_RETURN_OS_ERROR(WSAGetLastError()); } /* Associate with IOCP */ hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); if (!hioq) { + pj_lock_release(ioqueue->lock); return PJ_RETURN_OS_ERROR(GetLastError()); } *key = rec; + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + pj_list_push_back(&ioqueue->active_list, rec); +#endif + + pj_lock_release(ioqueue->lock); + return PJ_SUCCESS; } @@ -422,9 +536,31 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, } +#if PJ_IOQUEUE_HAS_SAFE_UNREG +/* Decrement the key's reference counter, and when the counter reach zero, + * destroy the key. + */ +static void decrement_counter(pj_ioqueue_key_t *key) +{ + if (pj_atomic_dec_and_get(key->ref_count) == 0) { + + pj_lock_acquire(key->ioqueue->lock); + + pj_assert(key->closing == 1); + pj_gettimeofday(&key->free_time); + key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; + pj_time_val_normalize(&key->free_time); + + pj_list_erase(key); + pj_list_push_back(&key->ioqueue->closing_list, key); + + pj_lock_release(key->ioqueue->lock); + } +} +#endif /* - * Internal function to poll the I/O Completion Port, execute callback, + * Poll the I/O Completion Port, execute callback, * and return the key and bytes transfered of the last operation. */ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, @@ -457,16 +593,16 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, if (p_key) *p_key = key; - /* If size_status is POST_QUIT_LEN, mark the key as quitting */ - if (size_status == POST_QUIT_LEN) { - key->has_quit_signal = 1; +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* We shouldn't call callbacks if key is quitting. */ + if (key->closing) return PJ_TRUE; - } - /* We shouldn't call callbacks if key is quitting. - * But this should have been taken care by unregister function - * (the unregister function should have cleared out the callbacks) + /* Increment reference counter to prevent this key from being + * deleted */ + pj_atomic_inc(key->ref_count); +#endif /* Carry out the callback */ switch (pOv->operation) { @@ -504,6 +640,11 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, pj_assert(0); break; } + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + decrement_counter(key); +#endif + return PJ_TRUE; } @@ -516,11 +657,6 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, */ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) { - pj_ssize_t polled_len; - pj_ioqueue_key_t *polled_key; - generic_overlapped ov; - BOOL rc; - PJ_ASSERT_RETURN(key, PJ_EINVAL); #if PJ_HAS_TCP @@ -542,53 +678,35 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) pj_lock_release(ioqueue->lock); } #endif - - - /* Unregistering handle from IOCP is pretty tricky. - * - * Even after the socket has been closed, GetQueuedCompletionStatus - * may still return events for the handle. This will likely to - * cause crash in pjlib, because the key associated with the handle - * most likely will have been destroyed. - * - * The solution is to poll the IOCP until we're sure that there are - * no further events for the handle. + + /* Close handle (the only way to disassociate handle from IOCP). + * We also need to close handle to make sure that no further events + * will come to the handle. */ + CloseHandle(key->hnd); - /* Clear up callbacks for the key. - * We don't want the callback to be called for this key. - */ - key->cb.on_read_complete = NULL; - key->cb.on_write_complete = NULL; + /* Reset callbacks */ key->cb.on_accept_complete = NULL; key->cb.on_connect_complete = NULL; + key->cb.on_read_complete = NULL; + key->cb.on_write_complete = NULL; - /* Init overlapped struct */ - pj_memset(&ov, 0, sizeof(ov)); - ov.operation = PJ_IOQUEUE_OP_READ; +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Mark key as closing. */ + key->closing = 1; - /* Post queued completion status with a special length. */ - rc = PostQueuedCompletionStatus( key->ioqueue->iocp, (DWORD)POST_QUIT_LEN, - (DWORD)key, &ov.overlapped); + /* Decrement reference counter. */ + decrement_counter(key); - /* Poll IOCP until has_quit_signal is set in the key. - * The has_quit_signal flag is set in poll_iocp() when POST_QUIT_LEN - * is detected. We need to have this flag because POST_QUIT_LEN may be - * detected by other threads. + /* Even after handle is closed, I suspect that IOCP may still try to + * do something with the handle, causing memory corruption when pool + * debugging is enabled. + * + * Forcing context switch seems to have fixed that, but this is quite + * an ugly solution.. */ - do { - polled_len = 0; - polled_key = NULL; - - rc = poll_iocp(key->ioqueue->iocp, 0, &polled_len, &polled_key); - - } while (rc && !key->has_quit_signal); - - - /* Close handle if this is a file. */ - if (key->hnd_type == HND_IS_FILE) { - CloseHandle(key->hnd); - } + pj_thread_sleep(0); +#endif return PJ_SUCCESS; } @@ -602,23 +720,57 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { DWORD dwMsec; int connect_count = 0; - pj_bool_t has_event; + int event_count = 0; PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); - /* Check the connecting array. */ -#if PJ_HAS_TCP - connect_count = check_connecting(ioqueue); -#endif - /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; /* Poll for completion status. */ - has_event = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); + event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); + +#if PJ_HAS_TCP + /* Check the connecting array, only when there's no activity. */ + if (event_count == 0) { + connect_count = check_connecting(ioqueue); + if (connect_count > 0) + event_count += connect_count; + } +#endif + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Check the closing keys only when there's no activity and when there are + * pending closing keys. + */ + if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) { + pj_time_val now; + pj_ioqueue_key_t *key; + + pj_gettimeofday(&now); + + /* Move closing keys to free list when they've finished the closing + * idle time. + */ + pj_lock_acquire(ioqueue->lock); + key = ioqueue->closing_list.next; + while (key != &ioqueue->closing_list) { + pj_ioqueue_key_t *next = key->next; + + pj_assert(key->closing != 0); + + if (PJ_TIME_VAL_GTE(now, key->free_time)) { + pj_list_erase(key); + pj_list_push_back(&ioqueue->free_list, key); + } + key = next; + } + pj_lock_release(ioqueue->lock); + } +#endif /* Return number of events. */ - return connect_count + has_event; + return event_count; } /* @@ -645,6 +797,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Check key is not closing */ + if (key->closing) + return PJ_ECANCELLED; +#endif + op_key_rec = (union operation_key*)op_key->internal__; op_key_rec->overlapped.wsabuf.buf = buffer; op_key_rec->overlapped.wsabuf.len = *length; @@ -715,6 +873,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Check key is not closing */ + if (key->closing) + return PJ_ECANCELLED; +#endif + op_key_rec = (union operation_key*)op_key->internal__; op_key_rec->overlapped.wsabuf.buf = buffer; op_key_rec->overlapped.wsabuf.len = *length; @@ -799,7 +963,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); - + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Check key is not closing */ + if (key->closing) + return PJ_ECANCELLED; +#endif + op_key_rec = (union operation_key*)op_key->internal__; /* @@ -872,6 +1042,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Check key is not closing */ + if (key->closing) + return PJ_ECANCELLED; +#endif + /* * See if there is a new connection immediately available. */ @@ -962,6 +1138,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Check key is not closing */ + if (key->closing) + return PJ_ECANCELLED; +#endif + /* Initiate connect() */ if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) { DWORD dwStatus; |