summaryrefslogtreecommitdiff
path: root/pjlib/src/pj/ioqueue_epoll.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjlib/src/pj/ioqueue_epoll.c')
-rw-r--r--pjlib/src/pj/ioqueue_epoll.c89
1 files changed, 51 insertions, 38 deletions
diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c
index adc2c310..b67a3d17 100644
--- a/pjlib/src/pj/ioqueue_epoll.c
+++ b/pjlib/src/pj/ioqueue_epoll.c
@@ -651,12 +651,13 @@ static void scan_closing_keys(pj_ioqueue_t *ioqueue)
*/
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
- int i, count, processed;
+ int i, count, event_cnt, processed_cnt;
int msec;
//struct epoll_event *events = ioqueue->events;
//struct queue *queue = ioqueue->queue;
- struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
- struct queue queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
+ enum { MAX_EVENTS = PJ_IOQUEUE_MAX_CAND_EVENTS };
+ struct epoll_event events[MAX_EVENTS];
+ struct queue queue[MAX_EVENTS];
pj_timestamp t1, t2;
PJ_CHECK_STACK();
@@ -667,7 +668,7 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
pj_get_timestamp(&t1);
//count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec);
- count = os_epoll_wait( ioqueue->epfd, events, PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL, msec);
+ count = os_epoll_wait( ioqueue->epfd, events, MAX_EVENTS, msec);
if (count == 0) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* Check the closing keys only when there's no activity and when there are
@@ -694,7 +695,7 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
/* Lock ioqueue. */
pj_lock_acquire(ioqueue->lock);
- for (processed=0, i=0; i<count; ++i) {
+ for (event_cnt=0, i=0; i<count; ++i) {
pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
events[i].epoll_data;
@@ -709,9 +710,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
- queue[processed].key = h;
- queue[processed].event_type = READABLE_EVENT;
- ++processed;
+ queue[event_cnt].key = h;
+ queue[event_cnt].event_type = READABLE_EVENT;
+ ++event_cnt;
continue;
}
@@ -723,9 +724,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
- queue[processed].key = h;
- queue[processed].event_type = WRITEABLE_EVENT;
- ++processed;
+ queue[event_cnt].key = h;
+ queue[event_cnt].event_type = WRITEABLE_EVENT;
+ ++event_cnt;
continue;
}
@@ -738,9 +739,9 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
- queue[processed].key = h;
- queue[processed].event_type = WRITEABLE_EVENT;
- ++processed;
+ queue[event_cnt].key = h;
+ queue[event_cnt].event_type = WRITEABLE_EVENT;
+ ++event_cnt;
continue;
}
#endif /* PJ_HAS_TCP */
@@ -758,21 +759,21 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
- queue[processed].key = h;
- queue[processed].event_type = EXCEPTION_EVENT;
- ++processed;
+ queue[event_cnt].key = h;
+ queue[event_cnt].event_type = EXCEPTION_EVENT;
+ ++event_cnt;
} else if (key_has_pending_read(h) || key_has_pending_accept(h)) {
#if PJ_IOQUEUE_HAS_SAFE_UNREG
increment_counter(h);
#endif
- queue[processed].key = h;
- queue[processed].event_type = READABLE_EVENT;
- ++processed;
+ queue[event_cnt].key = h;
+ queue[event_cnt].event_type = READABLE_EVENT;
+ ++event_cnt;
}
continue;
}
}
- for (i=0; i<processed; ++i) {
+ for (i=0; i<event_cnt; ++i) {
if (queue[i].key->grp_lock)
pj_grp_lock_add_ref_dbg(queue[i].key->grp_lock, "ioqueue", 0);
}
@@ -783,22 +784,31 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
PJ_RACE_ME(5);
+ processed_cnt = 0;
+
/* Now process the events. */
- for (i=0; i<processed; ++i) {
- switch (queue[i].event_type) {
- case READABLE_EVENT:
- ioqueue_dispatch_read_event(ioqueue, queue[i].key);
- break;
- case WRITEABLE_EVENT:
- ioqueue_dispatch_write_event(ioqueue, queue[i].key);
- break;
- case EXCEPTION_EVENT:
- ioqueue_dispatch_exception_event(ioqueue, queue[i].key);
- break;
- case NO_EVENT:
- pj_assert(!"Invalid event!");
- break;
- }
+ for (i=0; i<event_cnt; ++i) {
+
+ /* Just do not exceed PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL */
+ if (processed_cnt < PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL) {
+ switch (queue[i].event_type) {
+ case READABLE_EVENT:
+ if (ioqueue_dispatch_read_event(ioqueue, queue[i].key))
+ ++processed_cnt;
+ break;
+ case WRITEABLE_EVENT:
+ if (ioqueue_dispatch_write_event(ioqueue, queue[i].key))
+ ++processed_cnt;
+ break;
+ case EXCEPTION_EVENT:
+ if (ioqueue_dispatch_exception_event(ioqueue, queue[i].key))
+ ++processed_cnt;
+ break;
+ case NO_EVENT:
+ pj_assert(!"Invalid event!");
+ break;
+ }
+ }
#if PJ_IOQUEUE_HAS_SAFE_UNREG
decrement_counter(queue[i].key);
@@ -812,14 +822,17 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
/* Special case:
* When epoll returns > 0 but no descriptors are actually set!
*/
- if (count > 0 && !processed && msec > 0) {
+ if (count > 0 && !event_cnt && msec > 0) {
pj_thread_sleep(msec);
}
+ TRACE_((THIS_FILE, " poll: count=%d events=%d processed=%d",
+ count, event_cnt, processed_cnt));
+
pj_get_timestamp(&t1);
TRACE_((THIS_FILE, "ioqueue_poll() returns %d, time=%d usec",
processed, pj_elapsed_usec(&t2, &t1)));
- return processed;
+ return processed_cnt;
}