From 974fbe67d6d62efadd129cc81b9072faf3b2f029 Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Thu, 30 Mar 2006 16:32:18 +0000 Subject: Fixed race condition bug in ioqueue unregistration for select and Win32 IOCP backend git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@365 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/src/pj/ioqueue_common_abs.c | 214 ++++++++++------------ pjlib/src/pj/ioqueue_common_abs.h | 13 +- pjlib/src/pj/ioqueue_select.c | 246 +++++++++++++++++++++++--- pjlib/src/pj/ioqueue_winnt.c | 312 +++++++++++++++++++++++++------- pjlib/src/pj/pool_dbg_win32.c | 237 ------------------------- pjlib/src/pjlib-test/ioq_perf.c | 2 - pjlib/src/pjlib-test/ioq_udp.c | 5 - pjlib/src/pjlib-test/ioq_unreg.c | 361 ++++++++++++++++++++++++++++++++++++++ pjlib/src/pjlib-test/test.c | 4 + pjlib/src/pjlib-test/test.h | 2 + 10 files changed, 934 insertions(+), 462 deletions(-) delete mode 100644 pjlib/src/pj/pool_dbg_win32.c create mode 100644 pjlib/src/pjlib-test/ioq_unreg.c (limited to 'pjlib/src') diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index 30e2602e..b128d810 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -27,25 +27,10 @@ * This file is NOT supposed to be compiled as stand-alone source. */ -static long ioqueue_tls_id = -1; - -typedef struct key_lock_data { - struct key_lock_data *prev; - pj_ioqueue_key_t *key; - int is_alive; -} key_lock_data; - - static void ioqueue_init( pj_ioqueue_t *ioqueue ) { ioqueue->lock = NULL; ioqueue->auto_delete_lock = 0; - - if (ioqueue_tls_id == -1) { - pj_status_t status; - status = pj_thread_local_alloc(&ioqueue_tls_id); - pj_thread_local_set(ioqueue_tls_id, NULL); - } } static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue) @@ -93,11 +78,20 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool, pj_list_init(&key->write_list); #if PJ_HAS_TCP pj_list_init(&key->accept_list); + key->connecting = 0; #endif /* Save callback. */ pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Set initial reference count to 1 */ + pj_assert(key->ref_count == 0); + ++key->ref_count; + + key->closing = 0; +#endif + /* Get socket type. When socket type is datagram, some optimization * will be performed during send to allow parallel send operations. */ @@ -107,68 +101,14 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool, if (rc != PJ_SUCCESS) key->fd_type = PJ_SOCK_STREAM; - key->inside_callback = 0; - key->destroy_requested = 0; - /* Create mutex for the key. */ - rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); +#if !PJ_IOQUEUE_HAS_SAFE_UNREG + rc = pj_mutex_create_simple(pool, NULL, &key->mutex); +#endif return rc; } -/* Lock the key and also keep the lock data in thread local storage. - * The lock data is used to detect if the key is deleted by application - * when we call its callback. - */ -static void lock_key(pj_ioqueue_key_t *key, key_lock_data *lck) -{ - struct key_lock_data *prev_data; - - pj_mutex_lock(key->mutex); - prev_data = (struct key_lock_data *) - pj_thread_local_get(ioqueue_tls_id); - lck->prev = prev_data; - lck->key = key; - lck->is_alive = 1; - pj_thread_local_set(ioqueue_tls_id, lck); -} - -/* Unlock the key only if it is still valid. */ -static void unlock_key(pj_ioqueue_key_t *key, key_lock_data *lck) -{ - pj_assert( (void*)pj_thread_local_get(ioqueue_tls_id) == lck); - pj_assert( lck->key == key ); - pj_thread_local_set(ioqueue_tls_id, lck->prev); - if (lck->is_alive) - pj_mutex_unlock(key->mutex); -} - -/* Destroy key */ -static void ioqueue_destroy_key( pj_ioqueue_key_t *key ) -{ - key_lock_data *lck; - - /* Make sure that no other threads are doing something with - * the key. - */ - pj_mutex_lock(key->mutex); - - /* Check if this function is called within a callback context. - * If so, then we need to inform the callback that the key has - * been destroyed so that it doesn't attempt to unlock the - * key's mutex. - */ - lck = pj_thread_local_get(ioqueue_tls_id); - while (lck) { - if (lck->key == key) { - lck->is_alive = 0; - } - lck = lck->prev; - } - - pj_mutex_destroy(key->mutex); -} - /* * pj_ioqueue_get_user_data() * @@ -221,6 +161,13 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key) } +#if PJ_IOQUEUE_HAS_SAFE_UNREG +# define IS_CLOSING(key) (key->closing) +#else +# define IS_CLOSING(key) (0) +#endif + + /* * ioqueue_dispatch_event() * @@ -229,10 +176,13 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key) */ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) { - key_lock_data lck_data; - /* Lock the key. */ - lock_key(h, &lck_data); + pj_mutex_lock(h->mutex); + + if (h->closing) { + pj_mutex_unlock(h->mutex); + return; + } #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 if (h->connecting) { @@ -245,8 +195,6 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); - /* Unlock; from this point we don't need to hold key's mutex. */ - //pj_mutex_unlock(h->mutex); #if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) /* from connect(2): @@ -293,8 +241,11 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) } #endif + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + /* Call callback. */ - if (h->cb.on_connect_complete) + if (h->cb.on_connect_complete && !IS_CLOSING(h)) (*h->cb.on_connect_complete)(h, bytes_transfered); /* Done. */ @@ -319,7 +270,6 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) if (pj_list_empty(&h->write_list)) ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); - //pj_mutex_unlock(h->mutex); } /* Send the data. @@ -365,19 +315,20 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) if (pj_list_empty(&h->write_list)) ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); - /* No need to hold mutex anymore */ - //pj_mutex_unlock(h->mutex); } + /* No need to hold mutex anymore */ + pj_mutex_unlock(h->mutex); + /* Call callback. */ - if (h->cb.on_write_complete) { + if (h->cb.on_write_complete && !IS_CLOSING(h)) { (*h->cb.on_write_complete)(h, (pj_ioqueue_op_key_t*)write_op, write_op->written); } } else { - //pj_mutex_unlock(h->mutex); + pj_mutex_unlock(h->mutex); } /* Done. */ @@ -387,20 +338,21 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) * are signalled for the same event, but only one thread eventually * able to process the event. */ - //pj_mutex_unlock(h->mutex); + pj_mutex_unlock(h->mutex); } - - /* Finally unlock key */ - unlock_key(h, &lck_data); } void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) { - key_lock_data lck_data; pj_status_t rc; /* Lock the key. */ - lock_key(h, &lck_data); + pj_mutex_lock(h->mutex); + + if (h->closing) { + pj_mutex_unlock(h->mutex); + return; + } # if PJ_HAS_TCP if (!pj_list_empty(&h->accept_list)) { @@ -416,9 +368,6 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) if (pj_list_empty(&h->accept_list)) ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); - /* Unlock; from this point we don't need to hold key's mutex. */ - //pj_mutex_unlock(h->mutex); - rc=pj_sock_accept(h->fd, accept_op->accept_fd, accept_op->rmt_addr, accept_op->addrlen); if (rc==PJ_SUCCESS && accept_op->local_addr) { @@ -427,8 +376,11 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) accept_op->addrlen); } + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + /* Call callback. */ - if (h->cb.on_accept_complete) { + if (h->cb.on_accept_complete && !IS_CLOSING(h)) { (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op, *accept_op->accept_fd, rc); @@ -449,11 +401,6 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) if (pj_list_empty(&h->read_list)) ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); - /* Unlock; from this point we don't need to hold key's mutex. */ - //Crash as of revision 353 (since we added pjmedia socket to - //main ioqueue). - //pj_mutex_unlock(h->mutex); - bytes_read = read_op->size; if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { @@ -516,8 +463,11 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) bytes_read = -rc; } + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + /* Call callback. */ - if (h->cb.on_read_complete) { + if (h->cb.on_read_complete && !IS_CLOSING(h)) { (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op, bytes_read); @@ -529,44 +479,41 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) * are signalled for the same event, but only one thread eventually * able to process the event. */ - //pj_mutex_unlock(h->mutex); + pj_mutex_unlock(h->mutex); } - - /* Unlock handle. */ - unlock_key(h, &lck_data); } void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) { - key_lock_data lck_data; - - lock_key(h, &lck_data); + pj_mutex_lock(h->mutex); if (!h->connecting) { /* It is possible that more than one thread was woken up, thus * the remaining thread will see h->connecting as zero because * it has been processed by other thread. */ - //pj_mutex_unlock(h->mutex); - unlock_key(h, &lck_data); + pj_mutex_unlock(h->mutex); return; } + if (h->closing) { + pj_mutex_unlock(h->mutex); + return; + } + /* Clear operation. */ h->connecting = 0; - //pj_mutex_unlock(h->mutex); - ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); + pj_mutex_unlock(h->mutex); + /* Call callback. */ - if (h->cb.on_connect_complete) + if (h->cb.on_connect_complete && !IS_CLOSING(h)) (*h->cb.on_connect_complete)(h, -1); - - unlock_key(h, &lck_data); } /* @@ -588,6 +535,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, read_op = (struct read_operation*)op_key; read_op->op = 0; + /* Check if key is closing. */ + if (key->closing) + return PJ_ECANCELLED; + /* Try to see if there's data immediately available. */ if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) { @@ -646,6 +597,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); PJ_CHECK_STACK(); + /* Check if key is closing. */ + if (key->closing) + return PJ_ECANCELLED; + read_op = (struct read_operation*)op_key; read_op->op = 0; @@ -710,6 +665,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); PJ_CHECK_STACK(); + /* Check if key is closing. */ + if (key->closing) + return PJ_ECANCELLED; + write_op = (struct write_operation*)op_key; write_op->op = 0; @@ -788,6 +747,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); PJ_CHECK_STACK(); + /* Check if key is closing. */ + if (key->closing) + return PJ_ECANCELLED; + write_op = (struct write_operation*)op_key; write_op->op = 0; @@ -869,6 +832,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, /* check parameters. All must be specified! */ PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); + /* Check if key is closing. */ + if (key->closing) + return PJ_ECANCELLED; + accept_op = (struct accept_operation*)op_key; accept_op->op = 0; @@ -930,6 +897,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, /* check parameters. All must be specified! */ PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); + /* Check if key is closing. */ + if (key->closing) + return PJ_ECANCELLED; + /* Check if socket has not been marked for connecting */ if (key->connecting != 0) return PJ_EPENDING; @@ -986,13 +957,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, pj_ssize_t bytes_status ) { struct generic_operation *op_rec; - key_lock_data lck_data; /* * Find the operation key in all pending operation list to * really make sure that it's still there; then call the callback. */ - lock_key(key, &lck_data); + pj_mutex_lock(key->mutex); /* Find the operation in the pending read list. */ op_rec = (struct generic_operation*)key->read_list.next; @@ -1000,11 +970,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = 0; - //pj_mutex_unlock(key->mutex); + pj_mutex_unlock(key->mutex); (*key->cb.on_read_complete)(key, op_key, bytes_status); - - unlock_key(key, &lck_data); return PJ_SUCCESS; } op_rec = op_rec->next; @@ -1016,11 +984,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = 0; - //pj_mutex_unlock(key->mutex); + pj_mutex_unlock(key->mutex); (*key->cb.on_write_complete)(key, op_key, bytes_status); - - unlock_key(key, &lck_data); return PJ_SUCCESS; } op_rec = op_rec->next; @@ -1032,19 +998,17 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = 0; - //pj_mutex_unlock(key->mutex); + pj_mutex_unlock(key->mutex); (*key->cb.on_accept_complete)(key, op_key, PJ_INVALID_SOCKET, bytes_status); - - unlock_key(key, &lck_data); return PJ_SUCCESS; } op_rec = op_rec->next; } - unlock_key(key, &lck_data); + pj_mutex_unlock(key->mutex); return PJ_EINVALIDOP; } diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h index 1ca70aa8..e7d05561 100644 --- a/pjlib/src/pj/ioqueue_common_abs.h +++ b/pjlib/src/pj/ioqueue_common_abs.h @@ -87,6 +87,16 @@ union operation_key #endif }; +#if PJ_IOQUEUE_HAS_SAFE_UNREG +# define UNREG_FIELDS \ + unsigned ref_count; \ + pj_bool_t closing; \ + pj_time_val free_time; \ + +#else +# define UNREG_FIELDS +#endif + #define DECLARE_COMMON_KEY \ PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \ pj_ioqueue_t *ioqueue; \ @@ -100,7 +110,8 @@ union operation_key int connecting; \ struct read_operation read_list; \ struct write_operation write_list; \ - struct accept_operation accept_list; + struct accept_operation accept_list; \ + UNREG_FIELDS #define DECLARE_COMMON_IOQUEUE \ diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c index 16a511a8..4aa4f910 100644 --- a/pjlib/src/pj/ioqueue_select.c +++ b/pjlib/src/pj/ioqueue_select.c @@ -109,12 +109,18 @@ struct pj_ioqueue_t DECLARE_COMMON_IOQUEUE unsigned max, count; - pj_ioqueue_key_t key_list; + pj_ioqueue_key_t active_list; pj_fd_set_t rfdset; pj_fd_set_t wfdset; #if PJ_HAS_TCP pj_fd_set_t xfdset; #endif + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + pj_mutex_t *ref_cnt_mutex; + pj_ioqueue_key_t closing_list; + pj_ioqueue_key_t free_list; +#endif }; /* Include implementation for common abstraction after we declare @@ -141,6 +147,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, { pj_ioqueue_t *ioqueue; pj_lock_t *lock; + unsigned i; pj_status_t rc; /* Check that arguments are valid. */ @@ -152,8 +159,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= sizeof(union operation_key), PJ_EBUG); + /* Create and init common ioqueue stuffs */ ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t)); - ioqueue_init(ioqueue); ioqueue->max = max_fd; @@ -163,8 +170,49 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, #if PJ_HAS_TCP PJ_FD_ZERO(&ioqueue->xfdset); #endif - pj_list_init(&ioqueue->key_list); + pj_list_init(&ioqueue->active_list); + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* When safe unregistration is used (the default), we pre-create + * all keys and put them in the free list. + */ + + /* Mutex to protect key's reference counter + * We don't want to use key's mutex or ioqueue's mutex because + * that would create deadlock situation in some cases. + */ + rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex); + if (rc != PJ_SUCCESS) + return rc; + + + /* Init key list */ + pj_list_init(&ioqueue->free_list); + pj_list_init(&ioqueue->closing_list); + + /* Pre-create all keys according to max_fd */ + for (i=0; iref_count = 0; + rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); + if (rc != PJ_SUCCESS) { + key = ioqueue->free_list.next; + while (key != &ioqueue->free_list) { + pj_mutex_destroy(key->mutex); + key = key->next; + } + pj_mutex_destroy(ioqueue->ref_cnt_mutex); + return rc; + } + + pj_list_push_back(&ioqueue->free_list, key); + } +#endif + + /* Create and init ioqueue mutex */ rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); if (rc != PJ_SUCCESS) return rc; @@ -186,9 +234,35 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, */ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) { + pj_ioqueue_key_t *key; + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); pj_lock_acquire(ioqueue->lock); + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Destroy reference counters */ + key = ioqueue->active_list.next; + while (key != &ioqueue->active_list) { + pj_mutex_destroy(key->mutex); + key = key->next; + } + + key = ioqueue->closing_list.next; + while (key != &ioqueue->closing_list) { + pj_mutex_destroy(key->mutex); + key = key->next; + } + + key = ioqueue->free_list.next; + while (key != &ioqueue->free_list) { + pj_mutex_destroy(key->mutex); + key = key->next; + } + + pj_mutex_destroy(ioqueue->ref_cnt_mutex); +#endif + return ioqueue_destroy(ioqueue); } @@ -196,7 +270,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) /* * pj_ioqueue_register_sock() * - * Register a handle to ioqueue. + * Register socket handle to ioqueue. */ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, pj_ioqueue_t *ioqueue, @@ -219,6 +293,28 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, goto on_return; } + /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get + * the key from the free list. Otherwise allocate a new one. + */ +#if PJ_IOQUEUE_HAS_SAFE_UNREG + pj_assert(!pj_list_empty(&ioqueue->free_list)); + if (pj_list_empty(&ioqueue->free_list)) { + rc = PJ_ETOOMANY; + goto on_return; + } + + key = ioqueue->free_list.next; + pj_list_erase(key); +#else + key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); +#endif + + rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); + if (rc != PJ_SUCCESS) { + key = NULL; + goto on_return; + } + /* Set socket to nonblocking. */ value = 1; #if defined(PJ_WIN32) && PJ_WIN32!=0 || \ @@ -231,16 +327,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, goto on_return; } - /* Create key. */ - key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); - rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); - if (rc != PJ_SUCCESS) { - key = NULL; - goto on_return; - } - /* Register */ - pj_list_insert_before(&ioqueue->key_list, key); + /* Put in active list. */ + pj_list_insert_before(&ioqueue->active_list, key); ++ioqueue->count; on_return: @@ -251,6 +340,41 @@ on_return: return rc; } +#if PJ_IOQUEUE_HAS_SAFE_UNREG +/* Increment key's reference counter */ +static void increment_counter(pj_ioqueue_key_t *key) +{ + pj_mutex_lock(key->ioqueue->ref_cnt_mutex); + ++key->ref_count; + pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); +} + +/* Decrement the key's reference counter, and when the counter reach zero, + * destroy the key. + * + * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK. + */ +static void decrement_counter(pj_ioqueue_key_t *key) +{ + pj_mutex_lock(key->ioqueue->ref_cnt_mutex); + --key->ref_count; + if (key->ref_count == 0) { + + pj_assert(key->closing == 1); + pj_gettimeofday(&key->free_time); + key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; + pj_time_val_normalize(&key->free_time); + + pj_lock_acquire(key->ioqueue->lock); + pj_list_erase(key); + pj_list_push_back(&key->ioqueue->closing_list, key); + pj_lock_release(key->ioqueue->lock); + } + pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); +} +#endif + + /* * pj_ioqueue_unregister() * @@ -264,6 +388,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) ioqueue = key->ioqueue; + /* Lock the key to make sure no callback is simultaneously modifying + * the key. We need to lock the key before ioqueue here to prevent + * deadlock. + */ + pj_mutex_lock(key->mutex); + + /* Also lock ioqueue */ pj_lock_acquire(ioqueue->lock); pj_assert(ioqueue->count > 0); @@ -275,15 +406,32 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) PJ_FD_CLR(key->fd, &ioqueue->xfdset); #endif - /* ioqueue_destroy may try to acquire key's mutex. - * Since normally the order of locking is to lock key's mutex first - * then ioqueue's mutex, ioqueue_destroy may deadlock unless we - * release ioqueue's mutex first. + /* Close socket. */ + pj_sock_close(key->fd); + + /* Clear callback */ + key->cb.on_accept_complete = NULL; + key->cb.on_connect_complete = NULL; + key->cb.on_read_complete = NULL; + key->cb.on_write_complete = NULL; + + /* Must release ioqueue lock first before decrementing counter, to + * prevent deadlock. */ pj_lock_release(ioqueue->lock); - /* Destroy the key. */ - ioqueue_destroy_key(key); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Mark key is closing. */ + key->closing = 1; + + /* Decrement counter. */ + decrement_counter(key); + + /* Done. */ + pj_mutex_unlock(key->mutex); +#else + pj_mutex_destroy(key->mutex); +#endif return PJ_SUCCESS; } @@ -308,8 +456,8 @@ static void validate_sets(const pj_ioqueue_t *ioqueue, */ pj_assert(0); - key = ioqueue->key_list.next; - while (key != &ioqueue->key_list) { + key = ioqueue->active_list.next; + while (key != &ioqueue->active_list) { if (!pj_list_empty(&key->read_list) #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0 || !pj_list_empty(&key->accept_list) @@ -395,6 +543,30 @@ static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, pj_lock_release(ioqueue->lock); } +#if PJ_IOQUEUE_HAS_SAFE_UNREG +/* Scan closing keys to be put to free list again */ +static void scan_closing_keys(pj_ioqueue_t *ioqueue) +{ + pj_time_val now; + pj_ioqueue_key_t *h; + + pj_gettimeofday(&now); + h = ioqueue->closing_list.next; + while (h != &ioqueue->closing_list) { + pj_ioqueue_key_t *next = h->next; + + pj_assert(h->closing != 0); + + if (PJ_TIME_VAL_GTE(now, h->free_time)) { + pj_list_erase(h); + pj_list_push_back(&ioqueue->free_list, h); + } + h = next; + } +} +#endif + + /* * pj_ioqueue_poll() * @@ -435,7 +607,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) PJ_FD_COUNT(&ioqueue->wfdset)==0 && PJ_FD_COUNT(&ioqueue->xfdset)==0) { - pj_lock_release(ioqueue->lock); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + scan_closing_keys(ioqueue); +#endif + pj_lock_release(ioqueue->lock); if (timeout) pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout)); return 0; @@ -475,11 +650,15 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) /* Scan for writable sockets first to handle piggy-back data * coming with accept(). */ - h = ioqueue->key_list.next; - for ( ; h!=&ioqueue->key_list && counternext) { + h = ioqueue->active_list.next; + for ( ; h!=&ioqueue->active_list && counternext) { + if ( (key_has_pending_write(h) || key_has_pending_connect(h)) - && PJ_FD_ISSET(h->fd, &wfdset)) + && PJ_FD_ISSET(h->fd, &wfdset) && !h->closing) { +#if PJ_IOQUEUE_HAS_SAFE_UNREG + increment_counter(h); +#endif event[counter].key = h; event[counter].event_type = WRITEABLE_EVENT; ++counter; @@ -487,15 +666,23 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) /* Scan for readable socket. */ if ((key_has_pending_read(h) || key_has_pending_accept(h)) - && PJ_FD_ISSET(h->fd, &rfdset)) + && PJ_FD_ISSET(h->fd, &rfdset) && !h->closing) { +#if PJ_IOQUEUE_HAS_SAFE_UNREG + increment_counter(h); +#endif event[counter].key = h; event[counter].event_type = READABLE_EVENT; ++counter; } #if PJ_HAS_TCP - if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) { + if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) && + !h->closing) + { +#if PJ_IOQUEUE_HAS_SAFE_UNREG + increment_counter(h); +#endif event[counter].key = h; event[counter].event_type = EXCEPTION_EVENT; ++counter; @@ -525,8 +712,13 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) pj_assert(!"Invalid event!"); break; } + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + decrement_counter(event[counter].key); +#endif } + return count; } diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index 40dd31bb..4586c5ca 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -106,15 +106,24 @@ enum { POST_QUIT_LEN = 0xFFFFDEADUL }; */ struct pj_ioqueue_key_t { + PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); + pj_ioqueue_t *ioqueue; HANDLE hnd; void *user_data; enum handle_type hnd_type; + pj_ioqueue_callback cb; + #if PJ_HAS_TCP int connecting; #endif - pj_ioqueue_callback cb; - pj_bool_t has_quit_signal; + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + pj_atomic_t *ref_count; + pj_bool_t closing; + pj_time_val free_time; +#endif + }; /* @@ -125,9 +134,17 @@ struct pj_ioqueue_t HANDLE iocp; pj_lock_t *lock; pj_bool_t auto_delete_lock; + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + pj_ioqueue_key_t active_list; + pj_ioqueue_key_t free_list; + pj_ioqueue_key_t closing_list; +#endif + + /* These are to keep track of connecting sockets */ +#if PJ_HAS_TCP unsigned event_count; HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1]; -#if PJ_HAS_TCP unsigned connecting_count; HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1]; pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1]; @@ -279,6 +296,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_ioqueue_t **p_ioqueue) { pj_ioqueue_t *ioqueue; + unsigned i; pj_status_t rc; PJ_UNUSED_ARG(max_fd); @@ -290,11 +308,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >= sizeof(union operation_key), PJ_EBUG); + /* Create IOCP */ ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue)); ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (ioqueue->iocp == NULL) return PJ_RETURN_OS_ERROR(GetLastError()); + /* Create IOCP mutex */ rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); if (rc != PJ_SUCCESS) { CloseHandle(ioqueue->iocp); @@ -303,6 +323,38 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, ioqueue->auto_delete_lock = PJ_TRUE; +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* + * Create and initialize key pools. + */ + pj_list_init(&ioqueue->active_list); + pj_list_init(&ioqueue->free_list); + pj_list_init(&ioqueue->closing_list); + + /* Preallocate keys according to max_fd setting, and put them + * in free_list. + */ + for (i=0; iref_count); + if (rc != PJ_SUCCESS) { + key = ioqueue->free_list.next; + while (key != &ioqueue->free_list) { + pj_atomic_destroy(key->ref_count); + key = key->next; + } + CloseHandle(ioqueue->iocp); + return rc; + } + + pj_list_push_back(&ioqueue->free_list, key); + + } +#endif + *p_ioqueue = ioqueue; PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue)); @@ -315,19 +367,45 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) { unsigned i; + pj_ioqueue_key_t *key; PJ_CHECK_STACK(); PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); + pj_lock_acquire(ioqueue->lock); + +#if PJ_HAS_TCP /* Destroy events in the pool */ for (i=0; ievent_count; ++i) { CloseHandle(ioqueue->event_pool[i]); } ioqueue->event_count = 0; +#endif if (CloseHandle(ioqueue->iocp) != TRUE) return PJ_RETURN_OS_ERROR(GetLastError()); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Destroy reference counters */ + key = ioqueue->active_list.next; + while (key != &ioqueue->active_list) { + pj_atomic_destroy(key->ref_count); + key = key->next; + } + + key = ioqueue->closing_list.next; + while (key != &ioqueue->closing_list) { + pj_atomic_destroy(key->ref_count); + key = key->next; + } + + key = ioqueue->free_list.next; + while (key != &ioqueue->free_list) { + pj_atomic_destroy(key->ref_count); + key = key->next; + } +#endif + if (ioqueue->auto_delete_lock) pj_lock_destroy(ioqueue->lock); @@ -370,28 +448,64 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL); - /* Build the key for this socket. */ + pj_lock_acquire(ioqueue->lock); + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* If safe unregistration is used, then get the key record from + * the free list. + */ + if (pj_list_empty(&ioqueue->free_list)) { + pj_lock_release(ioqueue->lock); + return PJ_ETOOMANY; + } + + rec = ioqueue->free_list.next; + pj_list_erase(rec); + + /* Set initial reference count to 1 */ + pj_assert(pj_atomic_get(rec->ref_count) == 0); + pj_atomic_inc(rec->ref_count); + + rec->closing = 0; + +#else rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); +#endif + + /* Build the key for this socket. */ rec->ioqueue = ioqueue; rec->hnd = (HANDLE)sock; rec->hnd_type = HND_IS_SOCKET; rec->user_data = user_data; pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); +#if PJ_HAS_TCP + rec->connecting = 0; +#endif + /* Set socket to nonblocking. */ value = 1; rc = ioctlsocket(sock, FIONBIO, &value); if (rc != 0) { + pj_lock_release(ioqueue->lock); return PJ_RETURN_OS_ERROR(WSAGetLastError()); } /* Associate with IOCP */ hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0); if (!hioq) { + pj_lock_release(ioqueue->lock); return PJ_RETURN_OS_ERROR(GetLastError()); } *key = rec; + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + pj_list_push_back(&ioqueue->active_list, rec); +#endif + + pj_lock_release(ioqueue->lock); + return PJ_SUCCESS; } @@ -422,9 +536,31 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, } +#if PJ_IOQUEUE_HAS_SAFE_UNREG +/* Decrement the key's reference counter, and when the counter reach zero, + * destroy the key. + */ +static void decrement_counter(pj_ioqueue_key_t *key) +{ + if (pj_atomic_dec_and_get(key->ref_count) == 0) { + + pj_lock_acquire(key->ioqueue->lock); + + pj_assert(key->closing == 1); + pj_gettimeofday(&key->free_time); + key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; + pj_time_val_normalize(&key->free_time); + + pj_list_erase(key); + pj_list_push_back(&key->ioqueue->closing_list, key); + + pj_lock_release(key->ioqueue->lock); + } +} +#endif /* - * Internal function to poll the I/O Completion Port, execute callback, + * Poll the I/O Completion Port, execute callback, * and return the key and bytes transfered of the last operation. */ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, @@ -457,16 +593,16 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, if (p_key) *p_key = key; - /* If size_status is POST_QUIT_LEN, mark the key as quitting */ - if (size_status == POST_QUIT_LEN) { - key->has_quit_signal = 1; +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* We shouldn't call callbacks if key is quitting. */ + if (key->closing) return PJ_TRUE; - } - /* We shouldn't call callbacks if key is quitting. - * But this should have been taken care by unregister function - * (the unregister function should have cleared out the callbacks) + /* Increment reference counter to prevent this key from being + * deleted */ + pj_atomic_inc(key->ref_count); +#endif /* Carry out the callback */ switch (pOv->operation) { @@ -504,6 +640,11 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, pj_assert(0); break; } + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + decrement_counter(key); +#endif + return PJ_TRUE; } @@ -516,11 +657,6 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, */ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) { - pj_ssize_t polled_len; - pj_ioqueue_key_t *polled_key; - generic_overlapped ov; - BOOL rc; - PJ_ASSERT_RETURN(key, PJ_EINVAL); #if PJ_HAS_TCP @@ -542,53 +678,35 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) pj_lock_release(ioqueue->lock); } #endif - - - /* Unregistering handle from IOCP is pretty tricky. - * - * Even after the socket has been closed, GetQueuedCompletionStatus - * may still return events for the handle. This will likely to - * cause crash in pjlib, because the key associated with the handle - * most likely will have been destroyed. - * - * The solution is to poll the IOCP until we're sure that there are - * no further events for the handle. + + /* Close handle (the only way to disassociate handle from IOCP). + * We also need to close handle to make sure that no further events + * will come to the handle. */ + CloseHandle(key->hnd); - /* Clear up callbacks for the key. - * We don't want the callback to be called for this key. - */ - key->cb.on_read_complete = NULL; - key->cb.on_write_complete = NULL; + /* Reset callbacks */ key->cb.on_accept_complete = NULL; key->cb.on_connect_complete = NULL; + key->cb.on_read_complete = NULL; + key->cb.on_write_complete = NULL; - /* Init overlapped struct */ - pj_memset(&ov, 0, sizeof(ov)); - ov.operation = PJ_IOQUEUE_OP_READ; +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Mark key as closing. */ + key->closing = 1; - /* Post queued completion status with a special length. */ - rc = PostQueuedCompletionStatus( key->ioqueue->iocp, (DWORD)POST_QUIT_LEN, - (DWORD)key, &ov.overlapped); + /* Decrement reference counter. */ + decrement_counter(key); - /* Poll IOCP until has_quit_signal is set in the key. - * The has_quit_signal flag is set in poll_iocp() when POST_QUIT_LEN - * is detected. We need to have this flag because POST_QUIT_LEN may be - * detected by other threads. + /* Even after handle is closed, I suspect that IOCP may still try to + * do something with the handle, causing memory corruption when pool + * debugging is enabled. + * + * Forcing context switch seems to have fixed that, but this is quite + * an ugly solution.. */ - do { - polled_len = 0; - polled_key = NULL; - - rc = poll_iocp(key->ioqueue->iocp, 0, &polled_len, &polled_key); - - } while (rc && !key->has_quit_signal); - - - /* Close handle if this is a file. */ - if (key->hnd_type == HND_IS_FILE) { - CloseHandle(key->hnd); - } + pj_thread_sleep(0); +#endif return PJ_SUCCESS; } @@ -602,23 +720,57 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { DWORD dwMsec; int connect_count = 0; - pj_bool_t has_event; + int event_count = 0; PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); - /* Check the connecting array. */ -#if PJ_HAS_TCP - connect_count = check_connecting(ioqueue); -#endif - /* Calculate miliseconds timeout for GetQueuedCompletionStatus */ dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE; /* Poll for completion status. */ - has_event = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); + event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL); + +#if PJ_HAS_TCP + /* Check the connecting array, only when there's no activity. */ + if (event_count == 0) { + connect_count = check_connecting(ioqueue); + if (connect_count > 0) + event_count += connect_count; + } +#endif + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Check the closing keys only when there's no activity and when there are + * pending closing keys. + */ + if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) { + pj_time_val now; + pj_ioqueue_key_t *key; + + pj_gettimeofday(&now); + + /* Move closing keys to free list when they've finished the closing + * idle time. + */ + pj_lock_acquire(ioqueue->lock); + key = ioqueue->closing_list.next; + while (key != &ioqueue->closing_list) { + pj_ioqueue_key_t *next = key->next; + + pj_assert(key->closing != 0); + + if (PJ_TIME_VAL_GTE(now, key->free_time)) { + pj_list_erase(key); + pj_list_push_back(&ioqueue->free_list, key); + } + key = next; + } + pj_lock_release(ioqueue->lock); + } +#endif /* Return number of events. */ - return connect_count + has_event; + return event_count; } /* @@ -645,6 +797,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Check key is not closing */ + if (key->closing) + return PJ_ECANCELLED; +#endif + op_key_rec = (union operation_key*)op_key->internal__; op_key_rec->overlapped.wsabuf.buf = buffer; op_key_rec->overlapped.wsabuf.len = *length; @@ -715,6 +873,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Check key is not closing */ + if (key->closing) + return PJ_ECANCELLED; +#endif + op_key_rec = (union operation_key*)op_key->internal__; op_key_rec->overlapped.wsabuf.buf = buffer; op_key_rec->overlapped.wsabuf.len = *length; @@ -799,7 +963,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL); - + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Check key is not closing */ + if (key->closing) + return PJ_ECANCELLED; +#endif + op_key_rec = (union operation_key*)op_key->internal__; /* @@ -872,6 +1042,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Check key is not closing */ + if (key->closing) + return PJ_ECANCELLED; +#endif + /* * See if there is a new connection immediately available. */ @@ -962,6 +1138,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, PJ_CHECK_STACK(); PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Check key is not closing */ + if (key->closing) + return PJ_ECANCELLED; +#endif + /* Initiate connect() */ if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) { DWORD dwStatus; diff --git a/pjlib/src/pj/pool_dbg_win32.c b/pjlib/src/pj/pool_dbg_win32.c deleted file mode 100644 index a3ccf3ed..00000000 --- a/pjlib/src/pj/pool_dbg_win32.c +++ /dev/null @@ -1,237 +0,0 @@ -/* $Id$ */ -/* - * Copyright (C)2003-2006 Benny Prijono - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -#include - -/* Only if we ARE debugging memory allocations. */ -#if PJ_POOL_DEBUG - -#include -#include - -#include -#include -#define WIN32_LEAN_AND_MEAN -#include - -typedef struct memory_entry -{ - PJ_DECL_LIST_MEMBER(struct memory_entry) - void *ptr; - char *file; - int line; -} memory_entry; - -struct pj_pool_t -{ - char obj_name[32]; - HANDLE hHeap; - memory_entry first; - pj_size_t initial_size; - pj_size_t increment; - pj_size_t used_size; - char *file; - int line; -}; - -PJ_DEF(void) pj_pool_set_functions( void *(*malloc_func)(pj_size_t), - void (*free_func)(void *ptr, pj_size_t)) -{ - /* Ignored. */ - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(malloc_func) - PJ_UNUSED_ARG(free_func) -} - -PJ_DEF(pj_pool_t*) pj_pool_create_dbg( const char *name, - pj_size_t initial_size, - pj_size_t increment_size, - pj_pool_callback *callback, - char *file, int line) -{ - pj_pool_t *pool; - HANDLE hHeap; - - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(callback) - - /* Create Win32 heap for the pool. */ - hHeap = HeapCreate(HEAP_GENERATE_EXCEPTIONS|HEAP_NO_SERIALIZE, - initial_size, 0); - if (!hHeap) { - return NULL; - } - - - /* Create and initialize the pool structure. */ - pool = HeapAlloc(hHeap, HEAP_GENERATE_EXCEPTIONS|HEAP_NO_SERIALIZE, - sizeof(*pool)); - memset(pool, 0, sizeof(*pool)); - pool->file = file; - pool->line = line; - pool->hHeap = hHeap; - pool->initial_size = initial_size; - pool->increment = increment_size; - pool->used_size = 0; - - /* Set name. */ - if (name) { - if (strchr(name, '%') != NULL) { - sprintf(pool->obj_name, name, pool); - } else { - strncpy(pool->obj_name, name, PJ_MAX_OBJ_NAME); - } - } else { - pool->obj_name[0] = '\0'; - } - - /* List pool's entry. */ - pj_list_init(&pool->first); - - PJ_LOG(3,(pool->obj_name, "Pool created")); - return pool; -} - -PJ_DEF(void) pj_pool_destroy( pj_pool_t *pool ) -{ - memory_entry *entry; - - PJ_CHECK_STACK(); - - PJ_LOG(3,(pool->obj_name, "Destoying pool, init_size=%u, used=%u", - pool->initial_size, pool->used_size)); - - if (!HeapValidate( pool->hHeap, HEAP_NO_SERIALIZE, pool)) { - PJ_LOG(2,(pool->obj_name, "Corrupted pool structure, allocated in %s:%d", - pool->file, pool->line)); - } - - /* Validate all memory entries in the pool. */ - for (entry=pool->first.next; entry != &pool->first; entry = entry->next) { - if (!HeapValidate( pool->hHeap, HEAP_NO_SERIALIZE, entry)) { - PJ_LOG(2,(pool->obj_name, "Corrupted pool entry, allocated in %s:%d", - entry->file, entry->line)); - } - - if (!HeapValidate( pool->hHeap, HEAP_NO_SERIALIZE, entry->ptr)) { - PJ_LOG(2,(pool->obj_name, "Corrupted pool memory, allocated in %s:%d", - entry->file, entry->line)); - } - } - - /* Destroy heap. */ - HeapDestroy(pool->hHeap); -} - -PJ_DEF(void) pj_pool_reset( pj_pool_t *pool ) -{ - /* Do nothing. */ - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(pool) -} - -PJ_DEF(pj_size_t) pj_pool_get_capacity( pj_pool_t *pool ) -{ - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(pool) - return 0; -} - -PJ_DEF(pj_size_t) pj_pool_get_used_size( pj_pool_t *pool ) -{ - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(pool) - return 0; -} - -PJ_DEF(pj_size_t) pj_pool_get_request_count( pj_pool_t *pool ) -{ - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(pool) - return 0; -} - -PJ_DEF(void*) pj_pool_alloc_dbg( pj_pool_t *pool, pj_size_t size, - char *file, int line) -{ - memory_entry *entry; - int entry_size; - - PJ_CHECK_STACK(); - - entry_size = sizeof(*entry); - entry = HeapAlloc(pool->hHeap, HEAP_GENERATE_EXCEPTIONS|HEAP_NO_SERIALIZE, - entry_size); - entry->file = file; - entry->line = line; - entry->ptr = HeapAlloc(pool->hHeap, HEAP_GENERATE_EXCEPTIONS|HEAP_NO_SERIALIZE, - size); - pj_list_insert_before( &pool->first, entry); - - pool->used_size += size; - return entry->ptr; -} - -PJ_DEF(void*) pj_pool_calloc_dbg( pj_pool_t *pool, pj_size_t count, pj_size_t elem, - char *file, int line) -{ - void *ptr; - - PJ_CHECK_STACK(); - - ptr = pj_pool_alloc_dbg(pool, count*elem, file, line); - memset(ptr, 0, count*elem); - return ptr; -} - - -PJ_DEF(void) pj_pool_pool_init( pj_pool_pool_t *pool_pool, - pj_size_t max_capacity) -{ - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(pool_pool) - PJ_UNUSED_ARG(max_capacity) -} - -PJ_DEF(void) pj_pool_pool_destroy( pj_pool_pool_t *pool_pool ) -{ - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(pool_pool) -} - -PJ_DEF(pj_pool_t*) pj_pool_pool_create_pool( pj_pool_pool_t *pool_pool, - const char *name, - pj_size_t initial_size, - pj_size_t increment_size, - pj_pool_callback *callback) -{ - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(pool_pool) - return pj_pool_create(name, initial_size, increment_size, callback); -} - -PJ_DEF(void) pj_pool_pool_release_pool( pj_pool_pool_t *pool_pool, - pj_pool_t *pool ) -{ - PJ_CHECK_STACK(); - PJ_UNUSED_ARG(pool_pool) - pj_pool_destroy(pool); -} - - -#endif /* PJ_POOL_DEBUG */ diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c index bf0e6273..fea9c184 100644 --- a/pjlib/src/pjlib-test/ioq_perf.c +++ b/pjlib/src/pjlib-test/ioq_perf.c @@ -398,8 +398,6 @@ static int perform_test(int sock_type, const char *type_name, for (i=0; i + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include "test.h" + +#if INCLUDE_IOQUEUE_UNREG_TEST +/* + * This tests the thread safety of ioqueue unregistration operation. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + + +#define THIS_FILE "ioq_unreg.c" + + +enum test_method +{ + UNREGISTER_IN_APP, + UNREGISTER_IN_CALLBACK, +}; + +static int thread_quitting; +static enum test_method test_method; +static pj_time_val time_to_unregister; + +struct sock_data +{ + pj_sock_t sock; + pj_sock_t csock; + pj_pool_t *pool; + pj_ioqueue_key_t *key; + pj_mutex_t *mutex; + pj_ioqueue_op_key_t *op_key; + char *buffer; + pj_size_t bufsize; + pj_bool_t unregistered; + unsigned received; +} sock_data; + +static void on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) +{ + pj_ssize_t size; + char *sendbuf = "Hello world"; + pj_status_t status; + + if (sock_data.unregistered) + return; + + pj_mutex_lock(sock_data.mutex); + + if (sock_data.unregistered) { + /* No need to unlock. Mutex may have been destroyed */ + return; + } + + if (bytes_read < 0) { + if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK)) + app_perror("ioqueue reported recv error", -bytes_read); + } else { + sock_data.received += bytes_read; + } + + if (test_method == UNREGISTER_IN_CALLBACK) { + pj_time_val now; + + pj_gettimeofday(&now); + if (PJ_TIME_VAL_GTE(now, time_to_unregister)) { + sock_data.unregistered = 1; + pj_ioqueue_unregister(key); + pj_mutex_destroy(sock_data.mutex); + pj_pool_release(sock_data.pool); + sock_data.pool = NULL; + return; + } + } + + do { + size = sock_data.bufsize; + status = pj_ioqueue_recv(key, op_key, sock_data.buffer, &size, 0); + if (status != PJ_EPENDING && status != PJ_SUCCESS) + app_perror("recv() error", status); + + } while (status == PJ_SUCCESS); + + pj_mutex_unlock(sock_data.mutex); + + size = pj_ansi_strlen(sendbuf); + status = pj_sock_send(sock_data.csock, sendbuf, &size, 0); + if (status != PJ_SUCCESS) + app_perror("send() error", status); + + size = pj_ansi_strlen(sendbuf); + status = pj_sock_send(sock_data.csock, sendbuf, &size, 0); + if (status != PJ_SUCCESS) + app_perror("send() error", status); + +} + +static int worker_thread(void *arg) +{ + pj_ioqueue_t *ioqueue = arg; + + while (!thread_quitting) { + pj_time_val timeout = { 0, 20 }; + pj_ioqueue_poll(ioqueue, &timeout); + } + + return 0; +} + +/* + * Perform unregistration test. + * + * This will create ioqueue and register a server socket. Depending + * on the test method, either the callback or the main thread will + * unregister and destroy the server socket after some period of time. + */ +static int perform_unreg_test(pj_ioqueue_t *ioqueue, + pj_pool_t *test_pool, + const char *title, + pj_bool_t other_socket) +{ + enum { WORKER_CNT = 1, MSEC = 500, QUIT_MSEC = 500 }; + int i; + pj_thread_t *thread[WORKER_CNT]; + struct sock_data osd; + pj_ioqueue_callback callback; + pj_time_val end_time; + pj_status_t status; + + + /* Sometimes its important to have other sockets registered to + * the ioqueue, because when no sockets are registered, the ioqueue + * will return from the poll early. + */ + if (other_socket) { + status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, 56127, &osd.sock); + if (status != PJ_SUCCESS) { + app_perror("Error creating other socket", status); + return -12; + } + + pj_memset(&callback, 0, sizeof(callback)); + status = pj_ioqueue_register_sock(test_pool, ioqueue, osd.sock, + NULL, &callback, &osd.key); + if (status != PJ_SUCCESS) { + app_perror("Error registering other socket", status); + return -13; + } + + } else { + osd.key = NULL; + osd.sock = PJ_INVALID_SOCKET; + } + + /* Init both time duration of testing */ + thread_quitting = 0; + pj_gettimeofday(&time_to_unregister); + time_to_unregister.msec += MSEC; + pj_time_val_normalize(&time_to_unregister); + + end_time = time_to_unregister; + end_time.msec += QUIT_MSEC; + pj_time_val_normalize(&end_time); + + + /* Create polling thread */ + for (i=0; i