summaryrefslogtreecommitdiff
path: root/pjlib/src/pj/ioqueue_common_abs.c
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2006-03-30 16:32:18 +0000
committerBenny Prijono <bennylp@teluu.com>2006-03-30 16:32:18 +0000
commit974fbe67d6d62efadd129cc81b9072faf3b2f029 (patch)
tree82a44cd7c10d447766280047e035928166833348 /pjlib/src/pj/ioqueue_common_abs.c
parent3cf609b42e573adf8e7183070176a450a7b4959e (diff)
Fixed race condition bug in ioqueue unregistration for select and Win32 IOCP backend
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@365 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj/ioqueue_common_abs.c')
-rw-r--r--pjlib/src/pj/ioqueue_common_abs.c214
1 files changed, 89 insertions, 125 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index 30e2602e..b128d810 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -27,25 +27,10 @@
* This file is NOT supposed to be compiled as stand-alone source.
*/
-static long ioqueue_tls_id = -1;
-
-typedef struct key_lock_data {
- struct key_lock_data *prev;
- pj_ioqueue_key_t *key;
- int is_alive;
-} key_lock_data;
-
-
static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
ioqueue->lock = NULL;
ioqueue->auto_delete_lock = 0;
-
- if (ioqueue_tls_id == -1) {
- pj_status_t status;
- status = pj_thread_local_alloc(&ioqueue_tls_id);
- pj_thread_local_set(ioqueue_tls_id, NULL);
- }
}
static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
@@ -93,11 +78,20 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool,
pj_list_init(&key->write_list);
#if PJ_HAS_TCP
pj_list_init(&key->accept_list);
+ key->connecting = 0;
#endif
/* Save callback. */
pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Set initial reference count to 1 */
+ pj_assert(key->ref_count == 0);
+ ++key->ref_count;
+
+ key->closing = 0;
+#endif
+
/* Get socket type. When socket type is datagram, some optimization
* will be performed during send to allow parallel send operations.
*/
@@ -107,68 +101,14 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool,
if (rc != PJ_SUCCESS)
key->fd_type = PJ_SOCK_STREAM;
- key->inside_callback = 0;
- key->destroy_requested = 0;
-
/* Create mutex for the key. */
- rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
+#if !PJ_IOQUEUE_HAS_SAFE_UNREG
+ rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
+#endif
return rc;
}
-/* Lock the key and also keep the lock data in thread local storage.
- * The lock data is used to detect if the key is deleted by application
- * when we call its callback.
- */
-static void lock_key(pj_ioqueue_key_t *key, key_lock_data *lck)
-{
- struct key_lock_data *prev_data;
-
- pj_mutex_lock(key->mutex);
- prev_data = (struct key_lock_data *)
- pj_thread_local_get(ioqueue_tls_id);
- lck->prev = prev_data;
- lck->key = key;
- lck->is_alive = 1;
- pj_thread_local_set(ioqueue_tls_id, lck);
-}
-
-/* Unlock the key only if it is still valid. */
-static void unlock_key(pj_ioqueue_key_t *key, key_lock_data *lck)
-{
- pj_assert( (void*)pj_thread_local_get(ioqueue_tls_id) == lck);
- pj_assert( lck->key == key );
- pj_thread_local_set(ioqueue_tls_id, lck->prev);
- if (lck->is_alive)
- pj_mutex_unlock(key->mutex);
-}
-
-/* Destroy key */
-static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
-{
- key_lock_data *lck;
-
- /* Make sure that no other threads are doing something with
- * the key.
- */
- pj_mutex_lock(key->mutex);
-
- /* Check if this function is called within a callback context.
- * If so, then we need to inform the callback that the key has
- * been destroyed so that it doesn't attempt to unlock the
- * key's mutex.
- */
- lck = pj_thread_local_get(ioqueue_tls_id);
- while (lck) {
- if (lck->key == key) {
- lck->is_alive = 0;
- }
- lck = lck->prev;
- }
-
- pj_mutex_destroy(key->mutex);
-}
-
/*
* pj_ioqueue_get_user_data()
*
@@ -221,6 +161,13 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
}
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+# define IS_CLOSING(key) (key->closing)
+#else
+# define IS_CLOSING(key) (0)
+#endif
+
+
/*
* ioqueue_dispatch_event()
*
@@ -229,10 +176,13 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
*/
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
- key_lock_data lck_data;
-
/* Lock the key. */
- lock_key(h, &lck_data);
+ pj_mutex_lock(h->mutex);
+
+ if (h->closing) {
+ pj_mutex_unlock(h->mutex);
+ return;
+ }
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
if (h->connecting) {
@@ -245,8 +195,6 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
- /* Unlock; from this point we don't need to hold key's mutex. */
- //pj_mutex_unlock(h->mutex);
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
/* from connect(2):
@@ -293,8 +241,11 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
}
#endif
+ /* Unlock; from this point we don't need to hold key's mutex. */
+ pj_mutex_unlock(h->mutex);
+
/* Call callback. */
- if (h->cb.on_connect_complete)
+ if (h->cb.on_connect_complete && !IS_CLOSING(h))
(*h->cb.on_connect_complete)(h, bytes_transfered);
/* Done. */
@@ -319,7 +270,6 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
if (pj_list_empty(&h->write_list))
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- //pj_mutex_unlock(h->mutex);
}
/* Send the data.
@@ -365,19 +315,20 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
if (pj_list_empty(&h->write_list))
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- /* No need to hold mutex anymore */
- //pj_mutex_unlock(h->mutex);
}
+ /* No need to hold mutex anymore */
+ pj_mutex_unlock(h->mutex);
+
/* Call callback. */
- if (h->cb.on_write_complete) {
+ if (h->cb.on_write_complete && !IS_CLOSING(h)) {
(*h->cb.on_write_complete)(h,
(pj_ioqueue_op_key_t*)write_op,
write_op->written);
}
} else {
- //pj_mutex_unlock(h->mutex);
+ pj_mutex_unlock(h->mutex);
}
/* Done. */
@@ -387,20 +338,21 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
* are signalled for the same event, but only one thread eventually
* able to process the event.
*/
- //pj_mutex_unlock(h->mutex);
+ pj_mutex_unlock(h->mutex);
}
-
- /* Finally unlock key */
- unlock_key(h, &lck_data);
}
void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
- key_lock_data lck_data;
pj_status_t rc;
/* Lock the key. */
- lock_key(h, &lck_data);
+ pj_mutex_lock(h->mutex);
+
+ if (h->closing) {
+ pj_mutex_unlock(h->mutex);
+ return;
+ }
# if PJ_HAS_TCP
if (!pj_list_empty(&h->accept_list)) {
@@ -416,9 +368,6 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
if (pj_list_empty(&h->accept_list))
ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
- /* Unlock; from this point we don't need to hold key's mutex. */
- //pj_mutex_unlock(h->mutex);
-
rc=pj_sock_accept(h->fd, accept_op->accept_fd,
accept_op->rmt_addr, accept_op->addrlen);
if (rc==PJ_SUCCESS && accept_op->local_addr) {
@@ -427,8 +376,11 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
accept_op->addrlen);
}
+ /* Unlock; from this point we don't need to hold key's mutex. */
+ pj_mutex_unlock(h->mutex);
+
/* Call callback. */
- if (h->cb.on_accept_complete) {
+ if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
(*h->cb.on_accept_complete)(h,
(pj_ioqueue_op_key_t*)accept_op,
*accept_op->accept_fd, rc);
@@ -449,11 +401,6 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
if (pj_list_empty(&h->read_list))
ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
- /* Unlock; from this point we don't need to hold key's mutex. */
- //Crash as of revision 353 (since we added pjmedia socket to
- //main ioqueue).
- //pj_mutex_unlock(h->mutex);
-
bytes_read = read_op->size;
if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
@@ -516,8 +463,11 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
bytes_read = -rc;
}
+ /* Unlock; from this point we don't need to hold key's mutex. */
+ pj_mutex_unlock(h->mutex);
+
/* Call callback. */
- if (h->cb.on_read_complete) {
+ if (h->cb.on_read_complete && !IS_CLOSING(h)) {
(*h->cb.on_read_complete)(h,
(pj_ioqueue_op_key_t*)read_op,
bytes_read);
@@ -529,44 +479,41 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
* are signalled for the same event, but only one thread eventually
* able to process the event.
*/
- //pj_mutex_unlock(h->mutex);
+ pj_mutex_unlock(h->mutex);
}
-
- /* Unlock handle. */
- unlock_key(h, &lck_data);
}
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
pj_ioqueue_key_t *h )
{
- key_lock_data lck_data;
-
- lock_key(h, &lck_data);
+ pj_mutex_lock(h->mutex);
if (!h->connecting) {
/* It is possible that more than one thread was woken up, thus
* the remaining thread will see h->connecting as zero because
* it has been processed by other thread.
*/
- //pj_mutex_unlock(h->mutex);
- unlock_key(h, &lck_data);
+ pj_mutex_unlock(h->mutex);
return;
}
+ if (h->closing) {
+ pj_mutex_unlock(h->mutex);
+ return;
+ }
+
/* Clear operation. */
h->connecting = 0;
- //pj_mutex_unlock(h->mutex);
-
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
+ pj_mutex_unlock(h->mutex);
+
/* Call callback. */
- if (h->cb.on_connect_complete)
+ if (h->cb.on_connect_complete && !IS_CLOSING(h))
(*h->cb.on_connect_complete)(h, -1);
-
- unlock_key(h, &lck_data);
}
/*
@@ -588,6 +535,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
read_op = (struct read_operation*)op_key;
read_op->op = 0;
+ /* Check if key is closing. */
+ if (key->closing)
+ return PJ_ECANCELLED;
+
/* Try to see if there's data immediately available.
*/
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
@@ -646,6 +597,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
PJ_CHECK_STACK();
+ /* Check if key is closing. */
+ if (key->closing)
+ return PJ_ECANCELLED;
+
read_op = (struct read_operation*)op_key;
read_op->op = 0;
@@ -710,6 +665,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
PJ_CHECK_STACK();
+ /* Check if key is closing. */
+ if (key->closing)
+ return PJ_ECANCELLED;
+
write_op = (struct write_operation*)op_key;
write_op->op = 0;
@@ -788,6 +747,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
PJ_CHECK_STACK();
+ /* Check if key is closing. */
+ if (key->closing)
+ return PJ_ECANCELLED;
+
write_op = (struct write_operation*)op_key;
write_op->op = 0;
@@ -869,6 +832,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
/* check parameters. All must be specified! */
PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+ /* Check if key is closing. */
+ if (key->closing)
+ return PJ_ECANCELLED;
+
accept_op = (struct accept_operation*)op_key;
accept_op->op = 0;
@@ -930,6 +897,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
/* check parameters. All must be specified! */
PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
+ /* Check if key is closing. */
+ if (key->closing)
+ return PJ_ECANCELLED;
+
/* Check if socket has not been marked for connecting */
if (key->connecting != 0)
return PJ_EPENDING;
@@ -986,13 +957,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
pj_ssize_t bytes_status )
{
struct generic_operation *op_rec;
- key_lock_data lck_data;
/*
* Find the operation key in all pending operation list to
* really make sure that it's still there; then call the callback.
*/
- lock_key(key, &lck_data);
+ pj_mutex_lock(key->mutex);
/* Find the operation in the pending read list. */
op_rec = (struct generic_operation*)key->read_list.next;
@@ -1000,11 +970,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = 0;
- //pj_mutex_unlock(key->mutex);
+ pj_mutex_unlock(key->mutex);
(*key->cb.on_read_complete)(key, op_key, bytes_status);
-
- unlock_key(key, &lck_data);
return PJ_SUCCESS;
}
op_rec = op_rec->next;
@@ -1016,11 +984,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = 0;
- //pj_mutex_unlock(key->mutex);
+ pj_mutex_unlock(key->mutex);
(*key->cb.on_write_complete)(key, op_key, bytes_status);
-
- unlock_key(key, &lck_data);
return PJ_SUCCESS;
}
op_rec = op_rec->next;
@@ -1032,19 +998,17 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = 0;
- //pj_mutex_unlock(key->mutex);
+ pj_mutex_unlock(key->mutex);
(*key->cb.on_accept_complete)(key, op_key,
PJ_INVALID_SOCKET,
bytes_status);
-
- unlock_key(key, &lck_data);
return PJ_SUCCESS;
}
op_rec = op_rec->next;
}
- unlock_key(key, &lck_data);
+ pj_mutex_unlock(key->mutex);
return PJ_EINVALIDOP;
}