summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2008-07-29 20:15:15 +0000
committerBenny Prijono <bennylp@teluu.com>2008-07-29 20:15:15 +0000
commit8fbda606f3e71e9d0d8b3ae6571db408b9dae48d (patch)
tree15d7a447e0c07752619e0d74332c59401fc50199
parent02e33827d8cd91329007d2d2009965222d56ba53 (diff)
Initial work for ticket #579: added option to make the active socket sends all the (TCP) data before calling completion callback
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@2185 74dad513-b988-da41-8d7b-12977e46ad98
-rw-r--r--pjlib/include/pj/activesock.h15
-rw-r--r--pjlib/include/pj/ioqueue.h1
-rw-r--r--pjlib/src/pj/activesock.c109
-rw-r--r--pjlib/src/pjlib-test/activesock.c219
4 files changed, 342 insertions, 2 deletions
diff --git a/pjlib/include/pj/activesock.h b/pjlib/include/pj/activesock.h
index dd5d8270..f465de83 100644
--- a/pjlib/include/pj/activesock.h
+++ b/pjlib/include/pj/activesock.h
@@ -201,6 +201,21 @@ typedef struct pj_activesock_cfg
*/
int concurrency;
+ /**
+ * If this option is specified, the active socket will make sure that
+ * asynchronous send operation with stream oriented socket will only
+ * call the callback after all data has been sent. This means that the
+ * active socket will automatically resend the remaining data until
+ * all data has been sent.
+ *
+ * Please note that when this option is specified, it is possible that
+ * error is reported after partial data has been sent. Also setting
+ * this will disable the ioqueue concurrency for the socket.
+ *
+ * Default value is 1.
+ */
+ pj_bool_t whole_data;
+
} pj_activesock_cfg;
diff --git a/pjlib/include/pj/ioqueue.h b/pjlib/include/pj/ioqueue.h
index d8df170e..8cfc090c 100644
--- a/pjlib/include/pj/ioqueue.h
+++ b/pjlib/include/pj/ioqueue.h
@@ -208,6 +208,7 @@ PJ_BEGIN_DECL
typedef struct pj_ioqueue_op_key_t
{
void *internal__[32]; /**< Internal I/O Queue data. */
+ void *activesock_data; /**< Active socket data. */
void *user_data; /**< Application data. */
} pj_ioqueue_op_key_t;
diff --git a/pjlib/src/pj/activesock.c b/pjlib/src/pj/activesock.c
index 45173c5b..4c3985db 100644
--- a/pjlib/src/pj/activesock.c
+++ b/pjlib/src/pj/activesock.c
@@ -51,16 +51,27 @@ struct accept_op
int rem_addr_len;
};
+struct send_data
+{
+ pj_uint8_t *data;
+ pj_ssize_t len;
+ pj_ssize_t sent;
+ unsigned flags;
+};
+
struct pj_activesock_t
{
pj_ioqueue_key_t *key;
pj_bool_t stream_oriented;
+ pj_bool_t whole_data;
pj_ioqueue_t *ioqueue;
void *user_data;
unsigned async_count;
unsigned max_loop;
pj_activesock_cb cb;
+ struct send_data send_data;
+
struct read_op *read_op;
pj_uint32_t read_flags;
enum read_type read_type;
@@ -89,6 +100,7 @@ PJ_DEF(void) pj_activesock_cfg_default(pj_activesock_cfg *cfg)
pj_bzero(cfg, sizeof(*cfg));
cfg->async_cnt = 1;
cfg->concurrency = -1;
+ cfg->whole_data = PJ_TRUE;
}
@@ -115,6 +127,7 @@ PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
asock->ioqueue = ioqueue;
asock->stream_oriented = (sock_type == pj_SOCK_STREAM());
asock->async_count = (opt? opt->async_cnt : 1);
+ asock->whole_data = (opt? opt->whole_data : 1);
asock->max_loop = PJ_ACTIVESOCK_MAX_LOOP;
asock->user_data = user_data;
pj_memcpy(&asock->cb, cb, sizeof(*cb));
@@ -134,7 +147,10 @@ PJ_DEF(pj_status_t) pj_activesock_create( pj_pool_t *pool,
return status;
}
- if (opt && opt->concurrency >= 0) {
+ if (asock->whole_data) {
+ /* Must disable concurrency otherwise there is a race condition */
+ pj_ioqueue_set_concurrency(asock->key, 0);
+ } else if (opt && opt->concurrency >= 0) {
pj_ioqueue_set_concurrency(asock->key, opt->concurrency);
}
@@ -443,6 +459,35 @@ static void ioqueue_on_read_complete(pj_ioqueue_key_t *key,
}
+static pj_status_t send_remaining(pj_activesock_t *asock,
+ pj_ioqueue_op_key_t *send_key)
+{
+ struct send_data *sd = (struct send_data*)send_key->activesock_data;
+ pj_status_t status;
+
+ do {
+ pj_ssize_t size;
+
+ size = sd->len - sd->sent;
+ status = pj_ioqueue_send(asock->key, send_key,
+ sd->data+sd->sent, &size, sd->flags);
+ if (status != PJ_SUCCESS) {
+ /* Pending or error */
+ break;
+ }
+
+ sd->sent += size;
+ if (sd->sent == sd->len) {
+ /* The whole data has been sent. */
+ return PJ_SUCCESS;
+ }
+
+ } while (sd->sent < sd->len);
+
+ return status;
+}
+
+
PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
pj_ioqueue_op_key_t *send_key,
const void *data,
@@ -451,7 +496,42 @@ PJ_DEF(pj_status_t) pj_activesock_send( pj_activesock_t *asock,
{
PJ_ASSERT_RETURN(asock && send_key && data && size, PJ_EINVAL);
- return pj_ioqueue_send(asock->key, send_key, data, size, flags);
+ send_key->activesock_data = NULL;
+
+ if (asock->whole_data) {
+ pj_ssize_t whole;
+ pj_status_t status;
+
+ whole = *size;
+
+ status = pj_ioqueue_send(asock->key, send_key, data, size, flags);
+ if (status != PJ_SUCCESS) {
+ /* Pending or error */
+ return status;
+ }
+
+ if (*size == whole) {
+ /* The whole data has been sent. */
+ return PJ_SUCCESS;
+ }
+
+ /* Data was partially sent */
+ asock->send_data.data = (pj_uint8_t*)data;
+ asock->send_data.len = whole;
+ asock->send_data.sent = *size;
+ asock->send_data.flags = flags;
+ send_key->activesock_data = &asock->send_data;
+
+ /* Try again */
+ status = send_remaining(asock, send_key);
+ if (status == PJ_SUCCESS) {
+ *size = whole;
+ }
+ return status;
+
+ } else {
+ return pj_ioqueue_send(asock->key, send_key, data, size, flags);
+ }
}
@@ -479,6 +559,31 @@ static void ioqueue_on_write_complete(pj_ioqueue_key_t *key,
asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key);
+ if (bytes_sent > 0 && op_key->activesock_data) {
+ /* whole_data is requested. Make sure we send all the data */
+ struct send_data *sd = (struct send_data*)op_key->activesock_data;
+
+ sd->sent += bytes_sent;
+ if (sd->sent == sd->len) {
+ /* all has been sent */
+ bytes_sent = sd->sent;
+ op_key->activesock_data = NULL;
+ } else {
+ /* send remaining data */
+ pj_status_t status;
+
+ status = send_remaining(asock, op_key);
+ if (status == PJ_EPENDING)
+ return;
+ else if (status == PJ_SUCCESS)
+ bytes_sent = sd->sent;
+ else
+ bytes_sent = -status;
+
+ op_key->activesock_data = NULL;
+ }
+ }
+
if (asock->cb.on_data_sent) {
pj_bool_t ret;
diff --git a/pjlib/src/pjlib-test/activesock.c b/pjlib/src/pjlib-test/activesock.c
index e106b71d..2eedca0a 100644
--- a/pjlib/src/pjlib-test/activesock.c
+++ b/pjlib/src/pjlib-test/activesock.c
@@ -246,15 +246,234 @@ on_return:
}
+
+#define SIGNATURE 0xdeadbeef
+struct tcp_pkt
+{
+ pj_uint32_t signature;
+ pj_uint32_t seq;
+ char fill[513];
+};
+
+struct tcp_state
+{
+ pj_bool_t err;
+ pj_bool_t sent;
+ pj_uint32_t next_recv_seq;
+ pj_uint8_t pkt[600];
+};
+
+struct send_key
+{
+ pj_ioqueue_op_key_t op_key;
+};
+
+
+static pj_bool_t tcp_on_data_read(pj_activesock_t *asock,
+ void *data,
+ pj_size_t size,
+ pj_status_t status,
+ pj_size_t *remainder)
+{
+ struct tcp_state *st = (struct tcp_state*) pj_activesock_get_user_data(asock);
+ char *next = (char*) data;
+
+ if (status != PJ_SUCCESS && status != PJ_EPENDING) {
+ PJ_LOG(1,("", " err: status=%d", status));
+ st->err = PJ_TRUE;
+ return PJ_FALSE;
+ }
+
+ while (size >= sizeof(struct tcp_pkt)) {
+ struct tcp_pkt *tcp_pkt = (struct tcp_pkt*) next;
+
+ if (tcp_pkt->signature != SIGNATURE) {
+ PJ_LOG(1,("", " err: invalid signature at seq=%d",
+ st->next_recv_seq));
+ st->err = PJ_TRUE;
+ return PJ_FALSE;
+ }
+ if (tcp_pkt->seq != st->next_recv_seq) {
+ PJ_LOG(1,("", " err: wrong sequence"));
+ st->err = PJ_TRUE;
+ return PJ_FALSE;
+ }
+
+ st->next_recv_seq++;
+ next += sizeof(struct tcp_pkt);
+ size -= sizeof(struct tcp_pkt);
+ }
+
+ if (size) {
+ pj_memmove(data, next, size);
+ *remainder = size;
+ }
+
+ return PJ_TRUE;
+}
+
+static pj_bool_t tcp_on_data_sent(pj_activesock_t *asock,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t sent)
+{
+ struct tcp_state *st=(struct tcp_state*)pj_activesock_get_user_data(asock);
+
+ st->sent = 1;
+
+ if (sent < 1) {
+ st->err = PJ_TRUE;
+ return PJ_FALSE;
+ }
+
+ return PJ_TRUE;
+}
+
+static int tcp_perf_test(void)
+{
+ enum { COUNT=100000 };
+ pj_pool_t *pool = NULL;
+ pj_ioqueue_t *ioqueue = NULL;
+ pj_sock_t sock1=PJ_INVALID_SOCKET, sock2=PJ_INVALID_SOCKET;
+ pj_activesock_t *asock1 = NULL, *asock2 = NULL;
+ pj_activesock_cb cb;
+ struct tcp_state *state1, *state2;
+ unsigned i;
+ pj_status_t status;
+
+ pool = pj_pool_create(mem, "tcpperf", 256, 256, NULL);
+
+ status = app_socketpair(pj_AF_INET(), pj_SOCK_STREAM(), 0, &sock1,
+ &sock2);
+ if (status != PJ_SUCCESS) {
+ status = -100;
+ goto on_return;
+ }
+
+ status = pj_ioqueue_create(pool, 4, &ioqueue);
+ if (status != PJ_SUCCESS) {
+ status = -110;
+ goto on_return;
+ }
+
+ pj_bzero(&cb, sizeof(cb));
+ cb.on_data_read = &tcp_on_data_read;
+ cb.on_data_sent = &tcp_on_data_sent;
+
+ state1 = PJ_POOL_ZALLOC_T(pool, struct tcp_state);
+ status = pj_activesock_create(pool, sock1, pj_SOCK_STREAM(), NULL, ioqueue,
+ &cb, state1, &asock1);
+ if (status != PJ_SUCCESS) {
+ status = -120;
+ goto on_return;
+ }
+
+ state2 = PJ_POOL_ZALLOC_T(pool, struct tcp_state);
+ status = pj_activesock_create(pool, sock2, pj_SOCK_STREAM(), NULL, ioqueue,
+ &cb, state2, &asock2);
+ if (status != PJ_SUCCESS) {
+ status = -130;
+ goto on_return;
+ }
+
+ status = pj_activesock_start_read(asock1, pool, 1000, 0);
+ if (status != PJ_SUCCESS) {
+ status = -140;
+ goto on_return;
+ }
+
+ /* Send packet as quickly as possible */
+ for (i=0; i<COUNT && !state1->err && !state2->err; ++i) {
+ struct tcp_pkt *pkt;
+ struct send_key send_key[2], *op_key;
+ pj_ssize_t len;
+
+ pkt = (struct tcp_pkt*)state2->pkt;
+ pkt->signature = SIGNATURE;
+ pkt->seq = i;
+ pj_memset(pkt->fill, 'a', sizeof(pkt->fill));
+
+ op_key = &send_key[i%2];
+ pj_ioqueue_op_key_init(&op_key->op_key, sizeof(*op_key));
+
+ state2->sent = PJ_FALSE;
+ len = sizeof(*pkt);
+ status = pj_activesock_send(asock2, &op_key->op_key, pkt, &len, 0);
+ if (status == PJ_EPENDING) {
+ do {
+ pj_ioqueue_poll(ioqueue, NULL);
+ } while (!state2->sent);
+ } else if (status != PJ_SUCCESS) {
+ PJ_LOG(1,("", " err: send status=%d", status));
+ status = -180;
+ break;
+ } else if (status == PJ_SUCCESS) {
+ if (len != sizeof(*pkt)) {
+ PJ_LOG(1,("", " err: shouldn't report partial sent"));
+ status = -190;
+ break;
+ }
+ }
+ }
+
+ /* Wait until everything has been sent/received */
+ if (state1->next_recv_seq < COUNT) {
+ pj_time_val delay = {0, 100};
+ while (pj_ioqueue_poll(ioqueue, &delay) > 0)
+ ;
+ }
+
+ if (status == PJ_EPENDING)
+ status = PJ_SUCCESS;
+
+ if (status != 0)
+ goto on_return;
+
+ if (state1->err) {
+ status = -183;
+ goto on_return;
+ }
+ if (state2->err) {
+ status = -186;
+ goto on_return;
+ }
+ if (state1->next_recv_seq != COUNT) {
+ PJ_LOG(3,("", " err: only %u packets received, expecting %u",
+ state1->next_recv_seq, COUNT));
+ status = -195;
+ goto on_return;
+ }
+
+on_return:
+ if (asock2)
+ pj_activesock_close(asock2);
+ if (asock1)
+ pj_activesock_close(asock1);
+ if (ioqueue)
+ pj_ioqueue_destroy(ioqueue);
+ if (pool)
+ pj_pool_release(pool);
+
+ return status;
+}
+
+
+
int activesock_test(void)
{
int ret;
+ ret = (int)&udp_ping_pong_test;
+
PJ_LOG(3,("", "..udp ping/pong test"));
ret = udp_ping_pong_test();
if (ret != 0)
return ret;
+ PJ_LOG(3,("", "..tcp perf test"));
+ ret = tcp_perf_test();
+ if (ret != 0)
+ return ret;
+
return 0;
}