diff options
Diffstat (limited to 'pjlib/src')
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.c | 47 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/ioq_unreg.c | 39 |
2 files changed, 67 insertions, 19 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index e965b149..cccabc8b 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -687,6 +687,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key, read_op->flags = flags; pj_mutex_lock(key->mutex); + /* Check again. Handle may have been closed after the previous check + * in multithreaded app. If we add bad handle to the set it will + * corrupt the ioqueue set. See #913 + */ + if (IS_CLOSING(key)) { + pj_mutex_unlock(key->mutex); + return PJ_ECANCELLED; + } pj_list_insert_before(&key->read_list, read_op); ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); pj_mutex_unlock(key->mutex); @@ -755,6 +763,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key, read_op->rmt_addrlen = addrlen; pj_mutex_lock(key->mutex); + /* Check again. Handle may have been closed after the previous check + * in multithreaded app. If we add bad handle to the set it will + * corrupt the ioqueue set. See #913 + */ + if (IS_CLOSING(key)) { + pj_mutex_unlock(key->mutex); + return PJ_ECANCELLED; + } pj_list_insert_before(&key->read_list, read_op); ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); pj_mutex_unlock(key->mutex); @@ -861,6 +877,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, write_op->flags = flags; pj_mutex_lock(key->mutex); + /* Check again. Handle may have been closed after the previous check + * in multithreaded app. If we add bad handle to the set it will + * corrupt the ioqueue set. See #913 + */ + if (IS_CLOSING(key)) { + pj_mutex_unlock(key->mutex); + return PJ_ECANCELLED; + } pj_list_insert_before(&key->write_list, write_op); ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); pj_mutex_unlock(key->mutex); @@ -978,6 +1002,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, write_op->rmt_addrlen = addrlen; pj_mutex_lock(key->mutex); + /* Check again. Handle may have been closed after the previous check + * in multithreaded app. If we add bad handle to the set it will + * corrupt the ioqueue set. See #913 + */ + if (IS_CLOSING(key)) { + pj_mutex_unlock(key->mutex); + return PJ_ECANCELLED; + } pj_list_insert_before(&key->write_list, write_op); ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); pj_mutex_unlock(key->mutex); @@ -1047,6 +1079,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, accept_op->local_addr = local; pj_mutex_lock(key->mutex); + /* Check again. Handle may have been closed after the previous check + * in multithreaded app. If we add bad handle to the set it will + * corrupt the ioqueue set. See #913 + */ + if (IS_CLOSING(key)) { + pj_mutex_unlock(key->mutex); + return PJ_ECANCELLED; + } pj_list_insert_before(&key->accept_list, accept_op); ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT); pj_mutex_unlock(key->mutex); @@ -1083,6 +1123,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key, if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) { /* Pending! */ pj_mutex_lock(key->mutex); + /* Check again. Handle may have been closed after the previous + * check in multithreaded app. See #913 + */ + if (IS_CLOSING(key)) { + pj_mutex_unlock(key->mutex); + return PJ_ECANCELLED; + } key->connecting = PJ_TRUE; ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT); ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT); diff --git a/pjlib/src/pjlib-test/ioq_unreg.c b/pjlib/src/pjlib-test/ioq_unreg.c index 5b320964..6b19ed1f 100644 --- a/pjlib/src/pjlib-test/ioq_unreg.c +++ b/pjlib/src/pjlib-test/ioq_unreg.c @@ -93,9 +93,6 @@ static void on_read_complete(pj_ioqueue_key_t *key, if (PJ_TIME_VAL_GTE(now, time_to_unregister)) { sock_data.unregistered = 1; pj_ioqueue_unregister(key); - pj_mutex_destroy(sock_data.mutex); - pj_pool_release(sock_data.pool); - sock_data.pool = NULL; return; } } @@ -243,31 +240,30 @@ static int perform_unreg_test(pj_ioqueue_t *ioqueue, /* Loop until test time ends */ for (;;) { pj_time_val now, timeout; + int n; pj_gettimeofday(&now); if (test_method == UNREGISTER_IN_APP && PJ_TIME_VAL_GTE(now, time_to_unregister) && - sock_data.pool) + !sock_data.unregistered) { - //Can't do this otherwise it'll deadlock - //pj_mutex_lock(sock_data.mutex); - sock_data.unregistered = 1; + /* Wait (as much as possible) for callback to complete */ + pj_mutex_lock(sock_data.mutex); + pj_mutex_unlock(sock_data.mutex); pj_ioqueue_unregister(sock_data.key); - //pj_mutex_unlock(sock_data.mutex); - pj_mutex_destroy(sock_data.mutex); - pj_pool_release(sock_data.pool); - sock_data.pool = NULL; } if (PJ_TIME_VAL_GT(now, end_time) && sock_data.unregistered) break; timeout.sec = 0; timeout.msec = 10; - pj_ioqueue_poll(ioqueue, &timeout); - //pj_thread_sleep(1); - + n = pj_ioqueue_poll(ioqueue, &timeout); + if (n < 0) { + app_perror("pj_ioqueue_poll error", -n); + pj_thread_sleep(1); + } } thread_quitting = 1; @@ -277,6 +273,11 @@ static int perform_unreg_test(pj_ioqueue_t *ioqueue, pj_thread_destroy(thread[i]); } + /* Destroy data */ + pj_mutex_destroy(sock_data.mutex); + pj_pool_release(sock_data.pool); + sock_data.pool = NULL; + if (other_socket) { pj_ioqueue_unregister(osd.key); } @@ -314,7 +315,7 @@ static int udp_ioqueue_unreg_test_imp(pj_bool_t allow_concur) return -12; } - PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 0/3 (%s)", + PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 0/3, unregister in app (%s)", pj_ioqueue_name())); for (i=0; i<LOOP; ++i) { pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP); @@ -324,7 +325,7 @@ static int udp_ioqueue_unreg_test_imp(pj_bool_t allow_concur) } - PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 1/3 (%s)", + PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 1/3, unregister in app (%s)", pj_ioqueue_name())); for (i=0; i<LOOP; ++i) { pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP); @@ -335,7 +336,7 @@ static int udp_ioqueue_unreg_test_imp(pj_bool_t allow_concur) test_method = UNREGISTER_IN_CALLBACK; - PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 2/3 (%s)", + PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 2/3, unregister in cb (%s)", pj_ioqueue_name())); for (i=0; i<LOOP; ++i) { pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP); @@ -345,7 +346,7 @@ static int udp_ioqueue_unreg_test_imp(pj_bool_t allow_concur) } - PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 3/3 (%s)", + PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 3/3, unregister in cb (%s)", pj_ioqueue_name())); for (i=0; i<LOOP; ++i) { pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP); @@ -366,7 +367,7 @@ int udp_ioqueue_unreg_test(void) rc = udp_ioqueue_unreg_test_imp(PJ_TRUE); if (rc != 0) - return rc; + return rc; rc = udp_ioqueue_unreg_test_imp(PJ_FALSE); if (rc != 0) |