summaryrefslogtreecommitdiff
path: root/pjlib
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2005-11-06 19:46:48 +0000
committerBenny Prijono <bennylp@teluu.com>2005-11-06 19:46:48 +0000
commit4539be1bf3b88417e3b52fdbeb17d6a74266cb4a (patch)
tree5db399f964cb95937b02f66ee797d4ca7d14c334 /pjlib
parenta5f8e7c270ac8c919dafe6c2fd0663ee6ce47fc4 (diff)
Tested UDP echo server on Win2K
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@17 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib')
-rw-r--r--pjlib/build/pjlib.dsp2
-rw-r--r--pjlib/build/pjlib_test.dsp4
-rw-r--r--pjlib/src/pj/ioqueue_common_abs.c17
-rw-r--r--pjlib/src/pjlib-test/echo_srv.c319
-rw-r--r--pjlib/src/pjlib-test/list.c2
-rw-r--r--pjlib/src/pjlib-test/test.c4
-rw-r--r--pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c34
7 files changed, 41 insertions, 341 deletions
diff --git a/pjlib/build/pjlib.dsp b/pjlib/build/pjlib.dsp
index 07550f90..8d84a5c1 100644
--- a/pjlib/build/pjlib.dsp
+++ b/pjlib/build/pjlib.dsp
@@ -216,11 +216,11 @@ SOURCE=..\src\pj\ioqueue_common_abs.h
# Begin Source File
SOURCE=..\src\pj\ioqueue_select.c
+# PROP Exclude_From_Build 1
# End Source File
# Begin Source File
SOURCE=..\src\pj\ioqueue_winnt.c
-# PROP Exclude_From_Build 1
# End Source File
# Begin Source File
diff --git a/pjlib/build/pjlib_test.dsp b/pjlib/build/pjlib_test.dsp
index d17af417..9aff9958 100644
--- a/pjlib/build/pjlib_test.dsp
+++ b/pjlib/build/pjlib_test.dsp
@@ -95,10 +95,6 @@ SOURCE="..\src\pjlib-test\echo_clt.c"
# End Source File
# Begin Source File
-SOURCE="..\src\pjlib-test\echo_srv.c"
-# End Source File
-# Begin Source File
-
SOURCE="..\src\pjlib-test\errno.c"
# End Source File
# Begin Source File
diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c
index 774d53e3..d5f8d657 100644
--- a/pjlib/src/pj/ioqueue_common_abs.c
+++ b/pjlib/src/pj/ioqueue_common_abs.c
@@ -287,9 +287,11 @@ void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
/* Done. */
} else {
- pj_assert(!"Descriptor is signaled but key "
- "has no pending operation!");
-
+ /*
+ * This is normal; execution may fall here when multiple threads
+ * are signalled for the same event, but only one thread eventually
+ * able to process the event.
+ */
pj_mutex_unlock(h->mutex);
}
}
@@ -416,6 +418,11 @@ void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
}
} else {
+ /*
+ * This is normal; execution may fall here when multiple threads
+ * are signalled for the same event, but only one thread eventually
+ * able to process the event.
+ */
pj_mutex_unlock(h->mutex);
}
}
@@ -616,7 +623,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
*/
write_op = (struct write_operation*)op_key;
write_op->op = PJ_IOQUEUE_OP_SEND;
- write_op->buf = NULL;
+ write_op->buf = (void*)data;
write_op->size = *length;
write_op->written = 0;
write_op->flags = flags;
@@ -694,7 +701,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
*/
write_op = (struct write_operation*)op_key;
write_op->op = PJ_IOQUEUE_OP_SEND_TO;
- write_op->buf = NULL;
+ write_op->buf = (void*)data;
write_op->size = *length;
write_op->written = 0;
write_op->flags = flags;
diff --git a/pjlib/src/pjlib-test/echo_srv.c b/pjlib/src/pjlib-test/echo_srv.c
deleted file mode 100644
index e10b5ffe..00000000
--- a/pjlib/src/pjlib-test/echo_srv.c
+++ /dev/null
@@ -1,319 +0,0 @@
-/* $Id$
- */
-#include "test.h"
-#include <pjlib.h>
-#include <pj/compat/high_precision.h>
-
-#if INCLUDE_ECHO_SERVER
-
-static pj_bool_t thread_quit_flag;
-
-struct server
-{
- pj_pool_t *pool;
- int sock_type;
- int thread_count;
- pj_ioqueue_t *ioqueue;
- pj_sock_t sock;
- pj_sock_t client_sock;
- pj_ioqueue_key_t *key;
- pj_ioqueue_callback cb;
- char *buf;
- pj_size_t bufsize;
- pj_sockaddr_in addr;
- int addrlen;
- pj_size_t bytes_recv;
- pj_timestamp start_time;
-};
-
-static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
-{
- struct server *server = pj_ioqueue_get_user_data(key);
- pj_status_t rc;
-
- if (server->sock_type == PJ_SOCK_DGRAM) {
- if (bytes_read > 0) {
- /* Send data back to sender. */
- rc = pj_ioqueue_sendto( server->ioqueue, server->key,
- server->buf, bytes_read, 0,
- &server->addr, server->addrlen);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...sendto() error", rc);
- }
- } else {
- PJ_LOG(3,("", "...read error (bytes_read=%d)", bytes_read));
- }
-
- /* Start next receive. */
- rc = pj_ioqueue_recvfrom( server->ioqueue, server->key,
- server->buf, server->bufsize, 0,
- &server->addr, &server->addrlen);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...recvfrom() error", rc);
- }
-
- }
- else if (server->sock_type == PJ_SOCK_STREAM) {
- if (bytes_read > 0) {
- /* Send data back to sender. */
- rc = pj_ioqueue_send( server->ioqueue, server->key,
- server->buf, bytes_read, 0);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...send() error", rc);
- bytes_read = 0;
- }
- }
-
- if (bytes_read <= 0) {
- PJ_LOG(3,("", "...tcp closed"));
- pj_ioqueue_unregister( server->ioqueue, server->key );
- pj_sock_close( server->sock );
- pj_pool_release( server->pool );
- return;
- }
-
- /* Start next receive. */
- rc = pj_ioqueue_recv( server->ioqueue, server->key,
- server->buf, server->bufsize, 0);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...recv() error", rc);
- }
- }
-
- /* Add counter. */
- if (bytes_read > 0) {
- if (server->bytes_recv == 0) {
- pj_get_timestamp(&server->start_time);
- server->bytes_recv += bytes_read;
- } else {
- enum { USECS_IN_SECOND = 1000000 };
- pj_timestamp now;
- pj_uint32_t usec_elapsed;
-
- server->bytes_recv += bytes_read;
-
- pj_get_timestamp(&now);
- usec_elapsed = pj_elapsed_usec(&server->start_time, &now);
- if (usec_elapsed > USECS_IN_SECOND) {
- if (usec_elapsed < 2 * USECS_IN_SECOND) {
- pj_highprec_t bw;
- pj_uint32_t bw32;
- const char *type_name;
-
- /* bandwidth(bw) = server->bytes_recv * USECS/elapsed */
- bw = server->bytes_recv;
- pj_highprec_mul(bw, USECS_IN_SECOND);
- pj_highprec_div(bw, usec_elapsed);
-
- bw32 = (pj_uint32_t) bw;
-
- if (server->sock_type==PJ_SOCK_STREAM)
- type_name = "tcp";
- else if (server->sock_type==PJ_SOCK_DGRAM)
- type_name = "udp";
- else
- type_name = "???";
-
- PJ_LOG(3,("",
- "...[%s:%d (%d threads)] Current bandwidth=%u KBps",
- type_name,
- ECHO_SERVER_START_PORT+server->thread_count,
- server->thread_count,
- bw32/1024));
- }
- server->start_time = now;
- server->bytes_recv = 0;
- }
- }
- }
-}
-
-static void on_accept_complete( pj_ioqueue_key_t *key, pj_sock_t sock,
- int status)
-{
- struct server *server_server = pj_ioqueue_get_user_data(key);
- pj_status_t rc;
-
- PJ_UNUSED_ARG(sock);
-
- if (status == 0) {
- pj_pool_t *pool;
- struct server *new_server;
-
- pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
- new_server = pj_pool_zalloc(pool, sizeof(struct server));
-
- new_server->pool = pool;
- new_server->ioqueue = server_server->ioqueue;
- new_server->sock_type = server_server->sock_type;
- new_server->thread_count = server_server->thread_count;
- new_server->sock = server_server->client_sock;
- new_server->bufsize = 4096;
- new_server->buf = pj_pool_alloc(pool, new_server->bufsize);
- new_server->cb = server_server->cb;
-
- rc = pj_ioqueue_register_sock( new_server->pool, new_server->ioqueue,
- new_server->sock, new_server,
- &server_server->cb, &new_server->key);
- if (rc != PJ_SUCCESS) {
- app_perror("...registering new tcp sock", rc);
- pj_sock_close(new_server->sock);
- pj_pool_release(pool);
- thread_quit_flag = 1;
- return;
- }
-
- rc = pj_ioqueue_recv( new_server->ioqueue, new_server->key,
- new_server->buf, new_server->bufsize, 0);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...recv() error", rc);
- pj_sock_close(new_server->sock);
- pj_pool_release(pool);
- thread_quit_flag = 1;
- return;
- }
- }
-
- rc = pj_ioqueue_accept( server_server->ioqueue, server_server->key,
- &server_server->client_sock,
- NULL, NULL, NULL);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...accept() error", rc);
- thread_quit_flag = 1;
- }
-}
-
-static int thread_proc(void *arg)
-{
- pj_ioqueue_t *ioqueue = arg;
-
- while (!thread_quit_flag) {
- pj_time_val timeout;
- int count;
-
- timeout.sec = 0; timeout.msec = 50;
- count = pj_ioqueue_poll( ioqueue, &timeout );
- if (count > 0) {
- count = 0;
- }
- }
-
- return 0;
-}
-
-static int start_echo_server( int sock_type, int port, int thread_count )
-{
- pj_pool_t *pool;
- struct server *server;
- int i;
- pj_status_t rc;
-
-
- pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
- if (!pool)
- return -10;
-
- server = pj_pool_zalloc(pool, sizeof(struct server));
-
- server->sock_type = sock_type;
- server->thread_count = thread_count;
- server->cb.on_read_complete = &on_read_complete;
- server->cb.on_accept_complete = &on_accept_complete;
-
- /* create ioqueue */
- rc = pj_ioqueue_create( pool, 32, thread_count, &server->ioqueue);
- if (rc != PJ_SUCCESS) {
- app_perror("...error creating ioqueue", rc);
- return -20;
- }
-
- /* create and register socket to ioqueue. */
- rc = app_socket(PJ_AF_INET, sock_type, 0, port, &server->sock);
- if (rc != PJ_SUCCESS) {
- app_perror("...error initializing socket", rc);
- return -30;
- }
-
- rc = pj_ioqueue_register_sock( pool, server->ioqueue,
- server->sock,
- server, &server->cb,
- &server->key);
- if (rc != PJ_SUCCESS) {
- app_perror("...error registering socket to ioqueue", rc);
- return -40;
- }
-
- /* create receive buffer. */
- server->bufsize = 4096;
- server->buf = pj_pool_alloc(pool, server->bufsize);
-
- if (sock_type == PJ_SOCK_DGRAM) {
- server->addrlen = sizeof(server->addr);
- rc = pj_ioqueue_recvfrom( server->ioqueue, server->key,
- server->buf, server->bufsize,
- 0,
- &server->addr, &server->addrlen);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...read error", rc);
- return -50;
- }
- } else {
- rc = pj_ioqueue_accept( server->ioqueue, server->key,
- &server->client_sock, NULL, NULL, NULL );
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...accept() error", rc);
- return -60;
- }
- }
-
- /* create threads. */
-
- for (i=0; i<thread_count; ++i) {
- pj_thread_t *thread;
- rc = pj_thread_create(pool, NULL, &thread_proc, server->ioqueue,
- PJ_THREAD_DEFAULT_STACK_SIZE, 0, &thread);
- if (rc != PJ_SUCCESS) {
- app_perror("...unable to create thread", rc);
- return -70;
- }
- }
-
- /* Done. */
- return PJ_SUCCESS;
-}
-
-int echo_server(void)
-{
- enum { MAX_THREADS = 4 };
- int sock_types[2];
- int i, j, rc;
-
- sock_types[0] = PJ_SOCK_DGRAM;
- sock_types[1] = PJ_SOCK_STREAM;
-
- for (i=0; i<2; ++i) {
- for (j=0; j<MAX_THREADS; ++j) {
- rc = start_echo_server(sock_types[i], ECHO_SERVER_START_PORT+j, j+1);
- if (rc != 0)
- return rc;
- }
- }
-
- pj_thread_sleep(100);
- PJ_LOG(3,("", "Echo server started in port %d - %d",
- ECHO_SERVER_START_PORT, ECHO_SERVER_START_PORT + MAX_THREADS));
-
- PJ_LOG(3,("", "Press Ctrl-C to quit"));
-
- for (;!thread_quit_flag;) {
- pj_thread_sleep(1000);
- }
-
- return 0;
-}
-
-
-#else
-int dummy_echo_server;
-#endif /* INCLUDE_ECHO_SERVER */
-
diff --git a/pjlib/src/pjlib-test/list.c b/pjlib/src/pjlib-test/list.c
index 817525ed..d9f097c5 100644
--- a/pjlib/src/pjlib-test/list.c
+++ b/pjlib/src/pjlib-test/list.c
@@ -33,7 +33,7 @@
typedef struct list_node
{
- PJ_DECL_LIST_MEMBER(struct list_node)
+ PJ_DECL_LIST_MEMBER(struct list_node);
int value;
} list_node;
diff --git a/pjlib/src/pjlib-test/test.c b/pjlib/src/pjlib-test/test.c
index e9a62e33..8ccdc568 100644
--- a/pjlib/src/pjlib-test/test.c
+++ b/pjlib/src/pjlib-test/test.c
@@ -132,7 +132,9 @@ int test_inner(void)
#if INCLUDE_ECHO_SERVER
//echo_server();
- echo_srv_sync();
+ //echo_srv_sync();
+ udp_echo_srv_ioqueue();
+
#elif INCLUDE_ECHO_CLIENT
if (param_echo_sock_type == 0)
param_echo_sock_type = PJ_SOCK_DGRAM;
diff --git a/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c b/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c
index 5fe6d6f2..981ff30e 100644
--- a/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c
+++ b/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c
@@ -14,6 +14,8 @@ struct op_key
pj_size_t size;
int is_pending;
pj_status_t last_err;
+ pj_sockaddr_in addr;
+ int addrlen;
};
static void on_read_complete(pj_ioqueue_key_t *key,
@@ -28,8 +30,10 @@ static void on_read_complete(pj_ioqueue_key_t *key,
recv_rec->is_pending = 0;
if (bytes_received < 0) {
- PJ_LOG(3,("","...error receiving data, received=%d",
- bytes_received));
+ if (-bytes_received != recv_rec->last_err) {
+ recv_rec->last_err = -bytes_received;
+ app_perror("...error receiving data", -bytes_received);
+ }
} else if (bytes_received == 0) {
/* note: previous error, or write callback */
} else {
@@ -38,20 +42,24 @@ static void on_read_complete(pj_ioqueue_key_t *key,
if (!send_rec->is_pending) {
pj_ssize_t sent = bytes_received;
pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received);
- rc = pj_ioqueue_send(key, &send_rec->op_key_,
- send_rec->buffer, &sent, 0);
+ pj_memcpy(&send_rec->addr, &recv_rec->addr, recv_rec->addrlen);
+ send_rec->addrlen = recv_rec->addrlen;
+ rc = pj_ioqueue_sendto(key, &send_rec->op_key_,
+ send_rec->buffer, &sent, 0,
+ &send_rec->addr, send_rec->addrlen);
send_rec->is_pending = (rc==PJ_EPENDING);
if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) {
- app_perror("...send error", rc);
+ app_perror("...send error(1)", rc);
}
}
}
if (!send_rec->is_pending) {
bytes_received = recv_rec->size;
- rc = pj_ioqueue_recv(key, &recv_rec->op_key_,
- recv_rec->buffer, &bytes_received, 0);
+ rc = pj_ioqueue_recvfrom(key, &recv_rec->op_key_,
+ recv_rec->buffer, &bytes_received, 0,
+ &recv_rec->addr, &recv_rec->addrlen);
recv_rec->is_pending = (rc==PJ_EPENDING);
if (rc == PJ_SUCCESS) {
/* fall through next loop. */
@@ -80,8 +88,11 @@ static void on_write_complete(pj_ioqueue_key_t *key,
struct op_key *send_rec = (struct op_key*)op_key;
if (bytes_sent <= 0) {
- pj_status_t rc = pj_get_netos_error();
- app_perror("...send error", rc);
+ pj_status_t rc = -bytes_sent;
+ if (rc != send_rec->last_err) {
+ send_rec->last_err = rc;
+ app_perror("...send error(2)", rc);
+ }
}
send_rec->is_pending = 0;
@@ -101,6 +112,8 @@ static int worker_thread(void *arg)
read_op.last_err = 0;
read_op.buffer = recv_buf;
read_op.size = sizeof(recv_buf);
+ read_op.addrlen = sizeof(read_op.addr);
+
write_op.peer = &read_op;
write_op.is_pending = 0;
write_op.last_err = 0;
@@ -108,7 +121,8 @@ static int worker_thread(void *arg)
write_op.size = sizeof(send_buf);
length = sizeof(recv_buf);
- rc = pj_ioqueue_recv(key, &read_op.op_key_, recv_buf, &length, 0);
+ rc = pj_ioqueue_recvfrom(key, &read_op.op_key_, recv_buf, &length, 0,
+ &read_op.addr, &read_op.addrlen);
if (rc == PJ_SUCCESS) {
read_op.is_pending = 1;
on_read_complete(key, &read_op.op_key_, length);