diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-06 09:37:47 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-06 09:37:47 +0000 |
commit | 7c7300624eb867fa7c1ea52b9c636889aac60e80 (patch) | |
tree | 58460baa296e7eb6bd775d060f2a1e960717f565 /pjlib/src/pjlib-test/ioq_udp.c | |
parent | 58aee2809c36f43a3b66dac7d9db5d13070114b9 (diff) |
Changed ioqueue to allow simultaneous operations on the same key
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@11 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pjlib-test/ioq_udp.c')
-rw-r--r-- | pjlib/src/pjlib-test/ioq_udp.c | 157 |
1 files changed, 77 insertions, 80 deletions
diff --git a/pjlib/src/pjlib-test/ioq_udp.c b/pjlib/src/pjlib-test/ioq_udp.c index a59dac88..6ee90e42 100644 --- a/pjlib/src/pjlib-test/ioq_udp.c +++ b/pjlib/src/pjlib-test/ioq_udp.c @@ -34,31 +34,43 @@ #undef TRACE_ #define TRACE_(msg) PJ_LOG(3,(THIS_FILE,"....." msg)) -static pj_ssize_t callback_read_size, - callback_write_size, - callback_accept_status, - callback_connect_status; -static pj_ioqueue_key_t *callback_read_key, - *callback_write_key, - *callback_accept_key, - *callback_connect_key; - -static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) +static pj_ssize_t callback_read_size, + callback_write_size, + callback_accept_status, + callback_connect_status; +static pj_ioqueue_key_t *callback_read_key, + *callback_write_key, + *callback_accept_key, + *callback_connect_key; +static pj_ioqueue_op_key_t *callback_read_op,
+ *callback_write_op,
+ *callback_accept_op;
+ +static void on_ioqueue_read(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read) { - callback_read_key = key; + callback_read_key = key;
+ callback_read_op = op_key; callback_read_size = bytes_read; } -static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written) +static void on_ioqueue_write(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_written) { - callback_write_key = key; + callback_write_key = key;
+ callback_write_op = op_key; callback_write_size = bytes_written; } -static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock, int status) +static void on_ioqueue_accept(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock, int status) { PJ_UNUSED_ARG(sock); - callback_accept_key = key; + callback_accept_key = key;
+ callback_accept_op = op_key; callback_accept_status = status; } @@ -83,29 +95,6 @@ static pj_ioqueue_callback test_cb = #endif /* - * native_format_test() - * This is just a simple test to verify that various structures in sock.h - * are really compatible with operating system's definitions. - */ -static int native_format_test(void) -{ - pj_status_t rc; - - // Test that PJ_INVALID_SOCKET is working. - { - pj_sock_t sock; - rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, -1, &sock); - if (rc == PJ_SUCCESS) - return -1020; - } - - // Previous func will set errno var. - pj_set_os_error(PJ_SUCCESS); - - return 0; -} - -/* * compliance_test() * To test that the basic IOQueue functionality works. It will just exchange * data between two sockets. @@ -118,7 +107,8 @@ static int compliance_test(void) pj_pool_t *pool = NULL; char *send_buf, *recv_buf; pj_ioqueue_t *ioque = NULL; - pj_ioqueue_key_t *skey, *ckey; + pj_ioqueue_key_t *skey, *ckey;
+ pj_ioqueue_op_key_t read_op, write_op; int bufsize = BUF_MIN_SIZE; pj_ssize_t bytes, status = -1; pj_str_t temp; @@ -157,8 +147,7 @@ static int compliance_test(void) // Create I/O Queue. TRACE_("create ioqueue..."); - rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, - PJ_IOQUEUE_DEFAULT_THREADS, &ioque); + rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); if (rc != PJ_SUCCESS) { status=-20; goto on_error; } @@ -194,12 +183,14 @@ static int compliance_test(void) // Register reading from ioqueue. TRACE_("start recvfrom..."); - addrlen = sizeof(addr); - bytes = pj_ioqueue_recvfrom(ioque, skey, recv_buf, bufsize, 0, - &addr, &addrlen); - if (bytes < 0 && bytes != PJ_EPENDING) { + addrlen = sizeof(addr);
+ bytes = bufsize; + rc = pj_ioqueue_recvfrom(skey, &read_op, recv_buf, &bytes, 0, + &addr, &addrlen); + if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ app_perror("...error: pj_ioqueue_recvfrom", rc); status=-28; goto on_error; - } else if (bytes == PJ_EPENDING) { + } else if (rc == PJ_EPENDING) { recv_pending = 1; PJ_LOG(3, (THIS_FILE, "......ok: recvfrom returned pending")); @@ -210,14 +201,14 @@ static int compliance_test(void) } // Write must return the number of bytes. - TRACE_("start sendto..."); - bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, &addr, - sizeof(addr)); - if (bytes != bufsize && bytes != PJ_EPENDING) { - PJ_LOG(1,(THIS_FILE, - "......error: sendto returned %d", bytes)); + TRACE_("start sendto...");
+ bytes = bufsize; + rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, &addr, + sizeof(addr)); + if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ app_perror("...error: pj_ioqueue_sendto", rc); status=-30; goto on_error; - } else if (bytes == PJ_EPENDING) { + } else if (rc == PJ_EPENDING) { send_pending = 1; PJ_LOG(3, (THIS_FILE, "......ok: sendto returned pending")); @@ -232,9 +223,10 @@ static int compliance_test(void) callback_accept_status = callback_connect_status = -2; callback_read_key = callback_write_key = callback_accept_key = callback_connect_key = NULL; + callback_read_op = callback_write_op = NULL;
// Poll if pending. - while (send_pending && recv_pending) { + while (send_pending || recv_pending) { int rc; pj_time_val timeout = { 5, 0 }; @@ -253,9 +245,11 @@ static int compliance_test(void) if (callback_read_size != bufsize) { status=-61; goto on_error; } - if (callback_read_key != skey) { status=-65; goto on_error; + }
+ if (callback_read_op != &read_op) {
+ status=-66; goto on_error;
} if (memcmp(send_buf, recv_buf, bufsize) != 0) { @@ -270,9 +264,11 @@ static int compliance_test(void) if (callback_write_size != bufsize) { status=-73; goto on_error; } - if (callback_write_key != ckey) { status=-75; goto on_error; + }
+ if (callback_write_op != &write_op) {
+ status=-76; goto on_error;
} send_pending = 0; @@ -326,9 +322,7 @@ static int many_handles_test(void) sock = pj_pool_alloc(pool, MAX*sizeof(pj_sock_t)); /* Create IOQueue */ - rc = pj_ioqueue_create(pool, MAX, - PJ_IOQUEUE_DEFAULT_THREADS, - &ioqueue); + rc = pj_ioqueue_create(pool, MAX, &ioqueue); if (rc != PJ_SUCCESS || ioqueue == NULL) { app_perror("...error in pj_ioqueue_create", rc); return -10; @@ -358,7 +352,7 @@ static int many_handles_test(void) /* Now deregister and close all handles. */ for (i=0; i<count; ++i) { - rc = pj_ioqueue_unregister(ioqueue, key[i]); + rc = pj_ioqueue_unregister(key[i]); if (rc != PJ_SUCCESS) { app_perror("...error in pj_ioqueue_unregister", rc); } @@ -392,7 +386,8 @@ static int bench_test(int bufsize, int inactive_sock_count) pj_sock_t ssock=-1, csock=-1; pj_sockaddr_in addr; pj_pool_t *pool = NULL; - pj_sock_t *inactive_sock=NULL; + pj_sock_t *inactive_sock=NULL;
+ pj_ioqueue_op_key_t *inactive_read_op; char *send_buf, *recv_buf; pj_ioqueue_t *ioque = NULL; pj_ioqueue_key_t *skey, *ckey, *key; @@ -429,8 +424,7 @@ static int bench_test(int bufsize, int inactive_sock_count) pj_assert(inactive_sock_count+2 <= PJ_IOQUEUE_MAX_HANDLES); // Create I/O Queue. - rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, - PJ_IOQUEUE_DEFAULT_THREADS, &ioque); + rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); if (rc != PJ_SUCCESS) { app_perror("...error: pj_ioqueue_create()", rc); goto on_error; @@ -439,10 +433,14 @@ static int bench_test(int bufsize, int inactive_sock_count) // Allocate inactive sockets, and bind them to some arbitrary address. // Then register them to the I/O queue, and start a read operation. inactive_sock = (pj_sock_t*)pj_pool_alloc(pool, - inactive_sock_count*sizeof(pj_sock_t)); + inactive_sock_count*sizeof(pj_sock_t));
+ inactive_read_op = (pj_ioqueue_op_key_t*)pj_pool_alloc(pool,
+ inactive_sock_count*sizeof(pj_ioqueue_op_key_t)); memset(&addr, 0, sizeof(addr)); addr.sin_family = PJ_AF_INET; - for (i=0; i<inactive_sock_count; ++i) { + for (i=0; i<inactive_sock_count; ++i) {
+ pj_ssize_t bytes;
+ rc = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &inactive_sock[i]); if (rc != PJ_SUCCESS || inactive_sock[i] < 0) { app_perror("...error: pj_sock_socket()", rc); @@ -462,8 +460,9 @@ static int bench_test(int bufsize, int inactive_sock_count) app_perror("...error(1): pj_ioqueue_register_sock()", rc); PJ_LOG(3,(THIS_FILE, "....i=%d", i)); goto on_error; - } - rc = pj_ioqueue_read(ioque, key, recv_buf, bufsize); + }
+ bytes = bufsize; + rc = pj_ioqueue_recv(key, &inactive_read_op[i], recv_buf, &bytes, 0); if ( rc < 0 && rc != PJ_EPENDING) { pj_sock_close(inactive_sock[i]); inactive_sock[i] = PJ_INVALID_SOCKET; @@ -495,22 +494,25 @@ static int bench_test(int bufsize, int inactive_sock_count) // Test loop. t_elapsed.u64 = 0; for (i=0; i<LOOP; ++i) { - pj_ssize_t bytes; + pj_ssize_t bytes;
+ pj_ioqueue_op_key_t read_op, write_op; // Randomize send buffer. pj_create_random_string(send_buf, bufsize); - // Start reading on the server side. - rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize); + // Start reading on the server side.
+ bytes = bufsize; + rc = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0); if (rc < 0 && rc != PJ_EPENDING) { app_perror("...error: pj_ioqueue_read()", rc); break; } - // Starts send on the client side. - bytes = pj_ioqueue_sendto(ioque, ckey, send_buf, bufsize, 0, - &addr, sizeof(addr)); - if (bytes != bufsize && bytes != PJ_EPENDING) { + // Starts send on the client side.
+ bytes = bufsize; + rc = pj_ioqueue_sendto(ckey, &write_op, send_buf, &bytes, 0, + &addr, sizeof(addr)); + if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...error: pj_ioqueue_write()", bytes); rc = -1; break; @@ -585,7 +587,7 @@ on_error: pj_sock_close(csock); for (i=0; i<inactive_sock_count && inactive_sock && inactive_sock[i]!=PJ_INVALID_SOCKET; ++i) - { + {
pj_sock_close(inactive_sock[i]); } if (ioque != NULL) @@ -599,11 +601,6 @@ int udp_ioqueue_test() int status; int bufsize, sock_count; - PJ_LOG(3, (THIS_FILE, "...format test")); - if ((status = native_format_test()) != 0) - return status; - PJ_LOG(3, (THIS_FILE, "....native format test ok")); - PJ_LOG(3, (THIS_FILE, "...compliance test")); if ((status=compliance_test()) != 0) { return status; |