summaryrefslogtreecommitdiff
path: root/pjlib/src/pj/ioqueue_winnt.c
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2006-03-30 16:32:18 +0000
committerBenny Prijono <bennylp@teluu.com>2006-03-30 16:32:18 +0000
commit974fbe67d6d62efadd129cc81b9072faf3b2f029 (patch)
tree82a44cd7c10d447766280047e035928166833348 /pjlib/src/pj/ioqueue_winnt.c
parent3cf609b42e573adf8e7183070176a450a7b4959e (diff)
Fixed race condition bug in ioqueue unregistration for select and Win32 IOCP backend
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@365 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj/ioqueue_winnt.c')
-rw-r--r--pjlib/src/pj/ioqueue_winnt.c312
1 files changed, 247 insertions, 65 deletions
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c
index 40dd31bb..4586c5ca 100644
--- a/pjlib/src/pj/ioqueue_winnt.c
+++ b/pjlib/src/pj/ioqueue_winnt.c
@@ -106,15 +106,24 @@ enum { POST_QUIT_LEN = 0xFFFFDEADUL };
*/
struct pj_ioqueue_key_t
{
+ PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
+
pj_ioqueue_t *ioqueue;
HANDLE hnd;
void *user_data;
enum handle_type hnd_type;
+ pj_ioqueue_callback cb;
+
#if PJ_HAS_TCP
int connecting;
#endif
- pj_ioqueue_callback cb;
- pj_bool_t has_quit_signal;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ pj_atomic_t *ref_count;
+ pj_bool_t closing;
+ pj_time_val free_time;
+#endif
+
};
/*
@@ -125,9 +134,17 @@ struct pj_ioqueue_t
HANDLE iocp;
pj_lock_t *lock;
pj_bool_t auto_delete_lock;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ pj_ioqueue_key_t active_list;
+ pj_ioqueue_key_t free_list;
+ pj_ioqueue_key_t closing_list;
+#endif
+
+ /* These are to keep track of connecting sockets */
+#if PJ_HAS_TCP
unsigned event_count;
HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
-#if PJ_HAS_TCP
unsigned connecting_count;
HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
@@ -279,6 +296,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_ioqueue_t **p_ioqueue)
{
pj_ioqueue_t *ioqueue;
+ unsigned i;
pj_status_t rc;
PJ_UNUSED_ARG(max_fd);
@@ -290,11 +308,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
sizeof(union operation_key), PJ_EBUG);
+ /* Create IOCP */
ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (ioqueue->iocp == NULL)
return PJ_RETURN_OS_ERROR(GetLastError());
+ /* Create IOCP mutex */
rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
if (rc != PJ_SUCCESS) {
CloseHandle(ioqueue->iocp);
@@ -303,6 +323,38 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
ioqueue->auto_delete_lock = PJ_TRUE;
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /*
+ * Create and initialize key pools.
+ */
+ pj_list_init(&ioqueue->active_list);
+ pj_list_init(&ioqueue->free_list);
+ pj_list_init(&ioqueue->closing_list);
+
+ /* Preallocate keys according to max_fd setting, and put them
+ * in free_list.
+ */
+ for (i=0; i<max_fd; ++i) {
+ pj_ioqueue_key_t *key;
+
+ key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
+
+ rc = pj_atomic_create(pool, 0, &key->ref_count);
+ if (rc != PJ_SUCCESS) {
+ key = ioqueue->free_list.next;
+ while (key != &ioqueue->free_list) {
+ pj_atomic_destroy(key->ref_count);
+ key = key->next;
+ }
+ CloseHandle(ioqueue->iocp);
+ return rc;
+ }
+
+ pj_list_push_back(&ioqueue->free_list, key);
+
+ }
+#endif
+
*p_ioqueue = ioqueue;
PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
@@ -315,19 +367,45 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
{
unsigned i;
+ pj_ioqueue_key_t *key;
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+ pj_lock_acquire(ioqueue->lock);
+
+#if PJ_HAS_TCP
/* Destroy events in the pool */
for (i=0; i<ioqueue->event_count; ++i) {
CloseHandle(ioqueue->event_pool[i]);
}
ioqueue->event_count = 0;
+#endif
if (CloseHandle(ioqueue->iocp) != TRUE)
return PJ_RETURN_OS_ERROR(GetLastError());
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Destroy reference counters */
+ key = ioqueue->active_list.next;
+ while (key != &ioqueue->active_list) {
+ pj_atomic_destroy(key->ref_count);
+ key = key->next;
+ }
+
+ key = ioqueue->closing_list.next;
+ while (key != &ioqueue->closing_list) {
+ pj_atomic_destroy(key->ref_count);
+ key = key->next;
+ }
+
+ key = ioqueue->free_list.next;
+ while (key != &ioqueue->free_list) {
+ pj_atomic_destroy(key->ref_count);
+ key = key->next;
+ }
+#endif
+
if (ioqueue->auto_delete_lock)
pj_lock_destroy(ioqueue->lock);
@@ -370,28 +448,64 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
- /* Build the key for this socket. */
+ pj_lock_acquire(ioqueue->lock);
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* If safe unregistration is used, then get the key record from
+ * the free list.
+ */
+ if (pj_list_empty(&ioqueue->free_list)) {
+ pj_lock_release(ioqueue->lock);
+ return PJ_ETOOMANY;
+ }
+
+ rec = ioqueue->free_list.next;
+ pj_list_erase(rec);
+
+ /* Set initial reference count to 1 */
+ pj_assert(pj_atomic_get(rec->ref_count) == 0);
+ pj_atomic_inc(rec->ref_count);
+
+ rec->closing = 0;
+
+#else
rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+#endif
+
+ /* Build the key for this socket. */
rec->ioqueue = ioqueue;
rec->hnd = (HANDLE)sock;
rec->hnd_type = HND_IS_SOCKET;
rec->user_data = user_data;
pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
+#if PJ_HAS_TCP
+ rec->connecting = 0;
+#endif
+
/* Set socket to nonblocking. */
value = 1;
rc = ioctlsocket(sock, FIONBIO, &value);
if (rc != 0) {
+ pj_lock_release(ioqueue->lock);
return PJ_RETURN_OS_ERROR(WSAGetLastError());
}
/* Associate with IOCP */
hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
if (!hioq) {
+ pj_lock_release(ioqueue->lock);
return PJ_RETURN_OS_ERROR(GetLastError());
}
*key = rec;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ pj_list_push_back(&ioqueue->active_list, rec);
+#endif
+
+ pj_lock_release(ioqueue->lock);
+
return PJ_SUCCESS;
}
@@ -422,9 +536,31 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
}
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Decrement the key's reference counter, and when the counter reach zero,
+ * destroy the key.
+ */
+static void decrement_counter(pj_ioqueue_key_t *key)
+{
+ if (pj_atomic_dec_and_get(key->ref_count) == 0) {
+
+ pj_lock_acquire(key->ioqueue->lock);
+
+ pj_assert(key->closing == 1);
+ pj_gettimeofday(&key->free_time);
+ key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
+ pj_time_val_normalize(&key->free_time);
+
+ pj_list_erase(key);
+ pj_list_push_back(&key->ioqueue->closing_list, key);
+
+ pj_lock_release(key->ioqueue->lock);
+ }
+}
+#endif
/*
- * Internal function to poll the I/O Completion Port, execute callback,
+ * Poll the I/O Completion Port, execute callback,
* and return the key and bytes transfered of the last operation.
*/
static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
@@ -457,16 +593,16 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
if (p_key)
*p_key = key;
- /* If size_status is POST_QUIT_LEN, mark the key as quitting */
- if (size_status == POST_QUIT_LEN) {
- key->has_quit_signal = 1;
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* We shouldn't call callbacks if key is quitting. */
+ if (key->closing)
return PJ_TRUE;
- }
- /* We shouldn't call callbacks if key is quitting.
- * But this should have been taken care by unregister function
- * (the unregister function should have cleared out the callbacks)
+ /* Increment reference counter to prevent this key from being
+ * deleted
*/
+ pj_atomic_inc(key->ref_count);
+#endif
/* Carry out the callback */
switch (pOv->operation) {
@@ -504,6 +640,11 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
pj_assert(0);
break;
}
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ decrement_counter(key);
+#endif
+
return PJ_TRUE;
}
@@ -516,11 +657,6 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
*/
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
{
- pj_ssize_t polled_len;
- pj_ioqueue_key_t *polled_key;
- generic_overlapped ov;
- BOOL rc;
-
PJ_ASSERT_RETURN(key, PJ_EINVAL);
#if PJ_HAS_TCP
@@ -542,53 +678,35 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
pj_lock_release(ioqueue->lock);
}
#endif
-
-
- /* Unregistering handle from IOCP is pretty tricky.
- *
- * Even after the socket has been closed, GetQueuedCompletionStatus
- * may still return events for the handle. This will likely to
- * cause crash in pjlib, because the key associated with the handle
- * most likely will have been destroyed.
- *
- * The solution is to poll the IOCP until we're sure that there are
- * no further events for the handle.
+
+ /* Close handle (the only way to disassociate handle from IOCP).
+ * We also need to close handle to make sure that no further events
+ * will come to the handle.
*/
+ CloseHandle(key->hnd);
- /* Clear up callbacks for the key.
- * We don't want the callback to be called for this key.
- */
- key->cb.on_read_complete = NULL;
- key->cb.on_write_complete = NULL;
+ /* Reset callbacks */
key->cb.on_accept_complete = NULL;
key->cb.on_connect_complete = NULL;
+ key->cb.on_read_complete = NULL;
+ key->cb.on_write_complete = NULL;
- /* Init overlapped struct */
- pj_memset(&ov, 0, sizeof(ov));
- ov.operation = PJ_IOQUEUE_OP_READ;
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Mark key as closing. */
+ key->closing = 1;
- /* Post queued completion status with a special length. */
- rc = PostQueuedCompletionStatus( key->ioqueue->iocp, (DWORD)POST_QUIT_LEN,
- (DWORD)key, &ov.overlapped);
+ /* Decrement reference counter. */
+ decrement_counter(key);
- /* Poll IOCP until has_quit_signal is set in the key.
- * The has_quit_signal flag is set in poll_iocp() when POST_QUIT_LEN
- * is detected. We need to have this flag because POST_QUIT_LEN may be
- * detected by other threads.
+ /* Even after handle is closed, I suspect that IOCP may still try to
+ * do something with the handle, causing memory corruption when pool
+ * debugging is enabled.
+ *
+ * Forcing context switch seems to have fixed that, but this is quite
+ * an ugly solution..
*/
- do {
- polled_len = 0;
- polled_key = NULL;
-
- rc = poll_iocp(key->ioqueue->iocp, 0, &polled_len, &polled_key);
-
- } while (rc && !key->has_quit_signal);
-
-
- /* Close handle if this is a file. */
- if (key->hnd_type == HND_IS_FILE) {
- CloseHandle(key->hnd);
- }
+ pj_thread_sleep(0);
+#endif
return PJ_SUCCESS;
}
@@ -602,23 +720,57 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
DWORD dwMsec;
int connect_count = 0;
- pj_bool_t has_event;
+ int event_count = 0;
PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
- /* Check the connecting array. */
-#if PJ_HAS_TCP
- connect_count = check_connecting(ioqueue);
-#endif
-
/* Calculate miliseconds timeout for GetQueuedCompletionStatus */
dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
/* Poll for completion status. */
- has_event = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
+ event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
+
+#if PJ_HAS_TCP
+ /* Check the connecting array, only when there's no activity. */
+ if (event_count == 0) {
+ connect_count = check_connecting(ioqueue);
+ if (connect_count > 0)
+ event_count += connect_count;
+ }
+#endif
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check the closing keys only when there's no activity and when there are
+ * pending closing keys.
+ */
+ if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
+ pj_time_val now;
+ pj_ioqueue_key_t *key;
+
+ pj_gettimeofday(&now);
+
+ /* Move closing keys to free list when they've finished the closing
+ * idle time.
+ */
+ pj_lock_acquire(ioqueue->lock);
+ key = ioqueue->closing_list.next;
+ while (key != &ioqueue->closing_list) {
+ pj_ioqueue_key_t *next = key->next;
+
+ pj_assert(key->closing != 0);
+
+ if (PJ_TIME_VAL_GTE(now, key->free_time)) {
+ pj_list_erase(key);
+ pj_list_push_back(&ioqueue->free_list, key);
+ }
+ key = next;
+ }
+ pj_lock_release(ioqueue->lock);
+ }
+#endif
/* Return number of events. */
- return connect_count + has_event;
+ return event_count;
}
/*
@@ -645,6 +797,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
op_key_rec = (union operation_key*)op_key->internal__;
op_key_rec->overlapped.wsabuf.buf = buffer;
op_key_rec->overlapped.wsabuf.len = *length;
@@ -715,6 +873,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
op_key_rec = (union operation_key*)op_key->internal__;
op_key_rec->overlapped.wsabuf.buf = buffer;
op_key_rec->overlapped.wsabuf.len = *length;
@@ -799,7 +963,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
-
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
op_key_rec = (union operation_key*)op_key->internal__;
/*
@@ -872,6 +1042,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
/*
* See if there is a new connection immediately available.
*/
@@ -962,6 +1138,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
/* Initiate connect() */
if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
DWORD dwStatus;