diff options
author | Benny Prijono <bennylp@teluu.com> | 2008-07-29 20:15:15 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2008-07-29 20:15:15 +0000 |
commit | 8fbda606f3e71e9d0d8b3ae6571db408b9dae48d (patch) | |
tree | 15d7a447e0c07752619e0d74332c59401fc50199 /pjlib/src/pj/activesock.c | |
parent | 02e33827d8cd91329007d2d2009965222d56ba53 (diff) |
Initial work for ticket #579: added option to make the active socket sends all the (TCP) data before calling completion callback
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@2185 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjlib/src/pj/activesock.c')
-rw-r--r-- | pjlib/src/pj/activesock.c | 109 |
1 files changed, 107 insertions, 2 deletions
diff --git a/pjlib/src/pj/activesock.c b/pjlib/src/pj/activesock.c index 45173c5b..4c3985db 100644 --- a/pjlib/src/pj/activesock.c +++ b/pjlib/src/pj/activesock.c @@ -51,16 +51,27 @@ struct accept_op int rem_addr_len; }; +struct send_data +{ + pj_uint8_t *data; + pj_ssize_t len; + pj_ssize_t sent; + unsigned flags; +}; + struct pj_activesock_t { pj_ioqueue_key_t *key; pj_bool_t stream_oriented; + pj_bool_t whole_data; pj_ioqueue_t *ioqueue; void *user_data; unsigned async_count; unsigned max_loop; pj_activesock_cb cb; + struct send_data send_data; + struct read_op *read_op; pj_uint32_t read_flags; enum read_type read_type; @@ -89,6 +100,7 @@ PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg) pj_bzero(cfg, sizeof(*cfg)); cfg->async_cnt = 1; cfg->concurrency = -1; + cfg->whole_data = PJ_TRUE; } @@ -115,6 +127,7 @@ PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool, asock->ioqueue = ioqueue; asock->stream_oriented = (sock_type == pj_SOCK_STREAM()); asock->async_count = (opt? opt->async_cnt : 1); + asock->whole_data = (opt? opt->whole_data : 1); asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP; asock->user_data = user_data; pj_memcpy(&asock->cb, cb, sizeof(*cb)); @@ -134,7 +147,10 @@ PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool, return status; } - if (opt && opt->concurrency >= 0) { + if (asock->whole_data) { + /* Must disable concurrency otherwise there is a race condition */ + pj_ioqueue_set_concurrency(asock->key, 0); + } else if (opt && opt->concurrency >= 0) { pj_ioqueue_set_concurrency(asock->key, opt->concurrency); } @@ -443,6 +459,35 @@ static void ioqueue_on_read_complete(pj_ioqueue_key_t *key, } +static pj_status_t send_remaining(pj_activesock_t *asock, + pj_ioqueue_op_key_t *send_key) +{ + struct send_data *sd = (struct send_data*)send_key->activesock_data; + pj_status_t status; + + do { + pj_ssize_t size; + + size = sd->len - sd->sent; + status = pj_ioqueue_send(asock->key, send_key, + sd->data+sd->sent, &size, sd->flags); + if (status != PJ_SUCCESS) { + /* Pending or error */ + break; + } + + sd->sent += size; + if (sd->sent == sd->len) { + /* The whole data has been sent. */ + return PJ_SUCCESS; + } + + } while (sd->sent < sd->len); + + return status; +} + + PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock, pj_ioqueue_op_key_t *send_key, const void *data, @@ -451,7 +496,42 @@ PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock, { PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL); - return pj_ioqueue_send(asock->key, send_key, data, size, flags); + send_key->activesock_data = NULL; + + if (asock->whole_data) { + pj_ssize_t whole; + pj_status_t status; + + whole = *size; + + status = pj_ioqueue_send(asock->key, send_key, data, size, flags); + if (status != PJ_SUCCESS) { + /* Pending or error */ + return status; + } + + if (*size == whole) { + /* The whole data has been sent. */ + return PJ_SUCCESS; + } + + /* Data was partially sent */ + asock->send_data.data = (pj_uint8_t*)data; + asock->send_data.len = whole; + asock->send_data.sent = *size; + asock->send_data.flags = flags; + send_key->activesock_data = &asock->send_data; + + /* Try again */ + status = send_remaining(asock, send_key); + if (status == PJ_SUCCESS) { + *size = whole; + } + return status; + + } else { + return pj_ioqueue_send(asock->key, send_key, data, size, flags); + } } @@ -479,6 +559,31 @@ static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); + if (bytes_sent > 0 && op_key->activesock_data) { + /* whole_data is requested. Make sure we send all the data */ + struct send_data *sd = (struct send_data*)op_key->activesock_data; + + sd->sent += bytes_sent; + if (sd->sent == sd->len) { + /* all has been sent */ + bytes_sent = sd->sent; + op_key->activesock_data = NULL; + } else { + /* send remaining data */ + pj_status_t status; + + status = send_remaining(asock, op_key); + if (status == PJ_EPENDING) + return; + else if (status == PJ_SUCCESS) + bytes_sent = sd->sent; + else + bytes_sent = -status; + + op_key->activesock_data = NULL; + } + } + if (asock->cb.on_data_sent) { pj_bool_t ret; |