diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-06 09:37:47 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-06 09:37:47 +0000 |
commit | 7c7300624eb867fa7c1ea52b9c636889aac60e80 (patch) | |
tree | 58460baa296e7eb6bd775d060f2a1e960717f565 /pjlib/src/pjlib-test/ioq_tcp.c | |
parent | 58aee2809c36f43a3b66dac7d9db5d13070114b9 (diff) |
Changed ioqueue to allow simultaneous operations on the same key
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@11 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pjlib-test/ioq_tcp.c')
-rw-r--r-- | pjlib/src/pjlib-test/ioq_tcp.c | 172 |
1 files changed, 119 insertions, 53 deletions
diff --git a/pjlib/src/pjlib-test/ioq_tcp.c b/pjlib/src/pjlib-test/ioq_tcp.c index ebce633b..fd5329e5 100644 --- a/pjlib/src/pjlib-test/ioq_tcp.c +++ b/pjlib/src/pjlib-test/ioq_tcp.c @@ -31,33 +31,45 @@ #define SOCK_INACTIVE_MAX (PJ_IOQUEUE_MAX_HANDLES - 2) #define POOL_SIZE (2*BUF_MAX_SIZE + SOCK_INACTIVE_MAX*128 + 2048) -static pj_ssize_t callback_read_size, - callback_write_size, - callback_accept_status, - callback_connect_status; -static pj_ioqueue_key_t*callback_read_key, - *callback_write_key, - *callback_accept_key, - *callback_connect_key; - -static void on_ioqueue_read(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) +static pj_ssize_t callback_read_size, + callback_write_size, + callback_accept_status, + callback_connect_status; +static pj_ioqueue_key_t *callback_read_key, + *callback_write_key, + *callback_accept_key, + *callback_connect_key;
+static pj_ioqueue_op_key_t *callback_read_op,
+ *callback_write_op,
+ *callback_accept_op; + +static void on_ioqueue_read(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read) { - callback_read_key = key; + callback_read_key = key;
+ callback_read_op = op_key; callback_read_size = bytes_read; } -static void on_ioqueue_write(pj_ioqueue_key_t *key, pj_ssize_t bytes_written) +static void on_ioqueue_write(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_written) { - callback_write_key = key; + callback_write_key = key;
+ callback_write_op = op_key; callback_write_size = bytes_written; } -static void on_ioqueue_accept(pj_ioqueue_key_t *key, pj_sock_t sock, +static void on_ioqueue_accept(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock, int status) { PJ_UNUSED_ARG(sock); - callback_accept_key = key; + callback_accept_key = key;
+ callback_accept_op = op_key; callback_accept_status = status; } @@ -83,28 +95,38 @@ static int send_recv_test(pj_ioqueue_t *ioque, pj_ssize_t bufsize, pj_timestamp *t_elapsed) { - int rc; - pj_ssize_t bytes; + pj_status_t status; + pj_ssize_t bytes;
+ pj_time_val timeout; pj_timestamp t1, t2; - int pending_op = 0; - - // Start reading on the server side. - rc = pj_ioqueue_read(ioque, skey, recv_buf, bufsize); - if (rc != 0 && rc != PJ_EPENDING) { + int pending_op = 0;
+ pj_ioqueue_op_key_t read_op, write_op; + + // Start reading on the server side.
+ bytes = bufsize; + status = pj_ioqueue_recv(skey, &read_op, recv_buf, &bytes, 0); + if (status != PJ_SUCCESS && status != PJ_EPENDING) {
+ app_perror("...pj_ioqueue_recv error", status); return -100; } - - ++pending_op; +
+ if (status == PJ_EPENDING) + ++pending_op;
+ else {
+ /* Does not expect to return error or immediate data. */
+ return -115;
+ } // Randomize send buffer. pj_create_random_string((char*)send_buf, bufsize); - // Starts send on the client side. - bytes = pj_ioqueue_write(ioque, ckey, send_buf, bufsize); - if (bytes != bufsize && bytes != PJ_EPENDING) { + // Starts send on the client side.
+ bytes = bufsize; + status = pj_ioqueue_send(ckey, &write_op, send_buf, &bytes, 0); + if (status != PJ_SUCCESS && bytes != PJ_EPENDING) { return -120; } - if (bytes == PJ_EPENDING) { + if (status == PJ_EPENDING) { ++pending_op; } @@ -113,37 +135,52 @@ static int send_recv_test(pj_ioqueue_t *ioque, // Reset indicators callback_read_size = callback_write_size = 0; - callback_read_key = callback_write_key = NULL; + callback_read_key = callback_write_key = NULL;
+ callback_read_op = callback_write_op = NULL; // Poll the queue until we've got completion event in the server side. - rc = 0; - while (pending_op > 0) { - rc = pj_ioqueue_poll(ioque, NULL); - if (rc > 0) { + status = 0; + while (pending_op > 0) {
+ timeout.sec = 1; timeout.msec = 0; + status = pj_ioqueue_poll(ioque, &timeout); + if (status > 0) { if (callback_read_size) { - if (callback_read_size != bufsize) { + if (callback_read_size != bufsize) return -160; - } if (callback_read_key != skey) - return -161; + return -161;
+ if (callback_read_op != &read_op)
+ return -162; } if (callback_write_size) { if (callback_write_key != ckey) - return -162; + return -163;
+ if (callback_write_op != &write_op)
+ return -164; } - pending_op -= rc; - } - if (rc < 0) { + pending_op -= status; + }
+ if (status == 0) {
+ PJ_LOG(3,("", "...error: timed out"));
+ } + if (status < 0) { return -170; } } +
+ // Pending op is zero.
+ // Subsequent poll should yield zero too.
+ timeout.sec = timeout.msec = 0;
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status != 0)
+ return -173;
// End time. pj_get_timestamp(&t2); t_elapsed->u32.lo += (t2.u32.lo - t1.u32.lo); - if (rc < 0) { - return -150; + if (status < 0) { + return -176; } // Compare recv buffer with send buffer. @@ -167,7 +204,8 @@ static int compliance_test_0(void) pj_pool_t *pool = NULL; char *send_buf, *recv_buf; pj_ioqueue_t *ioque = NULL; - pj_ioqueue_key_t *skey, *ckey0, *ckey1; + pj_ioqueue_key_t *skey, *ckey0, *ckey1;
+ pj_ioqueue_op_key_t accept_op; int bufsize = BUF_MIN_SIZE; pj_ssize_t status = -1; int pending_op = 0; @@ -205,7 +243,7 @@ static int compliance_test_0(void) } // Create I/O Queue. - rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0, &ioque); + rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); if (rc != PJ_SUCCESS) { app_perror("...ERROR in pj_ioqueue_create()", rc); status=-20; goto on_error; @@ -231,7 +269,8 @@ static int compliance_test_0(void) // Server socket accept() client_addr_len = sizeof(pj_sockaddr_in); - status = pj_ioqueue_accept(ioque, skey, &csock0, &client_addr, &rmt_addr, &client_addr_len); + status = pj_ioqueue_accept(skey, &accept_op, &csock0,
+ &client_addr, &rmt_addr, &client_addr_len); if (status != PJ_EPENDING) { app_perror("...ERROR in pj_ioqueue_accept()", rc); status=-30; goto on_error; @@ -247,7 +286,7 @@ static int compliance_test_0(void) addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1")); // Client socket connect() - status = pj_ioqueue_connect(ioque, ckey1, &addr, sizeof(addr)); + status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr)); if (status!=PJ_SUCCESS && status != PJ_EPENDING) { app_perror("...ERROR in pj_ioqueue_connect()", rc); status=-40; goto on_error; @@ -262,6 +301,7 @@ static int compliance_test_0(void) callback_read_key = callback_write_key = callback_accept_key = callback_connect_key = NULL; + callback_accept_op = callback_read_op = callback_write_op = NULL;
while (pending_op) { pj_time_val timeout = {1, 0}; @@ -273,8 +313,12 @@ static int compliance_test_0(void) status=-41; goto on_error; } if (callback_accept_key != skey) { - status=-41; goto on_error; - } + status=-42; goto on_error; + }
+ if (callback_accept_op != &accept_op) {
+ status=-43; goto on_error;
+ }
+ callback_accept_status = -2; } if (callback_connect_status != -2) { @@ -283,7 +327,8 @@ static int compliance_test_0(void) } if (callback_connect_key != ckey1) { status=-51; goto on_error; - } + }
+ callback_connect_status = -2; } pending_op -= status; @@ -293,6 +338,16 @@ static int compliance_test_0(void) } } } +
+ // There's no pending operation.
+ // When we poll the ioqueue, there must not be events.
+ if (pending_op == 0) {
+ pj_time_val timeout = {1, 0};
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status != 0) {
+ status=-60; goto on_error;
+ }
+ }
// Check accepted socket. if (csock0 == PJ_INVALID_SOCKET) { @@ -312,7 +367,8 @@ static int compliance_test_0(void) // Test send and receive. t_elapsed.u32.lo = 0; - status = send_recv_test(ioque, ckey0, ckey1, send_buf, recv_buf, bufsize, &t_elapsed); + status = send_recv_test(ioque, ckey0, ckey1, send_buf,
+ recv_buf, bufsize, &t_elapsed); if (status != 0) { goto on_error; } @@ -354,7 +410,7 @@ static int compliance_test_1(void) pool = pj_pool_create(mem, NULL, POOL_SIZE, 4000, NULL); // Create I/O Queue. - rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, 0, &ioque); + rc = pj_ioqueue_create(pool, PJ_IOQUEUE_MAX_HANDLES, &ioque); if (!ioque) { status=-20; goto on_error; } @@ -381,7 +437,7 @@ static int compliance_test_1(void) addr.sin_addr = pj_inet_addr(pj_cstr(&s, "127.0.0.1")); // Client socket connect() - status = pj_ioqueue_connect(ioque, ckey1, &addr, sizeof(addr)); + status = pj_ioqueue_connect(ckey1, &addr, sizeof(addr)); if (status==PJ_SUCCESS) { // unexpectedly success! status = -30; @@ -416,7 +472,17 @@ static int compliance_test_1(void) } } } - +
+ // There's no pending operation.
+ // When we poll the ioqueue, there must not be events.
+ if (pending_op == 0) {
+ pj_time_val timeout = {1, 0};
+ status = pj_ioqueue_poll(ioque, &timeout);
+ if (status != 0) {
+ status=-60; goto on_error;
+ }
+ }
+
// Success status = 0; |