diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-06 19:46:48 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-06 19:46:48 +0000 |
commit | 4539be1bf3b88417e3b52fdbeb17d6a74266cb4a (patch) | |
tree | 5db399f964cb95937b02f66ee797d4ca7d14c334 | |
parent | a5f8e7c270ac8c919dafe6c2fd0663ee6ce47fc4 (diff) |
Tested UDP echo server on Win2K
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@17 74dad513-b988-da41-8d7b-12977e46ad98
-rw-r--r-- | pjlib/build/pjlib.dsp | 2 | ||||
-rw-r--r-- | pjlib/build/pjlib_test.dsp | 4 | ||||
-rw-r--r-- | pjlib/src/pj/ioqueue_common_abs.c | 17 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/echo_srv.c | 319 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/list.c | 2 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/test.c | 4 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c | 34 |
7 files changed, 41 insertions, 341 deletions
diff --git a/pjlib/build/pjlib.dsp b/pjlib/build/pjlib.dsp index 07550f90..8d84a5c1 100644 --- a/pjlib/build/pjlib.dsp +++ b/pjlib/build/pjlib.dsp @@ -216,11 +216,11 @@ SOURCE=..\src\pj\ioqueue_common_abs.h # Begin Source File
SOURCE=..\src\pj\ioqueue_select.c
+# PROP Exclude_From_Build 1
# End Source File
# Begin Source File
SOURCE=..\src\pj\ioqueue_winnt.c
-# PROP Exclude_From_Build 1
# End Source File
# Begin Source File
diff --git a/pjlib/build/pjlib_test.dsp b/pjlib/build/pjlib_test.dsp index d17af417..9aff9958 100644 --- a/pjlib/build/pjlib_test.dsp +++ b/pjlib/build/pjlib_test.dsp @@ -95,10 +95,6 @@ SOURCE="..\src\pjlib-test\echo_clt.c" # End Source File
# Begin Source File
-SOURCE="..\src\pjlib-test\echo_srv.c"
-# End Source File
-# Begin Source File
-
SOURCE="..\src\pjlib-test\errno.c"
# End Source File
# Begin Source File
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index 774d53e3..d5f8d657 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -287,9 +287,11 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h) /* Done. */ } else { - pj_assert(!"Descriptor is signaled but key " - "has no pending operation!"); - + /* + * This is normal; execution may fall here when multiple threads
+ * are signalled for the same event, but only one thread eventually
+ * able to process the event.
+ */ pj_mutex_unlock(h->mutex); } } @@ -416,6 +418,11 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) } } else { + /*
+ * This is normal; execution may fall here when multiple threads
+ * are signalled for the same event, but only one thread eventually
+ * able to process the event.
+ */
pj_mutex_unlock(h->mutex); } } @@ -616,7 +623,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, */ write_op = (struct write_operation*)op_key; write_op->op = PJ_IOQUEUE_OP_SEND; - write_op->buf = NULL; + write_op->buf = (void*)data; write_op->size = *length; write_op->written = 0; write_op->flags = flags; @@ -694,7 +701,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, */ write_op = (struct write_operation*)op_key; write_op->op = PJ_IOQUEUE_OP_SEND_TO; - write_op->buf = NULL; + write_op->buf = (void*)data; write_op->size = *length; write_op->written = 0; write_op->flags = flags; diff --git a/pjlib/src/pjlib-test/echo_srv.c b/pjlib/src/pjlib-test/echo_srv.c deleted file mode 100644 index e10b5ffe..00000000 --- a/pjlib/src/pjlib-test/echo_srv.c +++ /dev/null @@ -1,319 +0,0 @@ -/* $Id$ - */ -#include "test.h" -#include <pjlib.h> -#include <pj/compat/high_precision.h> - -#if INCLUDE_ECHO_SERVER - -static pj_bool_t thread_quit_flag; - -struct server -{ - pj_pool_t *pool; - int sock_type; - int thread_count; - pj_ioqueue_t *ioqueue; - pj_sock_t sock; - pj_sock_t client_sock; - pj_ioqueue_key_t *key; - pj_ioqueue_callback cb; - char *buf; - pj_size_t bufsize; - pj_sockaddr_in addr; - int addrlen; - pj_size_t bytes_recv; - pj_timestamp start_time; -}; - -static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) -{ - struct server *server = pj_ioqueue_get_user_data(key); - pj_status_t rc; - - if (server->sock_type == PJ_SOCK_DGRAM) { - if (bytes_read > 0) { - /* Send data back to sender. */ - rc = pj_ioqueue_sendto( server->ioqueue, server->key, - server->buf, bytes_read, 0, - &server->addr, server->addrlen); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...sendto() error", rc); - } - } else { - PJ_LOG(3,("", "...read error (bytes_read=%d)", bytes_read)); - } - - /* Start next receive. */ - rc = pj_ioqueue_recvfrom( server->ioqueue, server->key, - server->buf, server->bufsize, 0, - &server->addr, &server->addrlen); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...recvfrom() error", rc); - } - - } - else if (server->sock_type == PJ_SOCK_STREAM) { - if (bytes_read > 0) { - /* Send data back to sender. */ - rc = pj_ioqueue_send( server->ioqueue, server->key, - server->buf, bytes_read, 0); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...send() error", rc); - bytes_read = 0; - } - } - - if (bytes_read <= 0) { - PJ_LOG(3,("", "...tcp closed")); - pj_ioqueue_unregister( server->ioqueue, server->key ); - pj_sock_close( server->sock ); - pj_pool_release( server->pool ); - return; - } - - /* Start next receive. */ - rc = pj_ioqueue_recv( server->ioqueue, server->key, - server->buf, server->bufsize, 0); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...recv() error", rc); - } - } - - /* Add counter. */ - if (bytes_read > 0) { - if (server->bytes_recv == 0) { - pj_get_timestamp(&server->start_time); - server->bytes_recv += bytes_read; - } else { - enum { USECS_IN_SECOND = 1000000 }; - pj_timestamp now; - pj_uint32_t usec_elapsed; - - server->bytes_recv += bytes_read; - - pj_get_timestamp(&now); - usec_elapsed = pj_elapsed_usec(&server->start_time, &now); - if (usec_elapsed > USECS_IN_SECOND) { - if (usec_elapsed < 2 * USECS_IN_SECOND) { - pj_highprec_t bw; - pj_uint32_t bw32; - const char *type_name; - - /* bandwidth(bw) = server->bytes_recv * USECS/elapsed */ - bw = server->bytes_recv; - pj_highprec_mul(bw, USECS_IN_SECOND); - pj_highprec_div(bw, usec_elapsed); - - bw32 = (pj_uint32_t) bw; - - if (server->sock_type==PJ_SOCK_STREAM) - type_name = "tcp"; - else if (server->sock_type==PJ_SOCK_DGRAM) - type_name = "udp"; - else - type_name = "???"; - - PJ_LOG(3,("", - "...[%s:%d (%d threads)] Current bandwidth=%u KBps", - type_name, - ECHO_SERVER_START_PORT+server->thread_count, - server->thread_count, - bw32/1024)); - } - server->start_time = now; - server->bytes_recv = 0; - } - } - } -} - -static void on_accept_complete( pj_ioqueue_key_t *key, pj_sock_t sock, - int status) -{ - struct server *server_server = pj_ioqueue_get_user_data(key); - pj_status_t rc; - - PJ_UNUSED_ARG(sock); - - if (status == 0) { - pj_pool_t *pool; - struct server *new_server; - - pool = pj_pool_create(mem, NULL, 4000, 4000, NULL); - new_server = pj_pool_zalloc(pool, sizeof(struct server)); - - new_server->pool = pool; - new_server->ioqueue = server_server->ioqueue; - new_server->sock_type = server_server->sock_type; - new_server->thread_count = server_server->thread_count; - new_server->sock = server_server->client_sock; - new_server->bufsize = 4096; - new_server->buf = pj_pool_alloc(pool, new_server->bufsize); - new_server->cb = server_server->cb; - - rc = pj_ioqueue_register_sock( new_server->pool, new_server->ioqueue, - new_server->sock, new_server, - &server_server->cb, &new_server->key); - if (rc != PJ_SUCCESS) { - app_perror("...registering new tcp sock", rc); - pj_sock_close(new_server->sock); - pj_pool_release(pool); - thread_quit_flag = 1; - return; - } - - rc = pj_ioqueue_recv( new_server->ioqueue, new_server->key, - new_server->buf, new_server->bufsize, 0); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...recv() error", rc); - pj_sock_close(new_server->sock); - pj_pool_release(pool); - thread_quit_flag = 1; - return; - } - } - - rc = pj_ioqueue_accept( server_server->ioqueue, server_server->key, - &server_server->client_sock, - NULL, NULL, NULL); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...accept() error", rc); - thread_quit_flag = 1; - } -} - -static int thread_proc(void *arg) -{ - pj_ioqueue_t *ioqueue = arg; - - while (!thread_quit_flag) { - pj_time_val timeout; - int count; - - timeout.sec = 0; timeout.msec = 50; - count = pj_ioqueue_poll( ioqueue, &timeout ); - if (count > 0) { - count = 0; - } - } - - return 0; -} - -static int start_echo_server( int sock_type, int port, int thread_count ) -{ - pj_pool_t *pool; - struct server *server; - int i; - pj_status_t rc; - - - pool = pj_pool_create(mem, NULL, 4000, 4000, NULL); - if (!pool) - return -10; - - server = pj_pool_zalloc(pool, sizeof(struct server)); - - server->sock_type = sock_type; - server->thread_count = thread_count; - server->cb.on_read_complete = &on_read_complete; - server->cb.on_accept_complete = &on_accept_complete; - - /* create ioqueue */ - rc = pj_ioqueue_create( pool, 32, thread_count, &server->ioqueue); - if (rc != PJ_SUCCESS) { - app_perror("...error creating ioqueue", rc); - return -20; - } - - /* create and register socket to ioqueue. */ - rc = app_socket(PJ_AF_INET, sock_type, 0, port, &server->sock); - if (rc != PJ_SUCCESS) { - app_perror("...error initializing socket", rc); - return -30; - } - - rc = pj_ioqueue_register_sock( pool, server->ioqueue, - server->sock, - server, &server->cb, - &server->key); - if (rc != PJ_SUCCESS) { - app_perror("...error registering socket to ioqueue", rc); - return -40; - } - - /* create receive buffer. */ - server->bufsize = 4096; - server->buf = pj_pool_alloc(pool, server->bufsize); - - if (sock_type == PJ_SOCK_DGRAM) { - server->addrlen = sizeof(server->addr); - rc = pj_ioqueue_recvfrom( server->ioqueue, server->key, - server->buf, server->bufsize, - 0, - &server->addr, &server->addrlen); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...read error", rc); - return -50; - } - } else { - rc = pj_ioqueue_accept( server->ioqueue, server->key, - &server->client_sock, NULL, NULL, NULL ); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...accept() error", rc); - return -60; - } - } - - /* create threads. */ - - for (i=0; i<thread_count; ++i) { - pj_thread_t *thread; - rc = pj_thread_create(pool, NULL, &thread_proc, server->ioqueue, - PJ_THREAD_DEFAULT_STACK_SIZE, 0, &thread); - if (rc != PJ_SUCCESS) { - app_perror("...unable to create thread", rc); - return -70; - } - } - - /* Done. */ - return PJ_SUCCESS; -} - -int echo_server(void) -{ - enum { MAX_THREADS = 4 }; - int sock_types[2]; - int i, j, rc; - - sock_types[0] = PJ_SOCK_DGRAM; - sock_types[1] = PJ_SOCK_STREAM; - - for (i=0; i<2; ++i) { - for (j=0; j<MAX_THREADS; ++j) { - rc = start_echo_server(sock_types[i], ECHO_SERVER_START_PORT+j, j+1); - if (rc != 0) - return rc; - } - } - - pj_thread_sleep(100); - PJ_LOG(3,("", "Echo server started in port %d - %d", - ECHO_SERVER_START_PORT, ECHO_SERVER_START_PORT + MAX_THREADS)); - - PJ_LOG(3,("", "Press Ctrl-C to quit")); - - for (;!thread_quit_flag;) { - pj_thread_sleep(1000); - } - - return 0; -} - - -#else -int dummy_echo_server; -#endif /* INCLUDE_ECHO_SERVER */ - diff --git a/pjlib/src/pjlib-test/list.c b/pjlib/src/pjlib-test/list.c index 817525ed..d9f097c5 100644 --- a/pjlib/src/pjlib-test/list.c +++ b/pjlib/src/pjlib-test/list.c @@ -33,7 +33,7 @@ typedef struct list_node { - PJ_DECL_LIST_MEMBER(struct list_node) + PJ_DECL_LIST_MEMBER(struct list_node); int value; } list_node; diff --git a/pjlib/src/pjlib-test/test.c b/pjlib/src/pjlib-test/test.c index e9a62e33..8ccdc568 100644 --- a/pjlib/src/pjlib-test/test.c +++ b/pjlib/src/pjlib-test/test.c @@ -132,7 +132,9 @@ int test_inner(void) #if INCLUDE_ECHO_SERVER //echo_server(); - echo_srv_sync(); + //echo_srv_sync();
+ udp_echo_srv_ioqueue();
+ #elif INCLUDE_ECHO_CLIENT if (param_echo_sock_type == 0) param_echo_sock_type = PJ_SOCK_DGRAM; diff --git a/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c b/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c index 5fe6d6f2..981ff30e 100644 --- a/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c +++ b/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c @@ -14,6 +14,8 @@ struct op_key pj_size_t size;
int is_pending;
pj_status_t last_err;
+ pj_sockaddr_in addr;
+ int addrlen;
};
static void on_read_complete(pj_ioqueue_key_t *key,
@@ -28,8 +30,10 @@ static void on_read_complete(pj_ioqueue_key_t *key, recv_rec->is_pending = 0;
if (bytes_received < 0) {
- PJ_LOG(3,("","...error receiving data, received=%d",
- bytes_received));
+ if (-bytes_received != recv_rec->last_err) {
+ recv_rec->last_err = -bytes_received;
+ app_perror("...error receiving data", -bytes_received);
+ }
} else if (bytes_received == 0) {
/* note: previous error, or write callback */
} else {
@@ -38,20 +42,24 @@ static void on_read_complete(pj_ioqueue_key_t *key, if (!send_rec->is_pending) {
pj_ssize_t sent = bytes_received;
pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received);
- rc = pj_ioqueue_send(key, &send_rec->op_key_,
- send_rec->buffer, &sent, 0);
+ pj_memcpy(&send_rec->addr, &recv_rec->addr, recv_rec->addrlen);
+ send_rec->addrlen = recv_rec->addrlen;
+ rc = pj_ioqueue_sendto(key, &send_rec->op_key_,
+ send_rec->buffer, &sent, 0,
+ &send_rec->addr, send_rec->addrlen);
send_rec->is_pending = (rc==PJ_EPENDING);
if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) {
- app_perror("...send error", rc);
+ app_perror("...send error(1)", rc);
}
}
}
if (!send_rec->is_pending) {
bytes_received = recv_rec->size;
- rc = pj_ioqueue_recv(key, &recv_rec->op_key_,
- recv_rec->buffer, &bytes_received, 0);
+ rc = pj_ioqueue_recvfrom(key, &recv_rec->op_key_,
+ recv_rec->buffer, &bytes_received, 0,
+ &recv_rec->addr, &recv_rec->addrlen);
recv_rec->is_pending = (rc==PJ_EPENDING);
if (rc == PJ_SUCCESS) {
/* fall through next loop. */
@@ -80,8 +88,11 @@ static void on_write_complete(pj_ioqueue_key_t *key, struct op_key *send_rec = (struct op_key*)op_key;
if (bytes_sent <= 0) {
- pj_status_t rc = pj_get_netos_error();
- app_perror("...send error", rc);
+ pj_status_t rc = -bytes_sent;
+ if (rc != send_rec->last_err) {
+ send_rec->last_err = rc;
+ app_perror("...send error(2)", rc);
+ }
}
send_rec->is_pending = 0;
@@ -101,6 +112,8 @@ static int worker_thread(void *arg) read_op.last_err = 0;
read_op.buffer = recv_buf;
read_op.size = sizeof(recv_buf);
+ read_op.addrlen = sizeof(read_op.addr);
+
write_op.peer = &read_op;
write_op.is_pending = 0;
write_op.last_err = 0;
@@ -108,7 +121,8 @@ static int worker_thread(void *arg) write_op.size = sizeof(send_buf);
length = sizeof(recv_buf);
- rc = pj_ioqueue_recv(key, &read_op.op_key_, recv_buf, &length, 0);
+ rc = pj_ioqueue_recvfrom(key, &read_op.op_key_, recv_buf, &length, 0,
+ &read_op.addr, &read_op.addrlen);
if (rc == PJ_SUCCESS) {
read_op.is_pending = 1;
on_read_complete(key, &read_op.op_key_, length);
|