summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2008-08-04 10:52:51 +0000
committerBenny Prijono <bennylp@teluu.com>2008-08-04 10:52:51 +0000
commit8a14826b0e6725b8c5eab7752fd30811510e8c24 (patch)
tree49af4efd31b12e5bca764254040e41b84de2ec92
parentf2b03bec9115d384456098eada81efc9707df24a (diff)
Changed SIP transport to use active socket to fix ticket #579: "Data loss with TCP sockets (thanks Helmut Wolf for the report)". Also added SIP more TCP transport tests to reproduce the bug
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@2188 74dad513-b988-da41-8d7b-12977e46ad98
-rw-r--r--pjsip/src/pjsip/sip_transport_tcp.c595
-rw-r--r--pjsip/src/test-pjsip/transport_tcp_test.c6
-rw-r--r--pjsip/src/test-pjsip/transport_test.c116
3 files changed, 319 insertions, 398 deletions
diff --git a/pjsip/src/pjsip/sip_transport_tcp.c b/pjsip/src/pjsip/sip_transport_tcp.c
index 14842484..53b42280 100644
--- a/pjsip/src/pjsip/sip_transport_tcp.c
+++ b/pjsip/src/pjsip/sip_transport_tcp.c
@@ -21,8 +21,8 @@
#include <pjsip/sip_errno.h>
#include <pj/compat/socket.h>
#include <pj/addr_resolv.h>
+#include <pj/activesock.h>
#include <pj/assert.h>
-#include <pj/ioqueue.h>
#include <pj/lock.h>
#include <pj/log.h>
#include <pj/os.h>
@@ -46,25 +46,6 @@ struct tcp_transport;
/*
- * This structure is "descendant" of pj_ioqueue_op_key_t, and it is used to
- * track pending/asynchronous accept() operation. TCP transport may have
- * more than one pending accept() operations, depending on the value of
- * async_cnt.
- */
-struct pending_accept
-{
- pj_ioqueue_op_key_t op_key;
- struct tcp_listener *listener;
- unsigned index;
- pj_pool_t *pool;
- pj_sock_t new_sock;
- int addr_len;
- pj_sockaddr_in local_addr;
- pj_sockaddr_in remote_addr;
-};
-
-
-/*
* This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the
* SIP transport factory).
*/
@@ -74,10 +55,7 @@ struct tcp_listener
pj_bool_t is_registered;
pjsip_endpoint *endpt;
pjsip_tpmgr *tpmgr;
- pj_sock_t sock;
- pj_ioqueue_key_t *key;
- unsigned async_cnt;
- struct pending_accept *accept_op[MAX_ASYNC_CNT];
+ pj_activesock_t *asock;
};
@@ -114,7 +92,7 @@ struct tcp_transport
pj_bool_t is_closing;
pj_status_t close_reason;
pj_sock_t sock;
- pj_ioqueue_key_t *key;
+ pj_activesock_t *asock;
pj_bool_t has_pending_connect;
/* Keep-alive timer. */
@@ -139,16 +117,10 @@ struct tcp_transport
*/
/* This callback is called when pending accept() operation completes. */
-static void on_accept_complete( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_sock_t sock,
- pj_status_t status);
-
-/* Handle accept() completion */
-static pj_status_t handle_accept(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_sock_t sock,
- pj_status_t status);
+static pj_bool_t on_accept_complete(pj_activesock_t *asock,
+ pj_sock_t newsock,
+ const pj_sockaddr_t *src_addr,
+ int src_addr_len);
/* This callback is called by transport manager to destroy listener */
static pj_status_t lis_destroy(pjsip_tpfactory *factory);
@@ -185,11 +157,10 @@ static void sockaddr_to_host_port( pj_pool_t *pool,
pjsip_host_port *host_port,
const pj_sockaddr_in *addr )
{
- enum { M = 48 };
- host_port->host.ptr = (char*) pj_pool_alloc(pool, M);
- host_port->host.slen = pj_ansi_snprintf( host_port->host.ptr, M, "%s",
- pj_inet_ntoa(addr->sin_addr));
- host_port->port = pj_ntohs(addr->sin_port);
+ host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4);
+ pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 2);
+ host_port->host.slen = pj_ansi_strlen(host_port->host.ptr);
+ host_port->port = pj_sockaddr_get_port(addr);
}
@@ -209,11 +180,12 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
pjsip_tpfactory **p_factory)
{
pj_pool_t *pool;
+ pj_sock_t sock = PJ_INVALID_SOCKET;
struct tcp_listener *listener;
- pj_ioqueue_callback listener_cb;
+ pj_activesock_cfg asock_cfg;
+ pj_activesock_cb listener_cb;
pj_sockaddr_in *listener_addr;
int addr_len;
- unsigned i;
pj_status_t status;
/* Sanity check */
@@ -244,7 +216,6 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
listener->factory.type_name = "tcp";
listener->factory.flag =
pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP);
- listener->sock = PJ_INVALID_SOCKET;
pj_ansi_strcpy(listener->factory.obj_name, "tcplis");
@@ -255,8 +226,7 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
/* Create and bind socket */
- status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0,
- &listener->sock);
+ status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &sock);
if (status != PJ_SUCCESS)
goto on_error;
@@ -267,14 +237,13 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
pj_sockaddr_in_init(listener_addr, NULL, 0);
}
- status = pj_sock_bind(listener->sock, listener_addr,
- sizeof(pj_sockaddr_in));
+ status = pj_sock_bind(sock, listener_addr, sizeof(pj_sockaddr_in));
if (status != PJ_SUCCESS)
goto on_error;
/* Retrieve the bound address */
addr_len = sizeof(pj_sockaddr_in);
- status = pj_sock_getsockname(listener->sock, listener_addr, &addr_len);
+ status = pj_sock_getsockname(sock, listener_addr, &addr_len);
if (status != PJ_SUCCESS)
goto on_error;
@@ -320,19 +289,22 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
/* Start listening to the address */
- status = pj_sock_listen(listener->sock, PJSIP_TCP_TRANSPORT_BACKLOG);
+ status = pj_sock_listen(sock, PJSIP_TCP_TRANSPORT_BACKLOG);
if (status != PJ_SUCCESS)
goto on_error;
- /* Register socket to ioqeuue */
+ /* Create active socket */
+ if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT;
+ pj_activesock_cfg_default(&asock_cfg);
+ asock_cfg.async_cnt = async_cnt;
+
pj_bzero(&listener_cb, sizeof(listener_cb));
listener_cb.on_accept_complete = &on_accept_complete;
- status = pj_ioqueue_register_sock(pool, pjsip_endpt_get_ioqueue(endpt),
- listener->sock, listener,
- &listener_cb, &listener->key);
- if (status != PJ_SUCCESS)
- goto on_error;
+ status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
+ pjsip_endpt_get_ioqueue(endpt),
+ &listener_cb, listener,
+ &listener->asock);
/* Register to transport manager */
listener->endpt = endpt;
@@ -347,34 +319,10 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
goto on_error;
}
-
/* Start pending accept() operations */
- if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT;
- listener->async_cnt = async_cnt;
-
- for (i=0; i<async_cnt; ++i) {
- pj_pool_t *pool;
-
- pool = pjsip_endpt_create_pool(endpt, "tcps%p", POOL_TP_INIT,
- POOL_TP_INIT);
- if (!pool) {
- status = PJ_ENOMEM;
- goto on_error;
- }
-
- listener->accept_op[i] = PJ_POOL_ZALLOC_T(pool,
- struct pending_accept);
- pj_ioqueue_op_key_init(&listener->accept_op[i]->op_key,
- sizeof(listener->accept_op[i]->op_key));
- listener->accept_op[i]->pool = pool;
- listener->accept_op[i]->listener = listener;
- listener->accept_op[i]->index = i;
-
- status = handle_accept(listener->key, &listener->accept_op[i]->op_key,
- listener->sock, PJ_EPENDING);
- if (status != PJ_SUCCESS)
- goto on_error;
- }
+ status = pj_activesock_start_accept(listener->asock, pool);
+ if (status != PJ_SUCCESS)
+ goto on_error;
PJ_LOG(4,(listener->factory.obj_name,
"SIP TCP listener ready for incoming connections at %.*s:%d",
@@ -388,6 +336,8 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt,
return PJ_SUCCESS;
on_error:
+ if (listener->asock==NULL && sock!=PJ_INVALID_SOCKET)
+ pj_sock_close(sock);
lis_destroy(&listener->factory);
return status;
}
@@ -410,22 +360,15 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
static pj_status_t lis_destroy(pjsip_tpfactory *factory)
{
struct tcp_listener *listener = (struct tcp_listener *)factory;
- unsigned i;
if (listener->is_registered) {
pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory);
listener->is_registered = PJ_FALSE;
}
- if (listener->key) {
- pj_ioqueue_unregister(listener->key);
- listener->key = NULL;
- listener->sock = PJ_INVALID_SOCKET;
- }
-
- if (listener->sock != PJ_INVALID_SOCKET) {
- pj_sock_close(listener->sock);
- listener->sock = PJ_INVALID_SOCKET;
+ if (listener->asock) {
+ pj_activesock_close(listener->asock);
+ listener->asock = NULL;
}
if (listener->factory.lock) {
@@ -433,14 +376,6 @@ static pj_status_t lis_destroy(pjsip_tpfactory *factory)
listener->factory.lock = NULL;
}
- for (i=0; i<PJ_ARRAY_SIZE(listener->accept_op); ++i) {
- if (listener->accept_op[i] && listener->accept_op[i]->pool) {
- pj_pool_t *pool = listener->accept_op[i]->pool;
- listener->accept_op[i]->pool = NULL;
- pj_pool_release(pool);
- }
- }
-
if (listener->factory.pool) {
pj_pool_t *pool = listener->factory.pool;
@@ -480,19 +415,21 @@ static pj_status_t tcp_destroy_transport(pjsip_transport *transport);
static pj_status_t tcp_destroy(pjsip_transport *transport,
pj_status_t reason);
-/* Callback from ioqueue on incoming packet */
-static void on_read_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_read);
+/* Callback on incoming data */
+static pj_bool_t on_data_read(pj_activesock_t *asock,
+ void *data,
+ pj_size_t size,
+ pj_status_t status,
+ pj_size_t *remainder);
-/* Callback from ioqueue when packet is sent */
-static void on_write_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_sent);
+/* Callback when packet is sent */
+static pj_bool_t on_data_sent(pj_activesock_t *asock,
+ pj_ioqueue_op_key_t *send_key,
+ pj_ssize_t sent);
-/* Callback from ioqueue when connect completes */
-static void on_connect_complete(pj_ioqueue_key_t *key,
- pj_status_t status);
+/* Callback when connect completes */
+static pj_bool_t on_connect_complete(pj_activesock_t *asock,
+ pj_status_t status);
/* TCP keep-alive timer callback */
static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e);
@@ -510,7 +447,8 @@ static pj_status_t tcp_create( struct tcp_listener *listener,
{
struct tcp_transport *tcp;
pj_ioqueue_t *ioqueue;
- pj_ioqueue_callback tcp_callback;
+ pj_activesock_cfg asock_cfg;
+ pj_activesock_cb tcp_callback;
const pj_str_t ka_pkt = PJSIP_TCP_KEEP_ALIVE_DATA;
pj_status_t status;
@@ -528,8 +466,8 @@ static pj_status_t tcp_create( struct tcp_listener *listener,
* Create and initialize basic transport structure.
*/
tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport);
- tcp->sock = sock;
tcp->is_server = is_server;
+ tcp->sock = sock;
/*tcp->listener = listener;*/
pj_list_init(&tcp->delayed_list);
tcp->base.pool = pool;
@@ -569,15 +507,18 @@ static pj_status_t tcp_create( struct tcp_listener *listener,
tcp->base.destroy = &tcp_destroy_transport;
- /* Register socket to ioqueue */
- pj_bzero(&tcp_callback, sizeof(pj_ioqueue_callback));
- tcp_callback.on_read_complete = &on_read_complete;
- tcp_callback.on_write_complete = &on_write_complete;
+ /* Create active socket */
+ pj_activesock_cfg_default(&asock_cfg);
+ asock_cfg.async_cnt = 1;
+
+ pj_bzero(&tcp_callback, sizeof(tcp_callback));
+ tcp_callback.on_data_read = &on_data_read;
+ tcp_callback.on_data_sent = &on_data_sent;
tcp_callback.on_connect_complete = &on_connect_complete;
ioqueue = pjsip_endpt_get_ioqueue(listener->endpt);
- status = pj_ioqueue_register_sock(pool, ioqueue, sock,
- tcp, &tcp_callback, &tcp->key);
+ status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg,
+ ioqueue, &tcp_callback, tcp, &tcp->asock);
if (status != PJ_SUCCESS) {
goto on_error;
}
@@ -627,13 +568,12 @@ static void tcp_flush_pending_tx(struct tcp_transport *tcp)
tdata = pending_tx->tdata_op_key->tdata;
op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
- /* send to ioqueue! */
+ /* send! */
size = tdata->buf.cur - tdata->buf.start;
- status = pj_ioqueue_send(tcp->key, op_key,
- tdata->buf.start, &size, 0);
-
+ status = pj_activesock_send(tcp->asock, op_key, tdata->buf.start,
+ &size, 0);
if (status != PJ_EPENDING) {
- on_write_complete(tcp->key, op_key, size);
+ on_data_sent(tcp->asock, op_key, size);
}
}
@@ -693,7 +633,7 @@ static pj_status_t tcp_destroy(pjsip_transport *transport,
op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
- on_write_complete(tcp->key, op_key, -reason);
+ on_data_sent(tcp->asock, op_key, -reason);
}
if (tcp->rdata.tp_info.pool) {
@@ -701,13 +641,11 @@ static pj_status_t tcp_destroy(pjsip_transport *transport,
tcp->rdata.tp_info.pool = NULL;
}
- if (tcp->key) {
- pj_ioqueue_unregister(tcp->key);
- tcp->key = NULL;
+ if (tcp->asock) {
+ pj_activesock_close(tcp->asock);
+ tcp->asock = NULL;
tcp->sock = PJ_INVALID_SOCKET;
- }
-
- if (tcp->sock != PJ_INVALID_SOCKET) {
+ } else if (tcp->sock != PJ_INVALID_SOCKET) {
pj_sock_close(tcp->sock);
tcp->sock = PJ_INVALID_SOCKET;
}
@@ -759,6 +697,7 @@ static pj_status_t tcp_start_read(struct tcp_transport *tcp)
pj_pool_t *pool;
pj_ssize_t size;
pj_sockaddr_in *rem_addr;
+ void *readbuf[1];
pj_status_t status;
/* Init rdata */
@@ -787,11 +726,12 @@ static pj_status_t tcp_start_read(struct tcp_transport *tcp)
tcp->rdata.pkt_info.src_port = pj_ntohs(rem_addr->sin_port);
size = sizeof(tcp->rdata.pkt_info.packet);
- status = pj_ioqueue_recv(tcp->key, &tcp->rdata.tp_info.op_key.op_key,
- tcp->rdata.pkt_info.packet, &size,
- PJ_IOQUEUE_ALWAYS_ASYNC);
+ readbuf[0] = tcp->rdata.pkt_info.packet;
+ status = pj_activesock_start_read2(tcp->asock, tcp->base.pool, size,
+ readbuf, 0);
if (status != PJ_SUCCESS && status != PJ_EPENDING) {
- PJ_LOG(4, (tcp->base.obj_name, "ioqueue recv() error, status=%d",
+ PJ_LOG(4, (tcp->base.obj_name,
+ "pj_activesock_start_read() error, status=%d",
status));
return status;
}
@@ -861,7 +801,8 @@ static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
/* Start asynchronous connect() operation */
tcp->has_pending_connect = PJ_TRUE;
- status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in));
+ status = pj_activesock_start_connect(tcp->asock, tcp->base.pool, rem_addr,
+ sizeof(pj_sockaddr_in));
if (status == PJ_SUCCESS) {
tcp->has_pending_connect = PJ_FALSE;
} else if (status != PJ_EPENDING) {
@@ -873,7 +814,7 @@ static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
* set is different now that asynchronous connect() is started.
*/
addr_len = sizeof(pj_sockaddr_in);
- if (pj_sock_getsockname(tcp->sock, &local_addr, &addr_len)==PJ_SUCCESS) {
+ if (pj_sock_getsockname(sock, &local_addr, &addr_len)==PJ_SUCCESS) {
pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr;
/* Some systems (like old Win32 perhaps) may not set local address
@@ -908,157 +849,72 @@ static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
/*
- * This callback is called by ioqueue when pending accept() operation has
- * completed.
+ * This callback is called by active socket when pending accept() operation
+ * has completed.
*/
-static void on_accept_complete( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_sock_t sock,
- pj_status_t status)
-{
- handle_accept(key, op_key, sock, status);
-}
-
-
-/* Handle accept() completion */
-static pj_status_t handle_accept(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_sock_t sock,
- pj_status_t status)
+static pj_bool_t on_accept_complete(pj_activesock_t *asock,
+ pj_sock_t sock,
+ const pj_sockaddr_t *src_addr,
+ int src_addr_len)
{
struct tcp_listener *listener;
struct tcp_transport *tcp;
- struct pending_accept *accept_op;
- int err_cnt = 0;
-
- listener = (struct tcp_listener*) pj_ioqueue_get_user_data(key);
- accept_op = (struct pending_accept*) op_key;
-
- /*
- * Loop while there is immediate connection or when there is error.
- */
- do {
- if (status == PJ_EPENDING) {
- /*
- * This can only happen when this function is called during
- * initialization to kick off asynchronous accept().
- */
+ char addr[PJ_INET6_ADDRSTRLEN+10];
+ pj_status_t status;
- } else if (status != PJ_SUCCESS) {
+ PJ_UNUSED_ARG(src_addr_len);
- /*
- * Error in accept().
- */
- tcp_perror(listener->factory.obj_name, "Error in accept()",
- status);
+ listener = (struct tcp_listener*) pj_activesock_get_user_data(asock);
- /*
- * Prevent endless accept() error loop by limiting the
- * number of consecutive errors. Once the number of errors
- * is equal to maximum, we treat this as permanent error, and
- * we stop the accept() operation.
- */
- ++err_cnt;
- if (err_cnt >= 20) {
- PJ_LOG(1, (listener->factory.obj_name,
- "Too many errors, LISTENER IS STOPPING!"));
- return status;
- }
+ PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_TRUE);
+ PJ_LOG(4,(listener->factory.obj_name,
+ "TCP listener %.*s:%d: got incoming TCP connection "
+ "from %s, sock=%d",
+ (int)listener->factory.addr_name.host.slen,
+ listener->factory.addr_name.host.ptr,
+ listener->factory.addr_name.port,
+ pj_sockaddr_print(src_addr, addr, sizeof(addr), 3),
+ sock));
+
+ /*
+ * Incoming connection!
+ * Create TCP transport for the new socket.
+ */
+ status = tcp_create( listener, NULL, sock, PJ_TRUE,
+ (const pj_sockaddr_in*)&listener->factory.local_addr,
+ (const pj_sockaddr_in*)src_addr, &tcp);
+ if (status == PJ_SUCCESS) {
+ status = tcp_start_read(tcp);
+ if (status != PJ_SUCCESS) {
+ PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));
+ tcp_destroy(&tcp->base, status);
} else {
- pj_pool_t *pool;
- struct pending_accept *new_op;
-
- if (sock == PJ_INVALID_SOCKET) {
- sock = accept_op->new_sock;
+ /* Start keep-alive timer */
+ if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
+ pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0};
+ pjsip_endpt_schedule_timer(listener->endpt,
+ &tcp->ka_timer,
+ &delay);
+ tcp->ka_timer.id = PJ_TRUE;
+ pj_gettimeofday(&tcp->last_activity);
}
-
- if (sock == PJ_INVALID_SOCKET) {
- pj_assert(!"Should not happen. status should be error");
- goto next_accept;
- }
-
- PJ_LOG(4,(listener->factory.obj_name,
- "TCP listener %.*s:%d: got incoming TCP connection "
- "from %s:%d, sock=%d",
- (int)listener->factory.addr_name.host.slen,
- listener->factory.addr_name.host.ptr,
- listener->factory.addr_name.port,
- pj_inet_ntoa(accept_op->remote_addr.sin_addr),
- pj_ntohs(accept_op->remote_addr.sin_port),
- sock));
-
- /* Create new accept_opt */
- pool = pjsip_endpt_create_pool(listener->endpt, "tcps%p",
- POOL_TP_INIT, POOL_TP_INC);
- new_op = PJ_POOL_ZALLOC_T(pool, struct pending_accept);
- new_op->pool = pool;
- new_op->listener = listener;
- new_op->index = accept_op->index;
- pj_ioqueue_op_key_init(&new_op->op_key, sizeof(new_op->op_key));
- listener->accept_op[accept_op->index] = new_op;
-
- /*
- * Incoming connections!
- * Create TCP transport for the new socket.
- */
- status = tcp_create( listener, accept_op->pool, sock, PJ_TRUE,
- &accept_op->local_addr,
- &accept_op->remote_addr, &tcp);
- if (status == PJ_SUCCESS) {
- status = tcp_start_read(tcp);
- if (status != PJ_SUCCESS) {
- PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));
- tcp_destroy(&tcp->base, status);
- } else {
- /* Start keep-alive timer */
- if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) {
- pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0};
- pjsip_endpt_schedule_timer(listener->endpt,
- &tcp->ka_timer,
- &delay);
- tcp->ka_timer.id = PJ_TRUE;
- pj_gettimeofday(&tcp->last_activity);
- }
- }
- }
-
- accept_op = new_op;
}
+ }
-next_accept:
- /*
- * Start the next asynchronous accept() operation.
- */
- accept_op->addr_len = sizeof(pj_sockaddr_in);
- accept_op->new_sock = PJ_INVALID_SOCKET;
-
- status = pj_ioqueue_accept(listener->key,
- &accept_op->op_key,
- &accept_op->new_sock,
- &accept_op->local_addr,
- &accept_op->remote_addr,
- &accept_op->addr_len);
-
- /*
- * Loop while we have immediate connection or when there is error.
- */
-
- } while (status != PJ_EPENDING);
-
- return PJ_SUCCESS;
+ return PJ_TRUE;
}
/*
* Callback from ioqueue when packet is sent.
*/
-static void on_write_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_sent)
+static pj_bool_t on_data_sent(pj_activesock_t *asock,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent)
{
struct tcp_transport *tcp = (struct tcp_transport*)
- pj_ioqueue_get_user_data(key);
+ pj_activesock_get_user_data(asock);
pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;
/* Note that op_key may be the op_key from keep-alive, thus
@@ -1078,6 +934,7 @@ static void on_write_complete(pj_ioqueue_key_t *key,
/* Mark last activity time */
pj_gettimeofday(&tcp->last_activity);
+
}
/* Check for error/closure */
@@ -1091,8 +948,11 @@ static void on_write_complete(pj_ioqueue_key_t *key,
-bytes_sent;
if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
pjsip_transport_shutdown(&tcp->base);
+
+ return PJ_FALSE;
}
+ return PJ_TRUE;
}
@@ -1166,9 +1026,9 @@ static pj_status_t tcp_send_msg(pjsip_transport *transport,
* sent asynchronously.
*/
size = tdata->buf.cur - tdata->buf.start;
- status = pj_ioqueue_send(tcp->key,
- (pj_ioqueue_op_key_t*)&tdata->op_key,
- tdata->buf.start, &size, 0);
+ status = pj_activesock_send(tcp->asock,
+ (pj_ioqueue_op_key_t*)&tdata->op_key,
+ tdata->buf.start, &size, 0);
if (status != PJ_EPENDING) {
/* Not pending (could be immediate success or error) */
@@ -1212,158 +1072,97 @@ static pj_status_t tcp_shutdown(pjsip_transport *transport)
/*
* Callback from ioqueue that an incoming data is received from the socket.
*/
-static void on_read_complete(pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_read)
+static pj_bool_t on_data_read(pj_activesock_t *asock,
+ void *data,
+ pj_size_t size,
+ pj_status_t status,
+ pj_size_t *remainder)
{
enum { MAX_IMMEDIATE_PACKET = 10 };
- pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key;
- pjsip_rx_data *rdata = rdata_op_key->rdata;
- struct tcp_transport *tcp =
- (struct tcp_transport*)rdata->tp_info.transport;
- int i;
- pj_status_t status;
+ struct tcp_transport *tcp;
+ pjsip_rx_data *rdata;
+
+ PJ_UNUSED_ARG(data);
+
+ tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
+ rdata = &tcp->rdata;
/* Don't do anything if transport is closing. */
if (tcp->is_closing) {
tcp->is_closing++;
- return;
+ return PJ_FALSE;
}
- /*
- * The idea of the loop is to process immediate data received by
- * pj_ioqueue_recv(), as long as i < MAX_IMMEDIATE_PACKET. When
- * i is >= MAX_IMMEDIATE_PACKET, we force the recv() operation to
- * complete asynchronously, to allow other sockets to get their data.
+ /* Houston, we have packet! Report the packet to transport manager
+ * to be parsed.
*/
- for (i=0;; ++i) {
- pj_uint32_t flags;
-
- /* Houston, we have packet! Report the packet to transport manager
- * to be parsed.
- */
- if (bytes_read > 0) {
- pj_size_t size_eaten;
-
- /* Mark this as an activity */
- pj_gettimeofday(&tcp->last_activity);
-
- /* Init pkt_info part. */
- rdata->pkt_info.len += bytes_read;
- rdata->pkt_info.zero = 0;
- pj_gettimeofday(&rdata->pkt_info.timestamp);
-
- /* Report to transport manager.
- * The transport manager will tell us how many bytes of the packet
- * have been processed (as valid SIP message).
- */
- size_eaten =
- pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
- rdata);
-
- pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len);
-
- /* Move unprocessed data to the front of the buffer */
- if (size_eaten>0 && size_eaten<(pj_size_t)rdata->pkt_info.len) {
- pj_memmove(rdata->pkt_info.packet,
- rdata->pkt_info.packet + size_eaten,
- rdata->pkt_info.len - size_eaten);
- }
-
- rdata->pkt_info.len -= size_eaten;
-
- } else if (bytes_read == 0) {
-
- /* Transport is closed */
- PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed"));
-
- /* We can not destroy the transport since high level objects may
- * still keep reference to this transport. So we can only
- * instruct transport manager to gracefully start the shutdown
- * procedure for this transport.
- */
- if (tcp->close_reason==PJ_SUCCESS)
- tcp->close_reason = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
- pjsip_transport_shutdown(&tcp->base);
-
- return;
+ if (status == PJ_SUCCESS) {
+ pj_size_t size_eaten;
- //} else if (bytes_read < 0) {
- } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
- -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
- -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))
- {
+ /* Mark this as an activity */
+ pj_gettimeofday(&tcp->last_activity);
- /* Socket error. */
- PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset"));
+ pj_assert((void*)rdata->pkt_info.packet == data);
- /* We can not destroy the transport since high level objects may
- * still keep reference to this transport. So we can only
- * instruct transport manager to gracefully start the shutdown
- * procedure for this transport.
- */
- if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = -bytes_read;
- pjsip_transport_shutdown(&tcp->base);
+ /* Init pkt_info part. */
+ rdata->pkt_info.len = size;
+ rdata->pkt_info.zero = 0;
+ pj_gettimeofday(&rdata->pkt_info.timestamp);
- return;
- }
-
- if (i >= MAX_IMMEDIATE_PACKET) {
- /* Receive quota reached. Force ioqueue_recv() to
- * return PJ_EPENDING
- */
- flags = PJ_IOQUEUE_ALWAYS_ASYNC;
- } else {
- flags = 0;
+ /* Report to transport manager.
+ * The transport manager will tell us how many bytes of the packet
+ * have been processed (as valid SIP message).
+ */
+ size_eaten =
+ pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
+ rdata);
+
+ pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len);
+
+ /* Move unprocessed data to the front of the buffer */
+ *remainder = size - size_eaten;
+ if (*remainder > 0 && *remainder != size) {
+ pj_memmove(rdata->pkt_info.packet,
+ rdata->pkt_info.packet + size_eaten,
+ *remainder);
}
- /* Reset pool. */
- pj_pool_reset(rdata->tp_info.pool);
-
- /* Read next packet. */
- bytes_read = sizeof(rdata->pkt_info.packet) - rdata->pkt_info.len;
- rdata->pkt_info.src_addr_len = sizeof(pj_sockaddr_in);
- status = pj_ioqueue_recv(key, op_key,
- rdata->pkt_info.packet+rdata->pkt_info.len,
- &bytes_read, flags);
-
- if (status == PJ_SUCCESS) {
+ } else {
- /* Continue loop. */
- pj_assert(i < MAX_IMMEDIATE_PACKET);
+ /* Transport is closed */
+ PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed"));
+
+ /* We can not destroy the transport since high level objects may
+ * still keep reference to this transport. So we can only
+ * instruct transport manager to gracefully start the shutdown
+ * procedure for this transport.
+ */
+ if (tcp->close_reason==PJ_SUCCESS)
+ tcp->close_reason = status;
+ pjsip_transport_shutdown(&tcp->base);
- } else if (status == PJ_EPENDING) {
- break;
+ return PJ_FALSE;
- } else {
- /* Socket error */
- PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset"));
+ }
- /* We can not destroy the transport since high level objects may
- * still keep reference to this transport. So we can only
- * instruct transport manager to gracefully start the shutdown
- * procedure for this transport.
- */
- if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
- pjsip_transport_shutdown(&tcp->base);
+ /* Reset pool. */
+ pj_pool_reset(rdata->tp_info.pool);
- return;
- }
- }
+ return PJ_TRUE;
}
/*
* Callback from ioqueue when asynchronous connect() operation completes.
*/
-static void on_connect_complete(pj_ioqueue_key_t *key,
- pj_status_t status)
+static pj_bool_t on_connect_complete(pj_activesock_t *asock,
+ pj_status_t status)
{
struct tcp_transport *tcp;
pj_sockaddr_in addr;
int addrlen;
- tcp = (struct tcp_transport*) pj_ioqueue_get_user_data(key);
+ tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock);
/* Mark that pending connect() operation has completed. */
tcp->has_pending_connect = PJ_FALSE;
@@ -1383,7 +1182,7 @@ static void on_connect_complete(pj_ioqueue_key_t *key,
op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
- on_write_complete(tcp->key, op_key, -status);
+ on_data_sent(tcp->asock, op_key, -status);
}
/* We can not destroy the transport since high level objects may
@@ -1393,7 +1192,7 @@ static void on_connect_complete(pj_ioqueue_key_t *key,
*/
if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
pjsip_transport_shutdown(&tcp->base);
- return;
+ return PJ_FALSE;
}
PJ_LOG(4,(tcp->base.obj_name,
@@ -1432,7 +1231,7 @@ static void on_connect_complete(pj_ioqueue_key_t *key,
*/
if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
pjsip_transport_shutdown(&tcp->base);
- return;
+ return PJ_FALSE;
}
/* Flush all pending send operations */
@@ -1446,6 +1245,8 @@ static void on_connect_complete(pj_ioqueue_key_t *key,
tcp->ka_timer.id = PJ_TRUE;
pj_gettimeofday(&tcp->last_activity);
}
+
+ return PJ_TRUE;
}
/* Transport keep-alive timer callback */
@@ -1482,8 +1283,8 @@ static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e)
/* Send the data */
size = tcp->ka_pkt.slen;
- status = pj_ioqueue_send(tcp->key, &tcp->ka_op_key.key,
- tcp->ka_pkt.ptr, &size, 0);
+ status = pj_activesock_send(tcp->asock, &tcp->ka_op_key.key,
+ tcp->ka_pkt.ptr, &size, 0);
if (status != PJ_SUCCESS && status != PJ_EPENDING) {
tcp_perror(tcp->base.obj_name,
diff --git a/pjsip/src/test-pjsip/transport_tcp_test.c b/pjsip/src/test-pjsip/transport_tcp_test.c
index 3594aff0..2a091caa 100644
--- a/pjsip/src/test-pjsip/transport_tcp_test.c
+++ b/pjsip/src/test-pjsip/transport_tcp_test.c
@@ -83,7 +83,11 @@ int transport_tcp_test(void)
/* Check again that reference counter is 1. */
if (pj_atomic_get(tcp->ref_cnt) != 1)
- return -70;
+ return -40;
+
+ /* Load test */
+ if (transport_load_test(url) != 0)
+ return -60;
/* Basic transport's send/receive loopback test. */
for (i=0; i<SEND_RECV_LOOP; ++i) {
diff --git a/pjsip/src/test-pjsip/transport_test.c b/pjsip/src/test-pjsip/transport_test.c
index a283ec73..a7aa8905 100644
--- a/pjsip/src/test-pjsip/transport_test.c
+++ b/pjsip/src/test-pjsip/transport_test.c
@@ -641,3 +641,119 @@ int transport_rt_test( pjsip_transport_type_e tp_type,
return 0;
}
+
+///////////////////////////////////////////////////////////////////////////////
+/*
+ * Transport load testing
+ */
+static pj_bool_t load_on_rx_request(pjsip_rx_data *rdata);
+
+static struct mod_load_test
+{
+ pjsip_module mod;
+ pj_uint32_t next_seq;
+ pj_bool_t err;
+} mod_load =
+{
+ {
+ NULL, NULL, /* prev and next */
+ { "mod-load-test", 13}, /* Name. */
+ -1, /* Id */
+ PJSIP_MOD_PRIORITY_TSX_LAYER-1, /* Priority */
+ NULL, /* load() */
+ NULL, /* start() */
+ NULL, /* stop() */
+ NULL, /* unload() */
+ &load_on_rx_request, /* on_rx_request() */
+ NULL, /* on_rx_response() */
+ NULL, /* tsx_handler() */
+ }
+};
+
+
+static pj_bool_t load_on_rx_request(pjsip_rx_data *rdata)
+{
+ if (rdata->msg_info.cseq->cseq != mod_load.next_seq) {
+ PJ_LOG(1,("THIS_FILE", " err: expecting cseq %u, got %u",
+ mod_load.next_seq, rdata->msg_info.cseq->cseq));
+ mod_load.err = PJ_TRUE;
+ mod_load.next_seq = rdata->msg_info.cseq->cseq + 1;
+ } else
+ mod_load.next_seq++;
+ return PJ_TRUE;
+}
+
+int transport_load_test(char *target_url)
+{
+ enum { COUNT = 2000 };
+ unsigned i;
+ pj_status_t status;
+
+ /* exhaust packets */
+ do {
+ pj_time_val delay = {1, 0};
+ i = 0;
+ pjsip_endpt_handle_events2(endpt, &delay, &i);
+ } while (i != 0);
+
+ PJ_LOG(3,(THIS_FILE, " transport load test..."));
+
+ if (mod_load.mod.id == -1) {
+ status = pjsip_endpt_register_module( endpt, &mod_load.mod);
+ if (status != PJ_SUCCESS) {
+ app_perror("error registering module", status);
+ return -1;
+ }
+ }
+ mod_load.err = PJ_FALSE;
+ mod_load.next_seq = 0;
+
+ for (i=0; i<COUNT && !mod_load.err; ++i) {
+ pj_str_t target, from, call_id;
+ pjsip_tx_data *tdata;
+
+ target = pj_str(target_url);
+ from = pj_str("<sip:user@host>");
+ call_id = pj_str("thecallid");
+ status = pjsip_endpt_create_request(endpt, &pjsip_invite_method,
+ &target, &from,
+ &target, &from, &call_id,
+ i, NULL, &tdata );
+ if (status != PJ_SUCCESS) {
+ app_perror("error creating request", status);
+ goto on_return;
+ }
+
+ status = pjsip_endpt_send_request_stateless(endpt, tdata, NULL, NULL);
+ if (status != PJ_SUCCESS) {
+ app_perror("error sending request", status);
+ goto on_return;
+ }
+ }
+
+ do {
+ pj_time_val delay = {1, 0};
+ i = 0;
+ pjsip_endpt_handle_events2(endpt, &delay, &i);
+ } while (i != 0);
+
+ if (mod_load.next_seq != COUNT) {
+ PJ_LOG(1,("THIS_FILE", " err: expecting %u msg, got only %u",
+ COUNT, mod_load.next_seq));
+ status = -2;
+ goto on_return;
+ }
+
+on_return:
+ if (mod_load.mod.id != -1) {
+ pjsip_endpt_unregister_module( endpt, &mod_load.mod);
+ mod_load.mod.id = -1;
+ }
+ if (status != PJ_SUCCESS || mod_load.err) {
+ return -2;
+ }
+ PJ_LOG(3,(THIS_FILE, " success"));
+ return 0;
+}
+
+