diff options
author | Nanang Izzuddin <nanang@teluu.com> | 2015-11-06 04:18:46 +0000 |
---|---|---|
committer | Nanang Izzuddin <nanang@teluu.com> | 2015-11-06 04:18:46 +0000 |
commit | fa8856aca65908663819c607297271a6bf0500ea (patch) | |
tree | d13ac79b7d56e52c1420486431e1392f8cd2e293 /pjlib/src/pj/ioqueue_epoll.c | |
parent | 25f41e95c2de6a280ba4159c73aa97d364c04db6 (diff) |
Close #1894: Improve ioqueue performance on multithreadeded environment.
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@5194 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj/ioqueue_epoll.c')
-rw-r--r-- | pjlib/src/pj/ioqueue_epoll.c | 89 |
1 files changed, 51 insertions, 38 deletions
diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c index adc2c310..b67a3d17 100644 --- a/pjlib/src/pj/ioqueue_epoll.c +++ b/pjlib/src/pj/ioqueue_epoll.c @@ -651,12 +651,13 @@ static void scan_closing_keys(pj_ioqueue_t *ioqueue) */ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { - int i, count, processed; + int i, count, event_cnt, processed_cnt; int msec; //struct epoll_event *events = ioqueue->events; //struct queue *queue = ioqueue->queue; - struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; - struct queue queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; + enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS }; + struct epoll_event events[MAX_EVENTS]; + struct queue queue[MAX_EVENTS]; pj_timestamp t1, t2; PJ_CHECK_STACK(); @@ -667,7 +668,7 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) pj_get_timestamp(&t1); //count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec); - count = os_epoll_wait( ioqueue->epfd, events, PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL, msec); + count = os_epoll_wait( ioqueue->epfd, events, MAX_EVENTS, msec); if (count == 0) { #if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check the closing keys only when there's no activity and when there are @@ -694,7 +695,7 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) /* Lock ioqueue. */ pj_lock_acquire(ioqueue->lock); - for (processed=0, i=0; i<count; ++i) { + for (event_cnt=0, i=0; i<count; ++i) { pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type) events[i].epoll_data; @@ -709,9 +710,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h); #endif - queue[processed].key = h; - queue[processed].event_type = READABLE_EVENT; - ++processed; + queue[event_cnt].key = h; + queue[event_cnt].event_type = READABLE_EVENT; + ++event_cnt; continue; } @@ -723,9 +724,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h); #endif - queue[processed].key = h; - queue[processed].event_type = WRITEABLE_EVENT; - ++processed; + queue[event_cnt].key = h; + queue[event_cnt].event_type = WRITEABLE_EVENT; + ++event_cnt; continue; } @@ -738,9 +739,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h); #endif - queue[processed].key = h; - queue[processed].event_type = WRITEABLE_EVENT; - ++processed; + queue[event_cnt].key = h; + queue[event_cnt].event_type = WRITEABLE_EVENT; + ++event_cnt; continue; } #endif /* PJ_HAS_TCP */ @@ -758,21 +759,21 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h); #endif - queue[processed].key = h; - queue[processed].event_type = EXCEPTION_EVENT; - ++processed; + queue[event_cnt].key = h; + queue[event_cnt].event_type = EXCEPTION_EVENT; + ++event_cnt; } else if (key_has_pending_read(h) || key_has_pending_accept(h)) { #if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h); #endif - queue[processed].key = h; - queue[processed].event_type = READABLE_EVENT; - ++processed; + queue[event_cnt].key = h; + queue[event_cnt].event_type = READABLE_EVENT; + ++event_cnt; } continue; } } - for (i=0; i<processed; ++i) { + for (i=0; i<event_cnt; ++i) { if (queue[i].key->grp_lock) pj_grp_lock_add_ref_dbg(queue[i].key->grp_lock, "ioqueue", 0); } @@ -783,22 +784,31 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) PJ_RACE_ME(5); + processed_cnt = 0; + /* Now process the events. */ - for (i=0; i<processed; ++i) { - switch (queue[i].event_type) { - case READABLE_EVENT: - ioqueue_dispatch_read_event(ioqueue, queue[i].key); - break; - case WRITEABLE_EVENT: - ioqueue_dispatch_write_event(ioqueue, queue[i].key); - break; - case EXCEPTION_EVENT: - ioqueue_dispatch_exception_event(ioqueue, queue[i].key); - break; - case NO_EVENT: - pj_assert(!"Invalid event!"); - break; - } + for (i=0; i<event_cnt; ++i) { + + /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */ + if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) { + switch (queue[i].event_type) { + case READABLE_EVENT: + if (ioqueue_dispatch_read_event(ioqueue, queue[i].key)) + ++processed_cnt; + break; + case WRITEABLE_EVENT: + if (ioqueue_dispatch_write_event(ioqueue, queue[i].key)) + ++processed_cnt; + break; + case EXCEPTION_EVENT: + if (ioqueue_dispatch_exception_event(ioqueue, queue[i].key)) + ++processed_cnt; + break; + case NO_EVENT: + pj_assert(!"Invalid event!"); + break; + } + } #if PJ_IOQUEUE_HAS_SAFE_UNREG decrement_counter(queue[i].key); @@ -812,14 +822,17 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) /* Special case: * When epoll returns > 0 but no descriptors are actually set! */ - if (count > 0 && !processed && msec > 0) { + if (count > 0 && !event_cnt && msec > 0) { pj_thread_sleep(msec); } + TRACE_((THIS_FILE, " poll: count=%d events=%d processed=%d", + count, event_cnt, processed_cnt)); + pj_get_timestamp(&t1); TRACE_((THIS_FILE, "ioqueue_poll() returns %d, time=%d usec", processed, pj_elapsed_usec(&t2, &t1))); - return processed; + return processed_cnt; } |