diff options
author | Nanang Izzuddin <nanang@teluu.com> | 2015-11-06 04:18:46 +0000 |
---|---|---|
committer | Nanang Izzuddin <nanang@teluu.com> | 2015-11-06 04:18:46 +0000 |
commit | fa8856aca65908663819c607297271a6bf0500ea (patch) | |
tree | d13ac79b7d56e52c1420486431e1392f8cd2e293 /pjlib/src | |
parent | 25f41e95c2de6a280ba4159c73aa97d364c04db6 (diff) |
Close #1894: Improve ioqueue performance on multithreadeded environment.
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@5194 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src')
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.c | 59 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_epoll.c | 89 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_select.c | 95 |
3 files changed, 153 insertions, 90 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index cd13b70b..97fb12ba 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -195,14 +195,20 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key) * Report occurence of an event in the key to be processed by the * framework. */ -void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) +pj_bool_t ioqueue_dispatch_write_event( pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *h) { - /* Lock the key. */ - pj_ioqueue_lock_key(h); + pj_status_t rc; + + /* Try lock the key. */ + rc = pj_ioqueue_trylock_key(h); + if (rc != PJ_SUCCESS) { + return PJ_FALSE; + } if (IS_CLOSING(h)) { pj_ioqueue_unlock_key(h); - return; + return PJ_TRUE; } #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 @@ -417,19 +423,27 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) * able to process the event. */ pj_ioqueue_unlock_key(h); + + return PJ_FALSE; } + + return PJ_TRUE; } -void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) +pj_bool_t ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *h ) { pj_status_t rc; - /* Lock the key. */ - pj_ioqueue_lock_key(h); + /* Try lock the key. */ + rc = pj_ioqueue_trylock_key(h); + if (rc != PJ_SUCCESS) { + return PJ_FALSE; + } if (IS_CLOSING(h)) { pj_ioqueue_unlock_key(h); - return; + return PJ_TRUE; } # if PJ_HAS_TCP @@ -604,16 +618,25 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) * able to process the event. */ pj_ioqueue_unlock_key(h); + + return PJ_FALSE; } + + return PJ_TRUE; } -void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, - pj_ioqueue_key_t *h ) +pj_bool_t ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, + pj_ioqueue_key_t *h ) { pj_bool_t has_lock; + pj_status_t rc; - pj_ioqueue_lock_key(h); + /* Try lock the key. */ + rc = pj_ioqueue_trylock_key(h); + if (rc != PJ_SUCCESS) { + return PJ_FALSE; + } if (!h->connecting) { /* It is possible that more than one thread was woken up, thus @@ -621,12 +644,12 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, * it has been processed by other thread. */ pj_ioqueue_unlock_key(h); - return; + return PJ_TRUE; } if (IS_CLOSING(h)) { pj_ioqueue_unlock_key(h); - return; + return PJ_TRUE; } /* Clear operation. */ @@ -668,6 +691,8 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue, if (has_lock) { pj_ioqueue_unlock_key(h); } + + return PJ_TRUE; } /* @@ -1324,6 +1349,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key) return pj_lock_acquire(key->lock); } +PJ_DEF(pj_status_t) pj_ioqueue_trylock_key(pj_ioqueue_key_t *key) +{ + if (key->grp_lock) + return pj_grp_lock_tryacquire(key->grp_lock); + else + return pj_lock_tryacquire(key->lock); +} + PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key) { if (key->grp_lock) diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c index adc2c310..b67a3d17 100644 --- a/pjlib/src/pj/ioqueue_epoll.c +++ b/pjlib/src/pj/ioqueue_epoll.c @@ -651,12 +651,13 @@ static void scan_closing_keys(pj_ioqueue_t *ioqueue) */ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { - int i, count, processed; + int i, count, event_cnt, processed_cnt; int msec; //struct epoll_event *events = ioqueue->events; //struct queue *queue = ioqueue->queue; - struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; - struct queue queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; + enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS }; + struct epoll_event events[MAX_EVENTS]; + struct queue queue[MAX_EVENTS]; pj_timestamp t1, t2; PJ_CHECK_STACK(); @@ -667,7 +668,7 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) pj_get_timestamp(&t1); //count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec); - count = os_epoll_wait( ioqueue->epfd, events, PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL, msec); + count = os_epoll_wait( ioqueue->epfd, events, MAX_EVENTS, msec); if (count == 0) { #if PJ_IOQUEUE_HAS_SAFE_UNREG /* Check the closing keys only when there's no activity and when there are @@ -694,7 +695,7 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) /* Lock ioqueue. */ pj_lock_acquire(ioqueue->lock); - for (processed=0, i=0; i<count; ++i) { + for (event_cnt=0, i=0; i<count; ++i) { pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type) events[i].epoll_data; @@ -709,9 +710,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h); #endif - queue[processed].key = h; - queue[processed].event_type = READABLE_EVENT; - ++processed; + queue[event_cnt].key = h; + queue[event_cnt].event_type = READABLE_EVENT; + ++event_cnt; continue; } @@ -723,9 +724,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h); #endif - queue[processed].key = h; - queue[processed].event_type = WRITEABLE_EVENT; - ++processed; + queue[event_cnt].key = h; + queue[event_cnt].event_type = WRITEABLE_EVENT; + ++event_cnt; continue; } @@ -738,9 +739,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h); #endif - queue[processed].key = h; - queue[processed].event_type = WRITEABLE_EVENT; - ++processed; + queue[event_cnt].key = h; + queue[event_cnt].event_type = WRITEABLE_EVENT; + ++event_cnt; continue; } #endif /* PJ_HAS_TCP */ @@ -758,21 +759,21 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h); #endif - queue[processed].key = h; - queue[processed].event_type = EXCEPTION_EVENT; - ++processed; + queue[event_cnt].key = h; + queue[event_cnt].event_type = EXCEPTION_EVENT; + ++event_cnt; } else if (key_has_pending_read(h) || key_has_pending_accept(h)) { #if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h); #endif - queue[processed].key = h; - queue[processed].event_type = READABLE_EVENT; - ++processed; + queue[event_cnt].key = h; + queue[event_cnt].event_type = READABLE_EVENT; + ++event_cnt; } continue; } } - for (i=0; i<processed; ++i) { + for (i=0; i<event_cnt; ++i) { if (queue[i].key->grp_lock) pj_grp_lock_add_ref_dbg(queue[i].key->grp_lock, "ioqueue", 0); } @@ -783,22 +784,31 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) PJ_RACE_ME(5); + processed_cnt = 0; + /* Now process the events. */ - for (i=0; i<processed; ++i) { - switch (queue[i].event_type) { - case READABLE_EVENT: - ioqueue_dispatch_read_event(ioqueue, queue[i].key); - break; - case WRITEABLE_EVENT: - ioqueue_dispatch_write_event(ioqueue, queue[i].key); - break; - case EXCEPTION_EVENT: - ioqueue_dispatch_exception_event(ioqueue, queue[i].key); - break; - case NO_EVENT: - pj_assert(!"Invalid event!"); - break; - } + for (i=0; i<event_cnt; ++i) { + + /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */ + if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) { + switch (queue[i].event_type) { + case READABLE_EVENT: + if (ioqueue_dispatch_read_event(ioqueue, queue[i].key)) + ++processed_cnt; + break; + case WRITEABLE_EVENT: + if (ioqueue_dispatch_write_event(ioqueue, queue[i].key)) + ++processed_cnt; + break; + case EXCEPTION_EVENT: + if (ioqueue_dispatch_exception_event(ioqueue, queue[i].key)) + ++processed_cnt; + break; + case NO_EVENT: + pj_assert(!"Invalid event!"); + break; + } + } #if PJ_IOQUEUE_HAS_SAFE_UNREG decrement_counter(queue[i].key); @@ -812,14 +822,17 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) /* Special case: * When epoll returns > 0 but no descriptors are actually set! */ - if (count > 0 && !processed && msec > 0) { + if (count > 0 && !event_cnt && msec > 0) { pj_thread_sleep(msec); } + TRACE_((THIS_FILE, " poll: count=%d events=%d processed=%d", + count, event_cnt, processed_cnt)); + pj_get_timestamp(&t1); TRACE_((THIS_FILE, "ioqueue_poll() returns %d, time=%d usec", processed, pj_elapsed_usec(&t2, &t1))); - return processed; + return processed_cnt; } diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c index b991f98b..a0c964b6 100644 --- a/pjlib/src/pj/ioqueue_select.c +++ b/pjlib/src/pj/ioqueue_select.c @@ -334,6 +334,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool, PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET && cb && p_key, PJ_EINVAL); + /* On platforms with fd_set containing fd bitmap such as *nix family, + * avoid potential memory corruption caused by select() when given + * an fd that is higher than FD_SETSIZE. + */ + if (sizeof(fd_set) < FD_SETSIZE && sock >= PJ_IOQUEUE_MAX_HANDLES) + return PJ_ETOOBIG; + pj_lock_acquire(ioqueue->lock); if (ioqueue->count >= ioqueue->max) { @@ -831,13 +838,14 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) { pj_fd_set_t rfdset, wfdset, xfdset; int nfds; - int count, i, counter; + int i, count, event_cnt, processed_cnt; pj_ioqueue_key_t *h; + enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS }; struct event { pj_ioqueue_key_t *key; enum ioqueue_event_type event_type; - } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL]; + } event[MAX_EVENTS]; PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL); @@ -889,8 +897,6 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) return 0; else if (count < 0) return -pj_get_netos_error(); - else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) - count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; /* Scan descriptor sets for event and add the events in the event * array to be processed later in this function. We do this so that @@ -898,13 +904,15 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) */ pj_lock_acquire(ioqueue->lock); - counter = 0; + event_cnt = 0; /* Scan for writable sockets first to handle piggy-back data * coming with accept(). */ - h = ioqueue->active_list.next; - for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) { + for (h = ioqueue->active_list.next; + h != &ioqueue->active_list && event_cnt < MAX_EVENTS; + h = h->next) + { if ( (key_has_pending_write(h) || key_has_pending_connect(h)) && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h)) @@ -912,39 +920,39 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) #if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h); #endif - event[counter].key = h; - event[counter].event_type = WRITEABLE_EVENT; - ++counter; + event[event_cnt].key = h; + event[event_cnt].event_type = WRITEABLE_EVENT; + ++event_cnt; } /* Scan for readable socket. */ if ((key_has_pending_read(h) || key_has_pending_accept(h)) && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h) && - counter<count) + event_cnt < MAX_EVENTS) { #if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h); #endif - event[counter].key = h; - event[counter].event_type = READABLE_EVENT; - ++counter; + event[event_cnt].key = h; + event[event_cnt].event_type = READABLE_EVENT; + ++event_cnt; } #if PJ_HAS_TCP if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) && - !IS_CLOSING(h) && counter<count) + !IS_CLOSING(h) && event_cnt < MAX_EVENTS) { #if PJ_IOQUEUE_HAS_SAFE_UNREG increment_counter(h); #endif - event[counter].key = h; - event[counter].event_type = EXCEPTION_EVENT; - ++counter; + event[event_cnt].key = h; + event[event_cnt].event_type = EXCEPTION_EVENT; + ++event_cnt; } #endif } - for (i=0; i<counter; ++i) { + for (i=0; i<event_cnt; ++i) { if (event[i].key->grp_lock) pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0); } @@ -955,37 +963,46 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout) PJ_RACE_ME(5); - count = counter; + processed_cnt = 0; /* Now process all events. The dispatch functions will take care * of locking in each of the key */ - for (counter=0; counter<count; ++counter) { - switch (event[counter].event_type) { - case READABLE_EVENT: - ioqueue_dispatch_read_event(ioqueue, event[counter].key); - break; - case WRITEABLE_EVENT: - ioqueue_dispatch_write_event(ioqueue, event[counter].key); - break; - case EXCEPTION_EVENT: - ioqueue_dispatch_exception_event(ioqueue, event[counter].key); - break; - case NO_EVENT: - pj_assert(!"Invalid event!"); - break; - } + for (i=0; i<event_cnt; ++i) { + + /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */ + if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) { + switch (event[i].event_type) { + case READABLE_EVENT: + if (ioqueue_dispatch_read_event(ioqueue, event[i].key)) + ++processed_cnt; + break; + case WRITEABLE_EVENT: + if (ioqueue_dispatch_write_event(ioqueue, event[i].key)) + ++processed_cnt; + break; + case EXCEPTION_EVENT: + if (ioqueue_dispatch_exception_event(ioqueue, event[i].key)) + ++processed_cnt; + break; + case NO_EVENT: + pj_assert(!"Invalid event!"); + break; + } + } #if PJ_IOQUEUE_HAS_SAFE_UNREG - decrement_counter(event[counter].key); + decrement_counter(event[i].key); #endif - if (event[counter].key->grp_lock) - pj_grp_lock_dec_ref_dbg(event[counter].key->grp_lock, + if (event[i].key->grp_lock) + pj_grp_lock_dec_ref_dbg(event[i].key->grp_lock, "ioqueue", 0); } + TRACE__((THIS_FILE, " poll: count=%d events=%d processed=%d", + count, event_cnt, processed_cnt)); - return count; + return processed_cnt; } |