From 5f1de1bbb341ea1dc1d27d9bf35764b643ef904a Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Mon, 21 Nov 2005 01:55:47 +0000 Subject: Set svn:eol-style property git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@65 74dad513-b988-da41-8d7b-12977e46ad98 --- pjlib/src/pjlib-test/ioq_perf.c | 1004 +++++++++++++++++++-------------------- 1 file changed, 502 insertions(+), 502 deletions(-) (limited to 'pjlib/src/pjlib-test/ioq_perf.c') diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c index b10decc3..3a8de8e1 100644 --- a/pjlib/src/pjlib-test/ioq_perf.c +++ b/pjlib/src/pjlib-test/ioq_perf.c @@ -1,502 +1,502 @@ -/* $Id$ */ -/* - * Copyright (C)2003-2006 Benny Prijono - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -#include "test.h" -#include -#include - -/** - * \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance - * - * Test the performance of the I/O queue, using typical producer - * consumer test. The test should examine the effect of using multiple - * threads on the performance. - * - * This file is pjlib-test/ioq_perf.c - * - * \include pjlib-test/ioq_perf.c - */ - -#if INCLUDE_IOQUEUE_PERF_TEST - -#ifdef _MSC_VER -# pragma warning ( disable: 4204) // non-constant aggregate initializer -#endif - -#define THIS_FILE "ioq_perf" -//#define TRACE_(expr) PJ_LOG(3,expr) -#define TRACE_(expr) - - -static pj_bool_t thread_quit_flag; -static pj_status_t last_error; -static unsigned last_error_counter; - -/* Descriptor for each producer/consumer pair. */ -typedef struct test_item -{ - pj_sock_t server_fd, - client_fd; - pj_ioqueue_t *ioqueue; - pj_ioqueue_key_t *server_key, - *client_key; - pj_ioqueue_op_key_t recv_op, - send_op; - int has_pending_send; - pj_size_t buffer_size; - char *outgoing_buffer; - char *incoming_buffer; - pj_size_t bytes_sent, - bytes_recv; -} test_item; - -/* Callback when data has been read. - * Increment item->bytes_recv and ready to read the next data. - */ -static void on_read_complete(pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_ssize_t bytes_read) -{ - test_item *item = pj_ioqueue_get_user_data(key); - pj_status_t rc; - int data_is_available = 1; - - //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read)); - - do { - if (thread_quit_flag) - return; - - if (bytes_read < 0) { - pj_status_t rc = -bytes_read; - char errmsg[128]; - - if (rc != last_error) { - //last_error = rc; - pj_strerror(rc, errmsg, sizeof(errmsg)); - PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", - bytes_read, errmsg)); - PJ_LOG(3,(THIS_FILE, - ".....additional info: total read=%u, total sent=%u", - item->bytes_recv, item->bytes_sent)); - } else { - last_error_counter++; - } - bytes_read = 0; - - } else if (bytes_read == 0) { - PJ_LOG(3,(THIS_FILE, "...socket has closed!")); - } - - item->bytes_recv += bytes_read; - - /* To assure that the test quits, even if main thread - * doesn't have time to run. - */ - if (item->bytes_recv > item->buffer_size * 10000) - thread_quit_flag = 1; - - bytes_read = item->buffer_size; - rc = pj_ioqueue_recv( key, op_key, - item->incoming_buffer, &bytes_read, 0 ); - - if (rc == PJ_SUCCESS) { - data_is_available = 1; - } else if (rc == PJ_EPENDING) { - data_is_available = 0; - } else { - data_is_available = 0; - if (rc != last_error) { - last_error = rc; - app_perror("...error: read error(1)", rc); - } else { - last_error_counter++; - } - } - - if (!item->has_pending_send) { - pj_ssize_t sent = item->buffer_size; - rc = pj_ioqueue_send(item->client_key, &item->send_op, - item->outgoing_buffer, &sent, 0); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...error: write error", rc); - } - - item->has_pending_send = (rc==PJ_EPENDING); - } - - } while (data_is_available); -} - -/* Callback when data has been written. - * Increment item->bytes_sent and write the next data. - */ -static void on_write_complete(pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_ssize_t bytes_sent) -{ - test_item *item = pj_ioqueue_get_user_data(key); - - //TRACE_((THIS_FILE, " write complete: sent = %d", bytes_sent)); - - if (thread_quit_flag) - return; - - item->has_pending_send = 0; - item->bytes_sent += bytes_sent; - - if (bytes_sent <= 0) { - PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d", - bytes_sent)); - } - else { - pj_status_t rc; - - bytes_sent = item->buffer_size; - rc = pj_ioqueue_send( item->client_key, op_key, - item->outgoing_buffer, &bytes_sent, 0); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - app_perror("...error: write error", rc); - } - - item->has_pending_send = (rc==PJ_EPENDING); - } -} - -/* The worker thread. */ -static int worker_thread(void *arg) -{ - pj_ioqueue_t *ioqueue = arg; - const pj_time_val timeout = {0, 100}; - int rc; - - while (!thread_quit_flag) { - rc = pj_ioqueue_poll(ioqueue, &timeout); - //TRACE_((THIS_FILE, " thread: poll returned rc=%d", rc)); - if (rc < 0) { - app_perror("...error in pj_ioqueue_poll()", pj_get_netos_error()); - return -1; - } - } - return 0; -} - -/* Calculate the bandwidth for the specific test configuration. - * The test is simple: - * - create sockpair_cnt number of producer-consumer socket pair. - * - create thread_cnt number of worker threads. - * - each producer will send buffer_size bytes data as fast and - * as soon as it can. - * - each consumer will read buffer_size bytes of data as fast - * as it could. - * - measure the total bytes received by all consumers during a - * period of time. - */ -static int perform_test(int sock_type, const char *type_name, - unsigned thread_cnt, unsigned sockpair_cnt, - pj_size_t buffer_size, - pj_size_t *p_bandwidth) -{ - enum { MSEC_DURATION = 5000 }; - pj_pool_t *pool; - test_item *items; - pj_thread_t **thread; - pj_ioqueue_t *ioqueue; - pj_status_t rc; - pj_ioqueue_callback ioqueue_callback; - pj_uint32_t total_elapsed_usec, total_received; - pj_highprec_t bandwidth; - pj_timestamp start, stop; - unsigned i; - - TRACE_((THIS_FILE, " starting test..")); - - ioqueue_callback.on_read_complete = &on_read_complete; - ioqueue_callback.on_write_complete = &on_write_complete; - - thread_quit_flag = 0; - - pool = pj_pool_create(mem, NULL, 4096, 4096, NULL); - if (!pool) - return -10; - - items = pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item)); - thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*)); - - TRACE_((THIS_FILE, " creating ioqueue..")); - rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue); - if (rc != PJ_SUCCESS) { - app_perror("...error: unable to create ioqueue", rc); - return -15; - } - - /* Initialize each producer-consumer pair. */ - for (i=0; i best_bandwidth) - best_bandwidth = bandwidth, best_index = i; - - /* Give it a rest before next test. */ - pj_thread_sleep(500); - } - - PJ_LOG(3,(THIS_FILE, - " Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s", - test_param[best_index].type_name, - test_param[best_index].thread_cnt, - test_param[best_index].sockpair_cnt, - best_bandwidth)); - PJ_LOG(3,(THIS_FILE, " (Note: packet size=%d, total errors=%u)", - BUF_SIZE, last_error_counter)); - return 0; -} - -#else -/* To prevent warning about "translation unit is empty" - * when this test is disabled. - */ -int dummy_uiq_perf_test; -#endif /* INCLUDE_IOQUEUE_PERF_TEST */ - - +/* $Id$ */ +/* + * Copyright (C)2003-2006 Benny Prijono + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include "test.h" +#include +#include + +/** + * \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance + * + * Test the performance of the I/O queue, using typical producer + * consumer test. The test should examine the effect of using multiple + * threads on the performance. + * + * This file is pjlib-test/ioq_perf.c + * + * \include pjlib-test/ioq_perf.c + */ + +#if INCLUDE_IOQUEUE_PERF_TEST + +#ifdef _MSC_VER +# pragma warning ( disable: 4204) // non-constant aggregate initializer +#endif + +#define THIS_FILE "ioq_perf" +//#define TRACE_(expr) PJ_LOG(3,expr) +#define TRACE_(expr) + + +static pj_bool_t thread_quit_flag; +static pj_status_t last_error; +static unsigned last_error_counter; + +/* Descriptor for each producer/consumer pair. */ +typedef struct test_item +{ + pj_sock_t server_fd, + client_fd; + pj_ioqueue_t *ioqueue; + pj_ioqueue_key_t *server_key, + *client_key; + pj_ioqueue_op_key_t recv_op, + send_op; + int has_pending_send; + pj_size_t buffer_size; + char *outgoing_buffer; + char *incoming_buffer; + pj_size_t bytes_sent, + bytes_recv; +} test_item; + +/* Callback when data has been read. + * Increment item->bytes_recv and ready to read the next data. + */ +static void on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) +{ + test_item *item = pj_ioqueue_get_user_data(key); + pj_status_t rc; + int data_is_available = 1; + + //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read)); + + do { + if (thread_quit_flag) + return; + + if (bytes_read < 0) { + pj_status_t rc = -bytes_read; + char errmsg[128]; + + if (rc != last_error) { + //last_error = rc; + pj_strerror(rc, errmsg, sizeof(errmsg)); + PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", + bytes_read, errmsg)); + PJ_LOG(3,(THIS_FILE, + ".....additional info: total read=%u, total sent=%u", + item->bytes_recv, item->bytes_sent)); + } else { + last_error_counter++; + } + bytes_read = 0; + + } else if (bytes_read == 0) { + PJ_LOG(3,(THIS_FILE, "...socket has closed!")); + } + + item->bytes_recv += bytes_read; + + /* To assure that the test quits, even if main thread + * doesn't have time to run. + */ + if (item->bytes_recv > item->buffer_size * 10000) + thread_quit_flag = 1; + + bytes_read = item->buffer_size; + rc = pj_ioqueue_recv( key, op_key, + item->incoming_buffer, &bytes_read, 0 ); + + if (rc == PJ_SUCCESS) { + data_is_available = 1; + } else if (rc == PJ_EPENDING) { + data_is_available = 0; + } else { + data_is_available = 0; + if (rc != last_error) { + last_error = rc; + app_perror("...error: read error(1)", rc); + } else { + last_error_counter++; + } + } + + if (!item->has_pending_send) { + pj_ssize_t sent = item->buffer_size; + rc = pj_ioqueue_send(item->client_key, &item->send_op, + item->outgoing_buffer, &sent, 0); + if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { + app_perror("...error: write error", rc); + } + + item->has_pending_send = (rc==PJ_EPENDING); + } + + } while (data_is_available); +} + +/* Callback when data has been written. + * Increment item->bytes_sent and write the next data. + */ +static void on_write_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_sent) +{ + test_item *item = pj_ioqueue_get_user_data(key); + + //TRACE_((THIS_FILE, " write complete: sent = %d", bytes_sent)); + + if (thread_quit_flag) + return; + + item->has_pending_send = 0; + item->bytes_sent += bytes_sent; + + if (bytes_sent <= 0) { + PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d", + bytes_sent)); + } + else { + pj_status_t rc; + + bytes_sent = item->buffer_size; + rc = pj_ioqueue_send( item->client_key, op_key, + item->outgoing_buffer, &bytes_sent, 0); + if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { + app_perror("...error: write error", rc); + } + + item->has_pending_send = (rc==PJ_EPENDING); + } +} + +/* The worker thread. */ +static int worker_thread(void *arg) +{ + pj_ioqueue_t *ioqueue = arg; + const pj_time_val timeout = {0, 100}; + int rc; + + while (!thread_quit_flag) { + rc = pj_ioqueue_poll(ioqueue, &timeout); + //TRACE_((THIS_FILE, " thread: poll returned rc=%d", rc)); + if (rc < 0) { + app_perror("...error in pj_ioqueue_poll()", pj_get_netos_error()); + return -1; + } + } + return 0; +} + +/* Calculate the bandwidth for the specific test configuration. + * The test is simple: + * - create sockpair_cnt number of producer-consumer socket pair. + * - create thread_cnt number of worker threads. + * - each producer will send buffer_size bytes data as fast and + * as soon as it can. + * - each consumer will read buffer_size bytes of data as fast + * as it could. + * - measure the total bytes received by all consumers during a + * period of time. + */ +static int perform_test(int sock_type, const char *type_name, + unsigned thread_cnt, unsigned sockpair_cnt, + pj_size_t buffer_size, + pj_size_t *p_bandwidth) +{ + enum { MSEC_DURATION = 5000 }; + pj_pool_t *pool; + test_item *items; + pj_thread_t **thread; + pj_ioqueue_t *ioqueue; + pj_status_t rc; + pj_ioqueue_callback ioqueue_callback; + pj_uint32_t total_elapsed_usec, total_received; + pj_highprec_t bandwidth; + pj_timestamp start, stop; + unsigned i; + + TRACE_((THIS_FILE, " starting test..")); + + ioqueue_callback.on_read_complete = &on_read_complete; + ioqueue_callback.on_write_complete = &on_write_complete; + + thread_quit_flag = 0; + + pool = pj_pool_create(mem, NULL, 4096, 4096, NULL); + if (!pool) + return -10; + + items = pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item)); + thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*)); + + TRACE_((THIS_FILE, " creating ioqueue..")); + rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue); + if (rc != PJ_SUCCESS) { + app_perror("...error: unable to create ioqueue", rc); + return -15; + } + + /* Initialize each producer-consumer pair. */ + for (i=0; i best_bandwidth) + best_bandwidth = bandwidth, best_index = i; + + /* Give it a rest before next test. */ + pj_thread_sleep(500); + } + + PJ_LOG(3,(THIS_FILE, + " Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s", + test_param[best_index].type_name, + test_param[best_index].thread_cnt, + test_param[best_index].sockpair_cnt, + best_bandwidth)); + PJ_LOG(3,(THIS_FILE, " (Note: packet size=%d, total errors=%u)", + BUF_SIZE, last_error_counter)); + return 0; +} + +#else +/* To prevent warning about "translation unit is empty" + * when this test is disabled. + */ +int dummy_uiq_perf_test; +#endif /* INCLUDE_IOQUEUE_PERF_TEST */ + + -- cgit v1.2.3