author     Benny Prijono <bennylp@teluu.com>  2008-02-13 15:17:28 +0000
committer  Benny Prijono <bennylp@teluu.com>  2008-02-13 15:17:28 +0000
commit     379f21d67f143af70c85fd9ef2af67cc87d150e3 (patch)
tree       4b20c85bf30f3d7aebbd94a3ef529e7ae9f42f59 /pjlib/src/pj/ioqueue_common_abs.c
parent     a2ca31f0f6e30a30bf6f6e58ab423b370fbc9bb3 (diff)
Ticket #474: option in ioqueue to control concurrency (to allow/disallow simultaneous/multiple callback calls)
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@1789 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj/ioqueue_common_abs.c')
-rw-r--r--  pjlib/src/pj/ioqueue_common_abs.c | 132
1 file changed, 123 insertions, 9 deletions
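
The patch adds concurrency control at two levels: a per-ioqueue default (pj_ioqueue_set_default_concurrency(), copied into each key by ioqueue_init_key()) and a per-key setting (pj_ioqueue_set_concurrency()). The sketch below shows how an application might use the new options; the helper name, its parameters and the error handling are illustrative only and not part of the patch.

#include <pjlib.h>

/* Hypothetical helper: register a socket whose callbacks must not run
 * in parallel. Only the pj_ioqueue_* calls come from PJLIB and this
 * patch; everything else is application-side illustration.
 */
static pj_status_t register_serialized_key(pj_pool_t *pool,
                                           pj_ioqueue_t *ioq,
                                           pj_sock_t sock,
                                           void *user_data,
                                           const pj_ioqueue_callback *cb,
                                           pj_ioqueue_key_t **p_key)
{
    pj_status_t status;

    /* Alternatively, disable concurrency for every key that will be
     * registered to this ioqueue:
     *   pj_ioqueue_set_default_concurrency(ioq, PJ_FALSE);
     * The default is copied into each key by ioqueue_init_key().
     */
    status = pj_ioqueue_register_sock(pool, ioq, sock, user_data, cb, p_key);
    if (status != PJ_SUCCESS)
        return status;

    /* Disable parallel callbacks for this key only. The ioqueue will
     * then hold the key's mutex while invoking its callbacks, so at
     * most one thread dispatches events for this key at a time.
     * Requires PJ_IOQUEUE_HAS_SAFE_UNREG.
     */
    return pj_ioqueue_set_concurrency(*p_key, PJ_FALSE);
}
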
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index fdd1afe7..0af9cba5 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -96,6 +96,10 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool,
key->closing = 0;
#endif
+ rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
+ if (rc != PJ_SUCCESS)
+ return rc;
+
/* Get socket type. When socket type is datagram, some optimization
* will be performed during send to allow parallel send operations.
*/
@@ -193,6 +197,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
if (h->connecting) {
/* Completion of connect() operation */
pj_ssize_t bytes_transfered;
+ pj_bool_t has_lock;
/* Clear operation. */
h->connecting = 0;
@@ -246,13 +251,28 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
}
#endif
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
+ /* Unlock; from this point we don't need to hold the key's mutex
+ * (unless concurrency is disabled, in which case we must keep
+ * holding the mutex while calling the callback). */
+ if (h->allow_concurrent) {
+ /* The concurrency setting may change while we're in the callback,
+ * so remember the current state in a flag.
+ */
+ has_lock = PJ_FALSE;
+ pj_mutex_unlock(h->mutex);
+ } else {
+ has_lock = PJ_TRUE;
+ }
/* Call callback. */
if (h->cb.on_connect_complete && !IS_CLOSING(h))
(*h->cb.on_connect_complete)(h, bytes_transfered);
+ /* Unlock if we still hold the lock */
+ if (has_lock) {
+ pj_mutex_unlock(h->mutex);
+ }
+
/* Done. */
} else
@@ -317,6 +337,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
write_op->written == (pj_ssize_t)write_op->size ||
h->fd_type == pj_SOCK_DGRAM())
{
+ pj_bool_t has_lock;
write_op->op = PJ_IOQUEUE_OP_NONE;
@@ -330,8 +351,18 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
}
- /* No need to hold mutex anymore */
- pj_mutex_unlock(h->mutex);
+ /* Unlock; from this point we don't need to hold the key's mutex
+ * (unless concurrency is disabled, in which case we must keep
+ * holding the mutex while calling the callback). */
+ if (h->allow_concurrent) {
+ /* The concurrency setting may change while we're in the callback,
+ * so remember the current state in a flag.
+ */
+ has_lock = PJ_FALSE;
+ pj_mutex_unlock(h->mutex);
+ } else {
+ has_lock = PJ_TRUE;
+ }
/* Call callback. */
if (h->cb.on_write_complete && !IS_CLOSING(h)) {
@@ -340,6 +371,10 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
write_op->written);
}
+ if (has_lock) {
+ pj_mutex_unlock(h->mutex);
+ }
+
} else {
pj_mutex_unlock(h->mutex);
}
@@ -371,6 +406,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
if (!pj_list_empty(&h->accept_list)) {
struct accept_operation *accept_op;
+ pj_bool_t has_lock;
/* Get one accept operation from the list. */
accept_op = h->accept_list.next;
@@ -389,8 +425,18 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
accept_op->addrlen);
}
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
+ /* Unlock; from this point we don't need to hold the key's mutex
+ * (unless concurrency is disabled, in which case we must keep
+ * holding the mutex while calling the callback). */
+ if (h->allow_concurrent) {
+ /* The concurrency setting may change while we're in the callback,
+ * so remember the current state in a flag.
+ */
+ has_lock = PJ_FALSE;
+ pj_mutex_unlock(h->mutex);
+ } else {
+ has_lock = PJ_TRUE;
+ }
/* Call callback. */
if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
@@ -399,12 +445,16 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
*accept_op->accept_fd, rc);
}
+ if (has_lock) {
+ pj_mutex_unlock(h->mutex);
+ }
}
else
# endif
if (key_has_pending_read(h)) {
struct read_operation *read_op;
pj_ssize_t bytes_read;
+ pj_bool_t has_lock;
/* Get one pending read operation from the list. */
read_op = h->read_list.next;
@@ -479,8 +529,18 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
bytes_read = -rc;
}
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
+ /* Unlock; from this point we don't need to hold the key's mutex
+ * (unless concurrency is disabled, in which case we must keep
+ * holding the mutex while calling the callback). */
+ if (h->allow_concurrent) {
+ /* The concurrency setting may change while we're in the callback,
+ * so remember the current state in a flag.
+ */
+ has_lock = PJ_FALSE;
+ pj_mutex_unlock(h->mutex);
+ } else {
+ has_lock = PJ_TRUE;
+ }
/* Call callback. */
if (h->cb.on_read_complete && !IS_CLOSING(h)) {
@@ -489,6 +549,10 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
bytes_read);
}
+ if (has_lock) {
+ pj_mutex_unlock(h->mutex);
+ }
+
} else {
/*
* This is normal; execution may fall here when multiple threads
@@ -503,6 +567,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
pj_ioqueue_key_t *h )
{
+ pj_bool_t has_lock;
+
pj_mutex_lock(h->mutex);
if (!h->connecting) {
@@ -525,7 +591,18 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
- pj_mutex_unlock(h->mutex);
+ /* Unlock; from this point we don't need to hold the key's mutex
+ * (unless concurrency is disabled, in which case we must keep
+ * holding the mutex while calling the callback). */
+ if (h->allow_concurrent) {
+ /* The concurrency setting may change while we're in the callback,
+ * so remember the current state in a flag.
+ */
+ has_lock = PJ_FALSE;
+ pj_mutex_unlock(h->mutex);
+ } else {
+ has_lock = PJ_TRUE;
+ }
/* Call callback. */
if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
@@ -542,6 +619,10 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
(*h->cb.on_connect_complete)(h, status);
}
+
+ if (has_lock) {
+ pj_mutex_unlock(h->mutex);
+ }
}
/*
@@ -1096,3 +1177,36 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
return PJ_EINVALIDOP;
}
+PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
+ pj_bool_t allow)
+{
+ PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
+ ioqueue->default_concurrency = allow;
+ return PJ_SUCCESS;
+}
+
+
+PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
+ pj_bool_t allow)
+{
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
+ * disabled.
+ */
+ PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
+
+ key->allow_concurrent = allow;
+ return PJ_SUCCESS;
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
+{
+ return pj_mutex_lock(key->mutex);
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
+{
+ return pj_mutex_unlock(key->mutex);
+}
+
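
The pj_ioqueue_lock_key() and pj_ioqueue_unlock_key() helpers at the end simply wrap the key's internal mutex. Below is a minimal sketch of how an application thread might use them, assuming concurrency has been disabled for the key so that its callbacks also run under this mutex; the user-data struct and its field are hypothetical, not part of the patch.

#include <pjlib.h>

/* Hypothetical per-key application state (not part of the patch). */
struct my_key_data
{
    unsigned pending_pkts;
};

static void bump_pending(pj_ioqueue_key_t *key)
{
    struct my_key_data *kd;

    /* With concurrency disabled, callbacks for this key are invoked
     * while the key's mutex is held, so taking the same lock here
     * serializes this update against those callbacks.
     */
    pj_ioqueue_lock_key(key);
    kd = (struct my_key_data*) pj_ioqueue_get_user_data(key);
    kd->pending_pkts++;
    pj_ioqueue_unlock_key(key);
}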