author    Benny Prijono <bennylp@teluu.com>    2008-02-13 15:17:28 +0000
committer Benny Prijono <bennylp@teluu.com>    2008-02-13 15:17:28 +0000
commit    379f21d67f143af70c85fd9ef2af67cc87d150e3 (patch)
tree      4b20c85bf30f3d7aebbd94a3ef529e7ae9f42f59 /pjlib/src/pj/ioqueue_winnt.c
parent    a2ca31f0f6e30a30bf6f6e58ab423b370fbc9bb3 (diff)
Ticket #474: option in ioqueue to control concurrency (to allow/disallow simultaneous/multiple callback calls)
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@1789 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj/ioqueue_winnt.c')
-rw-r--r--    pjlib/src/pj/ioqueue_winnt.c    185
1 file changed, 159 insertions(+), 26 deletions(-)
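The new API in this commit lets an application control whether callbacks for a key may be invoked by several polling threads at once. As a minimal usage sketch (not part of the diff; pool, sock, cb and user_data are assumed to be an existing pj_pool_t, pj_sock_t, pj_ioqueue_callback and application pointer):

    pj_ioqueue_t *ioqueue;
    pj_ioqueue_key_t *key;
    pj_status_t status;

    /* Sketch only: pool, sock, cb and user_data are assumed to exist */
    status = pj_ioqueue_create(pool, 64, &ioqueue);

    /* Disallow concurrent callbacks for all keys registered afterwards */
    pj_ioqueue_set_default_concurrency(ioqueue, PJ_FALSE);

    status = pj_ioqueue_register_sock(pool, ioqueue, sock, user_data,
                                      &cb, &key);

    /* ...or toggle the setting per key after registration */
    pj_ioqueue_set_concurrency(key, PJ_FALSE);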
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c
index 3e8253e1..3c6b9fb6 100644
--- a/pjlib/src/pj/ioqueue_winnt.c
+++ b/pjlib/src/pj/ioqueue_winnt.c
@@ -114,6 +114,7 @@ struct pj_ioqueue_key_t
void *user_data;
enum handle_type hnd_type;
pj_ioqueue_callback cb;
+ pj_bool_t allow_concurrent;
#if PJ_HAS_TCP
int connecting;
@@ -123,6 +124,7 @@ struct pj_ioqueue_key_t
pj_atomic_t *ref_count;
pj_bool_t closing;
pj_time_val free_time;
+ pj_mutex_t *mutex;
#endif
};
@@ -135,6 +137,7 @@ struct pj_ioqueue_t
HANDLE iocp;
pj_lock_t *lock;
pj_bool_t auto_delete_lock;
+ pj_bool_t default_concurrency;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
pj_ioqueue_key_t active_list;
@@ -153,6 +156,12 @@ struct pj_ioqueue_t
};
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Prototype */
+static void scan_closing_keys(pj_ioqueue_t *ioqueue);
+#endif
+
+
#if PJ_HAS_TCP
/*
* Process the socket when the overlapped accept() completed.
@@ -315,13 +324,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
return PJ_RETURN_OS_ERROR(GetLastError());
/* Create IOCP mutex */
- rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
+ rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock);
if (rc != PJ_SUCCESS) {
CloseHandle(ioqueue->iocp);
return rc;
}
ioqueue->auto_delete_lock = PJ_TRUE;
+ ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/*
@@ -344,14 +354,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
key = key->next;
}
CloseHandle(ioqueue->iocp);
return rc;
}
- pj_list_push_back(&ioqueue->free_list, key);
+ rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex);
+ if (rc != PJ_SUCCESS) {
+ pj_atomic_destroy(key->ref_count);
+ key = ioqueue->free_list.next;
+ while (key != &ioqueue->free_list) {
+ pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+ CloseHandle(ioqueue->iocp);
+ return rc;
+ }
+ pj_list_push_back(&ioqueue->free_list, key);
}
#endif
@@ -392,18 +415,21 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
key = ioqueue->active_list.next;
while (key != &ioqueue->active_list) {
pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
key = key->next;
}
key = ioqueue->closing_list.next;
while (key != &ioqueue->closing_list) {
pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
key = key->next;
}
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
key = key->next;
}
#endif
@@ -414,6 +440,15 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
return PJ_SUCCESS;
}
+
+PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
+ pj_bool_t allow)
+{
+ PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
+ ioqueue->default_concurrency = allow;
+ return PJ_SUCCESS;
+}
+
/*
* pj_ioqueue_set_lock()
*/
@@ -453,6 +488,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
pj_lock_acquire(ioqueue->lock);
#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Scan closing list first to release unused keys.
+ * Must do this with lock acquired.
+ */
+ scan_closing_keys(ioqueue);
+
/* If safe unregistration is used, then get the key record from
* the free list.
*/
@@ -481,6 +521,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
rec->user_data = user_data;
pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
+ /* Set concurrency for this handle */
+ rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency);
+ if (rc != PJ_SUCCESS) {
+ pj_lock_release(ioqueue->lock);
+ return rc;
+ }
+
#if PJ_HAS_TCP
rec->connecting = 0;
#endif
@@ -585,6 +632,8 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
* - zero and pOv!=NULL if event for failed I/O was dequeued.
*/
if (pOv) {
+ pj_bool_t has_lock;
+
/* Event was dequeued for either successful or failed I/O */
key = (pj_ioqueue_key_t*)dwKey;
size_status = dwBytesTransfered;
@@ -600,10 +649,30 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
if (key->closing)
return PJ_TRUE;
+ /* If concurrency is disabled, lock the key
+ * (and save the lock status to a local variable, since the app may
+ * change the concurrency setting while in the callback) */
+ if (key->allow_concurrent == PJ_FALSE) {
+ pj_mutex_lock(key->mutex);
+ has_lock = PJ_TRUE;
+ } else {
+ has_lock = PJ_FALSE;
+ }
+
+ /* Now that we hold the lock, check again that the key is not closing */
+ if (key->closing) {
+ if (has_lock) {
+ pj_mutex_unlock(key->mutex);
+ }
+ return PJ_TRUE;
+ }
+
/* Increment reference counter to prevent this key from being
* deleted
*/
pj_atomic_inc(key->ref_count);
+#else
+ PJ_UNUSED_ARG(has_lock);
#endif
/* Carry out the callback */
@@ -654,6 +723,8 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
#if PJ_IOQUEUE_HAS_SAFE_UNREG
decrement_counter(key);
+ if (has_lock)
+ pj_mutex_unlock(key->mutex);
#endif
return PJ_TRUE;
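With concurrency disallowed, poll_iocp() above holds key->mutex for the duration of the callback, so a callback never runs concurrently with itself for the same key even when multiple threads call pj_ioqueue_poll(). A sketch of what that permits (struct my_state is hypothetical):

    static void on_read_complete(pj_ioqueue_key_t *key,
                                 pj_ioqueue_op_key_t *op_key,
                                 pj_ssize_t bytes_read)
    {
        /* Serialized per key by the ioqueue when concurrency is
         * disabled, so no extra locking is needed here.
         * struct my_state is a hypothetical application type. */
        struct my_state *st = (struct my_state*)
                              pj_ioqueue_get_user_data(key);

        if (bytes_read > 0)
            st->total_read += bytes_read;

        PJ_UNUSED_ARG(op_key);
    }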
@@ -669,6 +740,7 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
{
unsigned i;
+ pj_bool_t has_lock;
enum { RETRY = 10 };
PJ_ASSERT_RETURN(key, PJ_EINVAL);
@@ -696,6 +768,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Mark key as closing before closing handle. */
key->closing = 1;
+
+ /* If concurrency is disabled, wait until the key has finished
+ * processing the callback
+ */
+ if (key->allow_concurrent == PJ_FALSE) {
+ pj_mutex_lock(key->mutex);
+ has_lock = PJ_TRUE;
+ } else {
+ has_lock = PJ_FALSE;
+ }
+#else
+ PJ_UNUSED_ARG(has_lock);
#endif
/* Close handle (the only way to disassociate handle from IOCP).
@@ -717,6 +801,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
*
* Forcing a context switch seems to have fixed that, but this is quite
* an ugly solution.
+ *
+ * Update 2008/02/13:
+ * This should not happen if concurrency is disallowed for the key,
+ * so at least the application has a solution for this (i.e. by
+ * disallowing concurrency on the key).
*/
//This will loop forever if unregistration is done on the callback.
//Doing this with RETRY I think should solve the IOCP setting the
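As the update note above says, disallowing concurrency gives the application a way out of this race: pj_ioqueue_unregister() then blocks on key->mutex until any in-flight callback has returned. A sketch of unregistering from a worker thread (assumes the key was registered with concurrency disallowed):

    /* Worker thread: safe even while another thread may be inside this
     * key's callback, since unregistration waits on key->mutex first. */
    pj_status_t status = pj_ioqueue_unregister(key);
    if (status != PJ_SUCCESS) {
        /* handle error */
    }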
@@ -728,11 +817,45 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
/* Decrement reference counter to destroy the key. */
decrement_counter(key);
+
+ if (has_lock)
+ pj_mutex_unlock(key->mutex);
#endif
return PJ_SUCCESS;
}
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Scan the closing list, and put pending closing keys to free list.
+ * Must do this with ioqueue mutex held.
+ */
+static void scan_closing_keys(pj_ioqueue_t *ioqueue)
+{
+ if (!pj_list_empty(&ioqueue->closing_list)) {
+ pj_time_val now;
+ pj_ioqueue_key_t *key;
+
+ pj_gettimeofday(&now);
+
+ /* Move closing keys to free list when they've finished the closing
+ * idle time.
+ */
+ key = ioqueue->closing_list.next;
+ while (key != &ioqueue->closing_list) {
+ pj_ioqueue_key_t *next = key->next;
+
+ pj_assert(key->closing != 0);
+
+ if (PJ_TIME_VAL_GTE(now, key->free_time)) {
+ pj_list_erase(key);
+ pj_list_push_back(&ioqueue->free_list, key);
+ }
+ key = next;
+ }
+ }
+}
+#endif
+
/*
* pj_ioqueue_poll()
*
@@ -766,32 +889,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Check the closing keys only when there's no activity and when there are
* pending closing keys.
- * blp:
- * no, always check the list. Otherwise on busy activity, this will cause
- * ioqueue to reject new registration.
*/
- if (/*event_count == 0 &&*/ !pj_list_empty(&ioqueue->closing_list)) {
- pj_time_val now;
- pj_ioqueue_key_t *key;
-
- pj_gettimeofday(&now);
-
- /* Move closing keys to free list when they've finished the closing
- * idle time.
- */
+ if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
pj_lock_acquire(ioqueue->lock);
- key = ioqueue->closing_list.next;
- while (key != &ioqueue->closing_list) {
- pj_ioqueue_key_t *next = key->next;
-
- pj_assert(key->closing != 0);
-
- if (PJ_TIME_VAL_GTE(now, key->free_time)) {
- pj_list_erase(key);
- pj_list_push_back(&ioqueue->free_list, key);
- }
- key = next;
- }
+ scan_closing_keys(ioqueue);
pj_lock_release(ioqueue->lock);
}
#endif
@@ -1268,3 +1369,35 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
return PJ_SUCCESS;
}
+PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
+ pj_bool_t allow)
+{
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
+ * disabled.
+ */
+ PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
+
+ key->allow_concurrent = allow;
+ return PJ_SUCCESS;
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
+{
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ return pj_mutex_lock(key->mutex);
+#else
+ PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
+#endif
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
+{
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ return pj_mutex_unlock(key->mutex);
+#else
+ PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
+#endif
+}
+
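A usage sketch for the locking pair above: application code that touches per-key state from outside the callbacks can serialize against callback dispatch, provided concurrency is disallowed for the key and PJ_IOQUEUE_HAS_SAFE_UNREG is enabled (update_state() is a hypothetical helper):

    /* Serialize with this key's callbacks; update_state() is
     * a hypothetical application function. */
    pj_ioqueue_lock_key(key);
    update_state(pj_ioqueue_get_user_data(key));
    pj_ioqueue_unlock_key(key);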