summaryrefslogtreecommitdiff
path: root/pjlib/src/pjlib-test
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2005-11-06 09:37:47 +0000
committerBenny Prijono <bennylp@teluu.com>2005-11-06 09:37:47 +0000
commit7c7300624eb867fa7c1ea52b9c636889aac60e80 (patch)
tree58460baa296e7eb6bd775d060f2a1e960717f565 /pjlib/src/pjlib-test
parent58aee2809c36f43a3b66dac7d9db5d13070114b9 (diff)
Changed ioqueue to allow simultaneous operations on the same key
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@11 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pjlib-test')
-rw-r--r--pjlib/src/pjlib-test/atomic.c24
-rw-r--r--pjlib/src/pjlib-test/ioq_perf.c178
-rw-r--r--pjlib/src/pjlib-test/ioq_tcp.c172
-rw-r--r--pjlib/src/pjlib-test/ioq_udp.c157
-rw-r--r--pjlib/src/pjlib-test/main.c3
-rw-r--r--pjlib/src/pjlib-test/test.c8
-rw-r--r--pjlib/src/pjlib-test/test.h14
-rw-r--r--pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c181
-rw-r--r--pjlib/src/pjlib-test/udp_echo_srv_sync.c56
9 files changed, 547 insertions, 246 deletions
diff --git a/pjlib/src/pjlib-test/atomic.c b/pjlib/src/pjlib-test/atomic.c
index 429085e1..09bdfdba 100644
--- a/pjlib/src/pjlib-test/atomic.c
+++ b/pjlib/src/pjlib-test/atomic.c
@@ -47,21 +47,29 @@ int atomic_test(void)
/* get: check the value. */
if (pj_atomic_get(atomic_var) != 111)
return -30;
-
- /* increment. */
- if (pj_atomic_inc(atomic_var) != 112)
+
+ /* increment. */
+ pj_atomic_inc(atomic_var);
+ if (pj_atomic_get(atomic_var) != 112)
return -40;
- /* decrement. */
- if (pj_atomic_dec(atomic_var) != 111)
+ /* decrement. */
+ pj_atomic_dec(atomic_var);
+ if (pj_atomic_get(atomic_var) != 111)
return -50;
- /* set */
- if (pj_atomic_set(atomic_var, 211) != 111)
+ /* set */
+ pj_atomic_set(atomic_var, 211);
+ if (pj_atomic_get(atomic_var) != 211)
return -60;
+
+ /* add */
+ pj_atomic_add(atomic_var, 10);
+ if (pj_atomic_get(atomic_var) != 221)
+ return -60;
/* check the value again. */
- if (pj_atomic_get(atomic_var) != 211)
+ if (pj_atomic_get(atomic_var) != 221)
return -70;
/* destroy */
diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c
index cb93c4cf..4cd11068 100644
--- a/pjlib/src/pjlib-test/ioq_perf.c
+++ b/pjlib/src/pjlib-test/ioq_perf.c
@@ -34,77 +34,105 @@ static unsigned last_error_counter;
/* Descriptor for each producer/consumer pair. */
typedef struct test_item
{
- pj_sock_t server_fd,
- client_fd;
- pj_ioqueue_t *ioqueue;
- pj_ioqueue_key_t *server_key,
- *client_key;
- pj_size_t buffer_size;
- char *outgoing_buffer;
- char *incoming_buffer;
- pj_size_t bytes_sent,
- bytes_recv;
+ pj_sock_t server_fd,
+ client_fd;
+ pj_ioqueue_t *ioqueue;
+ pj_ioqueue_key_t *server_key,
+ *client_key;
+ pj_ioqueue_op_key_t recv_op,
+ send_op;
+ int has_pending_send;
+ pj_size_t buffer_size;
+ char *outgoing_buffer;
+ char *incoming_buffer;
+ pj_size_t bytes_sent,
+ bytes_recv;
} test_item;
/* Callback when data has been read.
* Increment item->bytes_recv and ready to read the next data.
*/
-static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
+static void on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
{
test_item *item = pj_ioqueue_get_user_data(key);
- pj_status_t rc;
+ pj_status_t rc;
+ int data_is_available = 1;
//TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read));
+
+ do {
+ if (thread_quit_flag)
+ return;
+
+ if (bytes_read < 0) {
+ pj_status_t rc = -bytes_read;
+ char errmsg[128];
+
+ if (rc != last_error) {
+ last_error = rc;
+ pj_strerror(rc, errmsg, sizeof(errmsg));
+ PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)",
+ bytes_read, errmsg));
+ PJ_LOG(3,(THIS_FILE,
+ ".....additional info: total read=%u, total written=%u",
+ item->bytes_recv, item->bytes_sent));
+ } else {
+ last_error_counter++;
+ }
+ bytes_read = 0;
+
+ } else if (bytes_read == 0) {
+ PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
+ }
- if (thread_quit_flag)
- return;
-
- if (bytes_read < 0) {
- pj_status_t rc = -bytes_read;
- char errmsg[128];
-
- if (rc != last_error) {
- last_error = rc;
- pj_strerror(rc, errmsg, sizeof(errmsg));
- PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)",
- bytes_read, errmsg));
- PJ_LOG(3,(THIS_FILE,
- ".....additional info: total read=%u, total written=%u",
- item->bytes_recv, item->bytes_sent));
- } else {
- last_error_counter++;
- }
- bytes_read = 0;
-
- } else if (bytes_read == 0) {
- PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
- }
-
- item->bytes_recv += bytes_read;
+ item->bytes_recv += bytes_read;
- /* To assure that the test quits, even if main thread
- * doesn't have time to run.
- */
- if (item->bytes_recv > item->buffer_size * 10000)
- thread_quit_flag = 1;
-
- rc = pj_ioqueue_recv( item->ioqueue, item->server_key,
- item->incoming_buffer, item->buffer_size, 0 );
-
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
- if (rc != last_error) {
- last_error = rc;
- app_perror("...error: read error", rc);
- } else {
- last_error_counter++;
- }
- }
+ /* To assure that the test quits, even if main thread
+ * doesn't have time to run.
+ */
+ if (item->bytes_recv > item->buffer_size * 10000)
+ thread_quit_flag = 1;
+
+ bytes_read = item->buffer_size;
+ rc = pj_ioqueue_recv( key, op_key,
+ item->incoming_buffer, &bytes_read, 0 );
+
+ if (rc == PJ_SUCCESS) {
+ data_is_available = 1;
+ } else if (rc == PJ_EPENDING) {
+ data_is_available = 0;
+ } else {
+ data_is_available = 0;
+ if (rc != last_error) {
+ last_error = rc;
+ app_perror("...error: read error", rc);
+ } else {
+ last_error_counter++;
+ }
+ }
+
+ if (!item->has_pending_send) {
+ pj_ssize_t sent = item->buffer_size;
+ rc = pj_ioqueue_send(item->client_key, &item->send_op,
+ item->outgoing_buffer, &sent, 0);
+ if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ app_perror("...error: write error", rc);
+ }
+
+ item->has_pending_send = (rc==PJ_EPENDING);
+ }
+
+ } while (data_is_available);
}
/* Callback when data has been written.
* Increment item->bytes_sent and write the next data.
*/
-static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent)
+static void on_write_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent)
{
test_item *item = pj_ioqueue_get_user_data(key);
@@ -112,7 +140,8 @@ static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent)
if (thread_quit_flag)
return;
-
+
+ item->has_pending_send = 0;
item->bytes_sent += bytes_sent;
if (bytes_sent <= 0) {
@@ -121,12 +150,15 @@ static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent)
}
else {
pj_status_t rc;
-
- rc = pj_ioqueue_write(item->ioqueue, item->client_key,
- item->outgoing_buffer, item->buffer_size);
+
+ bytes_sent = item->buffer_size;
+ rc = pj_ioqueue_send( item->client_key, op_key,
+ item->outgoing_buffer, &bytes_sent, 0);
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
app_perror("...error: write error", rc);
- }
+ }
+
+ item->has_pending_send = (rc==PJ_EPENDING);
}
}
@@ -191,7 +223,7 @@ static int perform_test(int sock_type, const char *type_name,
thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));
TRACE_((THIS_FILE, " creating ioqueue.."));
- rc = pj_ioqueue_create(pool, sockpair_cnt*2, thread_cnt, &ioqueue);
+ rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);
if (rc != PJ_SUCCESS) {
app_perror("...error: unable to create ioqueue", rc);
return -15;
@@ -199,6 +231,7 @@ static int perform_test(int sock_type, const char *type_name,
/* Initialize each producer-consumer pair. */
for (i=0; i<sockpair_cnt; ++i) {
+ pj_ssize_t bytes;
items[i].ioqueue = ioqueue;
items[i].buffer_size = buffer_size;
@@ -241,24 +274,27 @@ static int perform_test(int sock_type, const char *type_name,
}
/* Start reading. */
- TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
- rc = pj_ioqueue_recv(ioqueue, items[i].server_key,
- items[i].incoming_buffer, items[i].buffer_size,
+ TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
+ bytes = items[i].buffer_size;
+ rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
+ items[i].incoming_buffer, &bytes,
0);
- if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ if (rc != PJ_EPENDING) {
app_perror("...error: pj_ioqueue_recv", rc);
return -73;
}
/* Start writing. */
- TRACE_((THIS_FILE, " pj_ioqueue_write.."));
- rc = pj_ioqueue_write(ioqueue, items[i].client_key,
- items[i].outgoing_buffer, items[i].buffer_size);
+ TRACE_((THIS_FILE, " pj_ioqueue_write.."));
+ bytes = items[i].buffer_size;
+ rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op,
+ items[i].outgoing_buffer, &bytes, 0);
if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
app_perror("...error: pj_ioqueue_write", rc);
return -76;
}
-
+
+ items[i].has_pending_send = (rc==PJ_EPENDING);
}
/* Create the threads. */
@@ -324,8 +360,8 @@ static int perform_test(int sock_type, const char *type_name,
/* Close all sockets. */
TRACE_((THIS_FILE, " closing all sockets.."));
for (i=0; i<sockpair_cnt; ++i) {
- pj_ioqueue_unregister(ioqueue, items[i].server_key);
- pj_ioqueue_unregister(ioqueue, items[i].client_key);
+ pj_ioqueue_unregister(items[i].server_key);
+ pj_ioqueue_unregister(items[i].client_key);
pj_sock_close(items[i].server_fd);
pj_sock_close(items[i].client_fd);
}
diff --git a/pjlib/src/pjlib-test/ioq_tcp.c b/pjlib/src/pjlib-test/ioq_tcp.c
index ebce633b..fd5329e5 100644
--- a/pjlib/src/pjlib-test/ioq_tcp.c
+++ b/pjlib/src/pjlib-test/ioq_tcp.c
@@ -31,33 +31,45 @@
#define SOCK_INACTIVE_MAX (PJ_IOQUEUE_MAX_HANDLES - 2)
#define POOL_SIZE (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048)
-static pj_ssize_t callback_read_size,
- callback_write_size,
- callback_accept_status,
- callback_connect_status;
-static pj_ioqueue_key_t*callback_read_key,
- *callback_write_key,
- *callback_accept_key,
- *callback_connect_key;
-
-static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
+static pj_ssize_t callback_read_size,
+ callback_write_size,
+ callback_accept_status,
+ callback_connect_status;
+static pj_ioqueue_key_t *callback_read_key,
+ *callback_write_key,
+ *callback_accept_key,
+ *callback_connect_key;
+static pj_ioqueue_op_key_t *callback_read_op,
+ *callback_write_op,
+ *callback_accept_op;
+
+static void on_ioqueue_read(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
{
- callback_read_key = key;
+ callback_read_key = key;
+ callback_read_op = op_key;
callback_read_size = bytes_read;
}
-static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written)
+static void on_ioqueue_write(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_written)
{
- callback_write_key = key;
+ callback_write_key = key;
+ callback_write_op = op_key;
callback_write_size = bytes_written;
}
-static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock,
+static void on_ioqueue_accept(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock,
int status)
{
PJ_UNUSED_ARG(sock);
- callback_accept_key = key;
+ callback_accept_key = key;
+ callback_accept_op = op_key;
callback_accept_status = status;
}
@@ -83,28 +95,38 @@ static int send_recv_test(pj_ioqueue_t *ioque,
pj_ssize_t bufsize,
pj_timestamp *t_elapsed)
{
- int rc;
- pj_ssize_t bytes;
+ pj_status_t status;
+ pj_ssize_t bytes;
+ pj_time_val timeout;
pj_timestamp t1, t2;
- int pending_op = 0;
-
- // Start reading on the server side.
- rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize);
- if (rc != 0 && rc != PJ_EPENDING) {
+ int pending_op = 0;
+ pj_ioqueue_op_key_t read_op, write_op;
+
+ // Start reading on the server side.
+ bytes = bufsize;
+ status = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0);
+ if (status != PJ_SUCCESS && status != PJ_EPENDING) {
+ app_perror("...pj_ioqueue_recv error", status);
return -100;
}
-
- ++pending_op;
+
+ if (status == PJ_EPENDING)
+ ++pending_op;
+ else {
+ /* Does not expect to return error or immediate data. */
+ return -115;
+ }
// Randomize send buffer.
pj_create_random_string((char*)send_buf, bufsize);
- // Starts send on the client side.
- bytes = pj_ioqueue_write(ioque, ckey, send_buf, bufsize);
- if (bytes != bufsize && bytes != PJ_EPENDING) {
+ // Starts send on the client side.
+ bytes = bufsize;
+ status = pj_ioqueue_send(ckey, &write_op, send_buf, &bytes, 0);
+ if (status != PJ_SUCCESS && bytes != PJ_EPENDING) {
return -120;
}
- if (bytes == PJ_EPENDING) {
+ if (status == PJ_EPENDING) {
++pending_op;
}
@@ -113,37 +135,52 @@ static int send_recv_test(pj_ioqueue_t *ioque,
// Reset indicators
callback_read_size = callback_write_size = 0;
- callback_read_key = callback_write_key = NULL;
+ callback_read_key = callback_write_key = NULL;
+ callback_read_op = callback_write_op = NULL;
// Poll the queue until we've got completion event in the server side.
- rc = 0;
- while (pending_op > 0) {
- rc = pj_ioqueue_poll(ioque, NULL);
- if (rc > 0) {
+ status = 0;
+ while (pending_op > 0) {
+ timeout.sec = 1; timeout.msec = 0;
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status > 0) {
if (callback_read_size) {
- if (callback_read_size != bufsize) {
+ if (callback_read_size != bufsize)
return -160;
- }
if (callback_read_key != skey)
- return -161;
+ return -161;
+ if (callback_read_op != &read_op)
+ return -162;
}
if (callback_write_size) {
if (callback_write_key != ckey)
- return -162;
+ return -163;
+ if (callback_write_op != &write_op)
+ return -164;
}
- pending_op -= rc;
- }
- if (rc < 0) {
+ pending_op -= status;
+ }
+ if (status == 0) {
+ PJ_LOG(3,("", "...error: timed out"));
+ }
+ if (status < 0) {
return -170;
}
}
+
+ // Pending op is zero.
+ // Subsequent poll should yield zero too.
+ timeout.sec = timeout.msec = 0;
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status != 0)
+ return -173;
// End time.
pj_get_timestamp(&t2);
t_elapsed->u32.lo += (t2.u32.lo - t1.u32.lo);
- if (rc < 0) {
- return -150;
+ if (status < 0) {
+ return -176;
}
// Compare recv buffer with send buffer.
@@ -167,7 +204,8 @@ static int compliance_test_0(void)
pj_pool_t *pool = NULL;
char *send_buf, *recv_buf;
pj_ioqueue_t *ioque = NULL;
- pj_ioqueue_key_t *skey, *ckey0, *ckey1;
+ pj_ioqueue_key_t *skey, *ckey0, *ckey1;
+ pj_ioqueue_op_key_t accept_op;
int bufsize = BUF_MIN_SIZE;
pj_ssize_t status = -1;
int pending_op = 0;
@@ -205,7 +243,7 @@ static int compliance_test_0(void)
}
// Create I/O Queue.
- rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0, &ioque);
+ rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
if (rc != PJ_SUCCESS) {
app_perror("...ERROR in pj_ioqueue_create()", rc);
status=-20; goto on_error;
@@ -231,7 +269,8 @@ static int compliance_test_0(void)
// Server socket accept()
client_addr_len = sizeof(pj_sockaddr_in);
- status = pj_ioqueue_accept(ioque, skey, &csock0, &client_addr, &rmt_addr, &client_addr_len);
+ status = pj_ioqueue_accept(skey, &accept_op, &csock0,
+ &client_addr, &rmt_addr, &client_addr_len);
if (status != PJ_EPENDING) {
app_perror("...ERROR in pj_ioqueue_accept()", rc);
status=-30; goto on_error;
@@ -247,7 +286,7 @@ static int compliance_test_0(void)
addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
// Client socket connect()
- status = pj_ioqueue_connect(ioque, ckey1, &addr, sizeof(addr));
+ status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr));
if (status!=PJ_SUCCESS && status != PJ_EPENDING) {
app_perror("...ERROR in pj_ioqueue_connect()", rc);
status=-40; goto on_error;
@@ -262,6 +301,7 @@ static int compliance_test_0(void)
callback_read_key = callback_write_key =
callback_accept_key = callback_connect_key = NULL;
+ callback_accept_op = callback_read_op = callback_write_op = NULL;
while (pending_op) {
pj_time_val timeout = {1, 0};
@@ -273,8 +313,12 @@ static int compliance_test_0(void)
status=-41; goto on_error;
}
if (callback_accept_key != skey) {
- status=-41; goto on_error;
- }
+ status=-42; goto on_error;
+ }
+ if (callback_accept_op != &accept_op) {
+ status=-43; goto on_error;
+ }
+ callback_accept_status = -2;
}
if (callback_connect_status != -2) {
@@ -283,7 +327,8 @@ static int compliance_test_0(void)
}
if (callback_connect_key != ckey1) {
status=-51; goto on_error;
- }
+ }
+ callback_connect_status = -2;
}
pending_op -= status;
@@ -293,6 +338,16 @@ static int compliance_test_0(void)
}
}
}
+
+ // There's no pending operation.
+ // When we poll the ioqueue, there must not be events.
+ if (pending_op == 0) {
+ pj_time_val timeout = {1, 0};
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status != 0) {
+ status=-60; goto on_error;
+ }
+ }
// Check accepted socket.
if (csock0 == PJ_INVALID_SOCKET) {
@@ -312,7 +367,8 @@ static int compliance_test_0(void)
// Test send and receive.
t_elapsed.u32.lo = 0;
- status = send_recv_test(ioque, ckey0, ckey1, send_buf, recv_buf, bufsize, &t_elapsed);
+ status = send_recv_test(ioque, ckey0, ckey1, send_buf,
+ recv_buf, bufsize, &t_elapsed);
if (status != 0) {
goto on_error;
}
@@ -354,7 +410,7 @@ static int compliance_test_1(void)
pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL);
// Create I/O Queue.
- rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0, &ioque);
+ rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
if (!ioque) {
status=-20; goto on_error;
}
@@ -381,7 +437,7 @@ static int compliance_test_1(void)
addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1"));
// Client socket connect()
- status = pj_ioqueue_connect(ioque, ckey1, &addr, sizeof(addr));
+ status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr));
if (status==PJ_SUCCESS) {
// unexpectedly success!
status = -30;
@@ -416,7 +472,17 @@ static int compliance_test_1(void)
}
}
}
-
+
+ // There's no pending operation.
+ // When we poll the ioqueue, there must not be events.
+ if (pending_op == 0) {
+ pj_time_val timeout = {1, 0};
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status != 0) {
+ status=-60; goto on_error;
+ }
+ }
+
// Success
status = 0;
diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c
index a59dac88..6ee90e42 100644
--- a/pjlib/src/pjlib-test/ioq_udp.c
+++ b/pjlib/src/pjlib-test/ioq_udp.c
@@ -34,31 +34,43 @@
#undef TRACE_
#define TRACE_(msg) PJ_LOG(3,(THIS_FILE,"....." msg))
-static pj_ssize_t callback_read_size,
- callback_write_size,
- callback_accept_status,
- callback_connect_status;
-static pj_ioqueue_key_t *callback_read_key,
- *callback_write_key,
- *callback_accept_key,
- *callback_connect_key;
-
-static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read)
+static pj_ssize_t callback_read_size,
+ callback_write_size,
+ callback_accept_status,
+ callback_connect_status;
+static pj_ioqueue_key_t *callback_read_key,
+ *callback_write_key,
+ *callback_accept_key,
+ *callback_connect_key;
+static pj_ioqueue_op_key_t *callback_read_op,
+ *callback_write_op,
+ *callback_accept_op;
+
+static void on_ioqueue_read(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
{
- callback_read_key = key;
+ callback_read_key = key;
+ callback_read_op = op_key;
callback_read_size = bytes_read;
}
-static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written)
+static void on_ioqueue_write(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_written)
{
- callback_write_key = key;
+ callback_write_key = key;
+ callback_write_op = op_key;
callback_write_size = bytes_written;
}
-static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock, int status)
+static void on_ioqueue_accept(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock, int status)
{
PJ_UNUSED_ARG(sock);
- callback_accept_key = key;
+ callback_accept_key = key;
+ callback_accept_op = op_key;
callback_accept_status = status;
}
@@ -83,29 +95,6 @@ static pj_ioqueue_callback test_cb =
#endif
/*
- * native_format_test()
- * This is just a simple test to verify that various structures in sock.h
- * are really compatible with operating system's definitions.
- */
-static int native_format_test(void)
-{
- pj_status_t rc;
-
- // Test that PJ_INVALID_SOCKET is working.
- {
- pj_sock_t sock;
- rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, -1, &sock);
- if (rc == PJ_SUCCESS)
- return -1020;
- }
-
- // Previous func will set errno var.
- pj_set_os_error(PJ_SUCCESS);
-
- return 0;
-}
-
-/*
* compliance_test()
* To test that the basic IOQueue functionality works. It will just exchange
* data between two sockets.
@@ -118,7 +107,8 @@ static int compliance_test(void)
pj_pool_t *pool = NULL;
char *send_buf, *recv_buf;
pj_ioqueue_t *ioque = NULL;
- pj_ioqueue_key_t *skey, *ckey;
+ pj_ioqueue_key_t *skey, *ckey;
+ pj_ioqueue_op_key_t read_op, write_op;
int bufsize = BUF_MIN_SIZE;
pj_ssize_t bytes, status = -1;
pj_str_t temp;
@@ -157,8 +147,7 @@ static int compliance_test(void)
// Create I/O Queue.
TRACE_("create ioqueue...");
- rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES,
- PJ_IOQUEUE_DEFAULT_THREADS, &ioque);
+ rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
if (rc != PJ_SUCCESS) {
status=-20; goto on_error;
}
@@ -194,12 +183,14 @@ static int compliance_test(void)
// Register reading from ioqueue.
TRACE_("start recvfrom...");
- addrlen = sizeof(addr);
- bytes = pj_ioqueue_recvfrom(ioque, skey, recv_buf, bufsize, 0,
- &addr, &addrlen);
- if (bytes < 0 && bytes != PJ_EPENDING) {
+ addrlen = sizeof(addr);
+ bytes = bufsize;
+ rc = pj_ioqueue_recvfrom(skey, &read_op, recv_buf, &bytes, 0,
+ &addr, &addrlen);
+ if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ app_perror("...error: pj_ioqueue_recvfrom", rc);
status=-28; goto on_error;
- } else if (bytes == PJ_EPENDING) {
+ } else if (rc == PJ_EPENDING) {
recv_pending = 1;
PJ_LOG(3, (THIS_FILE,
"......ok: recvfrom returned pending"));
@@ -210,14 +201,14 @@ static int compliance_test(void)
}
// Write must return the number of bytes.
- TRACE_("start sendto...");
- bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, &addr,
- sizeof(addr));
- if (bytes != bufsize && bytes != PJ_EPENDING) {
- PJ_LOG(1,(THIS_FILE,
- "......error: sendto returned %d", bytes));
+ TRACE_("start sendto...");
+ bytes = bufsize;
+ rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, &addr,
+ sizeof(addr));
+ if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ app_perror("...error: pj_ioqueue_sendto", rc);
status=-30; goto on_error;
- } else if (bytes == PJ_EPENDING) {
+ } else if (rc == PJ_EPENDING) {
send_pending = 1;
PJ_LOG(3, (THIS_FILE,
"......ok: sendto returned pending"));
@@ -232,9 +223,10 @@ static int compliance_test(void)
callback_accept_status = callback_connect_status = -2;
callback_read_key = callback_write_key =
callback_accept_key = callback_connect_key = NULL;
+ callback_read_op = callback_write_op = NULL;
// Poll if pending.
- while (send_pending && recv_pending) {
+ while (send_pending || recv_pending) {
int rc;
pj_time_val timeout = { 5, 0 };
@@ -253,9 +245,11 @@ static int compliance_test(void)
if (callback_read_size != bufsize) {
status=-61; goto on_error;
}
-
if (callback_read_key != skey) {
status=-65; goto on_error;
+ }
+ if (callback_read_op != &read_op) {
+ status=-66; goto on_error;
}
if (memcmp(send_buf, recv_buf, bufsize) != 0) {
@@ -270,9 +264,11 @@ static int compliance_test(void)
if (callback_write_size != bufsize) {
status=-73; goto on_error;
}
-
if (callback_write_key != ckey) {
status=-75; goto on_error;
+ }
+ if (callback_write_op != &write_op) {
+ status=-76; goto on_error;
}
send_pending = 0;
@@ -326,9 +322,7 @@ static int many_handles_test(void)
sock = pj_pool_alloc(pool, MAX*sizeof(pj_sock_t));
/* Create IOQueue */
- rc = pj_ioqueue_create(pool, MAX,
- PJ_IOQUEUE_DEFAULT_THREADS,
- &ioqueue);
+ rc = pj_ioqueue_create(pool, MAX, &ioqueue);
if (rc != PJ_SUCCESS || ioqueue == NULL) {
app_perror("...error in pj_ioqueue_create", rc);
return -10;
@@ -358,7 +352,7 @@ static int many_handles_test(void)
/* Now deregister and close all handles. */
for (i=0; i<count; ++i) {
- rc = pj_ioqueue_unregister(ioqueue, key[i]);
+ rc = pj_ioqueue_unregister(key[i]);
if (rc != PJ_SUCCESS) {
app_perror("...error in pj_ioqueue_unregister", rc);
}
@@ -392,7 +386,8 @@ static int bench_test(int bufsize, int inactive_sock_count)
pj_sock_t ssock=-1, csock=-1;
pj_sockaddr_in addr;
pj_pool_t *pool = NULL;
- pj_sock_t *inactive_sock=NULL;
+ pj_sock_t *inactive_sock=NULL;
+ pj_ioqueue_op_key_t *inactive_read_op;
char *send_buf, *recv_buf;
pj_ioqueue_t *ioque = NULL;
pj_ioqueue_key_t *skey, *ckey, *key;
@@ -429,8 +424,7 @@ static int bench_test(int bufsize, int inactive_sock_count)
pj_assert(inactive_sock_count+2 <= PJ_IOQUEUE_MAX_HANDLES);
// Create I/O Queue.
- rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES,
- PJ_IOQUEUE_DEFAULT_THREADS, &ioque);
+ rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque);
if (rc != PJ_SUCCESS) {
app_perror("...error: pj_ioqueue_create()", rc);
goto on_error;
@@ -439,10 +433,14 @@ static int bench_test(int bufsize, int inactive_sock_count)
// Allocate inactive sockets, and bind them to some arbitrary address.
// Then register them to the I/O queue, and start a read operation.
inactive_sock = (pj_sock_t*)pj_pool_alloc(pool,
- inactive_sock_count*sizeof(pj_sock_t));
+ inactive_sock_count*sizeof(pj_sock_t));
+ inactive_read_op = (pj_ioqueue_op_key_t*)pj_pool_alloc(pool,
+ inactive_sock_count*sizeof(pj_ioqueue_op_key_t));
memset(&addr, 0, sizeof(addr));
addr.sin_family = PJ_AF_INET;
- for (i=0; i<inactive_sock_count; ++i) {
+ for (i=0; i<inactive_sock_count; ++i) {
+ pj_ssize_t bytes;
+
rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &inactive_sock[i]);
if (rc != PJ_SUCCESS || inactive_sock[i] < 0) {
app_perror("...error: pj_sock_socket()", rc);
@@ -462,8 +460,9 @@ static int bench_test(int bufsize, int inactive_sock_count)
app_perror("...error(1): pj_ioqueue_register_sock()", rc);
PJ_LOG(3,(THIS_FILE, "....i=%d", i));
goto on_error;
- }
- rc = pj_ioqueue_read(ioque, key, recv_buf, bufsize);
+ }
+ bytes = bufsize;
+ rc = pj_ioqueue_recv(key, &inactive_read_op[i], recv_buf, &bytes, 0);
if ( rc < 0 && rc != PJ_EPENDING) {
pj_sock_close(inactive_sock[i]);
inactive_sock[i] = PJ_INVALID_SOCKET;
@@ -495,22 +494,25 @@ static int bench_test(int bufsize, int inactive_sock_count)
// Test loop.
t_elapsed.u64 = 0;
for (i=0; i<LOOP; ++i) {
- pj_ssize_t bytes;
+ pj_ssize_t bytes;
+ pj_ioqueue_op_key_t read_op, write_op;
// Randomize send buffer.
pj_create_random_string(send_buf, bufsize);
- // Start reading on the server side.
- rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize);
+ // Start reading on the server side.
+ bytes = bufsize;
+ rc = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0);
if (rc < 0 && rc != PJ_EPENDING) {
app_perror("...error: pj_ioqueue_read()", rc);
break;
}
- // Starts send on the client side.
- bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0,
- &addr, sizeof(addr));
- if (bytes != bufsize && bytes != PJ_EPENDING) {
+ // Starts send on the client side.
+ bytes = bufsize;
+ rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0,
+ &addr, sizeof(addr));
+ if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
app_perror("...error: pj_ioqueue_write()", bytes);
rc = -1;
break;
@@ -585,7 +587,7 @@ on_error:
pj_sock_close(csock);
for (i=0; i<inactive_sock_count && inactive_sock &&
inactive_sock[i]!=PJ_INVALID_SOCKET; ++i)
- {
+ {
pj_sock_close(inactive_sock[i]);
}
if (ioque != NULL)
@@ -599,11 +601,6 @@ int udp_ioqueue_test()
int status;
int bufsize, sock_count;
- PJ_LOG(3, (THIS_FILE, "...format test"));
- if ((status = native_format_test()) != 0)
- return status;
- PJ_LOG(3, (THIS_FILE, "....native format test ok"));
-
PJ_LOG(3, (THIS_FILE, "...compliance test"));
if ((status=compliance_test()) != 0) {
return status;
diff --git a/pjlib/src/pjlib-test/main.c b/pjlib/src/pjlib-test/main.c
index 96acc925..6a764e64 100644
--- a/pjlib/src/pjlib-test/main.c
+++ b/pjlib/src/pjlib-test/main.c
@@ -11,7 +11,8 @@ extern const char *param_echo_server;
extern int param_echo_port;
-#if defined(PJ_WIN32) && PJ_WIN32!=0
+//#if defined(PJ_WIN32) && PJ_WIN32!=0
+#if 0
#include <windows.h>
static void boost(void)
{
diff --git a/pjlib/src/pjlib-test/test.c b/pjlib/src/pjlib-test/test.c
index 5e804f69..e9a62e33 100644
--- a/pjlib/src/pjlib-test/test.c
+++ b/pjlib/src/pjlib-test/test.c
@@ -121,10 +121,10 @@ int test_inner(void)
#if PJ_HAS_TCP && INCLUDE_TCP_IOQUEUE_TEST
DO_TEST( tcp_ioqueue_test() );
#endif
-
-#if INCLUDE_IOQUEUE_PERF_TEST
- DO_TEST( ioqueue_perf_test() );
-#endif
+
+#if INCLUDE_IOQUEUE_PERF_TEST
+ DO_TEST( ioqueue_perf_test() );
+#endif
#if INCLUDE_XML_TEST
DO_TEST( xml_test() );
diff --git a/pjlib/src/pjlib-test/test.h b/pjlib/src/pjlib-test/test.h
index 8efe20d0..f4408513 100644
--- a/pjlib/src/pjlib-test/test.h
+++ b/pjlib/src/pjlib-test/test.h
@@ -8,7 +8,7 @@
#define GROUP_LIBC 0
#define GROUP_OS 0
#define GROUP_DATA_STRUCTURE 0
-#define GROUP_NETWORK 0
+#define GROUP_NETWORK 1
#define GROUP_EXTRA 0
#define INCLUDE_ERRNO_TEST GROUP_LIBC
@@ -30,13 +30,13 @@
#define INCLUDE_SOCK_PERF_TEST GROUP_NETWORK
#define INCLUDE_SELECT_TEST GROUP_NETWORK
#define INCLUDE_UDP_IOQUEUE_TEST GROUP_NETWORK
-#define INCLUDE_TCP_IOQUEUE_TEST GROUP_NETWORK
+#define INCLUDE_TCP_IOQUEUE_TEST GROUP_NETWORK
#define INCLUDE_IOQUEUE_PERF_TEST GROUP_NETWORK
#define INCLUDE_XML_TEST GROUP_EXTRA
-
#define INCLUDE_ECHO_SERVER 0
-#define INCLUDE_ECHO_CLIENT 1
+#define INCLUDE_ECHO_CLIENT 0
+
#define ECHO_SERVER_MAX_THREADS 4
#define ECHO_SERVER_START_PORT 65000
@@ -66,12 +66,16 @@ extern int sock_test(void);
extern int sock_perf_test(void);
extern int select_test(void);
extern int udp_ioqueue_test(void);
-extern int tcp_ioqueue_test(void);
+extern int tcp_ioqueue_test(void);
extern int ioqueue_perf_test(void);
extern int xml_test(void);
extern int echo_server(void);
extern int echo_client(int sock_type, const char *server, int port);
+
+extern int echo_srv_sync(void);
+extern int udp_echo_srv_ioqueue(void);
+extern int echo_srv_common_loop(pj_atomic_t *bytes_counter);
extern pj_pool_factory *mem;
diff --git a/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c b/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c
new file mode 100644
index 00000000..5fe6d6f2
--- /dev/null
+++ b/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c
@@ -0,0 +1,181 @@
+/* $Id$
+ */
+#include <pjlib.h>
+#include "test.h"
+
+static pj_ioqueue_key_t *key;
+static pj_atomic_t *total_bytes;
+
+struct op_key
+{
+ pj_ioqueue_op_key_t op_key_;
+ struct op_key *peer;
+ char *buffer;
+ pj_size_t size;
+ int is_pending;
+ pj_status_t last_err;
+};
+
+static void on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_received)
+{
+ pj_status_t rc;
+ struct op_key *recv_rec = (struct op_key *)op_key;
+
+ for (;;) {
+ struct op_key *send_rec = recv_rec->peer;
+ recv_rec->is_pending = 0;
+
+ if (bytes_received < 0) {
+ PJ_LOG(3,("","...error receiving data, received=%d",
+ bytes_received));
+ } else if (bytes_received == 0) {
+ /* note: previous error, or write callback */
+ } else {
+ pj_atomic_add(total_bytes, bytes_received);
+
+ if (!send_rec->is_pending) {
+ pj_ssize_t sent = bytes_received;
+ pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received);
+ rc = pj_ioqueue_send(key, &send_rec->op_key_,
+ send_rec->buffer, &sent, 0);
+ send_rec->is_pending = (rc==PJ_EPENDING);
+
+ if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) {
+ app_perror("...send error", rc);
+ }
+ }
+ }
+
+ if (!send_rec->is_pending) {
+ bytes_received = recv_rec->size;
+ rc = pj_ioqueue_recv(key, &recv_rec->op_key_,
+ recv_rec->buffer, &bytes_received, 0);
+ recv_rec->is_pending = (rc==PJ_EPENDING);
+ if (rc == PJ_SUCCESS) {
+ /* fall through next loop. */
+ } else if (rc == PJ_EPENDING) {
+ /* quit callback. */
+ break;
+ } else {
+ /* error */
+ app_perror("...recv error", rc);
+ recv_rec->last_err = rc;
+
+ bytes_received = 0;
+ /* fall through next loop. */
+ }
+ } else {
+ /* recv will be done when write completion callback is called. */
+ break;
+ }
+ }
+}
+
+static void on_write_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent)
+{
+ struct op_key *send_rec = (struct op_key*)op_key;
+
+ if (bytes_sent <= 0) {
+ pj_status_t rc = pj_get_netos_error();
+ app_perror("...send error", rc);
+ }
+
+ send_rec->is_pending = 0;
+ on_read_complete(key, &send_rec->peer->op_key_, 0);
+}
+
+static int worker_thread(void *arg)
+{
+ pj_ioqueue_t *ioqueue = arg;
+ struct op_key read_op, write_op;
+ char recv_buf[512], send_buf[512];
+ pj_ssize_t length;
+ pj_status_t rc;
+
+ read_op.peer = &write_op;
+ read_op.is_pending = 0;
+ read_op.last_err = 0;
+ read_op.buffer = recv_buf;
+ read_op.size = sizeof(recv_buf);
+ write_op.peer = &read_op;
+ write_op.is_pending = 0;
+ write_op.last_err = 0;
+ write_op.buffer = send_buf;
+ write_op.size = sizeof(send_buf);
+
+ length = sizeof(recv_buf);
+ rc = pj_ioqueue_recv(key, &read_op.op_key_, recv_buf, &length, 0);
+ if (rc == PJ_SUCCESS) {
+ read_op.is_pending = 1;
+ on_read_complete(key, &read_op.op_key_, length);
+ }
+
+ for (;;) {
+ pj_time_val timeout;
+ timeout.sec = 0; timeout.msec = 10;
+ rc = pj_ioqueue_poll(ioqueue, &timeout);
+ }
+}
+
+int udp_echo_srv_ioqueue(void)
+{
+ pj_pool_t *pool;
+ pj_sock_t sock;
+ pj_ioqueue_t *ioqueue;
+ pj_ioqueue_callback callback;
+ int i;
+ pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
+ pj_status_t rc;
+
+ pj_memset(&callback, 0, sizeof(callback));
+ callback.on_read_complete = &on_read_complete;
+ callback.on_write_complete = &on_write_complete;
+
+ pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
+ if (!pool)
+ return -10;
+
+ rc = pj_ioqueue_create(pool, 2, &ioqueue);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...pj_ioqueue_create error", rc);
+ return -20;
+ }
+
+ rc = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0,
+ ECHO_SERVER_START_PORT, &sock);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...app_socket error", rc);
+ return -30;
+ }
+
+ rc = pj_ioqueue_register_sock(pool, ioqueue, sock, NULL,
+ &callback, &key);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...error registering socket", rc);
+ return -40;
+ }
+
+ rc = pj_atomic_create(pool, 0, &total_bytes);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...error creating atomic variable", rc);
+ return -45;
+ }
+
+ for (i=0; i<ECHO_SERVER_MAX_THREADS; ++i) {
+ rc = pj_thread_create(pool, NULL, &worker_thread, ioqueue,
+ PJ_THREAD_DEFAULT_STACK_SIZE, 0,
+ &thread[i]);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...create thread error", rc);
+ return -50;
+ }
+ }
+
+ echo_srv_common_loop(total_bytes);
+
+ return 0;
+}
diff --git a/pjlib/src/pjlib-test/udp_echo_srv_sync.c b/pjlib/src/pjlib-test/udp_echo_srv_sync.c
index 0e73b134..19ee702c 100644
--- a/pjlib/src/pjlib-test/udp_echo_srv_sync.c
+++ b/pjlib/src/pjlib-test/udp_echo_srv_sync.c
@@ -8,7 +8,7 @@ static pj_atomic_t *total_bytes;
static int worker_thread(void *arg)
{
pj_sock_t sock = (pj_sock_t)arg;
- char buf[1516];
+ char buf[512];
pj_status_t last_recv_err = PJ_SUCCESS, last_write_err = PJ_SUCCESS;
for (;;) {
@@ -48,9 +48,6 @@ int echo_srv_sync(void)
pj_sock_t sock;
pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
pj_status_t rc;
- pj_highprec_t last_received, avg_bw, highest_bw;
- pj_time_val last_print;
- unsigned count;
int i;
pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
@@ -83,25 +80,36 @@ int echo_srv_sync(void)
ECHO_SERVER_MAX_THREADS, ECHO_SERVER_START_PORT));
PJ_LOG(3,("", "...Press Ctrl-C to abort"));
+ echo_srv_common_loop(total_bytes);
+ return 0;
+}
+
+
+int echo_srv_common_loop(pj_atomic_t *bytes_counter)
+{
+ pj_highprec_t last_received, avg_bw, highest_bw;
+ pj_time_val last_print;
+ unsigned count;
+
last_received = 0;
pj_gettimeofday(&last_print);
avg_bw = highest_bw = 0;
- count = 0;
-
+ count = 0;
+
for (;;) {
- pj_highprec_t received, cur_received, bw;
+ pj_highprec_t received, cur_received, bw;
unsigned msec;
pj_time_val now, duration;
pj_thread_sleep(1000);
-
- received = cur_received = pj_atomic_get(total_bytes);
+
+ received = cur_received = pj_atomic_get(bytes_counter);
cur_received = cur_received - last_received;
-
+
pj_gettimeofday(&now);
duration = now;
PJ_TIME_VAL_SUB(duration, last_print);
- msec = PJ_TIME_VAL_MSEC(duration);
+ msec = PJ_TIME_VAL_MSEC(duration);
bw = cur_received;
pj_highprec_mul(bw, 1000);
@@ -113,13 +121,13 @@ int echo_srv_sync(void)
avg_bw = avg_bw + bw;
count++;
- PJ_LOG(3,("", "Synchronous UDP (%d threads): %u KB/s (avg=%u KB/s) %s",
- ECHO_SERVER_MAX_THREADS,
- (unsigned)(bw / 1000),
- (unsigned)(avg_bw / count / 1000),
- (count==20 ? "<ses avg>" : "")));
-
- if (count==20) {
+ PJ_LOG(3,("", "Synchronous UDP (%d threads): %u KB/s (avg=%u KB/s) %s",
+ ECHO_SERVER_MAX_THREADS,
+ (unsigned)(bw / 1000),
+ (unsigned)(avg_bw / count / 1000),
+ (count==20 ? "<ses avg>" : "")));
+
+ if (count==20) {
if (avg_bw/count > highest_bw)
highest_bw = avg_bw/count;
@@ -127,9 +135,9 @@ int echo_srv_sync(void)
avg_bw = 0;
PJ_LOG(3,("", "Highest average bandwidth=%u KB/s",
- (unsigned)(highest_bw/1000)));
- }
- }
-}
-
-
+ (unsigned)(highest_bw/1000)));
+ }
+ }
+}
+
+