diff options
author | Benny Prijono <bennylp@teluu.com> | 2005-11-06 09:37:47 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2005-11-06 09:37:47 +0000 |
commit | 7c7300624eb867fa7c1ea52b9c636889aac60e80 (patch) | |
tree | 58460baa296e7eb6bd775d060f2a1e960717f565 /pjlib/src/pjlib-test/ioq_perf.c | |
parent | 58aee2809c36f43a3b66dac7d9db5d13070114b9 (diff) |
Changed ioqueue to allow simultaneous operations on the same key
git-svn-id: http://svn.pjsip.org/repos/pjproject/main@11 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pjlib-test/ioq_perf.c')
-rw-r--r-- | pjlib/src/pjlib-test/ioq_perf.c | 178 |
1 files changed, 107 insertions, 71 deletions
diff --git a/pjlib/src/pjlib-test/ioq_perf.c b/pjlib/src/pjlib-test/ioq_perf.c index cb93c4cf..4cd11068 100644 --- a/pjlib/src/pjlib-test/ioq_perf.c +++ b/pjlib/src/pjlib-test/ioq_perf.c @@ -34,77 +34,105 @@ static unsigned last_error_counter; /* Descriptor for each producer/consumer pair. */ typedef struct test_item { - pj_sock_t server_fd, - client_fd; - pj_ioqueue_t *ioqueue; - pj_ioqueue_key_t *server_key, - *client_key; - pj_size_t buffer_size; - char *outgoing_buffer; - char *incoming_buffer; - pj_size_t bytes_sent, - bytes_recv; + pj_sock_t server_fd, + client_fd; + pj_ioqueue_t *ioqueue; + pj_ioqueue_key_t *server_key, + *client_key;
+ pj_ioqueue_op_key_t recv_op,
+ send_op;
+ int has_pending_send; + pj_size_t buffer_size; + char *outgoing_buffer; + char *incoming_buffer; + pj_size_t bytes_sent, + bytes_recv; } test_item; /* Callback when data has been read. * Increment item->bytes_recv and ready to read the next data. */ -static void on_read_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_read) +static void on_read_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read) { test_item *item = pj_ioqueue_get_user_data(key); - pj_status_t rc; + pj_status_t rc;
+ int data_is_available = 1; //TRACE_((THIS_FILE, " read complete, bytes_read=%d", bytes_read)); +
+ do { + if (thread_quit_flag) + return; + + if (bytes_read < 0) { + pj_status_t rc = -bytes_read; + char errmsg[128]; + + if (rc != last_error) { + last_error = rc; + pj_strerror(rc, errmsg, sizeof(errmsg)); + PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", + bytes_read, errmsg)); + PJ_LOG(3,(THIS_FILE, + ".....additional info: total read=%u, total written=%u", + item->bytes_recv, item->bytes_sent)); + } else { + last_error_counter++; + } + bytes_read = 0; + + } else if (bytes_read == 0) { + PJ_LOG(3,(THIS_FILE, "...socket has closed!")); + } - if (thread_quit_flag) - return; - - if (bytes_read < 0) { - pj_status_t rc = -bytes_read; - char errmsg[128]; - - if (rc != last_error) { - last_error = rc; - pj_strerror(rc, errmsg, sizeof(errmsg)); - PJ_LOG(3,(THIS_FILE, "...error: read error, bytes_read=%d (%s)", - bytes_read, errmsg)); - PJ_LOG(3,(THIS_FILE, - ".....additional info: total read=%u, total written=%u", - item->bytes_recv, item->bytes_sent)); - } else { - last_error_counter++; - } - bytes_read = 0; - - } else if (bytes_read == 0) { - PJ_LOG(3,(THIS_FILE, "...socket has closed!")); - } - - item->bytes_recv += bytes_read; + item->bytes_recv += bytes_read; - /* To assure that the test quits, even if main thread - * doesn't have time to run. - */ - if (item->bytes_recv > item->buffer_size * 10000) - thread_quit_flag = 1; - - rc = pj_ioqueue_recv( item->ioqueue, item->server_key, - item->incoming_buffer, item->buffer_size, 0 ); - - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { - if (rc != last_error) { - last_error = rc; - app_perror("...error: read error", rc); - } else { - last_error_counter++; - } - } + /* To assure that the test quits, even if main thread + * doesn't have time to run. + */ + if (item->bytes_recv > item->buffer_size * 10000) + thread_quit_flag = 1; +
+ bytes_read = item->buffer_size; + rc = pj_ioqueue_recv( key, op_key, + item->incoming_buffer, &bytes_read, 0 ); + + if (rc == PJ_SUCCESS) {
+ data_is_available = 1;
+ } else if (rc == PJ_EPENDING) {
+ data_is_available = 0;
+ } else {
+ data_is_available = 0; + if (rc != last_error) { + last_error = rc; + app_perror("...error: read error", rc); + } else { + last_error_counter++; + } + }
+
+ if (!item->has_pending_send) {
+ pj_ssize_t sent = item->buffer_size;
+ rc = pj_ioqueue_send(item->client_key, &item->send_op,
+ item->outgoing_buffer, &sent, 0);
+ if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
+ app_perror("...error: write error", rc);
+ }
+
+ item->has_pending_send = (rc==PJ_EPENDING);
+ }
+
+ } while (data_is_available); } /* Callback when data has been written. * Increment item->bytes_sent and write the next data. */ -static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent) +static void on_write_complete(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent) { test_item *item = pj_ioqueue_get_user_data(key); @@ -112,7 +140,8 @@ static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent) if (thread_quit_flag) return; - +
+ item->has_pending_send = 0; item->bytes_sent += bytes_sent; if (bytes_sent <= 0) { @@ -121,12 +150,15 @@ static void on_write_complete(pj_ioqueue_key_t *key, pj_ssize_t bytes_sent) } else { pj_status_t rc; - - rc = pj_ioqueue_write(item->ioqueue, item->client_key, - item->outgoing_buffer, item->buffer_size); +
+ bytes_sent = item->buffer_size; + rc = pj_ioqueue_send( item->client_key, op_key, + item->outgoing_buffer, &bytes_sent, 0); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...error: write error", rc); - } + }
+
+ item->has_pending_send = (rc==PJ_EPENDING); } } @@ -191,7 +223,7 @@ static int perform_test(int sock_type, const char *type_name, thread = pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*)); TRACE_((THIS_FILE, " creating ioqueue..")); - rc = pj_ioqueue_create(pool, sockpair_cnt*2, thread_cnt, &ioqueue); + rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue); if (rc != PJ_SUCCESS) { app_perror("...error: unable to create ioqueue", rc); return -15; @@ -199,6 +231,7 @@ static int perform_test(int sock_type, const char *type_name, /* Initialize each producer-consumer pair. */ for (i=0; i<sockpair_cnt; ++i) { + pj_ssize_t bytes;
items[i].ioqueue = ioqueue; items[i].buffer_size = buffer_size; @@ -241,24 +274,27 @@ static int perform_test(int sock_type, const char *type_name, } /* Start reading. */ - TRACE_((THIS_FILE, " pj_ioqueue_recv..")); - rc = pj_ioqueue_recv(ioqueue, items[i].server_key, - items[i].incoming_buffer, items[i].buffer_size, + TRACE_((THIS_FILE, " pj_ioqueue_recv.."));
+ bytes = items[i].buffer_size; + rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op, + items[i].incoming_buffer, &bytes, 0); - if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { + if (rc != PJ_EPENDING) { app_perror("...error: pj_ioqueue_recv", rc); return -73; } /* Start writing. */ - TRACE_((THIS_FILE, " pj_ioqueue_write..")); - rc = pj_ioqueue_write(ioqueue, items[i].client_key, - items[i].outgoing_buffer, items[i].buffer_size); + TRACE_((THIS_FILE, " pj_ioqueue_write.."));
+ bytes = items[i].buffer_size; + rc = pj_ioqueue_send(items[i].client_key, &items[i].recv_op, + items[i].outgoing_buffer, &bytes, 0); if (rc != PJ_SUCCESS && rc != PJ_EPENDING) { app_perror("...error: pj_ioqueue_write", rc); return -76; } - +
+ items[i].has_pending_send = (rc==PJ_EPENDING); } /* Create the threads. */ @@ -324,8 +360,8 @@ static int perform_test(int sock_type, const char *type_name, /* Close all sockets. */ TRACE_((THIS_FILE, " closing all sockets..")); for (i=0; i<sockpair_cnt; ++i) { - pj_ioqueue_unregister(ioqueue, items[i].server_key); - pj_ioqueue_unregister(ioqueue, items[i].client_key); + pj_ioqueue_unregister(items[i].server_key); + pj_ioqueue_unregister(items[i].client_key); pj_sock_close(items[i].server_fd); pj_sock_close(items[i].client_fd); } |