summaryrefslogtreecommitdiff
path: root/pjlib/src/pj/activesock.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjlib/src/pj/activesock.c')
-rw-r--r--pjlib/src/pj/activesock.c109
1 files changed, 107 insertions, 2 deletions
diff --git a/pjlib/src/pj/activesock.c b/pjlib/src/pj/activesock.c
index 45173c5b..4c3985db 100644
--- a/pjlib/src/pj/activesock.c
+++ b/pjlib/src/pj/activesock.c
@@ -51,16 +51,27 @@ struct accept_op
int rem_addr_len;
};
+struct send_data
+{
+ pj_uint8_t *data;
+ pj_ssize_t len;
+ pj_ssize_t sent;
+ unsigned flags;
+};
+
struct pj_activesock_t
{
pj_ioqueue_key_t *key;
pj_bool_t stream_oriented;
+ pj_bool_t whole_data;
pj_ioqueue_t *ioqueue;
void *user_data;
unsigned async_count;
unsigned max_loop;
pj_activesock_cb cb;
+ struct send_data send_data;
+
struct read_op *read_op;
pj_uint32_t read_flags;
enum read_type read_type;
@@ -89,6 +100,7 @@ PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
pj_bzero(cfg, sizeof(*cfg));
cfg->async_cnt = 1;
cfg->concurrency = -1;
+ cfg->whole_data = PJ_TRUE;
}
@@ -115,6 +127,7 @@ PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
asock->ioqueue = ioqueue;
asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
asock->async_count = (opt? opt->async_cnt : 1);
+ asock->whole_data = (opt? opt->whole_data : 1);
asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
asock->user_data = user_data;
pj_memcpy(&asock->cb, cb, sizeof(*cb));
@@ -134,7 +147,10 @@ PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
return status;
}
- if (opt && opt->concurrency >= 0) {
+ if (asock->whole_data) {
+ /* Must disable concurrency otherwise there is a race condition */
+ pj_ioqueue_set_concurrency(asock->key, 0);
+ } else if (opt && opt->concurrency >= 0) {
pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
}
@@ -443,6 +459,35 @@ static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
}
+static pj_status_t send_remaining(pj_activesock_t *asock,
+ pj_ioqueue_op_key_t *send_key)
+{
+ struct send_data *sd = (struct send_data*)send_key->activesock_data;
+ pj_status_t status;
+
+ do {
+ pj_ssize_t size;
+
+ size = sd->len - sd->sent;
+ status = pj_ioqueue_send(asock->key, send_key,
+ sd->data+sd->sent, &size, sd->flags);
+ if (status != PJ_SUCCESS) {
+ /* Pending or error */
+ break;
+ }
+
+ sd->sent += size;
+ if (sd->sent == sd->len) {
+ /* The whole data has been sent. */
+ return PJ_SUCCESS;
+ }
+
+ } while (sd->sent < sd->len);
+
+ return status;
+}
+
+
PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
pj_ioqueue_op_key_t *send_key,
const void *data,
@@ -451,7 +496,42 @@ PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
{
PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
- return pj_ioqueue_send(asock->key, send_key, data, size, flags);
+ send_key->activesock_data = NULL;
+
+ if (asock->whole_data) {
+ pj_ssize_t whole;
+ pj_status_t status;
+
+ whole = *size;
+
+ status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
+ if (status != PJ_SUCCESS) {
+ /* Pending or error */
+ return status;
+ }
+
+ if (*size == whole) {
+ /* The whole data has been sent. */
+ return PJ_SUCCESS;
+ }
+
+ /* Data was partially sent */
+ asock->send_data.data = (pj_uint8_t*)data;
+ asock->send_data.len = whole;
+ asock->send_data.sent = *size;
+ asock->send_data.flags = flags;
+ send_key->activesock_data = &asock->send_data;
+
+ /* Try again */
+ status = send_remaining(asock, send_key);
+ if (status == PJ_SUCCESS) {
+ *size = whole;
+ }
+ return status;
+
+ } else {
+ return pj_ioqueue_send(asock->key, send_key, data, size, flags);
+ }
}
@@ -479,6 +559,31 @@ static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
+ if (bytes_sent > 0 && op_key->activesock_data) {
+ /* whole_data is requested. Make sure we send all the data */
+ struct send_data *sd = (struct send_data*)op_key->activesock_data;
+
+ sd->sent += bytes_sent;
+ if (sd->sent == sd->len) {
+ /* all has been sent */
+ bytes_sent = sd->sent;
+ op_key->activesock_data = NULL;
+ } else {
+ /* send remaining data */
+ pj_status_t status;
+
+ status = send_remaining(asock, op_key);
+ if (status == PJ_EPENDING)
+ return;
+ else if (status == PJ_SUCCESS)
+ bytes_sent = sd->sent;
+ else
+ bytes_sent = -status;
+
+ op_key->activesock_data = NULL;
+ }
+ }
+
if (asock->cb.on_data_sent) {
pj_bool_t ret;