diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-06 09:37:47 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-06 09:37:47 +0000 |
commit | 7c7300624eb867fa7c1ea52b9c636889aac60e80 (patch) | |
tree | 58460baa296e7eb6bd775d060f2a1e960717f565 /pjlib/src/pjlib-test | |
parent | 58aee2809c36f43a3b66dac7d9db5d13070114b9 (diff) |
Changed ioqueue to allow simultaneous operations on the same key
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@11 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pjlib-test')
-rw-r--r-- | pjlib/src/pjlib-test/atomic.c | 24 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/ioq_perf.c | 178 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/ioq_tcp.c | 172 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/ioq_udp.c | 157 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/main.c | 3 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/test.c | 8 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/test.h | 14 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c | 181 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/udp_echo_srv_sync.c | 56 |
9 files changed, 547 insertions, 246 deletions
diff --git a/pjlib/src/pjlib-test/atomic.c b/pjlib/src/pjlib-test/atomic.c index 429085e1..09bdfdba 100644 --- a/pjlib/src/pjlib-test/atomic.c +++ b/pjlib/src/pjlib-test/atomic.c @@ -47,21 +47,29 @@ int atomic_test(void) /* get: check the value. */ if (pj_atomic_get(atomic_var) != 111) return -30; - - /* increment. */ - if (pj_atomic_inc(atomic_var) != 112) +
+ /* increment. */
+ pj_atomic_inc(atomic_var); + if (pj_atomic_get(atomic_var) != 112) return -40; - /* decrement. */ - if (pj_atomic_dec(atomic_var) != 111) + /* decrement. */
+ pj_atomic_dec(atomic_var); + if (pj_atomic_get(atomic_var) != 111) return -50; - /* set */ - if (pj_atomic_set(atomic_var, 211) != 111) + /* set */
+ pj_atomic_set(atomic_var, 211); + if (pj_atomic_get(atomic_var) != 211) return -60; +
+ /* add */
+ pj_atomic_add(atomic_var, 10);
+ if (pj_atomic_get(atomic_var) != 221)
+ return -60;
/* check the value again. */ - if (pj_atomic_get(atomic_var) != 211) + if (pj_atomic_get(atomic_var) != 221) return -70; /* destroy */ diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c index cb93c4cf..4cd11068 100644 --- a/pjlib/src/pjlib-test/ioq_perf.c +++ b/pjlib/src/pjlib-test/ioq_perf.c @@ -34,77 +34,105 @@ static unsigned last_error_counter; /* Descriptor for each producer/consumer pair. */ typedef struct test_item { - pj_sock_t server_fd, - client_fd; - pj_ioqueue_t *ioqueue; - pj_ioqueue_key_t *server_key, - *client_key; - pj_size_t buffer_size; - char *outgoing_buffer; - char *incoming_buffer; - pj_size_t bytes_sent, - bytes_recv; + pj_sock_t server_fd, + client_fd; + pj_ioqueue_t *ioqueue; + pj_ioqueue_key_t *server_key, + *client_key;
+ pj_ioqueue_op_key_t recv_op,
+ send_op;
+ int has_pending_send; + pj_size_t buffer_size; + char *outgoing_buffer; + char *incoming_buffer; + pj_size_t bytes_sent, + bytes_recv; } test_item; /* Callback when data has been read. * Increment item->bytes_recv and ready to read the next data. */ -static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) +static void on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read) { test_item *item = pj_ioqueue_get_user_data(key); - pj_status_t rc; + pj_status_t rc;
+ int data_is_available = 1; //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read)); +
+ do { + if (thread_quit_flag) + return; + + if (bytes_read < 0) { + pj_status_t rc = -bytes_read; + char errmsg[128]; + + if (rc != last_error) { + last_error = rc; + pj_strerror(rc, errmsg, sizeof(errmsg)); + PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", + bytes_read, errmsg)); + PJ_LOG(3,(THIS_FILE, + ".....additional info: total read=%u, total written=%u", + item->bytes_recv, item->bytes_sent)); + } else { + last_error_counter++; + } + bytes_read = 0; + + } else if (bytes_read == 0) { + PJ_LOG(3,(THIS_FILE, "...socket has closed!")); + } - if (thread_quit_flag) - return; - - if (bytes_read < 0) { - pj_status_t rc = -bytes_read; - char errmsg[128]; - - if (rc != last_error) { - last_error = rc; - pj_strerror(rc, errmsg, sizeof(errmsg)); - PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", - bytes_read, errmsg)); - PJ_LOG(3,(THIS_FILE, - ".....additional info: total read=%u, total written=%u", - item->bytes_recv, item->bytes_sent)); - } else { - last_error_counter++; - } - bytes_read = 0; - - } else if (bytes_read == 0) { - PJ_LOG(3,(THIS_FILE, "...socket has closed!")); - } - - item->bytes_recv += bytes_read; + item->bytes_recv += bytes_read; - /* To assure that the test quits, even if main thread - * doesn't have time to run. - */ - if (item->bytes_recv > item->buffer_size * 10000) - thread_quit_flag = 1; - - rc = pj_ioqueue_recv( item->ioqueue, item->server_key, - item->incoming_buffer, item->buffer_size, 0 ); - - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - if (rc != last_error) { - last_error = rc; - app_perror("...error: read error", rc); - } else { - last_error_counter++; - } - } + /* To assure that the test quits, even if main thread + * doesn't have time to run. + */ + if (item->bytes_recv > item->buffer_size * 10000) + thread_quit_flag = 1; +
+ bytes_read = item->buffer_size; + rc = pj_ioqueue_recv( key, op_key, + item->incoming_buffer, &bytes_read, 0 ); + + if (rc == PJ_SUCCESS) {
+ data_is_available = 1;
+ } else if (rc == PJ_EPENDING) {
+ data_is_available = 0;
+ } else {
+ data_is_available = 0; + if (rc != last_error) { + last_error = rc; + app_perror("...error: read error", rc); + } else { + last_error_counter++; + } + }
+
+ if (!item->has_pending_send) {
+ pj_ssize_t sent = item->buffer_size;
+ rc = pj_ioqueue_send(item->client_key, &item->send_op,
+ item->outgoing_buffer, &sent, 0);
+ if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ app_perror("...error: write error", rc);
+ }
+
+ item->has_pending_send = (rc==PJ_EPENDING);
+ }
+
+ } while (data_is_available); } /* Callback when data has been written. * Increment item->bytes_sent and write the next data. */ -static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent) +static void on_write_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent) { test_item *item = pj_ioqueue_get_user_data(key); @@ -112,7 +140,8 @@ static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent) if (thread_quit_flag) return; - +
+ item->has_pending_send = 0; item->bytes_sent += bytes_sent; if (bytes_sent <= 0) { @@ -121,12 +150,15 @@ static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent) } else { pj_status_t rc; - - rc = pj_ioqueue_write(item->ioqueue, item->client_key, - item->outgoing_buffer, item->buffer_size); +
+ bytes_sent = item->buffer_size; + rc = pj_ioqueue_send( item->client_key, op_key, + item->outgoing_buffer, &bytes_sent, 0); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...error: write error", rc); - } + }
+
+ item->has_pending_send = (rc==PJ_EPENDING); } } @@ -191,7 +223,7 @@ static int perform_test(int sock_type, const char *type_name, thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*)); TRACE_((THIS_FILE, " creating ioqueue..")); - rc = pj_ioqueue_create(pool, sockpair_cnt*2, thread_cnt, &ioqueue); + rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue); if (rc != PJ_SUCCESS) { app_perror("...error: unable to create ioqueue", rc); return -15; @@ -199,6 +231,7 @@ static int perform_test(int sock_type, const char *type_name, /* Initialize each producer-consumer pair. */ for (i=0; i<sockpair_cnt; ++i) { + pj_ssize_t bytes;
items[i].ioqueue = ioqueue; items[i].buffer_size = buffer_size; @@ -241,24 +274,27 @@ static int perform_test(int sock_type, const char *type_name, } /* Start reading. */ - TRACE_((THIS_FILE, " pj_ioqueue_recv..")); - rc = pj_ioqueue_recv(ioqueue, items[i].server_key, - items[i].incoming_buffer, items[i].buffer_size, + TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
+ bytes = items[i].buffer_size; + rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op, + items[i].incoming_buffer, &bytes, 0); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { + if (rc != PJ_EPENDING) { app_perror("...error: pj_ioqueue_recv", rc); return -73; } /* Start writing. */ - TRACE_((THIS_FILE, " pj_ioqueue_write..")); - rc = pj_ioqueue_write(ioqueue, items[i].client_key, - items[i].outgoing_buffer, items[i].buffer_size); + TRACE_((THIS_FILE, " pj_ioqueue_write.."));
+ bytes = items[i].buffer_size; + rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op, + items[i].outgoing_buffer, &bytes, 0); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...error: pj_ioqueue_write", rc); return -76; } - +
+ items[i].has_pending_send = (rc==PJ_EPENDING); } /* Create the threads. */ @@ -324,8 +360,8 @@ static int perform_test(int sock_type, const char *type_name, /* Close all sockets. */ TRACE_((THIS_FILE, " closing all sockets..")); for (i=0; i<sockpair_cnt; ++i) { - pj_ioqueue_unregister(ioqueue, items[i].server_key); - pj_ioqueue_unregister(ioqueue, items[i].client_key); + pj_ioqueue_unregister(items[i].server_key); + pj_ioqueue_unregister(items[i].client_key); pj_sock_close(items[i].server_fd); pj_sock_close(items[i].client_fd); } diff --git a/pjlib/src/pjlib-test/ioq_tcp.c b/pjlib/src/pjlib-test/ioq_tcp.c index ebce633b..fd5329e5 100644 --- a/pjlib/src/pjlib-test/ioq_tcp.c +++ b/pjlib/src/pjlib-test/ioq_tcp.c @@ -31,33 +31,45 @@ #define SOCK_INACTIVE_MAX (PJ_IOQUEUE_MAX_HANDLES - 2) #define POOL_SIZE (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048) -static pj_ssize_t callback_read_size, - callback_write_size, - callback_accept_status, - callback_connect_status; -static pj_ioqueue_key_t*callback_read_key, - *callback_write_key, - *callback_accept_key, - *callback_connect_key; - -static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) +static pj_ssize_t callback_read_size, + callback_write_size, + callback_accept_status, + callback_connect_status; +static pj_ioqueue_key_t *callback_read_key, + *callback_write_key, + *callback_accept_key, + *callback_connect_key;
+static pj_ioqueue_op_key_t *callback_read_op,
+ *callback_write_op,
+ *callback_accept_op; + +static void on_ioqueue_read(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read) { - callback_read_key = key; + callback_read_key = key;
+ callback_read_op = op_key; callback_read_size = bytes_read; } -static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written) +static void on_ioqueue_write(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_written) { - callback_write_key = key; + callback_write_key = key;
+ callback_write_op = op_key; callback_write_size = bytes_written; } -static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock, +static void on_ioqueue_accept(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock, int status) { PJ_UNUSED_ARG(sock); - callback_accept_key = key; + callback_accept_key = key;
+ callback_accept_op = op_key; callback_accept_status = status; } @@ -83,28 +95,38 @@ static int send_recv_test(pj_ioqueue_t *ioque, pj_ssize_t bufsize, pj_timestamp *t_elapsed) { - int rc; - pj_ssize_t bytes; + pj_status_t status; + pj_ssize_t bytes;
+ pj_time_val timeout; pj_timestamp t1, t2; - int pending_op = 0; - - // Start reading on the server side. - rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize); - if (rc != 0 && rc != PJ_EPENDING) { + int pending_op = 0;
+ pj_ioqueue_op_key_t read_op, write_op; + + // Start reading on the server side.
+ bytes = bufsize; + status = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0); + if (status != PJ_SUCCESS && status != PJ_EPENDING) {
+ app_perror("...pj_ioqueue_recv error", status); return -100; } - - ++pending_op; +
+ if (status == PJ_EPENDING) + ++pending_op;
+ else {
+ /* Does not expect to return error or immediate data. */
+ return -115;
+ } // Randomize send buffer. pj_create_random_string((char*)send_buf, bufsize); - // Starts send on the client side. - bytes = pj_ioqueue_write(ioque, ckey, send_buf, bufsize); - if (bytes != bufsize && bytes != PJ_EPENDING) { + // Starts send on the client side.
+ bytes = bufsize; + status = pj_ioqueue_send(ckey, &write_op, send_buf, &bytes, 0); + if (status != PJ_SUCCESS && bytes != PJ_EPENDING) { return -120; } - if (bytes == PJ_EPENDING) { + if (status == PJ_EPENDING) { ++pending_op; } @@ -113,37 +135,52 @@ static int send_recv_test(pj_ioqueue_t *ioque, // Reset indicators callback_read_size = callback_write_size = 0; - callback_read_key = callback_write_key = NULL; + callback_read_key = callback_write_key = NULL;
+ callback_read_op = callback_write_op = NULL; // Poll the queue until we've got completion event in the server side. - rc = 0; - while (pending_op > 0) { - rc = pj_ioqueue_poll(ioque, NULL); - if (rc > 0) { + status = 0; + while (pending_op > 0) {
+ timeout.sec = 1; timeout.msec = 0; + status = pj_ioqueue_poll(ioque, &timeout); + if (status > 0) { if (callback_read_size) { - if (callback_read_size != bufsize) { + if (callback_read_size != bufsize) return -160; - } if (callback_read_key != skey) - return -161; + return -161;
+ if (callback_read_op != &read_op)
+ return -162; } if (callback_write_size) { if (callback_write_key != ckey) - return -162; + return -163;
+ if (callback_write_op != &write_op)
+ return -164; } - pending_op -= rc; - } - if (rc < 0) { + pending_op -= status; + }
+ if (status == 0) {
+ PJ_LOG(3,("", "...error: timed out"));
+ } + if (status < 0) { return -170; } } +
+ // Pending op is zero.
+ // Subsequent poll should yield zero too.
+ timeout.sec = timeout.msec = 0;
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status != 0)
+ return -173;
// End time. pj_get_timestamp(&t2); t_elapsed->u32.lo += (t2.u32.lo - t1.u32.lo); - if (rc < 0) { - return -150; + if (status < 0) { + return -176; } // Compare recv buffer with send buffer. @@ -167,7 +204,8 @@ static int compliance_test_0(void) pj_pool_t *pool = NULL; char *send_buf, *recv_buf; pj_ioqueue_t *ioque = NULL; - pj_ioqueue_key_t *skey, *ckey0, *ckey1; + pj_ioqueue_key_t *skey, *ckey0, *ckey1;
+ pj_ioqueue_op_key_t accept_op; int bufsize = BUF_MIN_SIZE; pj_ssize_t status = -1; int pending_op = 0; @@ -205,7 +243,7 @@ static int compliance_test_0(void) } // Create I/O Queue. - rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0, &ioque); + rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); if (rc != PJ_SUCCESS) { app_perror("...ERROR in pj_ioqueue_create()", rc); status=-20; goto on_error; @@ -231,7 +269,8 @@ static int compliance_test_0(void) // Server socket accept() client_addr_len = sizeof(pj_sockaddr_in); - status = pj_ioqueue_accept(ioque, skey, &csock0, &client_addr, &rmt_addr, &client_addr_len); + status = pj_ioqueue_accept(skey, &accept_op, &csock0,
+ &client_addr, &rmt_addr, &client_addr_len); if (status != PJ_EPENDING) { app_perror("...ERROR in pj_ioqueue_accept()", rc); status=-30; goto on_error; @@ -247,7 +286,7 @@ static int compliance_test_0(void) addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1")); // Client socket connect() - status = pj_ioqueue_connect(ioque, ckey1, &addr, sizeof(addr)); + status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr)); if (status!=PJ_SUCCESS && status != PJ_EPENDING) { app_perror("...ERROR in pj_ioqueue_connect()", rc); status=-40; goto on_error; @@ -262,6 +301,7 @@ static int compliance_test_0(void) callback_read_key = callback_write_key = callback_accept_key = callback_connect_key = NULL; + callback_accept_op = callback_read_op = callback_write_op = NULL;
while (pending_op) { pj_time_val timeout = {1, 0}; @@ -273,8 +313,12 @@ static int compliance_test_0(void) status=-41; goto on_error; } if (callback_accept_key != skey) { - status=-41; goto on_error; - } + status=-42; goto on_error; + }
+ if (callback_accept_op != &accept_op) {
+ status=-43; goto on_error;
+ }
+ callback_accept_status = -2; } if (callback_connect_status != -2) { @@ -283,7 +327,8 @@ static int compliance_test_0(void) } if (callback_connect_key != ckey1) { status=-51; goto on_error; - } + }
+ callback_connect_status = -2; } pending_op -= status; @@ -293,6 +338,16 @@ static int compliance_test_0(void) } } } +
+ // There's no pending operation.
+ // When we poll the ioqueue, there must not be events.
+ if (pending_op == 0) {
+ pj_time_val timeout = {1, 0};
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status != 0) {
+ status=-60; goto on_error;
+ }
+ }
// Check accepted socket. if (csock0 == PJ_INVALID_SOCKET) { @@ -312,7 +367,8 @@ static int compliance_test_0(void) // Test send and receive. t_elapsed.u32.lo = 0; - status = send_recv_test(ioque, ckey0, ckey1, send_buf, recv_buf, bufsize, &t_elapsed); + status = send_recv_test(ioque, ckey0, ckey1, send_buf,
+ recv_buf, bufsize, &t_elapsed); if (status != 0) { goto on_error; } @@ -354,7 +410,7 @@ static int compliance_test_1(void) pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL); // Create I/O Queue. - rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0, &ioque); + rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); if (!ioque) { status=-20; goto on_error; } @@ -381,7 +437,7 @@ static int compliance_test_1(void) addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1")); // Client socket connect() - status = pj_ioqueue_connect(ioque, ckey1, &addr, sizeof(addr)); + status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr)); if (status==PJ_SUCCESS) { // unexpectedly success! status = -30; @@ -416,7 +472,17 @@ static int compliance_test_1(void) } } } - +
+ // There's no pending operation.
+ // When we poll the ioqueue, there must not be events.
+ if (pending_op == 0) {
+ pj_time_val timeout = {1, 0};
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status != 0) {
+ status=-60; goto on_error;
+ }
+ }
+
// Success status = 0; diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c index a59dac88..6ee90e42 100644 --- a/pjlib/src/pjlib-test/ioq_udp.c +++ b/pjlib/src/pjlib-test/ioq_udp.c @@ -34,31 +34,43 @@ #undef TRACE_ #define TRACE_(msg) PJ_LOG(3,(THIS_FILE,"....." msg)) -static pj_ssize_t callback_read_size, - callback_write_size, - callback_accept_status, - callback_connect_status; -static pj_ioqueue_key_t *callback_read_key, - *callback_write_key, - *callback_accept_key, - *callback_connect_key; - -static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) +static pj_ssize_t callback_read_size, + callback_write_size, + callback_accept_status, + callback_connect_status; +static pj_ioqueue_key_t *callback_read_key, + *callback_write_key, + *callback_accept_key, + *callback_connect_key; +static pj_ioqueue_op_key_t *callback_read_op,
+ *callback_write_op,
+ *callback_accept_op;
+ +static void on_ioqueue_read(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read) { - callback_read_key = key; + callback_read_key = key;
+ callback_read_op = op_key; callback_read_size = bytes_read; } -static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written) +static void on_ioqueue_write(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_written) { - callback_write_key = key; + callback_write_key = key;
+ callback_write_op = op_key; callback_write_size = bytes_written; } -static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock, int status) +static void on_ioqueue_accept(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock, int status) { PJ_UNUSED_ARG(sock); - callback_accept_key = key; + callback_accept_key = key;
+ callback_accept_op = op_key; callback_accept_status = status; } @@ -83,29 +95,6 @@ static pj_ioqueue_callback test_cb = #endif /* - * native_format_test() - * This is just a simple test to verify that various structures in sock.h - * are really compatible with operating system's definitions. - */ -static int native_format_test(void) -{ - pj_status_t rc; - - // Test that PJ_INVALID_SOCKET is working. - { - pj_sock_t sock; - rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, -1, &sock); - if (rc == PJ_SUCCESS) - return -1020; - } - - // Previous func will set errno var. - pj_set_os_error(PJ_SUCCESS); - - return 0; -} - -/* * compliance_test() * To test that the basic IOQueue functionality works. It will just exchange * data between two sockets. @@ -118,7 +107,8 @@ static int compliance_test(void) pj_pool_t *pool = NULL; char *send_buf, *recv_buf; pj_ioqueue_t *ioque = NULL; - pj_ioqueue_key_t *skey, *ckey; + pj_ioqueue_key_t *skey, *ckey;
+ pj_ioqueue_op_key_t read_op, write_op; int bufsize = BUF_MIN_SIZE; pj_ssize_t bytes, status = -1; pj_str_t temp; @@ -157,8 +147,7 @@ static int compliance_test(void) // Create I/O Queue. TRACE_("create ioqueue..."); - rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, - PJ_IOQUEUE_DEFAULT_THREADS, &ioque); + rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); if (rc != PJ_SUCCESS) { status=-20; goto on_error; } @@ -194,12 +183,14 @@ static int compliance_test(void) // Register reading from ioqueue. TRACE_("start recvfrom..."); - addrlen = sizeof(addr); - bytes = pj_ioqueue_recvfrom(ioque, skey, recv_buf, bufsize, 0, - &addr, &addrlen); - if (bytes < 0 && bytes != PJ_EPENDING) { + addrlen = sizeof(addr);
+ bytes = bufsize; + rc = pj_ioqueue_recvfrom(skey, &read_op, recv_buf, &bytes, 0, + &addr, &addrlen); + if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ app_perror("...error: pj_ioqueue_recvfrom", rc); status=-28; goto on_error; - } else if (bytes == PJ_EPENDING) { + } else if (rc == PJ_EPENDING) { recv_pending = 1; PJ_LOG(3, (THIS_FILE, "......ok: recvfrom returned pending")); @@ -210,14 +201,14 @@ static int compliance_test(void) } // Write must return the number of bytes. - TRACE_("start sendto..."); - bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, &addr, - sizeof(addr)); - if (bytes != bufsize && bytes != PJ_EPENDING) { - PJ_LOG(1,(THIS_FILE, - "......error: sendto returned %d", bytes)); + TRACE_("start sendto...");
+ bytes = bufsize; + rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, &addr, + sizeof(addr)); + if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ app_perror("...error: pj_ioqueue_sendto", rc); status=-30; goto on_error; - } else if (bytes == PJ_EPENDING) { + } else if (rc == PJ_EPENDING) { send_pending = 1; PJ_LOG(3, (THIS_FILE, "......ok: sendto returned pending")); @@ -232,9 +223,10 @@ static int compliance_test(void) callback_accept_status = callback_connect_status = -2; callback_read_key = callback_write_key = callback_accept_key = callback_connect_key = NULL; + callback_read_op = callback_write_op = NULL;
// Poll if pending. - while (send_pending && recv_pending) { + while (send_pending || recv_pending) { int rc; pj_time_val timeout = { 5, 0 }; @@ -253,9 +245,11 @@ static int compliance_test(void) if (callback_read_size != bufsize) { status=-61; goto on_error; } - if (callback_read_key != skey) { status=-65; goto on_error; + }
+ if (callback_read_op != &read_op) {
+ status=-66; goto on_error;
} if (memcmp(send_buf, recv_buf, bufsize) != 0) { @@ -270,9 +264,11 @@ static int compliance_test(void) if (callback_write_size != bufsize) { status=-73; goto on_error; } - if (callback_write_key != ckey) { status=-75; goto on_error; + }
+ if (callback_write_op != &write_op) {
+ status=-76; goto on_error;
} send_pending = 0; @@ -326,9 +322,7 @@ static int many_handles_test(void) sock = pj_pool_alloc(pool, MAX*sizeof(pj_sock_t)); /* Create IOQueue */ - rc = pj_ioqueue_create(pool, MAX, - PJ_IOQUEUE_DEFAULT_THREADS, - &ioqueue); + rc = pj_ioqueue_create(pool, MAX, &ioqueue); if (rc != PJ_SUCCESS || ioqueue == NULL) { app_perror("...error in pj_ioqueue_create", rc); return -10; @@ -358,7 +352,7 @@ static int many_handles_test(void) /* Now deregister and close all handles. */ for (i=0; i<count; ++i) { - rc = pj_ioqueue_unregister(ioqueue, key[i]); + rc = pj_ioqueue_unregister(key[i]); if (rc != PJ_SUCCESS) { app_perror("...error in pj_ioqueue_unregister", rc); } @@ -392,7 +386,8 @@ static int bench_test(int bufsize, int inactive_sock_count) pj_sock_t ssock=-1, csock=-1; pj_sockaddr_in addr; pj_pool_t *pool = NULL; - pj_sock_t *inactive_sock=NULL; + pj_sock_t *inactive_sock=NULL;
+ pj_ioqueue_op_key_t *inactive_read_op; char *send_buf, *recv_buf; pj_ioqueue_t *ioque = NULL; pj_ioqueue_key_t *skey, *ckey, *key; @@ -429,8 +424,7 @@ static int bench_test(int bufsize, int inactive_sock_count) pj_assert(inactive_sock_count+2 <= PJ_IOQUEUE_MAX_HANDLES); // Create I/O Queue. - rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, - PJ_IOQUEUE_DEFAULT_THREADS, &ioque); + rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); if (rc != PJ_SUCCESS) { app_perror("...error: pj_ioqueue_create()", rc); goto on_error; @@ -439,10 +433,14 @@ static int bench_test(int bufsize, int inactive_sock_count) // Allocate inactive sockets, and bind them to some arbitrary address. // Then register them to the I/O queue, and start a read operation. inactive_sock = (pj_sock_t*)pj_pool_alloc(pool, - inactive_sock_count*sizeof(pj_sock_t)); + inactive_sock_count*sizeof(pj_sock_t));
+ inactive_read_op = (pj_ioqueue_op_key_t*)pj_pool_alloc(pool,
+ inactive_sock_count*sizeof(pj_ioqueue_op_key_t)); memset(&addr, 0, sizeof(addr)); addr.sin_family = PJ_AF_INET; - for (i=0; i<inactive_sock_count; ++i) { + for (i=0; i<inactive_sock_count; ++i) {
+ pj_ssize_t bytes;
+ rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &inactive_sock[i]); if (rc != PJ_SUCCESS || inactive_sock[i] < 0) { app_perror("...error: pj_sock_socket()", rc); @@ -462,8 +460,9 @@ static int bench_test(int bufsize, int inactive_sock_count) app_perror("...error(1): pj_ioqueue_register_sock()", rc); PJ_LOG(3,(THIS_FILE, "....i=%d", i)); goto on_error; - } - rc = pj_ioqueue_read(ioque, key, recv_buf, bufsize); + }
+ bytes = bufsize; + rc = pj_ioqueue_recv(key, &inactive_read_op[i], recv_buf, &bytes, 0); if ( rc < 0 && rc != PJ_EPENDING) { pj_sock_close(inactive_sock[i]); inactive_sock[i] = PJ_INVALID_SOCKET; @@ -495,22 +494,25 @@ static int bench_test(int bufsize, int inactive_sock_count) // Test loop. t_elapsed.u64 = 0; for (i=0; i<LOOP; ++i) { - pj_ssize_t bytes; + pj_ssize_t bytes;
+ pj_ioqueue_op_key_t read_op, write_op; // Randomize send buffer. pj_create_random_string(send_buf, bufsize); - // Start reading on the server side. - rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize); + // Start reading on the server side.
+ bytes = bufsize; + rc = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0); if (rc < 0 && rc != PJ_EPENDING) { app_perror("...error: pj_ioqueue_read()", rc); break; } - // Starts send on the client side. - bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, - &addr, sizeof(addr)); - if (bytes != bufsize && bytes != PJ_EPENDING) { + // Starts send on the client side.
+ bytes = bufsize; + rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, + &addr, sizeof(addr)); + if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...error: pj_ioqueue_write()", bytes); rc = -1; break; @@ -585,7 +587,7 @@ on_error: pj_sock_close(csock); for (i=0; i<inactive_sock_count && inactive_sock && inactive_sock[i]!=PJ_INVALID_SOCKET; ++i) - { + {
pj_sock_close(inactive_sock[i]); } if (ioque != NULL) @@ -599,11 +601,6 @@ int udp_ioqueue_test() int status; int bufsize, sock_count; - PJ_LOG(3, (THIS_FILE, "...format test")); - if ((status = native_format_test()) != 0) - return status; - PJ_LOG(3, (THIS_FILE, "....native format test ok")); - PJ_LOG(3, (THIS_FILE, "...compliance test")); if ((status=compliance_test()) != 0) { return status; diff --git a/pjlib/src/pjlib-test/main.c b/pjlib/src/pjlib-test/main.c index 96acc925..6a764e64 100644 --- a/pjlib/src/pjlib-test/main.c +++ b/pjlib/src/pjlib-test/main.c @@ -11,7 +11,8 @@ extern const char *param_echo_server; extern int param_echo_port; -#if defined(PJ_WIN32) && PJ_WIN32!=0 +//#if defined(PJ_WIN32) && PJ_WIN32!=0
+#if 0 #include <windows.h> static void boost(void) { diff --git a/pjlib/src/pjlib-test/test.c b/pjlib/src/pjlib-test/test.c index 5e804f69..e9a62e33 100644 --- a/pjlib/src/pjlib-test/test.c +++ b/pjlib/src/pjlib-test/test.c @@ -121,10 +121,10 @@ int test_inner(void) #if PJ_HAS_TCP && INCLUDE_TCP_IOQUEUE_TEST DO_TEST( tcp_ioqueue_test() ); #endif - -#if INCLUDE_IOQUEUE_PERF_TEST - DO_TEST( ioqueue_perf_test() ); -#endif +
+#if INCLUDE_IOQUEUE_PERF_TEST
+ DO_TEST( ioqueue_perf_test() );
+#endif
#if INCLUDE_XML_TEST DO_TEST( xml_test() ); diff --git a/pjlib/src/pjlib-test/test.h b/pjlib/src/pjlib-test/test.h index 8efe20d0..f4408513 100644 --- a/pjlib/src/pjlib-test/test.h +++ b/pjlib/src/pjlib-test/test.h @@ -8,7 +8,7 @@ #define GROUP_LIBC 0 #define GROUP_OS 0 #define GROUP_DATA_STRUCTURE 0 -#define GROUP_NETWORK 0 +#define GROUP_NETWORK 1 #define GROUP_EXTRA 0 #define INCLUDE_ERRNO_TEST GROUP_LIBC @@ -30,13 +30,13 @@ #define INCLUDE_SOCK_PERF_TEST GROUP_NETWORK #define INCLUDE_SELECT_TEST GROUP_NETWORK #define INCLUDE_UDP_IOQUEUE_TEST GROUP_NETWORK -#define INCLUDE_TCP_IOQUEUE_TEST GROUP_NETWORK +#define INCLUDE_TCP_IOQUEUE_TEST GROUP_NETWORK
#define INCLUDE_IOQUEUE_PERF_TEST GROUP_NETWORK #define INCLUDE_XML_TEST GROUP_EXTRA - #define INCLUDE_ECHO_SERVER 0 -#define INCLUDE_ECHO_CLIENT 1 +#define INCLUDE_ECHO_CLIENT 0 +
#define ECHO_SERVER_MAX_THREADS 4 #define ECHO_SERVER_START_PORT 65000 @@ -66,12 +66,16 @@ extern int sock_test(void); extern int sock_perf_test(void); extern int select_test(void); extern int udp_ioqueue_test(void); -extern int tcp_ioqueue_test(void); +extern int tcp_ioqueue_test(void);
extern int ioqueue_perf_test(void); extern int xml_test(void); extern int echo_server(void); extern int echo_client(int sock_type, const char *server, int port); +
+extern int echo_srv_sync(void);
+extern int udp_echo_srv_ioqueue(void);
+extern int echo_srv_common_loop(pj_atomic_t *bytes_counter);
extern pj_pool_factory *mem; diff --git a/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c b/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c new file mode 100644 index 00000000..5fe6d6f2 --- /dev/null +++ b/pjlib/src/pjlib-test/udp_echo_srv_ioqueue.c @@ -0,0 +1,181 @@ +/* $Id$
+ */
+#include <pjlib.h>
+#include "test.h"
+
+static pj_ioqueue_key_t *key;
+static pj_atomic_t *total_bytes;
+
+struct op_key
+{
+ pj_ioqueue_op_key_t op_key_;
+ struct op_key *peer;
+ char *buffer;
+ pj_size_t size;
+ int is_pending;
+ pj_status_t last_err;
+};
+
+static void on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_received)
+{
+ pj_status_t rc;
+ struct op_key *recv_rec = (struct op_key *)op_key;
+
+ for (;;) {
+ struct op_key *send_rec = recv_rec->peer;
+ recv_rec->is_pending = 0;
+
+ if (bytes_received < 0) {
+ PJ_LOG(3,("","...error receiving data, received=%d",
+ bytes_received));
+ } else if (bytes_received == 0) {
+ /* note: previous error, or write callback */
+ } else {
+ pj_atomic_add(total_bytes, bytes_received);
+
+ if (!send_rec->is_pending) {
+ pj_ssize_t sent = bytes_received;
+ pj_memcpy(send_rec->buffer, recv_rec->buffer, bytes_received);
+ rc = pj_ioqueue_send(key, &send_rec->op_key_,
+ send_rec->buffer, &sent, 0);
+ send_rec->is_pending = (rc==PJ_EPENDING);
+
+ if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) {
+ app_perror("...send error", rc);
+ }
+ }
+ }
+
+ if (!send_rec->is_pending) {
+ bytes_received = recv_rec->size;
+ rc = pj_ioqueue_recv(key, &recv_rec->op_key_,
+ recv_rec->buffer, &bytes_received, 0);
+ recv_rec->is_pending = (rc==PJ_EPENDING);
+ if (rc == PJ_SUCCESS) {
+ /* fall through next loop. */
+ } else if (rc == PJ_EPENDING) {
+ /* quit callback. */
+ break;
+ } else {
+ /* error */
+ app_perror("...recv error", rc);
+ recv_rec->last_err = rc;
+
+ bytes_received = 0;
+ /* fall through next loop. */
+ }
+ } else {
+ /* recv will be done when write completion callback is called. */
+ break;
+ }
+ }
+}
+
+static void on_write_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent)
+{
+ struct op_key *send_rec = (struct op_key*)op_key;
+
+ if (bytes_sent <= 0) {
+ pj_status_t rc = pj_get_netos_error();
+ app_perror("...send error", rc);
+ }
+
+ send_rec->is_pending = 0;
+ on_read_complete(key, &send_rec->peer->op_key_, 0);
+}
+
+static int worker_thread(void *arg)
+{
+ pj_ioqueue_t *ioqueue = arg;
+ struct op_key read_op, write_op;
+ char recv_buf[512], send_buf[512];
+ pj_ssize_t length;
+ pj_status_t rc;
+
+ read_op.peer = &write_op;
+ read_op.is_pending = 0;
+ read_op.last_err = 0;
+ read_op.buffer = recv_buf;
+ read_op.size = sizeof(recv_buf);
+ write_op.peer = &read_op;
+ write_op.is_pending = 0;
+ write_op.last_err = 0;
+ write_op.buffer = send_buf;
+ write_op.size = sizeof(send_buf);
+
+ length = sizeof(recv_buf);
+ rc = pj_ioqueue_recv(key, &read_op.op_key_, recv_buf, &length, 0);
+ if (rc == PJ_SUCCESS) {
+ read_op.is_pending = 1;
+ on_read_complete(key, &read_op.op_key_, length);
+ }
+
+ for (;;) {
+ pj_time_val timeout;
+ timeout.sec = 0; timeout.msec = 10;
+ rc = pj_ioqueue_poll(ioqueue, &timeout);
+ }
+}
+
+int udp_echo_srv_ioqueue(void)
+{
+ pj_pool_t *pool;
+ pj_sock_t sock;
+ pj_ioqueue_t *ioqueue;
+ pj_ioqueue_callback callback;
+ int i;
+ pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
+ pj_status_t rc;
+
+ pj_memset(&callback, 0, sizeof(callback));
+ callback.on_read_complete = &on_read_complete;
+ callback.on_write_complete = &on_write_complete;
+
+ pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
+ if (!pool)
+ return -10;
+
+ rc = pj_ioqueue_create(pool, 2, &ioqueue);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...pj_ioqueue_create error", rc);
+ return -20;
+ }
+
+ rc = app_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0,
+ ECHO_SERVER_START_PORT, &sock);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...app_socket error", rc);
+ return -30;
+ }
+
+ rc = pj_ioqueue_register_sock(pool, ioqueue, sock, NULL,
+ &callback, &key);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...error registering socket", rc);
+ return -40;
+ }
+
+ rc = pj_atomic_create(pool, 0, &total_bytes);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...error creating atomic variable", rc);
+ return -45;
+ }
+
+ for (i=0; i<ECHO_SERVER_MAX_THREADS; ++i) {
+ rc = pj_thread_create(pool, NULL, &worker_thread, ioqueue,
+ PJ_THREAD_DEFAULT_STACK_SIZE, 0,
+ &thread[i]);
+ if (rc != PJ_SUCCESS) {
+ app_perror("...create thread error", rc);
+ return -50;
+ }
+ }
+
+ echo_srv_common_loop(total_bytes);
+
+ return 0;
+}
diff --git a/pjlib/src/pjlib-test/udp_echo_srv_sync.c b/pjlib/src/pjlib-test/udp_echo_srv_sync.c index 0e73b134..19ee702c 100644 --- a/pjlib/src/pjlib-test/udp_echo_srv_sync.c +++ b/pjlib/src/pjlib-test/udp_echo_srv_sync.c @@ -8,7 +8,7 @@ static pj_atomic_t *total_bytes; static int worker_thread(void *arg) { pj_sock_t sock = (pj_sock_t)arg; - char buf[1516]; + char buf[512]; pj_status_t last_recv_err = PJ_SUCCESS, last_write_err = PJ_SUCCESS; for (;;) { @@ -48,9 +48,6 @@ int echo_srv_sync(void) pj_sock_t sock; pj_thread_t *thread[ECHO_SERVER_MAX_THREADS]; pj_status_t rc;
- pj_highprec_t last_received, avg_bw, highest_bw;
- pj_time_val last_print;
- unsigned count;
int i; pool = pj_pool_create(mem, NULL, 4000, 4000, NULL); @@ -83,25 +80,36 @@ int echo_srv_sync(void) ECHO_SERVER_MAX_THREADS, ECHO_SERVER_START_PORT)); PJ_LOG(3,("", "...Press Ctrl-C to abort")); + echo_srv_common_loop(total_bytes);
+ return 0; +} + + +int echo_srv_common_loop(pj_atomic_t *bytes_counter)
+{
+ pj_highprec_t last_received, avg_bw, highest_bw;
+ pj_time_val last_print;
+ unsigned count;
+
last_received = 0;
pj_gettimeofday(&last_print);
avg_bw = highest_bw = 0;
- count = 0; - + count = 0;
+
for (;;) {
- pj_highprec_t received, cur_received, bw; + pj_highprec_t received, cur_received, bw;
unsigned msec;
pj_time_val now, duration;
pj_thread_sleep(1000);
- - received = cur_received = pj_atomic_get(total_bytes); +
+ received = cur_received = pj_atomic_get(bytes_counter);
cur_received = cur_received - last_received;
- +
pj_gettimeofday(&now);
duration = now;
PJ_TIME_VAL_SUB(duration, last_print);
- msec = PJ_TIME_VAL_MSEC(duration); + msec = PJ_TIME_VAL_MSEC(duration);
bw = cur_received;
pj_highprec_mul(bw, 1000);
@@ -113,13 +121,13 @@ int echo_srv_sync(void) avg_bw = avg_bw + bw;
count++;
- PJ_LOG(3,("", "Synchronous UDP (%d threads): %u KB/s (avg=%u KB/s) %s", - ECHO_SERVER_MAX_THREADS, - (unsigned)(bw / 1000), - (unsigned)(avg_bw / count / 1000), - (count==20 ? "<ses avg>" : ""))); - - if (count==20) { + PJ_LOG(3,("", "Synchronous UDP (%d threads): %u KB/s (avg=%u KB/s) %s",
+ ECHO_SERVER_MAX_THREADS,
+ (unsigned)(bw / 1000),
+ (unsigned)(avg_bw / count / 1000),
+ (count==20 ? "<ses avg>" : "")));
+
+ if (count==20) {
if (avg_bw/count > highest_bw)
highest_bw = avg_bw/count;
@@ -127,9 +135,9 @@ int echo_srv_sync(void) avg_bw = 0;
PJ_LOG(3,("", "Highest average bandwidth=%u KB/s",
- (unsigned)(highest_bw/1000))); - } - } -} - - + (unsigned)(highest_bw/1000)));
+ }
+ }
+}
+
+
|