 -rw-r--r--  pjlib/build/Makefile                |   4
 -rw-r--r--  pjlib/build/pjlib.dsp               |  13
 -rw-r--r--  pjlib/build/pjlib_test.dsp          |   4
 -rw-r--r--  pjlib/src/pj/ioqueue_common_abs.c   | 214
 -rw-r--r--  pjlib/src/pj/ioqueue_common_abs.h   |  13
 -rw-r--r--  pjlib/src/pj/ioqueue_select.c       | 246
 -rw-r--r--  pjlib/src/pj/ioqueue_winnt.c        | 312
 -rw-r--r--  pjlib/src/pj/pool_dbg_win32.c       | 237
 -rw-r--r--  pjlib/src/pjlib-test/ioq_perf.c     |   2
 -rw-r--r--  pjlib/src/pjlib-test/ioq_udp.c      |   5
 -rw-r--r--  pjlib/src/pjlib-test/ioq_unreg.c    | 361
 -rw-r--r--  pjlib/src/pjlib-test/test.c         |   4
 -rw-r--r--  pjlib/src/pjlib-test/test.h         |   2
 13 files changed, 952 insertions(+), 465 deletions(-)
diff --git a/pjlib/build/Makefile b/pjlib/build/Makefile
index 0faadc15..6192ca0a 100644
--- a/pjlib/build/Makefile
+++ b/pjlib/build/Makefile
@@ -23,7 +23,7 @@ export PJLIB_SRCDIR = ../src/pj
export PJLIB_OBJS += $(OS_OBJS) $(M_OBJS) $(CC_OBJS) $(HOST_OBJS) \
array.o config.o ctype.o errno.o except.o fifobuf.o guid.o \
hash.o list.o lock.o log.o os_time_common.o \
- pool.o pool_caching.o rand.o \
+ pool.o pool_caching.o pool_dbg.o rand.o \
rbtree.o string.o timer.o \
types.o symbols.o
export PJLIB_CFLAGS += $(_CFLAGS)
@@ -34,7 +34,7 @@ export PJLIB_CFLAGS += $(_CFLAGS)
export TEST_SRCDIR = ../src/pjlib-test
export TEST_OBJS += atomic.o echo_clt.o errno.o exception.o \
fifobuf.o file.o \
- ioq_perf.o ioq_udp.o ioq_tcp.o \
+ ioq_perf.o ioq_udp.o ioq_unreg.o ioq_tcp.o \
list.o mutex.o os.o pool.o pool_perf.o rand.o rbtree.o \
select.o sleep.o sock.o sock_perf.o \
string.o test.o thread.o timer.o timestamp.o \
diff --git a/pjlib/build/pjlib.dsp b/pjlib/build/pjlib.dsp
index 7aa56669..62abfa9c 100644
--- a/pjlib/build/pjlib.dsp
+++ b/pjlib/build/pjlib.dsp
@@ -233,6 +233,13 @@ SOURCE=..\src\pj\ioqueue_common_abs.h
# Begin Source File
SOURCE=..\src\pj\ioqueue_select.c
+
+!IF "$(CFG)" == "pjlib - Win32 Release"
+
+!ELSEIF "$(CFG)" == "pjlib - Win32 Debug"
+
+!ENDIF
+
# End Source File
# Begin Source File
@@ -285,7 +292,7 @@ SOURCE=..\src\pj\pool_caching.c
# End Source File
# Begin Source File
-SOURCE=..\src\pj\pool_dbg_win32.c
+SOURCE=..\src\pj\pool_dbg.c
# End Source File
# Begin Source File
@@ -525,6 +532,10 @@ SOURCE=..\include\pj\pool.h
# End Source File
# Begin Source File
+SOURCE=..\include\pj\pool_alt.h
+# End Source File
+# Begin Source File
+
SOURCE=..\include\pj\rand.h
# End Source File
# Begin Source File
diff --git a/pjlib/build/pjlib_test.dsp b/pjlib/build/pjlib_test.dsp
index 15bf0d2c..a9c41753 100644
--- a/pjlib/build/pjlib_test.dsp
+++ b/pjlib/build/pjlib_test.dsp
@@ -123,6 +123,10 @@ SOURCE="..\src\pjlib-test\ioq_udp.c"
# End Source File
# Begin Source File
+SOURCE="..\src\pjlib-test\ioq_unreg.c"
+# End Source File
+# Begin Source File
+
SOURCE="..\src\pjlib-test\list.c"
# End Source File
# Begin Source File
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index 30e2602e..b128d810 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -27,25 +27,10 @@
* This file is NOT supposed to be compiled as stand-alone source.
*/
-static long ioqueue_tls_id = -1;
-
-typedef struct key_lock_data {
- struct key_lock_data *prev;
- pj_ioqueue_key_t *key;
- int is_alive;
-} key_lock_data;
-
-
static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
ioqueue->lock = NULL;
ioqueue->auto_delete_lock = 0;
-
- if (ioqueue_tls_id == -1) {
- pj_status_t status;
- status = pj_thread_local_alloc(&ioqueue_tls_id);
- pj_thread_local_set(ioqueue_tls_id, NULL);
- }
}
static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
@@ -93,11 +78,20 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool,
pj_list_init(&key->write_list);
#if PJ_HAS_TCP
pj_list_init(&key->accept_list);
+ key->connecting = 0;
#endif
/* Save callback. */
pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Set initial reference count to 1 */
+ pj_assert(key->ref_count == 0);
+ ++key->ref_count;
+
+ key->closing = 0;
+#endif
+
/* Get socket type. When socket type is datagram, some optimization
* will be performed during send to allow parallel send operations.
*/
@@ -107,68 +101,14 @@ static pj_status_t ioqueue_init_key( pj_pool_t *pool,
if (rc != PJ_SUCCESS)
key->fd_type = PJ_SOCK_STREAM;
- key->inside_callback = 0;
- key->destroy_requested = 0;
-
/* Create mutex for the key. */
- rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
+#if !PJ_IOQUEUE_HAS_SAFE_UNREG
+ rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
+#endif
return rc;
}
-/* Lock the key and also keep the lock data in thread local storage.
- * The lock data is used to detect if the key is deleted by application
- * when we call its callback.
- */
-static void lock_key(pj_ioqueue_key_t *key, key_lock_data *lck)
-{
- struct key_lock_data *prev_data;
-
- pj_mutex_lock(key->mutex);
- prev_data = (struct key_lock_data *)
- pj_thread_local_get(ioqueue_tls_id);
- lck->prev = prev_data;
- lck->key = key;
- lck->is_alive = 1;
- pj_thread_local_set(ioqueue_tls_id, lck);
-}
-
-/* Unlock the key only if it is still valid. */
-static void unlock_key(pj_ioqueue_key_t *key, key_lock_data *lck)
-{
- pj_assert( (void*)pj_thread_local_get(ioqueue_tls_id) == lck);
- pj_assert( lck->key == key );
- pj_thread_local_set(ioqueue_tls_id, lck->prev);
- if (lck->is_alive)
- pj_mutex_unlock(key->mutex);
-}
-
-/* Destroy key */
-static void ioqueue_destroy_key( pj_ioqueue_key_t *key )
-{
- key_lock_data *lck;
-
- /* Make sure that no other threads are doing something with
- * the key.
- */
- pj_mutex_lock(key->mutex);
-
- /* Check if this function is called within a callback context.
- * If so, then we need to inform the callback that the key has
- * been destroyed so that it doesn't attempt to unlock the
- * key's mutex.
- */
- lck = pj_thread_local_get(ioqueue_tls_id);
- while (lck) {
- if (lck->key == key) {
- lck->is_alive = 0;
- }
- lck = lck->prev;
- }
-
- pj_mutex_destroy(key->mutex);
-}
-
/*
* pj_ioqueue_get_user_data()
*
@@ -221,6 +161,13 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
}
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+# define IS_CLOSING(key) (key->closing)
+#else
+# define IS_CLOSING(key) (0)
+#endif
+
+
/*
* ioqueue_dispatch_event()
*
@@ -229,10 +176,13 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
*/
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
- key_lock_data lck_data;
-
/* Lock the key. */
- lock_key(h, &lck_data);
+ pj_mutex_lock(h->mutex);
+
+ if (h->closing) {
+ pj_mutex_unlock(h->mutex);
+ return;
+ }
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
if (h->connecting) {
@@ -245,8 +195,6 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
- /* Unlock; from this point we don't need to hold key's mutex. */
- //pj_mutex_unlock(h->mutex);
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
/* from connect(2):
@@ -293,8 +241,11 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
}
#endif
+ /* Unlock; from this point we don't need to hold key's mutex. */
+ pj_mutex_unlock(h->mutex);
+
/* Call callback. */
- if (h->cb.on_connect_complete)
+ if (h->cb.on_connect_complete && !IS_CLOSING(h))
(*h->cb.on_connect_complete)(h, bytes_transfered);
/* Done. */
@@ -319,7 +270,6 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
if (pj_list_empty(&h->write_list))
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- //pj_mutex_unlock(h->mutex);
}
/* Send the data.
@@ -365,19 +315,20 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
if (pj_list_empty(&h->write_list))
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- /* No need to hold mutex anymore */
- //pj_mutex_unlock(h->mutex);
}
+ /* No need to hold mutex anymore */
+ pj_mutex_unlock(h->mutex);
+
/* Call callback. */
- if (h->cb.on_write_complete) {
+ if (h->cb.on_write_complete && !IS_CLOSING(h)) {
(*h->cb.on_write_complete)(h,
(pj_ioqueue_op_key_t*)write_op,
write_op->written);
}
} else {
- //pj_mutex_unlock(h->mutex);
+ pj_mutex_unlock(h->mutex);
}
/* Done. */
@@ -387,20 +338,21 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
* are signalled for the same event, but only one thread eventually
* able to process the event.
*/
- //pj_mutex_unlock(h->mutex);
+ pj_mutex_unlock(h->mutex);
}
-
- /* Finally unlock key */
- unlock_key(h, &lck_data);
}
void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
- key_lock_data lck_data;
pj_status_t rc;
/* Lock the key. */
- lock_key(h, &lck_data);
+ pj_mutex_lock(h->mutex);
+
+ if (h->closing) {
+ pj_mutex_unlock(h->mutex);
+ return;
+ }
# if PJ_HAS_TCP
if (!pj_list_empty(&h->accept_list)) {
@@ -416,9 +368,6 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
if (pj_list_empty(&h->accept_list))
ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
- /* Unlock; from this point we don't need to hold key's mutex. */
- //pj_mutex_unlock(h->mutex);
-
rc=pj_sock_accept(h->fd, accept_op->accept_fd,
accept_op->rmt_addr, accept_op->addrlen);
if (rc==PJ_SUCCESS && accept_op->local_addr) {
@@ -427,8 +376,11 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
accept_op->addrlen);
}
+ /* Unlock; from this point we don't need to hold key's mutex. */
+ pj_mutex_unlock(h->mutex);
+
/* Call callback. */
- if (h->cb.on_accept_complete) {
+ if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
(*h->cb.on_accept_complete)(h,
(pj_ioqueue_op_key_t*)accept_op,
*accept_op->accept_fd, rc);
@@ -449,11 +401,6 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
if (pj_list_empty(&h->read_list))
ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
- /* Unlock; from this point we don't need to hold key's mutex. */
- //Crash as of revision 353 (since we added pjmedia socket to
- //main ioqueue).
- //pj_mutex_unlock(h->mutex);
-
bytes_read = read_op->size;
if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
@@ -516,8 +463,11 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
bytes_read = -rc;
}
+ /* Unlock; from this point we don't need to hold key's mutex. */
+ pj_mutex_unlock(h->mutex);
+
/* Call callback. */
- if (h->cb.on_read_complete) {
+ if (h->cb.on_read_complete && !IS_CLOSING(h)) {
(*h->cb.on_read_complete)(h,
(pj_ioqueue_op_key_t*)read_op,
bytes_read);
@@ -529,44 +479,41 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
* are signalled for the same event, but only one thread eventually
* able to process the event.
*/
- //pj_mutex_unlock(h->mutex);
+ pj_mutex_unlock(h->mutex);
}
-
- /* Unlock handle. */
- unlock_key(h, &lck_data);
}
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
pj_ioqueue_key_t *h )
{
- key_lock_data lck_data;
-
- lock_key(h, &lck_data);
+ pj_mutex_lock(h->mutex);
if (!h->connecting) {
/* It is possible that more than one thread was woken up, thus
* the remaining thread will see h->connecting as zero because
* it has been processed by other thread.
*/
- //pj_mutex_unlock(h->mutex);
- unlock_key(h, &lck_data);
+ pj_mutex_unlock(h->mutex);
return;
}
+ if (h->closing) {
+ pj_mutex_unlock(h->mutex);
+ return;
+ }
+
/* Clear operation. */
h->connecting = 0;
- //pj_mutex_unlock(h->mutex);
-
ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
+ pj_mutex_unlock(h->mutex);
+
/* Call callback. */
- if (h->cb.on_connect_complete)
+ if (h->cb.on_connect_complete && !IS_CLOSING(h))
(*h->cb.on_connect_complete)(h, -1);
-
- unlock_key(h, &lck_data);
}
/*
@@ -588,6 +535,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
read_op = (struct read_operation*)op_key;
read_op->op = 0;
+ /* Check if key is closing. */
+ if (key->closing)
+ return PJ_ECANCELLED;
+
/* Try to see if there's data immediately available.
*/
if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
@@ -646,6 +597,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
PJ_CHECK_STACK();
+ /* Check if key is closing. */
+ if (key->closing)
+ return PJ_ECANCELLED;
+
read_op = (struct read_operation*)op_key;
read_op->op = 0;
@@ -710,6 +665,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
PJ_CHECK_STACK();
+ /* Check if key is closing. */
+ if (key->closing)
+ return PJ_ECANCELLED;
+
write_op = (struct write_operation*)op_key;
write_op->op = 0;
@@ -788,6 +747,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
PJ_CHECK_STACK();
+ /* Check if key is closing. */
+ if (key->closing)
+ return PJ_ECANCELLED;
+
write_op = (struct write_operation*)op_key;
write_op->op = 0;
@@ -869,6 +832,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
/* check parameters. All must be specified! */
PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+ /* Check if key is closing. */
+ if (key->closing)
+ return PJ_ECANCELLED;
+
accept_op = (struct accept_operation*)op_key;
accept_op->op = 0;
@@ -930,6 +897,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
/* check parameters. All must be specified! */
PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
+ /* Check if key is closing. */
+ if (key->closing)
+ return PJ_ECANCELLED;
+
/* Check if socket has not been marked for connecting */
if (key->connecting != 0)
return PJ_EPENDING;
@@ -986,13 +957,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
pj_ssize_t bytes_status )
{
struct generic_operation *op_rec;
- key_lock_data lck_data;
/*
* Find the operation key in all pending operation list to
* really make sure that it's still there; then call the callback.
*/
- lock_key(key, &lck_data);
+ pj_mutex_lock(key->mutex);
/* Find the operation in the pending read list. */
op_rec = (struct generic_operation*)key->read_list.next;
@@ -1000,11 +970,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = 0;
- //pj_mutex_unlock(key->mutex);
+ pj_mutex_unlock(key->mutex);
(*key->cb.on_read_complete)(key, op_key, bytes_status);
-
- unlock_key(key, &lck_data);
return PJ_SUCCESS;
}
op_rec = op_rec->next;
@@ -1016,11 +984,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = 0;
- //pj_mutex_unlock(key->mutex);
+ pj_mutex_unlock(key->mutex);
(*key->cb.on_write_complete)(key, op_key, bytes_status);
-
- unlock_key(key, &lck_data);
return PJ_SUCCESS;
}
op_rec = op_rec->next;
@@ -1032,19 +998,17 @@ PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = 0;
- //pj_mutex_unlock(key->mutex);
+ pj_mutex_unlock(key->mutex);
(*key->cb.on_accept_complete)(key, op_key,
PJ_INVALID_SOCKET,
bytes_status);
-
- unlock_key(key, &lck_data);
return PJ_SUCCESS;
}
op_rec = op_rec->next;
}
- unlock_key(key, &lck_data);
+ pj_mutex_unlock(key->mutex);
return PJ_EINVALIDOP;
}
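
Note: with the key->closing checks added above, every submission path (recv, recvfrom, send, sendto, accept, connect) now fails fast with PJ_ECANCELLED once the key has been unregistered, instead of queueing work on a dying key. A minimal caller-side sketch of handling that return value; the helper name and the logging are illustrative, not part of the patch:

#include <pj/ioqueue.h>
#include <pj/errno.h>
#include <pj/log.h>

/* Illustrative helper: submit an asynchronous receive and tolerate the key
 * being unregistered concurrently by another thread.
 */
static pj_status_t submit_recv(pj_ioqueue_key_t *key,
                               pj_ioqueue_op_key_t *op_key,
                               void *buf, pj_ssize_t *len)
{
    pj_status_t status = pj_ioqueue_recv(key, op_key, buf, len, 0);

    if (status == PJ_ECANCELLED) {
        /* Key is being closed by pj_ioqueue_unregister(); stop submitting. */
        PJ_LOG(5,("app", "recv cancelled: key is being unregistered"));
    }
    /* PJ_SUCCESS:  data was available immediately.
     * PJ_EPENDING: on_read_complete() will be called later.
     * Anything else: a real error from the socket.
     */
    return status;
}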
diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h
index 1ca70aa8..e7d05561 100644
--- a/pjlib/src/pj/ioqueue_common_abs.h
+++ b/pjlib/src/pj/ioqueue_common_abs.h
@@ -87,6 +87,16 @@ union operation_key
#endif
};
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+# define UNREG_FIELDS \
+ unsigned ref_count; \
+ pj_bool_t closing; \
+ pj_time_val free_time; \
+
+#else
+# define UNREG_FIELDS
+#endif
+
#define DECLARE_COMMON_KEY \
PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t); \
pj_ioqueue_t *ioqueue; \
@@ -100,7 +110,8 @@ union operation_key
int connecting; \
struct read_operation read_list; \
struct write_operation write_list; \
- struct accept_operation accept_list;
+ struct accept_operation accept_list; \
+ UNREG_FIELDS
#define DECLARE_COMMON_IOQUEUE \
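
For readability, the net effect of DECLARE_COMMON_KEY plus UNREG_FIELDS when PJ_IOQUEUE_HAS_SAFE_UNREG is enabled is roughly the struct below. This is a hand-written approximation for illustration only; the real definition is generated by the macros above and completed in the backend sources:

/* Approximate expansion of DECLARE_COMMON_KEY with UNREG_FIELDS
 * (field order simplified):
 */
struct pj_ioqueue_key_t
{
    PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
    pj_ioqueue_t            *ioqueue;
    pj_sock_t                fd;
    int                      fd_type;
    void                    *user_data;
    pj_mutex_t              *mutex;
    pj_ioqueue_callback      cb;
    int                      connecting;
    struct read_operation    read_list;
    struct write_operation   write_list;
    struct accept_operation  accept_list;
    /* Added by UNREG_FIELDS: */
    unsigned                 ref_count;  /* references held by pollers/owner   */
    pj_bool_t                closing;    /* set by pj_ioqueue_unregister()     */
    pj_time_val              free_time;  /* earliest time to recycle the key   */
};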
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index 16a511a8..4aa4f910 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -109,12 +109,18 @@ struct pj_ioqueue_t
DECLARE_COMMON_IOQUEUE
unsigned max, count;
- pj_ioqueue_key_t key_list;
+ pj_ioqueue_key_t active_list;
pj_fd_set_t rfdset;
pj_fd_set_t wfdset;
#if PJ_HAS_TCP
pj_fd_set_t xfdset;
#endif
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ pj_mutex_t *ref_cnt_mutex;
+ pj_ioqueue_key_t closing_list;
+ pj_ioqueue_key_t free_list;
+#endif
};
/* Include implementation for common abstraction after we declare
@@ -141,6 +147,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
{
pj_ioqueue_t *ioqueue;
pj_lock_t *lock;
+ unsigned i;
pj_status_t rc;
/* Check that arguments are valid. */
@@ -152,8 +159,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
sizeof(union operation_key), PJ_EBUG);
+ /* Create and init common ioqueue stuffs */
ioqueue = pj_pool_alloc(pool, sizeof(pj_ioqueue_t));
-
ioqueue_init(ioqueue);
ioqueue->max = max_fd;
@@ -163,8 +170,49 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
#if PJ_HAS_TCP
PJ_FD_ZERO(&ioqueue->xfdset);
#endif
- pj_list_init(&ioqueue->key_list);
+ pj_list_init(&ioqueue->active_list);
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* When safe unregistration is used (the default), we pre-create
+ * all keys and put them in the free list.
+ */
+
+ /* Mutex to protect key's reference counter
+ * We don't want to use key's mutex or ioqueue's mutex because
+ * that would create a deadlock situation in some cases.
+ */
+ rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
+ if (rc != PJ_SUCCESS)
+ return rc;
+
+
+ /* Init key list */
+ pj_list_init(&ioqueue->free_list);
+ pj_list_init(&ioqueue->closing_list);
+
+ /* Pre-create all keys according to max_fd */
+ for (i=0; i<max_fd; ++i) {
+ pj_ioqueue_key_t *key;
+
+ key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
+ key->ref_count = 0;
+ rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
+ if (rc != PJ_SUCCESS) {
+ key = ioqueue->free_list.next;
+ while (key != &ioqueue->free_list) {
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+ pj_mutex_destroy(ioqueue->ref_cnt_mutex);
+ return rc;
+ }
+
+ pj_list_push_back(&ioqueue->free_list, key);
+ }
+#endif
+
+ /* Create and init ioqueue mutex */
rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
if (rc != PJ_SUCCESS)
return rc;
@@ -186,9 +234,35 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
*/
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
+ pj_ioqueue_key_t *key;
+
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
pj_lock_acquire(ioqueue->lock);
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Destroy reference counters */
+ key = ioqueue->active_list.next;
+ while (key != &ioqueue->active_list) {
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+
+ key = ioqueue->closing_list.next;
+ while (key != &ioqueue->closing_list) {
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+
+ key = ioqueue->free_list.next;
+ while (key != &ioqueue->free_list) {
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+
+ pj_mutex_destroy(ioqueue->ref_cnt_mutex);
+#endif
+
return ioqueue_destroy(ioqueue);
}
@@ -196,7 +270,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
/*
* pj_ioqueue_register_sock()
*
- * Register a handle to ioqueue.
+ * Register socket handle to ioqueue.
*/
PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
pj_ioqueue_t *ioqueue,
@@ -219,6 +293,28 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
goto on_return;
}
+ /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
+ * the key from the free list. Otherwise allocate a new one.
+ */
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ pj_assert(!pj_list_empty(&ioqueue->free_list));
+ if (pj_list_empty(&ioqueue->free_list)) {
+ rc = PJ_ETOOMANY;
+ goto on_return;
+ }
+
+ key = ioqueue->free_list.next;
+ pj_list_erase(key);
+#else
+ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+#endif
+
+ rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
+ if (rc != PJ_SUCCESS) {
+ key = NULL;
+ goto on_return;
+ }
+
/* Set socket to nonblocking. */
value = 1;
#if defined(PJ_WIN32) && PJ_WIN32!=0 || \
@@ -231,16 +327,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
goto on_return;
}
- /* Create key. */
- key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
- rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
- if (rc != PJ_SUCCESS) {
- key = NULL;
- goto on_return;
- }
- /* Register */
- pj_list_insert_before(&ioqueue->key_list, key);
+ /* Put in active list. */
+ pj_list_insert_before(&ioqueue->active_list, key);
++ioqueue->count;
on_return:
@@ -251,6 +340,41 @@ on_return:
return rc;
}
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Increment key's reference counter */
+static void increment_counter(pj_ioqueue_key_t *key)
+{
+ pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
+ ++key->ref_count;
+ pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
+}
+
+/* Decrement the key's reference counter, and when the counter reaches zero,
+ * destroy the key.
+ *
+ * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
+ */
+static void decrement_counter(pj_ioqueue_key_t *key)
+{
+ pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
+ --key->ref_count;
+ if (key->ref_count == 0) {
+
+ pj_assert(key->closing == 1);
+ pj_gettimeofday(&key->free_time);
+ key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
+ pj_time_val_normalize(&key->free_time);
+
+ pj_lock_acquire(key->ioqueue->lock);
+ pj_list_erase(key);
+ pj_list_push_back(&key->ioqueue->closing_list, key);
+ pj_lock_release(key->ioqueue->lock);
+ }
+ pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
+}
+#endif
+
+
/*
* pj_ioqueue_unregister()
*
@@ -264,6 +388,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
ioqueue = key->ioqueue;
+ /* Lock the key to make sure no callback is simultaneously modifying
+ * the key. We need to lock the key before ioqueue here to prevent
+ * deadlock.
+ */
+ pj_mutex_lock(key->mutex);
+
+ /* Also lock ioqueue */
pj_lock_acquire(ioqueue->lock);
pj_assert(ioqueue->count > 0);
@@ -275,15 +406,32 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
PJ_FD_CLR(key->fd, &ioqueue->xfdset);
#endif
- /* ioqueue_destroy may try to acquire key's mutex.
- * Since normally the order of locking is to lock key's mutex first
- * then ioqueue's mutex, ioqueue_destroy may deadlock unless we
- * release ioqueue's mutex first.
+ /* Close socket. */
+ pj_sock_close(key->fd);
+
+ /* Clear callback */
+ key->cb.on_accept_complete = NULL;
+ key->cb.on_connect_complete = NULL;
+ key->cb.on_read_complete = NULL;
+ key->cb.on_write_complete = NULL;
+
+ /* Must release ioqueue lock first before decrementing counter, to
+ * prevent deadlock.
*/
pj_lock_release(ioqueue->lock);
- /* Destroy the key. */
- ioqueue_destroy_key(key);
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Mark key is closing. */
+ key->closing = 1;
+
+ /* Decrement counter. */
+ decrement_counter(key);
+
+ /* Done. */
+ pj_mutex_unlock(key->mutex);
+#else
+ pj_mutex_destroy(key->mutex);
+#endif
return PJ_SUCCESS;
}
@@ -308,8 +456,8 @@ static void validate_sets(const pj_ioqueue_t *ioqueue,
*/
pj_assert(0);
- key = ioqueue->key_list.next;
- while (key != &ioqueue->key_list) {
+ key = ioqueue->active_list.next;
+ while (key != &ioqueue->active_list) {
if (!pj_list_empty(&key->read_list)
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
|| !pj_list_empty(&key->accept_list)
@@ -395,6 +543,30 @@ static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
pj_lock_release(ioqueue->lock);
}
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Scan closing keys to be put to free list again */
+static void scan_closing_keys(pj_ioqueue_t *ioqueue)
+{
+ pj_time_val now;
+ pj_ioqueue_key_t *h;
+
+ pj_gettimeofday(&now);
+ h = ioqueue->closing_list.next;
+ while (h != &ioqueue->closing_list) {
+ pj_ioqueue_key_t *next = h->next;
+
+ pj_assert(h->closing != 0);
+
+ if (PJ_TIME_VAL_GTE(now, h->free_time)) {
+ pj_list_erase(h);
+ pj_list_push_back(&ioqueue->free_list, h);
+ }
+ h = next;
+ }
+}
+#endif
+
+
/*
* pj_ioqueue_poll()
*
@@ -435,7 +607,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
PJ_FD_COUNT(&ioqueue->wfdset)==0 &&
PJ_FD_COUNT(&ioqueue->xfdset)==0)
{
- pj_lock_release(ioqueue->lock);
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ scan_closing_keys(ioqueue);
+#endif
+ pj_lock_release(ioqueue->lock);
if (timeout)
pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
return 0;
@@ -475,11 +650,15 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
/* Scan for writable sockets first to handle piggy-back data
* coming with accept().
*/
- h = ioqueue->key_list.next;
- for ( ; h!=&ioqueue->key_list && counter<count; h = h->next) {
+ h = ioqueue->active_list.next;
+ for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {
+
if ( (key_has_pending_write(h) || key_has_pending_connect(h))
- && PJ_FD_ISSET(h->fd, &wfdset))
+ && PJ_FD_ISSET(h->fd, &wfdset) && !h->closing)
{
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ increment_counter(h);
+#endif
event[counter].key = h;
event[counter].event_type = WRITEABLE_EVENT;
++counter;
@@ -487,15 +666,23 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
/* Scan for readable socket. */
if ((key_has_pending_read(h) || key_has_pending_accept(h))
- && PJ_FD_ISSET(h->fd, &rfdset))
+ && PJ_FD_ISSET(h->fd, &rfdset) && !h->closing)
{
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ increment_counter(h);
+#endif
event[counter].key = h;
event[counter].event_type = READABLE_EVENT;
++counter;
}
#if PJ_HAS_TCP
- if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset)) {
+ if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
+ !h->closing)
+ {
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ increment_counter(h);
+#endif
event[counter].key = h;
event[counter].event_type = EXCEPTION_EVENT;
++counter;
@@ -525,8 +712,13 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
pj_assert(!"Invalid event!");
break;
}
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ decrement_counter(event[counter].key);
+#endif
}
+
return count;
}
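
The division of labour in the select backend's poll loop is worth spelling out: the ioqueue lock is held only while scanning active_list and taking a reference on each ready key; dispatching and the final decrement_counter() happen outside the lock, which is what allows a callback to unregister its own key without deadlocking. A condensed sketch of that pattern as it lives inside ioqueue_select.c; struct poll_event and MAX_EVENTS are illustrative names, the real code uses its own event array:

/* Condensed sketch of the poll pattern above (PJ_IOQUEUE_HAS_SAFE_UNREG). */
enum { MAX_EVENTS = 16 };                 /* illustrative bound */

struct poll_event {
    pj_ioqueue_key_t *key;
    int               event_type;         /* READABLE/WRITEABLE/EXCEPTION */
};

static void poll_once(pj_ioqueue_t *ioqueue)
{
    struct poll_event ev[MAX_EVENTS];
    unsigned i, count = 0;

    pj_lock_acquire(ioqueue->lock);
    /* Scan active_list; for each ready key that is not closing:
     *     increment_counter(key);          -- keeps the key alive
     *     ev[count].key = key; ...; ++count;
     */
    pj_lock_release(ioqueue->lock);

    for (i = 0; i < count; ++i) {
        /* Dispatch without holding ioqueue->lock, so the callback is free
         * to call pj_ioqueue_unregister() on this or any other key:
         *     ioqueue_dispatch_read_event()/write/exception ...
         */
        decrement_counter(ev[i].key);     /* last reference moves the key
                                           * to closing_list for delayed reuse */
    }
}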
diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c
index 40dd31bb..4586c5ca 100644
--- a/pjlib/src/pj/ioqueue_winnt.c
+++ b/pjlib/src/pj/ioqueue_winnt.c
@@ -106,15 +106,24 @@ enum { POST_QUIT_LEN = 0xFFFFDEADUL };
*/
struct pj_ioqueue_key_t
{
+ PJ_DECL_LIST_MEMBER(struct pj_ioqueue_key_t);
+
pj_ioqueue_t *ioqueue;
HANDLE hnd;
void *user_data;
enum handle_type hnd_type;
+ pj_ioqueue_callback cb;
+
#if PJ_HAS_TCP
int connecting;
#endif
- pj_ioqueue_callback cb;
- pj_bool_t has_quit_signal;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ pj_atomic_t *ref_count;
+ pj_bool_t closing;
+ pj_time_val free_time;
+#endif
+
};
/*
@@ -125,9 +134,17 @@ struct pj_ioqueue_t
HANDLE iocp;
pj_lock_t *lock;
pj_bool_t auto_delete_lock;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ pj_ioqueue_key_t active_list;
+ pj_ioqueue_key_t free_list;
+ pj_ioqueue_key_t closing_list;
+#endif
+
+ /* These are to keep track of connecting sockets */
+#if PJ_HAS_TCP
unsigned event_count;
HANDLE event_pool[MAXIMUM_WAIT_OBJECTS+1];
-#if PJ_HAS_TCP
unsigned connecting_count;
HANDLE connecting_handles[MAXIMUM_WAIT_OBJECTS+1];
pj_ioqueue_key_t *connecting_keys[MAXIMUM_WAIT_OBJECTS+1];
@@ -279,6 +296,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_ioqueue_t **p_ioqueue)
{
pj_ioqueue_t *ioqueue;
+ unsigned i;
pj_status_t rc;
PJ_UNUSED_ARG(max_fd);
@@ -290,11 +308,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
sizeof(union operation_key), PJ_EBUG);
+ /* Create IOCP */
ioqueue = pj_pool_zalloc(pool, sizeof(*ioqueue));
ioqueue->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (ioqueue->iocp == NULL)
return PJ_RETURN_OS_ERROR(GetLastError());
+ /* Create IOCP mutex */
rc = pj_lock_create_simple_mutex(pool, NULL, &ioqueue->lock);
if (rc != PJ_SUCCESS) {
CloseHandle(ioqueue->iocp);
@@ -303,6 +323,38 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
ioqueue->auto_delete_lock = PJ_TRUE;
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /*
+ * Create and initialize key pools.
+ */
+ pj_list_init(&ioqueue->active_list);
+ pj_list_init(&ioqueue->free_list);
+ pj_list_init(&ioqueue->closing_list);
+
+ /* Preallocate keys according to max_fd setting, and put them
+ * in free_list.
+ */
+ for (i=0; i<max_fd; ++i) {
+ pj_ioqueue_key_t *key;
+
+ key = pj_pool_alloc(pool, sizeof(pj_ioqueue_key_t));
+
+ rc = pj_atomic_create(pool, 0, &key->ref_count);
+ if (rc != PJ_SUCCESS) {
+ key = ioqueue->free_list.next;
+ while (key != &ioqueue->free_list) {
+ pj_atomic_destroy(key->ref_count);
+ key = key->next;
+ }
+ CloseHandle(ioqueue->iocp);
+ return rc;
+ }
+
+ pj_list_push_back(&ioqueue->free_list, key);
+
+ }
+#endif
+
*p_ioqueue = ioqueue;
PJ_LOG(4, ("pjlib", "WinNT IOCP I/O Queue created (%p)", ioqueue));
@@ -315,19 +367,45 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
PJ_DEF(pj_status_t) pj_ioqueue_destroy( pj_ioqueue_t *ioqueue )
{
unsigned i;
+ pj_ioqueue_key_t *key;
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
+ pj_lock_acquire(ioqueue->lock);
+
+#if PJ_HAS_TCP
/* Destroy events in the pool */
for (i=0; i<ioqueue->event_count; ++i) {
CloseHandle(ioqueue->event_pool[i]);
}
ioqueue->event_count = 0;
+#endif
if (CloseHandle(ioqueue->iocp) != TRUE)
return PJ_RETURN_OS_ERROR(GetLastError());
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Destroy reference counters */
+ key = ioqueue->active_list.next;
+ while (key != &ioqueue->active_list) {
+ pj_atomic_destroy(key->ref_count);
+ key = key->next;
+ }
+
+ key = ioqueue->closing_list.next;
+ while (key != &ioqueue->closing_list) {
+ pj_atomic_destroy(key->ref_count);
+ key = key->next;
+ }
+
+ key = ioqueue->free_list.next;
+ while (key != &ioqueue->free_list) {
+ pj_atomic_destroy(key->ref_count);
+ key = key->next;
+ }
+#endif
+
if (ioqueue->auto_delete_lock)
pj_lock_destroy(ioqueue->lock);
@@ -370,28 +448,64 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
PJ_ASSERT_RETURN(pool && ioqueue && cb && key, PJ_EINVAL);
- /* Build the key for this socket. */
+ pj_lock_acquire(ioqueue->lock);
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* If safe unregistration is used, then get the key record from
+ * the free list.
+ */
+ if (pj_list_empty(&ioqueue->free_list)) {
+ pj_lock_release(ioqueue->lock);
+ return PJ_ETOOMANY;
+ }
+
+ rec = ioqueue->free_list.next;
+ pj_list_erase(rec);
+
+ /* Set initial reference count to 1 */
+ pj_assert(pj_atomic_get(rec->ref_count) == 0);
+ pj_atomic_inc(rec->ref_count);
+
+ rec->closing = 0;
+
+#else
rec = pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+#endif
+
+ /* Build the key for this socket. */
rec->ioqueue = ioqueue;
rec->hnd = (HANDLE)sock;
rec->hnd_type = HND_IS_SOCKET;
rec->user_data = user_data;
pj_memcpy(&rec->cb, cb, sizeof(pj_ioqueue_callback));
+#if PJ_HAS_TCP
+ rec->connecting = 0;
+#endif
+
/* Set socket to nonblocking. */
value = 1;
rc = ioctlsocket(sock, FIONBIO, &value);
if (rc != 0) {
+ pj_lock_release(ioqueue->lock);
return PJ_RETURN_OS_ERROR(WSAGetLastError());
}
/* Associate with IOCP */
hioq = CreateIoCompletionPort((HANDLE)sock, ioqueue->iocp, (DWORD)rec, 0);
if (!hioq) {
+ pj_lock_release(ioqueue->lock);
return PJ_RETURN_OS_ERROR(GetLastError());
}
*key = rec;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ pj_list_push_back(&ioqueue->active_list, rec);
+#endif
+
+ pj_lock_release(ioqueue->lock);
+
return PJ_SUCCESS;
}
@@ -422,9 +536,31 @@ PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
}
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Decrement the key's reference counter, and when the counter reaches zero,
+ * destroy the key.
+ */
+static void decrement_counter(pj_ioqueue_key_t *key)
+{
+ if (pj_atomic_dec_and_get(key->ref_count) == 0) {
+
+ pj_lock_acquire(key->ioqueue->lock);
+
+ pj_assert(key->closing == 1);
+ pj_gettimeofday(&key->free_time);
+ key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
+ pj_time_val_normalize(&key->free_time);
+
+ pj_list_erase(key);
+ pj_list_push_back(&key->ioqueue->closing_list, key);
+
+ pj_lock_release(key->ioqueue->lock);
+ }
+}
+#endif
/*
- * Internal function to poll the I/O Completion Port, execute callback,
+ * Poll the I/O Completion Port, execute callback,
* and return the key and bytes transfered of the last operation.
*/
static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
@@ -457,16 +593,16 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
if (p_key)
*p_key = key;
- /* If size_status is POST_QUIT_LEN, mark the key as quitting */
- if (size_status == POST_QUIT_LEN) {
- key->has_quit_signal = 1;
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* We shouldn't call callbacks if key is quitting. */
+ if (key->closing)
return PJ_TRUE;
- }
- /* We shouldn't call callbacks if key is quitting.
- * But this should have been taken care by unregister function
- * (the unregister function should have cleared out the callbacks)
+ /* Increment reference counter to prevent this key from being
+ * deleted
*/
+ pj_atomic_inc(key->ref_count);
+#endif
/* Carry out the callback */
switch (pOv->operation) {
@@ -504,6 +640,11 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
pj_assert(0);
break;
}
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ decrement_counter(key);
+#endif
+
return PJ_TRUE;
}
@@ -516,11 +657,6 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout,
*/
PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
{
- pj_ssize_t polled_len;
- pj_ioqueue_key_t *polled_key;
- generic_overlapped ov;
- BOOL rc;
-
PJ_ASSERT_RETURN(key, PJ_EINVAL);
#if PJ_HAS_TCP
@@ -542,53 +678,35 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key )
pj_lock_release(ioqueue->lock);
}
#endif
-
-
- /* Unregistering handle from IOCP is pretty tricky.
- *
- * Even after the socket has been closed, GetQueuedCompletionStatus
- * may still return events for the handle. This will likely to
- * cause crash in pjlib, because the key associated with the handle
- * most likely will have been destroyed.
- *
- * The solution is to poll the IOCP until we're sure that there are
- * no further events for the handle.
+
+ /* Close handle (the only way to disassociate handle from IOCP).
+ * We also need to close handle to make sure that no further events
+ * will come to the handle.
*/
+ CloseHandle(key->hnd);
- /* Clear up callbacks for the key.
- * We don't want the callback to be called for this key.
- */
- key->cb.on_read_complete = NULL;
- key->cb.on_write_complete = NULL;
+ /* Reset callbacks */
key->cb.on_accept_complete = NULL;
key->cb.on_connect_complete = NULL;
+ key->cb.on_read_complete = NULL;
+ key->cb.on_write_complete = NULL;
- /* Init overlapped struct */
- pj_memset(&ov, 0, sizeof(ov));
- ov.operation = PJ_IOQUEUE_OP_READ;
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Mark key as closing. */
+ key->closing = 1;
- /* Post queued completion status with a special length. */
- rc = PostQueuedCompletionStatus( key->ioqueue->iocp, (DWORD)POST_QUIT_LEN,
- (DWORD)key, &ov.overlapped);
+ /* Decrement reference counter. */
+ decrement_counter(key);
- /* Poll IOCP until has_quit_signal is set in the key.
- * The has_quit_signal flag is set in poll_iocp() when POST_QUIT_LEN
- * is detected. We need to have this flag because POST_QUIT_LEN may be
- * detected by other threads.
+ /* Even after handle is closed, I suspect that IOCP may still try to
+ * do something with the handle, causing memory corruption when pool
+ * debugging is enabled.
+ *
+ * Forcing context switch seems to have fixed that, but this is quite
+ * an ugly solution..
*/
- do {
- polled_len = 0;
- polled_key = NULL;
-
- rc = poll_iocp(key->ioqueue->iocp, 0, &polled_len, &polled_key);
-
- } while (rc && !key->has_quit_signal);
-
-
- /* Close handle if this is a file. */
- if (key->hnd_type == HND_IS_FILE) {
- CloseHandle(key->hnd);
- }
+ pj_thread_sleep(0);
+#endif
return PJ_SUCCESS;
}
@@ -602,23 +720,57 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
DWORD dwMsec;
int connect_count = 0;
- pj_bool_t has_event;
+ int event_count = 0;
PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
- /* Check the connecting array. */
-#if PJ_HAS_TCP
- connect_count = check_connecting(ioqueue);
-#endif
-
/* Calculate miliseconds timeout for GetQueuedCompletionStatus */
dwMsec = timeout ? timeout->sec*1000 + timeout->msec : INFINITE;
/* Poll for completion status. */
- has_event = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
+ event_count = poll_iocp(ioqueue->iocp, dwMsec, NULL, NULL);
+
+#if PJ_HAS_TCP
+ /* Check the connecting array, only when there's no activity. */
+ if (event_count == 0) {
+ connect_count = check_connecting(ioqueue);
+ if (connect_count > 0)
+ event_count += connect_count;
+ }
+#endif
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check the closing keys only when there's no activity and when there are
+ * pending closing keys.
+ */
+ if (event_count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
+ pj_time_val now;
+ pj_ioqueue_key_t *key;
+
+ pj_gettimeofday(&now);
+
+ /* Move closing keys to free list when they've finished the closing
+ * idle time.
+ */
+ pj_lock_acquire(ioqueue->lock);
+ key = ioqueue->closing_list.next;
+ while (key != &ioqueue->closing_list) {
+ pj_ioqueue_key_t *next = key->next;
+
+ pj_assert(key->closing != 0);
+
+ if (PJ_TIME_VAL_GTE(now, key->free_time)) {
+ pj_list_erase(key);
+ pj_list_push_back(&ioqueue->free_list, key);
+ }
+ key = next;
+ }
+ pj_lock_release(ioqueue->lock);
+ }
+#endif
/* Return number of events. */
- return connect_count + has_event;
+ return event_count;
}
/*
@@ -645,6 +797,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
op_key_rec = (union operation_key*)op_key->internal__;
op_key_rec->overlapped.wsabuf.buf = buffer;
op_key_rec->overlapped.wsabuf.len = *length;
@@ -715,6 +873,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && buffer, PJ_EINVAL);
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
op_key_rec = (union operation_key*)op_key->internal__;
op_key_rec->overlapped.wsabuf.buf = buffer;
op_key_rec->overlapped.wsabuf.len = *length;
@@ -799,7 +963,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && data, PJ_EINVAL);
-
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
op_key_rec = (union operation_key*)op_key->internal__;
/*
@@ -872,6 +1042,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
/*
* See if there is a new connection immediately available.
*/
@@ -962,6 +1138,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
PJ_CHECK_STACK();
PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check key is not closing */
+ if (key->closing)
+ return PJ_ECANCELLED;
+#endif
+
/* Initiate connect() */
if (connect((pj_sock_t)key->hnd, addr, addrlen) != 0) {
DWORD dwStatus;
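
Both backends delay reuse of an unregistered key by PJ_IOQUEUE_KEY_FREE_DELAY milliseconds, so that any completion still in flight drains against a valid (if closing) key before the structure goes back to the free list. The timestamp arithmetic is identical in both files; shown below in isolation as a small sketch, assuming PJ_IOQUEUE_KEY_FREE_DELAY is visible from the library configuration headers and with illustrative function names:

#include <pj/os.h>       /* pj_gettimeofday(), time helpers */
#include <pj/types.h>    /* pj_time_val, PJ_TIME_VAL_GTE()  */

/* When the last reference is dropped: stamp the earliest reuse time. */
static void mark_key_for_delayed_reuse(pj_time_val *free_time)
{
    pj_gettimeofday(free_time);
    free_time->msec += PJ_IOQUEUE_KEY_FREE_DELAY;  /* delay in milliseconds */
    pj_time_val_normalize(free_time);              /* carry msec into sec   */
}

/* Later, from the poll loop: may this key go back to the free list? */
static pj_bool_t key_can_be_reused(const pj_time_val *free_time)
{
    pj_time_val now;

    pj_gettimeofday(&now);
    return PJ_TIME_VAL_GTE(now, *free_time);
}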
diff --git a/pjlib/src/pj/pool_dbg_win32.c b/pjlib/src/pj/pool_dbg_win32.c
deleted file mode 100644
index a3ccf3ed..00000000
--- a/pjlib/src/pj/pool_dbg_win32.c
+++ /dev/null
@@ -1,237 +0,0 @@
-/* $Id$ */
-/*
- * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- */
-#include <pj/pool.h>
-
-/* Only if we ARE debugging memory allocations. */
-#if PJ_POOL_DEBUG
-
-#include <pj/list.h>
-#include <pj/log.h>
-
-#include <stdlib.h>
-#include <stdio.h>
-#define WIN32_LEAN_AND_MEAN
-#include <windows.h>
-
-typedef struct memory_entry
-{
- PJ_DECL_LIST_MEMBER(struct memory_entry)
- void *ptr;
- char *file;
- int line;
-} memory_entry;
-
-struct pj_pool_t
-{
- char obj_name[32];
- HANDLE hHeap;
- memory_entry first;
- pj_size_t initial_size;
- pj_size_t increment;
- pj_size_t used_size;
- char *file;
- int line;
-};
-
-PJ_DEF(void) pj_pool_set_functions( void *(*malloc_func)(pj_size_t),
- void (*free_func)(void *ptr, pj_size_t))
-{
- /* Ignored. */
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(malloc_func)
- PJ_UNUSED_ARG(free_func)
-}
-
-PJ_DEF(pj_pool_t*) pj_pool_create_dbg( const char *name,
- pj_size_t initial_size,
- pj_size_t increment_size,
- pj_pool_callback *callback,
- char *file, int line)
-{
- pj_pool_t *pool;
- HANDLE hHeap;
-
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(callback)
-
- /* Create Win32 heap for the pool. */
- hHeap = HeapCreate(HEAP_GENERATE_EXCEPTIONS|HEAP_NO_SERIALIZE,
- initial_size, 0);
- if (!hHeap) {
- return NULL;
- }
-
-
- /* Create and initialize the pool structure. */
- pool = HeapAlloc(hHeap, HEAP_GENERATE_EXCEPTIONS|HEAP_NO_SERIALIZE,
- sizeof(*pool));
- memset(pool, 0, sizeof(*pool));
- pool->file = file;
- pool->line = line;
- pool->hHeap = hHeap;
- pool->initial_size = initial_size;
- pool->increment = increment_size;
- pool->used_size = 0;
-
- /* Set name. */
- if (name) {
- if (strchr(name, '%') != NULL) {
- sprintf(pool->obj_name, name, pool);
- } else {
- strncpy(pool->obj_name, name, PJ_MAX_OBJ_NAME);
- }
- } else {
- pool->obj_name[0] = '\0';
- }
-
- /* List pool's entry. */
- pj_list_init(&pool->first);
-
- PJ_LOG(3,(pool->obj_name, "Pool created"));
- return pool;
-}
-
-PJ_DEF(void) pj_pool_destroy( pj_pool_t *pool )
-{
- memory_entry *entry;
-
- PJ_CHECK_STACK();
-
- PJ_LOG(3,(pool->obj_name, "Destoying pool, init_size=%u, used=%u",
- pool->initial_size, pool->used_size));
-
- if (!HeapValidate( pool->hHeap, HEAP_NO_SERIALIZE, pool)) {
- PJ_LOG(2,(pool->obj_name, "Corrupted pool structure, allocated in %s:%d",
- pool->file, pool->line));
- }
-
- /* Validate all memory entries in the pool. */
- for (entry=pool->first.next; entry != &pool->first; entry = entry->next) {
- if (!HeapValidate( pool->hHeap, HEAP_NO_SERIALIZE, entry)) {
- PJ_LOG(2,(pool->obj_name, "Corrupted pool entry, allocated in %s:%d",
- entry->file, entry->line));
- }
-
- if (!HeapValidate( pool->hHeap, HEAP_NO_SERIALIZE, entry->ptr)) {
- PJ_LOG(2,(pool->obj_name, "Corrupted pool memory, allocated in %s:%d",
- entry->file, entry->line));
- }
- }
-
- /* Destroy heap. */
- HeapDestroy(pool->hHeap);
-}
-
-PJ_DEF(void) pj_pool_reset( pj_pool_t *pool )
-{
- /* Do nothing. */
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(pool)
-}
-
-PJ_DEF(pj_size_t) pj_pool_get_capacity( pj_pool_t *pool )
-{
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(pool)
- return 0;
-}
-
-PJ_DEF(pj_size_t) pj_pool_get_used_size( pj_pool_t *pool )
-{
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(pool)
- return 0;
-}
-
-PJ_DEF(pj_size_t) pj_pool_get_request_count( pj_pool_t *pool )
-{
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(pool)
- return 0;
-}
-
-PJ_DEF(void*) pj_pool_alloc_dbg( pj_pool_t *pool, pj_size_t size,
- char *file, int line)
-{
- memory_entry *entry;
- int entry_size;
-
- PJ_CHECK_STACK();
-
- entry_size = sizeof(*entry);
- entry = HeapAlloc(pool->hHeap, HEAP_GENERATE_EXCEPTIONS|HEAP_NO_SERIALIZE,
- entry_size);
- entry->file = file;
- entry->line = line;
- entry->ptr = HeapAlloc(pool->hHeap, HEAP_GENERATE_EXCEPTIONS|HEAP_NO_SERIALIZE,
- size);
- pj_list_insert_before( &pool->first, entry);
-
- pool->used_size += size;
- return entry->ptr;
-}
-
-PJ_DEF(void*) pj_pool_calloc_dbg( pj_pool_t *pool, pj_size_t count, pj_size_t elem,
- char *file, int line)
-{
- void *ptr;
-
- PJ_CHECK_STACK();
-
- ptr = pj_pool_alloc_dbg(pool, count*elem, file, line);
- memset(ptr, 0, count*elem);
- return ptr;
-}
-
-
-PJ_DEF(void) pj_pool_pool_init( pj_pool_pool_t *pool_pool,
- pj_size_t max_capacity)
-{
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(pool_pool)
- PJ_UNUSED_ARG(max_capacity)
-}
-
-PJ_DEF(void) pj_pool_pool_destroy( pj_pool_pool_t *pool_pool )
-{
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(pool_pool)
-}
-
-PJ_DEF(pj_pool_t*) pj_pool_pool_create_pool( pj_pool_pool_t *pool_pool,
- const char *name,
- pj_size_t initial_size,
- pj_size_t increment_size,
- pj_pool_callback *callback)
-{
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(pool_pool)
- return pj_pool_create(name, initial_size, increment_size, callback);
-}
-
-PJ_DEF(void) pj_pool_pool_release_pool( pj_pool_pool_t *pool_pool,
- pj_pool_t *pool )
-{
- PJ_CHECK_STACK();
- PJ_UNUSED_ARG(pool_pool)
- pj_pool_destroy(pool);
-}
-
-
-#endif /* PJ_POOL_DEBUG */
diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c
index bf0e6273..fea9c184 100644
--- a/pjlib/src/pjlib-test/ioq_perf.c
+++ b/pjlib/src/pjlib-test/ioq_perf.c
@@ -398,8 +398,6 @@ static int perform_test(int sock_type, const char *type_name,
for (i=0; i<sockpair_cnt; ++i) {
pj_ioqueue_unregister(items[i].server_key);
pj_ioqueue_unregister(items[i].client_key);
- pj_sock_close(items[i].server_fd);
- pj_sock_close(items[i].client_fd);
}
/* Destroy threads */
diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c
index b81764f2..89e2a1e8 100644
--- a/pjlib/src/pjlib-test/ioq_udp.c
+++ b/pjlib/src/pjlib-test/ioq_udp.c
@@ -457,7 +457,6 @@ static int unregister_test(void)
/* Now unregister and close socket. */
pj_ioqueue_unregister(key);
- pj_sock_close(rsock);
/* Poll ioqueue. */
timeout.sec = 1; timeout.msec = 0;
@@ -539,10 +538,6 @@ static int many_handles_test(void)
if (rc != PJ_SUCCESS) {
app_perror("...error in pj_ioqueue_unregister", rc);
}
- rc = pj_sock_close(sock[i]);
- if (rc != PJ_SUCCESS) {
- app_perror("...error in pj_sock_close", rc);
- }
}
rc = pj_ioqueue_destroy(ioqueue);
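
The two test fixes above are not just cleanup: with this patch pj_ioqueue_unregister() closes the underlying socket itself (pj_sock_close() in the select backend, CloseHandle() in the IOCP backend), so callers must drop the pj_sock_close() they used to issue afterwards, or they risk closing a descriptor that has already been reused. A small before/after sketch of the caller side; the wrapper name is illustrative:

#include <pj/ioqueue.h>
#include <pj/log.h>

/* After this change, do NOT follow pj_ioqueue_unregister() with
 * pj_sock_close(): the unregistration already closed the socket.
 */
static void app_close_transport(pj_ioqueue_key_t *key)
{
    pj_status_t status;

    status = pj_ioqueue_unregister(key);   /* also closes the socket/handle */
    if (status != PJ_SUCCESS)
        PJ_LOG(2,("app", "pj_ioqueue_unregister() failed, status=%d", status));

    /* Previously the code here also did:
     *     pj_sock_close(sock);
     * which would now be a double close and may hit an unrelated, reused fd.
     */
}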
diff --git a/pjlib/src/pjlib-test/ioq_unreg.c b/pjlib/src/pjlib-test/ioq_unreg.c
new file mode 100644
index 00000000..475d62fe
--- /dev/null
+++ b/pjlib/src/pjlib-test/ioq_unreg.c
@@ -0,0 +1,361 @@
+/* $Id$ */
+/*
+ * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#include "test.h"
+
+#if INCLUDE_IOQUEUE_UNREG_TEST
+/*
+ * This tests the thread safety of ioqueue unregistration operation.
+ */
+
+#include <pj/errno.h>
+#include <pj/ioqueue.h>
+#include <pj/log.h>
+#include <pj/os.h>
+#include <pj/pool.h>
+#include <pj/sock.h>
+#include <pj/compat/socket.h>
+#include <pj/string.h>
+
+
+#define THIS_FILE "ioq_unreg.c"
+
+
+enum test_method
+{
+ UNREGISTER_IN_APP,
+ UNREGISTER_IN_CALLBACK,
+};
+
+static int thread_quitting;
+static enum test_method test_method;
+static pj_time_val time_to_unregister;
+
+struct sock_data
+{
+ pj_sock_t sock;
+ pj_sock_t csock;
+ pj_pool_t *pool;
+ pj_ioqueue_key_t *key;
+ pj_mutex_t *mutex;
+ pj_ioqueue_op_key_t *op_key;
+ char *buffer;
+ pj_size_t bufsize;
+ pj_bool_t unregistered;
+ unsigned received;
+} sock_data;
+
+static void on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+{
+ pj_ssize_t size;
+ char *sendbuf = "Hello world";
+ pj_status_t status;
+
+ if (sock_data.unregistered)
+ return;
+
+ pj_mutex_lock(sock_data.mutex);
+
+ if (sock_data.unregistered) {
+ /* No need to unlock. Mutex may have been destroyed */
+ return;
+ }
+
+ if (bytes_read < 0) {
+ if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK))
+ app_perror("ioqueue reported recv error", -bytes_read);
+ } else {
+ sock_data.received += bytes_read;
+ }
+
+ if (test_method == UNREGISTER_IN_CALLBACK) {
+ pj_time_val now;
+
+ pj_gettimeofday(&now);
+ if (PJ_TIME_VAL_GTE(now, time_to_unregister)) {
+ sock_data.unregistered = 1;
+ pj_ioqueue_unregister(key);
+ pj_mutex_destroy(sock_data.mutex);
+ pj_pool_release(sock_data.pool);
+ sock_data.pool = NULL;
+ return;
+ }
+ }
+
+ do {
+ size = sock_data.bufsize;
+ status = pj_ioqueue_recv(key, op_key, sock_data.buffer, &size, 0);
+ if (status != PJ_EPENDING && status != PJ_SUCCESS)
+ app_perror("recv() error", status);
+
+ } while (status == PJ_SUCCESS);
+
+ pj_mutex_unlock(sock_data.mutex);
+
+ size = pj_ansi_strlen(sendbuf);
+ status = pj_sock_send(sock_data.csock, sendbuf, &size, 0);
+ if (status != PJ_SUCCESS)
+ app_perror("send() error", status);
+
+ size = pj_ansi_strlen(sendbuf);
+ status = pj_sock_send(sock_data.csock, sendbuf, &size, 0);
+ if (status != PJ_SUCCESS)
+ app_perror("send() error", status);
+
+}
+
+static int worker_thread(void *arg)
+{
+ pj_ioqueue_t *ioqueue = arg;
+
+ while (!thread_quitting) {
+ pj_time_val timeout = { 0, 20 };
+ pj_ioqueue_poll(ioqueue, &timeout);
+ }
+
+ return 0;
+}
+
+/*
+ * Perform unregistration test.
+ *
+ * This will create ioqueue and register a server socket. Depending
+ * on the test method, either the callback or the main thread will
+ * unregister and destroy the server socket after some period of time.
+ */
+static int perform_unreg_test(pj_ioqueue_t *ioqueue,
+ pj_pool_t *test_pool,
+ const char *title,
+ pj_bool_t other_socket)
+{
+ enum { WORKER_CNT = 1, MSEC = 500, QUIT_MSEC = 500 };
+ int i;
+ pj_thread_t *thread[WORKER_CNT];
+ struct sock_data osd;
+ pj_ioqueue_callback callback;
+ pj_time_val end_time;
+ pj_status_t status;
+
+
+ /* Sometimes it's important to have other sockets registered to
+ * the ioqueue, because when no sockets are registered, the ioqueue
+ * will return from the poll early.
+ */
+ if (other_socket) {
+ status = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, 56127, &osd.sock);
+ if (status != PJ_SUCCESS) {
+ app_perror("Error creating other socket", status);
+ return -12;
+ }
+
+ pj_memset(&callback, 0, sizeof(callback));
+ status = pj_ioqueue_register_sock(test_pool, ioqueue, osd.sock,
+ NULL, &callback, &osd.key);
+ if (status != PJ_SUCCESS) {
+ app_perror("Error registering other socket", status);
+ return -13;
+ }
+
+ } else {
+ osd.key = NULL;
+ osd.sock = PJ_INVALID_SOCKET;
+ }
+
+ /* Init both time duration of testing */
+ thread_quitting = 0;
+ pj_gettimeofday(&time_to_unregister);
+ time_to_unregister.msec += MSEC;
+ pj_time_val_normalize(&time_to_unregister);
+
+ end_time = time_to_unregister;
+ end_time.msec += QUIT_MSEC;
+ pj_time_val_normalize(&end_time);
+
+
+ /* Create polling thread */
+ for (i=0; i<WORKER_CNT; ++i) {
+ status = pj_thread_create(test_pool, "unregtest", &worker_thread,
+ ioqueue, 0, 0, &thread[i]);
+ if (status != PJ_SUCCESS) {
+ app_perror("Error creating thread", status);
+ return -20;
+ }
+ }
+
+ /* Create pair of client/server sockets */
+ status = app_socketpair(PJ_AF_INET, PJ_SOCK_DGRAM, 0,
+ &sock_data.sock, &sock_data.csock);
+ if (status != PJ_SUCCESS) {
+ app_perror("app_socketpair error", status);
+ return -30;
+ }
+
+
+ /* Initialize test data */
+ sock_data.pool = pj_pool_create(mem, "sd", 1000, 1000, NULL);
+ sock_data.buffer = pj_pool_alloc(sock_data.pool, 128);
+ sock_data.bufsize = 128;
+ sock_data.op_key = pj_pool_alloc(sock_data.pool,
+ sizeof(*sock_data.op_key));
+ sock_data.received = 0;
+ sock_data.unregistered = 0;
+
+ pj_ioqueue_op_key_init(sock_data.op_key, sizeof(*sock_data.op_key));
+
+ status = pj_mutex_create_simple(sock_data.pool, "sd", &sock_data.mutex);
+ if (status != PJ_SUCCESS) {
+ app_perror("create_mutex() error", status);
+ return -35;
+ }
+
+ /* Register socket to ioqueue */
+ pj_memset(&callback, 0, sizeof(callback));
+ callback.on_read_complete = &on_read_complete;
+ status = pj_ioqueue_register_sock(sock_data.pool, ioqueue, sock_data.sock,
+ NULL, &callback, &sock_data.key);
+ if (status != PJ_SUCCESS) {
+ app_perror("pj_ioqueue_register error", status);
+ return -40;
+ }
+
+ /* Bootstrap the first send/receive */
+ on_read_complete(sock_data.key, sock_data.op_key, 0);
+
+ /* Loop until test time ends */
+ for (;;) {
+ pj_time_val now, timeout;
+
+ pj_gettimeofday(&now);
+
+ if (test_method == UNREGISTER_IN_APP &&
+ PJ_TIME_VAL_GTE(now, time_to_unregister) &&
+ sock_data.pool)
+ {
+ pj_mutex_lock(sock_data.mutex);
+
+ sock_data.unregistered = 1;
+ pj_ioqueue_unregister(sock_data.key);
+ pj_mutex_destroy(sock_data.mutex);
+ pj_pool_release(sock_data.pool);
+ sock_data.pool = NULL;
+ }
+
+ if (PJ_TIME_VAL_GT(now, end_time) && sock_data.unregistered)
+ break;
+
+ timeout.sec = 0; timeout.msec = 10;
+ pj_ioqueue_poll(ioqueue, &timeout);
+ //pj_thread_sleep(1);
+
+ }
+
+ thread_quitting = 1;
+
+ for (i=0; i<WORKER_CNT; ++i) {
+ pj_thread_join(thread[i]);
+ pj_thread_destroy(thread[i]);
+ }
+
+ if (other_socket) {
+ pj_ioqueue_unregister(osd.key);
+ }
+
+ pj_sock_close(sock_data.csock);
+
+ PJ_LOG(3,(THIS_FILE, "....%s: done (%d KB/s)",
+ title, sock_data.received * 1000 / MSEC / 1000));
+ return 0;
+}
+
+int udp_ioqueue_unreg_test(void)
+{
+ enum { LOOP = 10 };
+ int i, rc;
+ char title[30];
+ pj_ioqueue_t *ioqueue;
+ pj_pool_t *test_pool;
+
+ test_method = UNREGISTER_IN_APP;
+
+ test_pool = pj_pool_create(mem, "unregtest", 4000, 4000, NULL);
+
+ rc = pj_ioqueue_create(test_pool, 16, &ioqueue);
+ if (rc != PJ_SUCCESS) {
+ app_perror("Error creating ioqueue", rc);
+ return -10;
+ }
+
+
+ PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 0/3 (%s)",
+ pj_ioqueue_name()));
+ for (i=0; i<LOOP; ++i) {
+ pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP);
+ rc = perform_unreg_test(ioqueue, test_pool, title, 0);
+ if (rc != 0)
+ return rc;
+ }
+
+
+ PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 1/3 (%s)",
+ pj_ioqueue_name()));
+ for (i=0; i<LOOP; ++i) {
+ pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP);
+ rc = perform_unreg_test(ioqueue, test_pool, title, 1);
+ if (rc != 0)
+ return rc;
+ }
+
+ test_method = UNREGISTER_IN_CALLBACK;
+
+ PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 2/3 (%s)",
+ pj_ioqueue_name()));
+ for (i=0; i<LOOP; ++i) {
+ pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP);
+ rc = perform_unreg_test(ioqueue, test_pool, title, 0);
+ if (rc != 0)
+ return rc;
+ }
+
+
+ PJ_LOG(3, (THIS_FILE, "...ioqueue unregister stress test 3/3 (%s)",
+ pj_ioqueue_name()));
+ for (i=0; i<LOOP; ++i) {
+ pj_ansi_sprintf(title, "repeat %d/%d", i, LOOP);
+ rc = perform_unreg_test(ioqueue, test_pool, title, 1);
+ if (rc != 0)
+ return rc;
+ }
+
+ pj_ioqueue_destroy(ioqueue);
+ pj_pool_release(test_pool);
+
+ return 0;
+}
+
+
+
+#else
+/* To prevent warning about "translation unit is empty"
+ * when this test is disabled.
+ */
+int dummy_uiq_unreg;
+#endif /* INCLUDE_IOQUEUE_UNREG_TEST */
+
+
diff --git a/pjlib/src/pjlib-test/test.c b/pjlib/src/pjlib-test/test.c
index 1d504ae5..97ba2991 100644
--- a/pjlib/src/pjlib-test/test.c
+++ b/pjlib/src/pjlib-test/test.c
@@ -145,6 +145,10 @@ int test_inner(void)
DO_TEST( ioqueue_perf_test() );
#endif
+#if INCLUDE_IOQUEUE_UNREG_TEST
+ DO_TEST( udp_ioqueue_unreg_test() );
+#endif
+
#if INCLUDE_FILE_TEST
DO_TEST( file_test() );
#endif
diff --git a/pjlib/src/pjlib-test/test.h b/pjlib/src/pjlib-test/test.h
index 8571d9dd..7a0aeed2 100644
--- a/pjlib/src/pjlib-test/test.h
+++ b/pjlib/src/pjlib-test/test.h
@@ -48,6 +48,7 @@
#define INCLUDE_UDP_IOQUEUE_TEST GROUP_NETWORK
#define INCLUDE_TCP_IOQUEUE_TEST GROUP_NETWORK
#define INCLUDE_IOQUEUE_PERF_TEST GROUP_NETWORK
+#define INCLUDE_IOQUEUE_UNREG_TEST GROUP_NETWORK
#define INCLUDE_FILE_TEST GROUP_FILE
#define INCLUDE_ECHO_SERVER 0
@@ -82,6 +83,7 @@ extern int sock_test(void);
extern int sock_perf_test(void);
extern int select_test(void);
extern int udp_ioqueue_test(void);
+extern int udp_ioqueue_unreg_test(void);
extern int tcp_ioqueue_test(void);
extern int ioqueue_perf_test(void);
extern int file_test(void);