/* $Header: /pjproject-0.3/pjlib/src/pjlib-test/echo_srv.c 3 10/29/05 10:23p Bennylp $ */ /* * $Log: /pjproject-0.3/pjlib/src/pjlib-test/echo_srv.c $ * * 3 10/29/05 10:23p Bennylp * Changed ioqueue accept specification. * * 2 10/29/05 11:51a Bennylp * Version 0.3-pre2. * * 1 10/24/05 11:28a Bennylp * Created. * */ #include "test.h" #include #include #if INCLUDE_ECHO_SERVER static pj_bool_t thread_quit_flag; struct server { pj_pool_t *pool; int sock_type; int thread_count; pj_ioqueue_t *ioqueue; pj_sock_t sock; pj_sock_t client_sock; pj_ioqueue_key_t *key; pj_ioqueue_callback cb; char *buf; pj_size_t bufsize; pj_sockaddr_in addr; int addrlen; pj_size_t bytes_recv; pj_timestamp start_time; }; static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) { struct server *server = pj_ioqueue_get_user_data(key); pj_status_t rc; if (server->sock_type == PJ_SOCK_DGRAM) { if (bytes_read > 0) { /* Send data back to sender. */ rc = pj_ioqueue_sendto( server->ioqueue, server->key, server->buf, bytes_read, 0, &server->addr, server->addrlen); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...sendto() error", rc); } } else { PJ_LOG(3,("", "...read error (bytes_read=%d)", bytes_read)); } /* Start next receive. */ rc = pj_ioqueue_recvfrom( server->ioqueue, server->key, server->buf, server->bufsize, 0, &server->addr, &server->addrlen); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...recvfrom() error", rc); } } else if (server->sock_type == PJ_SOCK_STREAM) { if (bytes_read > 0) { /* Send data back to sender. */ rc = pj_ioqueue_send( server->ioqueue, server->key, server->buf, bytes_read, 0); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...send() error", rc); bytes_read = 0; } } if (bytes_read <= 0) { PJ_LOG(3,("", "...tcp closed")); pj_ioqueue_unregister( server->ioqueue, server->key ); pj_sock_close( server->sock ); pj_pool_release( server->pool ); return; } /* Start next receive. */ rc = pj_ioqueue_recv( server->ioqueue, server->key, server->buf, server->bufsize, 0); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...recv() error", rc); } } /* Add counter. */ if (bytes_read > 0) { if (server->bytes_recv == 0) { pj_get_timestamp(&server->start_time); server->bytes_recv += bytes_read; } else { enum { USECS_IN_SECOND = 1000000 }; pj_timestamp now; pj_uint32_t usec_elapsed; server->bytes_recv += bytes_read; pj_get_timestamp(&now); usec_elapsed = pj_elapsed_usec(&server->start_time, &now); if (usec_elapsed > USECS_IN_SECOND) { if (usec_elapsed < 2 * USECS_IN_SECOND) { pj_highprec_t bw; pj_uint32_t bw32; const char *type_name; /* bandwidth(bw) = server->bytes_recv * USECS/elapsed */ bw = server->bytes_recv; pj_highprec_mul(bw, USECS_IN_SECOND); pj_highprec_div(bw, usec_elapsed); bw32 = (pj_uint32_t) bw; if (server->sock_type==PJ_SOCK_STREAM) type_name = "tcp"; else if (server->sock_type==PJ_SOCK_DGRAM) type_name = "udp"; else type_name = "???"; PJ_LOG(3,("", "...[%s:%d (%d threads)] Current bandwidth=%u KBps", type_name, ECHO_SERVER_START_PORT+server->thread_count, server->thread_count, bw32/1024)); } server->start_time = now; server->bytes_recv = 0; } } } } static void on_accept_complete( pj_ioqueue_key_t *key, pj_sock_t sock, int status) { struct server *server_server = pj_ioqueue_get_user_data(key); pj_status_t rc; PJ_UNUSED_ARG(sock); if (status == 0) { pj_pool_t *pool; struct server *new_server; pool = pj_pool_create(mem, NULL, 4000, 4000, NULL); new_server = pj_pool_zalloc(pool, sizeof(struct server)); new_server->pool = pool; new_server->ioqueue = server_server->ioqueue; new_server->sock_type = server_server->sock_type; new_server->thread_count = server_server->thread_count; new_server->sock = server_server->client_sock; new_server->bufsize = 4096; new_server->buf = pj_pool_alloc(pool, new_server->bufsize); new_server->cb = server_server->cb; rc = pj_ioqueue_register_sock( new_server->pool, new_server->ioqueue, new_server->sock, new_server, &server_server->cb, &new_server->key); if (rc != PJ_SUCCESS) { app_perror("...registering new tcp sock", rc); pj_sock_close(new_server->sock); pj_pool_release(pool); thread_quit_flag = 1; return; } rc = pj_ioqueue_recv( new_server->ioqueue, new_server->key, new_server->buf, new_server->bufsize, 0); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...recv() error", rc); pj_sock_close(new_server->sock); pj_pool_release(pool); thread_quit_flag = 1; return; } } rc = pj_ioqueue_accept( server_server->ioqueue, server_server->key, &server_server->client_sock, NULL, NULL, NULL); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...accept() error", rc); thread_quit_flag = 1; } } static int thread_proc(void *arg) { pj_ioqueue_t *ioqueue = arg; while (!thread_quit_flag) { pj_time_val timeout; int count; timeout.sec = 0; timeout.msec = 50; count = pj_ioqueue_poll( ioqueue, &timeout ); if (count > 0) { count = 0; } } return 0; } static int start_echo_server( int sock_type, int port, int thread_count ) { pj_pool_t *pool; struct server *server; int i; pj_status_t rc; pool = pj_pool_create(mem, NULL, 4000, 4000, NULL); if (!pool) return -10; server = pj_pool_zalloc(pool, sizeof(struct server)); server->sock_type = sock_type; server->thread_count = thread_count; server->cb.on_read_complete = &on_read_complete; server->cb.on_accept_complete = &on_accept_complete; /* create ioqueue */ rc = pj_ioqueue_create( pool, 32, thread_count, &server->ioqueue); if (rc != PJ_SUCCESS) { app_perror("...error creating ioqueue", rc); return -20; } /* create and register socket to ioqueue. */ rc = app_socket(PJ_AF_INET, sock_type, 0, port, &server->sock); if (rc != PJ_SUCCESS) { app_perror("...error initializing socket", rc); return -30; } rc = pj_ioqueue_register_sock( pool, server->ioqueue, server->sock, server, &server->cb, &server->key); if (rc != PJ_SUCCESS) { app_perror("...error registering socket to ioqueue", rc); return -40; } /* create receive buffer. */ server->bufsize = 4096; server->buf = pj_pool_alloc(pool, server->bufsize); if (sock_type == PJ_SOCK_DGRAM) { server->addrlen = sizeof(server->addr); rc = pj_ioqueue_recvfrom( server->ioqueue, server->key, server->buf, server->bufsize, 0, &server->addr, &server->addrlen); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...read error", rc); return -50; } } else { rc = pj_ioqueue_accept( server->ioqueue, server->key, &server->client_sock, NULL, NULL, NULL ); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...accept() error", rc); return -60; } } /* create threads. */ for (i=0; iioqueue, PJ_THREAD_DEFAULT_STACK_SIZE, 0, &thread); if (rc != PJ_SUCCESS) { app_perror("...unable to create thread", rc); return -70; } } /* Done. */ return PJ_SUCCESS; } int echo_server(void) { enum { MAX_THREADS = 4 }; int sock_types[2]; int i, j, rc; sock_types[0] = PJ_SOCK_DGRAM; sock_types[1] = PJ_SOCK_STREAM; for (i=0; i<2; ++i) { for (j=0; j