diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-21 01:55:47 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-21 01:55:47 +0000 |
commit | 5f1de1bbb341ea1dc1d27d9bf35764b643ef904a (patch) | |
tree | d18b0365b69b8b488a0b2b2bd715e9f14f77b505 /pjlib/src/pjlib-test/ioq_perf.c | |
parent | 9f4da35e676737f830a90a18de08440cf0f6cdf9 (diff) |
Set svn:eol-style property
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@65 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pjlib-test/ioq_perf.c')
-rw-r--r-- | pjlib/src/pjlib-test/ioq_perf.c | 1004 |
1 files changed, 502 insertions, 502 deletions
diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c index b10decc3..3a8de8e1 100644 --- a/pjlib/src/pjlib-test/ioq_perf.c +++ b/pjlib/src/pjlib-test/ioq_perf.c @@ -1,502 +1,502 @@ -/* $Id$ */
-/*
- * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- */
-#include "test.h"
-#include <pjlib.h>
-#include <pj/compat/high_precision.h>
-
-/**
- * \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance
- *
- * Test the performance of the I/O queue, using typical producer
- * consumer test. The test should examine the effect of using multiple
- * threads on the performance.
- *
- * This file is <b>pjlib-test/ioq_perf.c</b>
- *
- * \include pjlib-test/ioq_perf.c
- */
-
-#if INCLUDE_IOQUEUE_PERF_TEST
-
-#ifdef _MSC_VER
-# pragma warning ( disable: 4204) // non-constant aggregate initializer
-#endif
-
-#define THIS_FILE "ioq_perf"
-//#define TRACE_(expr) PJ_LOG(3,expr)
-#define TRACE_(expr)
-
-
-static pj_bool_t thread_quit_flag;
-static pj_status_t last_error;
-static unsigned last_error_counter;
-
-/* Descriptor for each producer/consumer pair. */
-typedef struct test_item
-{
- pj_sock_t server_fd,
- client_fd;
- pj_ioqueue_t *ioqueue;
- pj_ioqueue_key_t *server_key,
- *client_key;
- pj_ioqueue_op_key_t recv_op,
- send_op;
- int has_pending_send;
- pj_size_t buffer_size;
- char *outgoing_buffer;
- char *incoming_buffer;
- pj_size_t bytes_sent,
- bytes_recv;
-} test_item;
-
-/* Callback when data has been read.
- * Increment item->bytes_recv and ready to read the next data.
- */
-static void on_read_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_read)
-{
- test_item *item = pj_ioqueue_get_user_data(key);
- pj_status_t rc;
- int data_is_available = 1;
-
- //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read));
-
- do {
- if (thread_quit_flag)
- return;
-
- if (bytes_read < 0) {
- pj_status_t rc = -bytes_read;
- char errmsg[128];
-
- if (rc != last_error) {
- //last_error = rc;
- pj_strerror(rc, errmsg, sizeof(errmsg));
- PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)",
- bytes_read, errmsg));
- PJ_LOG(3,(THIS_FILE,
- ".....additional info: total read=%u, total sent=%u",
- item->bytes_recv, item->bytes_sent));
- } else {
- last_error_counter++;
- }
- bytes_read = 0;
-
- } else if (bytes_read == 0) {
- PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
- }
-
- item->bytes_recv += bytes_read;
-
- /* To assure that the test quits, even if main thread
- * doesn't have time to run.
- */
- if (item->bytes_recv > item->buffer_size * 10000)
- thread_quit_flag = 1;
-
- bytes_read = item->buffer_size;
- rc = pj_ioqueue_recv( key, op_key,
- item->incoming_buffer, &bytes_read, 0 );
-
- if (rc == PJ_SUCCESS) {
- data_is_available = 1;
- } else if (rc == PJ_EPENDING) {
- data_is_available = 0;
- } else {
- data_is_available = 0;
- if (rc != last_error) {
- last_error = rc;
- app_perror("...error: read error(1)", rc);
- } else {
- last_error_counter++;
- }
- }
-
- if (!item->has_pending_send) {
- pj_ssize_t sent = item->buffer_size;
- rc = pj_ioqueue_send(item->client_key, &item->send_op,
- item->outgoing_buffer, &sent, 0);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...error: write error", rc);
- }
-
- item->has_pending_send = (rc==PJ_EPENDING);
- }
-
- } while (data_is_available);
-}
-
-/* Callback when data has been written.
- * Increment item->bytes_sent and write the next data.
- */
-static void on_write_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_sent)
-{
- test_item *item = pj_ioqueue_get_user_data(key);
-
- //TRACE_((THIS_FILE, " write complete: sent = %d", bytes_sent));
-
- if (thread_quit_flag)
- return;
-
- item->has_pending_send = 0;
- item->bytes_sent += bytes_sent;
-
- if (bytes_sent <= 0) {
- PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d",
- bytes_sent));
- }
- else {
- pj_status_t rc;
-
- bytes_sent = item->buffer_size;
- rc = pj_ioqueue_send( item->client_key, op_key,
- item->outgoing_buffer, &bytes_sent, 0);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...error: write error", rc);
- }
-
- item->has_pending_send = (rc==PJ_EPENDING);
- }
-}
-
-/* The worker thread. */
-static int worker_thread(void *arg)
-{
- pj_ioqueue_t *ioqueue = arg;
- const pj_time_val timeout = {0, 100};
- int rc;
-
- while (!thread_quit_flag) {
- rc = pj_ioqueue_poll(ioqueue, &timeout);
- //TRACE_((THIS_FILE, " thread: poll returned rc=%d", rc));
- if (rc < 0) {
- app_perror("...error in pj_ioqueue_poll()", pj_get_netos_error());
- return -1;
- }
- }
- return 0;
-}
-
-/* Calculate the bandwidth for the specific test configuration.
- * The test is simple:
- * - create sockpair_cnt number of producer-consumer socket pair.
- * - create thread_cnt number of worker threads.
- * - each producer will send buffer_size bytes data as fast and
- * as soon as it can.
- * - each consumer will read buffer_size bytes of data as fast
- * as it could.
- * - measure the total bytes received by all consumers during a
- * period of time.
- */
-static int perform_test(int sock_type, const char *type_name,
- unsigned thread_cnt, unsigned sockpair_cnt,
- pj_size_t buffer_size,
- pj_size_t *p_bandwidth)
-{
- enum { MSEC_DURATION = 5000 };
- pj_pool_t *pool;
- test_item *items;
- pj_thread_t **thread;
- pj_ioqueue_t *ioqueue;
- pj_status_t rc;
- pj_ioqueue_callback ioqueue_callback;
- pj_uint32_t total_elapsed_usec, total_received;
- pj_highprec_t bandwidth;
- pj_timestamp start, stop;
- unsigned i;
-
- TRACE_((THIS_FILE, " starting test.."));
-
- ioqueue_callback.on_read_complete = &on_read_complete;
- ioqueue_callback.on_write_complete = &on_write_complete;
-
- thread_quit_flag = 0;
-
- pool = pj_pool_create(mem, NULL, 4096, 4096, NULL);
- if (!pool)
- return -10;
-
- items = pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item));
- thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));
-
- TRACE_((THIS_FILE, " creating ioqueue.."));
- rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);
- if (rc != PJ_SUCCESS) {
- app_perror("...error: unable to create ioqueue", rc);
- return -15;
- }
-
- /* Initialize each producer-consumer pair. */
- for (i=0; i<sockpair_cnt; ++i) {
- pj_ssize_t bytes;
-
- items[i].ioqueue = ioqueue;
- items[i].buffer_size = buffer_size;
- items[i].outgoing_buffer = pj_pool_alloc(pool, buffer_size);
- items[i].incoming_buffer = pj_pool_alloc(pool, buffer_size);
- items[i].bytes_recv = items[i].bytes_sent = 0;
-
- /* randomize outgoing buffer. */
- pj_create_random_string(items[i].outgoing_buffer, buffer_size);
-
- /* Create socket pair. */
- TRACE_((THIS_FILE, " calling socketpair.."));
- rc = app_socketpair(PJ_AF_INET, sock_type, 0,
- &items[i].server_fd, &items[i].client_fd);
- if (rc != PJ_SUCCESS) {
- app_perror("...error: unable to create socket pair", rc);
- return -20;
- }
-
- /* Register server socket to ioqueue. */
- TRACE_((THIS_FILE, " register(1).."));
- rc = pj_ioqueue_register_sock(pool, ioqueue,
- items[i].server_fd,
- &items[i], &ioqueue_callback,
- &items[i].server_key);
- if (rc != PJ_SUCCESS) {
- app_perror("...error: registering server socket to ioqueue", rc);
- return -60;
- }
-
- /* Register client socket to ioqueue. */
- TRACE_((THIS_FILE, " register(2).."));
- rc = pj_ioqueue_register_sock(pool, ioqueue,
- items[i].client_fd,
- &items[i], &ioqueue_callback,
- &items[i].client_key);
- if (rc != PJ_SUCCESS) {
- app_perror("...error: registering server socket to ioqueue", rc);
- return -70;
- }
-
- /* Start reading. */
- TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
- bytes = items[i].buffer_size;
- rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
- items[i].incoming_buffer, &bytes,
- 0);
- if (rc != PJ_EPENDING) {
- app_perror("...error: pj_ioqueue_recv", rc);
- return -73;
- }
-
- /* Start writing. */
- TRACE_((THIS_FILE, " pj_ioqueue_write.."));
- bytes = items[i].buffer_size;
- rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op,
- items[i].outgoing_buffer, &bytes, 0);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- app_perror("...error: pj_ioqueue_write", rc);
- return -76;
- }
-
- items[i].has_pending_send = (rc==PJ_EPENDING);
- }
-
- /* Create the threads. */
- for (i=0; i<thread_cnt; ++i) {
- rc = pj_thread_create( pool, NULL,
- &worker_thread,
- ioqueue,
- PJ_THREAD_DEFAULT_STACK_SIZE,
- PJ_THREAD_SUSPENDED, &thread[i] );
- if (rc != PJ_SUCCESS) {
- app_perror("...error: unable to create thread", rc);
- return -80;
- }
- }
-
- /* Mark start time. */
- rc = pj_get_timestamp(&start);
- if (rc != PJ_SUCCESS)
- return -90;
-
- /* Start the thread. */
- TRACE_((THIS_FILE, " resuming all threads.."));
- for (i=0; i<thread_cnt; ++i) {
- rc = pj_thread_resume(thread[i]);
- if (rc != 0)
- return -100;
- }
-
- /* Wait for MSEC_DURATION seconds.
- * This should be as simple as pj_thread_sleep(MSEC_DURATION) actually,
- * but unfortunately it doesn't work when system doesn't employ
- * timeslicing for threads.
- */
- TRACE_((THIS_FILE, " wait for few seconds.."));
- do {
- pj_thread_sleep(1);
-
- /* Mark end time. */
- rc = pj_get_timestamp(&stop);
-
- if (thread_quit_flag) {
- TRACE_((THIS_FILE, " transfer limit reached.."));
- break;
- }
-
- if (pj_elapsed_usec(&start,&stop)<MSEC_DURATION * 1000) {
- TRACE_((THIS_FILE, " time limit reached.."));
- break;
- }
-
- } while (1);
-
- /* Terminate all threads. */
- TRACE_((THIS_FILE, " terminating all threads.."));
- thread_quit_flag = 1;
-
- for (i=0; i<thread_cnt; ++i) {
- TRACE_((THIS_FILE, " join thread %d..", i));
- pj_thread_join(thread[i]);
- pj_thread_destroy(thread[i]);
- }
-
- /* Close all sockets. */
- TRACE_((THIS_FILE, " closing all sockets.."));
- for (i=0; i<sockpair_cnt; ++i) {
- pj_ioqueue_unregister(items[i].server_key);
- pj_ioqueue_unregister(items[i].client_key);
- pj_sock_close(items[i].server_fd);
- pj_sock_close(items[i].client_fd);
- }
-
- /* Destroy ioqueue. */
- TRACE_((THIS_FILE, " destroying ioqueue.."));
- pj_ioqueue_destroy(ioqueue);
-
- /* Calculate actual time in usec. */
- total_elapsed_usec = pj_elapsed_usec(&start, &stop);
-
- /* Calculate total bytes received. */
- total_received = 0;
- for (i=0; i<sockpair_cnt; ++i) {
- total_received = items[i].bytes_recv;
- }
-
- /* bandwidth = total_received*1000/total_elapsed_usec */
- bandwidth = total_received;
- pj_highprec_mul(bandwidth, 1000);
- pj_highprec_div(bandwidth, total_elapsed_usec);
-
- *p_bandwidth = (pj_uint32_t)bandwidth;
-
- PJ_LOG(3,(THIS_FILE, " %.4s %d %d %3d us %8d KB/s",
- type_name, thread_cnt, sockpair_cnt,
- -1 /*total_elapsed_usec/sockpair_cnt*/,
- *p_bandwidth));
-
- /* Done. */
- pj_pool_release(pool);
-
- TRACE_((THIS_FILE, " done.."));
- return 0;
-}
-
-/*
- * main test entry.
- */
-int ioqueue_perf_test(void)
-{
- enum { BUF_SIZE = 512 };
- int i, rc;
- struct {
- int type;
- const char *type_name;
- int thread_cnt;
- int sockpair_cnt;
- } test_param[] =
- {
- { PJ_SOCK_DGRAM, "udp", 1, 1},
- { PJ_SOCK_DGRAM, "udp", 1, 2},
- { PJ_SOCK_DGRAM, "udp", 1, 4},
- { PJ_SOCK_DGRAM, "udp", 1, 8},
- { PJ_SOCK_DGRAM, "udp", 2, 1},
- { PJ_SOCK_DGRAM, "udp", 2, 2},
- { PJ_SOCK_DGRAM, "udp", 2, 4},
- { PJ_SOCK_DGRAM, "udp", 2, 8},
- { PJ_SOCK_DGRAM, "udp", 4, 1},
- { PJ_SOCK_DGRAM, "udp", 4, 2},
- { PJ_SOCK_DGRAM, "udp", 4, 4},
- { PJ_SOCK_DGRAM, "udp", 4, 8},
- { PJ_SOCK_STREAM, "tcp", 1, 1},
- { PJ_SOCK_STREAM, "tcp", 1, 2},
- { PJ_SOCK_STREAM, "tcp", 1, 4},
- { PJ_SOCK_STREAM, "tcp", 1, 8},
- { PJ_SOCK_STREAM, "tcp", 2, 1},
- { PJ_SOCK_STREAM, "tcp", 2, 2},
- { PJ_SOCK_STREAM, "tcp", 2, 4},
- { PJ_SOCK_STREAM, "tcp", 2, 8},
- { PJ_SOCK_STREAM, "tcp", 4, 1},
- { PJ_SOCK_STREAM, "tcp", 4, 2},
- { PJ_SOCK_STREAM, "tcp", 4, 4},
- { PJ_SOCK_STREAM, "tcp", 4, 8},
- };
- pj_size_t best_bandwidth;
- int best_index = 0;
-
- PJ_LOG(3,(THIS_FILE, " Benchmarking %s ioqueue:", pj_ioqueue_name()));
- PJ_LOG(3,(THIS_FILE, " ==============================================="));
- PJ_LOG(3,(THIS_FILE, " Type Threads Skt.Pairs Avg.Time Bandwidth"));
- PJ_LOG(3,(THIS_FILE, " ==============================================="));
-
- best_bandwidth = 0;
- for (i=0; i<sizeof(test_param)/sizeof(test_param[0]); ++i) {
- pj_size_t bandwidth;
-
- rc = perform_test(test_param[i].type,
- test_param[i].type_name,
- test_param[i].thread_cnt,
- test_param[i].sockpair_cnt,
- BUF_SIZE,
- &bandwidth);
- if (rc != 0)
- return rc;
-
- if (bandwidth > best_bandwidth)
- best_bandwidth = bandwidth, best_index = i;
-
- /* Give it a rest before next test. */
- pj_thread_sleep(500);
- }
-
- PJ_LOG(3,(THIS_FILE,
- " Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s",
- test_param[best_index].type_name,
- test_param[best_index].thread_cnt,
- test_param[best_index].sockpair_cnt,
- best_bandwidth));
- PJ_LOG(3,(THIS_FILE, " (Note: packet size=%d, total errors=%u)",
- BUF_SIZE, last_error_counter));
- return 0;
-}
-
-#else
-/* To prevent warning about "translation unit is empty"
- * when this test is disabled.
- */
-int dummy_uiq_perf_test;
-#endif /* INCLUDE_IOQUEUE_PERF_TEST */
-
-
+/* $Id$ */ +/* + * Copyright (C)2003-2006 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include "test.h" +#include <pjlib.h> +#include <pj/compat/high_precision.h> + +/** + * \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance + * + * Test the performance of the I/O queue, using typical producer + * consumer test. The test should examine the effect of using multiple + * threads on the performance. + * + * This file is <b>pjlib-test/ioq_perf.c</b> + * + * \include pjlib-test/ioq_perf.c + */ + +#if INCLUDE_IOQUEUE_PERF_TEST + +#ifdef _MSC_VER +# pragma warning ( disable: 4204) // non-constant aggregate initializer +#endif + +#define THIS_FILE "ioq_perf" +//#define TRACE_(expr) PJ_LOG(3,expr) +#define TRACE_(expr) + + +static pj_bool_t thread_quit_flag; +static pj_status_t last_error; +static unsigned last_error_counter; + +/* Descriptor for each producer/consumer pair. */ +typedef struct test_item +{ + pj_sock_t server_fd, + client_fd; + pj_ioqueue_t *ioqueue; + pj_ioqueue_key_t *server_key, + *client_key; + pj_ioqueue_op_key_t recv_op, + send_op; + int has_pending_send; + pj_size_t buffer_size; + char *outgoing_buffer; + char *incoming_buffer; + pj_size_t bytes_sent, + bytes_recv; +} test_item; + +/* Callback when data has been read. + * Increment item->bytes_recv and ready to read the next data. + */ +static void on_read_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) +{ + test_item *item = pj_ioqueue_get_user_data(key); + pj_status_t rc; + int data_is_available = 1; + + //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read)); + + do { + if (thread_quit_flag) + return; + + if (bytes_read < 0) { + pj_status_t rc = -bytes_read; + char errmsg[128]; + + if (rc != last_error) { + //last_error = rc; + pj_strerror(rc, errmsg, sizeof(errmsg)); + PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", + bytes_read, errmsg)); + PJ_LOG(3,(THIS_FILE, + ".....additional info: total read=%u, total sent=%u", + item->bytes_recv, item->bytes_sent)); + } else { + last_error_counter++; + } + bytes_read = 0; + + } else if (bytes_read == 0) { + PJ_LOG(3,(THIS_FILE, "...socket has closed!")); + } + + item->bytes_recv += bytes_read; + + /* To assure that the test quits, even if main thread + * doesn't have time to run. + */ + if (item->bytes_recv > item->buffer_size * 10000) + thread_quit_flag = 1; + + bytes_read = item->buffer_size; + rc = pj_ioqueue_recv( key, op_key, + item->incoming_buffer, &bytes_read, 0 ); + + if (rc == PJ_SUCCESS) { + data_is_available = 1; + } else if (rc == PJ_EPENDING) { + data_is_available = 0; + } else { + data_is_available = 0; + if (rc != last_error) { + last_error = rc; + app_perror("...error: read error(1)", rc); + } else { + last_error_counter++; + } + } + + if (!item->has_pending_send) { + pj_ssize_t sent = item->buffer_size; + rc = pj_ioqueue_send(item->client_key, &item->send_op, + item->outgoing_buffer, &sent, 0); + if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { + app_perror("...error: write error", rc); + } + + item->has_pending_send = (rc==PJ_EPENDING); + } + + } while (data_is_available); +} + +/* Callback when data has been written. + * Increment item->bytes_sent and write the next data. + */ +static void on_write_complete(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_sent) +{ + test_item *item = pj_ioqueue_get_user_data(key); + + //TRACE_((THIS_FILE, " write complete: sent = %d", bytes_sent)); + + if (thread_quit_flag) + return; + + item->has_pending_send = 0; + item->bytes_sent += bytes_sent; + + if (bytes_sent <= 0) { + PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d", + bytes_sent)); + } + else { + pj_status_t rc; + + bytes_sent = item->buffer_size; + rc = pj_ioqueue_send( item->client_key, op_key, + item->outgoing_buffer, &bytes_sent, 0); + if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { + app_perror("...error: write error", rc); + } + + item->has_pending_send = (rc==PJ_EPENDING); + } +} + +/* The worker thread. */ +static int worker_thread(void *arg) +{ + pj_ioqueue_t *ioqueue = arg; + const pj_time_val timeout = {0, 100}; + int rc; + + while (!thread_quit_flag) { + rc = pj_ioqueue_poll(ioqueue, &timeout); + //TRACE_((THIS_FILE, " thread: poll returned rc=%d", rc)); + if (rc < 0) { + app_perror("...error in pj_ioqueue_poll()", pj_get_netos_error()); + return -1; + } + } + return 0; +} + +/* Calculate the bandwidth for the specific test configuration. + * The test is simple: + * - create sockpair_cnt number of producer-consumer socket pair. + * - create thread_cnt number of worker threads. + * - each producer will send buffer_size bytes data as fast and + * as soon as it can. + * - each consumer will read buffer_size bytes of data as fast + * as it could. + * - measure the total bytes received by all consumers during a + * period of time. + */ +static int perform_test(int sock_type, const char *type_name, + unsigned thread_cnt, unsigned sockpair_cnt, + pj_size_t buffer_size, + pj_size_t *p_bandwidth) +{ + enum { MSEC_DURATION = 5000 }; + pj_pool_t *pool; + test_item *items; + pj_thread_t **thread; + pj_ioqueue_t *ioqueue; + pj_status_t rc; + pj_ioqueue_callback ioqueue_callback; + pj_uint32_t total_elapsed_usec, total_received; + pj_highprec_t bandwidth; + pj_timestamp start, stop; + unsigned i; + + TRACE_((THIS_FILE, " starting test..")); + + ioqueue_callback.on_read_complete = &on_read_complete; + ioqueue_callback.on_write_complete = &on_write_complete; + + thread_quit_flag = 0; + + pool = pj_pool_create(mem, NULL, 4096, 4096, NULL); + if (!pool) + return -10; + + items = pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item)); + thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*)); + + TRACE_((THIS_FILE, " creating ioqueue..")); + rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue); + if (rc != PJ_SUCCESS) { + app_perror("...error: unable to create ioqueue", rc); + return -15; + } + + /* Initialize each producer-consumer pair. */ + for (i=0; i<sockpair_cnt; ++i) { + pj_ssize_t bytes; + + items[i].ioqueue = ioqueue; + items[i].buffer_size = buffer_size; + items[i].outgoing_buffer = pj_pool_alloc(pool, buffer_size); + items[i].incoming_buffer = pj_pool_alloc(pool, buffer_size); + items[i].bytes_recv = items[i].bytes_sent = 0; + + /* randomize outgoing buffer. */ + pj_create_random_string(items[i].outgoing_buffer, buffer_size); + + /* Create socket pair. */ + TRACE_((THIS_FILE, " calling socketpair..")); + rc = app_socketpair(PJ_AF_INET, sock_type, 0, + &items[i].server_fd, &items[i].client_fd); + if (rc != PJ_SUCCESS) { + app_perror("...error: unable to create socket pair", rc); + return -20; + } + + /* Register server socket to ioqueue. */ + TRACE_((THIS_FILE, " register(1)..")); + rc = pj_ioqueue_register_sock(pool, ioqueue, + items[i].server_fd, + &items[i], &ioqueue_callback, + &items[i].server_key); + if (rc != PJ_SUCCESS) { + app_perror("...error: registering server socket to ioqueue", rc); + return -60; + } + + /* Register client socket to ioqueue. */ + TRACE_((THIS_FILE, " register(2)..")); + rc = pj_ioqueue_register_sock(pool, ioqueue, + items[i].client_fd, + &items[i], &ioqueue_callback, + &items[i].client_key); + if (rc != PJ_SUCCESS) { + app_perror("...error: registering server socket to ioqueue", rc); + return -70; + } + + /* Start reading. */ + TRACE_((THIS_FILE, " pj_ioqueue_recv..")); + bytes = items[i].buffer_size; + rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op, + items[i].incoming_buffer, &bytes, + 0); + if (rc != PJ_EPENDING) { + app_perror("...error: pj_ioqueue_recv", rc); + return -73; + } + + /* Start writing. */ + TRACE_((THIS_FILE, " pj_ioqueue_write..")); + bytes = items[i].buffer_size; + rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op, + items[i].outgoing_buffer, &bytes, 0); + if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { + app_perror("...error: pj_ioqueue_write", rc); + return -76; + } + + items[i].has_pending_send = (rc==PJ_EPENDING); + } + + /* Create the threads. */ + for (i=0; i<thread_cnt; ++i) { + rc = pj_thread_create( pool, NULL, + &worker_thread, + ioqueue, + PJ_THREAD_DEFAULT_STACK_SIZE, + PJ_THREAD_SUSPENDED, &thread[i] ); + if (rc != PJ_SUCCESS) { + app_perror("...error: unable to create thread", rc); + return -80; + } + } + + /* Mark start time. */ + rc = pj_get_timestamp(&start); + if (rc != PJ_SUCCESS) + return -90; + + /* Start the thread. */ + TRACE_((THIS_FILE, " resuming all threads..")); + for (i=0; i<thread_cnt; ++i) { + rc = pj_thread_resume(thread[i]); + if (rc != 0) + return -100; + } + + /* Wait for MSEC_DURATION seconds. + * This should be as simple as pj_thread_sleep(MSEC_DURATION) actually, + * but unfortunately it doesn't work when system doesn't employ + * timeslicing for threads. + */ + TRACE_((THIS_FILE, " wait for few seconds..")); + do { + pj_thread_sleep(1); + + /* Mark end time. */ + rc = pj_get_timestamp(&stop); + + if (thread_quit_flag) { + TRACE_((THIS_FILE, " transfer limit reached..")); + break; + } + + if (pj_elapsed_usec(&start,&stop)<MSEC_DURATION * 1000) { + TRACE_((THIS_FILE, " time limit reached..")); + break; + } + + } while (1); + + /* Terminate all threads. */ + TRACE_((THIS_FILE, " terminating all threads..")); + thread_quit_flag = 1; + + for (i=0; i<thread_cnt; ++i) { + TRACE_((THIS_FILE, " join thread %d..", i)); + pj_thread_join(thread[i]); + pj_thread_destroy(thread[i]); + } + + /* Close all sockets. */ + TRACE_((THIS_FILE, " closing all sockets..")); + for (i=0; i<sockpair_cnt; ++i) { + pj_ioqueue_unregister(items[i].server_key); + pj_ioqueue_unregister(items[i].client_key); + pj_sock_close(items[i].server_fd); + pj_sock_close(items[i].client_fd); + } + + /* Destroy ioqueue. */ + TRACE_((THIS_FILE, " destroying ioqueue..")); + pj_ioqueue_destroy(ioqueue); + + /* Calculate actual time in usec. */ + total_elapsed_usec = pj_elapsed_usec(&start, &stop); + + /* Calculate total bytes received. */ + total_received = 0; + for (i=0; i<sockpair_cnt; ++i) { + total_received = items[i].bytes_recv; + } + + /* bandwidth = total_received*1000/total_elapsed_usec */ + bandwidth = total_received; + pj_highprec_mul(bandwidth, 1000); + pj_highprec_div(bandwidth, total_elapsed_usec); + + *p_bandwidth = (pj_uint32_t)bandwidth; + + PJ_LOG(3,(THIS_FILE, " %.4s %d %d %3d us %8d KB/s", + type_name, thread_cnt, sockpair_cnt, + -1 /*total_elapsed_usec/sockpair_cnt*/, + *p_bandwidth)); + + /* Done. */ + pj_pool_release(pool); + + TRACE_((THIS_FILE, " done..")); + return 0; +} + +/* + * main test entry. + */ +int ioqueue_perf_test(void) +{ + enum { BUF_SIZE = 512 }; + int i, rc; + struct { + int type; + const char *type_name; + int thread_cnt; + int sockpair_cnt; + } test_param[] = + { + { PJ_SOCK_DGRAM, "udp", 1, 1}, + { PJ_SOCK_DGRAM, "udp", 1, 2}, + { PJ_SOCK_DGRAM, "udp", 1, 4}, + { PJ_SOCK_DGRAM, "udp", 1, 8}, + { PJ_SOCK_DGRAM, "udp", 2, 1}, + { PJ_SOCK_DGRAM, "udp", 2, 2}, + { PJ_SOCK_DGRAM, "udp", 2, 4}, + { PJ_SOCK_DGRAM, "udp", 2, 8}, + { PJ_SOCK_DGRAM, "udp", 4, 1}, + { PJ_SOCK_DGRAM, "udp", 4, 2}, + { PJ_SOCK_DGRAM, "udp", 4, 4}, + { PJ_SOCK_DGRAM, "udp", 4, 8}, + { PJ_SOCK_STREAM, "tcp", 1, 1}, + { PJ_SOCK_STREAM, "tcp", 1, 2}, + { PJ_SOCK_STREAM, "tcp", 1, 4}, + { PJ_SOCK_STREAM, "tcp", 1, 8}, + { PJ_SOCK_STREAM, "tcp", 2, 1}, + { PJ_SOCK_STREAM, "tcp", 2, 2}, + { PJ_SOCK_STREAM, "tcp", 2, 4}, + { PJ_SOCK_STREAM, "tcp", 2, 8}, + { PJ_SOCK_STREAM, "tcp", 4, 1}, + { PJ_SOCK_STREAM, "tcp", 4, 2}, + { PJ_SOCK_STREAM, "tcp", 4, 4}, + { PJ_SOCK_STREAM, "tcp", 4, 8}, + }; + pj_size_t best_bandwidth; + int best_index = 0; + + PJ_LOG(3,(THIS_FILE, " Benchmarking %s ioqueue:", pj_ioqueue_name())); + PJ_LOG(3,(THIS_FILE, " ===============================================")); + PJ_LOG(3,(THIS_FILE, " Type Threads Skt.Pairs Avg.Time Bandwidth")); + PJ_LOG(3,(THIS_FILE, " ===============================================")); + + best_bandwidth = 0; + for (i=0; i<sizeof(test_param)/sizeof(test_param[0]); ++i) { + pj_size_t bandwidth; + + rc = perform_test(test_param[i].type, + test_param[i].type_name, + test_param[i].thread_cnt, + test_param[i].sockpair_cnt, + BUF_SIZE, + &bandwidth); + if (rc != 0) + return rc; + + if (bandwidth > best_bandwidth) + best_bandwidth = bandwidth, best_index = i; + + /* Give it a rest before next test. */ + pj_thread_sleep(500); + } + + PJ_LOG(3,(THIS_FILE, + " Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s", + test_param[best_index].type_name, + test_param[best_index].thread_cnt, + test_param[best_index].sockpair_cnt, + best_bandwidth)); + PJ_LOG(3,(THIS_FILE, " (Note: packet size=%d, total errors=%u)", + BUF_SIZE, last_error_counter)); + return 0; +} + +#else +/* To prevent warning about "translation unit is empty" + * when this test is disabled. + */ +int dummy_uiq_perf_test; +#endif /* INCLUDE_IOQUEUE_PERF_TEST */ + + |