diff options
author | Benny Prijono <bennylp@teluu.com> | 2008-02-13 15:17:28 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2008-02-13 15:17:28 +0000 |
commit | 379f21d67f143af70c85fd9ef2af67cc87d150e3 (patch) | |
tree | 4b20c85bf30f3d7aebbd94a3ef529e7ae9f42f59 /pjlib/src/pj/ioqueue_common_abs.c | |
parent | a2ca31f0f6e30a30bf6f6e58ab423b370fbc9bb3 (diff) |
Ticket #474: option in ioqueue to control concurrency (to allow/disallow simultaneous/multiple callback calls)
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@1789 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj/ioqueue_common_abs.c')
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.c | 132 |
1 files changed, 123 insertions, 9 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index fdd1afe7..0af9cba5 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -96,6 +96,10 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool, key->closing = 0; #endif + rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency); + if (rc != PJ_SUCCESS) + return rc; + /* Get socket type. When socket type is datagram, some optimization * will be performed during send to allow parallel send operations. */ @@ -193,6 +197,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) if (h->connecting) { /* Completion of connect() operation */ pj_ssize_t bytes_transfered; + pj_bool_t has_lock; /* Clear operation. */ h->connecting = 0; @@ -246,13 +251,28 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) } #endif - /* Unlock; from this point we don't need to hold key's mutex. */ - pj_mutex_unlock(h->mutex); + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } /* Call callback. */ if (h->cb.on_connect_complete && !IS_CLOSING(h)) (*h->cb.on_connect_complete)(h, bytes_transfered); + /* Unlock if we still hold the lock */ + if (has_lock) { + pj_mutex_unlock(h->mutex); + } + /* Done. */ } else @@ -317,6 +337,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) write_op->written == (pj_ssize_t)write_op->size || h->fd_type == pj_SOCK_DGRAM()) { + pj_bool_t has_lock; write_op->op = PJ_IOQUEUE_OP_NONE; @@ -330,8 +351,18 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) } - /* No need to hold mutex anymore */ - pj_mutex_unlock(h->mutex); + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } /* Call callback. */ if (h->cb.on_write_complete && !IS_CLOSING(h)) { @@ -340,6 +371,10 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) write_op->written); } + if (has_lock) { + pj_mutex_unlock(h->mutex); + } + } else { pj_mutex_unlock(h->mutex); } @@ -371,6 +406,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) if (!pj_list_empty(&h->accept_list)) { struct accept_operation *accept_op; + pj_bool_t has_lock; /* Get one accept operation from the list. */ accept_op = h->accept_list.next; @@ -389,8 +425,18 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) accept_op->addrlen); } - /* Unlock; from this point we don't need to hold key's mutex. */ - pj_mutex_unlock(h->mutex); + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } /* Call callback. */ if (h->cb.on_accept_complete && !IS_CLOSING(h)) { @@ -399,12 +445,16 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) *accept_op->accept_fd, rc); } + if (has_lock) { + pj_mutex_unlock(h->mutex); + } } else # endif if (key_has_pending_read(h)) { struct read_operation *read_op; pj_ssize_t bytes_read; + pj_bool_t has_lock; /* Get one pending read operation from the list. */ read_op = h->read_list.next; @@ -479,8 +529,18 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) bytes_read = -rc; } - /* Unlock; from this point we don't need to hold key's mutex. */ - pj_mutex_unlock(h->mutex); + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } /* Call callback. */ if (h->cb.on_read_complete && !IS_CLOSING(h)) { @@ -489,6 +549,10 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) bytes_read); } + if (has_lock) { + pj_mutex_unlock(h->mutex); + } + } else { /* * This is normal; execution may fall here when multiple threads @@ -503,6 +567,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) { + pj_bool_t has_lock; + pj_mutex_lock(h->mutex); if (!h->connecting) { @@ -525,7 +591,18 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT); ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT); - pj_mutex_unlock(h->mutex); + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } /* Call callback. */ if (h->cb.on_connect_complete && !IS_CLOSING(h)) { @@ -542,6 +619,10 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, (*h->cb.on_connect_complete)(h, status); } + + if (has_lock) { + pj_mutex_unlock(h->mutex); + } } /* @@ -1096,3 +1177,36 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, return PJ_EINVALIDOP; } +PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue, + pj_bool_t allow) +{ + PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL); + ioqueue->default_concurrency = allow; + return PJ_SUCCESS; +} + + +PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, + pj_bool_t allow) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is + * disabled. + */ + PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL); + + key->allow_concurrent = allow; + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) +{ + return pj_mutex_lock(key->mutex); +} + +PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) +{ + return pj_mutex_unlock(key->mutex); +} + |