From 379f21d67f143af70c85fd9ef2af67cc87d150e3 Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Wed, 13 Feb 2008 15:17:28 +0000 Subject: Ticket #474: option in ioqueue to control concurrency (to allow/disallow simultaneous/multiple callback calls) git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@1789 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/src/pj/ioqueue_winnt.c | 185 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 159 insertions(+), 26 deletions(-) (limited to 'pjlib/src/pj/ioqueue_winnt.c') diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index 3e8253e1..3c6b9fb6 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -114,6 +114,7 @@ struct pj_ioqueue_key_t void *user_data; enum handle_type hnd_type; pj_ioqueue_callback cb; + pj_bool_t allow_concurrent; #if PJ_HAS_TCP int connecting; @@ -123,6 +124,7 @@ struct pj_ioqueue_key_t pj_atomic_t *ref_count; pj_bool_t closing; pj_time_val free_time; + pj_mutex_t *mutex; #endif }; @@ -135,6 +137,7 @@ struct pj_ioqueue_t HANDLE iocp; pj_lock_t *lock; pj_bool_t auto_delete_lock; + pj_bool_t default_concurrency; #if PJ_IOQUEUE_HAS_SAFE_UNREG pj_ioqueue_key_t active_list; @@ -153,6 +156,12 @@ struct pj_ioqueue_t }; +#if PJ_IOQUEUE_HAS_SAFE_UNREG +/* Prototype */ +static void scan_closing_keys(pj_ioqueue_t *ioqueue); +#endif + + #if PJ_HAS_TCP /* * Process the socket when the overlapped accept() completed. @@ -315,13 +324,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, return PJ_RETURN_OS_ERROR(GetLastError()); /* Create IOCP mutex */ - rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); + rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock); if (rc != PJ_SUCCESS) { CloseHandle(ioqueue->iocp); return rc; } ioqueue->auto_delete_lock = PJ_TRUE; + ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY; #if PJ_IOQUEUE_HAS_SAFE_UNREG /* @@ -344,14 +354,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, key = ioqueue->free_list.next; while (key != &ioqueue->free_list) { pj_atomic_destroy(key->ref_count); + pj_mutex_destroy(key->mutex); key = key->next; } CloseHandle(ioqueue->iocp); return rc; } - pj_list_push_back(&ioqueue->free_list, key); + rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex); + if (rc != PJ_SUCCESS) { + pj_atomic_destroy(key->ref_count); + key = ioqueue->free_list.next; + while (key != &ioqueue->free_list) { + pj_atomic_destroy(key->ref_count); + pj_mutex_destroy(key->mutex); + key = key->next; + } + CloseHandle(ioqueue->iocp); + return rc; + } + pj_list_push_back(&ioqueue->free_list, key); } #endif @@ -392,18 +415,21 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) key = ioqueue->active_list.next; while (key != &ioqueue->active_list) { pj_atomic_destroy(key->ref_count); + pj_mutex_destroy(key->mutex); key = key->next; } key = ioqueue->closing_list.next; while (key != &ioqueue->closing_list) { pj_atomic_destroy(key->ref_count); + pj_mutex_destroy(key->mutex); key = key->next; } key = ioqueue->free_list.next; while (key != &ioqueue->free_list) { pj_atomic_destroy(key->ref_count); + pj_mutex_destroy(key->mutex); key = key->next; } #endif @@ -414,6 +440,15 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) return PJ_SUCCESS; } + +PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue, + pj_bool_t allow) +{ + PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL); + ioqueue->default_concurrency = allow; + return PJ_SUCCESS; +} + /* * pj_ioqueue_set_lock() */ @@ -453,6 +488,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, pj_lock_acquire(ioqueue->lock); #if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Scan closing list first to release unused keys. + * Must do this with lock acquired. + */ + scan_closing_keys(ioqueue); + /* If safe unregistration is used, then get the key record from * the free list. */ @@ -481,6 +521,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, rec->user_data = user_data; pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); + /* Set concurrency for this handle */ + rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency); + if (rc != PJ_SUCCESS) { + pj_lock_release(ioqueue->lock); + return rc; + } + #if PJ_HAS_TCP rec->connecting = 0; #endif @@ -585,6 +632,8 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, * - zero and pOv!=NULL if event for failed I/O was dequeued. */ if (pOv) { + pj_bool_t has_lock; + /* Event was dequeued for either successfull or failed I/O */ key = (pj_ioqueue_key_t*)dwKey; size_status = dwBytesTransfered; @@ -600,10 +649,30 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, if (key->closing) return PJ_TRUE; + /* If concurrency is disabled, lock the key + * (and save the lock status to local var since app may change + * concurrency setting while in the callback) */ + if (key->allow_concurrent == PJ_FALSE) { + pj_mutex_lock(key->mutex); + has_lock = PJ_TRUE; + } else { + has_lock = PJ_FALSE; + } + + /* Now that we get the lock, check again that key is not closing */ + if (key->closing) { + if (has_lock) { + pj_mutex_unlock(key->mutex); + } + return PJ_TRUE; + } + /* Increment reference counter to prevent this key from being * deleted */ pj_atomic_inc(key->ref_count); +#else + PJ_UNUSED_ARG(has_lock); #endif /* Carry out the callback */ @@ -654,6 +723,8 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, #if PJ_IOQUEUE_HAS_SAFE_UNREG decrement_counter(key); + if (has_lock) + pj_mutex_unlock(key->mutex); #endif return PJ_TRUE; @@ -669,6 +740,7 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) { unsigned i; + pj_bool_t has_lock; enum { RETRY = 10 }; PJ_ASSERT_RETURN(key, PJ_EINVAL); @@ -696,6 +768,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) #if PJ_IOQUEUE_HAS_SAFE_UNREG /* Mark key as closing before closing handle. */ key->closing = 1; + + /* If concurrency is disabled, wait until the key has finished + * processing the callback + */ + if (key->allow_concurrent == PJ_FALSE) { + pj_mutex_lock(key->mutex); + has_lock = PJ_TRUE; + } else { + has_lock = PJ_FALSE; + } +#else + PJ_UNUSED_ARG(has_lock); #endif /* Close handle (the only way to disassociate handle from IOCP). @@ -717,6 +801,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) * * Forcing context switch seems to have fixed that, but this is quite * an ugly solution.. + * + * Update 2008/02/13: + * This should not happen if concurrency is disallowed for the key. + * So at least application has a solution for this (i.e. by disallowing + * concurrency in the key). */ //This will loop forever if unregistration is done on the callback. //Doing this with RETRY I think should solve the IOCP setting the @@ -728,11 +817,45 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) /* Decrement reference counter to destroy the key. */ decrement_counter(key); + + if (has_lock) + pj_mutex_unlock(key->mutex); #endif return PJ_SUCCESS; } +#if PJ_IOQUEUE_HAS_SAFE_UNREG +/* Scan the closing list, and put pending closing keys to free list. + * Must do this with ioqueue mutex held. + */ +static void scan_closing_keys(pj_ioqueue_t *ioqueue) +{ + if (!pj_list_empty(&ioqueue->closing_list)) { + pj_time_val now; + pj_ioqueue_key_t *key; + + pj_gettimeofday(&now); + + /* Move closing keys to free list when they've finished the closing + * idle time. + */ + key = ioqueue->closing_list.next; + while (key != &ioqueue->closing_list) { + pj_ioqueue_key_t *next = key->next; + + pj_assert(key->closing != 0); + + if (PJ_TIME_VAL_GTE(now, key->free_time)) { + pj_list_erase(key); + pj_list_push_back(&ioqueue->free_list, key); + } + key = next; + } + } +} +#endif + /* * pj_ioqueue_poll() * @@ -766,32 +889,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check the closing keys only when there's no activity and when there are * pending closing keys. - * blp: - * no, always check the list. Otherwise on busy activity, this will cause - * ioqueue to reject new registration. */ - if (/*event_count == 0 &&*/ !pj_list_empty(&ioqueue->closing_list)) { - pj_time_val now; - pj_ioqueue_key_t *key; - - pj_gettimeofday(&now); - - /* Move closing keys to free list when they've finished the closing - * idle time. - */ + if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) { pj_lock_acquire(ioqueue->lock); - key = ioqueue->closing_list.next; - while (key != &ioqueue->closing_list) { - pj_ioqueue_key_t *next = key->next; - - pj_assert(key->closing != 0); - - if (PJ_TIME_VAL_GTE(now, key->free_time)) { - pj_list_erase(key); - pj_list_push_back(&ioqueue->free_list, key); - } - key = next; - } + scan_closing_keys(ioqueue); pj_lock_release(ioqueue->lock); } #endif @@ -1268,3 +1369,35 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, return PJ_SUCCESS; } +PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, + pj_bool_t allow) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is + * disabled. + */ + PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL); + + key->allow_concurrent = allow; + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) +{ +#if PJ_IOQUEUE_HAS_SAFE_UNREG + return pj_mutex_lock(key->mutex); +#else + PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); +#endif +} + +PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) +{ +#if PJ_IOQUEUE_HAS_SAFE_UNREG + return pj_mutex_unlock(key->mutex); +#else + PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); +#endif +} + -- cgit v1.2.3