summaryrefslogtreecommitdiff
path: root/pjlib
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2006-03-25 10:06:00 +0000
committerBenny Prijono <bennylp@teluu.com>2006-03-25 10:06:00 +0000
commite985f59aabf893535e0d12c5867e44d69ccb83fe (patch)
tree49cb9c58fe740d576c5b2f61a13bdb595397cdb9 /pjlib
parentd40f548adc55a8c79e1ff21f349535ac2f921895 (diff)
Fixed bug in ioqueue: crashed when key is unregistered while another thread is running a callback
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@363 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib')
-rw-r--r--pjlib/build/pjlib.dsp9
-rw-r--r--pjlib/src/pj/ioqueue_common_abs.c127
-rw-r--r--pjlib/src/pj/ioqueue_common_abs.h2
-rw-r--r--pjlib/src/pjlib-test/ioq_perf.c3
4 files changed, 112 insertions, 29 deletions
diff --git a/pjlib/build/pjlib.dsp b/pjlib/build/pjlib.dsp
index f336459f..7aa56669 100644
--- a/pjlib/build/pjlib.dsp
+++ b/pjlib/build/pjlib.dsp
@@ -233,18 +233,11 @@ SOURCE=..\src\pj\ioqueue_common_abs.h
# Begin Source File
SOURCE=..\src\pj\ioqueue_select.c
-# PROP Exclude_From_Build 1
# End Source File
# Begin Source File
SOURCE=..\src\pj\ioqueue_winnt.c
-
-!IF "$(CFG)" == "pjlib - Win32 Release"
-
-!ELSEIF "$(CFG)" == "pjlib - Win32 Debug"
-
-!ENDIF
-
+# PROP Exclude_From_Build 1
# End Source File
# Begin Source File
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index 168b4f1e..30e2602e 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -27,10 +27,25 @@
* This file is NOT supposed to be compiled as stand-alone source.
*/
+static long ioqueue_tls_id = -1;
+
+typedef struct key_lock_data {
+ struct key_lock_data *prev;
+ pj_ioqueue_key_t *key;
+ int is_alive;
+} key_lock_data;
+
+
static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
ioqueue->lock = NULL;
ioqueue->auto_delete_lock = 0;
+
+ if (ioqueue_tls_id == -1) {
+ pj_status_t status;
+ status = pj_thread_local_alloc(&ioqueue_tls_id);
+ pj_thread_local_set(ioqueue_tls_id, NULL);
+ }
}
static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
@@ -92,14 +107,65 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool,
if (rc != PJ_SUCCESS)
key->fd_type = PJ_SOCK_STREAM;
+ key->inside_callback = 0;
+ key->destroy_requested = 0;
+
/* Create mutex for the key. */
- rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
+ rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
return rc;
}
+/* Lock the key and also keep the lock data in thread local storage.
+ * The lock data is used to detect if the key is deleted by application
+ * when we call its callback.
+ */
+static void lock_key(pj_ioqueue_key_t *key, key_lock_data *lck)
+{
+ struct key_lock_data *prev_data;
+
+ pj_mutex_lock(key->mutex);
+ prev_data = (struct key_lock_data *)
+ pj_thread_local_get(ioqueue_tls_id);
+ lck->prev = prev_data;
+ lck->key = key;
+ lck->is_alive = 1;
+ pj_thread_local_set(ioqueue_tls_id, lck);
+}
+
+/* Unlock the key only if it is still valid. */
+static void unlock_key(pj_ioqueue_key_t *key, key_lock_data *lck)
+{
+ pj_assert( (void*)pj_thread_local_get(ioqueue_tls_id) == lck);
+ pj_assert( lck->key == key );
+ pj_thread_local_set(ioqueue_tls_id, lck->prev);
+ if (lck->is_alive)
+ pj_mutex_unlock(key->mutex);
+}
+
+/* Destroy key */
static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
{
+ key_lock_data *lck;
+
+ /* Make sure that no other threads are doing something with
+ * the key.
+ */
+ pj_mutex_lock(key->mutex);
+
+ /* Check if this function is called within a callback context.
+ * If so, then we need to inform the callback that the key has
+ * been destroyed so that it doesn't attempt to unlock the
+ * key's mutex.
+ */
+ lck = pj_thread_local_get(ioqueue_tls_id);
+ while (lck) {
+ if (lck->key == key) {
+ lck->is_alive = 0;
+ }
+ lck = lck->prev;
+ }
+
pj_mutex_destroy(key->mutex);
}
@@ -163,8 +229,10 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
*/
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
+ key_lock_data lck_data;
+
/* Lock the key. */
- pj_mutex_lock(h->mutex);
+ lock_key(h, &lck_data);
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
if (h->connecting) {
@@ -178,7 +246,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
/* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
+ //pj_mutex_unlock(h->mutex);
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
/* from connect(2):
@@ -251,7 +319,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
if (pj_list_empty(&h->write_list))
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- pj_mutex_unlock(h->mutex);
+ //pj_mutex_unlock(h->mutex);
}
/* Send the data.
@@ -298,7 +366,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
/* No need to hold mutex anymore */
- pj_mutex_unlock(h->mutex);
+ //pj_mutex_unlock(h->mutex);
}
/* Call callback. */
@@ -309,7 +377,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
}
} else {
- pj_mutex_unlock(h->mutex);
+ //pj_mutex_unlock(h->mutex);
}
/* Done. */
@@ -319,16 +387,20 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
* are signalled for the same event, but only one thread eventually
* able to process the event.
*/
- pj_mutex_unlock(h->mutex);
+ //pj_mutex_unlock(h->mutex);
}
+
+ /* Finally unlock key */
+ unlock_key(h, &lck_data);
}
void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
+ key_lock_data lck_data;
pj_status_t rc;
/* Lock the key. */
- pj_mutex_lock(h->mutex);
+ lock_key(h, &lck_data);
# if PJ_HAS_TCP
if (!pj_list_empty(&h->accept_list)) {
@@ -345,7 +417,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
/* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
+ //pj_mutex_unlock(h->mutex);
rc=pj_sock_accept(h->fd, accept_op->accept_fd,
accept_op->rmt_addr, accept_op->addrlen);
@@ -378,7 +450,9 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
/* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
+ //Crash as of revision 353 (since we added pjmedia socket to
+ //main ioqueue).
+ //pj_mutex_unlock(h->mutex);
bytes_read = read_op->size;
@@ -455,29 +529,35 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
* are signalled for the same event, but only one thread eventually
* able to process the event.
*/
- pj_mutex_unlock(h->mutex);
+ //pj_mutex_unlock(h->mutex);
}
+
+ /* Unlock handle. */
+ unlock_key(h, &lck_data);
}
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
pj_ioqueue_key_t *h )
{
- pj_mutex_lock(h->mutex);
+ key_lock_data lck_data;
+
+ lock_key(h, &lck_data);
if (!h->connecting) {
/* It is possible that more than one thread was woken up, thus
* the remaining thread will see h->connecting as zero because
* it has been processed by other thread.
*/
- pj_mutex_unlock(h->mutex);
+ //pj_mutex_unlock(h->mutex);
+ unlock_key(h, &lck_data);
return;
}
/* Clear operation. */
h->connecting = 0;
- pj_mutex_unlock(h->mutex);
+ //pj_mutex_unlock(h->mutex);
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
@@ -485,6 +565,8 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
/* Call callback. */
if (h->cb.on_connect_complete)
(*h->cb.on_connect_complete)(h, -1);
+
+ unlock_key(h, &lck_data);
}
/*
@@ -904,12 +986,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
pj_ssize_t bytes_status )
{
struct generic_operation *op_rec;
+ key_lock_data lck_data;
/*
* Find the operation key in all pending operation list to
* really make sure that it's still there; then call the callback.
*/
- pj_mutex_lock(key->mutex);
+ lock_key(key, &lck_data);
/* Find the operation in the pending read list. */
op_rec = (struct generic_operation*)key->read_list.next;
@@ -917,9 +1000,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = 0;
- pj_mutex_unlock(key->mutex);
+ //pj_mutex_unlock(key->mutex);
(*key->cb.on_read_complete)(key, op_key, bytes_status);
+
+ unlock_key(key, &lck_data);
return PJ_SUCCESS;
}
op_rec = op_rec->next;
@@ -931,9 +1016,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = 0;
- pj_mutex_unlock(key->mutex);
+ //pj_mutex_unlock(key->mutex);
(*key->cb.on_write_complete)(key, op_key, bytes_status);
+
+ unlock_key(key, &lck_data);
return PJ_SUCCESS;
}
op_rec = op_rec->next;
@@ -945,17 +1032,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = 0;
- pj_mutex_unlock(key->mutex);
+ //pj_mutex_unlock(key->mutex);
(*key->cb.on_accept_complete)(key, op_key,
PJ_INVALID_SOCKET,
bytes_status);
+
+ unlock_key(key, &lck_data);
return PJ_SUCCESS;
}
op_rec = op_rec->next;
}
- pj_mutex_unlock(key->mutex);
+ unlock_key(key, &lck_data);
return PJ_EINVALIDOP;
}
diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h
index d75c2d0a..1ca70aa8 100644
--- a/pjlib/src/pj/ioqueue_common_abs.h
+++ b/pjlib/src/pj/ioqueue_common_abs.h
@@ -91,6 +91,8 @@ union operation_key
PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \
pj_ioqueue_t *ioqueue; \
pj_mutex_t *mutex; \
+ pj_bool_t inside_callback; \
+ pj_bool_t destroy_requested; \
pj_sock_t fd; \
int fd_type; \
void *user_data; \
diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c
index e5453f27..bf0e6273 100644
--- a/pjlib/src/pjlib-test/ioq_perf.c
+++ b/pjlib/src/pjlib-test/ioq_perf.c
@@ -391,7 +391,6 @@ static int perform_test(int sock_type, const char *type_name,
for (i=0; i<thread_cnt; ++i) {
TRACE_((THIS_FILE, " join thread %d..", i));
pj_thread_join(thread[i]);
- pj_thread_destroy(thread[i]);
}
/* Close all sockets. */
@@ -428,7 +427,7 @@ static int perform_test(int sock_type, const char *type_name,
*p_bandwidth = (pj_uint32_t)bandwidth;
- PJ_LOG(3,(THIS_FILE, " %.4s %d %d %8d KB/s",
+ PJ_LOG(3,(THIS_FILE, " %.4s %2d %2d %8d KB/s",
type_name, thread_cnt, sockpair_cnt,
*p_bandwidth));