diff options
author | Benny Prijono <bennylp@teluu.com> | 2008-02-13 15:17:28 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2008-02-13 15:17:28 +0000 |
commit | 379f21d67f143af70c85fd9ef2af67cc87d150e3 (patch) | |
tree | 4b20c85bf30f3d7aebbd94a3ef529e7ae9f42f59 /pjlib | |
parent | a2ca31f0f6e30a30bf6f6e58ab423b370fbc9bb3 (diff) |
Ticket #474: option in ioqueue to control concurrency (to allow/disallow simultaneous/multiple callback calls)
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@1789 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib')
-rw-r--r-- | pjlib/include/pj/config.h | 28 | ||||
-rw-r--r-- | pjlib/include/pj/ioqueue.h | 146 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.c | 132 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.h | 4 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_winnt.c | 185 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/ioq_perf.c | 36 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/ioq_tcp.c | 51 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/ioq_udp.c | 65 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/ioq_unreg.c | 24 |
9 files changed, 602 insertions, 69 deletions
diff --git a/pjlib/include/pj/config.h b/pjlib/include/pj/config.h index bce870de..9a291608 100644 --- a/pjlib/include/pj/config.h +++ b/pjlib/include/pj/config.h @@ -285,6 +285,8 @@ # undef PJ_ENABLE_EXTRA_CHECK # undef PJ_EXCEPTION_USE_WIN32_SEH # undef PJ_HAS_ERROR_STRING + +# define PJ_HAS_IPV6 1 #endif /** @@ -513,6 +515,32 @@ /** + * Default concurrency setting for sockets/handles registered to ioqueue. + * This controls whether the ioqueue is allowed to call the key's callback + * concurrently/in parallel. The default is yes, which means that if there + * are more than one pending operations complete simultaneously, more + * than one threads may call the key's callback at the same time. This + * generally would promote good scalability for application, at the + * expense of more complexity to manage the concurrent accesses. + * + * Please see the ioqueue documentation for more info. + */ +#ifndef PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY +# define PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY 1 +#endif + + +/* Sanity check: + * if ioqueue concurrency is disallowed, PJ_IOQUEUE_HAS_SAFE_UNREG + * must be enabled. + */ +#if (PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY==0) && (PJ_IOQUEUE_HAS_SAFE_UNREG==0) +# error PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if ioqueue concurrency \ + is disabled +#endif + + +/** * When safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is configured in * ioqueue, the PJ_IOQUEUE_KEY_FREE_DELAY macro specifies how long the * ioqueue key is kept in closing state before it can be reused. diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h index ded4aea1..747eb215 100644 --- a/pjlib/include/pj/ioqueue.h +++ b/pjlib/include/pj/ioqueue.h @@ -101,21 +101,88 @@ PJ_BEGIN_DECL * * \section pj_ioqueue_concurrency_sec Concurrency Rules * - * The items below describe rules that must be obeyed when using the I/O - * queue, with regard to concurrency: - * - simultaneous operations (by different threads) to different key is safe. - * - simultaneous operations to the same key is also safe, except - * <b>unregistration</b>, which is described below. - * - <b>care must be taken when unregistering a key</b> from the + * The ioqueue has been fine tuned to allow multiple threads to poll the + * handles simultaneously, to maximize scalability when the application is + * running on multiprocessor systems. When more than one threads are polling + * the ioqueue and there are more than one handles are signaled, more than + * one threads will execute the callback simultaneously to serve the events. + * These parallel executions are completely safe when the events happen for + * two different handles. + * + * However, with multithreading, care must be taken when multiple events + * happen on the same handle, or when event is happening on a handle (and + * the callback is being executed) and application is performing + * unregistration to the handle at the same time. + * + * The treatments of above scenario differ according to the concurrency + * setting that are applied to the handle. + * + * \subsection pj_ioq_concur_set Concurrency Settings for Handles + * + * Concurrency can be set on per handle (key) basis, by using + * #pj_ioqueue_set_concurrency() function. The default key concurrency value + * for the handle is inherited from the key concurrency setting of the ioqueue, + * and the key concurrency setting for the ioqueue can be changed by using + * #pj_ioqueue_set_default_concurrency(). The default key concurrency setting + * for ioqueue itself is controlled by compile time setting + * PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY. + * + * Note that this key concurrency setting only controls whether multiple + * threads are allowed to operate <b>on the same key</b> at the same time. + * The ioqueue itself always allows multiple threads to enter the ioqeuue at + * the same time, and also simultaneous callback calls to <b>differrent + * keys</b> is always allowed regardless to the key concurrency setting. + * + * \subsection pj_ioq_parallel Parallel Callback Executions for the Same Handle + * + * Note that when key concurrency is enabled (i.e. parallel callback calls on + * the same key is allowed; this is the default setting), the ioqueue will only + * perform simultaneous callback executions on the same key when the key has + * invoked multiple pending operations. This could be done for example by + * calling #pj_ioqueue_recvfrom() more than once on the same key, each with + * the same key but different operation key (pj_ioqueue_op_key_t). With this + * scenario, when multiple packets arrive on the key at the same time, more + * than one threads may execute the callback simultaneously, each with the + * same key but different operation key. + * + * When there is only one pending operation on the key (e.g. there is only one + * #pj_ioqueue_recvfrom() invoked on the key), then events occuring to the + * same key will be queued by the ioqueue, thus no simultaneous callback calls + * will be performed. + * + * \subsection pj_ioq_allow_concur Concurrency is Enabled (Default Value) + * + * The default setting for the ioqueue is to allow multiple threads to + * execute callbacks for the same handle/key. This setting is selected to + * promote good performance and scalability for application. + * + * However this setting has a major drawback with regard to synchronization, + * and application MUST carefully follow the following guidelines to ensure + * that parallel access to the key does not cause problems: + * + * - Always note that callback may be called simultaneously for the same + * key. + * - <b>Care must be taken when unregistering a key</b> from the * ioqueue. Application must take care that when one thread is issuing - * an unregistration, other thread is not simultaneously invoking an - * operation <b>to the same key</b>. + * an unregistration, other thread is not simultaneously invoking the + * callback <b>to the same key</b>. *\n * This happens because the ioqueue functions are working with a pointer * to the key, and there is a possible race condition where the pointer * has been rendered invalid by other threads before the ioqueue has a * chance to acquire mutex on it. * + * \subsection pj_ioq_disallow_concur Concurrency is Disabled + * + * Alternatively, application may disable key concurrency to make + * synchronization easier. As noted above, there are three ways to control + * key concurrency setting: + * - by controlling on per handle/key basis, with #pj_ioqueue_set_concurrency(). + * - by changing default key concurrency setting on the ioqueue, with + * #pj_ioqueue_set_default_concurrency(). + * - by changing the default concurrency on compile time, by declaring + * PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY macro to zero in your config_site.h + * * \section pj_ioqeuue_examples_sec Examples * * For some examples on how to use the I/O Queue, please see: @@ -291,6 +358,24 @@ PJ_DECL(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque, pj_bool_t auto_delete ); /** + * Set default concurrency policy for this ioqueue. If this function is not + * called, the default concurrency policy for the ioqueue is controlled by + * compile time setting PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY. + * + * Note that changing the concurrency setting to the ioqueue will only affect + * subsequent key registrations. To modify the concurrency setting for + * individual key, use #pj_ioqueue_set_concurrency(). + * + * @param ioqueue The ioqueue instance. + * @param allow Non-zero to allow concurrent callback calls, or + * PJ_FALSE to disallow it. + * + * @return PJ_SUCCESS on success or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue, + pj_bool_t allow); + +/** * Register a socket to the I/O queue framework. * When a socket is registered to the IOQueue, it may be modified to use * non-blocking IO. If it is modified, there is no guarantee that this @@ -366,6 +451,51 @@ PJ_DECL(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, void *user_data, void **old_data); +/** + * Configure whether the ioqueue is allowed to call the key's callback + * concurrently/in parallel. The default concurrency setting for the key + * is controlled by ioqueue's default concurrency value, which can be + * changed by calling #pj_ioqueue_set_default_concurrency(). + * + * If concurrency is allowed for the key, it means that if there are more + * than one pending operations complete simultaneously, more than one + * threads may call the key's callback at the same time. This generally + * would promote good scalability for application, at the expense of more + * complexity to manage the concurrent accesses in application's code. + * + * Alternatively application may disable the concurrent access by + * setting the \a allow flag to false. With concurrency disabled, only + * one thread can call the key's callback at one time. + * + * @param key The key that was previously obtained from registration. + * @param allow Set this to non-zero to allow concurrent callback calls + * and zero (PJ_FALSE) to disallow it. + * + * @return PJ_SUCCESS on success or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, + pj_bool_t allow); + +/** + * Acquire the key's mutex. When the key's concurrency is disabled, + * application may call this function to synchronize its operation + * with the key's callback (i.e. this function will block until the + * key's callback returns). + * + * @param key The key that was previously obtained from registration. + * + * @return PJ_SUCCESS on success or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key); + +/** + * Release the lock previously acquired with pj_ioqueue_lock_key(). + * + * @param key The key that was previously obtained from registration. + * + * @return PJ_SUCCESS on success or the appropriate error code. + */ +PJ_DECL(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key); /** * Initialize operation key. diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index fdd1afe7..0af9cba5 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -96,6 +96,10 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool, key->closing = 0; #endif + rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency); + if (rc != PJ_SUCCESS) + return rc; + /* Get socket type. When socket type is datagram, some optimization * will be performed during send to allow parallel send operations. */ @@ -193,6 +197,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) if (h->connecting) { /* Completion of connect() operation */ pj_ssize_t bytes_transfered; + pj_bool_t has_lock; /* Clear operation. */ h->connecting = 0; @@ -246,13 +251,28 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) } #endif - /* Unlock; from this point we don't need to hold key's mutex. */ - pj_mutex_unlock(h->mutex); + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } /* Call callback. */ if (h->cb.on_connect_complete && !IS_CLOSING(h)) (*h->cb.on_connect_complete)(h, bytes_transfered); + /* Unlock if we still hold the lock */ + if (has_lock) { + pj_mutex_unlock(h->mutex); + } + /* Done. */ } else @@ -317,6 +337,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) write_op->written == (pj_ssize_t)write_op->size || h->fd_type == pj_SOCK_DGRAM()) { + pj_bool_t has_lock; write_op->op = PJ_IOQUEUE_OP_NONE; @@ -330,8 +351,18 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) } - /* No need to hold mutex anymore */ - pj_mutex_unlock(h->mutex); + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } /* Call callback. */ if (h->cb.on_write_complete && !IS_CLOSING(h)) { @@ -340,6 +371,10 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) write_op->written); } + if (has_lock) { + pj_mutex_unlock(h->mutex); + } + } else { pj_mutex_unlock(h->mutex); } @@ -371,6 +406,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) if (!pj_list_empty(&h->accept_list)) { struct accept_operation *accept_op; + pj_bool_t has_lock; /* Get one accept operation from the list. */ accept_op = h->accept_list.next; @@ -389,8 +425,18 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) accept_op->addrlen); } - /* Unlock; from this point we don't need to hold key's mutex. */ - pj_mutex_unlock(h->mutex); + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } /* Call callback. */ if (h->cb.on_accept_complete && !IS_CLOSING(h)) { @@ -399,12 +445,16 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) *accept_op->accept_fd, rc); } + if (has_lock) { + pj_mutex_unlock(h->mutex); + } } else # endif if (key_has_pending_read(h)) { struct read_operation *read_op; pj_ssize_t bytes_read; + pj_bool_t has_lock; /* Get one pending read operation from the list. */ read_op = h->read_list.next; @@ -479,8 +529,18 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) bytes_read = -rc; } - /* Unlock; from this point we don't need to hold key's mutex. */ - pj_mutex_unlock(h->mutex); + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } /* Call callback. */ if (h->cb.on_read_complete && !IS_CLOSING(h)) { @@ -489,6 +549,10 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) bytes_read); } + if (has_lock) { + pj_mutex_unlock(h->mutex); + } + } else { /* * This is normal; execution may fall here when multiple threads @@ -503,6 +567,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) { + pj_bool_t has_lock; + pj_mutex_lock(h->mutex); if (!h->connecting) { @@ -525,7 +591,18 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT); ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT); - pj_mutex_unlock(h->mutex); + /* Unlock; from this point we don't need to hold key's mutex + * (unless concurrency is disabled, which in this case we should + * hold the mutex while calling the callback) */ + if (h->allow_concurrent) { + /* concurrency may be changed while we're in the callback, so + * save it to a flag. + */ + has_lock = PJ_FALSE; + pj_mutex_unlock(h->mutex); + } else { + has_lock = PJ_TRUE; + } /* Call callback. */ if (h->cb.on_connect_complete && !IS_CLOSING(h)) { @@ -542,6 +619,10 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, (*h->cb.on_connect_complete)(h, status); } + + if (has_lock) { + pj_mutex_unlock(h->mutex); + } } /* @@ -1096,3 +1177,36 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, return PJ_EINVALIDOP; } +PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue, + pj_bool_t allow) +{ + PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL); + ioqueue->default_concurrency = allow; + return PJ_SUCCESS; +} + + +PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, + pj_bool_t allow) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is + * disabled. + */ + PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL); + + key->allow_concurrent = allow; + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) +{ + return pj_mutex_lock(key->mutex); +} + +PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) +{ + return pj_mutex_unlock(key->mutex); +} + diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h index 194f884d..4d35632c 100644 --- a/pjlib/src/pj/ioqueue_common_abs.h +++ b/pjlib/src/pj/ioqueue_common_abs.h @@ -103,6 +103,7 @@ union operation_key pj_mutex_t *mutex; \ pj_bool_t inside_callback; \ pj_bool_t destroy_requested; \ + pj_bool_t allow_concurrent; \ pj_sock_t fd; \ int fd_type; \ void *user_data; \ @@ -116,7 +117,8 @@ union operation_key #define DECLARE_COMMON_IOQUEUE \ pj_lock_t *lock; \ - pj_bool_t auto_delete_lock; + pj_bool_t auto_delete_lock; \ + pj_bool_t default_concurrency; enum ioqueue_event_type diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index 3e8253e1..3c6b9fb6 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -114,6 +114,7 @@ struct pj_ioqueue_key_t void *user_data; enum handle_type hnd_type; pj_ioqueue_callback cb; + pj_bool_t allow_concurrent; #if PJ_HAS_TCP int connecting; @@ -123,6 +124,7 @@ struct pj_ioqueue_key_t pj_atomic_t *ref_count; pj_bool_t closing; pj_time_val free_time; + pj_mutex_t *mutex; #endif }; @@ -135,6 +137,7 @@ struct pj_ioqueue_t HANDLE iocp; pj_lock_t *lock; pj_bool_t auto_delete_lock; + pj_bool_t default_concurrency; #if PJ_IOQUEUE_HAS_SAFE_UNREG pj_ioqueue_key_t active_list; @@ -153,6 +156,12 @@ struct pj_ioqueue_t }; +#if PJ_IOQUEUE_HAS_SAFE_UNREG +/* Prototype */ +static void scan_closing_keys(pj_ioqueue_t *ioqueue); +#endif + + #if PJ_HAS_TCP /* * Process the socket when the overlapped accept() completed. @@ -315,13 +324,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, return PJ_RETURN_OS_ERROR(GetLastError()); /* Create IOCP mutex */ - rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock); + rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock); if (rc != PJ_SUCCESS) { CloseHandle(ioqueue->iocp); return rc; } ioqueue->auto_delete_lock = PJ_TRUE; + ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY; #if PJ_IOQUEUE_HAS_SAFE_UNREG /* @@ -344,14 +354,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, key = ioqueue->free_list.next; while (key != &ioqueue->free_list) { pj_atomic_destroy(key->ref_count); + pj_mutex_destroy(key->mutex); key = key->next; } CloseHandle(ioqueue->iocp); return rc; } - pj_list_push_back(&ioqueue->free_list, key); + rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex); + if (rc != PJ_SUCCESS) { + pj_atomic_destroy(key->ref_count); + key = ioqueue->free_list.next; + while (key != &ioqueue->free_list) { + pj_atomic_destroy(key->ref_count); + pj_mutex_destroy(key->mutex); + key = key->next; + } + CloseHandle(ioqueue->iocp); + return rc; + } + pj_list_push_back(&ioqueue->free_list, key); } #endif @@ -392,18 +415,21 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) key = ioqueue->active_list.next; while (key != &ioqueue->active_list) { pj_atomic_destroy(key->ref_count); + pj_mutex_destroy(key->mutex); key = key->next; } key = ioqueue->closing_list.next; while (key != &ioqueue->closing_list) { pj_atomic_destroy(key->ref_count); + pj_mutex_destroy(key->mutex); key = key->next; } key = ioqueue->free_list.next; while (key != &ioqueue->free_list) { pj_atomic_destroy(key->ref_count); + pj_mutex_destroy(key->mutex); key = key->next; } #endif @@ -414,6 +440,15 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue ) return PJ_SUCCESS; } + +PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue, + pj_bool_t allow) +{ + PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL); + ioqueue->default_concurrency = allow; + return PJ_SUCCESS; +} + /* * pj_ioqueue_set_lock() */ @@ -453,6 +488,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, pj_lock_acquire(ioqueue->lock); #if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Scan closing list first to release unused keys. + * Must do this with lock acquired. + */ + scan_closing_keys(ioqueue); + /* If safe unregistration is used, then get the key record from * the free list. */ @@ -481,6 +521,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, rec->user_data = user_data; pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback)); + /* Set concurrency for this handle */ + rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency); + if (rc != PJ_SUCCESS) { + pj_lock_release(ioqueue->lock); + return rc; + } + #if PJ_HAS_TCP rec->connecting = 0; #endif @@ -585,6 +632,8 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, * - zero and pOv!=NULL if event for failed I/O was dequeued. */ if (pOv) { + pj_bool_t has_lock; + /* Event was dequeued for either successfull or failed I/O */ key = (pj_ioqueue_key_t*)dwKey; size_status = dwBytesTransfered; @@ -600,10 +649,30 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, if (key->closing) return PJ_TRUE; + /* If concurrency is disabled, lock the key + * (and save the lock status to local var since app may change + * concurrency setting while in the callback) */ + if (key->allow_concurrent == PJ_FALSE) { + pj_mutex_lock(key->mutex); + has_lock = PJ_TRUE; + } else { + has_lock = PJ_FALSE; + } + + /* Now that we get the lock, check again that key is not closing */ + if (key->closing) { + if (has_lock) { + pj_mutex_unlock(key->mutex); + } + return PJ_TRUE; + } + /* Increment reference counter to prevent this key from being * deleted */ pj_atomic_inc(key->ref_count); +#else + PJ_UNUSED_ARG(has_lock); #endif /* Carry out the callback */ @@ -654,6 +723,8 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, #if PJ_IOQUEUE_HAS_SAFE_UNREG decrement_counter(key); + if (has_lock) + pj_mutex_unlock(key->mutex); #endif return PJ_TRUE; @@ -669,6 +740,7 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) { unsigned i; + pj_bool_t has_lock; enum { RETRY = 10 }; PJ_ASSERT_RETURN(key, PJ_EINVAL); @@ -696,6 +768,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) #if PJ_IOQUEUE_HAS_SAFE_UNREG /* Mark key as closing before closing handle. */ key->closing = 1; + + /* If concurrency is disabled, wait until the key has finished + * processing the callback + */ + if (key->allow_concurrent == PJ_FALSE) { + pj_mutex_lock(key->mutex); + has_lock = PJ_TRUE; + } else { + has_lock = PJ_FALSE; + } +#else + PJ_UNUSED_ARG(has_lock); #endif /* Close handle (the only way to disassociate handle from IOCP). @@ -717,6 +801,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) * * Forcing context switch seems to have fixed that, but this is quite * an ugly solution.. + * + * Update 2008/02/13: + * This should not happen if concurrency is disallowed for the key. + * So at least application has a solution for this (i.e. by disallowing + * concurrency in the key). */ //This will loop forever if unregistration is done on the callback. //Doing this with RETRY I think should solve the IOCP setting the @@ -728,11 +817,45 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) /* Decrement reference counter to destroy the key. */ decrement_counter(key); + + if (has_lock) + pj_mutex_unlock(key->mutex); #endif return PJ_SUCCESS; } +#if PJ_IOQUEUE_HAS_SAFE_UNREG +/* Scan the closing list, and put pending closing keys to free list. + * Must do this with ioqueue mutex held. + */ +static void scan_closing_keys(pj_ioqueue_t *ioqueue) +{ + if (!pj_list_empty(&ioqueue->closing_list)) { + pj_time_val now; + pj_ioqueue_key_t *key; + + pj_gettimeofday(&now); + + /* Move closing keys to free list when they've finished the closing + * idle time. + */ + key = ioqueue->closing_list.next; + while (key != &ioqueue->closing_list) { + pj_ioqueue_key_t *next = key->next; + + pj_assert(key->closing != 0); + + if (PJ_TIME_VAL_GTE(now, key->free_time)) { + pj_list_erase(key); + pj_list_push_back(&ioqueue->free_list, key); + } + key = next; + } + } +} +#endif + /* * pj_ioqueue_poll() * @@ -766,32 +889,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check the closing keys only when there's no activity and when there are * pending closing keys. - * blp: - * no, always check the list. Otherwise on busy activity, this will cause - * ioqueue to reject new registration. */ - if (/*event_count == 0 &&*/ !pj_list_empty(&ioqueue->closing_list)) { - pj_time_val now; - pj_ioqueue_key_t *key; - - pj_gettimeofday(&now); - - /* Move closing keys to free list when they've finished the closing - * idle time. - */ + if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) { pj_lock_acquire(ioqueue->lock); - key = ioqueue->closing_list.next; - while (key != &ioqueue->closing_list) { - pj_ioqueue_key_t *next = key->next; - - pj_assert(key->closing != 0); - - if (PJ_TIME_VAL_GTE(now, key->free_time)) { - pj_list_erase(key); - pj_list_push_back(&ioqueue->free_list, key); - } - key = next; - } + scan_closing_keys(ioqueue); pj_lock_release(ioqueue->lock); } #endif @@ -1268,3 +1369,35 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key, return PJ_SUCCESS; } +PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key, + pj_bool_t allow) +{ + PJ_ASSERT_RETURN(key, PJ_EINVAL); + + /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is + * disabled. + */ + PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL); + + key->allow_concurrent = allow; + return PJ_SUCCESS; +} + +PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) +{ +#if PJ_IOQUEUE_HAS_SAFE_UNREG + return pj_mutex_lock(key->mutex); +#else + PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); +#endif +} + +PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) +{ +#if PJ_IOQUEUE_HAS_SAFE_UNREG + return pj_mutex_unlock(key->mutex); +#else + PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP); +#endif +} + diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c index 92d4540d..e7ddf1f4 100644 --- a/pjlib/src/pjlib-test/ioq_perf.c +++ b/pjlib/src/pjlib-test/ioq_perf.c @@ -221,7 +221,8 @@ static int worker_thread(void *p) * - measure the total bytes received by all consumers during a * period of time. */ -static int perform_test(int sock_type, const char *type_name, +static int perform_test(pj_bool_t allow_concur, + int sock_type, const char *type_name, unsigned thread_cnt, unsigned sockpair_cnt, pj_size_t buffer_size, pj_size_t *p_bandwidth) @@ -260,6 +261,12 @@ static int perform_test(int sock_type, const char *type_name, return -15; } + rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur); + if (rc != PJ_SUCCESS) { + app_perror("...error: pj_ioqueue_set_default_concurrency()", rc); + return -16; + } + /* Initialize each producer-consumer pair. */ for (i=0; i<sockpair_cnt; ++i) { pj_ssize_t bytes; @@ -437,10 +444,7 @@ static int perform_test(int sock_type, const char *type_name, return 0; } -/* - * main test entry. - */ -int ioqueue_perf_test(void) +static int ioqueue_perf_test_imp(pj_bool_t allow_concur) { enum { BUF_SIZE = 512 }; int i, rc; @@ -500,6 +504,7 @@ int ioqueue_perf_test(void) int best_index = 0; PJ_LOG(3,(THIS_FILE, " Benchmarking %s ioqueue:", pj_ioqueue_name())); + PJ_LOG(3,(THIS_FILE, " Testing with concurency=%d", allow_concur)); PJ_LOG(3,(THIS_FILE, " =======================================")); PJ_LOG(3,(THIS_FILE, " Type Threads Skt.Pairs Bandwidth")); PJ_LOG(3,(THIS_FILE, " =======================================")); @@ -508,7 +513,8 @@ int ioqueue_perf_test(void) for (i=0; i<(int)(sizeof(test_param)/sizeof(test_param[0])); ++i) { pj_size_t bandwidth; - rc = perform_test(test_param[i].type, + rc = perform_test(allow_concur, + test_param[i].type, test_param[i].type_name, test_param[i].thread_cnt, test_param[i].sockpair_cnt, @@ -537,6 +543,24 @@ int ioqueue_perf_test(void) return 0; } +/* + * main test entry. + */ +int ioqueue_perf_test(void) +{ + int rc; + + rc = ioqueue_perf_test_imp(PJ_TRUE); + if (rc != 0) + return rc; + + rc = ioqueue_perf_test_imp(PJ_FALSE); + if (rc != 0) + return rc; + + return 0; +} + #else /* To prevent warning about "translation unit is empty" * when this test is disabled. diff --git a/pjlib/src/pjlib-test/ioq_tcp.c b/pjlib/src/pjlib-test/ioq_tcp.c index c6e117db..106a9645 100644 --- a/pjlib/src/pjlib-test/ioq_tcp.c +++ b/pjlib/src/pjlib-test/ioq_tcp.c @@ -232,7 +232,7 @@ static int send_recv_test(pj_ioqueue_t *ioque, /* * Compliance test for success scenario. */ -static int compliance_test_0(void) +static int compliance_test_0(pj_bool_t allow_concur) { pj_sock_t ssock=-1, csock0=-1, csock1=-1; pj_sockaddr_in addr, client_addr, rmt_addr; @@ -292,6 +292,13 @@ static int compliance_test_0(void) status=-20; goto on_error; } + // Concurrency + rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); + if (rc != PJ_SUCCESS) { + app_perror("...ERROR in pj_ioqueue_set_default_concurrency()", rc); + status=-21; goto on_error; + } + // Register server socket and client socket. rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL, &test_cb, &skey); if (rc == PJ_SUCCESS) @@ -458,7 +465,7 @@ on_error: * Compliance test for failed scenario. * In this case, the client connects to a non-existant service. */ -static int compliance_test_1(void) +static int compliance_test_1(pj_bool_t allow_concur) { pj_sock_t csock1=PJ_INVALID_SOCKET; pj_sockaddr_in addr; @@ -479,6 +486,12 @@ static int compliance_test_1(void) status=-20; goto on_error; } + // Concurrency + rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); + if (rc != PJ_SUCCESS) { + status=-21; goto on_error; + } + // Create client socket rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &csock1); if (rc != PJ_SUCCESS) { @@ -581,7 +594,7 @@ on_error: /* * Repeated connect/accept on the same listener socket. */ -static int compliance_test_2(void) +static int compliance_test_2(pj_bool_t allow_concur) { #if defined(PJ_SYMBIAN) && PJ_SYMBIAN!=0 enum { MAX_PAIR = 1, TEST_LOOP = 2 }; @@ -648,6 +661,13 @@ static int compliance_test_2(void) } + // Concurrency + rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); + if (rc != PJ_SUCCESS) { + app_perror("...ERROR in pj_ioqueue_set_default_concurrency()", rc); + return -11; + } + // Allocate buffers for send and receive. send_buf = (char*)pj_pool_alloc(pool, bufsize); recv_buf = (char*)pj_pool_alloc(pool, bufsize); @@ -887,26 +907,28 @@ on_error: } -int tcp_ioqueue_test() +static int tcp_ioqueue_test_impl(pj_bool_t allow_concur) { int status; + PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur)); + PJ_LOG(3, (THIS_FILE, "..%s compliance test 0 (success scenario)", pj_ioqueue_name())); - if ((status=compliance_test_0()) != 0) { + if ((status=compliance_test_0(allow_concur)) != 0) { PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status)); return status; } PJ_LOG(3, (THIS_FILE, "..%s compliance test 1 (failed scenario)", pj_ioqueue_name())); - if ((status=compliance_test_1()) != 0) { + if ((status=compliance_test_1(allow_concur)) != 0) { PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status)); return status; } PJ_LOG(3, (THIS_FILE, "..%s compliance test 2 (repeated accept)", pj_ioqueue_name())); - if ((status=compliance_test_2()) != 0) { + if ((status=compliance_test_2(allow_concur)) != 0) { PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status)); return status; } @@ -914,6 +936,21 @@ int tcp_ioqueue_test() return 0; } +int tcp_ioqueue_test() +{ + int rc; + + rc = tcp_ioqueue_test_impl(PJ_TRUE); + if (rc != 0) + return rc; + + rc = tcp_ioqueue_test_impl(PJ_FALSE); + if (rc != 0) + return rc; + + return 0; +} + #endif /* PJ_HAS_TCP */ diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c index 1bbe494f..e7e1ae52 100644 --- a/pjlib/src/pjlib-test/ioq_udp.c +++ b/pjlib/src/pjlib-test/ioq_udp.c @@ -125,7 +125,7 @@ static pj_ioqueue_callback test_cb = * To test that the basic IOQueue functionality works. It will just exchange * data between two sockets. */ -static int compliance_test(void) +static int compliance_test(pj_bool_t allow_concur) { pj_sock_t ssock=-1, csock=-1; pj_sockaddr_in addr, dst_addr; @@ -178,6 +178,13 @@ static int compliance_test(void) status=-20; goto on_error; } + // Set concurrency + TRACE_("set concurrency..."); + rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); + if (rc != PJ_SUCCESS) { + status=-21; goto on_error; + } + // Register server and client socket. // We put this after inactivity socket, hopefully this can represent the // worst waiting time. @@ -351,7 +358,7 @@ static void on_read_complete(pj_ioqueue_key_t *key, * Check if callback is still called after socket has been unregistered or * closed. */ -static int unregister_test(void) +static int unregister_test(pj_bool_t allow_concur) { enum { RPORT = 50000, SPORT = 50001 }; pj_pool_t *pool; @@ -381,6 +388,13 @@ static int unregister_test(void) return -110; } + // Set concurrency + TRACE_("set concurrency..."); + status = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur); + if (status != PJ_SUCCESS) { + return -112; + } + /* Create sender socket */ status = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, SPORT, &ssock); if (status != PJ_SUCCESS) { @@ -512,7 +526,7 @@ static int unregister_test(void) * This will just test registering PJ_IOQUEUE_MAX_HANDLES count * of sockets to the ioqueue. */ -static int many_handles_test(void) +static int many_handles_test(pj_bool_t allow_concur) { enum { MAX = PJ_IOQUEUE_MAX_HANDLES }; pj_pool_t *pool; @@ -539,6 +553,12 @@ static int many_handles_test(void) return -10; } + // Set concurrency + rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur); + if (rc != PJ_SUCCESS) { + return -11; + } + /* Register as many sockets. */ for (count=0; count<MAX; ++count) { sock[count] = PJ_INVALID_SOCKET; @@ -600,7 +620,8 @@ static int many_handles_test(void) /* * Benchmarking IOQueue */ -static int bench_test(int bufsize, int inactive_sock_count) +static int bench_test(pj_bool_t allow_concur, int bufsize, + int inactive_sock_count) { pj_sock_t ssock=-1, csock=-1; pj_sockaddr_in addr; @@ -651,6 +672,13 @@ static int bench_test(int bufsize, int inactive_sock_count) goto on_error; } + // Set concurrency + rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur); + if (rc != PJ_SUCCESS) { + app_perror("...error: pj_ioqueue_set_default_concurrency()", rc); + goto on_error; + } + // Allocate inactive sockets, and bind them to some arbitrary address. // Then register them to the I/O queue, and start a read operation. inactive_sock = (pj_sock_t*)pj_pool_alloc(pool, @@ -839,27 +867,29 @@ on_error: return -1; } -int udp_ioqueue_test() +static int udp_ioqueue_test_imp(pj_bool_t allow_concur) { int status; int bufsize, sock_count; + PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur)); + //goto pass1; PJ_LOG(3, (THIS_FILE, "...compliance test (%s)", pj_ioqueue_name())); - if ((status=compliance_test()) != 0) { + if ((status=compliance_test(allow_concur)) != 0) { return status; } PJ_LOG(3, (THIS_FILE, "....compliance test ok")); PJ_LOG(3, (THIS_FILE, "...unregister test (%s)", pj_ioqueue_name())); - if ((status=unregister_test()) != 0) { + if ((status=unregister_test(allow_concur)) != 0) { return status; } PJ_LOG(3, (THIS_FILE, "....unregister test ok")); - if ((status=many_handles_test()) != 0) { + if ((status=many_handles_test(allow_concur)) != 0) { return status; } @@ -879,7 +909,7 @@ int udp_ioqueue_test() //goto pass2; for (bufsize=BUF_MIN_SIZE; bufsize <= BUF_MAX_SIZE; bufsize *= 2) { - if ((status=bench_test(bufsize, SOCK_INACTIVE_MIN)) != 0) + if ((status=bench_test(allow_concur, bufsize, SOCK_INACTIVE_MIN)) != 0) return status; } //pass2: @@ -889,12 +919,27 @@ int udp_ioqueue_test() sock_count *= 2) { //PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count)); - if ((status=bench_test(bufsize, sock_count-2)) != 0) + if ((status=bench_test(allow_concur, bufsize, sock_count-2)) != 0) return status; } return 0; } +int udp_ioqueue_test() +{ + int rc; + + rc = udp_ioqueue_test_imp(PJ_TRUE); + if (rc != 0) + return rc; + + rc = udp_ioqueue_test_imp(PJ_FALSE); + if (rc != 0) + return rc; + + return 0; +} + #else /* To prevent warning about "translation unit is empty" * when this test is disabled. diff --git a/pjlib/src/pjlib-test/ioq_unreg.c b/pjlib/src/pjlib-test/ioq_unreg.c index 33e86270..a1e80753 100644 --- a/pjlib/src/pjlib-test/ioq_unreg.c +++ b/pjlib/src/pjlib-test/ioq_unreg.c @@ -286,14 +286,16 @@ static int perform_unreg_test(pj_ioqueue_t *ioqueue, return 0; } -int udp_ioqueue_unreg_test(void) +static int udp_ioqueue_unreg_test_imp(pj_bool_t allow_concur) { enum { LOOP = 10 }; int i, rc; char title[30]; pj_ioqueue_t *ioqueue; pj_pool_t *test_pool; - + + PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur)); + test_method = UNREGISTER_IN_APP; test_pool = pj_pool_create(mem, "unregtest", 4000, 4000, NULL); @@ -304,6 +306,11 @@ int udp_ioqueue_unreg_test(void) return -10; } + rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur); + if (rc != PJ_SUCCESS) { + app_perror("Error in pj_ioqueue_set_default_concurrency()", rc); + return -12; + } PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 0/3 (%s)", pj_ioqueue_name())); @@ -351,7 +358,20 @@ int udp_ioqueue_unreg_test(void) return 0; } +int udp_ioqueue_unreg_test(void) +{ + int rc; + + rc = udp_ioqueue_unreg_test_imp(PJ_TRUE); + if (rc != 0) + return rc; + + rc = udp_ioqueue_unreg_test_imp(PJ_FALSE); + if (rc != 0) + return rc; + return 0; +} #else /* To prevent warning about "translation unit is empty" |