From 4539be1bf3b88417e3b52fdbeb17d6a74266cb4a Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Sun, 6 Nov 2005 19:46:48 +0000 Subject: Tested UDP echo server on Win2K git-svn-id: http://svn.pjsip.org/repos/pjproject/main@17 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/src/pjlib-test/echo_srv.c | 319 ---------------------------- pjlib/src/pjlib-test/list.c | 2 +- pjlib/src/pjlib-test/test.c | 4 +- pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c | 34 ++- 4 files changed, 28 insertions(+), 331 deletions(-) delete mode 100644 pjlib/src/pjlib-test/echo_srv.c (limited to 'pjlib/src/pjlib-test') diff --git a/pjlib/src/pjlib-test/echo_srv.c b/pjlib/src/pjlib-test/echo_srv.c deleted file mode 100644 index e10b5ffe..00000000 --- a/pjlib/src/pjlib-test/echo_srv.c +++ /dev/null @@ -1,319 +0,0 @@ -/* $Id$ - */ -#include "test.h" -#include -#include - -#if INCLUDE_ECHO_SERVER - -static pj_bool_t thread_quit_flag; - -struct server -{ - pj_pool_t *pool; - int sock_type; - int thread_count; - pj_ioqueue_t *ioqueue; - pj_sock_t sock; - pj_sock_t client_sock; - pj_ioqueue_key_t *key; - pj_ioqueue_callback cb; - char *buf; - pj_size_t bufsize; - pj_sockaddr_in addr; - int addrlen; - pj_size_t bytes_recv; - pj_timestamp start_time; -}; - -static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) -{ - struct server *server = pj_ioqueue_get_user_data(key); - pj_status_t rc; - - if (server->sock_type == PJ_SOCK_DGRAM) { - if (bytes_read > 0) { - /* Send data back to sender. */ - rc = pj_ioqueue_sendto( server->ioqueue, server->key, - server->buf, bytes_read, 0, - &server->addr, server->addrlen); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...sendto() error", rc); - } - } else { - PJ_LOG(3,("", "...read error (bytes_read=%d)", bytes_read)); - } - - /* Start next receive. */ - rc = pj_ioqueue_recvfrom( server->ioqueue, server->key, - server->buf, server->bufsize, 0, - &server->addr, &server->addrlen); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...recvfrom() error", rc); - } - - } - else if (server->sock_type == PJ_SOCK_STREAM) { - if (bytes_read > 0) { - /* Send data back to sender. */ - rc = pj_ioqueue_send( server->ioqueue, server->key, - server->buf, bytes_read, 0); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...send() error", rc); - bytes_read = 0; - } - } - - if (bytes_read <= 0) { - PJ_LOG(3,("", "...tcp closed")); - pj_ioqueue_unregister( server->ioqueue, server->key ); - pj_sock_close( server->sock ); - pj_pool_release( server->pool ); - return; - } - - /* Start next receive. */ - rc = pj_ioqueue_recv( server->ioqueue, server->key, - server->buf, server->bufsize, 0); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...recv() error", rc); - } - } - - /* Add counter. */ - if (bytes_read > 0) { - if (server->bytes_recv == 0) { - pj_get_timestamp(&server->start_time); - server->bytes_recv += bytes_read; - } else { - enum { USECS_IN_SECOND = 1000000 }; - pj_timestamp now; - pj_uint32_t usec_elapsed; - - server->bytes_recv += bytes_read; - - pj_get_timestamp(&now); - usec_elapsed = pj_elapsed_usec(&server->start_time, &now); - if (usec_elapsed > USECS_IN_SECOND) { - if (usec_elapsed < 2 * USECS_IN_SECOND) { - pj_highprec_t bw; - pj_uint32_t bw32; - const char *type_name; - - /* bandwidth(bw) = server->bytes_recv * USECS/elapsed */ - bw = server->bytes_recv; - pj_highprec_mul(bw, USECS_IN_SECOND); - pj_highprec_div(bw, usec_elapsed); - - bw32 = (pj_uint32_t) bw; - - if (server->sock_type==PJ_SOCK_STREAM) - type_name = "tcp"; - else if (server->sock_type==PJ_SOCK_DGRAM) - type_name = "udp"; - else - type_name = "???"; - - PJ_LOG(3,("", - "...[%s:%d (%d threads)] Current bandwidth=%u KBps", - type_name, - ECHO_SERVER_START_PORT+server->thread_count, - server->thread_count, - bw32/1024)); - } - server->start_time = now; - server->bytes_recv = 0; - } - } - } -} - -static void on_accept_complete( pj_ioqueue_key_t *key, pj_sock_t sock, - int status) -{ - struct server *server_server = pj_ioqueue_get_user_data(key); - pj_status_t rc; - - PJ_UNUSED_ARG(sock); - - if (status == 0) { - pj_pool_t *pool; - struct server *new_server; - - pool = pj_pool_create(mem, NULL, 4000, 4000, NULL); - new_server = pj_pool_zalloc(pool, sizeof(struct server)); - - new_server->pool = pool; - new_server->ioqueue = server_server->ioqueue; - new_server->sock_type = server_server->sock_type; - new_server->thread_count = server_server->thread_count; - new_server->sock = server_server->client_sock; - new_server->bufsize = 4096; - new_server->buf = pj_pool_alloc(pool, new_server->bufsize); - new_server->cb = server_server->cb; - - rc = pj_ioqueue_register_sock( new_server->pool, new_server->ioqueue, - new_server->sock, new_server, - &server_server->cb, &new_server->key); - if (rc != PJ_SUCCESS) { - app_perror("...registering new tcp sock", rc); - pj_sock_close(new_server->sock); - pj_pool_release(pool); - thread_quit_flag = 1; - return; - } - - rc = pj_ioqueue_recv( new_server->ioqueue, new_server->key, - new_server->buf, new_server->bufsize, 0); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...recv() error", rc); - pj_sock_close(new_server->sock); - pj_pool_release(pool); - thread_quit_flag = 1; - return; - } - } - - rc = pj_ioqueue_accept( server_server->ioqueue, server_server->key, - &server_server->client_sock, - NULL, NULL, NULL); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...accept() error", rc); - thread_quit_flag = 1; - } -} - -static int thread_proc(void *arg) -{ - pj_ioqueue_t *ioqueue = arg; - - while (!thread_quit_flag) { - pj_time_val timeout; - int count; - - timeout.sec = 0; timeout.msec = 50; - count = pj_ioqueue_poll( ioqueue, &timeout ); - if (count > 0) { - count = 0; - } - } - - return 0; -} - -static int start_echo_server( int sock_type, int port, int thread_count ) -{ - pj_pool_t *pool; - struct server *server; - int i; - pj_status_t rc; - - - pool = pj_pool_create(mem, NULL, 4000, 4000, NULL); - if (!pool) - return -10; - - server = pj_pool_zalloc(pool, sizeof(struct server)); - - server->sock_type = sock_type; - server->thread_count = thread_count; - server->cb.on_read_complete = &on_read_complete; - server->cb.on_accept_complete = &on_accept_complete; - - /* create ioqueue */ - rc = pj_ioqueue_create( pool, 32, thread_count, &server->ioqueue); - if (rc != PJ_SUCCESS) { - app_perror("...error creating ioqueue", rc); - return -20; - } - - /* create and register socket to ioqueue. */ - rc = app_socket(PJ_AF_INET, sock_type, 0, port, &server->sock); - if (rc != PJ_SUCCESS) { - app_perror("...error initializing socket", rc); - return -30; - } - - rc = pj_ioqueue_register_sock( pool, server->ioqueue, - server->sock, - server, &server->cb, - &server->key); - if (rc != PJ_SUCCESS) { - app_perror("...error registering socket to ioqueue", rc); - return -40; - } - - /* create receive buffer. */ - server->bufsize = 4096; - server->buf = pj_pool_alloc(pool, server->bufsize); - - if (sock_type == PJ_SOCK_DGRAM) { - server->addrlen = sizeof(server->addr); - rc = pj_ioqueue_recvfrom( server->ioqueue, server->key, - server->buf, server->bufsize, - 0, - &server->addr, &server->addrlen); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...read error", rc); - return -50; - } - } else { - rc = pj_ioqueue_accept( server->ioqueue, server->key, - &server->client_sock, NULL, NULL, NULL ); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...accept() error", rc); - return -60; - } - } - - /* create threads. */ - - for (i=0; iioqueue, - PJ_THREAD_DEFAULT_STACK_SIZE, 0, &thread); - if (rc != PJ_SUCCESS) { - app_perror("...unable to create thread", rc); - return -70; - } - } - - /* Done. */ - return PJ_SUCCESS; -} - -int echo_server(void) -{ - enum { MAX_THREADS = 4 }; - int sock_types[2]; - int i, j, rc; - - sock_types[0] = PJ_SOCK_DGRAM; - sock_types[1] = PJ_SOCK_STREAM; - - for (i=0; i<2; ++i) { - for (j=0; jis_pending = 0; if (bytes_received < 0) { - PJ_LOG(3,("","...error receiving data, received=%d", - bytes_received)); + if (-bytes_received != recv_rec->last_err) { + recv_rec->last_err = -bytes_received; + app_perror("...error receiving data", -bytes_received); + } } else if (bytes_received == 0) { /* note: previous error, or write callback */ } else { @@ -38,20 +42,24 @@ static void on_read_complete(pj_ioqueue_key_t *key, if (!send_rec->is_pending) { pj_ssize_t sent = bytes_received; pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received); - rc = pj_ioqueue_send(key, &send_rec->op_key_, - send_rec->buffer, &sent, 0); + pj_memcpy(&send_rec->addr, &recv_rec->addr, recv_rec->addrlen); + send_rec->addrlen = recv_rec->addrlen; + rc = pj_ioqueue_sendto(key, &send_rec->op_key_, + send_rec->buffer, &sent, 0, + &send_rec->addr, send_rec->addrlen); send_rec->is_pending = (rc==PJ_EPENDING); if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) { - app_perror("...send error", rc); + app_perror("...send error(1)", rc); } } } if (!send_rec->is_pending) { bytes_received = recv_rec->size; - rc = pj_ioqueue_recv(key, &recv_rec->op_key_, - recv_rec->buffer, &bytes_received, 0); + rc = pj_ioqueue_recvfrom(key, &recv_rec->op_key_, + recv_rec->buffer, &bytes_received, 0, + &recv_rec->addr, &recv_rec->addrlen); recv_rec->is_pending = (rc==PJ_EPENDING); if (rc == PJ_SUCCESS) { /* fall through next loop. */ @@ -80,8 +88,11 @@ static void on_write_complete(pj_ioqueue_key_t *key, struct op_key *send_rec = (struct op_key*)op_key; if (bytes_sent <= 0) { - pj_status_t rc = pj_get_netos_error(); - app_perror("...send error", rc); + pj_status_t rc = -bytes_sent; + if (rc != send_rec->last_err) { + send_rec->last_err = rc; + app_perror("...send error(2)", rc); + } } send_rec->is_pending = 0; @@ -101,6 +112,8 @@ static int worker_thread(void *arg) read_op.last_err = 0; read_op.buffer = recv_buf; read_op.size = sizeof(recv_buf); + read_op.addrlen = sizeof(read_op.addr); + write_op.peer = &read_op; write_op.is_pending = 0; write_op.last_err = 0; @@ -108,7 +121,8 @@ static int worker_thread(void *arg) write_op.size = sizeof(send_buf); length = sizeof(recv_buf); - rc = pj_ioqueue_recv(key, &read_op.op_key_, recv_buf, &length, 0); + rc = pj_ioqueue_recvfrom(key, &read_op.op_key_, recv_buf, &length, 0, + &read_op.addr, &read_op.addrlen); if (rc == PJ_SUCCESS) { read_op.is_pending = 1; on_read_complete(key, &read_op.op_key_, length); -- cgit v1.2.3