From 8a14826b0e6725b8c5eab7752fd30811510e8c24 Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Mon, 4 Aug 2008 10:52:51 +0000 Subject: Changed SIP transport to use active socket to fix ticket #579: "Data loss with TCP sockets (thanks Helmut Wolf for the report)". Also added SIP more TCP transport tests to reproduce the bug git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@2188 74dad513-b988-da41-8d7b-12977e46ad98 --- pjsip/src/pjsip/sip_transport_tcp.c | 595 ++++++++++-------------------- pjsip/src/test-pjsip/transport_tcp_test.c | 6 +- pjsip/src/test-pjsip/transport_test.c | 116 ++++++ 3 files changed, 319 insertions(+), 398 deletions(-) (limited to 'pjsip') diff --git a/pjsip/src/pjsip/sip_transport_tcp.c b/pjsip/src/pjsip/sip_transport_tcp.c index 14842484..53b42280 100644 --- a/pjsip/src/pjsip/sip_transport_tcp.c +++ b/pjsip/src/pjsip/sip_transport_tcp.c @@ -21,8 +21,8 @@ #include #include #include +#include #include -#include #include #include #include @@ -45,25 +45,6 @@ struct tcp_listener; struct tcp_transport; -/* - * This structure is "descendant" of pj_ioqueue_op_key_t, and it is used to - * track pending/asynchronous accept() operation. TCP transport may have - * more than one pending accept() operations, depending on the value of - * async_cnt. - */ -struct pending_accept -{ - pj_ioqueue_op_key_t op_key; - struct tcp_listener *listener; - unsigned index; - pj_pool_t *pool; - pj_sock_t new_sock; - int addr_len; - pj_sockaddr_in local_addr; - pj_sockaddr_in remote_addr; -}; - - /* * This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the * SIP transport factory). @@ -74,10 +55,7 @@ struct tcp_listener pj_bool_t is_registered; pjsip_endpoint *endpt; pjsip_tpmgr *tpmgr; - pj_sock_t sock; - pj_ioqueue_key_t *key; - unsigned async_cnt; - struct pending_accept *accept_op[MAX_ASYNC_CNT]; + pj_activesock_t *asock; }; @@ -114,7 +92,7 @@ struct tcp_transport pj_bool_t is_closing; pj_status_t close_reason; pj_sock_t sock; - pj_ioqueue_key_t *key; + pj_activesock_t *asock; pj_bool_t has_pending_connect; /* Keep-alive timer. */ @@ -139,16 +117,10 @@ struct tcp_transport */ /* This callback is called when pending accept() operation completes. */ -static void on_accept_complete( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_sock_t sock, - pj_status_t status); - -/* Handle accept() completion */ -static pj_status_t handle_accept(pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_sock_t sock, - pj_status_t status); +static pj_bool_t on_accept_complete(pj_activesock_t *asock, + pj_sock_t newsock, + const pj_sockaddr_t *src_addr, + int src_addr_len); /* This callback is called by transport manager to destroy listener */ static pj_status_t lis_destroy(pjsip_tpfactory *factory); @@ -185,11 +157,10 @@ static void sockaddr_to_host_port( pj_pool_t *pool, pjsip_host_port *host_port, const pj_sockaddr_in *addr ) { - enum { M = 48 }; - host_port->host.ptr = (char*) pj_pool_alloc(pool, M); - host_port->host.slen = pj_ansi_snprintf( host_port->host.ptr, M, "%s", - pj_inet_ntoa(addr->sin_addr)); - host_port->port = pj_ntohs(addr->sin_port); + host_port->host.ptr = (char*) pj_pool_alloc(pool, PJ_INET6_ADDRSTRLEN+4); + pj_sockaddr_print(addr, host_port->host.ptr, PJ_INET6_ADDRSTRLEN+4, 2); + host_port->host.slen = pj_ansi_strlen(host_port->host.ptr); + host_port->port = pj_sockaddr_get_port(addr); } @@ -209,11 +180,12 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt, pjsip_tpfactory **p_factory) { pj_pool_t *pool; + pj_sock_t sock = PJ_INVALID_SOCKET; struct tcp_listener *listener; - pj_ioqueue_callback listener_cb; + pj_activesock_cfg asock_cfg; + pj_activesock_cb listener_cb; pj_sockaddr_in *listener_addr; int addr_len; - unsigned i; pj_status_t status; /* Sanity check */ @@ -244,7 +216,6 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt, listener->factory.type_name = "tcp"; listener->factory.flag = pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP); - listener->sock = PJ_INVALID_SOCKET; pj_ansi_strcpy(listener->factory.obj_name, "tcplis"); @@ -255,8 +226,7 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt, /* Create and bind socket */ - status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, - &listener->sock); + status = pj_sock_socket(pj_AF_INET(), pj_SOCK_STREAM(), 0, &sock); if (status != PJ_SUCCESS) goto on_error; @@ -267,14 +237,13 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt, pj_sockaddr_in_init(listener_addr, NULL, 0); } - status = pj_sock_bind(listener->sock, listener_addr, - sizeof(pj_sockaddr_in)); + status = pj_sock_bind(sock, listener_addr, sizeof(pj_sockaddr_in)); if (status != PJ_SUCCESS) goto on_error; /* Retrieve the bound address */ addr_len = sizeof(pj_sockaddr_in); - status = pj_sock_getsockname(listener->sock, listener_addr, &addr_len); + status = pj_sock_getsockname(sock, listener_addr, &addr_len); if (status != PJ_SUCCESS) goto on_error; @@ -320,19 +289,22 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt, /* Start listening to the address */ - status = pj_sock_listen(listener->sock, PJSIP_TCP_TRANSPORT_BACKLOG); + status = pj_sock_listen(sock, PJSIP_TCP_TRANSPORT_BACKLOG); if (status != PJ_SUCCESS) goto on_error; - /* Register socket to ioqeuue */ + /* Create active socket */ + if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT; + pj_activesock_cfg_default(&asock_cfg); + asock_cfg.async_cnt = async_cnt; + pj_bzero(&listener_cb, sizeof(listener_cb)); listener_cb.on_accept_complete = &on_accept_complete; - status = pj_ioqueue_register_sock(pool, pjsip_endpt_get_ioqueue(endpt), - listener->sock, listener, - &listener_cb, &listener->key); - if (status != PJ_SUCCESS) - goto on_error; + status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg, + pjsip_endpt_get_ioqueue(endpt), + &listener_cb, listener, + &listener->asock); /* Register to transport manager */ listener->endpt = endpt; @@ -347,34 +319,10 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt, goto on_error; } - /* Start pending accept() operations */ - if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT; - listener->async_cnt = async_cnt; - - for (i=0; iaccept_op[i] = PJ_POOL_ZALLOC_T(pool, - struct pending_accept); - pj_ioqueue_op_key_init(&listener->accept_op[i]->op_key, - sizeof(listener->accept_op[i]->op_key)); - listener->accept_op[i]->pool = pool; - listener->accept_op[i]->listener = listener; - listener->accept_op[i]->index = i; - - status = handle_accept(listener->key, &listener->accept_op[i]->op_key, - listener->sock, PJ_EPENDING); - if (status != PJ_SUCCESS) - goto on_error; - } + status = pj_activesock_start_accept(listener->asock, pool); + if (status != PJ_SUCCESS) + goto on_error; PJ_LOG(4,(listener->factory.obj_name, "SIP TCP listener ready for incoming connections at %.*s:%d", @@ -388,6 +336,8 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start2(pjsip_endpoint *endpt, return PJ_SUCCESS; on_error: + if (listener->asock==NULL && sock!=PJ_INVALID_SOCKET) + pj_sock_close(sock); lis_destroy(&listener->factory); return status; } @@ -410,22 +360,15 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt, static pj_status_t lis_destroy(pjsip_tpfactory *factory) { struct tcp_listener *listener = (struct tcp_listener *)factory; - unsigned i; if (listener->is_registered) { pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory); listener->is_registered = PJ_FALSE; } - if (listener->key) { - pj_ioqueue_unregister(listener->key); - listener->key = NULL; - listener->sock = PJ_INVALID_SOCKET; - } - - if (listener->sock != PJ_INVALID_SOCKET) { - pj_sock_close(listener->sock); - listener->sock = PJ_INVALID_SOCKET; + if (listener->asock) { + pj_activesock_close(listener->asock); + listener->asock = NULL; } if (listener->factory.lock) { @@ -433,14 +376,6 @@ static pj_status_t lis_destroy(pjsip_tpfactory *factory) listener->factory.lock = NULL; } - for (i=0; iaccept_op); ++i) { - if (listener->accept_op[i] && listener->accept_op[i]->pool) { - pj_pool_t *pool = listener->accept_op[i]->pool; - listener->accept_op[i]->pool = NULL; - pj_pool_release(pool); - } - } - if (listener->factory.pool) { pj_pool_t *pool = listener->factory.pool; @@ -480,19 +415,21 @@ static pj_status_t tcp_destroy_transport(pjsip_transport *transport); static pj_status_t tcp_destroy(pjsip_transport *transport, pj_status_t reason); -/* Callback from ioqueue on incoming packet */ -static void on_read_complete(pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_ssize_t bytes_read); +/* Callback on incoming data */ +static pj_bool_t on_data_read(pj_activesock_t *asock, + void *data, + pj_size_t size, + pj_status_t status, + pj_size_t *remainder); -/* Callback from ioqueue when packet is sent */ -static void on_write_complete(pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_ssize_t bytes_sent); +/* Callback when packet is sent */ +static pj_bool_t on_data_sent(pj_activesock_t *asock, + pj_ioqueue_op_key_t *send_key, + pj_ssize_t sent); -/* Callback from ioqueue when connect completes */ -static void on_connect_complete(pj_ioqueue_key_t *key, - pj_status_t status); +/* Callback when connect completes */ +static pj_bool_t on_connect_complete(pj_activesock_t *asock, + pj_status_t status); /* TCP keep-alive timer callback */ static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e); @@ -510,7 +447,8 @@ static pj_status_t tcp_create( struct tcp_listener *listener, { struct tcp_transport *tcp; pj_ioqueue_t *ioqueue; - pj_ioqueue_callback tcp_callback; + pj_activesock_cfg asock_cfg; + pj_activesock_cb tcp_callback; const pj_str_t ka_pkt = PJSIP_TCP_KEEP_ALIVE_DATA; pj_status_t status; @@ -528,8 +466,8 @@ static pj_status_t tcp_create( struct tcp_listener *listener, * Create and initialize basic transport structure. */ tcp = PJ_POOL_ZALLOC_T(pool, struct tcp_transport); - tcp->sock = sock; tcp->is_server = is_server; + tcp->sock = sock; /*tcp->listener = listener;*/ pj_list_init(&tcp->delayed_list); tcp->base.pool = pool; @@ -569,15 +507,18 @@ static pj_status_t tcp_create( struct tcp_listener *listener, tcp->base.destroy = &tcp_destroy_transport; - /* Register socket to ioqueue */ - pj_bzero(&tcp_callback, sizeof(pj_ioqueue_callback)); - tcp_callback.on_read_complete = &on_read_complete; - tcp_callback.on_write_complete = &on_write_complete; + /* Create active socket */ + pj_activesock_cfg_default(&asock_cfg); + asock_cfg.async_cnt = 1; + + pj_bzero(&tcp_callback, sizeof(tcp_callback)); + tcp_callback.on_data_read = &on_data_read; + tcp_callback.on_data_sent = &on_data_sent; tcp_callback.on_connect_complete = &on_connect_complete; ioqueue = pjsip_endpt_get_ioqueue(listener->endpt); - status = pj_ioqueue_register_sock(pool, ioqueue, sock, - tcp, &tcp_callback, &tcp->key); + status = pj_activesock_create(pool, sock, pj_SOCK_STREAM(), &asock_cfg, + ioqueue, &tcp_callback, tcp, &tcp->asock); if (status != PJ_SUCCESS) { goto on_error; } @@ -627,13 +568,12 @@ static void tcp_flush_pending_tx(struct tcp_transport *tcp) tdata = pending_tx->tdata_op_key->tdata; op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; - /* send to ioqueue! */ + /* send! */ size = tdata->buf.cur - tdata->buf.start; - status = pj_ioqueue_send(tcp->key, op_key, - tdata->buf.start, &size, 0); - + status = pj_activesock_send(tcp->asock, op_key, tdata->buf.start, + &size, 0); if (status != PJ_EPENDING) { - on_write_complete(tcp->key, op_key, size); + on_data_sent(tcp->asock, op_key, size); } } @@ -693,7 +633,7 @@ static pj_status_t tcp_destroy(pjsip_transport *transport, op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; - on_write_complete(tcp->key, op_key, -reason); + on_data_sent(tcp->asock, op_key, -reason); } if (tcp->rdata.tp_info.pool) { @@ -701,13 +641,11 @@ static pj_status_t tcp_destroy(pjsip_transport *transport, tcp->rdata.tp_info.pool = NULL; } - if (tcp->key) { - pj_ioqueue_unregister(tcp->key); - tcp->key = NULL; + if (tcp->asock) { + pj_activesock_close(tcp->asock); + tcp->asock = NULL; tcp->sock = PJ_INVALID_SOCKET; - } - - if (tcp->sock != PJ_INVALID_SOCKET) { + } else if (tcp->sock != PJ_INVALID_SOCKET) { pj_sock_close(tcp->sock); tcp->sock = PJ_INVALID_SOCKET; } @@ -759,6 +697,7 @@ static pj_status_t tcp_start_read(struct tcp_transport *tcp) pj_pool_t *pool; pj_ssize_t size; pj_sockaddr_in *rem_addr; + void *readbuf[1]; pj_status_t status; /* Init rdata */ @@ -787,11 +726,12 @@ static pj_status_t tcp_start_read(struct tcp_transport *tcp) tcp->rdata.pkt_info.src_port = pj_ntohs(rem_addr->sin_port); size = sizeof(tcp->rdata.pkt_info.packet); - status = pj_ioqueue_recv(tcp->key, &tcp->rdata.tp_info.op_key.op_key, - tcp->rdata.pkt_info.packet, &size, - PJ_IOQUEUE_ALWAYS_ASYNC); + readbuf[0] = tcp->rdata.pkt_info.packet; + status = pj_activesock_start_read2(tcp->asock, tcp->base.pool, size, + readbuf, 0); if (status != PJ_SUCCESS && status != PJ_EPENDING) { - PJ_LOG(4, (tcp->base.obj_name, "ioqueue recv() error, status=%d", + PJ_LOG(4, (tcp->base.obj_name, + "pj_activesock_start_read() error, status=%d", status)); return status; } @@ -861,7 +801,8 @@ static pj_status_t lis_create_transport(pjsip_tpfactory *factory, /* Start asynchronous connect() operation */ tcp->has_pending_connect = PJ_TRUE; - status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in)); + status = pj_activesock_start_connect(tcp->asock, tcp->base.pool, rem_addr, + sizeof(pj_sockaddr_in)); if (status == PJ_SUCCESS) { tcp->has_pending_connect = PJ_FALSE; } else if (status != PJ_EPENDING) { @@ -873,7 +814,7 @@ static pj_status_t lis_create_transport(pjsip_tpfactory *factory, * set is different now that asynchronous connect() is started. */ addr_len = sizeof(pj_sockaddr_in); - if (pj_sock_getsockname(tcp->sock, &local_addr, &addr_len)==PJ_SUCCESS) { + if (pj_sock_getsockname(sock, &local_addr, &addr_len)==PJ_SUCCESS) { pj_sockaddr_in *tp_addr = (pj_sockaddr_in*)&tcp->base.local_addr; /* Some systems (like old Win32 perhaps) may not set local address @@ -908,157 +849,72 @@ static pj_status_t lis_create_transport(pjsip_tpfactory *factory, /* - * This callback is called by ioqueue when pending accept() operation has - * completed. + * This callback is called by active socket when pending accept() operation + * has completed. */ -static void on_accept_complete( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_sock_t sock, - pj_status_t status) -{ - handle_accept(key, op_key, sock, status); -} - - -/* Handle accept() completion */ -static pj_status_t handle_accept(pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_sock_t sock, - pj_status_t status) +static pj_bool_t on_accept_complete(pj_activesock_t *asock, + pj_sock_t sock, + const pj_sockaddr_t *src_addr, + int src_addr_len) { struct tcp_listener *listener; struct tcp_transport *tcp; - struct pending_accept *accept_op; - int err_cnt = 0; - - listener = (struct tcp_listener*) pj_ioqueue_get_user_data(key); - accept_op = (struct pending_accept*) op_key; - - /* - * Loop while there is immediate connection or when there is error. - */ - do { - if (status == PJ_EPENDING) { - /* - * This can only happen when this function is called during - * initialization to kick off asynchronous accept(). - */ + char addr[PJ_INET6_ADDRSTRLEN+10]; + pj_status_t status; - } else if (status != PJ_SUCCESS) { + PJ_UNUSED_ARG(src_addr_len); - /* - * Error in accept(). - */ - tcp_perror(listener->factory.obj_name, "Error in accept()", - status); + listener = (struct tcp_listener*) pj_activesock_get_user_data(asock); - /* - * Prevent endless accept() error loop by limiting the - * number of consecutive errors. Once the number of errors - * is equal to maximum, we treat this as permanent error, and - * we stop the accept() operation. - */ - ++err_cnt; - if (err_cnt >= 20) { - PJ_LOG(1, (listener->factory.obj_name, - "Too many errors, LISTENER IS STOPPING!")); - return status; - } + PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_TRUE); + PJ_LOG(4,(listener->factory.obj_name, + "TCP listener %.*s:%d: got incoming TCP connection " + "from %s, sock=%d", + (int)listener->factory.addr_name.host.slen, + listener->factory.addr_name.host.ptr, + listener->factory.addr_name.port, + pj_sockaddr_print(src_addr, addr, sizeof(addr), 3), + sock)); + + /* + * Incoming connection! + * Create TCP transport for the new socket. + */ + status = tcp_create( listener, NULL, sock, PJ_TRUE, + (const pj_sockaddr_in*)&listener->factory.local_addr, + (const pj_sockaddr_in*)src_addr, &tcp); + if (status == PJ_SUCCESS) { + status = tcp_start_read(tcp); + if (status != PJ_SUCCESS) { + PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); + tcp_destroy(&tcp->base, status); } else { - pj_pool_t *pool; - struct pending_accept *new_op; - - if (sock == PJ_INVALID_SOCKET) { - sock = accept_op->new_sock; + /* Start keep-alive timer */ + if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) { + pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0}; + pjsip_endpt_schedule_timer(listener->endpt, + &tcp->ka_timer, + &delay); + tcp->ka_timer.id = PJ_TRUE; + pj_gettimeofday(&tcp->last_activity); } - - if (sock == PJ_INVALID_SOCKET) { - pj_assert(!"Should not happen. status should be error"); - goto next_accept; - } - - PJ_LOG(4,(listener->factory.obj_name, - "TCP listener %.*s:%d: got incoming TCP connection " - "from %s:%d, sock=%d", - (int)listener->factory.addr_name.host.slen, - listener->factory.addr_name.host.ptr, - listener->factory.addr_name.port, - pj_inet_ntoa(accept_op->remote_addr.sin_addr), - pj_ntohs(accept_op->remote_addr.sin_port), - sock)); - - /* Create new accept_opt */ - pool = pjsip_endpt_create_pool(listener->endpt, "tcps%p", - POOL_TP_INIT, POOL_TP_INC); - new_op = PJ_POOL_ZALLOC_T(pool, struct pending_accept); - new_op->pool = pool; - new_op->listener = listener; - new_op->index = accept_op->index; - pj_ioqueue_op_key_init(&new_op->op_key, sizeof(new_op->op_key)); - listener->accept_op[accept_op->index] = new_op; - - /* - * Incoming connections! - * Create TCP transport for the new socket. - */ - status = tcp_create( listener, accept_op->pool, sock, PJ_TRUE, - &accept_op->local_addr, - &accept_op->remote_addr, &tcp); - if (status == PJ_SUCCESS) { - status = tcp_start_read(tcp); - if (status != PJ_SUCCESS) { - PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); - tcp_destroy(&tcp->base, status); - } else { - /* Start keep-alive timer */ - if (PJSIP_TCP_KEEP_ALIVE_INTERVAL) { - pj_time_val delay = {PJSIP_TCP_KEEP_ALIVE_INTERVAL, 0}; - pjsip_endpt_schedule_timer(listener->endpt, - &tcp->ka_timer, - &delay); - tcp->ka_timer.id = PJ_TRUE; - pj_gettimeofday(&tcp->last_activity); - } - } - } - - accept_op = new_op; } + } -next_accept: - /* - * Start the next asynchronous accept() operation. - */ - accept_op->addr_len = sizeof(pj_sockaddr_in); - accept_op->new_sock = PJ_INVALID_SOCKET; - - status = pj_ioqueue_accept(listener->key, - &accept_op->op_key, - &accept_op->new_sock, - &accept_op->local_addr, - &accept_op->remote_addr, - &accept_op->addr_len); - - /* - * Loop while we have immediate connection or when there is error. - */ - - } while (status != PJ_EPENDING); - - return PJ_SUCCESS; + return PJ_TRUE; } /* * Callback from ioqueue when packet is sent. */ -static void on_write_complete(pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_ssize_t bytes_sent) +static pj_bool_t on_data_sent(pj_activesock_t *asock, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_sent) { struct tcp_transport *tcp = (struct tcp_transport*) - pj_ioqueue_get_user_data(key); + pj_activesock_get_user_data(asock); pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key; /* Note that op_key may be the op_key from keep-alive, thus @@ -1078,6 +934,7 @@ static void on_write_complete(pj_ioqueue_key_t *key, /* Mark last activity time */ pj_gettimeofday(&tcp->last_activity); + } /* Check for error/closure */ @@ -1091,8 +948,11 @@ static void on_write_complete(pj_ioqueue_key_t *key, -bytes_sent; if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; pjsip_transport_shutdown(&tcp->base); + + return PJ_FALSE; } + return PJ_TRUE; } @@ -1166,9 +1026,9 @@ static pj_status_t tcp_send_msg(pjsip_transport *transport, * sent asynchronously. */ size = tdata->buf.cur - tdata->buf.start; - status = pj_ioqueue_send(tcp->key, - (pj_ioqueue_op_key_t*)&tdata->op_key, - tdata->buf.start, &size, 0); + status = pj_activesock_send(tcp->asock, + (pj_ioqueue_op_key_t*)&tdata->op_key, + tdata->buf.start, &size, 0); if (status != PJ_EPENDING) { /* Not pending (could be immediate success or error) */ @@ -1212,158 +1072,97 @@ static pj_status_t tcp_shutdown(pjsip_transport *transport) /* * Callback from ioqueue that an incoming data is received from the socket. */ -static void on_read_complete(pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, - pj_ssize_t bytes_read) +static pj_bool_t on_data_read(pj_activesock_t *asock, + void *data, + pj_size_t size, + pj_status_t status, + pj_size_t *remainder) { enum { MAX_IMMEDIATE_PACKET = 10 }; - pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key; - pjsip_rx_data *rdata = rdata_op_key->rdata; - struct tcp_transport *tcp = - (struct tcp_transport*)rdata->tp_info.transport; - int i; - pj_status_t status; + struct tcp_transport *tcp; + pjsip_rx_data *rdata; + + PJ_UNUSED_ARG(data); + + tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock); + rdata = &tcp->rdata; /* Don't do anything if transport is closing. */ if (tcp->is_closing) { tcp->is_closing++; - return; + return PJ_FALSE; } - /* - * The idea of the loop is to process immediate data received by - * pj_ioqueue_recv(), as long as i < MAX_IMMEDIATE_PACKET. When - * i is >= MAX_IMMEDIATE_PACKET, we force the recv() operation to - * complete asynchronously, to allow other sockets to get their data. + /* Houston, we have packet! Report the packet to transport manager + * to be parsed. */ - for (i=0;; ++i) { - pj_uint32_t flags; - - /* Houston, we have packet! Report the packet to transport manager - * to be parsed. - */ - if (bytes_read > 0) { - pj_size_t size_eaten; - - /* Mark this as an activity */ - pj_gettimeofday(&tcp->last_activity); - - /* Init pkt_info part. */ - rdata->pkt_info.len += bytes_read; - rdata->pkt_info.zero = 0; - pj_gettimeofday(&rdata->pkt_info.timestamp); - - /* Report to transport manager. - * The transport manager will tell us how many bytes of the packet - * have been processed (as valid SIP message). - */ - size_eaten = - pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, - rdata); - - pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len); - - /* Move unprocessed data to the front of the buffer */ - if (size_eaten>0 && size_eaten<(pj_size_t)rdata->pkt_info.len) { - pj_memmove(rdata->pkt_info.packet, - rdata->pkt_info.packet + size_eaten, - rdata->pkt_info.len - size_eaten); - } - - rdata->pkt_info.len -= size_eaten; - - } else if (bytes_read == 0) { - - /* Transport is closed */ - PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed")); - - /* We can not destroy the transport since high level objects may - * still keep reference to this transport. So we can only - * instruct transport manager to gracefully start the shutdown - * procedure for this transport. - */ - if (tcp->close_reason==PJ_SUCCESS) - tcp->close_reason = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); - pjsip_transport_shutdown(&tcp->base); - - return; + if (status == PJ_SUCCESS) { + pj_size_t size_eaten; - //} else if (bytes_read < 0) { - } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && - -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && - -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) - { + /* Mark this as an activity */ + pj_gettimeofday(&tcp->last_activity); - /* Socket error. */ - PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset")); + pj_assert((void*)rdata->pkt_info.packet == data); - /* We can not destroy the transport since high level objects may - * still keep reference to this transport. So we can only - * instruct transport manager to gracefully start the shutdown - * procedure for this transport. - */ - if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = -bytes_read; - pjsip_transport_shutdown(&tcp->base); + /* Init pkt_info part. */ + rdata->pkt_info.len = size; + rdata->pkt_info.zero = 0; + pj_gettimeofday(&rdata->pkt_info.timestamp); - return; - } - - if (i >= MAX_IMMEDIATE_PACKET) { - /* Receive quota reached. Force ioqueue_recv() to - * return PJ_EPENDING - */ - flags = PJ_IOQUEUE_ALWAYS_ASYNC; - } else { - flags = 0; + /* Report to transport manager. + * The transport manager will tell us how many bytes of the packet + * have been processed (as valid SIP message). + */ + size_eaten = + pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, + rdata); + + pj_assert(size_eaten <= (pj_size_t)rdata->pkt_info.len); + + /* Move unprocessed data to the front of the buffer */ + *remainder = size - size_eaten; + if (*remainder > 0 && *remainder != size) { + pj_memmove(rdata->pkt_info.packet, + rdata->pkt_info.packet + size_eaten, + *remainder); } - /* Reset pool. */ - pj_pool_reset(rdata->tp_info.pool); - - /* Read next packet. */ - bytes_read = sizeof(rdata->pkt_info.packet) - rdata->pkt_info.len; - rdata->pkt_info.src_addr_len = sizeof(pj_sockaddr_in); - status = pj_ioqueue_recv(key, op_key, - rdata->pkt_info.packet+rdata->pkt_info.len, - &bytes_read, flags); - - if (status == PJ_SUCCESS) { + } else { - /* Continue loop. */ - pj_assert(i < MAX_IMMEDIATE_PACKET); + /* Transport is closed */ + PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed")); + + /* We can not destroy the transport since high level objects may + * still keep reference to this transport. So we can only + * instruct transport manager to gracefully start the shutdown + * procedure for this transport. + */ + if (tcp->close_reason==PJ_SUCCESS) + tcp->close_reason = status; + pjsip_transport_shutdown(&tcp->base); - } else if (status == PJ_EPENDING) { - break; + return PJ_FALSE; - } else { - /* Socket error */ - PJ_LOG(4,(tcp->base.obj_name, "TCP connection reset")); + } - /* We can not destroy the transport since high level objects may - * still keep reference to this transport. So we can only - * instruct transport manager to gracefully start the shutdown - * procedure for this transport. - */ - if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; - pjsip_transport_shutdown(&tcp->base); + /* Reset pool. */ + pj_pool_reset(rdata->tp_info.pool); - return; - } - } + return PJ_TRUE; } /* * Callback from ioqueue when asynchronous connect() operation completes. */ -static void on_connect_complete(pj_ioqueue_key_t *key, - pj_status_t status) +static pj_bool_t on_connect_complete(pj_activesock_t *asock, + pj_status_t status) { struct tcp_transport *tcp; pj_sockaddr_in addr; int addrlen; - tcp = (struct tcp_transport*) pj_ioqueue_get_user_data(key); + tcp = (struct tcp_transport*) pj_activesock_get_user_data(asock); /* Mark that pending connect() operation has completed. */ tcp->has_pending_connect = PJ_FALSE; @@ -1383,7 +1182,7 @@ static void on_connect_complete(pj_ioqueue_key_t *key, op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; - on_write_complete(tcp->key, op_key, -status); + on_data_sent(tcp->asock, op_key, -status); } /* We can not destroy the transport since high level objects may @@ -1393,7 +1192,7 @@ static void on_connect_complete(pj_ioqueue_key_t *key, */ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; pjsip_transport_shutdown(&tcp->base); - return; + return PJ_FALSE; } PJ_LOG(4,(tcp->base.obj_name, @@ -1432,7 +1231,7 @@ static void on_connect_complete(pj_ioqueue_key_t *key, */ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; pjsip_transport_shutdown(&tcp->base); - return; + return PJ_FALSE; } /* Flush all pending send operations */ @@ -1446,6 +1245,8 @@ static void on_connect_complete(pj_ioqueue_key_t *key, tcp->ka_timer.id = PJ_TRUE; pj_gettimeofday(&tcp->last_activity); } + + return PJ_TRUE; } /* Transport keep-alive timer callback */ @@ -1482,8 +1283,8 @@ static void tcp_keep_alive_timer(pj_timer_heap_t *th, pj_timer_entry *e) /* Send the data */ size = tcp->ka_pkt.slen; - status = pj_ioqueue_send(tcp->key, &tcp->ka_op_key.key, - tcp->ka_pkt.ptr, &size, 0); + status = pj_activesock_send(tcp->asock, &tcp->ka_op_key.key, + tcp->ka_pkt.ptr, &size, 0); if (status != PJ_SUCCESS && status != PJ_EPENDING) { tcp_perror(tcp->base.obj_name, diff --git a/pjsip/src/test-pjsip/transport_tcp_test.c b/pjsip/src/test-pjsip/transport_tcp_test.c index 3594aff0..2a091caa 100644 --- a/pjsip/src/test-pjsip/transport_tcp_test.c +++ b/pjsip/src/test-pjsip/transport_tcp_test.c @@ -83,7 +83,11 @@ int transport_tcp_test(void) /* Check again that reference counter is 1. */ if (pj_atomic_get(tcp->ref_cnt) != 1) - return -70; + return -40; + + /* Load test */ + if (transport_load_test(url) != 0) + return -60; /* Basic transport's send/receive loopback test. */ for (i=0; imsg_info.cseq->cseq != mod_load.next_seq) { + PJ_LOG(1,("THIS_FILE", " err: expecting cseq %u, got %u", + mod_load.next_seq, rdata->msg_info.cseq->cseq)); + mod_load.err = PJ_TRUE; + mod_load.next_seq = rdata->msg_info.cseq->cseq + 1; + } else + mod_load.next_seq++; + return PJ_TRUE; +} + +int transport_load_test(char *target_url) +{ + enum { COUNT = 2000 }; + unsigned i; + pj_status_t status; + + /* exhaust packets */ + do { + pj_time_val delay = {1, 0}; + i = 0; + pjsip_endpt_handle_events2(endpt, &delay, &i); + } while (i != 0); + + PJ_LOG(3,(THIS_FILE, " transport load test...")); + + if (mod_load.mod.id == -1) { + status = pjsip_endpt_register_module( endpt, &mod_load.mod); + if (status != PJ_SUCCESS) { + app_perror("error registering module", status); + return -1; + } + } + mod_load.err = PJ_FALSE; + mod_load.next_seq = 0; + + for (i=0; i"); + call_id = pj_str("thecallid"); + status = pjsip_endpt_create_request(endpt, &pjsip_invite_method, + &target, &from, + &target, &from, &call_id, + i, NULL, &tdata ); + if (status != PJ_SUCCESS) { + app_perror("error creating request", status); + goto on_return; + } + + status = pjsip_endpt_send_request_stateless(endpt, tdata, NULL, NULL); + if (status != PJ_SUCCESS) { + app_perror("error sending request", status); + goto on_return; + } + } + + do { + pj_time_val delay = {1, 0}; + i = 0; + pjsip_endpt_handle_events2(endpt, &delay, &i); + } while (i != 0); + + if (mod_load.next_seq != COUNT) { + PJ_LOG(1,("THIS_FILE", " err: expecting %u msg, got only %u", + COUNT, mod_load.next_seq)); + status = -2; + goto on_return; + } + +on_return: + if (mod_load.mod.id != -1) { + pjsip_endpt_unregister_module( endpt, &mod_load.mod); + mod_load.mod.id = -1; + } + if (status != PJ_SUCCESS || mod_load.err) { + return -2; + } + PJ_LOG(3,(THIS_FILE, " success")); + return 0; +} + + -- cgit v1.2.3