summaryrefslogtreecommitdiff
path: root/pjlib/src/pj/ioqueue_common_abs.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjlib/src/pj/ioqueue_common_abs.c')
-rw-r--r--  pjlib/src/pj/ioqueue_common_abs.c  116
1 files changed, 68 insertions, 48 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index ee4506d..b8036a5 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -1,4 +1,4 @@
-/* $Id: ioqueue_common_abs.c 3666 2011-07-19 08:40:20Z nanang $ */
+/* $Id: ioqueue_common_abs.c 4359 2013-02-21 11:18:36Z bennylp $ */
/*
* Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
@@ -70,6 +70,7 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool,
pj_ioqueue_t *ioqueue,
pj_ioqueue_key_t *key,
pj_sock_t sock,
+ pj_grp_lock_t *grp_lock,
void *user_data,
const pj_ioqueue_callback *cb)
{
@@ -114,10 +115,18 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool,
/* Create mutex for the key. */
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
- rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
+ rc = pj_lock_create_simple_mutex(pool, NULL, &key->lock);
#endif
+ if (rc != PJ_SUCCESS)
+ return rc;
+
+ /* Group lock */
+ key->grp_lock = grp_lock;
+ if (key->grp_lock) {
+ pj_grp_lock_add_ref_dbg(key->grp_lock, "ioqueue", 0);
+ }
- return rc;
+ return PJ_SUCCESS;
}
/*
@@ -189,10 +198,10 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
/* Lock the key. */
- pj_mutex_lock(h->mutex);
+ pj_ioqueue_lock_key(h);
if (IS_CLOSING(h)) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
return;
}
@@ -261,7 +270,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
* save it to a flag.
*/
has_lock = PJ_FALSE;
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
} else {
has_lock = PJ_TRUE;
}
@@ -272,7 +281,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
/* Unlock if we still hold the lock */
if (has_lock) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
/* Done. */
@@ -379,7 +388,8 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
* save it to a flag.
*/
has_lock = PJ_FALSE;
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
+ PJ_RACE_ME(5);
} else {
has_lock = PJ_TRUE;
}
@@ -392,11 +402,11 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
}
if (has_lock) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
} else {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
/* Done. */
@@ -406,7 +416,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
* are signalled for the same event, but only one thread eventually
* able to process the event.
*/
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
}
@@ -415,10 +425,10 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
pj_status_t rc;
/* Lock the key. */
- pj_mutex_lock(h->mutex);
+ pj_ioqueue_lock_key(h);
if (IS_CLOSING(h)) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
return;
}
@@ -453,7 +463,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
* save it to a flag.
*/
has_lock = PJ_FALSE;
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
+ PJ_RACE_ME(5);
} else {
has_lock = PJ_TRUE;
}
@@ -466,7 +477,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
}
if (has_lock) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
}
else
@@ -567,7 +578,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
* save it to a flag.
*/
has_lock = PJ_FALSE;
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
+ PJ_RACE_ME(5);
} else {
has_lock = PJ_TRUE;
}
@@ -580,7 +592,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
}
if (has_lock) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
} else {
@@ -589,7 +601,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
* are signalled for the same event, but only one thread eventually
* able to process the event.
*/
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
}
@@ -599,19 +611,19 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
{
pj_bool_t has_lock;
- pj_mutex_lock(h->mutex);
+ pj_ioqueue_lock_key(h);
if (!h->connecting) {
/* It is possible that more than one thread was woken up, thus
* the remaining thread will see h->connecting as zero because
* it has been processed by other thread.
*/
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
return;
}
if (IS_CLOSING(h)) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
return;
}
@@ -629,7 +641,8 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
* save it to a flag.
*/
has_lock = PJ_FALSE;
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
+ PJ_RACE_ME(5);
} else {
has_lock = PJ_TRUE;
}
@@ -651,7 +664,7 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
}
if (has_lock) {
- pj_mutex_unlock(h->mutex);
+ pj_ioqueue_unlock_key(h);
}
}
@@ -713,18 +726,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
read_op->size = *length;
read_op->flags = flags;
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
* in multithreaded app. If we add bad handle to the set it will
* corrupt the ioqueue set. See #913
*/
if (IS_CLOSING(key)) {
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
pj_list_insert_before(&key->read_list, read_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EPENDING;
}
@@ -789,18 +802,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
read_op->rmt_addr = addr;
read_op->rmt_addrlen = addrlen;
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
* in multithreaded app. If we add bad handle to the set it will
* corrupt the ioqueue set. See #913
*/
if (IS_CLOSING(key)) {
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
pj_list_insert_before(&key->read_list, read_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EPENDING;
}
@@ -903,18 +916,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
write_op->written = 0;
write_op->flags = flags;
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
* in multithreaded app. If we add bad handle to the set it will
* corrupt the ioqueue set. See #913
*/
if (IS_CLOSING(key)) {
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
pj_list_insert_before(&key->write_list, write_op);
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EPENDING;
}
@@ -1050,18 +1063,18 @@ retry_on_restart:
pj_memcpy(&write_op->rmt_addr, addr, addrlen);
write_op->rmt_addrlen = addrlen;
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
* in multithreaded app. If we add bad handle to the set it will
* corrupt the ioqueue set. See #913
*/
if (IS_CLOSING(key)) {
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
pj_list_insert_before(&key->write_list, write_op);
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EPENDING;
}
@@ -1127,18 +1140,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
accept_op->addrlen= addrlen;
accept_op->local_addr = local;
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous check
* in multithreaded app. If we add bad handle to the set it will
* corrupt the ioqueue set. See #913
*/
if (IS_CLOSING(key)) {
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
pj_list_insert_before(&key->accept_list, accept_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EPENDING;
}
@@ -1171,18 +1184,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
} else {
if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
/* Pending! */
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Check again. Handle may have been closed after the previous
* check in multithreaded app. See #913
*/
if (IS_CLOSING(key)) {
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_ECANCELLED;
}
key->connecting = PJ_TRUE;
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EPENDING;
} else {
/* Error! */
@@ -1228,7 +1241,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
* Find the operation key in all pending operation list to
* really make sure that it's still there; then call the callback.
*/
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Find the operation in the pending read list. */
op_rec = (struct generic_operation*)key->read_list.next;
@@ -1236,7 +1249,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = PJ_IOQUEUE_OP_NONE;
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
(*key->cb.on_read_complete)(key, op_key, bytes_status);
return PJ_SUCCESS;
@@ -1250,7 +1263,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = PJ_IOQUEUE_OP_NONE;
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
(*key->cb.on_write_complete)(key, op_key, bytes_status);
return PJ_SUCCESS;
@@ -1264,7 +1277,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = PJ_IOQUEUE_OP_NONE;
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
(*key->cb.on_accept_complete)(key, op_key,
PJ_INVALID_SOCKET,
@@ -1274,7 +1287,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
op_rec = op_rec->next;
}
- pj_mutex_unlock(key->mutex);
+ pj_ioqueue_unlock_key(key);
return PJ_EINVALIDOP;
}
@@ -1304,11 +1317,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
{
- return pj_mutex_lock(key->mutex);
+ if (key->grp_lock)
+ return pj_grp_lock_acquire(key->grp_lock);
+ else
+ return pj_lock_acquire(key->lock);
}
PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
{
- return pj_mutex_unlock(key->mutex);
+ if (key->grp_lock)
+ return pj_grp_lock_release(key->grp_lock);
+ else
+ return pj_lock_release(key->lock);
}
+