author    Benny Prijono <bennylp@teluu.com>    2009-07-02 08:24:22 +0000
committer Benny Prijono <bennylp@teluu.com>    2009-07-02 08:24:22 +0000
commit    e9ad8e85add5d5efce32e1efb539c71a88c9c021 (patch)
tree      ab22e8d6d0ac9bfdf54a29f044dbe0c93ebb8455 /pjlib
parent    5b976613819b6e2eb8c84fcf419633b33709f9ed (diff)
Ticket #913: Concurrency problem in select ioqueue may corrupt descriptor set
- fixed the concurrency problem
- also fixed ioqueue unregister test in pjlib-test

git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@2826 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib')
-rw-r--r--  pjlib/src/pj/ioqueue_common_abs.c | 47
-rw-r--r--  pjlib/src/pjlib-test/ioq_unreg.c  | 39
2 files changed, 67 insertions(+), 19 deletions(-)
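The race the patch closes is the window between each operation's unlocked IS_CLOSING() check and the list insert / ioqueue_add_to_set() calls made under key->mutex: another thread can unregister and close the key inside that window, and adding the dead descriptor then corrupts the select() descriptor set. Below is a minimal standalone sketch of the double-check-under-lock pattern the patch applies, written against a simplified key type with pthreads and fd_set; the names fake_key, schedule_read and close_key are illustrative assumptions, not part of the pjlib API.

/*
 * Sketch of the double-check applied by this patch, using a simplified
 * key type and pthreads instead of the real pjlib ioqueue internals.
 */
#include <pthread.h>
#include <stdio.h>
#include <sys/select.h>

struct fake_key {
    pthread_mutex_t mutex;
    int             fd;
    int             closing;    /* set once the handle is being destroyed */
    fd_set         *read_set;   /* descriptor set shared with the poller  */
};

/* Returns 0 on success, -1 if the key was closed concurrently. */
static int schedule_read(struct fake_key *key)
{
    if (key->closing)           /* first, unlocked check */
        return -1;

    pthread_mutex_lock(&key->mutex);

    /* Check again while holding the lock: another thread may have closed
     * the handle between the first check and the lock acquisition.
     * Adding a dead descriptor here is what corrupted the set in #913.
     */
    if (key->closing) {
        pthread_mutex_unlock(&key->mutex);
        return -1;
    }

    FD_SET(key->fd, key->read_set);
    pthread_mutex_unlock(&key->mutex);
    return 0;
}

static void close_key(struct fake_key *key)
{
    pthread_mutex_lock(&key->mutex);
    key->closing = 1;
    FD_CLR(key->fd, key->read_set);
    pthread_mutex_unlock(&key->mutex);
}

int main(void)
{
    fd_set rset;
    struct fake_key key;

    FD_ZERO(&rset);
    pthread_mutex_init(&key.mutex, NULL);
    key.fd = 0;                 /* stdin, just to have a valid descriptor */
    key.closing = 0;
    key.read_set = &rset;

    printf("schedule_read before close: %d\n", schedule_read(&key));
    close_key(&key);
    printf("schedule_read after close:  %d\n", schedule_read(&key));

    pthread_mutex_destroy(&key.mutex);
    return 0;
}

The same re-check is inserted into every pending-operation path below (recv, recvfrom, send, sendto, accept, connect), since each of them registers the key with the descriptor set after taking key->mutex.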
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index e965b149..cccabc8b 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -687,6 +687,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
read_op->flags = flags;
pj_mutex_lock(key->mutex);
+ /* Check again. The handle may have been closed after the previous check
+ * in a multithreaded app. If we add a bad handle to the set it will
+ * corrupt the ioqueue set. See #913.
+ */
+ if (IS_CLOSING(key)) {
+ pj_mutex_unlock(key->mutex);
+ return PJ_ECANCELLED;
+ }
pj_list_insert_before(&key->read_list, read_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
pj_mutex_unlock(key->mutex);
@@ -755,6 +763,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
read_op->rmt_addrlen = addrlen;
pj_mutex_lock(key->mutex);
+ /* Check again. The handle may have been closed after the previous check
+ * in a multithreaded app. If we add a bad handle to the set it will
+ * corrupt the ioqueue set. See #913.
+ */
+ if (IS_CLOSING(key)) {
+ pj_mutex_unlock(key->mutex);
+ return PJ_ECANCELLED;
+ }
pj_list_insert_before(&key->read_list, read_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
pj_mutex_unlock(key->mutex);
@@ -861,6 +877,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
write_op->flags = flags;
pj_mutex_lock(key->mutex);
+ /* Check again. The handle may have been closed after the previous check
+ * in a multithreaded app. If we add a bad handle to the set it will
+ * corrupt the ioqueue set. See #913.
+ */
+ if (IS_CLOSING(key)) {
+ pj_mutex_unlock(key->mutex);
+ return PJ_ECANCELLED;
+ }
pj_list_insert_before(&key->write_list, write_op);
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
pj_mutex_unlock(key->mutex);
@@ -978,6 +1002,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
write_op->rmt_addrlen = addrlen;
pj_mutex_lock(key->mutex);
+ /* Check again. The handle may have been closed after the previous check
+ * in a multithreaded app. If we add a bad handle to the set it will
+ * corrupt the ioqueue set. See #913.
+ */
+ if (IS_CLOSING(key)) {
+ pj_mutex_unlock(key->mutex);
+ return PJ_ECANCELLED;
+ }
pj_list_insert_before(&key->write_list, write_op);
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
pj_mutex_unlock(key->mutex);
@@ -1047,6 +1079,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
accept_op->local_addr = local;
pj_mutex_lock(key->mutex);
+ /* Check again. The handle may have been closed after the previous check
+ * in a multithreaded app. If we add a bad handle to the set it will
+ * corrupt the ioqueue set. See #913.
+ */
+ if (IS_CLOSING(key)) {
+ pj_mutex_unlock(key->mutex);
+ return PJ_ECANCELLED;
+ }
pj_list_insert_before(&key->accept_list, accept_op);
ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
pj_mutex_unlock(key->mutex);
@@ -1083,6 +1123,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
/* Pending! */
pj_mutex_lock(key->mutex);
+ /* Check again. The handle may have been closed after the previous
+ * check in a multithreaded app. See #913.
+ */
+ if (IS_CLOSING(key)) {
+ pj_mutex_unlock(key->mutex);
+ return PJ_ECANCELLED;
+ }
key->connecting = PJ_TRUE;
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
diff --git a/pjlib/src/pjlib-test/ioq_unreg.c b/pjlib/src/pjlib-test/ioq_unreg.c
index 5b320964..6b19ed1f 100644
--- a/pjlib/src/pjlib-test/ioq_unreg.c
+++ b/pjlib/src/pjlib-test/ioq_unreg.c
@@ -93,9 +93,6 @@ static void on_read_complete(pj_ioqueue_key_t *key,
if (PJ_TIME_VAL_GTE(now, time_to_unregister)) {
sock_data.unregistered = 1;
pj_ioqueue_unregister(key);
- pj_mutex_destroy(sock_data.mutex);
- pj_pool_release(sock_data.pool);
- sock_data.pool = NULL;
return;
}
}
@@ -243,31 +240,30 @@ static int perform_unreg_test(pj_ioqueue_t *ioqueue,
/* Loop until test time ends */
for (;;) {
pj_time_val now, timeout;
+ int n;
pj_gettimeofday(&now);
if (test_method == UNREGISTER_IN_APP &&
PJ_TIME_VAL_GTE(now, time_to_unregister) &&
- sock_data.pool)
+ !sock_data.unregistered)
{
- //Can't do this otherwise it'll deadlock
- //pj_mutex_lock(sock_data.mutex);
-
sock_data.unregistered = 1;
+ /* Wait (as much as possible) for callback to complete */
+ pj_mutex_lock(sock_data.mutex);
+ pj_mutex_unlock(sock_data.mutex);
pj_ioqueue_unregister(sock_data.key);
- //pj_mutex_unlock(sock_data.mutex);
- pj_mutex_destroy(sock_data.mutex);
- pj_pool_release(sock_data.pool);
- sock_data.pool = NULL;
}
if (PJ_TIME_VAL_GT(now, end_time) && sock_data.unregistered)
break;
timeout.sec = 0; timeout.msec = 10;
- pj_ioqueue_poll(ioqueue, &timeout);
- //pj_thread_sleep(1);
-
+ n = pj_ioqueue_poll(ioqueue, &timeout);
+ if (n < 0) {
+ app_perror("pj_ioqueue_poll error", -n);
+ pj_thread_sleep(1);
+ }
}
thread_quitting = 1;
@@ -277,6 +273,11 @@ static int perform_unreg_test(pj_ioqueue_t *ioqueue,
pj_thread_destroy(thread[i]);
}
+ /* Destroy data */
+ pj_mutex_destroy(sock_data.mutex);
+ pj_pool_release(sock_data.pool);
+ sock_data.pool = NULL;
+
if (other_socket) {
pj_ioqueue_unregister(osd.key);
}
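The reworked test applies two ideas: the lock/unlock pair on sock_data.mutex acts as a barrier so pj_ioqueue_unregister() is not called while a callback that still uses sock_data is running, and the pj_mutex_destroy()/pj_pool_release() cleanup is deferred until after the worker threads have been joined. A minimal pthreads model of that lock/unlock barrier idiom follows; callback_mutex and callback_thread are made-up names for illustration, not identifiers from the test.

/*
 * Minimal model of the barrier used before unregistering: acquiring and
 * immediately releasing the mutex the callback holds means "wait until
 * any in-flight callback has returned".
 */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t callback_mutex = PTHREAD_MUTEX_INITIALIZER;

static void *callback_thread(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&callback_mutex);    /* callback is "running"  */
    usleep(100 * 1000);                     /* pretend to do I/O work */
    pthread_mutex_unlock(&callback_mutex);  /* callback has returned  */
    return NULL;
}

int main(void)
{
    pthread_t t;

    pthread_create(&t, NULL, callback_thread, NULL);
    usleep(10 * 1000);                      /* let the callback start */

    /* Barrier: blocks until the callback releases the mutex; only then
     * is it safe to unregister the key and destroy its resources.
     */
    pthread_mutex_lock(&callback_mutex);
    pthread_mutex_unlock(&callback_mutex);
    puts("callback finished; safe to unregister/destroy");

    pthread_join(t, NULL);
    return 0;
}

The barrier is best effort (a new callback could still start afterwards), which is why the test also sets sock_data.unregistered first and keeps the final destruction after the threads have quit.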
@@ -314,7 +315,7 @@ static int udp_ioqueue_unreg_test_imp(pj_bool_t allow_concur)
return -12;
}
- PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 0/3 (%s)",
+ PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 0/3, unregister in app (%s)",
pj_ioqueue_name()));
for (i=0; i<LOOP; ++i) {
pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP);
@@ -324,7 +325,7 @@ static int udp_ioqueue_unreg_test_imp(pj_bool_t allow_concur)
}
- PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 1/3 (%s)",
+ PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 1/3, unregister in app (%s)",
pj_ioqueue_name()));
for (i=0; i<LOOP; ++i) {
pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP);
@@ -335,7 +336,7 @@ static int udp_ioqueue_unreg_test_imp(pj_bool_t allow_concur)
test_method = UNREGISTER_IN_CALLBACK;
- PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 2/3 (%s)",
+ PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 2/3, unregister in cb (%s)",
pj_ioqueue_name()));
for (i=0; i<LOOP; ++i) {
pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP);
@@ -345,7 +346,7 @@ static int udp_ioqueue_unreg_test_imp(pj_bool_t allow_concur)
}
- PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 3/3 (%s)",
+ PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 3/3, unregister in cb (%s)",
pj_ioqueue_name()));
for (i=0; i<LOOP; ++i) {
pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP);
@@ -366,7 +367,7 @@ int udp_ioqueue_unreg_test(void)
rc = udp_ioqueue_unreg_test_imp(PJ_TRUE);
if (rc != 0)
- return rc;
+ return rc;
rc = udp_ioqueue_unreg_test_imp(PJ_FALSE);
if (rc != 0)