summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2008-02-13 15:17:28 +0000
committerBenny Prijono <bennylp@teluu.com>2008-02-13 15:17:28 +0000
commit379f21d67f143af70c85fd9ef2af67cc87d150e3 (patch)
tree4b20c85bf30f3d7aebbd94a3ef529e7ae9f42f59
parenta2ca31f0f6e30a30bf6f6e58ab423b370fbc9bb3 (diff)
Ticket #474: option in ioqueue to control concurrency (to allow/disallow simultaneous/multiple callback calls)
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@1789 74dad513-b988-da41-8d7b-12977e46ad98
-rw-r--r--pjlib/include/pj/config.h28
-rw-r--r--pjlib/include/pj/ioqueue.h146
-rw-r--r--pjlib/src/pj/ioqueue_common_abs.c132
-rw-r--r--pjlib/src/pj/ioqueue_common_abs.h4
-rw-r--r--pjlib/src/pj/ioqueue_winnt.c185
-rw-r--r--pjlib/src/pjlib-test/ioq_perf.c36
-rw-r--r--pjlib/src/pjlib-test/ioq_tcp.c51
-rw-r--r--pjlib/src/pjlib-test/ioq_udp.c65
-rw-r--r--pjlib/src/pjlib-test/ioq_unreg.c24
9 files changed, 602 insertions, 69 deletions
diff --git a/pjlib/include/pj/config.h b/pjlib/include/pj/config.h
index bce870de..9a291608 100644
--- a/pjlib/include/pj/config.h
+++ b/pjlib/include/pj/config.h
@@ -285,6 +285,8 @@
# undef PJ_ENABLE_EXTRA_CHECK
# undef PJ_EXCEPTION_USE_WIN32_SEH
# undef PJ_HAS_ERROR_STRING
+
+# define PJ_HAS_IPV6 1
#endif
/**
@@ -513,6 +515,32 @@
/**
+ * Default concurrency setting for sockets/handles registered to ioqueue.
+ * This controls whether the ioqueue is allowed to call the key's callback
+ * concurrently/in parallel. The default is yes, which means that if
+ * more than one pending operation completes simultaneously, more
+ * than one thread may call the key's callback at the same time. This
+ * generally would promote good scalability for application, at the
+ * expense of more complexity to manage the concurrent accesses.
+ *
+ * Please see the ioqueue documentation for more info.
+ */
+#ifndef PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY
+# define PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY 1
+#endif
+
+
+/* Sanity check:
+ * if ioqueue concurrency is disallowed, PJ_IOQUEUE_HAS_SAFE_UNREG
+ * must be enabled.
+ */
+#if (PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY==0) && (PJ_IOQUEUE_HAS_SAFE_UNREG==0)
+# error PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if ioqueue concurrency \
+ is disabled
+#endif
+
+
+/**
* When safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is configured in
* ioqueue, the PJ_IOQUEUE_KEY_FREE_DELAY macro specifies how long the
* ioqueue key is kept in closing state before it can be reused.
diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h
index ded4aea1..747eb215 100644
--- a/pjlib/include/pj/ioqueue.h
+++ b/pjlib/include/pj/ioqueue.h
@@ -101,21 +101,88 @@ PJ_BEGIN_DECL
*
* \section pj_ioqueue_concurrency_sec Concurrency Rules
*
- * The items below describe rules that must be obeyed when using the I/O
- * queue, with regard to concurrency:
- * - simultaneous operations (by different threads) to different key is safe.
- * - simultaneous operations to the same key is also safe, except
- * <b>unregistration</b>, which is described below.
- * - <b>care must be taken when unregistering a key</b> from the
+ * The ioqueue has been fine tuned to allow multiple threads to poll the
+ * handles simultaneously, to maximize scalability when the application is
+ * running on multiprocessor systems. When more than one threads are polling
+ * the ioqueue and more than one handle is signaled, more than
+ * one thread will execute the callback simultaneously to serve the events.
+ * These parallel executions are completely safe when the events happen for
+ * two different handles.
+ *
+ * However, with multithreading, care must be taken when multiple events
+ * happen on the same handle, or when event is happening on a handle (and
+ * the callback is being executed) and application is performing
+ * unregistration to the handle at the same time.
+ *
+ * The treatments of above scenario differ according to the concurrency
+ * setting that are applied to the handle.
+ *
+ * \subsection pj_ioq_concur_set Concurrency Settings for Handles
+ *
+ * Concurrency can be set on per handle (key) basis, by using
+ * #pj_ioqueue_set_concurrency() function. The default key concurrency value
+ * for the handle is inherited from the key concurrency setting of the ioqueue,
+ * and the key concurrency setting for the ioqueue can be changed by using
+ * #pj_ioqueue_set_default_concurrency(). The default key concurrency setting
+ * for ioqueue itself is controlled by compile time setting
+ * PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY.
+ *
+ * Note that this key concurrency setting only controls whether multiple
+ * threads are allowed to operate <b>on the same key</b> at the same time.
+ * The ioqueue itself always allows multiple threads to enter the ioqueue at
+ * the same time, and simultaneous callback calls to <b>different
+ * keys</b> are always allowed regardless of the key concurrency setting.
+ *
+ * \subsection pj_ioq_parallel Parallel Callback Executions for the Same Handle
+ *
+ * Note that when key concurrency is enabled (i.e. parallel callback calls on
+ * the same key is allowed; this is the default setting), the ioqueue will only
+ * perform simultaneous callback executions on the same key when the key has
+ * invoked multiple pending operations. This could be done for example by
+ * calling #pj_ioqueue_recvfrom() more than once on the same key, each with
+ * the same key but different operation key (pj_ioqueue_op_key_t). With this
+ * scenario, when multiple packets arrive on the key at the same time, more
+ * than one threads may execute the callback simultaneously, each with the
+ * same key but different operation key.
+ *
+ * When there is only one pending operation on the key (e.g. there is only one
+ * #pj_ioqueue_recvfrom() invoked on the key), then events occurring to the
+ * same key will be queued by the ioqueue, thus no simultaneous callback calls
+ * will be performed.
+ *
+ * \subsection pj_ioq_allow_concur Concurrency is Enabled (Default Value)
+ *
+ * The default setting for the ioqueue is to allow multiple threads to
+ * execute callbacks for the same handle/key. This setting is selected to
+ * promote good performance and scalability for application.
+ *
+ * However this setting has a major drawback with regard to synchronization,
+ * and application MUST carefully follow the following guidelines to ensure
+ * that parallel access to the key does not cause problems:
+ *
+ * - Always note that callback may be called simultaneously for the same
+ * key.
+ * - <b>Care must be taken when unregistering a key</b> from the
* ioqueue. Application must take care that when one thread is issuing
- * an unregistration, other thread is not simultaneously invoking an
- * operation <b>to the same key</b>.
+ * an unregistration, other thread is not simultaneously invoking the
+ * callback <b>to the same key</b>.
*\n
* This happens because the ioqueue functions are working with a pointer
* to the key, and there is a possible race condition where the pointer
* has been rendered invalid by other threads before the ioqueue has a
* chance to acquire mutex on it.
*
+ * \subsection pj_ioq_disallow_concur Concurrency is Disabled
+ *
+ * Alternatively, application may disable key concurrency to make
+ * synchronization easier. As noted above, there are three ways to control
+ * key concurrency setting:
+ * - by controlling on per handle/key basis, with #pj_ioqueue_set_concurrency().
+ * - by changing default key concurrency setting on the ioqueue, with
+ * #pj_ioqueue_set_default_concurrency().
+ * - by changing the default concurrency on compile time, by declaring
+ * PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY macro to zero in your config_site.h
+ *
* \section pj_ioqeuue_examples_sec Examples
*
* For some examples on how to use the I/O Queue, please see:
@@ -291,6 +358,24 @@ PJ_DECL(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioque,
pj_bool_t auto_delete );
/**
+ * Set default concurrency policy for this ioqueue. If this function is not
+ * called, the default concurrency policy for the ioqueue is controlled by
+ * compile time setting PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY.
+ *
+ * Note that changing the concurrency setting to the ioqueue will only affect
+ * subsequent key registrations. To modify the concurrency setting for
+ * individual key, use #pj_ioqueue_set_concurrency().
+ *
+ * @param ioqueue The ioqueue instance.
+ * @param allow Non-zero to allow concurrent callback calls, or
+ * PJ_FALSE to disallow it.
+ *
+ * @return PJ_SUCCESS on success or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
+ pj_bool_t allow);
+
+/**
* Register a socket to the I/O queue framework.
* When a socket is registered to the IOQueue, it may be modified to use
* non-blocking IO. If it is modified, there is no guarantee that this
@@ -366,6 +451,51 @@ PJ_DECL(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
void *user_data,
void **old_data);
+/**
+ * Configure whether the ioqueue is allowed to call the key's callback
+ * concurrently/in parallel. The default concurrency setting for the key
+ * is controlled by ioqueue's default concurrency value, which can be
+ * changed by calling #pj_ioqueue_set_default_concurrency().
+ *
+ * If concurrency is allowed for the key, it means that if more
+ * than one pending operation completes simultaneously, more than one
+ * thread may call the key's callback at the same time. This generally
+ * would promote good scalability for application, at the expense of more
+ * complexity to manage the concurrent accesses in application's code.
+ *
+ * Alternatively application may disable the concurrent access by
+ * setting the \a allow flag to false. With concurrency disabled, only
+ * one thread can call the key's callback at one time.
+ *
+ * @param key The key that was previously obtained from registration.
+ * @param allow Set this to non-zero to allow concurrent callback calls
+ * and zero (PJ_FALSE) to disallow it.
+ *
+ * @return PJ_SUCCESS on success or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
+ pj_bool_t allow);
+
+/**
+ * Acquire the key's mutex. When the key's concurrency is disabled,
+ * application may call this function to synchronize its operation
+ * with the key's callback (i.e. this function will block until the
+ * key's callback returns).
+ *
+ * @param key The key that was previously obtained from registration.
+ *
+ * @return PJ_SUCCESS on success or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key);
+
+/**
+ * Release the lock previously acquired with pj_ioqueue_lock_key().
+ *
+ * @param key The key that was previously obtained from registration.
+ *
+ * @return PJ_SUCCESS on success or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key);
/**
* Initialize operation key.
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index fdd1afe7..0af9cba5 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -96,6 +96,10 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool,
key->closing = 0;
#endif
+ rc = pj_ioqueue_set_concurrency(key, ioqueue->default_concurrency);
+ if (rc != PJ_SUCCESS)
+ return rc;
+
/* Get socket type. When socket type is datagram, some optimization
* will be performed during send to allow parallel send operations.
*/
@@ -193,6 +197,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
if (h->connecting) {
/* Completion of connect() operation */
pj_ssize_t bytes_transfered;
+ pj_bool_t has_lock;
/* Clear operation. */
h->connecting = 0;
@@ -246,13 +251,28 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
}
#endif
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
+ /* Unlock; from this point we don't need to hold key's mutex
+ * (unless concurrency is disabled, which in this case we should
+ * hold the mutex while calling the callback) */
+ if (h->allow_concurrent) {
+ /* concurrency may be changed while we're in the callback, so
+ * save it to a flag.
+ */
+ has_lock = PJ_FALSE;
+ pj_mutex_unlock(h->mutex);
+ } else {
+ has_lock = PJ_TRUE;
+ }
/* Call callback. */
if (h->cb.on_connect_complete && !IS_CLOSING(h))
(*h->cb.on_connect_complete)(h, bytes_transfered);
+ /* Unlock if we still hold the lock */
+ if (has_lock) {
+ pj_mutex_unlock(h->mutex);
+ }
+
/* Done. */
} else
@@ -317,6 +337,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
write_op->written == (pj_ssize_t)write_op->size ||
h->fd_type == pj_SOCK_DGRAM())
{
+ pj_bool_t has_lock;
write_op->op = PJ_IOQUEUE_OP_NONE;
@@ -330,8 +351,18 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
}
- /* No need to hold mutex anymore */
- pj_mutex_unlock(h->mutex);
+ /* Unlock; from this point we don't need to hold key's mutex
+ * (unless concurrency is disabled, which in this case we should
+ * hold the mutex while calling the callback) */
+ if (h->allow_concurrent) {
+ /* concurrency may be changed while we're in the callback, so
+ * save it to a flag.
+ */
+ has_lock = PJ_FALSE;
+ pj_mutex_unlock(h->mutex);
+ } else {
+ has_lock = PJ_TRUE;
+ }
/* Call callback. */
if (h->cb.on_write_complete && !IS_CLOSING(h)) {
@@ -340,6 +371,10 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
write_op->written);
}
+ if (has_lock) {
+ pj_mutex_unlock(h->mutex);
+ }
+
} else {
pj_mutex_unlock(h->mutex);
}
@@ -371,6 +406,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
if (!pj_list_empty(&h->accept_list)) {
struct accept_operation *accept_op;
+ pj_bool_t has_lock;
/* Get one accept operation from the list. */
accept_op = h->accept_list.next;
@@ -389,8 +425,18 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
accept_op->addrlen);
}
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
+ /* Unlock; from this point we don't need to hold key's mutex
+ * (unless concurrency is disabled, which in this case we should
+ * hold the mutex while calling the callback) */
+ if (h->allow_concurrent) {
+ /* concurrency may be changed while we're in the callback, so
+ * save it to a flag.
+ */
+ has_lock = PJ_FALSE;
+ pj_mutex_unlock(h->mutex);
+ } else {
+ has_lock = PJ_TRUE;
+ }
/* Call callback. */
if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
@@ -399,12 +445,16 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
*accept_op->accept_fd, rc);
}
+ if (has_lock) {
+ pj_mutex_unlock(h->mutex);
+ }
}
else
# endif
if (key_has_pending_read(h)) {
struct read_operation *read_op;
pj_ssize_t bytes_read;
+ pj_bool_t has_lock;
/* Get one pending read operation from the list. */
read_op = h->read_list.next;
@@ -479,8 +529,18 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
bytes_read = -rc;
}
- /* Unlock; from this point we don't need to hold key's mutex. */
- pj_mutex_unlock(h->mutex);
+ /* Unlock; from this point we don't need to hold key's mutex
+ * (unless concurrency is disabled, which in this case we should
+ * hold the mutex while calling the callback) */
+ if (h->allow_concurrent) {
+ /* concurrency may be changed while we're in the callback, so
+ * save it to a flag.
+ */
+ has_lock = PJ_FALSE;
+ pj_mutex_unlock(h->mutex);
+ } else {
+ has_lock = PJ_TRUE;
+ }
/* Call callback. */
if (h->cb.on_read_complete && !IS_CLOSING(h)) {
@@ -489,6 +549,10 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
bytes_read);
}
+ if (has_lock) {
+ pj_mutex_unlock(h->mutex);
+ }
+
} else {
/*
* This is normal; execution may fall here when multiple threads
@@ -503,6 +567,8 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
pj_ioqueue_key_t *h )
{
+ pj_bool_t has_lock;
+
pj_mutex_lock(h->mutex);
if (!h->connecting) {
@@ -525,7 +591,18 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
- pj_mutex_unlock(h->mutex);
+ /* Unlock; from this point we don't need to hold key's mutex
+ * (unless concurrency is disabled, which in this case we should
+ * hold the mutex while calling the callback) */
+ if (h->allow_concurrent) {
+ /* concurrency may be changed while we're in the callback, so
+ * save it to a flag.
+ */
+ has_lock = PJ_FALSE;
+ pj_mutex_unlock(h->mutex);
+ } else {
+ has_lock = PJ_TRUE;
+ }
/* Call callback. */
if (h->cb.on_connect_complete && !IS_CLOSING(h)) {
@@ -542,6 +619,10 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
(*h->cb.on_connect_complete)(h, status);
}
+
+ if (has_lock) {
+ pj_mutex_unlock(h->mutex);
+ }
}
/*
@@ -1096,3 +1177,36 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
return PJ_EINVALIDOP;
}
+PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency( pj_ioqueue_t *ioqueue,
+ pj_bool_t allow)
+{
+ PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
+ ioqueue->default_concurrency = allow;
+ return PJ_SUCCESS;
+}
+
+
+PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
+ pj_bool_t allow)
+{
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
+ * disabled.
+ */
+ PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
+
+ key->allow_concurrent = allow;
+ return PJ_SUCCESS;
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
+{
+ return pj_mutex_lock(key->mutex);
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
+{
+ return pj_mutex_unlock(key->mutex);
+}
+
diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h
index 194f884d..4d35632c 100644
--- a/pjlib/src/pj/ioqueue_common_abs.h
+++ b/pjlib/src/pj/ioqueue_common_abs.h
@@ -103,6 +103,7 @@ union operation_key
pj_mutex_t *mutex; \
pj_bool_t inside_callback; \
pj_bool_t destroy_requested; \
+ pj_bool_t allow_concurrent; \
pj_sock_t fd; \
int fd_type; \
void *user_data; \
@@ -116,7 +117,8 @@ union operation_key
#define DECLARE_COMMON_IOQUEUE \
pj_lock_t *lock; \
- pj_bool_t auto_delete_lock;
+ pj_bool_t auto_delete_lock; \
+ pj_bool_t default_concurrency;
enum ioqueue_event_type
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c
index 3e8253e1..3c6b9fb6 100644
--- a/pjlib/src/pj/ioqueue_winnt.c
+++ b/pjlib/src/pj/ioqueue_winnt.c
@@ -114,6 +114,7 @@ struct pj_ioqueue_key_t
void *user_data;
enum handle_type hnd_type;
pj_ioqueue_callback cb;
+ pj_bool_t allow_concurrent;
#if PJ_HAS_TCP
int connecting;
@@ -123,6 +124,7 @@ struct pj_ioqueue_key_t
pj_atomic_t *ref_count;
pj_bool_t closing;
pj_time_val free_time;
+ pj_mutex_t *mutex;
#endif
};
@@ -135,6 +137,7 @@ struct pj_ioqueue_t
HANDLE iocp;
pj_lock_t *lock;
pj_bool_t auto_delete_lock;
+ pj_bool_t default_concurrency;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
pj_ioqueue_key_t active_list;
@@ -153,6 +156,12 @@ struct pj_ioqueue_t
};
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Prototype */
+static void scan_closing_keys(pj_ioqueue_t *ioqueue);
+#endif
+
+
#if PJ_HAS_TCP
/*
* Process the socket when the overlapped accept() completed.
@@ -315,13 +324,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
return PJ_RETURN_OS_ERROR(GetLastError());
/* Create IOCP mutex */
- rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
+ rc = pj_lock_create_recursive_mutex(pool, NULL, &ioqueue->lock);
if (rc != PJ_SUCCESS) {
CloseHandle(ioqueue->iocp);
return rc;
}
ioqueue->auto_delete_lock = PJ_TRUE;
+ ioqueue->default_concurrency = PJ_IOQUEUE_DEFAULT_ALLOW_CONCURRENCY;
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/*
@@ -344,14 +354,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
key = key->next;
}
CloseHandle(ioqueue->iocp);
return rc;
}
- pj_list_push_back(&ioqueue->free_list, key);
+ rc = pj_mutex_create_recursive(pool, "ioqkey", &key->mutex);
+ if (rc != PJ_SUCCESS) {
+ pj_atomic_destroy(key->ref_count);
+ key = ioqueue->free_list.next;
+ while (key != &ioqueue->free_list) {
+ pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+ CloseHandle(ioqueue->iocp);
+ return rc;
+ }
+ pj_list_push_back(&ioqueue->free_list, key);
}
#endif
@@ -392,18 +415,21 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
key = ioqueue->active_list.next;
while (key != &ioqueue->active_list) {
pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
key = key->next;
}
key = ioqueue->closing_list.next;
while (key != &ioqueue->closing_list) {
pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
key = key->next;
}
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
pj_atomic_destroy(key->ref_count);
+ pj_mutex_destroy(key->mutex);
key = key->next;
}
#endif
@@ -414,6 +440,15 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
return PJ_SUCCESS;
}
+
+PJ_DEF(pj_status_t) pj_ioqueue_set_default_concurrency(pj_ioqueue_t *ioqueue,
+ pj_bool_t allow)
+{
+ PJ_ASSERT_RETURN(ioqueue != NULL, PJ_EINVAL);
+ ioqueue->default_concurrency = allow;
+ return PJ_SUCCESS;
+}
+
/*
* pj_ioqueue_set_lock()
*/
@@ -453,6 +488,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
pj_lock_acquire(ioqueue->lock);
#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Scan closing list first to release unused keys.
+ * Must do this with lock acquired.
+ */
+ scan_closing_keys(ioqueue);
+
/* If safe unregistration is used, then get the key record from
* the free list.
*/
@@ -481,6 +521,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
rec->user_data = user_data;
pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
+ /* Set concurrency for this handle */
+ rc = pj_ioqueue_set_concurrency(rec, ioqueue->default_concurrency);
+ if (rc != PJ_SUCCESS) {
+ pj_lock_release(ioqueue->lock);
+ return rc;
+ }
+
#if PJ_HAS_TCP
rec->connecting = 0;
#endif
@@ -585,6 +632,8 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
* - zero and pOv!=NULL if event for failed I/O was dequeued.
*/
if (pOv) {
+ pj_bool_t has_lock;
+
/* Event was dequeued for either successfull or failed I/O */
key = (pj_ioqueue_key_t*)dwKey;
size_status = dwBytesTransfered;
@@ -600,10 +649,30 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
if (key->closing)
return PJ_TRUE;
+ /* If concurrency is disabled, lock the key
+ * (and save the lock status to local var since app may change
+ * concurrency setting while in the callback) */
+ if (key->allow_concurrent == PJ_FALSE) {
+ pj_mutex_lock(key->mutex);
+ has_lock = PJ_TRUE;
+ } else {
+ has_lock = PJ_FALSE;
+ }
+
+ /* Now that we get the lock, check again that key is not closing */
+ if (key->closing) {
+ if (has_lock) {
+ pj_mutex_unlock(key->mutex);
+ }
+ return PJ_TRUE;
+ }
+
/* Increment reference counter to prevent this key from being
* deleted
*/
pj_atomic_inc(key->ref_count);
+#else
+ PJ_UNUSED_ARG(has_lock);
#endif
/* Carry out the callback */
@@ -654,6 +723,8 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
#if PJ_IOQUEUE_HAS_SAFE_UNREG
decrement_counter(key);
+ if (has_lock)
+ pj_mutex_unlock(key->mutex);
#endif
return PJ_TRUE;
@@ -669,6 +740,7 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
{
unsigned i;
+ pj_bool_t has_lock;
enum { RETRY = 10 };
PJ_ASSERT_RETURN(key, PJ_EINVAL);
@@ -696,6 +768,18 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Mark key as closing before closing handle. */
key->closing = 1;
+
+ /* If concurrency is disabled, wait until the key has finished
+ * processing the callback
+ */
+ if (key->allow_concurrent == PJ_FALSE) {
+ pj_mutex_lock(key->mutex);
+ has_lock = PJ_TRUE;
+ } else {
+ has_lock = PJ_FALSE;
+ }
+#else
+ PJ_UNUSED_ARG(has_lock);
#endif
/* Close handle (the only way to disassociate handle from IOCP).
@@ -717,6 +801,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
*
* Forcing context switch seems to have fixed that, but this is quite
* an ugly solution..
+ *
+ * Update 2008/02/13:
+ * This should not happen if concurrency is disallowed for the key.
+ * So at least application has a solution for this (i.e. by disallowing
+ * concurrency in the key).
*/
//This will loop forever if unregistration is done on the callback.
//Doing this with RETRY I think should solve the IOCP setting the
@@ -728,11 +817,45 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
/* Decrement reference counter to destroy the key. */
decrement_counter(key);
+
+ if (has_lock)
+ pj_mutex_unlock(key->mutex);
#endif
return PJ_SUCCESS;
}
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Scan the closing list, and put pending closing keys to free list.
+ * Must do this with ioqueue mutex held.
+ */
+static void scan_closing_keys(pj_ioqueue_t *ioqueue)
+{
+ if (!pj_list_empty(&ioqueue->closing_list)) {
+ pj_time_val now;
+ pj_ioqueue_key_t *key;
+
+ pj_gettimeofday(&now);
+
+ /* Move closing keys to free list when they've finished the closing
+ * idle time.
+ */
+ key = ioqueue->closing_list.next;
+ while (key != &ioqueue->closing_list) {
+ pj_ioqueue_key_t *next = key->next;
+
+ pj_assert(key->closing != 0);
+
+ if (PJ_TIME_VAL_GTE(now, key->free_time)) {
+ pj_list_erase(key);
+ pj_list_push_back(&ioqueue->free_list, key);
+ }
+ key = next;
+ }
+ }
+}
+#endif
+
/*
* pj_ioqueue_poll()
*
@@ -766,32 +889,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Check the closing keys only when there's no activity and when there are
* pending closing keys.
- * blp:
- * no, always check the list. Otherwise on busy activity, this will cause
- * ioqueue to reject new registration.
*/
- if (/*event_count == 0 &&*/ !pj_list_empty(&ioqueue->closing_list)) {
- pj_time_val now;
- pj_ioqueue_key_t *key;
-
- pj_gettimeofday(&now);
-
- /* Move closing keys to free list when they've finished the closing
- * idle time.
- */
+ if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
pj_lock_acquire(ioqueue->lock);
- key = ioqueue->closing_list.next;
- while (key != &ioqueue->closing_list) {
- pj_ioqueue_key_t *next = key->next;
-
- pj_assert(key->closing != 0);
-
- if (PJ_TIME_VAL_GTE(now, key->free_time)) {
- pj_list_erase(key);
- pj_list_push_back(&ioqueue->free_list, key);
- }
- key = next;
- }
+ scan_closing_keys(ioqueue);
pj_lock_release(ioqueue->lock);
}
#endif
@@ -1268,3 +1369,35 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
return PJ_SUCCESS;
}
+PJ_DEF(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
+ pj_bool_t allow)
+{
+ PJ_ASSERT_RETURN(key, PJ_EINVAL);
+
+ /* PJ_IOQUEUE_HAS_SAFE_UNREG must be enabled if concurrency is
+ * disabled.
+ */
+ PJ_ASSERT_RETURN(allow || PJ_IOQUEUE_HAS_SAFE_UNREG, PJ_EINVAL);
+
+ key->allow_concurrent = allow;
+ return PJ_SUCCESS;
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
+{
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ return pj_mutex_lock(key->mutex);
+#else
+ PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
+#endif
+}
+
+PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
+{
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ return pj_mutex_unlock(key->mutex);
+#else
+ PJ_ASSERT_RETURN(!"PJ_IOQUEUE_HAS_SAFE_UNREG is disabled", PJ_EINVALIDOP);
+#endif
+}
+
diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c
index 92d4540d..e7ddf1f4 100644
--- a/pjlib/src/pjlib-test/ioq_perf.c
+++ b/pjlib/src/pjlib-test/ioq_perf.c
@@ -221,7 +221,8 @@ static int worker_thread(void *p)
* - measure the total bytes received by all consumers during a
* period of time.
*/
-static int perform_test(int sock_type, const char *type_name,
+static int perform_test(pj_bool_t allow_concur,
+ int sock_type, const char *type_name,
unsigned thread_cnt, unsigned sockpair_cnt,
pj_size_t buffer_size,
pj_size_t *p_bandwidth)
@@ -260,6 +261,12 @@ static int perform_test(int sock_type, const char *type_name,
return -15;
}
+ rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...error: pj_ioqueue_set_default_concurrency()", rc);
+ return -16;
+ }
+
/* Initialize each producer-consumer pair. */
for (i=0; i<sockpair_cnt; ++i) {
pj_ssize_t bytes;
@@ -437,10 +444,7 @@ static int perform_test(int sock_type, const char *type_name,
return 0;
}
-/*
- * main test entry.
- */
-int ioqueue_perf_test(void)
+static int ioqueue_perf_test_imp(pj_bool_t allow_concur)
{
enum { BUF_SIZE = 512 };
int i, rc;
@@ -500,6 +504,7 @@ int ioqueue_perf_test(void)
int best_index = 0;
PJ_LOG(3,(THIS_FILE, " Benchmarking %s ioqueue:", pj_ioqueue_name()));
+ PJ_LOG(3,(THIS_FILE, " Testing with concurency=%d", allow_concur));
PJ_LOG(3,(THIS_FILE, " ======================================="));
PJ_LOG(3,(THIS_FILE, " Type Threads Skt.Pairs Bandwidth"));
PJ_LOG(3,(THIS_FILE, " ======================================="));
@@ -508,7 +513,8 @@ int ioqueue_perf_test(void)
for (i=0; i<(int)(sizeof(test_param)/sizeof(test_param[0])); ++i) {
pj_size_t bandwidth;
- rc = perform_test(test_param[i].type,
+ rc = perform_test(allow_concur,
+ test_param[i].type,
test_param[i].type_name,
test_param[i].thread_cnt,
test_param[i].sockpair_cnt,
@@ -537,6 +543,24 @@ int ioqueue_perf_test(void)
return 0;
}
+/*
+ * main test entry.
+ */
+int ioqueue_perf_test(void)
+{
+ int rc;
+
+ rc = ioqueue_perf_test_imp(PJ_TRUE);
+ if (rc != 0)
+ return rc;
+
+ rc = ioqueue_perf_test_imp(PJ_FALSE);
+ if (rc != 0)
+ return rc;
+
+ return 0;
+}
+
#else
/* To prevent warning about "translation unit is empty"
* when this test is disabled.
diff --git a/pjlib/src/pjlib-test/ioq_tcp.c b/pjlib/src/pjlib-test/ioq_tcp.c
index c6e117db..106a9645 100644
--- a/pjlib/src/pjlib-test/ioq_tcp.c
+++ b/pjlib/src/pjlib-test/ioq_tcp.c
@@ -232,7 +232,7 @@ static int send_recv_test(pj_ioqueue_t *ioque,
/*
* Compliance test for success scenario.
*/
-static int compliance_test_0(void)
+static int compliance_test_0(pj_bool_t allow_concur)
{
pj_sock_t ssock=-1, csock0=-1, csock1=-1;
pj_sockaddr_in addr, client_addr, rmt_addr;
@@ -292,6 +292,13 @@ static int compliance_test_0(void)
status=-20; goto on_error;
}
+ // Concurrency
+ rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...ERROR in pj_ioqueue_set_default_concurrency()", rc);
+ status=-21; goto on_error;
+ }
+
// Register server socket and client socket.
rc = pj_ioqueue_register_sock(pool, ioque, ssock, NULL, &test_cb, &skey);
if (rc == PJ_SUCCESS)
@@ -458,7 +465,7 @@ on_error:
* Compliance test for failed scenario.
* In this case, the client connects to a non-existant service.
*/
-static int compliance_test_1(void)
+static int compliance_test_1(pj_bool_t allow_concur)
{
pj_sock_t csock1=PJ_INVALID_SOCKET;
pj_sockaddr_in addr;
@@ -479,6 +486,12 @@ static int compliance_test_1(void)
status=-20; goto on_error;
}
+ // Concurrency
+ rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur);
+ if (rc != PJ_SUCCESS) {
+ status=-21; goto on_error;
+ }
+
// Create client socket
rc = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &csock1);
if (rc != PJ_SUCCESS) {
@@ -581,7 +594,7 @@ on_error:
/*
* Repeated connect/accept on the same listener socket.
*/
-static int compliance_test_2(void)
+static int compliance_test_2(pj_bool_t allow_concur)
{
#if defined(PJ_SYMBIAN) && PJ_SYMBIAN!=0
enum { MAX_PAIR = 1, TEST_LOOP = 2 };
@@ -648,6 +661,13 @@ static int compliance_test_2(void)
}
+ // Concurrency
+ rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...ERROR in pj_ioqueue_set_default_concurrency()", rc);
+ return -11;
+ }
+
// Allocate buffers for send and receive.
send_buf = (char*)pj_pool_alloc(pool, bufsize);
recv_buf = (char*)pj_pool_alloc(pool, bufsize);
@@ -887,26 +907,28 @@ on_error:
}
-int tcp_ioqueue_test()
+static int tcp_ioqueue_test_impl(pj_bool_t allow_concur)
{
int status;
+ PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur));
+
PJ_LOG(3, (THIS_FILE, "..%s compliance test 0 (success scenario)",
pj_ioqueue_name()));
- if ((status=compliance_test_0()) != 0) {
+ if ((status=compliance_test_0(allow_concur)) != 0) {
PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status));
return status;
}
PJ_LOG(3, (THIS_FILE, "..%s compliance test 1 (failed scenario)",
pj_ioqueue_name()));
- if ((status=compliance_test_1()) != 0) {
+ if ((status=compliance_test_1(allow_concur)) != 0) {
PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status));
return status;
}
PJ_LOG(3, (THIS_FILE, "..%s compliance test 2 (repeated accept)",
pj_ioqueue_name()));
- if ((status=compliance_test_2()) != 0) {
+ if ((status=compliance_test_2(allow_concur)) != 0) {
PJ_LOG(1, (THIS_FILE, "....FAILED (status=%d)\n", status));
return status;
}
@@ -914,6 +936,21 @@ int tcp_ioqueue_test()
return 0;
}
+int tcp_ioqueue_test()
+{
+ int rc;
+
+ rc = tcp_ioqueue_test_impl(PJ_TRUE);
+ if (rc != 0)
+ return rc;
+
+ rc = tcp_ioqueue_test_impl(PJ_FALSE);
+ if (rc != 0)
+ return rc;
+
+ return 0;
+}
+
#endif /* PJ_HAS_TCP */
diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c
index 1bbe494f..e7e1ae52 100644
--- a/pjlib/src/pjlib-test/ioq_udp.c
+++ b/pjlib/src/pjlib-test/ioq_udp.c
@@ -125,7 +125,7 @@ static pj_ioqueue_callback test_cb =
* To test that the basic IOQueue functionality works. It will just exchange
* data between two sockets.
*/
-static int compliance_test(void)
+static int compliance_test(pj_bool_t allow_concur)
{
pj_sock_t ssock=-1, csock=-1;
pj_sockaddr_in addr, dst_addr;
@@ -178,6 +178,13 @@ static int compliance_test(void)
status=-20; goto on_error;
}
+ // Set concurrency
+ TRACE_("set concurrency...");
+ rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur);
+ if (rc != PJ_SUCCESS) {
+ status=-21; goto on_error;
+ }
+
// Register server and client socket.
// We put this after inactivity socket, hopefully this can represent the
// worst waiting time.
@@ -351,7 +358,7 @@ static void on_read_complete(pj_ioqueue_key_t *key,
* Check if callback is still called after socket has been unregistered or
* closed.
*/
-static int unregister_test(void)
+static int unregister_test(pj_bool_t allow_concur)
{
enum { RPORT = 50000, SPORT = 50001 };
pj_pool_t *pool;
@@ -381,6 +388,13 @@ static int unregister_test(void)
return -110;
}
+ // Set concurrency
+ TRACE_("set concurrency...");
+ status = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
+ if (status != PJ_SUCCESS) {
+ return -112;
+ }
+
/* Create sender socket */
status = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0, SPORT, &ssock);
if (status != PJ_SUCCESS) {
@@ -512,7 +526,7 @@ static int unregister_test(void)
* This will just test registering PJ_IOQUEUE_MAX_HANDLES count
* of sockets to the ioqueue.
*/
-static int many_handles_test(void)
+static int many_handles_test(pj_bool_t allow_concur)
{
enum { MAX = PJ_IOQUEUE_MAX_HANDLES };
pj_pool_t *pool;
@@ -539,6 +553,12 @@ static int many_handles_test(void)
return -10;
}
+ // Set concurrency
+ rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
+ if (rc != PJ_SUCCESS) {
+ return -11;
+ }
+
/* Register as many sockets. */
for (count=0; count<MAX; ++count) {
sock[count] = PJ_INVALID_SOCKET;
@@ -600,7 +620,8 @@ static int many_handles_test(void)
/*
* Benchmarking IOQueue
*/
-static int bench_test(int bufsize, int inactive_sock_count)
+static int bench_test(pj_bool_t allow_concur, int bufsize,
+ int inactive_sock_count)
{
pj_sock_t ssock=-1, csock=-1;
pj_sockaddr_in addr;
@@ -651,6 +672,13 @@ static int bench_test(int bufsize, int inactive_sock_count)
goto on_error;
}
+ // Set concurrency
+ rc = pj_ioqueue_set_default_concurrency(ioque, allow_concur);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...error: pj_ioqueue_set_default_concurrency()", rc);
+ goto on_error;
+ }
+
// Allocate inactive sockets, and bind them to some arbitrary address.
// Then register them to the I/O queue, and start a read operation.
inactive_sock = (pj_sock_t*)pj_pool_alloc(pool,
@@ -839,27 +867,29 @@ on_error:
return -1;
}
-int udp_ioqueue_test()
+static int udp_ioqueue_test_imp(pj_bool_t allow_concur)
{
int status;
int bufsize, sock_count;
+ PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur));
+
//goto pass1;
PJ_LOG(3, (THIS_FILE, "...compliance test (%s)", pj_ioqueue_name()));
- if ((status=compliance_test()) != 0) {
+ if ((status=compliance_test(allow_concur)) != 0) {
return status;
}
PJ_LOG(3, (THIS_FILE, "....compliance test ok"));
PJ_LOG(3, (THIS_FILE, "...unregister test (%s)", pj_ioqueue_name()));
- if ((status=unregister_test()) != 0) {
+ if ((status=unregister_test(allow_concur)) != 0) {
return status;
}
PJ_LOG(3, (THIS_FILE, "....unregister test ok"));
- if ((status=many_handles_test()) != 0) {
+ if ((status=many_handles_test(allow_concur)) != 0) {
return status;
}
@@ -879,7 +909,7 @@ int udp_ioqueue_test()
//goto pass2;
for (bufsize=BUF_MIN_SIZE; bufsize <= BUF_MAX_SIZE; bufsize *= 2) {
- if ((status=bench_test(bufsize, SOCK_INACTIVE_MIN)) != 0)
+ if ((status=bench_test(allow_concur, bufsize, SOCK_INACTIVE_MIN)) != 0)
return status;
}
//pass2:
@@ -889,12 +919,27 @@ int udp_ioqueue_test()
sock_count *= 2)
{
//PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count));
- if ((status=bench_test(bufsize, sock_count-2)) != 0)
+ if ((status=bench_test(allow_concur, bufsize, sock_count-2)) != 0)
return status;
}
return 0;
}
+int udp_ioqueue_test()
+{
+ int rc;
+
+ rc = udp_ioqueue_test_imp(PJ_TRUE);
+ if (rc != 0)
+ return rc;
+
+ rc = udp_ioqueue_test_imp(PJ_FALSE);
+ if (rc != 0)
+ return rc;
+
+ return 0;
+}
+
#else
/* To prevent warning about "translation unit is empty"
* when this test is disabled.
diff --git a/pjlib/src/pjlib-test/ioq_unreg.c b/pjlib/src/pjlib-test/ioq_unreg.c
index 33e86270..a1e80753 100644
--- a/pjlib/src/pjlib-test/ioq_unreg.c
+++ b/pjlib/src/pjlib-test/ioq_unreg.c
@@ -286,14 +286,16 @@ static int perform_unreg_test(pj_ioqueue_t *ioqueue,
return 0;
}
-int udp_ioqueue_unreg_test(void)
+static int udp_ioqueue_unreg_test_imp(pj_bool_t allow_concur)
{
enum { LOOP = 10 };
int i, rc;
char title[30];
pj_ioqueue_t *ioqueue;
pj_pool_t *test_pool;
-
+
+ PJ_LOG(3,(THIS_FILE, "..testing with concurency=%d", allow_concur));
+
test_method = UNREGISTER_IN_APP;
test_pool = pj_pool_create(mem, "unregtest", 4000, 4000, NULL);
@@ -304,6 +306,11 @@ int udp_ioqueue_unreg_test(void)
return -10;
}
+ rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
+ if (rc != PJ_SUCCESS) {
+ app_perror("Error in pj_ioqueue_set_default_concurrency()", rc);
+ return -12;
+ }
PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 0/3 (%s)",
pj_ioqueue_name()));
@@ -351,7 +358,20 @@ int udp_ioqueue_unreg_test(void)
return 0;
}
+int udp_ioqueue_unreg_test(void)
+{
+ int rc;
+
+ rc = udp_ioqueue_unreg_test_imp(PJ_TRUE);
+ if (rc != 0)
+ return rc;
+
+ rc = udp_ioqueue_unreg_test_imp(PJ_FALSE);
+ if (rc != 0)
+ return rc;
+ return 0;
+}
#else
/* To prevent warning about "translation unit is empty"