diff options
Diffstat (limited to 'pjlib/src/pj/ioqueue_common_abs.c')
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.c | 114 |
1 files changed, 67 insertions, 47 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index 296aea90..a5dc3f95 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -70,6 +70,7 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool, pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *key, pj_sock_t sock, + pj_grp_lock_t *grp_lock, void *user_data, const pj_ioqueue_callback *cb) { @@ -114,10 +115,18 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool, /* Create mutex for the key. */ #if !PJ_IOQUEUE_HAS_SAFE_UNREG - rc = pj_mutex_create_simple(pool, NULL, &key->mutex); + rc = pj_lock_create_simple_mutex(poll, NULL, &key->lock); #endif + if (rc != PJ_SUCCESS) + return rc; + + /* Group lock */ + key->grp_lock = grp_lock; + if (key->grp_lock) { + pj_grp_lock_add_ref_dbg(key->grp_lock, "ioqueue", 0); + } - return rc; + return PJ_SUCCESS; } /* @@ -189,10 +198,10 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key) void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) { /* Lock the key. */ - pj_mutex_lock(h->mutex); + pj_ioqueue_lock_key(h); if (IS_CLOSING(h)) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); return; } @@ -261,7 +270,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) * save it to a flag. */ has_lock = PJ_FALSE; - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } else { has_lock = PJ_TRUE; } @@ -272,7 +281,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) /* Unlock if we still hold the lock */ if (has_lock) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } /* Done. */ @@ -379,7 +388,8 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) * save it to a flag. */ has_lock = PJ_FALSE; - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); + PJ_RACE_ME(5); } else { has_lock = PJ_TRUE; } @@ -392,11 +402,11 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) } if (has_lock) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } } else { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } /* Done. */ @@ -406,7 +416,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) * are signalled for the same event, but only one thread eventually * able to process the event. */ - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } } @@ -415,10 +425,10 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) pj_status_t rc; /* Lock the key. */ - pj_mutex_lock(h->mutex); + pj_ioqueue_lock_key(h); if (IS_CLOSING(h)) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); return; } @@ -453,7 +463,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) * save it to a flag. */ has_lock = PJ_FALSE; - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); + PJ_RACE_ME(5); } else { has_lock = PJ_TRUE; } @@ -466,7 +477,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) } if (has_lock) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } } else @@ -567,7 +578,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) * save it to a flag. */ has_lock = PJ_FALSE; - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); + PJ_RACE_ME(5); } else { has_lock = PJ_TRUE; } @@ -580,7 +592,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) } if (has_lock) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } } else { @@ -589,7 +601,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) * are signalled for the same event, but only one thread eventually * able to process the event. */ - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } } @@ -599,19 +611,19 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, { pj_bool_t has_lock; - pj_mutex_lock(h->mutex); + pj_ioqueue_lock_key(h); if (!h->connecting) { /* It is possible that more than one thread was woken up, thus * the remaining thread will see h->connecting as zero because * it has been processed by other thread. */ - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); return; } if (IS_CLOSING(h)) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); return; } @@ -629,7 +641,8 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, * save it to a flag. */ has_lock = PJ_FALSE; - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); + PJ_RACE_ME(5); } else { has_lock = PJ_TRUE; } @@ -651,7 +664,7 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, } if (has_lock) { - pj_mutex_unlock(h->mutex); + pj_ioqueue_unlock_key(h); } } @@ -713,18 +726,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, read_op->size = *length; read_op->flags = flags; - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Check again. Handle may have been closed after the previous check * in multithreaded app. If we add bad handle to the set it will * corrupt the ioqueue set. See #913 */ if (IS_CLOSING(key)) { - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_ECANCELLED; } pj_list_insert_before(&key->read_list, read_op); ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EPENDING; } @@ -789,18 +802,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, read_op->rmt_addr = addr; read_op->rmt_addrlen = addrlen; - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Check again. Handle may have been closed after the previous check * in multithreaded app. If we add bad handle to the set it will * corrupt the ioqueue set. See #913 */ if (IS_CLOSING(key)) { - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_ECANCELLED; } pj_list_insert_before(&key->read_list, read_op); ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EPENDING; } @@ -903,18 +916,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, write_op->written = 0; write_op->flags = flags; - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Check again. Handle may have been closed after the previous check * in multithreaded app. If we add bad handle to the set it will * corrupt the ioqueue set. See #913 */ if (IS_CLOSING(key)) { - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_ECANCELLED; } pj_list_insert_before(&key->write_list, write_op); ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EPENDING; } @@ -1050,18 +1063,18 @@ retry_on_restart: pj_memcpy(&write_op->rmt_addr, addr, addrlen); write_op->rmt_addrlen = addrlen; - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Check again. Handle may have been closed after the previous check * in multithreaded app. If we add bad handle to the set it will * corrupt the ioqueue set. See #913 */ if (IS_CLOSING(key)) { - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_ECANCELLED; } pj_list_insert_before(&key->write_list, write_op); ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EPENDING; } @@ -1127,18 +1140,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, accept_op->addrlen= addrlen; accept_op->local_addr = local; - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Check again. Handle may have been closed after the previous check * in multithreaded app. If we add bad handle to the set it will * corrupt the ioqueue set. See #913 */ if (IS_CLOSING(key)) { - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_ECANCELLED; } pj_list_insert_before(&key->accept_list, accept_op); ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EPENDING; } @@ -1171,18 +1184,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, } else { if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { /* Pending! */ - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Check again. Handle may have been closed after the previous * check in multithreaded app. See #913 */ if (IS_CLOSING(key)) { - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_ECANCELLED; } key->connecting = PJ_TRUE; ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT); - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EPENDING; } else { /* Error! */ @@ -1228,7 +1241,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, * Find the operation key in all pending operation list to * really make sure that it's still there; then call the callback. */ - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Find the operation in the pending read list. */ op_rec = (struct generic_operation*)key->read_list.next; @@ -1236,7 +1249,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = PJ_IOQUEUE_OP_NONE; - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); (*key->cb.on_read_complete)(key, op_key, bytes_status); return PJ_SUCCESS; @@ -1250,7 +1263,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = PJ_IOQUEUE_OP_NONE; - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); (*key->cb.on_write_complete)(key, op_key, bytes_status); return PJ_SUCCESS; @@ -1264,7 +1277,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, if (op_rec == (void*)op_key) { pj_list_erase(op_rec); op_rec->op = PJ_IOQUEUE_OP_NONE; - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); (*key->cb.on_accept_complete)(key, op_key, PJ_INVALID_SOCKET, @@ -1274,7 +1287,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, op_rec = op_rec->next; } - pj_mutex_unlock(key->mutex); + pj_ioqueue_unlock_key(key); return PJ_EINVALIDOP; } @@ -1304,11 +1317,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) { - return pj_mutex_lock(key->mutex); + if (key->grp_lock) + return pj_grp_lock_acquire(key->grp_lock); + else + return pj_lock_acquire(key->lock); } PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) { - return pj_mutex_unlock(key->mutex); + if (key->grp_lock) + return pj_grp_lock_release(key->grp_lock); + else + return pj_lock_release(key->lock); } + |