diff options
author | Benny Prijono <bennylp@teluu.com> | 2008-07-29 20:15:15 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2008-07-29 20:15:15 +0000 |
commit | 8fbda606f3e71e9d0d8b3ae6571db408b9dae48d (patch) | |
tree | 15d7a447e0c07752619e0d74332c59401fc50199 /pjlib | |
parent | 02e33827d8cd91329007d2d2009965222d56ba53 (diff) |
Initial work for ticket #579: added option to make the active socket sends all the (TCP) data before calling completion callback
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@2185 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib')
-rw-r--r-- | pjlib/include/pj/activesock.h | 15 | ||||
-rw-r--r-- | pjlib/include/pj/ioqueue.h | 1 | ||||
-rw-r--r-- | pjlib/src/pj/activesock.c | 109 | ||||
-rw-r--r-- | pjlib/src/pjlib-test/activesock.c | 219 |
4 files changed, 342 insertions, 2 deletions
diff --git a/pjlib/include/pj/activesock.h b/pjlib/include/pj/activesock.h index dd5d8270..f465de83 100644 --- a/pjlib/include/pj/activesock.h +++ b/pjlib/include/pj/activesock.h @@ -201,6 +201,21 @@ typedef struct pj_activesock_cfg */ int concurrency; + /** + * If this option is specified, the active socket will make sure that + * asynchronous send operation with stream oriented socket will only + * call the callback after all data has been sent. This means that the + * active socket will automatically resend the remaining data until + * all data has been sent. + * + * Please note that when this option is specified, it is possible that + * error is reported after partial data has been sent. Also setting + * this will disable the ioqueue concurrency for the socket. + * + * Default value is 1. + */ + pj_bool_t whole_data; + } pj_activesock_cfg; diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h index d8df170e..8cfc090c 100644 --- a/pjlib/include/pj/ioqueue.h +++ b/pjlib/include/pj/ioqueue.h @@ -208,6 +208,7 @@ PJ_BEGIN_DECL typedef struct pj_ioqueue_op_key_t { void *internal__[32]; /**< Internal I/O Queue data. */ + void *activesock_data; /**< Active socket data. */ void *user_data; /**< Application data. */ } pj_ioqueue_op_key_t; diff --git a/pjlib/src/pj/activesock.c b/pjlib/src/pj/activesock.c index 45173c5b..4c3985db 100644 --- a/pjlib/src/pj/activesock.c +++ b/pjlib/src/pj/activesock.c @@ -51,16 +51,27 @@ struct accept_op int rem_addr_len; }; +struct send_data +{ + pj_uint8_t *data; + pj_ssize_t len; + pj_ssize_t sent; + unsigned flags; +}; + struct pj_activesock_t { pj_ioqueue_key_t *key; pj_bool_t stream_oriented; + pj_bool_t whole_data; pj_ioqueue_t *ioqueue; void *user_data; unsigned async_count; unsigned max_loop; pj_activesock_cb cb; + struct send_data send_data; + struct read_op *read_op; pj_uint32_t read_flags; enum read_type read_type; @@ -89,6 +100,7 @@ PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg) pj_bzero(cfg, sizeof(*cfg)); cfg->async_cnt = 1; cfg->concurrency = -1; + cfg->whole_data = PJ_TRUE; } @@ -115,6 +127,7 @@ PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool, asock->ioqueue = ioqueue; asock->stream_oriented = (sock_type == pj_SOCK_STREAM()); asock->async_count = (opt? opt->async_cnt : 1); + asock->whole_data = (opt? opt->whole_data : 1); asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP; asock->user_data = user_data; pj_memcpy(&asock->cb, cb, sizeof(*cb)); @@ -134,7 +147,10 @@ PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool, return status; } - if (opt && opt->concurrency >= 0) { + if (asock->whole_data) { + /* Must disable concurrency otherwise there is a race condition */ + pj_ioqueue_set_concurrency(asock->key, 0); + } else if (opt && opt->concurrency >= 0) { pj_ioqueue_set_concurrency(asock->key, opt->concurrency); } @@ -443,6 +459,35 @@ static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, } +static pj_status_t send_remaining(pj_activesock_t *asock, + pj_ioqueue_op_key_t *send_key) +{ + struct send_data *sd = (struct send_data*)send_key->activesock_data; + pj_status_t status; + + do { + pj_ssize_t size; + + size = sd->len - sd->sent; + status = pj_ioqueue_send(asock->key, send_key, + sd->data+sd->sent, &size, sd->flags); + if (status != PJ_SUCCESS) { + /* Pending or error */ + break; + } + + sd->sent += size; + if (sd->sent == sd->len) { + /* The whole data has been sent. */ + return PJ_SUCCESS; + } + + } while (sd->sent < sd->len); + + return status; +} + + PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock, pj_ioqueue_op_key_t *send_key, const void *data, @@ -451,7 +496,42 @@ PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock, { PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL); - return pj_ioqueue_send(asock->key, send_key, data, size, flags); + send_key->activesock_data = NULL; + + if (asock->whole_data) { + pj_ssize_t whole; + pj_status_t status; + + whole = *size; + + status = pj_ioqueue_send(asock->key, send_key, data, size, flags); + if (status != PJ_SUCCESS) { + /* Pending or error */ + return status; + } + + if (*size == whole) { + /* The whole data has been sent. */ + return PJ_SUCCESS; + } + + /* Data was partially sent */ + asock->send_data.data = (pj_uint8_t*)data; + asock->send_data.len = whole; + asock->send_data.sent = *size; + asock->send_data.flags = flags; + send_key->activesock_data = &asock->send_data; + + /* Try again */ + status = send_remaining(asock, send_key); + if (status == PJ_SUCCESS) { + *size = whole; + } + return status; + + } else { + return pj_ioqueue_send(asock->key, send_key, data, size, flags); + } } @@ -479,6 +559,31 @@ static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); + if (bytes_sent > 0 && op_key->activesock_data) { + /* whole_data is requested. Make sure we send all the data */ + struct send_data *sd = (struct send_data*)op_key->activesock_data; + + sd->sent += bytes_sent; + if (sd->sent == sd->len) { + /* all has been sent */ + bytes_sent = sd->sent; + op_key->activesock_data = NULL; + } else { + /* send remaining data */ + pj_status_t status; + + status = send_remaining(asock, op_key); + if (status == PJ_EPENDING) + return; + else if (status == PJ_SUCCESS) + bytes_sent = sd->sent; + else + bytes_sent = -status; + + op_key->activesock_data = NULL; + } + } + if (asock->cb.on_data_sent) { pj_bool_t ret; diff --git a/pjlib/src/pjlib-test/activesock.c b/pjlib/src/pjlib-test/activesock.c index e106b71d..2eedca0a 100644 --- a/pjlib/src/pjlib-test/activesock.c +++ b/pjlib/src/pjlib-test/activesock.c @@ -246,15 +246,234 @@ on_return: } + +#define SIGNATURE 0xdeadbeef +struct tcp_pkt +{ + pj_uint32_t signature; + pj_uint32_t seq; + char fill[513]; +}; + +struct tcp_state +{ + pj_bool_t err; + pj_bool_t sent; + pj_uint32_t next_recv_seq; + pj_uint8_t pkt[600]; +}; + +struct send_key +{ + pj_ioqueue_op_key_t op_key; +}; + + +static pj_bool_t tcp_on_data_read(pj_activesock_t *asock, + void *data, + pj_size_t size, + pj_status_t status, + pj_size_t *remainder) +{ + struct tcp_state *st = (struct tcp_state*) pj_activesock_get_user_data(asock); + char *next = (char*) data; + + if (status != PJ_SUCCESS && status != PJ_EPENDING) { + PJ_LOG(1,("", " err: status=%d", status)); + st->err = PJ_TRUE; + return PJ_FALSE; + } + + while (size >= sizeof(struct tcp_pkt)) { + struct tcp_pkt *tcp_pkt = (struct tcp_pkt*) next; + + if (tcp_pkt->signature != SIGNATURE) { + PJ_LOG(1,("", " err: invalid signature at seq=%d", + st->next_recv_seq)); + st->err = PJ_TRUE; + return PJ_FALSE; + } + if (tcp_pkt->seq != st->next_recv_seq) { + PJ_LOG(1,("", " err: wrong sequence")); + st->err = PJ_TRUE; + return PJ_FALSE; + } + + st->next_recv_seq++; + next += sizeof(struct tcp_pkt); + size -= sizeof(struct tcp_pkt); + } + + if (size) { + pj_memmove(data, next, size); + *remainder = size; + } + + return PJ_TRUE; +} + +static pj_bool_t tcp_on_data_sent(pj_activesock_t *asock, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t sent) +{ + struct tcp_state *st=(struct tcp_state*)pj_activesock_get_user_data(asock); + + st->sent = 1; + + if (sent < 1) { + st->err = PJ_TRUE; + return PJ_FALSE; + } + + return PJ_TRUE; +} + +static int tcp_perf_test(void) +{ + enum { COUNT=100000 }; + pj_pool_t *pool = NULL; + pj_ioqueue_t *ioqueue = NULL; + pj_sock_t sock1=PJ_INVALID_SOCKET, sock2=PJ_INVALID_SOCKET; + pj_activesock_t *asock1 = NULL, *asock2 = NULL; + pj_activesock_cb cb; + struct tcp_state *state1, *state2; + unsigned i; + pj_status_t status; + + pool = pj_pool_create(mem, "tcpperf", 256, 256, NULL); + + status = app_socketpair(pj_AF_INET(), pj_SOCK_STREAM(), 0, &sock1, + &sock2); + if (status != PJ_SUCCESS) { + status = -100; + goto on_return; + } + + status = pj_ioqueue_create(pool, 4, &ioqueue); + if (status != PJ_SUCCESS) { + status = -110; + goto on_return; + } + + pj_bzero(&cb, sizeof(cb)); + cb.on_data_read = &tcp_on_data_read; + cb.on_data_sent = &tcp_on_data_sent; + + state1 = PJ_POOL_ZALLOC_T(pool, struct tcp_state); + status = pj_activesock_create(pool, sock1, pj_SOCK_STREAM(), NULL, ioqueue, + &cb, state1, &asock1); + if (status != PJ_SUCCESS) { + status = -120; + goto on_return; + } + + state2 = PJ_POOL_ZALLOC_T(pool, struct tcp_state); + status = pj_activesock_create(pool, sock2, pj_SOCK_STREAM(), NULL, ioqueue, + &cb, state2, &asock2); + if (status != PJ_SUCCESS) { + status = -130; + goto on_return; + } + + status = pj_activesock_start_read(asock1, pool, 1000, 0); + if (status != PJ_SUCCESS) { + status = -140; + goto on_return; + } + + /* Send packet as quickly as possible */ + for (i=0; i<COUNT && !state1->err && !state2->err; ++i) { + struct tcp_pkt *pkt; + struct send_key send_key[2], *op_key; + pj_ssize_t len; + + pkt = (struct tcp_pkt*)state2->pkt; + pkt->signature = SIGNATURE; + pkt->seq = i; + pj_memset(pkt->fill, 'a', sizeof(pkt->fill)); + + op_key = &send_key[i%2]; + pj_ioqueue_op_key_init(&op_key->op_key, sizeof(*op_key)); + + state2->sent = PJ_FALSE; + len = sizeof(*pkt); + status = pj_activesock_send(asock2, &op_key->op_key, pkt, &len, 0); + if (status == PJ_EPENDING) { + do { + pj_ioqueue_poll(ioqueue, NULL); + } while (!state2->sent); + } else if (status != PJ_SUCCESS) { + PJ_LOG(1,("", " err: send status=%d", status)); + status = -180; + break; + } else if (status == PJ_SUCCESS) { + if (len != sizeof(*pkt)) { + PJ_LOG(1,("", " err: shouldn't report partial sent")); + status = -190; + break; + } + } + } + + /* Wait until everything has been sent/received */ + if (state1->next_recv_seq < COUNT) { + pj_time_val delay = {0, 100}; + while (pj_ioqueue_poll(ioqueue, &delay) > 0) + ; + } + + if (status == PJ_EPENDING) + status = PJ_SUCCESS; + + if (status != 0) + goto on_return; + + if (state1->err) { + status = -183; + goto on_return; + } + if (state2->err) { + status = -186; + goto on_return; + } + if (state1->next_recv_seq != COUNT) { + PJ_LOG(3,("", " err: only %u packets received, expecting %u", + state1->next_recv_seq, COUNT)); + status = -195; + goto on_return; + } + +on_return: + if (asock2) + pj_activesock_close(asock2); + if (asock1) + pj_activesock_close(asock1); + if (ioqueue) + pj_ioqueue_destroy(ioqueue); + if (pool) + pj_pool_release(pool); + + return status; +} + + + int activesock_test(void) { int ret; + ret = (int)&udp_ping_pong_test; + PJ_LOG(3,("", "..udp ping/pong test")); ret = udp_ping_pong_test(); if (ret != 0) return ret; + PJ_LOG(3,("", "..tcp perf test")); + ret = tcp_perf_test(); + if (ret != 0) + return ret; + return 0; } |