diff options
Diffstat (limited to 'pjlib')
-rw-r--r-- | pjlib/src/pj/ioqueue_epoll.c | 208 |
1 files changed, 204 insertions, 4 deletions
diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c index e91caba0..90dc3b53 100644 --- a/pjlib/src/pj/ioqueue_epoll.c +++ b/pjlib/src/pj/ioqueue_epoll.c @@ -175,6 +175,13 @@ struct pj_ioqueue_t int epfd; struct epoll_event *events; struct queue *queue; + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + pj_mutex_t *ref_cnt_mutex; + pj_ioqueue_key_t active_list; + pj_ioqueue_key_t closing_list; + pj_ioqueue_key_t free_list; +#endif }; /* Include implementation for common abstraction after we declare @@ -182,6 +189,11 @@ struct pj_ioqueue_t */ #include "ioqueue_common_abs.c" +#if PJ_IOQUEUE_HAS_SAFE_UNREG +/* Scan closing keys to be put to free list again */ +static void scan_closing_keys(pj_ioqueue_t *ioqueue); +#endif + /* * pj_ioqueue_name() */ @@ -206,6 +218,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, pj_ioqueue_t *ioqueue; pj_status_t rc; pj_lock_t *lock; + int i; /* Check that arguments are valid. */ PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && @@ -223,6 +236,46 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, ioqueue->count = 0; pj_list_init(&ioqueue->hlist); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* When safe unregistration is used (the default), we pre-create + * all keys and put them in the free list. + */ + + /* Mutex to protect key's reference counter + * We don't want to use key's mutex or ioqueue's mutex because + * that would create deadlock situation in some cases. + */ + rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex); + if (rc != PJ_SUCCESS) + return rc; + + + /* Init key list */ + pj_list_init(&ioqueue->free_list); + pj_list_init(&ioqueue->closing_list); + + + /* Pre-create all keys according to max_fd */ + for ( i=0; i<max_fd; ++i) { + pj_ioqueue_key_t *key; + + key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t); + key->ref_count = 0; + rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); + if (rc != PJ_SUCCESS) { + key = ioqueue->free_list.next; + while (key != &ioqueue->free_list) { + pj_mutex_destroy(key->mutex); + key = key->next; + } + pj_mutex_destroy(ioqueue->ref_cnt_mutex); + return rc; + } + + pj_list_push_back(&ioqueue->free_list, key); + } +#endif + rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock); if (rc != PJ_SUCCESS) return rc; @@ -256,12 +309,37 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, */ PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue) { + pj_ioqueue_key_t *key; + PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL); PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP); pj_lock_acquire(ioqueue->lock); os_close(ioqueue->epfd); ioqueue->epfd = 0; + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Destroy reference counters */ + key = ioqueue->active_list.next; + while (key != &ioqueue->active_list) { + pj_mutex_destroy(key->mutex); + key = key->next; + } + + key = ioqueue->closing_list.next; + while (key != &ioqueue->closing_list) { + pj_mutex_destroy(key->mutex); + key = key->next; + } + + key = ioqueue->free_list.next; + while (key != &ioqueue->free_list) { + pj_mutex_destroy(key->mutex); + key = key->next; + } + + pj_mutex_destroy(ioqueue->ref_cnt_mutex); +#endif return ioqueue_destroy(ioqueue); } @@ -303,8 +381,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, goto on_return; } + /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get + * the key from the free list. Otherwise allocate a new one. + */ +#if PJ_IOQUEUE_HAS_SAFE_UNREG + + /* Scan closing_keys first to let them come back to free_list */ + scan_closing_keys(ioqueue); + + pj_assert(!pj_list_empty(&ioqueue->free_list)); + if (pj_list_empty(&ioqueue->free_list)) { + rc = PJ_ETOOMANY; + goto on_return; + } + + key = ioqueue->free_list.next; + pj_list_erase(key); +#else /* Create key. */ key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t)); +#endif + rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb); if (rc != PJ_SUCCESS) { key = NULL; @@ -312,12 +409,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool, } /* Create key's mutex */ - rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); + /* rc = pj_mutex_create_recursive(pool, NULL, &key->mutex); if (rc != PJ_SUCCESS) { key = NULL; goto on_return; } - +*/ /* os_epoll_ctl. */ ev.events = EPOLLIN | EPOLLERR; ev.epoll_data = (epoll_data_type)key; @@ -345,6 +442,41 @@ on_return: return rc; } +#if PJ_IOQUEUE_HAS_SAFE_UNREG +/* Increment key's reference counter */ +static void increment_counter(pj_ioqueue_key_t *key) +{ + pj_mutex_lock(key->ioqueue->ref_cnt_mutex); + ++key->ref_count; + pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); +} + +/* Decrement the key's reference counter, and when the counter reach zero, + * destroy the key. + * + * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK. + */ +static void decrement_counter(pj_ioqueue_key_t *key) +{ + pj_lock_acquire(key->ioqueue->lock); + pj_mutex_lock(key->ioqueue->ref_cnt_mutex); + --key->ref_count; + if (key->ref_count == 0) { + + pj_assert(key->closing == 1); + pj_gettimeofday(&key->free_time); + key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY; + pj_time_val_normalize(&key->free_time); + + pj_list_erase(key); + pj_list_push_back(&key->ioqueue->closing_list, key); + + } + pj_mutex_unlock(key->ioqueue->ref_cnt_mutex); + pj_lock_release(key->ioqueue->lock); +} +#endif + /* * pj_ioqueue_unregister() * @@ -363,7 +495,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) pj_assert(ioqueue->count > 0); --ioqueue->count; +#if !PJ_IOQUEUE_HAS_SAFE_UNREG pj_list_erase(key); +#endif ev.events = 0; ev.epoll_data = (epoll_data_type)key; @@ -374,11 +508,24 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) return rc; } - pj_lock_release(ioqueue->lock); - /* Destroy the key. */ pj_sock_close(key->fd); + + pj_lock_release(ioqueue->lock); + + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Mark key is closing. */ + key->closing = 1; + + /* Decrement counter. */ + decrement_counter(key); + + /* Done. */ + pj_mutex_unlock(key->mutex); +#else pj_mutex_destroy(key->mutex); +#endif return PJ_SUCCESS; } @@ -420,6 +567,29 @@ static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue, } } +#if PJ_IOQUEUE_HAS_SAFE_UNREG +/* Scan closing keys to be put to free list again */ +static void scan_closing_keys(pj_ioqueue_t *ioqueue) +{ + pj_time_val now; + pj_ioqueue_key_t *h; + + pj_gettimeofday(&now); + h = ioqueue->closing_list.next; + while (h != &ioqueue->closing_list) { + pj_ioqueue_key_t *next = h->next; + + pj_assert(h->closing != 0); + + if (PJ_TIME_VAL_GTE(now, h->free_time)) { + pj_list_erase(h); + pj_list_push_back(&ioqueue->free_list, h); + } + h = next; + } +} +#endif + /* * pj_ioqueue_poll() * @@ -441,6 +611,16 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec); if (count == 0) { +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Check the closing keys only when there's no activity and when there are + * pending closing keys. + */ + if (count == 0 && !pj_list_empty(&ioqueue->closing_list)) { + pj_lock_acquire(ioqueue->lock); + scan_closing_keys(ioqueue); + pj_lock_release(ioqueue->lock); + } +#endif TRACE_((THIS_FILE, "os_epoll_wait timed out")); return count; } @@ -467,6 +647,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) */ if ((events[i].events & EPOLLIN) && (key_has_pending_read(h) || key_has_pending_accept(h))) { + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + increment_counter(h); +#endif queue[processed].key = h; queue[processed].event_type = READABLE_EVENT; ++processed; @@ -476,6 +660,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) * Check for writeability. */ if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) { + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + increment_counter(h); +#endif queue[processed].key = h; queue[processed].event_type = WRITEABLE_EVENT; ++processed; @@ -486,6 +674,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) * Check for completion of connect() operation. */ if ((events[i].events & EPOLLOUT) && (h->connecting)) { + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + increment_counter(h); +#endif queue[processed].key = h; queue[processed].event_type = WRITEABLE_EVENT; ++processed; @@ -496,6 +688,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) * Check for error condition. */ if (events[i].events & EPOLLERR && (h->connecting)) { + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + increment_counter(h); +#endif queue[processed].key = h; queue[processed].event_type = EXCEPTION_EVENT; ++processed; @@ -519,6 +715,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) pj_assert(!"Invalid event!"); break; } + +#if PJ_IOQUEUE_HAS_SAFE_UNREG + decrement_counter(queue[i].key); +#endif } /* Special case: |