author    Nanang Izzuddin <nanang@teluu.com>  2015-11-06 04:18:46 +0000
committer Nanang Izzuddin <nanang@teluu.com>  2015-11-06 04:18:46 +0000
commit    fa8856aca65908663819c607297271a6bf0500ea (patch)
tree      d13ac79b7d56e52c1420486431e1392f8cd2e293
parent    25f41e95c2de6a280ba4159c73aa97d364c04db6 (diff)
Close #1894: Improve ioqueue performance in a multithreaded environment.
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@5194 74dad513-b988-da41-8d7b-12977e46ad98
-rw-r--r--  pjlib/include/pj/ioqueue.h         | 33
-rw-r--r--  pjlib/src/pj/ioqueue_common_abs.c  | 59
-rw-r--r--  pjlib/src/pj/ioqueue_epoll.c       | 89
-rw-r--r--  pjlib/src/pj/ioqueue_select.c      | 95
4 files changed, 186 insertions, 90 deletions
diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h
index 44ff2309..a8a33f7a 100644
--- a/pjlib/include/pj/ioqueue.h
+++ b/pjlib/include/pj/ioqueue.h
@@ -303,6 +303,28 @@ typedef enum pj_ioqueue_operation_e
# define PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL (16)
#endif
+
+/**
+ * This macro specifies the maximum number of event candidates collected
+ * by each polling thread, so that each poll cycle can still reach the
+ * maximum number of processed events (i.e.
+ * PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL). An event candidate will be
+ * dispatched to the application as an event unless it is already being
+ * dispatched by another polling thread. So, to anticipate such a race
+ * condition, each poll operation should collect more event candidates
+ * than PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; the recommended value is
+ * (PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL * number of polling threads).
+ *
+ * The value is only meaningful when specified during the PJLIB build,
+ * and it is only effective in a multithreaded polling environment.
+ */
+#if !defined(PJ_IOQUEUE_MAX_CAND_EVENTS) || \
+ PJ_IOQUEUE_MAX_CAND_EVENTS < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL
+# undef PJ_IOQUEUE_MAX_CAND_EVENTS
+# define PJ_IOQUEUE_MAX_CAND_EVENTS PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL
+#endif
+
+
/**
* When this flag is specified in ioqueue's recv() or send() operations,
* the ioqueue will always mark the operation as asynchronous.
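For illustration, the new limit would normally be tuned at build time; a minimal sketch, assuming the usual config_site.h override mechanism and an application with four polling threads (both the thread count and the values below are examples, not defaults introduced by this patch):

/* config_site.h -- illustrative build-time override.
 * With 4 polling threads and 16 events processed per poll, let each
 * poll collect up to 4 x 16 = 64 candidates, so a thread can still
 * fill its quota even when sibling pollers grab some candidates first.
 */
#define PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL    16
#define PJ_IOQUEUE_MAX_CAND_EVENTS \
        (PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL * 4)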
@@ -504,6 +526,17 @@ PJ_DECL(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
PJ_DECL(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key);
/**
+ * Try to acquire the key's mutex. When the key's concurrency is disabled,
+ * application may call this function to synchronize its operation
+ * with the key's callback.
+ *
+ * @param key The key that was previously obtained from registration.
+ *
+ * @return PJ_SUCCESS on success or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_ioqueue_trylock_key(pj_ioqueue_key_t *key);
+
+/**
* Release the lock previously acquired with pj_ioqueue_lock_key().
*
* @param key The key that was previously obtained from registration.
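A hypothetical application-side usage of the new function, assuming per-key concurrency has been disabled with pj_ioqueue_set_concurrency() (close_key_safely and the 1 ms back-off are illustrative, not part of the patch):

#include <pj/ioqueue.h>
#include <pj/os.h>

/* Serialize application code against the key's callback without
 * parking the calling thread inside pj_ioqueue_lock_key().
 */
static void close_key_safely(pj_ioqueue_key_t *key)
{
    /* The callback may be running on a polling thread right now;
     * back off briefly instead of blocking on the key's mutex. */
    while (pj_ioqueue_trylock_key(key) != PJ_SUCCESS)
        pj_thread_sleep(1);

    /* ... safely modify state shared with the callback ... */

    pj_ioqueue_unlock_key(key);
}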
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index cd13b70b..97fb12ba 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -195,14 +195,20 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
* Report occurrence of an event in the key to be processed by the
* framework.
*/
-void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
+pj_bool_t ioqueue_dispatch_write_event( pj_ioqueue_t *ioqueue,
+ pj_ioqueue_key_t *h)
{
- /* Lock the key. */
- pj_ioqueue_lock_key(h);
+ pj_status_t rc;
+
+ /* Try to lock the key. */
+ rc = pj_ioqueue_trylock_key(h);
+ if (rc != PJ_SUCCESS) {
+ return PJ_FALSE;
+ }
if (IS_CLOSING(h)) {
pj_ioqueue_unlock_key(h);
- return;
+ return PJ_TRUE;
}
#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
@@ -417,19 +423,27 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
* able to process the event.
*/
pj_ioqueue_unlock_key(h);
+
+ return PJ_FALSE;
}
+
+ return PJ_TRUE;
}
-void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
+pj_bool_t ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue,
+ pj_ioqueue_key_t *h )
{
pj_status_t rc;
- /* Lock the key. */
- pj_ioqueue_lock_key(h);
+ /* Try to lock the key. */
+ rc = pj_ioqueue_trylock_key(h);
+ if (rc != PJ_SUCCESS) {
+ return PJ_FALSE;
+ }
if (IS_CLOSING(h)) {
pj_ioqueue_unlock_key(h);
- return;
+ return PJ_TRUE;
}
# if PJ_HAS_TCP
@@ -604,16 +618,25 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
* able to process the event.
*/
pj_ioqueue_unlock_key(h);
+
+ return PJ_FALSE;
}
+
+ return PJ_TRUE;
}
-void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
- pj_ioqueue_key_t *h )
+pj_bool_t ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
+ pj_ioqueue_key_t *h )
{
pj_bool_t has_lock;
+ pj_status_t rc;
- pj_ioqueue_lock_key(h);
+ /* Try to lock the key. */
+ rc = pj_ioqueue_trylock_key(h);
+ if (rc != PJ_SUCCESS) {
+ return PJ_FALSE;
+ }
if (!h->connecting) {
/* It is possible that more than one thread was woken up, thus
@@ -621,12 +644,12 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
* it has been processed by other thread.
*/
pj_ioqueue_unlock_key(h);
- return;
+ return PJ_TRUE;
}
if (IS_CLOSING(h)) {
pj_ioqueue_unlock_key(h);
- return;
+ return PJ_TRUE;
}
/* Clear operation. */
@@ -668,6 +691,8 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
if (has_lock) {
pj_ioqueue_unlock_key(h);
}
+
+ return PJ_TRUE;
}
/*
@@ -1324,6 +1349,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
return pj_lock_acquire(key->lock);
}
+PJ_DEF(pj_status_t) pj_ioqueue_trylock_key(pj_ioqueue_key_t *key)
+{
+ if (key->grp_lock)
+ return pj_grp_lock_tryacquire(key->grp_lock);
+ else
+ return pj_lock_tryacquire(key->lock);
+}
+
PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
{
if (key->grp_lock)
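The three dispatch functions above now share one return convention, condensed here as a sketch (it mirrors the pattern in this patch, not the literal code; IS_CLOSING is the ioqueue's internal macro):

/* PJ_TRUE  - the event was handled here (or the key is closing), so the
 *            caller counts it against PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL.
 * PJ_FALSE - the key is busy on another polling thread; the caller skips
 *            the candidate, and because select() and level-triggered
 *            epoll report the descriptor again, the event is simply
 *            retried in a later poll cycle.
 */
static pj_bool_t dispatch_sketch(pj_ioqueue_key_t *h)
{
    if (pj_ioqueue_trylock_key(h) != PJ_SUCCESS)
        return PJ_FALSE;        /* another poller owns the key */

    if (IS_CLOSING(h)) {
        pj_ioqueue_unlock_key(h);
        return PJ_TRUE;         /* consumed: nothing left to do */
    }

    /* ... perform the pending read/write/connect work and invoke
     * the application callback ... */

    pj_ioqueue_unlock_key(h);
    return PJ_TRUE;
}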
diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c
index adc2c310..b67a3d17 100644
--- a/pjlib/src/pj/ioqueue_epoll.c
+++ b/pjlib/src/pj/ioqueue_epoll.c
@@ -651,12 +651,13 @@ static void scan_closing_keys(pj_ioqueue_t *ioqueue)
*/
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
- int i, count, processed;
+ int i, count, event_cnt, processed_cnt;
int msec;
//struct epoll_event *events = ioqueue->events;
//struct queue *queue = ioqueue->queue;
- struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
- struct queue queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
+ enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS };
+ struct epoll_event events[MAX_EVENTS];
+ struct queue queue[MAX_EVENTS];
pj_timestamp t1, t2;
PJ_CHECK_STACK();
@@ -667,7 +668,7 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
pj_get_timestamp(&t1);
//count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec);
- count = os_epoll_wait( ioqueue->epfd, events, PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL, msec);
+ count = os_epoll_wait( ioqueue->epfd, events, MAX_EVENTS, msec);
if (count == 0) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Check the closing keys only when there's no activity and when there are
@@ -694,7 +695,7 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
/* Lock ioqueue. */
pj_lock_acquire(ioqueue->lock);
- for (processed=0, i=0; i<count; ++i) {
+ for (event_cnt=0, i=0; i<count; ++i) {
pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
events[i].epoll_data;
@@ -709,9 +710,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
- queue[processed].key = h;
- queue[processed].event_type = READABLE_EVENT;
- ++processed;
+ queue[event_cnt].key = h;
+ queue[event_cnt].event_type = READABLE_EVENT;
+ ++event_cnt;
continue;
}
@@ -723,9 +724,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
- queue[processed].key = h;
- queue[processed].event_type = WRITEABLE_EVENT;
- ++processed;
+ queue[event_cnt].key = h;
+ queue[event_cnt].event_type = WRITEABLE_EVENT;
+ ++event_cnt;
continue;
}
@@ -738,9 +739,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
- queue[processed].key = h;
- queue[processed].event_type = WRITEABLE_EVENT;
- ++processed;
+ queue[event_cnt].key = h;
+ queue[event_cnt].event_type = WRITEABLE_EVENT;
+ ++event_cnt;
continue;
}
#endif /* PJ_HAS_TCP */
@@ -758,21 +759,21 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
- queue[processed].key = h;
- queue[processed].event_type = EXCEPTION_EVENT;
- ++processed;
+ queue[event_cnt].key = h;
+ queue[event_cnt].event_type = EXCEPTION_EVENT;
+ ++event_cnt;
} else if (key_has_pending_read(h) || key_has_pending_accept(h)) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
- queue[processed].key = h;
- queue[processed].event_type = READABLE_EVENT;
- ++processed;
+ queue[event_cnt].key = h;
+ queue[event_cnt].event_type = READABLE_EVENT;
+ ++event_cnt;
}
continue;
}
}
- for (i=0; i<processed; ++i) {
+ for (i=0; i<event_cnt; ++i) {
if (queue[i].key->grp_lock)
pj_grp_lock_add_ref_dbg(queue[i].key->grp_lock, "ioqueue", 0);
}
@@ -783,22 +784,31 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
PJ_RACE_ME(5);
+ processed_cnt = 0;
+
/* Now process the events. */
- for (i=0; i<processed; ++i) {
- switch (queue[i].event_type) {
- case READABLE_EVENT:
- ioqueue_dispatch_read_event(ioqueue, queue[i].key);
- break;
- case WRITEABLE_EVENT:
- ioqueue_dispatch_write_event(ioqueue, queue[i].key);
- break;
- case EXCEPTION_EVENT:
- ioqueue_dispatch_exception_event(ioqueue, queue[i].key);
- break;
- case NO_EVENT:
- pj_assert(!"Invalid event!");
- break;
- }
+ for (i=0; i<event_cnt; ++i) {
+
+ /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */
+ if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) {
+ switch (queue[i].event_type) {
+ case READABLE_EVENT:
+ if (ioqueue_dispatch_read_event(ioqueue, queue[i].key))
+ ++processed_cnt;
+ break;
+ case WRITEABLE_EVENT:
+ if (ioqueue_dispatch_write_event(ioqueue, queue[i].key))
+ ++processed_cnt;
+ break;
+ case EXCEPTION_EVENT:
+ if (ioqueue_dispatch_exception_event(ioqueue, queue[i].key))
+ ++processed_cnt;
+ break;
+ case NO_EVENT:
+ pj_assert(!"Invalid event!");
+ break;
+ }
+ }
#if PJ_IOQUEUE_HAS_SAFE_UNREG
decrement_counter(queue[i].key);
@@ -812,14 +822,17 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
/* Special case:
* When epoll returns > 0 but no descriptors are actually set!
*/
- if (count > 0 && !processed && msec > 0) {
+ if (count > 0 && !event_cnt && msec > 0) {
pj_thread_sleep(msec);
}
+ TRACE_((THIS_FILE, " poll: count=%d events=%d processed=%d",
+ count, event_cnt, processed_cnt));
+
pj_get_timestamp(&t1);
TRACE_((THIS_FILE, "ioqueue_poll() returns %d, time=%d usec",
processed, pj_elapsed_usec(&t2, &t1)));
- return processed;
+ return processed_cnt;
}
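These changes only pay off when several threads poll the same ioqueue concurrently; a minimal sketch of that setup (worker_thread and quit_flag are illustrative names, not from the patch):

#include <pj/ioqueue.h>
#include <pj/os.h>

static volatile pj_bool_t quit_flag;    /* set by the app on shutdown */

/* Each poller collects up to PJ_IOQUEUE_MAX_CAND_EVENTS candidates but
 * dispatches at most PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL of them;
 * keys busy on a sibling thread are skipped rather than blocked on.
 */
static int worker_thread(void *arg)
{
    pj_ioqueue_t *ioqueue = (pj_ioqueue_t*)arg;

    while (!quit_flag) {
        pj_time_val timeout = { 0, 10 };    /* 10 ms per poll */
        pj_ioqueue_poll(ioqueue, &timeout);
    }
    return 0;
}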
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index b991f98b..a0c964b6 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -334,6 +334,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
cb && p_key, PJ_EINVAL);
+ /* On platforms where fd_set contains an fd bitmap, such as the *nix
+ * family, avoid potential memory corruption caused by select() when
+ * given an fd equal to or higher than FD_SETSIZE.
+ */
+ if (sizeof(fd_set) < FD_SETSIZE && sock >= PJ_IOQUEUE_MAX_HANDLES)
+ return PJ_ETOOBIG;
+
pj_lock_acquire(ioqueue->lock);
if (ioqueue->count >= ioqueue->max) {
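The sizeof(fd_set) < FD_SETSIZE test distinguishes bitmap-style fd_set layouts (*nix) from array-style ones (Winsock). The patch compares the socket against PJ_IOQUEUE_MAX_HANDLES; the standalone sketch below uses the plain POSIX FD_SETSIZE for clarity (fd_is_selectable is an illustrative helper, not PJLIB API):

#include <sys/select.h>   /* POSIX; Winsock declares fd_set in winsock2.h */

/* On *nix, fd_set is a fixed bitmap of FD_SETSIZE bits, so FD_SET()
 * with fd >= FD_SETSIZE writes past the end of the structure. On
 * Winsock, fd_set is a count plus an array of sockets, which makes
 * sizeof(fd_set) >= FD_SETSIZE, and any socket value is acceptable.
 */
static int fd_is_selectable(int fd)
{
    return sizeof(fd_set) >= FD_SETSIZE || fd < FD_SETSIZE;
}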
@@ -831,13 +838,14 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
pj_fd_set_t rfdset, wfdset, xfdset;
int nfds;
- int count, i, counter;
+ int i, count, event_cnt, processed_cnt;
pj_ioqueue_key_t *h;
+ enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS };
struct event
{
pj_ioqueue_key_t *key;
enum ioqueue_event_type event_type;
- } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
+ } event[MAX_EVENTS];
PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
@@ -889,8 +897,6 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
return 0;
else if (count < 0)
return -pj_get_netos_error();
- else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
- count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
/* Scan descriptor sets for event and add the events in the event
* array to be processed later in this function. We do this so that
@@ -898,13 +904,15 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
*/
pj_lock_acquire(ioqueue->lock);
- counter = 0;
+ event_cnt = 0;
/* Scan for writable sockets first to handle piggy-back data
* coming with accept().
*/
- h = ioqueue->active_list.next;
- for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {
+ for (h = ioqueue->active_list.next;
+ h != &ioqueue->active_list && event_cnt < MAX_EVENTS;
+ h = h->next)
+ {
if ( (key_has_pending_write(h) || key_has_pending_connect(h))
&& PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))
@@ -912,39 +920,39 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
- event[counter].key = h;
- event[counter].event_type = WRITEABLE_EVENT;
- ++counter;
+ event[event_cnt].key = h;
+ event[event_cnt].event_type = WRITEABLE_EVENT;
+ ++event_cnt;
}
/* Scan for readable socket. */
if ((key_has_pending_read(h) || key_has_pending_accept(h))
&& PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h) &&
- counter<count)
+ event_cnt < MAX_EVENTS)
{
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
- event[counter].key = h;
- event[counter].event_type = READABLE_EVENT;
- ++counter;
+ event[event_cnt].key = h;
+ event[event_cnt].event_type = READABLE_EVENT;
+ ++event_cnt;
}
#if PJ_HAS_TCP
if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
- !IS_CLOSING(h) && counter<count)
+ !IS_CLOSING(h) && event_cnt < MAX_EVENTS)
{
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
- event[counter].key = h;
- event[counter].event_type = EXCEPTION_EVENT;
- ++counter;
+ event[event_cnt].key = h;
+ event[event_cnt].event_type = EXCEPTION_EVENT;
+ ++event_cnt;
}
#endif
}
- for (i=0; i<counter; ++i) {
+ for (i=0; i<event_cnt; ++i) {
if (event[i].key->grp_lock)
pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0);
}
@@ -955,37 +963,46 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
PJ_RACE_ME(5);
- count = counter;
+ processed_cnt = 0;
/* Now process all events. The dispatch functions will take care
* of locking in each of the key
*/
- for (counter=0; counter<count; ++counter) {
- switch (event[counter].event_type) {
- case READABLE_EVENT:
- ioqueue_dispatch_read_event(ioqueue, event[counter].key);
- break;
- case WRITEABLE_EVENT:
- ioqueue_dispatch_write_event(ioqueue, event[counter].key);
- break;
- case EXCEPTION_EVENT:
- ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
- break;
- case NO_EVENT:
- pj_assert(!"Invalid event!");
- break;
- }
+ for (i=0; i<event_cnt; ++i) {
+
+ /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */
+ if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) {
+ switch (event[i].event_type) {
+ case READABLE_EVENT:
+ if (ioqueue_dispatch_read_event(ioqueue, event[i].key))
+ ++processed_cnt;
+ break;
+ case WRITEABLE_EVENT:
+ if (ioqueue_dispatch_write_event(ioqueue, event[i].key))
+ ++processed_cnt;
+ break;
+ case EXCEPTION_EVENT:
+ if (ioqueue_dispatch_exception_event(ioqueue, event[i].key))
+ ++processed_cnt;
+ break;
+ case NO_EVENT:
+ pj_assert(!"Invalid event!");
+ break;
+ }
+ }
#if PJ_IOQUEUE_HAS_SAFE_UNREG
- decrement_counter(event[counter].key);
+ decrement_counter(event[i].key);
#endif
- if (event[counter].key->grp_lock)
- pj_grp_lock_dec_ref_dbg(event[counter].key->grp_lock,
+ if (event[i].key->grp_lock)
+ pj_grp_lock_dec_ref_dbg(event[i].key->grp_lock,
"ioqueue", 0);
}
+ TRACE__((THIS_FILE, " poll: count=%d events=%d processed=%d",
+ count, event_cnt, processed_cnt));
- return count;
+ return processed_cnt;
}