summaryrefslogtreecommitdiff
path: root/pjlib
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2006-07-08 19:46:43 +0000
committerBenny Prijono <bennylp@teluu.com>2006-07-08 19:46:43 +0000
commita286f67546eb45c9779660decfb990b38203a268 (patch)
treec1332dc8c7179d59960eef3d68422aefe83cb8ab /pjlib
parentfc263fb7b06d10cabe2dabeb6a22a722a463527b (diff)
Attempted to fix epoll for Linux
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@592 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib')
-rw-r--r--pjlib/src/pj/ioqueue_common_abs.c31
-rw-r--r--pjlib/src/pj/ioqueue_common_abs.h4
-rw-r--r--pjlib/src/pj/ioqueue_epoll.c74
-rw-r--r--pjlib/src/pj/ioqueue_select.c16
-rw-r--r--pjlib/src/pjlib-test/ioq_tcp.c27
-rw-r--r--pjlib/src/pjlib-test/ioq_udp.c10
-rw-r--r--pjlib/src/pjlib-test/test.h4
7 files changed, 106 insertions, 60 deletions
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index 95ca6d6a..982c8080 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -196,8 +196,8 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
/* Clear operation. */
h->connecting = 0;
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
+ ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
+ ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
@@ -272,7 +272,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
pj_list_erase(write_op);
if (pj_list_empty(&h->write_list))
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+ ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
}
@@ -325,7 +325,7 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
/* Clear operation if there's no more data to send. */
if (pj_list_empty(&h->write_list))
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
+ ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
}
@@ -378,7 +378,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
/* Clear bit in fdset if there is no more pending accept */
if (pj_list_empty(&h->accept_list))
- ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
+ ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
rc=pj_sock_accept(h->fd, accept_op->accept_fd,
accept_op->rmt_addr, accept_op->addrlen);
@@ -411,7 +411,7 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
/* Clear fdset if there is no pending read. */
if (pj_list_empty(&h->read_list))
- ioqueue_remove_from_set(ioqueue, h->fd, READABLE_EVENT);
+ ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
bytes_read = read_op->size;
@@ -518,8 +518,8 @@ void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
/* Clear operation. */
h->connecting = 0;
- ioqueue_remove_from_set(ioqueue, h->fd, WRITEABLE_EVENT);
- ioqueue_remove_from_set(ioqueue, h->fd, EXCEPTION_EVENT);
+ ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
+ ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);
pj_mutex_unlock(h->mutex);
@@ -585,7 +585,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
pj_mutex_lock(key->mutex);
pj_list_insert_before(&key->read_list, read_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
+ ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
pj_mutex_unlock(key->mutex);
return PJ_EPENDING;
@@ -653,7 +653,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
pj_mutex_lock(key->mutex);
pj_list_insert_before(&key->read_list, read_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
+ ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
pj_mutex_unlock(key->mutex);
return PJ_EPENDING;
@@ -759,7 +759,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
pj_mutex_lock(key->mutex);
pj_list_insert_before(&key->write_list, write_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
+ ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
pj_mutex_unlock(key->mutex);
return PJ_EPENDING;
@@ -827,6 +827,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
}
status = status;
}
+ PJ_LOG(3,(THIS_FILE, "pending write operation!!"));
}
/*
@@ -876,7 +877,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
pj_mutex_lock(key->mutex);
pj_list_insert_before(&key->write_list, write_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
+ ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
pj_mutex_unlock(key->mutex);
return PJ_EPENDING;
@@ -945,7 +946,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
pj_mutex_lock(key->mutex);
pj_list_insert_before(&key->accept_list, accept_op);
- ioqueue_add_to_set(key->ioqueue, key->fd, READABLE_EVENT);
+ ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
pj_mutex_unlock(key->mutex);
return PJ_EPENDING;
@@ -981,8 +982,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
/* Pending! */
pj_mutex_lock(key->mutex);
key->connecting = PJ_TRUE;
- ioqueue_add_to_set(key->ioqueue, key->fd, WRITEABLE_EVENT);
- ioqueue_add_to_set(key->ioqueue, key->fd, EXCEPTION_EVENT);
+ ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
+ ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
pj_mutex_unlock(key->mutex);
return PJ_EPENDING;
} else {
diff --git a/pjlib/src/pj/ioqueue_common_abs.h b/pjlib/src/pj/ioqueue_common_abs.h
index e7d05561..e56065d7 100644
--- a/pjlib/src/pj/ioqueue_common_abs.h
+++ b/pjlib/src/pj/ioqueue_common_abs.h
@@ -128,9 +128,9 @@ enum ioqueue_event_type
};
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
+ pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type );
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
+ pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type);
diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c
index 14a6cc32..66dd13e5 100644
--- a/pjlib/src/pj/ioqueue_epoll.c
+++ b/pjlib/src/pj/ioqueue_epoll.c
@@ -141,8 +141,8 @@
#define THIS_FILE "ioq_epoll"
-#define TRACE_(expr) PJ_LOG(3,expr)
-//#define TRACE_(expr)
+//#define TRACE_(expr) PJ_LOG(3,expr)
+#define TRACE_(expr)
/*
* Include common ioqueue abstraction.
@@ -157,6 +157,12 @@ struct pj_ioqueue_key_t
DECLARE_COMMON_KEY
};
+struct queue
+{
+ pj_ioqueue_key_t *key;
+ enum ioqueue_event_type event_type;
+};
+
/*
* This describes the I/O queue.
*/
@@ -167,6 +173,8 @@ struct pj_ioqueue_t
unsigned max, count;
pj_ioqueue_key_t hlist;
int epfd;
+ struct epoll_event *events;
+ struct queue *queue;
};
/* Include implementation for common abstraction after we declare
@@ -229,6 +237,12 @@ PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool,
return PJ_RETURN_OS_ERROR(pj_get_native_os_error());
}
+ ioqueue->events = pj_pool_calloc(pool, max_fd, sizeof(struct epoll_event));
+ PJ_ASSERT_RETURN(ioqueue->events != NULL, PJ_ENOMEM);
+
+ ioqueue->queue = pj_pool_calloc(pool, max_fd, sizeof(struct queue));
+ PJ_ASSERT_RETURN(ioqueue->queue != NULL, PJ_ENOMEM);
+
PJ_LOG(4, ("pjlib", "epoll I/O Queue created (%p)", ioqueue));
*p_ioqueue = ioqueue;
@@ -305,7 +319,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
}
/* os_epoll_ctl. */
- ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
+ ev.events = EPOLLIN | EPOLLERR;
ev.epoll_data = (epoll_data_type)key;
status = os_epoll_ctl(ioqueue->epfd, EPOLL_CTL_ADD, sock, &ev);
if (status < 0) {
@@ -322,6 +336,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
pj_list_insert_before(&ioqueue->hlist, key);
++ioqueue->count;
+ //TRACE_((THIS_FILE, "socket registered, count=%d", ioqueue->count));
+
on_return:
*p_key = key;
pj_lock_release(ioqueue->lock);
@@ -373,9 +389,16 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
* set for the specified event.
*/
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
+ pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type)
{
+ if (event_type == WRITEABLE_EVENT) {
+ struct epoll_event ev;
+
+ ev.events = EPOLLIN | EPOLLERR;
+ ev.epoll_data = (epoll_data_type)key;
+ os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev);
+ }
}
/*
@@ -385,9 +408,16 @@ static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
* set for the specified event.
*/
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
+ pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type )
{
+ if (event_type == WRITEABLE_EVENT) {
+ struct epoll_event ev;
+
+ ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;
+ ev.epoll_data = (epoll_data_type)key;
+ os_epoll_ctl( ioqueue->epfd, EPOLL_CTL_MOD, key->fd, &ev);
+ }
}
/*
@@ -397,22 +427,31 @@ static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
{
int i, count, processed;
- struct epoll_event events[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
int msec;
- struct queue {
- pj_ioqueue_key_t *key;
- enum ioqueue_event_type event_type;
- } queue[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
+ struct epoll_event *events = ioqueue->events;
+ struct queue *queue = ioqueue->queue;
+ pj_timestamp t1, t2;
PJ_CHECK_STACK();
msec = timeout ? PJ_TIME_VAL_MSEC(*timeout) : 9000;
-
- count = os_epoll_wait( ioqueue->epfd, events, PJ_ARRAY_SIZE(events), msec);
- if (count == 0)
+
+ TRACE_((THIS_FILE, "start os_epoll_wait, msec=%d", msec));
+ pj_get_timestamp(&t1);
+
+ count = os_epoll_wait( ioqueue->epfd, events, ioqueue->max, msec);
+ if (count == 0) {
+ TRACE_((THIS_FILE, "os_epoll_wait timed out"));
return count;
- else if (count < 0)
+ }
+ else if (count < 0) {
+ TRACE_((THIS_FILE, "os_epoll_wait error"));
return -pj_get_netos_error();
+ }
+
+ pj_get_timestamp(&t2);
+ TRACE_((THIS_FILE, "os_epoll_wait returns %d, time=%d usec",
+ count, pj_elapsed_usec(&t1, &t2)));
/* Lock ioqueue. */
pj_lock_acquire(ioqueue->lock);
@@ -421,6 +460,8 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
pj_ioqueue_key_t *h = (pj_ioqueue_key_t*)(epoll_data_type)
events[i].epoll_data;
+ TRACE_((THIS_FILE, "event %d: events=%d", i, events[i].events));
+
/*
* Check readability.
*/
@@ -486,6 +527,11 @@ PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
if (count > 0 && !processed && msec > 0) {
pj_thread_sleep(msec);
}
+
+ pj_get_timestamp(&t1);
+ TRACE_((THIS_FILE, "ioqueue_poll() returns %d, time=%d usec",
+ processed, pj_elapsed_usec(&t2, &t1)));
+
return processed;
}
diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c
index 33fcae7d..93adf07d 100644
--- a/pjlib/src/pj/ioqueue_select.c
+++ b/pjlib/src/pj/ioqueue_select.c
@@ -542,17 +542,17 @@ static void validate_sets(const pj_ioqueue_t *ioqueue,
* set for the specified event.
*/
static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
+ pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type)
{
pj_lock_acquire(ioqueue->lock);
if (event_type == READABLE_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->rfdset);
+ PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);
else if (event_type == WRITEABLE_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->wfdset);
+ PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);
else if (event_type == EXCEPTION_EVENT)
- PJ_FD_CLR((pj_sock_t)fd, &ioqueue->xfdset);
+ PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);
else
pj_assert(0);
@@ -566,17 +566,17 @@ static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
* set for the specified event.
*/
static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
- pj_sock_t fd,
+ pj_ioqueue_key_t *key,
enum ioqueue_event_type event_type )
{
pj_lock_acquire(ioqueue->lock);
if (event_type == READABLE_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->rfdset);
+ PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);
else if (event_type == WRITEABLE_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->wfdset);
+ PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);
else if (event_type == EXCEPTION_EVENT)
- PJ_FD_SET((pj_sock_t)fd, &ioqueue->xfdset);
+ PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);
else
pj_assert(0);
diff --git a/pjlib/src/pjlib-test/ioq_tcp.c b/pjlib/src/pjlib-test/ioq_tcp.c
index 7f004d9f..4a546f03 100644
--- a/pjlib/src/pjlib-test/ioq_tcp.c
+++ b/pjlib/src/pjlib-test/ioq_tcp.c
@@ -38,7 +38,6 @@
#if PJ_HAS_TCP
#define THIS_FILE "test_tcp"
-#define PORT 50000
#define NON_EXISTANT_PORT 50123
#define LOOP 100
#define BUF_MIN_SIZE 32
@@ -257,14 +256,21 @@ static int compliance_test_0(void)
}
// Bind server socket.
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = PJ_AF_INET;
- addr.sin_port = pj_htons(PORT);
- if (pj_sock_bind(ssock, &addr, sizeof(addr))) {
+ pj_sockaddr_in_init(&addr, 0, 0);
+ if ((rc=pj_sock_bind(ssock, &addr, sizeof(addr))) != 0 ) {
app_perror("...bind error", rc);
status=-10; goto on_error;
}
+ // Get server address.
+ client_addr_len = sizeof(addr);
+ rc = pj_sock_getsockname(ssock, &addr, &client_addr_len);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...ERROR in pj_sock_getsockname()", rc);
+ status=-15; goto on_error;
+ }
+ addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
+
// Create I/O Queue.
rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
if (rc != PJ_SUCCESS) {
@@ -302,12 +308,6 @@ static int compliance_test_0(void)
++pending_op;
}
- // Initialize remote address.
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = PJ_AF_INET;
- addr.sin_port = pj_htons(PORT);
- addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
-
// Client socket connect()
status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr));
if (status!=PJ_SUCCESS && status != PJ_EPENDING) {
@@ -461,10 +461,7 @@ static int compliance_test_1(void)
}
// Initialize remote address.
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = PJ_AF_INET;
- addr.sin_port = pj_htons(NON_EXISTANT_PORT);
- addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
+ pj_sockaddr_in_init(&addr, pj_cstr(&s, "127.0.0.1"), NON_EXISTANT_PORT);
// Client socket connect()
status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr));
diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c
index d26e8fd9..dfc818fb 100644
--- a/pjlib/src/pjlib-test/ioq_udp.c
+++ b/pjlib/src/pjlib-test/ioq_udp.c
@@ -40,7 +40,7 @@
#define THIS_FILE "test_udp"
#define PORT 51233
-#define LOOP 100
+#define LOOP 2
///#define LOOP 2
#define BUF_MIN_SIZE 32
#define BUF_MAX_SIZE 2048
@@ -817,7 +817,7 @@ int udp_ioqueue_test()
int status;
int bufsize, sock_count;
- goto pass1;
+ //goto pass1;
PJ_LOG(3, (THIS_FILE, "...compliance test (%s)", pj_ioqueue_name()));
if ((status=compliance_test()) != 0) {
@@ -836,6 +836,8 @@ int udp_ioqueue_test()
return status;
}
+ //return 0;
+
PJ_LOG(4, (THIS_FILE, "...benchmarking different buffer size:"));
PJ_LOG(4, (THIS_FILE, "... note: buf=bytes sent, fds=# of fds, "
"elapsed=in timer ticks"));
@@ -847,7 +849,7 @@ pass1:
PJ_LOG(3, (THIS_FILE, "... (bytes) (nanosec)"));
PJ_LOG(3, (THIS_FILE, "...====================================="));
- goto pass2;
+ //goto pass2;
for (bufsize=BUF_MIN_SIZE; bufsize <= BUF_MAX_SIZE; bufsize *= 2) {
if ((status=bench_test(bufsize, SOCK_INACTIVE_MIN)) != 0)
@@ -859,7 +861,7 @@ pass2:
sock_count<=SOCK_INACTIVE_MAX+2;
sock_count *= 2)
{
- PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count));
+ //PJ_LOG(3,(THIS_FILE, "...testing with %d fds", sock_count));
if ((status=bench_test(bufsize, sock_count-2)) != 0)
return status;
}
diff --git a/pjlib/src/pjlib-test/test.h b/pjlib/src/pjlib-test/test.h
index b7fd6ded..d552ea40 100644
--- a/pjlib/src/pjlib-test/test.h
+++ b/pjlib/src/pjlib-test/test.h
@@ -24,7 +24,7 @@
#define GROUP_LIBC 0
#define GROUP_OS 0
#define GROUP_DATA_STRUCTURE 0
-#define GROUP_NETWORK 0
+#define GROUP_NETWORK 1
#define GROUP_FILE 0
#define INCLUDE_ERRNO_TEST GROUP_LIBC
@@ -45,7 +45,7 @@
#define INCLUDE_SOCK_TEST GROUP_NETWORK
#define INCLUDE_SOCK_PERF_TEST GROUP_NETWORK
#define INCLUDE_SELECT_TEST GROUP_NETWORK
-#define INCLUDE_UDP_IOQUEUE_TEST 1 //GROUP_NETWORK
+#define INCLUDE_UDP_IOQUEUE_TEST GROUP_NETWORK
#define INCLUDE_TCP_IOQUEUE_TEST GROUP_NETWORK
#define INCLUDE_IOQUEUE_PERF_TEST GROUP_NETWORK
#define INCLUDE_IOQUEUE_UNREG_TEST GROUP_NETWORK