summaryrefslogtreecommitdiff
path: root/pjlib
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2008-09-18 21:22:16 +0000
committerBenny Prijono <bennylp@teluu.com>2008-09-18 21:22:16 +0000
commit1ae2ddfa0c18f4f22cfd7d06d8e7e6ab2622b8c7 (patch)
tree94d607e80e0e6bdd3a6ac4185e2b34f64088fae2 /pjlib
parente7130aeee4d10c1da69568d32a78784f412adc3f (diff)
Ticket #622: initial integration of ioqueue_epoll patch, updated the configure script
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@2295 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib')
-rw-r--r--pjlib/src/pj/ioqueue_epoll.c208
1 file changed, 204 insertions, 4 deletions
diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c
index e91caba0..90dc3b53 100644
--- a/pjlib/src/pj/ioqueue_epoll.c
+++ b/pjlib/src/pj/ioqueue_epoll.c
@@ -175,6 +175,13 @@ struct pj_ioqueue_t
int epfd;
struct epoll_event *events;
struct queue *queue;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ pj_mutex_t *ref_cnt_mutex;
+ pj_ioqueue_key_t active_list;
+ pj_ioqueue_key_t closing_list;
+ pj_ioqueue_key_t free_list;
+#endif
};
/* Include implementation for common abstraction after we declare
@@ -182,6 +189,11 @@ struct pj_ioqueue_t
*/
#include "ioqueue_common_abs.c"
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Scan closing keys to be put back onto the free list */
+static void scan_closing_keys(pj_ioqueue_t *ioqueue);
+#endif
+
/*
* pj_ioqueue_name()
*/
@@ -206,6 +218,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
pj_ioqueue_t *ioqueue;
pj_status_t rc;
pj_lock_t *lock;
+ int i;
/* Check that arguments are valid. */
PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL &&
@@ -223,6 +236,46 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
ioqueue->count = 0;
pj_list_init(&ioqueue->hlist);
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* When safe unregistration is used (the default), we pre-create
+ * all keys and put them in the free list.
+ */
+
+ /* Mutex to protect key's reference counter
+ * We don't want to use key's mutex or ioqueue's mutex because
+ * that would create a deadlock situation in some cases.
+ */
+ rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
+ if (rc != PJ_SUCCESS)
+ return rc;
+
+
+ /* Init key list */
+ pj_list_init(&ioqueue->free_list);
+ pj_list_init(&ioqueue->closing_list);
+
+
+ /* Pre-create all keys according to max_fd */
+ for ( i=0; i<max_fd; ++i) {
+ pj_ioqueue_key_t *key;
+
+ key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
+ key->ref_count = 0;
+ rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
+ if (rc != PJ_SUCCESS) {
+ key = ioqueue->free_list.next;
+ while (key != &ioqueue->free_list) {
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+ pj_mutex_destroy(ioqueue->ref_cnt_mutex);
+ return rc;
+ }
+
+ pj_list_push_back(&ioqueue->free_list, key);
+ }
+#endif
+
rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
if (rc != PJ_SUCCESS)
return rc;
@@ -256,12 +309,37 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
*/
PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
+ pj_ioqueue_key_t *key;
+
PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
PJ_ASSERT_RETURN(ioqueue->epfd > 0, PJ_EINVALIDOP);
pj_lock_acquire(ioqueue->lock);
os_close(ioqueue->epfd);
ioqueue->epfd = 0;
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Destroy reference counters */
+ key = ioqueue->active_list.next;
+ while (key != &ioqueue->active_list) {
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+
+ key = ioqueue->closing_list.next;
+ while (key != &ioqueue->closing_list) {
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+
+ key = ioqueue->free_list.next;
+ while (key != &ioqueue->free_list) {
+ pj_mutex_destroy(key->mutex);
+ key = key->next;
+ }
+
+ pj_mutex_destroy(ioqueue->ref_cnt_mutex);
+#endif
return ioqueue_destroy(ioqueue);
}
@@ -303,8 +381,27 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
goto on_return;
}
+ /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
+ * the key from the free list. Otherwise allocate a new one.
+ */
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+
+ /* Scan closing_keys first to let them come back to free_list */
+ scan_closing_keys(ioqueue);
+
+ pj_assert(!pj_list_empty(&ioqueue->free_list));
+ if (pj_list_empty(&ioqueue->free_list)) {
+ rc = PJ_ETOOMANY;
+ goto on_return;
+ }
+
+ key = ioqueue->free_list.next;
+ pj_list_erase(key);
+#else
/* Create key. */
key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
+#endif
+
rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
if (rc != PJ_SUCCESS) {
key = NULL;
@@ -312,12 +409,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
}
/* Create key's mutex */
- rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
+ /* rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
if (rc != PJ_SUCCESS) {
key = NULL;
goto on_return;
}
-
+*/
/* os_epoll_ctl. */
ev.events = EPOLLIN | EPOLLERR;
ev.epoll_data = (epoll_data_type)key;
@@ -345,6 +442,41 @@ on_return:
return rc;
}
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Increment key's reference counter */
+static void increment_counter(pj_ioqueue_key_t *key)
+{
+ pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
+ ++key->ref_count;
+ pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
+}
+
+/* Decrement the key's reference counter, and when the counter reaches zero,
+ * destroy the key.
+ *
+ * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
+ */
+static void decrement_counter(pj_ioqueue_key_t *key)
+{
+ pj_lock_acquire(key->ioqueue->lock);
+ pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
+ --key->ref_count;
+ if (key->ref_count == 0) {
+
+ pj_assert(key->closing == 1);
+ pj_gettimeofday(&key->free_time);
+ key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
+ pj_time_val_normalize(&key->free_time);
+
+ pj_list_erase(key);
+ pj_list_push_back(&key->ioqueue->closing_list, key);
+
+ }
+ pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
+ pj_lock_release(key->ioqueue->lock);
+}
+#endif
+
/*
* pj_ioqueue_unregister()
*
@@ -363,7 +495,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
pj_assert(ioqueue->count > 0);
--ioqueue->count;
+#if !PJ_IOQUEUE_HAS_SAFE_UNREG
pj_list_erase(key);
+#endif
ev.events = 0;
ev.epoll_data = (epoll_data_type)key;
@@ -374,11 +508,24 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
return rc;
}
- pj_lock_release(ioqueue->lock);
-
/* Destroy the key. */
pj_sock_close(key->fd);
+
+ pj_lock_release(ioqueue->lock);
+
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Mark key is closing. */
+ key->closing = 1;
+
+ /* Decrement counter. */
+ decrement_counter(key);
+
+ /* Done. */
+ pj_mutex_unlock(key->mutex);
+#else
pj_mutex_destroy(key->mutex);
+#endif
return PJ_SUCCESS;
}
@@ -420,6 +567,29 @@ static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
}
}
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+/* Scan closing keys to be put back onto the free list */
+static void scan_closing_keys(pj_ioqueue_t *ioqueue)
+{
+ pj_time_val now;
+ pj_ioqueue_key_t *h;
+
+ pj_gettimeofday(&now);
+ h = ioqueue->closing_list.next;
+ while (h != &ioqueue->closing_list) {
+ pj_ioqueue_key_t *next = h->next;
+
+ pj_assert(h->closing != 0);
+
+ if (PJ_TIME_VAL_GTE(now, h->free_time)) {
+ pj_list_erase(h);
+ pj_list_push_back(&ioqueue->free_list, h);
+ }
+ h = next;
+ }
+}
+#endif
+
/*
* pj_ioqueue_poll()
*
@@ -441,6 +611,16 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec);
if (count == 0) {
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ /* Check the closing keys only when there's no activity and when there are
+ * pending closing keys.
+ */
+ if (count == 0 && !pj_list_empty(&ioqueue->closing_list)) {
+ pj_lock_acquire(ioqueue->lock);
+ scan_closing_keys(ioqueue);
+ pj_lock_release(ioqueue->lock);
+ }
+#endif
TRACE_((THIS_FILE, "os_epoll_wait timed out"));
return count;
}
@@ -467,6 +647,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
*/
if ((events[i].events & EPOLLIN) &&
(key_has_pending_read(h) || key_has_pending_accept(h))) {
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ increment_counter(h);
+#endif
queue[processed].key = h;
queue[processed].event_type = READABLE_EVENT;
++processed;
@@ -476,6 +660,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
* Check for writeability.
*/
if ((events[i].events & EPOLLOUT) && key_has_pending_write(h)) {
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ increment_counter(h);
+#endif
queue[processed].key = h;
queue[processed].event_type = WRITEABLE_EVENT;
++processed;
@@ -486,6 +674,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
* Check for completion of connect() operation.
*/
if ((events[i].events & EPOLLOUT) && (h->connecting)) {
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ increment_counter(h);
+#endif
queue[processed].key = h;
queue[processed].event_type = WRITEABLE_EVENT;
++processed;
@@ -496,6 +688,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
* Check for error condition.
*/
if (events[i].events & EPOLLERR && (h->connecting)) {
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ increment_counter(h);
+#endif
queue[processed].key = h;
queue[processed].event_type = EXCEPTION_EVENT;
++processed;
@@ -519,6 +715,10 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
pj_assert(!"Invalid event!");
break;
}
+
+#if PJ_IOQUEUE_HAS_SAFE_UNREG
+ decrement_counter(queue[i].key);
+#endif
}
/* Special case: