diff options
Diffstat (limited to 'pjlib/src/pj/ioqueue_common_abs.c')
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.c | 214 |
1 files changed, 89 insertions, 125 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index 30e2602e..b128d810 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -27,25 +27,10 @@ * This file is NOT supposed to be compiled as stand-alone source. */ -static long ioqueue_tls_id = -1; - -typedef struct key_lock_data { - struct key_lock_data *prev; - pj_ioqueue_key_t *key; - int is_alive; -} key_lock_data; - - static void ioqueue_init( pj_ioqueue_t *ioqueue ) { ioqueue->lock = NULL; ioqueue->auto_delete_lock = 0; - - if (ioqueue_tls_id == -1) { - pj_status_t status; - status = pj_thread_local_alloc(&ioqueue_tls_id); - pj_thread_local_set(ioqueue_tls_id, NULL); - } } static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue) @@ -93,11 +78,20 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool, pj_list_init(&key->write_list); #if PJ_HAS_TCP pj_list_init(&key->accept_list); + key->connecting = 0; #endif /* Save callback. */ pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback)); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Set initial reference count to 1 */ + pj_assert(key->ref_count == 0); + ++key->ref_count; + + key->closing = 0; +#endif + /* Get socket type. When socket type is datagram, some optimization * will be performed during send to allow parallel send operations. */ @@ -107,68 +101,14 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool, if (rc != PJ_SUCCESS) key->fd_type = PJ_SOCK_STREAM; - key->inside_callback = 0; - key->destroy_requested = 0; - /* Create mutex for the key. */ - rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); +#if !PJ_IOQUEUE_HAS_SAFE_UNREG + rc = pj_mutex_create_simple(pool, NULL, &key->mutex); +#endif return rc; } -/* Lock the key and also keep the lock data in thread local storage. - * The lock data is used to detect if the key is deleted by application - * when we call its callback. - */ -static void lock_key(pj_ioqueue_key_t *key, key_lock_data *lck) -{ - struct key_lock_data *prev_data; - - pj_mutex_lock(key->mutex); - prev_data = (struct key_lock_data *) - pj_thread_local_get(ioqueue_tls_id); - lck->prev = prev_data; - lck->key = key; - lck->is_alive = 1; - pj_thread_local_set(ioqueue_tls_id, lck); -} - -/* Unlock the key only if it is still valid. */ -static void unlock_key(pj_ioqueue_key_t *key, key_lock_data *lck) -{ - pj_assert( (void*)pj_thread_local_get(ioqueue_tls_id) == lck); - pj_assert( lck->key == key ); - pj_thread_local_set(ioqueue_tls_id, lck->prev); - if (lck->is_alive) - pj_mutex_unlock(key->mutex); -} - -/* Destroy key */ -static void ioqueue_destroy_key( pj_ioqueue_key_t *key ) -{ - key_lock_data *lck; - - /* Make sure that no other threads are doing something with - * the key. - */ - pj_mutex_lock(key->mutex); - - /* Check if this function is called within a callback context. - * If so, then we need to inform the callback that the key has - * been destroyed so that it doesn't attempt to unlock the - * key's mutex. - */ - lck = pj_thread_local_get(ioqueue_tls_id); - while (lck) { - if (lck->key == key) { - lck->is_alive = 0; - } - lck = lck->prev; - } - - pj_mutex_destroy(key->mutex); -} - /* * pj_ioqueue_get_user_data() * @@ -221,6 +161,13 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key) } +#if PJ_IOQUEUE_HAS_SAFE_UNREG +# define IS_CLOSING(key) (key->closing) +#else +# define IS_CLOSING(key) (0) +#endif + + /* * ioqueue_dispatch_event() * @@ -229,10 +176,13 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key) */ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) { - key_lock_data lck_data; - /* Lock the key. */ - lock_key(h, &lck_data); + pj_mutex_lock(h->mutex); + + if (h->closing) { + pj_mutex_unlock(h->mutex); + return; + } #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 if (h->connecting) { @@ -245,8 +195,6 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); - /* Unlock; from this point we don't need to hold key's mutex. */ - //pj_mutex_unlock(h->mutex); #if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) /* from connect(2): @@ -293,8 +241,11 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) } #endif + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + /* Call callback. */ - if (h->cb.on_connect_complete) + if (h->cb.on_connect_complete && !IS_CLOSING(h)) (*h->cb.on_connect_complete)(h, bytes_transfered); /* Done. */ @@ -319,7 +270,6 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) if (pj_list_empty(&h->write_list)) ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); - //pj_mutex_unlock(h->mutex); } /* Send the data. @@ -365,19 +315,20 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) if (pj_list_empty(&h->write_list)) ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); - /* No need to hold mutex anymore */ - //pj_mutex_unlock(h->mutex); } + /* No need to hold mutex anymore */ + pj_mutex_unlock(h->mutex); + /* Call callback. */ - if (h->cb.on_write_complete) { + if (h->cb.on_write_complete && !IS_CLOSING(h)) { (*h->cb.on_write_complete)(h, (pj_ioqueue_op_key_t*)write_op, write_op->written); } } else { - //pj_mutex_unlock(h->mutex); + pj_mutex_unlock(h->mutex); } /* Done. */ @@ -387,20 +338,21 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) * are signalled for the same event, but only one thread eventually * able to process the event. */ - //pj_mutex_unlock(h->mutex); + pj_mutex_unlock(h->mutex); } - - /* Finally unlock key */ - unlock_key(h, &lck_data); } void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) { - key_lock_data lck_data; pj_status_t rc; /* Lock the key. */ - lock_key(h, &lck_data); + pj_mutex_lock(h->mutex); + + if (h->closing) { + pj_mutex_unlock(h->mutex); + return; + } # if PJ_HAS_TCP if (!pj_list_empty(&h->accept_list)) { @@ -416,9 +368,6 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) if (pj_list_empty(&h->accept_list)) ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); - /* Unlock; from this point we don't need to hold key's mutex. */ - //pj_mutex_unlock(h->mutex); - rc=pj_sock_accept(h->fd, accept_op->accept_fd, accept_op->rmt_addr, accept_op->addrlen); if (rc==PJ_SUCCESS && accept_op->local_addr) { @@ -427,8 +376,11 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) accept_op->addrlen); } + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + /* Call callback. */ - if (h->cb.on_accept_complete) { + if (h->cb.on_accept_complete && !IS_CLOSING(h)) { (*h->cb.on_accept_complete)(h, (pj_ioqueue_op_key_t*)accept_op, *accept_op->accept_fd, rc); @@ -449,11 +401,6 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) if (pj_list_empty(&h->read_list)) ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT); - /* Unlock; from this point we don't need to hold key's mutex. */ - //Crash as of revision 353 (since we added pjmedia socket to - //main ioqueue). - //pj_mutex_unlock(h->mutex); - bytes_read = read_op->size; if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) { @@ -516,8 +463,11 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) bytes_read = -rc; } + /* Unlock; from this point we don't need to hold key's mutex. */ + pj_mutex_unlock(h->mutex); + /* Call callback. */ - if (h->cb.on_read_complete) { + if (h->cb.on_read_complete && !IS_CLOSING(h)) { (*h->cb.on_read_complete)(h, (pj_ioqueue_op_key_t*)read_op, bytes_read); @@ -529,44 +479,41 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) * are signalled for the same event, but only one thread eventually * able to process the event. */ - //pj_mutex_unlock(h->mutex); + pj_mutex_unlock(h->mutex); } - - /* Unlock handle. */ - unlock_key(h, &lck_data); } void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) { - key_lock_data lck_data; - - lock_key(h, &lck_data); + pj_mutex_lock(h->mutex); if (!h->connecting) { /* It is possible that more than one thread was woken up, thus * the remaining thread will see h->connecting as zero because * it has been processed by other thread. */ - //pj_mutex_unlock(h->mutex); - unlock_key(h, &lck_data); + pj_mutex_unlock(h->mutex); return; } + if (h->closing) { + pj_mutex_unlock(h->mutex); + return; + } + /* Clear operation. */ h->connecting = 0; - //pj_mutex_unlock(h->mutex); - ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT); ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT); + pj_mutex_unlock(h->mutex); + /* Call callback. */ - if (h->cb.on_connect_complete) + if (h->cb.on_connect_complete && !IS_CLOSING(h)) (*h->cb.on_connect_complete)(h, -1); - - unlock_key(h, &lck_data); } /* @@ -588,6 +535,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, read_op = (struct read_operation*)op_key; read_op->op = 0; + /* Check if key is closing. */ + if (key->closing) + return PJ_ECANCELLED; + /* Try to see if there's data immediately available. */ if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) { @@ -646,6 +597,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL); PJ_CHECK_STACK(); + /* Check if key is closing. */ + if (key->closing) + return PJ_ECANCELLED; + read_op = (struct read_operation*)op_key; read_op->op = 0; @@ -710,6 +665,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); PJ_CHECK_STACK(); + /* Check if key is closing. */ + if (key->closing) + return PJ_ECANCELLED; + write_op = (struct write_operation*)op_key; write_op->op = 0; @@ -788,6 +747,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL); PJ_CHECK_STACK(); + /* Check if key is closing. */ + if (key->closing) + return PJ_ECANCELLED; + write_op = (struct write_operation*)op_key; write_op->op = 0; @@ -869,6 +832,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, /* check parameters. All must be specified! */ PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL); + /* Check if key is closing. */ + if (key->closing) + return PJ_ECANCELLED; + accept_op = (struct accept_operation*)op_key; accept_op->op = 0; @@ -930,6 +897,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, /* check parameters. All must be specified! */ PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL); + /* Check if key is closing. */ + if (key->closing) + return PJ_ECANCELLED; + /* Check if socket has not been marked for connecting */ if (key->connecting != 0) return PJ_EPENDING; @@ -986,13 +957,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, pj_ssize_t bytes_status ) { struct generic_operation *op_rec; - key_lock_data lck_data; /* * Find the operation key in all pending operation list to * really make sure that it's still there; then call the callback. */ - lock_key(key, &lck_data); + pj_mutex_lock(key->mutex); /* Find the operation in the pending read list. */ op_rec = (struct generic_operation*)key->read_list.next; @@ -1000,11 +970,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = 0; - //pj_mutex_unlock(key->mutex); + pj_mutex_unlock(key->mutex); (*key->cb.on_read_complete)(key, op_key, bytes_status); - - unlock_key(key, &lck_data); return PJ_SUCCESS; } op_rec = op_rec->next; @@ -1016,11 +984,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = 0; - //pj_mutex_unlock(key->mutex); + pj_mutex_unlock(key->mutex); (*key->cb.on_write_complete)(key, op_key, bytes_status); - - unlock_key(key, &lck_data); return PJ_SUCCESS; } op_rec = op_rec->next; @@ -1032,19 +998,17 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = 0; - //pj_mutex_unlock(key->mutex); + pj_mutex_unlock(key->mutex); (*key->cb.on_accept_complete)(key, op_key, PJ_INVALID_SOCKET, bytes_status); - - unlock_key(key, &lck_data); return PJ_SUCCESS; } op_rec = op_rec->next; } - unlock_key(key, &lck_data); + pj_mutex_unlock(key->mutex); return PJ_EINVALIDOP; } |