summaryrefslogtreecommitdiff
path: root/pjlib/src/pj/ioqueue_select.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjlib/src/pj/ioqueue_select.c')
-rw-r--r--pjlib/src/pj/ioqueue_select.c83
1 files changed, 71 insertions, 12 deletions
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index 74b87d8..1b08d28 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -1,4 +1,4 @@
-/* $Id: ioqueue_select.c 3553 2011-05-05 06:14:19Z nanang $ */
+/* $Id: ioqueue_select.c 4359 2013-02-21 11:18:36Z bennylp $ */
/*
* Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
* Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
@@ -39,6 +39,7 @@
#include <pj/sock_select.h>
#include <pj/sock_qos.h>
#include <pj/errno.h>
+#include <pj/rand.h>
/* Now that we have access to OS'es <sys/select>, lets check again that
* PJ_IOQUEUE_MAX_HANDLES is not greater than FD_SETSIZE
@@ -237,11 +238,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
key->ref_count = 0;
- rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
+ rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock);
if (rc != PJ_SUCCESS) {
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
key = key->next;
}
pj_mutex_destroy(ioqueue->ref_cnt_mutex);
@@ -284,19 +285,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
/* Destroy reference counters */
key = ioqueue->active_list.next;
while (key != &ioqueue->active_list) {
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
key = key->next;
}
key = ioqueue->closing_list.next;
while (key != &ioqueue->closing_list) {
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
key = key->next;
}
key = ioqueue->free_list.next;
while (key != &ioqueue->free_list) {
- pj_mutex_destroy(key->mutex);
+ pj_lock_destroy(key->lock);
key = key->next;
}
@@ -312,9 +313,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
*
* Register socket handle to ioqueue.
*/
-PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
+PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
pj_ioqueue_t *ioqueue,
pj_sock_t sock,
+ pj_grp_lock_t *grp_lock,
void *user_data,
const pj_ioqueue_callback *cb,
pj_ioqueue_key_t **p_key)
@@ -358,7 +360,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
#endif
- rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
+ rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb);
if (rc != PJ_SUCCESS) {
key = NULL;
goto on_return;
@@ -386,12 +388,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
on_return:
/* On error, socket may be left in non-blocking mode. */
+ if (rc != PJ_SUCCESS) {
+ if (key->grp_lock)
+ pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0);
+ }
*p_key = key;
pj_lock_release(ioqueue->lock);
return rc;
}
+PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
+ pj_ioqueue_t *ioqueue,
+ pj_sock_t sock,
+ void *user_data,
+ const pj_ioqueue_callback *cb,
+ pj_ioqueue_key_t **p_key)
+{
+ return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data,
+ cb, p_key);
+}
+
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Increment key's reference counter */
static void increment_counter(pj_ioqueue_key_t *key)
@@ -446,7 +463,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
* the key. We need to lock the key before ioqueue here to prevent
* deadlock.
*/
- pj_mutex_lock(key->mutex);
+ pj_ioqueue_lock_key(key);
/* Also lock ioqueue */
pj_lock_acquire(ioqueue->lock);
@@ -485,9 +502,34 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
decrement_counter(key);
/* Done. */
- pj_mutex_unlock(key->mutex);
+ if (key->grp_lock) {
+ /* just dec_ref and unlock. we will set grp_lock to NULL
+ * elsewhere */
+ pj_grp_lock_t *grp_lock = key->grp_lock;
+ // Don't set grp_lock to NULL otherwise the other thread
+ // will crash. Just leave it as dangling pointer, but this
+ // should be safe
+ //key->grp_lock = NULL;
+ pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
+ pj_grp_lock_release(grp_lock);
+ } else {
+ pj_ioqueue_unlock_key(key);
+ }
#else
- pj_mutex_destroy(key->mutex);
+ if (key->grp_lock) {
+ /* set grp_lock to NULL and unlock */
+ pj_grp_lock_t *grp_lock = key->grp_lock;
+ // Don't set grp_lock to NULL otherwise the other thread
+ // will crash. Just leave it as dangling pointer, but this
+ // should be safe
+ //key->grp_lock = NULL;
+ pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0);
+ pj_grp_lock_release(grp_lock);
+ } else {
+ pj_ioqueue_unlock_key(key);
+ }
+
+ pj_lock_destroy(key->lock);
#endif
return PJ_SUCCESS;
@@ -620,6 +662,10 @@ static void scan_closing_keys(pj_ioqueue_t *ioqueue)
if (PJ_TIME_VAL_GTE(now, h->free_time)) {
pj_list_erase(h);
+ // Don't set grp_lock to NULL otherwise the other thread
+ // will crash. Just leave it as dangling pointer, but this
+ // should be safe
+ //h->grp_lock = NULL;
pj_list_push_back(&ioqueue->free_list, h);
}
h = next;
@@ -781,7 +827,7 @@ on_error:
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
pj_fd_set_t rfdset, wfdset, xfdset;
- int count, counter;
+ int count, i, counter;
pj_ioqueue_key_t *h;
struct event
{
@@ -892,8 +938,17 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#endif
}
+ for (i=0; i<counter; ++i) {
+ if (event[i].key->grp_lock)
+ pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0);
+ }
+
+ PJ_RACE_ME(5);
+
pj_lock_release(ioqueue->lock);
+ PJ_RACE_ME(5);
+
count = counter;
/* Now process all events. The dispatch functions will take care
@@ -918,6 +973,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#if PJ_IOQUEUE_HAS_SAFE_UNREG
decrement_counter(event[counter].key);
#endif
+
+ if (event[counter].key->grp_lock)
+ pj_grp_lock_dec_ref_dbg(event[counter].key->grp_lock,
+ "ioqueue", 0);
}