From e985f59aabf893535e0d12c5867e44d69ccb83fe Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Sat, 25 Mar 2006 10:06:00 +0000 Subject: Fixed bug in ioqueue: crashed when key is unregistered while another thread is running a callback git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@363 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/build/pjlib.dsp | 9 +-- pjlib/src/pj/ioqueue_common_abs.c | 127 ++++++++++++++++++++++++++++++++------ pjlib/src/pj/ioqueue_common_abs.h | 2 + pjlib/src/pjlib-test/ioq_perf.c | 3 +- 4 files changed, 112 insertions(+), 29 deletions(-) (limited to 'pjlib') diff --git a/pjlib/build/pjlib.dsp b/pjlib/build/pjlib.dsp index f336459f..7aa56669 100644 --- a/pjlib/build/pjlib.dsp +++ b/pjlib/build/pjlib.dsp @@ -233,18 +233,11 @@ SOURCE=..\src\pj\ioqueue_common_abs.h # Begin Source File SOURCE=..\src\pj\ioqueue_select.c -# PROP Exclude_From_Build 1 # End Source File # Begin Source File SOURCE=..\src\pj\ioqueue_winnt.c - -!IF "$(CFG)" == "pjlib - Win32 Release" - -!ELSEIF "$(CFG)" == "pjlib - Win32 Debug" - -!ENDIF - +# PROP Exclude_From_Build 1 # End Source File # Begin Source File diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index 168b4f1e..30e2602e 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -27,10 +27,25 @@ * This file is NOT supposed to be compiled as stand-alone source. */ +static long ioqueue_tls_id = -1; + +typedef struct key_lock_data { + struct key_lock_data *prev; + pj_ioqueue_key_t *key; + int is_alive; +} key_lock_data; + + static void ioqueue_init( pj_ioqueue_t *ioqueue ) { ioqueue->lock = NULL; ioqueue->auto_delete_lock = 0; + + if (ioqueue_tls_id == -1) { + pj_status_t status; + status = pj_thread_local_alloc(&ioqueue_tls_id); + pj_thread_local_set(ioqueue_tls_id, NULL); + } } static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue) @@ -92,14 +107,65 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool, if (rc != PJ_SUCCESS) key->fd_type = PJ_SOCK_STREAM; + key->inside_callback = 0; + key->destroy_requested = 0; + /* Create mutex for the key. */ - rc = pj_mutex_create_simple(pool, NULL, &key->mutex); + rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); return rc; } +/* Lock the key and also keep the lock data in thread local storage. + * The lock data is used to detect if the key is deleted by application + * when we call its callback. + */ +static void lock_key(pj_ioqueue_key_t *key, key_lock_data *lck) +{ + struct key_lock_data *prev_data; + + pj_mutex_lock(key->mutex); + prev_data = (struct key_lock_data *) + pj_thread_local_get(ioqueue_tls_id); + lck->prev = prev_data; + lck->key = key; + lck->is_alive = 1; + pj_thread_local_set(ioqueue_tls_id, lck); +} + +/* Unlock the key only if it is still valid. */ +static void unlock_key(pj_ioqueue_key_t *key, key_lock_data *lck) +{ + pj_assert( (void*)pj_thread_local_get(ioqueue_tls_id) == lck); + pj_assert( lck->key == key ); + pj_thread_local_set(ioqueue_tls_id, lck->prev); + if (lck->is_alive) + pj_mutex_unlock(key->mutex); +} + +/* Destroy key */ static void ioqueue_destroy_key( pj_ioqueue_key_t *key ) { + key_lock_data *lck; + + /* Make sure that no other threads are doing something with + * the key. + */ + pj_mutex_lock(key->mutex); + + /* Check if this function is called within a callback context. + * If so, then we need to inform the callback that the key has + * been destroyed so that it doesn't attempt to unlock the + * key's mutex. + */ + lck = pj_thread_local_get(ioqueue_tls_id); + while (lck) { + if (lck->key == key) { + lck->is_alive = 0; + } + lck = lck->prev; + } + pj_mutex_destroy(key->mutex); } @@ -163,8 +229,10 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key) */ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) { + key_lock_data lck_data; + /* Lock the key. */ - pj_mutex_lock(h->mutex); + lock_key(h, &lck_data); #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 if (h->connecting) { @@ -178,7 +246,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); /* Unlock; from this point we don't need to hold key's mutex. */ - pj_mutex_unlock(h->mutex); + //pj_mutex_unlock(h->mutex); #if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) /* from connect(2): @@ -251,7 +319,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) if (pj_list_empty(&h->write_list)) ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); - pj_mutex_unlock(h->mutex); + //pj_mutex_unlock(h->mutex); } /* Send the data. @@ -298,7 +366,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); /* No need to hold mutex anymore */ - pj_mutex_unlock(h->mutex); + //pj_mutex_unlock(h->mutex); } /* Call callback. */ @@ -309,7 +377,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) } } else { - pj_mutex_unlock(h->mutex); + //pj_mutex_unlock(h->mutex); } /* Done. */ @@ -319,16 +387,20 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) * are signalled for the same event, but only one thread eventually * able to process the event. */ - pj_mutex_unlock(h->mutex); + //pj_mutex_unlock(h->mutex); } + + /* Finally unlock key */ + unlock_key(h, &lck_data); } void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) { + key_lock_data lck_data; pj_status_t rc; /* Lock the key. */ - pj_mutex_lock(h->mutex); + lock_key(h, &lck_data); # if PJ_HAS_TCP if (!pj_list_empty(&h->accept_list)) { @@ -345,7 +417,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); /* Unlock; from this point we don't need to hold key's mutex. */ - pj_mutex_unlock(h->mutex); + //pj_mutex_unlock(h->mutex); rc=pj_sock_accept(h->fd, accept_op->accept_fd, accept_op->rmt_addr, accept_op->addrlen); @@ -378,7 +450,9 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); /* Unlock; from this point we don't need to hold key's mutex. */ - pj_mutex_unlock(h->mutex); + //Crash as of revision 353 (since we added pjmedia socket to + //main ioqueue). + //pj_mutex_unlock(h->mutex); bytes_read = read_op->size; @@ -455,29 +529,35 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) * are signalled for the same event, but only one thread eventually * able to process the event. */ - pj_mutex_unlock(h->mutex); + //pj_mutex_unlock(h->mutex); } + + /* Unlock handle. */ + unlock_key(h, &lck_data); } void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) { - pj_mutex_lock(h->mutex); + key_lock_data lck_data; + + lock_key(h, &lck_data); if (!h->connecting) { /* It is possible that more than one thread was woken up, thus * the remaining thread will see h->connecting as zero because * it has been processed by other thread. */ - pj_mutex_unlock(h->mutex); + //pj_mutex_unlock(h->mutex); + unlock_key(h, &lck_data); return; } /* Clear operation. */ h->connecting = 0; - pj_mutex_unlock(h->mutex); + //pj_mutex_unlock(h->mutex); ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); @@ -485,6 +565,8 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, /* Call callback. */ if (h->cb.on_connect_complete) (*h->cb.on_connect_complete)(h, -1); + + unlock_key(h, &lck_data); } /* @@ -904,12 +986,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, pj_ssize_t bytes_status ) { struct generic_operation *op_rec; + key_lock_data lck_data; /* * Find the operation key in all pending operation list to * really make sure that it's still there; then call the callback. */ - pj_mutex_lock(key->mutex); + lock_key(key, &lck_data); /* Find the operation in the pending read list. */ op_rec = (struct generic_operation*)key->read_list.next; @@ -917,9 +1000,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = 0; - pj_mutex_unlock(key->mutex); + //pj_mutex_unlock(key->mutex); (*key->cb.on_read_complete)(key, op_key, bytes_status); + + unlock_key(key, &lck_data); return PJ_SUCCESS; } op_rec = op_rec->next; @@ -931,9 +1016,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = 0; - pj_mutex_unlock(key->mutex); + //pj_mutex_unlock(key->mutex); (*key->cb.on_write_complete)(key, op_key, bytes_status); + + unlock_key(key, &lck_data); return PJ_SUCCESS; } op_rec = op_rec->next; @@ -945,17 +1032,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = 0; - pj_mutex_unlock(key->mutex); + //pj_mutex_unlock(key->mutex); (*key->cb.on_accept_complete)(key, op_key, PJ_INVALID_SOCKET, bytes_status); + + unlock_key(key, &lck_data); return PJ_SUCCESS; } op_rec = op_rec->next; } - pj_mutex_unlock(key->mutex); + unlock_key(key, &lck_data); return PJ_EINVALIDOP; } diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h index d75c2d0a..1ca70aa8 100644 --- a/pjlib/src/pj/ioqueue_common_abs.h +++ b/pjlib/src/pj/ioqueue_common_abs.h @@ -91,6 +91,8 @@ union operation_key PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \ pj_ioqueue_t *ioqueue; \ pj_mutex_t *mutex; \ + pj_bool_t inside_callback; \ + pj_bool_t destroy_requested; \ pj_sock_t fd; \ int fd_type; \ void *user_data; \ diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c index e5453f27..bf0e6273 100644 --- a/pjlib/src/pjlib-test/ioq_perf.c +++ b/pjlib/src/pjlib-test/ioq_perf.c @@ -391,7 +391,6 @@ static int perform_test(int sock_type, const char *type_name, for (i=0; i