diff options
Diffstat (limited to 'pjlib/src/pj/ioqueue_select.c')
-rw-r--r-- | pjlib/src/pj/ioqueue_select.c | 81 |
1 files changed, 70 insertions, 11 deletions
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c index 603b7ee8..0d97c0a6 100644 --- a/pjlib/src/pj/ioqueue_select.c +++ b/pjlib/src/pj/ioqueue_select.c @@ -39,6 +39,7 @@ #include <pj/sock_select.h> #include <pj/sock_qos.h> #include <pj/errno.h> +#include <pj/rand.h> /* Now that we have access to OS'es <sys/select>, lets check again that * PJ_IOQUEUE_MAX_HANDLES is not greater than FD_SETSIZE @@ -237,11 +238,11 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t); key->ref_count = 0; - rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); + rc = pj_lock_create_recursive_mutex(pool, NULL, &key->lock); if (rc != PJ_SUCCESS) { key = ioqueue->free_list.next; while (key != &ioqueue->free_list) { - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); key = key->next; } pj_mutex_destroy(ioqueue->ref_cnt_mutex); @@ -284,19 +285,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) /* Destroy reference counters */ key = ioqueue->active_list.next; while (key != &ioqueue->active_list) { - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); key = key->next; } key = ioqueue->closing_list.next; while (key != &ioqueue->closing_list) { - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); key = key->next; } key = ioqueue->free_list.next; while (key != &ioqueue->free_list) { - pj_mutex_destroy(key->mutex); + pj_lock_destroy(key->lock); key = key->next; } @@ -312,9 +313,10 @@ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) * * Register socket handle to ioqueue. */ -PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, +PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, pj_ioqueue_t *ioqueue, pj_sock_t sock, + pj_grp_lock_t *grp_lock, void *user_data, const pj_ioqueue_callback *cb, pj_ioqueue_key_t **p_key) @@ -358,7 +360,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); #endif - rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); + rc = ioqueue_init_key(pool, ioqueue, key, sock, grp_lock, user_data, cb); if (rc != PJ_SUCCESS) { key = NULL; goto on_return; @@ -386,12 +388,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, on_return: /* On error, socket may be left in non-blocking mode. */ + if (rc != PJ_SUCCESS) { + if (key->grp_lock) + pj_grp_lock_dec_ref_dbg(key->grp_lock, "ioqueue", 0); + } *p_key = key; pj_lock_release(ioqueue->lock); return rc; } +PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, + pj_ioqueue_t *ioqueue, + pj_sock_t sock, + void *user_data, + const pj_ioqueue_callback *cb, + pj_ioqueue_key_t **p_key) +{ + return pj_ioqueue_register_sock2(pool, ioqueue, sock, NULL, user_data, + cb, p_key); +} + #if PJ_IOQUEUE_HAS_SAFE_UNREG /* Increment key's reference counter */ static void increment_counter(pj_ioqueue_key_t *key) @@ -446,7 +463,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) * the key. We need to lock the key before ioqueue here to prevent * deadlock. */ - pj_mutex_lock(key->mutex); + pj_ioqueue_lock_key(key); /* Also lock ioqueue */ pj_lock_acquire(ioqueue->lock); @@ -485,9 +502,34 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) decrement_counter(key); /* Done. */ - pj_mutex_unlock(key->mutex); + if (key->grp_lock) { + /* just dec_ref and unlock. we will set grp_lock to NULL + * elsewhere */ + pj_grp_lock_t *grp_lock = key->grp_lock; + // Don't set grp_lock to NULL otherwise the other thread + // will crash. Just leave it as dangling pointer, but this + // should be safe + //key->grp_lock = NULL; + pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0); + pj_grp_lock_release(grp_lock); + } else { + pj_ioqueue_unlock_key(key); + } #else - pj_mutex_destroy(key->mutex); + if (key->grp_lock) { + /* set grp_lock to NULL and unlock */ + pj_grp_lock_t *grp_lock = key->grp_lock; + // Don't set grp_lock to NULL otherwise the other thread + // will crash. Just leave it as dangling pointer, but this + // should be safe + //key->grp_lock = NULL; + pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0); + pj_grp_lock_release(grp_lock); + } else { + pj_ioqueue_unlock_key(key); + } + + pj_lock_destroy(key->lock); #endif return PJ_SUCCESS; @@ -620,6 +662,10 @@ static void scan_closing_keys(pj_ioqueue_t *ioqueue) if (PJ_TIME_VAL_GTE(now, h->free_time)) { pj_list_erase(h); + // Don't set grp_lock to NULL otherwise the other thread + // will crash. Just leave it as dangling pointer, but this + // should be safe + //h->grp_lock = NULL; pj_list_push_back(&ioqueue->free_list, h); } h = next; @@ -781,7 +827,7 @@ on_error: PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { pj_fd_set_t rfdset, wfdset, xfdset; - int count, counter; + int count, i, counter; pj_ioqueue_key_t *h; struct event { @@ -892,8 +938,17 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #endif } + for (i=0; i<counter; ++i) { + if (event[i].key->grp_lock) + pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0); + } + + PJ_RACE_ME(5); + pj_lock_release(ioqueue->lock); + PJ_RACE_ME(5); + count = counter; /* Now process all events. The dispatch functions will take care @@ -918,6 +973,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #if PJ_IOQUEUE_HAS_SAFE_UNREG decrement_counter(event[counter].key); #endif + + if (event[counter].key->grp_lock) + pj_grp_lock_dec_ref_dbg(event[counter].key->grp_lock, + "ioqueue", 0); } |