From fa8856aca65908663819c607297271a6bf0500ea Mon Sep 17 00:00:00 2001
From: Nanang Izzuddin
Date: Fri, 6 Nov 2015 04:18:46 +0000
Subject: Close #1894: Improve ioqueue performance on multithreaded environment.

git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@5194 74dad513-b988-da41-8d7b-12977e46ad98
---
 pjlib/include/pj/ioqueue.h        | 33 ++++
 pjlib/src/pj/ioqueue_common_abs.c | 59 ++++++++++++++++++------
 pjlib/src/pj/ioqueue_epoll.c      | 89 ++++++++++++++++++++----------
 pjlib/src/pj/ioqueue_select.c     | 95 +++++++++++++++++++++++----------------
 4 files changed, 186 insertions(+), 90 deletions(-)

(limited to 'pjlib')

diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h
index 44ff2309..a8a33f7a 100644
--- a/pjlib/include/pj/ioqueue.h
+++ b/pjlib/include/pj/ioqueue.h
@@ -303,6 +303,28 @@ typedef enum pj_ioqueue_operation_e
 #   define PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL     (16)
 #endif
 
+
+/**
+ * This macro specifies the maximum number of event candidates collected by
+ * each polling thread to be able to reach the maximum number of processed
+ * events (i.e: PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) in each poll cycle.
+ * An event candidate will be dispatched to the application as an event
+ * unless it is already being dispatched by another polling thread. So, to
+ * anticipate such a race condition, each poll operation should collect
+ * more event candidates than PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL; the
+ * recommended value is (PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL *
+ * number of polling threads).
+ *
+ * The value is only meaningful when specified during the PJLIB build and
+ * is only effective in a multiple polling threads environment.
+ */
+#if !defined(PJ_IOQUEUE_MAX_CAND_EVENTS) || \
+    PJ_IOQUEUE_MAX_CAND_EVENTS < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL
+#   undef PJ_IOQUEUE_MAX_CAND_EVENTS
+#   define PJ_IOQUEUE_MAX_CAND_EVENTS   PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL
+#endif
+
+
 /**
  * When this flag is specified in ioqueue's recv() or send() operations,
  * the ioqueue will always mark the operation as asynchronous.
@@ -503,6 +525,17 @@ PJ_DECL(pj_status_t) pj_ioqueue_set_concurrency(pj_ioqueue_key_t *key,
  */
 PJ_DECL(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key);
 
+/**
+ * Try to acquire the key's mutex. When the key's concurrency is disabled,
+ * the application may call this function to synchronize its operation
+ * with the key's callback.
+ *
+ * @param key       The key that was previously obtained from registration.
+ *
+ * @return          PJ_SUCCESS on success or the appropriate error code.
+ */
+PJ_DECL(pj_status_t) pj_ioqueue_trylock_key(pj_ioqueue_key_t *key);
+
 /**
  * Release the lock previously acquired with pj_ioqueue_lock_key().
  *
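
A sketch, not part of the patch itself: following the recommendation in the
macro documentation above, a build that runs, say, four polling threads could
pin the candidate count from its config_site.h. Both macros are the ones
documented in this hunk; the thread count of four is only an assumed example.

    /* config_site.h -- illustrative sketch, assuming 4 polling threads */
    #define PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL    (16)
    #define PJ_IOQUEUE_MAX_CAND_EVENTS \
            (PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL * 4)
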
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index cd13b70b..97fb12ba 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -195,14 +195,20 @@ PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
  * Report occurrence of an event in the key to be processed by the
  * framework.
  */
-void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
+pj_bool_t ioqueue_dispatch_write_event( pj_ioqueue_t *ioqueue,
+                                        pj_ioqueue_key_t *h)
 {
-    /* Lock the key. */
-    pj_ioqueue_lock_key(h);
+    pj_status_t rc;
+
+    /* Try to lock the key. */
+    rc = pj_ioqueue_trylock_key(h);
+    if (rc != PJ_SUCCESS) {
+        return PJ_FALSE;
+    }
 
     if (IS_CLOSING(h)) {
         pj_ioqueue_unlock_key(h);
-        return;
+        return PJ_TRUE;
     }
 
 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
@@ -417,19 +423,27 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
          * able to process the event.
          */
         pj_ioqueue_unlock_key(h);
+
+        return PJ_FALSE;
     }
+
+    return PJ_TRUE;
 }
 
-void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
+pj_bool_t ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue,
+                                       pj_ioqueue_key_t *h )
 {
     pj_status_t rc;
 
-    /* Lock the key. */
-    pj_ioqueue_lock_key(h);
+    /* Try to lock the key. */
+    rc = pj_ioqueue_trylock_key(h);
+    if (rc != PJ_SUCCESS) {
+        return PJ_FALSE;
+    }
 
     if (IS_CLOSING(h)) {
         pj_ioqueue_unlock_key(h);
-        return;
+        return PJ_TRUE;
     }
 
 # if PJ_HAS_TCP
@@ -604,16 +618,25 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
          * able to process the event.
          */
         pj_ioqueue_unlock_key(h);
+
+        return PJ_FALSE;
     }
+
+    return PJ_TRUE;
 }
 
-void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
-                                       pj_ioqueue_key_t *h )
+pj_bool_t ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
+                                            pj_ioqueue_key_t *h )
 {
     pj_bool_t has_lock;
+    pj_status_t rc;
 
-    pj_ioqueue_lock_key(h);
+    /* Try to lock the key. */
+    rc = pj_ioqueue_trylock_key(h);
+    if (rc != PJ_SUCCESS) {
+        return PJ_FALSE;
+    }
 
     if (!h->connecting) {
         /* It is possible that more than one thread was woken up, thus
@@ -621,12 +644,12 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
          * it has been processed by other thread.
          */
         pj_ioqueue_unlock_key(h);
-        return;
+        return PJ_TRUE;
     }
 
     if (IS_CLOSING(h)) {
         pj_ioqueue_unlock_key(h);
-        return;
+        return PJ_TRUE;
     }
 
     /* Clear operation. */
@@ -668,6 +691,8 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
     if (has_lock) {
         pj_ioqueue_unlock_key(h);
     }
+
+    return PJ_TRUE;
 }
 
 /*
@@ -1324,6 +1349,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_lock_key(pj_ioqueue_key_t *key)
     return pj_lock_acquire(key->lock);
 }
 
+PJ_DEF(pj_status_t) pj_ioqueue_trylock_key(pj_ioqueue_key_t *key)
+{
+    if (key->grp_lock)
+        return pj_grp_lock_tryacquire(key->grp_lock);
+    else
+        return pj_lock_tryacquire(key->lock);
+}
+
 PJ_DEF(pj_status_t) pj_ioqueue_unlock_key(pj_ioqueue_key_t *key)
 {
     if (key->grp_lock)
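
A usage sketch, not from the patch itself: with a key registered with
concurrency disabled, an application thread can now synchronize with a
callback that may be running inside a polling thread without blocking that
poller. The helper below is hypothetical; only pj_ioqueue_trylock_key(),
pj_ioqueue_unlock_key() and pj_ioqueue_set_concurrency() are real PJLIB APIs.

    #include <pjlib.h>

    /* Hypothetical helper: touch state shared with the key's callback
     * without blocking. Assumes the key was registered with concurrency
     * disabled, e.g. via pj_ioqueue_set_concurrency(key, PJ_FALSE).
     */
    static pj_bool_t try_update_shared_state(pj_ioqueue_key_t *key)
    {
        if (pj_ioqueue_trylock_key(key) != PJ_SUCCESS) {
            /* The key is held by a callback in another thread;
             * back off and retry later instead of blocking.
             */
            return PJ_FALSE;
        }

        /* ... safely access state that the callback also uses ... */

        pj_ioqueue_unlock_key(key);
        return PJ_TRUE;
    }
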
diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c
index adc2c310..b67a3d17 100644
--- a/pjlib/src/pj/ioqueue_epoll.c
+++ b/pjlib/src/pj/ioqueue_epoll.c
@@ -651,12 +651,13 @@ static void scan_closing_keys(pj_ioqueue_t *ioqueue)
  */
 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 {
-    int i, count, processed;
+    int i, count, event_cnt, processed_cnt;
     int msec;
     //struct epoll_event *events = ioqueue->events;
     //struct queue *queue = ioqueue->queue;
-    struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
-    struct queue queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
+    enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS };
+    struct epoll_event events[MAX_EVENTS];
+    struct queue queue[MAX_EVENTS];
     pj_timestamp t1, t2;
 
     PJ_CHECK_STACK();
@@ -667,7 +668,7 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
     pj_get_timestamp(&t1);
 
     //count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec);
-    count = os_epoll_wait( ioqueue->epfd, events, PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL, msec);
+    count = os_epoll_wait( ioqueue->epfd, events, MAX_EVENTS, msec);
     if (count == 0) {
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
     /* Check the closing keys only when there's no activity and when there are
@@ -694,7 +695,7 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
     /* Lock ioqueue. */
     pj_lock_acquire(ioqueue->lock);
 
-    for (processed=0, i=0; i<count; ++i) {
+    for (event_cnt=0, i=0; i<count; ++i) {
         pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
                               events[i].epoll_data;
 
@@ -705,9 +706,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
             increment_counter(h);
 #endif
-            queue[processed].key = h;
-            queue[processed].event_type = READABLE_EVENT;
-            ++processed;
+            queue[event_cnt].key = h;
+            queue[event_cnt].event_type = READABLE_EVENT;
+            ++event_cnt;
             continue;
         }
 
@@ -721,9 +722,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
             increment_counter(h);
 #endif
-            queue[processed].key = h;
-            queue[processed].event_type = WRITEABLE_EVENT;
-            ++processed;
+            queue[event_cnt].key = h;
+            queue[event_cnt].event_type = WRITEABLE_EVENT;
+            ++event_cnt;
             continue;
         }
 
@@ -737,9 +738,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
             increment_counter(h);
 #endif
-            queue[processed].key = h;
-            queue[processed].event_type = WRITEABLE_EVENT;
-            ++processed;
+            queue[event_cnt].key = h;
+            queue[event_cnt].event_type = WRITEABLE_EVENT;
+            ++event_cnt;
             continue;
         }
 
@@ -753,9 +754,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
             increment_counter(h);
 #endif
-            queue[processed].key = h;
-            queue[processed].event_type = EXCEPTION_EVENT;
-            ++processed;
+            queue[event_cnt].key = h;
+            queue[event_cnt].event_type = EXCEPTION_EVENT;
+            ++event_cnt;
         }
     }
 
@@ -766,7 +767,7 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
     }
 
-    for (i=0; i<processed; ++i) {
+    for (i=0; i<event_cnt; ++i) {
         if (queue[i].key->grp_lock)
             pj_grp_lock_add_ref_dbg(queue[i].key->grp_lock, "ioqueue", 0);
     }
@@ -783,22 +784,31 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 
     PJ_RACE_ME(5);
 
+    processed_cnt = 0;
+
     /* Now process the events. */
-    for (i=0; i<processed; ++i) {
-        switch (queue[i].event_type) {
-        case READABLE_EVENT:
-            ioqueue_dispatch_read_event(ioqueue, queue[i].key);
-            break;
-        case WRITEABLE_EVENT:
-            ioqueue_dispatch_write_event(ioqueue, queue[i].key);
-            break;
-        case EXCEPTION_EVENT:
-            ioqueue_dispatch_exception_event(ioqueue, queue[i].key);
-            break;
-        case NO_EVENT:
-            pj_assert(!"Invalid event!");
-            break;
+    for (i=0; i<event_cnt; ++i) {
+        /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */
+        if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) {
+            switch (queue[i].event_type) {
+            case READABLE_EVENT:
+                if (ioqueue_dispatch_read_event(ioqueue, queue[i].key))
+                    ++processed_cnt;
+                break;
+            case WRITEABLE_EVENT:
+                if (ioqueue_dispatch_write_event(ioqueue, queue[i].key))
+                    ++processed_cnt;
+                break;
+            case EXCEPTION_EVENT:
+                if (ioqueue_dispatch_exception_event(ioqueue, queue[i].key))
+                    ++processed_cnt;
+                break;
+            case NO_EVENT:
+                pj_assert(!"Invalid event!");
+                break;
+            }
         }
 
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
@@ -808,13 +818,16 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
     /* Special case:
      * When epoll returns > 0 but no descriptors are actually set!
      */
-    if (count > 0 && !processed && msec > 0) {
+    if (count > 0 && !event_cnt && msec > 0) {
         pj_thread_sleep(msec);
     }
 
+    TRACE_((THIS_FILE, "     poll: count=%d events=%d processed=%d",
+            count, event_cnt, processed_cnt));
+
     pj_get_timestamp(&t1);
     TRACE_((THIS_FILE, "ioqueue_poll() returns %d, time=%d usec",
                        processed, pj_elapsed_usec(&t2, &t1)));
 
-    return processed;
+    return processed_cnt;
 }
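
The reason the dispatch functions now return pj_bool_t is visible in a poll
loop like the following sketch (illustrative, not from the patch): several
such threads may call pj_ioqueue_poll() on the same ioqueue, and an event
whose key is already being dispatched by another thread is simply skipped in
that cycle instead of blocking the poller. The quit flag is a hypothetical
application variable.

    #include <pjlib.h>

    static volatile pj_bool_t quit_flag;

    /* Worker body for pj_thread_create(): poll until asked to stop. */
    static int worker_thread(void *arg)
    {
        pj_ioqueue_t *ioqueue = (pj_ioqueue_t*)arg;

        while (!quit_flag) {
            pj_time_val timeout = { 0, 10 };   /* 10 msec */
            pj_ioqueue_poll(ioqueue, &timeout);
        }
        return 0;
    }

With the trylock in the dispatch path, such workers contend only briefly on
the shared ioqueue lock rather than serializing on individual keys.
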
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index b991f98b..a0c964b6 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -334,6 +334,13 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock2(pj_pool_t *pool,
     PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
                      cb && p_key, PJ_EINVAL);
 
+    /* On platforms where fd_set contains an fd bitmap, such as the *nix
+     * family, avoid potential memory corruption caused by select() when
+     * given an fd that is higher than FD_SETSIZE.
+     */
+    if (sizeof(fd_set) < FD_SETSIZE && sock >= PJ_IOQUEUE_MAX_HANDLES)
+        return PJ_ETOOBIG;
+
     pj_lock_acquire(ioqueue->lock);
 
     if (ioqueue->count >= ioqueue->max) {
@@ -831,13 +838,14 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 {
     pj_fd_set_t rfdset, wfdset, xfdset;
     int nfds;
-    int count, i, counter;
+    int i, count, event_cnt, processed_cnt;
     pj_ioqueue_key_t *h;
+    enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS };
     struct event
     {
         pj_ioqueue_key_t        *key;
         enum ioqueue_event_type  event_type;
-    } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
+    } event[MAX_EVENTS];
 
     PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
 
@@ -889,8 +897,6 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
         return 0;
     else if (count < 0)
         return -pj_get_netos_error();
-    else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
-        count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
 
     /* Scan descriptor sets for event and add the events in the event
      * array to be processed later in this function. We do this so that
@@ -898,13 +904,15 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
      */
     pj_lock_acquire(ioqueue->lock);
 
-    counter = 0;
+    event_cnt = 0;
 
     /* Scan for writable sockets first to handle piggy-back data
      * coming with accept().
      */
-    h = ioqueue->active_list.next;
-    for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {
+    for (h = ioqueue->active_list.next;
+         h != &ioqueue->active_list && event_cnt < MAX_EVENTS;
+         h = h->next)
+    {
         if ( (key_has_pending_write(h) || key_has_pending_connect(h))
              && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))
         {
@@ -912,39 +920,39 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
             increment_counter(h);
 #endif
-            event[counter].key = h;
-            event[counter].event_type = WRITEABLE_EVENT;
-            ++counter;
+            event[event_cnt].key = h;
+            event[event_cnt].event_type = WRITEABLE_EVENT;
+            ++event_cnt;
         }
 
         /* Scan for readable socket. */
         if ((key_has_pending_read(h) || key_has_pending_accept(h))
             && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h) &&
-            counter<count)
+            event_cnt < MAX_EVENTS)
         {
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
             increment_counter(h);
 #endif
-            event[counter].key = h;
-            event[counter].event_type = READABLE_EVENT;
-            ++counter;
+            event[event_cnt].key = h;
+            event[event_cnt].event_type = READABLE_EVENT;
+            ++event_cnt;
         }
 
         /* Scan for exception socket. */
         if (PJ_FD_ISSET(h->fd, &xfdset) &&
-            !IS_CLOSING(h) && counter<count)
+            !IS_CLOSING(h) && event_cnt < MAX_EVENTS)
         {
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
             increment_counter(h);
 #endif
-            event[counter].key = h;
-            event[counter].event_type = EXCEPTION_EVENT;
-            ++counter;
+            event[event_cnt].key = h;
+            event[event_cnt].event_type = EXCEPTION_EVENT;
+            ++event_cnt;
         }
     }
 
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
-    for (i=0; i<counter; ++i) {
+    for (i=0; i<event_cnt; ++i) {
         if (event[i].key->grp_lock)
             pj_grp_lock_add_ref_dbg(event[i].key->grp_lock, "ioqueue", 0);
     }
@@ -955,37 +963,46 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
 
     PJ_RACE_ME(5);
 
-    count = counter;
+    processed_cnt = 0;
 
     /* Now process all events. The dispatch functions will take care
      * of locking in each of the key
      */
-    for (counter=0; counter<count; ++counter) {
-        switch (event[counter].event_type) {
-        case READABLE_EVENT:
-            ioqueue_dispatch_read_event(ioqueue, event[counter].key);
-            break;
-        case WRITEABLE_EVENT:
-            ioqueue_dispatch_write_event(ioqueue, event[counter].key);
-            break;
-        case EXCEPTION_EVENT:
-            ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
-            break;
-        case NO_EVENT:
-            pj_assert(!"Invalid event!");
-            break;
+    for (i=0; i<event_cnt; ++i) {
+        /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */
+        if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) {
+            switch (event[i].event_type) {
+            case READABLE_EVENT:
+                if (ioqueue_dispatch_read_event(ioqueue, event[i].key))
+                    ++processed_cnt;
+                break;
+            case WRITEABLE_EVENT:
+                if (ioqueue_dispatch_write_event(ioqueue, event[i].key))
+                    ++processed_cnt;
+                break;
+            case EXCEPTION_EVENT:
+                if (ioqueue_dispatch_exception_event(ioqueue, event[i].key))
+                    ++processed_cnt;
+                break;
+            case NO_EVENT:
+                pj_assert(!"Invalid event!");
+                break;
+            }
         }
 
 #if PJ_IOQUEUE_HAS_SAFE_UNREG
-        decrement_counter(event[counter].key);
+        decrement_counter(event[i].key);
 #endif
 
-        if (event[counter].key->grp_lock)
-            pj_grp_lock_dec_ref_dbg(event[counter].key->grp_lock,
+        if (event[i].key->grp_lock)
+            pj_grp_lock_dec_ref_dbg(event[i].key->grp_lock,
                                     "ioqueue", 0);
     }
 
+    TRACE__((THIS_FILE, "     poll: count=%d events=%d processed=%d",
+             count, event_cnt, processed_cnt));
 
-    return count;
+    return processed_cnt;
 }
-- 
cgit v1.2.3