diff options
Diffstat (limited to 'pjsip/src/pjsip/sip_transport_tcp.c')
-rw-r--r-- | pjsip/src/pjsip/sip_transport_tcp.c | 674 |
1 files changed, 502 insertions, 172 deletions
diff --git a/pjsip/src/pjsip/sip_transport_tcp.c b/pjsip/src/pjsip/sip_transport_tcp.c index c168ccb0..42339c25 100644 --- a/pjsip/src/pjsip/sip_transport_tcp.c +++ b/pjsip/src/pjsip/sip_transport_tcp.c @@ -20,6 +20,7 @@ #include <pjsip/sip_endpoint.h> #include <pjsip/sip_errno.h> #include <pj/compat/socket.h> +#include <pj/addr_resolv.h> #include <pj/assert.h> #include <pj/ioqueue.h> #include <pj/lock.h> @@ -33,15 +34,21 @@ #define MAX_ASYNC_CNT 16 #define POOL_LIS_INIT 4000 -#define POOL_LIS_INC 4000 +#define POOL_LIS_INC 4001 #define POOL_TP_INIT 4000 -#define POOL_TP_INC 4000 +#define POOL_TP_INC 4002 struct tcp_listener; struct tcp_transport; +/* + * This structure is "descendant" of pj_ioqueue_op_key_t, and it is used to + * track pending/asynchronous accept() operation. TCP transport may have + * more than one pending accept() operations, depending on the value of + * async_cnt. + */ struct pending_accept { pj_ioqueue_op_key_t op_key; @@ -52,18 +59,16 @@ struct pending_accept pj_sockaddr_in remote_addr; }; -struct pending_connect -{ - pj_ioqueue_op_key_t op_key; - struct tcp_transport *transport; -}; - +/* + * This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the + * SIP transport factory). + */ struct tcp_listener { pjsip_tpfactory factory; - char name[PJ_MAX_OBJ_NAME]; - pj_bool_t active; + char obj_name[PJ_MAX_OBJ_NAME]; + pj_bool_t is_registered; pjsip_endpoint *endpt; pjsip_tpmgr *tpmgr; pj_sock_t sock; @@ -73,23 +78,35 @@ struct tcp_listener }; -struct pending_tdata +/* + * This structure is used to keep delayed transmit operation in a list. + * A delayed transmission occurs when application sends tx_data when + * the TCP connect/establishment is still in progress. These delayed + * transmission will be "flushed" once the socket is connected (either + * successfully or with errors). + */ +struct delayed_tdata { - PJ_DECL_LIST_MEMBER(struct pending_tdata); + PJ_DECL_LIST_MEMBER(struct delayed_tdata); pjsip_tx_data_op_key *tdata_op_key; }; +/* + * This structure describes the TCP transport, and it's descendant of + * pjsip_transport. + */ struct tcp_transport { pjsip_transport base; + pj_bool_t is_server; struct tcp_listener *listener; pj_bool_t is_registered; pj_bool_t is_closing; + pj_status_t close_reason; pj_sock_t sock; pj_ioqueue_key_t *key; pj_bool_t has_pending_connect; - struct pending_connect connect_op; /* TCP transport can only have one rdata! @@ -99,19 +116,24 @@ struct tcp_transport pjsip_rx_data rdata; /* Pending transmission list. */ - struct pending_tdata tx_list; + struct delayed_tdata delayed_list; }; -/* - * This callback is called when #pj_ioqueue_accept completes. +/**************************************************************************** + * PROTOTYPES */ + +/* This callback is called when pending accept() operation completes. */ static void on_accept_complete( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_sock_t sock, pj_status_t status); -static pj_status_t lis_destroy(struct tcp_listener *listener); +/* This callback is called by transport manager to destroy listener */ +static pj_status_t lis_destroy(pjsip_tpfactory *factory); + +/* This callback is called by transport manager to create transport */ static pj_status_t lis_create_transport(pjsip_tpfactory *factory, pjsip_tpmgr *mgr, pjsip_endpoint *endpt, @@ -119,12 +141,12 @@ static pj_status_t lis_create_transport(pjsip_tpfactory *factory, int addr_len, pjsip_transport **transport); - -static pj_status_t create_tcp_transport(struct tcp_listener *listener, - pj_sock_t sock, - const pj_sockaddr_in *local, - const pj_sockaddr_in *remote, - struct tcp_transport **p_tcp); +/* Common function to create and initialize transport */ +static pj_status_t tcp_create(struct tcp_listener *listener, + pj_sock_t sock, pj_bool_t is_server, + const pj_sockaddr_in *local, + const pj_sockaddr_in *remote, + struct tcp_transport **p_tcp); static void tcp_perror(const char *sender, const char *title, @@ -138,18 +160,41 @@ static void tcp_perror(const char *sender, const char *title, } +static void sockaddr_to_host_port( pj_pool_t *pool, + pjsip_host_port *host_port, + const pj_sockaddr_in *addr ) +{ + host_port->host.ptr = pj_pool_alloc(pool, 48); + host_port->host.slen = pj_ansi_sprintf( host_port->host.ptr, "%s", + pj_inet_ntoa(addr->sin_addr)); + host_port->port = pj_ntohs(addr->sin_port); +} + + + +/**************************************************************************** + * The TCP listener/transport factory. + */ + +/* + * This is the public API to create, initialize, register, and start the + * TCP listener. + */ PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt, const pj_sockaddr_in *local, - unsigned async_cnt) + unsigned async_cnt, + pjsip_tpfactory **p_factory) { pj_pool_t *pool; struct tcp_listener *listener; pj_ioqueue_callback listener_cb; + pj_sockaddr_in *listener_addr; + int addr_len; unsigned i; pj_status_t status; /* Sanity check */ - PJ_ASSERT_RETURN(endpt && local && async_cnt, PJ_EINVAL); + PJ_ASSERT_RETURN(endpt && async_cnt, PJ_EINVAL); pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_LIS_INIT, @@ -158,14 +203,15 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt, listener = pj_pool_zalloc(pool, sizeof(struct tcp_listener)); - pj_ansi_sprintf(listener->name, "tcp:%d", (int)pj_ntohs(local->sin_port)); listener->factory.pool = pool; listener->factory.type = PJSIP_TRANSPORT_TCP; - pj_ansi_strcpy(listener->factory.type_name, "tcp"); + listener->factory.type_name = "tcp"; listener->factory.flag = pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP); listener->sock = PJ_INVALID_SOCKET; + pj_ansi_strcpy(listener->obj_name, "tcp"); + status = pj_lock_create_recursive_mutex(pool, "tcplis", &listener->factory.lock); if (status != PJ_SUCCESS) @@ -177,11 +223,52 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt, if (status != PJ_SUCCESS) goto on_error; - pj_memcpy(&listener->factory.local_addr, local, sizeof(pj_sockaddr_in)); - status = pj_sock_bind(listener->sock, local, sizeof(*local)); + listener_addr = (pj_sockaddr_in*)&listener->factory.local_addr; + if (local) { + pj_memcpy(listener_addr, local, sizeof(pj_sockaddr_in)); + } else { + pj_sockaddr_in_init(listener_addr, NULL, 0); + } + + status = pj_sock_bind(listener->sock, listener_addr, + sizeof(pj_sockaddr_in)); + if (status != PJ_SUCCESS) + goto on_error; + + /* Retrieve the bound address */ + addr_len = sizeof(pj_sockaddr_in); + status = pj_sock_getsockname(listener->sock, listener_addr, &addr_len); if (status != PJ_SUCCESS) goto on_error; + /* If the address returns 0.0.0.0, use the first interface address + * as the transport's address. + */ + if (listener_addr->sin_addr.s_addr == 0) { + const pj_str_t *hostname; + struct pj_hostent he; + + hostname = pj_gethostname(); + status = pj_gethostbyname(hostname, &he); + if (status != PJ_SUCCESS) + goto on_error; + + listener_addr->sin_addr = *(pj_in_addr*)he.h_addr; + } + + pj_ansi_sprintf(listener->obj_name, "tcp:%d", + (int)pj_ntohs(listener_addr->sin_port)); + + /* Save the address name */ + sockaddr_to_host_port(listener->factory.pool, + &listener->factory.addr_name, listener_addr); + + /* Start listening to the address */ + status = pj_sock_listen(listener->sock, PJSIP_TCP_TRANSPORT_BACKLOG); + if (status != PJ_SUCCESS) + goto on_error; + + /* Register socket to ioqeuue */ pj_memset(&listener_cb, 0, sizeof(listener_cb)); listener_cb.on_accept_complete = &on_accept_complete; @@ -191,7 +278,21 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt, if (status != PJ_SUCCESS) goto on_error; - /* Start pending accept() operation */ + /* Register to transport manager */ + listener->endpt = endpt; + listener->tpmgr = pjsip_endpt_get_tpmgr(endpt); + listener->factory.create_transport = lis_create_transport; + listener->factory.destroy = lis_destroy; + listener->is_registered = PJ_TRUE; + status = pjsip_tpmgr_register_tpfactory(listener->tpmgr, + &listener->factory); + if (status != PJ_SUCCESS) { + listener->is_registered = PJ_FALSE; + goto on_error; + } + + + /* Start pending accept() operations */ if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT; listener->async_cnt = async_cnt; @@ -200,45 +301,34 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt, sizeof(listener->accept_op[i].op_key)); listener->accept_op[i].listener = listener; - status = pj_ioqueue_accept(listener->key, - &listener->accept_op[i].op_key, - &listener->accept_op[i].new_sock, - &listener->accept_op[i].local_addr, - &listener->accept_op[i].remote_addr, - &listener->accept_op[i].addr_len); - if (status != PJ_SUCCESS && status != PJ_EPENDING) - goto on_error; + on_accept_complete(listener->key, &listener->accept_op[i].op_key, + listener->sock, PJ_EPENDING); } - /* Register to transport manager */ - listener->endpt = endpt; - listener->tpmgr = pjsip_endpt_get_tpmgr(endpt); - listener->factory.create_transport = lis_create_transport; - status = pjsip_tpmgr_register_tpfactory(listener->tpmgr, - &listener->factory); - if (status != PJ_SUCCESS) - goto on_error; - - /* Done! */ - listener->active = PJ_TRUE; + PJ_LOG(4,(listener->obj_name, + "SIP TCP listener ready for incoming connections at %s:%d", + pj_inet_ntoa(listener_addr->sin_addr), + (int)pj_ntohs(listener_addr->sin_port))); - PJ_LOG(4,(listener->name, - "SIP TCP transport listening for incoming connections at %s:%d", - pj_inet_ntoa(local->sin_addr), (int)pj_ntohs(local->sin_port))); + /* Return the pointer to user */ + if (p_factory) *p_factory = &listener->factory; return PJ_SUCCESS; on_error: - lis_destroy(listener); + lis_destroy(&listener->factory); return status; } -static pj_status_t lis_destroy(struct tcp_listener *listener) +/* This callback is called by transport manager to destroy listener */ +static pj_status_t lis_destroy(pjsip_tpfactory *factory) { - if (listener->active) { + struct tcp_listener *listener = (struct tcp_listener *)factory; + + if (listener->is_registered) { pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory); - listener->active = PJ_FALSE; + listener->is_registered = PJ_FALSE; } if (listener->key) { @@ -258,9 +348,12 @@ static pj_status_t lis_destroy(struct tcp_listener *listener) } if (listener->factory.pool) { - PJ_LOG(4,(listener->name, "SIP TCP transport destroyed")); - pj_pool_release(listener->factory.pool); + pj_pool_t *pool = listener->factory.pool; + + PJ_LOG(4,(listener->obj_name, "SIP TCP listener destroyed")); + listener->factory.pool = NULL; + pj_pool_release(pool); } return PJ_SUCCESS; @@ -288,8 +381,12 @@ static pj_status_t tcp_send_msg(pjsip_transport *transport, /* Called by transport manager to shutdown */ static pj_status_t tcp_shutdown(pjsip_transport *transport); -/* Called by transport manager to destroy */ -static pj_status_t tcp_destroy(pjsip_transport *transport); +/* Called by transport manager to destroy transport */ +static pj_status_t tcp_destroy_transport(pjsip_transport *transport); + +/* Utility to destroy transport */ +static pj_status_t tcp_destroy(pjsip_transport *transport, + pj_status_t reason); /* Callback from ioqueue on incoming packet */ static void on_read_complete(pj_ioqueue_key_t *key, @@ -306,25 +403,15 @@ static void on_connect_complete(pj_ioqueue_key_t *key, pj_status_t status); -static void sockaddr_to_host_port( pj_pool_t *pool, - pjsip_host_port *host_port, - const pj_sockaddr_in *addr ) -{ - host_port->host.ptr = pj_pool_alloc(pool, 48); - host_port->host.slen = pj_ansi_sprintf( host_port->host.ptr, "%s", - pj_inet_ntoa(addr->sin_addr)); - host_port->port = pj_ntohs(addr->sin_port); -} - - /* - * Utilities to create TCP transport. + * Common function to create TCP transport, called when pending accept() and + * pending connect() complete. */ -static pj_status_t create_tcp_transport(struct tcp_listener *listener, - pj_sock_t sock, - const pj_sockaddr_in *local, - const pj_sockaddr_in *remote, - struct tcp_transport **p_tcp) +static pj_status_t tcp_create( struct tcp_listener *listener, + pj_sock_t sock, pj_bool_t is_server, + const pj_sockaddr_in *local, + const pj_sockaddr_in *remote, + struct tcp_transport **p_tcp) { struct tcp_transport *tcp; pj_pool_t *pool; @@ -332,28 +419,37 @@ static pj_status_t create_tcp_transport(struct tcp_listener *listener, pj_ioqueue_callback tcp_callback; pj_status_t status; - pool = pjsip_endpt_create_pool(listener->endpt, "tcp", + + PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL); + + + pool = pjsip_endpt_create_pool(listener->endpt, "tcp", POOL_TP_INIT, POOL_TP_INC); + PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM); + /* * Create and initialize basic transport structure. */ tcp = pj_pool_zalloc(pool, sizeof(*tcp)); tcp->sock = sock; + tcp->is_server = is_server; tcp->listener = listener; - pj_list_init(&tcp->tx_list); - - - pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME, "tcp%p", tcp); + pj_list_init(&tcp->delayed_list); tcp->base.pool = pool; + pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME, + (is_server ? "tcps%p" :"tcpc%p"), tcp); + status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { goto on_error; + } status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { goto on_error; + } tcp->base.key.type = PJSIP_TRANSPORT_TCP; pj_memcpy(&tcp->base.key.rem_addr, remote, sizeof(pj_sockaddr_in)); @@ -374,7 +470,7 @@ static pj_status_t create_tcp_transport(struct tcp_listener *listener, tcp->base.tpmgr = listener->tpmgr; tcp->base.send_msg = &tcp_send_msg; tcp->base.do_shutdown = &tcp_shutdown; - tcp->base.destroy = &tcp_destroy; + tcp->base.destroy = &tcp_destroy_transport; /* Register socket to ioqueue */ @@ -386,37 +482,44 @@ static pj_status_t create_tcp_transport(struct tcp_listener *listener, ioqueue = pjsip_endpt_get_ioqueue(listener->endpt); status = pj_ioqueue_register_sock(pool, ioqueue, sock, tcp, &tcp_callback, &tcp->key); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { goto on_error; + } /* Register transport to transport manager */ status = pjsip_transport_register(listener->tpmgr, &tcp->base); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { goto on_error; + } tcp->is_registered = PJ_TRUE; /* Done setting up basic transport. */ *p_tcp = tcp; + PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created", + (tcp->is_server ? "server" : "client"))); + + return PJ_SUCCESS; + on_error: - tcp_destroy(&tcp->base); + tcp_destroy(&tcp->base, status); return status; } -/* Flush all pending send operations */ -static tcp_flush_pending_tx(struct tcp_transport *tcp) +/* Flush all delayed transmision once the socket is connected. */ +static void tcp_flush_pending_tx(struct tcp_transport *tcp) { pj_lock_acquire(tcp->base.lock); - while (!pj_list_empty(&tcp->tx_list)) { - struct pending_tdata *pending_tx; + while (!pj_list_empty(&tcp->delayed_list)) { + struct delayed_tdata *pending_tx; pjsip_tx_data *tdata; pj_ioqueue_op_key_t *op_key; pj_ssize_t size; pj_status_t status; - pending_tx = tcp->tx_list.next; + pending_tx = tcp->delayed_list.next; pj_list_erase(pending_tx); tdata = pending_tx->tdata_op_key->tdata; @@ -436,29 +539,53 @@ static tcp_flush_pending_tx(struct tcp_transport *tcp) } +/* Called by transport manager to destroy transport */ +static pj_status_t tcp_destroy_transport(pjsip_transport *transport) +{ + struct tcp_transport *tcp = (struct tcp_transport*)transport; + + /* Transport would have been unregistered by now since this callback + * is called by transport manager. + */ + tcp->is_registered = PJ_FALSE; + + return tcp_destroy(transport, tcp->close_reason); +} + /* Destroy TCP transport */ -static pj_status_t tcp_destroy(pjsip_transport *transport) +static pj_status_t tcp_destroy(pjsip_transport *transport, + pj_status_t reason) { struct tcp_transport *tcp = (struct tcp_transport*)transport; - /* Cancel all pending transmits */ - while (!pj_list_empty(&tcp->tx_list)) { - struct pending_tdata *pending_tx; + if (tcp->close_reason == 0) + tcp->close_reason = reason; + + if (tcp->is_registered) { + tcp->is_registered = PJ_FALSE; + pjsip_transport_destroy(transport); + + /* pjsip_transport_destroy will recursively call this function + * again. + */ + return PJ_SUCCESS; + } + + /* Mark transport as closing */ + tcp->is_closing = PJ_TRUE; + + /* Cancel all delayed transmits */ + while (!pj_list_empty(&tcp->delayed_list)) { + struct delayed_tdata *pending_tx; pj_ioqueue_op_key_t *op_key; - pending_tx = tcp->tx_list.next; + pending_tx = tcp->delayed_list.next; pj_list_erase(pending_tx); op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key; - on_write_complete(tcp->key, op_key, - -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN)); - } - - if (tcp->is_registered) { - pjsip_transport_destroy(transport); - tcp->is_registered = PJ_FALSE; + on_write_complete(tcp->key, op_key, -reason); } if (tcp->rdata.tp_info.pool) { @@ -469,6 +596,12 @@ static pj_status_t tcp_destroy(pjsip_transport *transport) if (tcp->key) { pj_ioqueue_unregister(tcp->key); tcp->key = NULL; + tcp->sock = PJ_INVALID_SOCKET; + } + + if (tcp->sock != PJ_INVALID_SOCKET) { + pj_sock_close(tcp->sock); + tcp->sock = PJ_INVALID_SOCKET; } if (tcp->base.lock) { @@ -482,9 +615,26 @@ static pj_status_t tcp_destroy(pjsip_transport *transport) } if (tcp->base.pool) { - PJ_LOG(4,(tcp->base.obj_name, "TCP transport destroyed")); - pj_pool_release(tcp->base.pool); + pj_pool_t *pool; + + if (reason != PJ_SUCCESS) { + char errmsg[PJ_ERR_MSG_SIZE]; + + pj_strerror(reason, errmsg, sizeof(errmsg)); + PJ_LOG(4,(tcp->base.obj_name, + "TCP transport destroyed with reason %d: %s", + reason, errmsg)); + + } else { + + PJ_LOG(4,(tcp->base.obj_name, + "TCP transport destroyed normally")); + + } + + pool = tcp->base.pool; tcp->base.pool = NULL; + pj_pool_release(pool); } return PJ_SUCCESS; @@ -493,7 +643,8 @@ static pj_status_t tcp_destroy(pjsip_transport *transport) /* * This utility function creates receive data buffers and start - * asynchronous recv() operations from the socket. + * asynchronous recv() operations from the socket. It is called after + * accept() or connect() operation complete. */ static pj_status_t tcp_start_read(struct tcp_transport *tcp) { @@ -531,7 +682,7 @@ static pj_status_t tcp_start_read(struct tcp_transport *tcp) status = pj_ioqueue_recv(tcp->key, &tcp->rdata.tp_info.op_key.op_key, tcp->rdata.pkt_info.packet, &size, PJ_IOQUEUE_ALWAYS_ASYNC); - if (status != PJ_SUCCESS) { + if (status != PJ_SUCCESS && status != PJ_EPENDING) { tcp_perror(tcp->base.obj_name, "ioqueue recv() error", status); return status; } @@ -593,19 +744,19 @@ static pj_status_t lis_create_transport(pjsip_tpfactory *factory, ((pj_sockaddr_in*)&listener->factory.local_addr)->sin_addr.s_addr; /* Create the transport descriptor */ - status = create_tcp_transport(listener, sock, &local_addr, - (pj_sockaddr_in*)rem_addr, &tcp); + status = tcp_create(listener, sock, PJ_FALSE, &local_addr, + (pj_sockaddr_in*)rem_addr, &tcp); if (status != PJ_SUCCESS) return status; - + + /* Start asynchronous connect() operation */ tcp->has_pending_connect = PJ_TRUE; - pj_ioqueue_op_key_init(&tcp->connect_op.op_key, - sizeof(tcp->connect_op.op_key)); - tcp->connect_op.transport = tcp; status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in)); - if (status != PJ_SUCCESS) { - tcp_destroy(&tcp->base); + if (status == PJ_SUCCESS) { + tcp->has_pending_connect = PJ_FALSE; + } else if (status != PJ_EPENDING) { + tcp_destroy(&tcp->base, status); return status; } @@ -629,6 +780,17 @@ static pj_status_t lis_create_transport(pjsip_tpfactory *factory, } } + if (tcp->has_pending_connect) { + PJ_LOG(4,(tcp->base.obj_name, + "TCP transport %.*s:%d is connecting to %.*s:%d...", + (int)tcp->base.local_name.host.slen, + tcp->base.local_name.host.ptr, + tcp->base.local_name.port, + (int)tcp->base.remote_name.host.slen, + tcp->base.remote_name.host.ptr, + tcp->base.remote_name.port)); + } + /* Done */ *p_transport = &tcp->base; @@ -653,31 +815,75 @@ static void on_accept_complete( pj_ioqueue_key_t *key, listener = pj_ioqueue_get_user_data(key); accept_op = (struct pending_accept*) op_key; + /* + * Loop while there is immediate connection or when there is error. + */ do { - if (status != PJ_SUCCESS) { - tcp_perror(listener->name, "Error in accept()", status); - + if (status == PJ_EPENDING) { + /* + * This can only happen when this function is called during + * initialization to kick off asynchronous accept(). + */ + + } else if (status != PJ_SUCCESS) { + + /* + * Error in accept(). + */ + tcp_perror(listener->obj_name, "Error in accept()", status); + + /* + * Prevent endless accept() error loop by limiting the + * number of consecutive errors. Once the number of errors + * is equal to maximum, we treat this as permanent error, and + * we stop the accept() operation. + */ ++err_cnt; - if (err_cnt >= 5) { - PJ_LOG(1, (listener->name, + if (err_cnt >= 10) { + PJ_LOG(1, (listener->obj_name, "Too many errors, listener stopping")); } - goto start_next_accept; - } + } else { - status = create_tcp_transport( listener, sock, - &accept_op->local_addr, - &accept_op->remote_addr, &tcp); - if (status == PJ_SUCCESS) { - status = tcp_start_read(tcp); - if (status != PJ_SUCCESS) { - PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); - tcp_destroy(&tcp->base); + if (sock == PJ_INVALID_SOCKET) { + sock = accept_op->new_sock; + PJ_LOG(4,(listener->obj_name, + "Warning: ioqueue reports -1 in on_accept_complete()" + " sock argument")); + } + + PJ_LOG(4,(listener->obj_name, + "TCP listener %.*s:%d: got incoming TCP connection " + "from %s:%d, sock=%d", + (int)listener->factory.addr_name.host.slen, + listener->factory.addr_name.host.ptr, + listener->factory.addr_name.port, + pj_inet_ntoa(accept_op->remote_addr.sin_addr), + pj_ntohs(accept_op->remote_addr.sin_port), + sock)); + + /* + * Incoming connections! + * Create TCP transport for the new socket. + */ + status = tcp_create( listener, sock, PJ_TRUE, + &accept_op->local_addr, + &accept_op->remote_addr, &tcp); + if (status == PJ_SUCCESS) { + status = tcp_start_read(tcp); + if (status != PJ_SUCCESS) { + PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled")); + tcp_destroy(&tcp->base, status); + } } } -start_next_accept: + /* + * Start the next asynchronous accept() operation. + */ + accept_op->addr_len = sizeof(pj_sockaddr_in); + accept_op->new_sock = PJ_INVALID_SOCKET; status = pj_ioqueue_accept(listener->key, &accept_op->op_key, @@ -686,27 +892,51 @@ start_next_accept: &accept_op->remote_addr, &accept_op->addr_len); + /* + * Loop while we have immediate connection or when there is error. + */ + } while (status != PJ_EPENDING); } -/* Callback from ioqueue when packet is sent */ +/* + * Callback from ioqueue when packet is sent. + */ static void on_write_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_sent) { - struct tcp_transport *tp = pj_ioqueue_get_user_data(key); + struct tcp_transport *tcp = pj_ioqueue_get_user_data(key); pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key; tdata_op_key->tdata = NULL; + /* Check for error/closure */ + if (bytes_sent <= 0) { + pj_status_t status; + + PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", + bytes_sent)); + + status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) : + -bytes_sent; + if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; + pjsip_transport_shutdown(&tcp->base); + } + if (tdata_op_key->callback) { - tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent); + /* + * Notify sip_transport.c that packet has been sent. + */ + tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent); } } -/* This callback is called by transport manager to send SIP message */ +/* + * This callback is called by transport manager to send SIP message + */ static pj_status_t tcp_send_msg(pjsip_transport *transport, pjsip_tx_data *tdata, const pj_sockaddr_t *rem_addr, @@ -718,7 +948,8 @@ static pj_status_t tcp_send_msg(pjsip_transport *transport, { struct tcp_transport *tcp = (struct tcp_transport*)transport; pj_ssize_t size; - pj_status_t status; + pj_bool_t delayed = PJ_FALSE; + pj_status_t status = PJ_SUCCESS; /* Sanity check */ PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL); @@ -737,38 +968,74 @@ static pj_status_t tcp_send_msg(pjsip_transport *transport, tdata->op_key.callback = callback; /* If asynchronous connect() has not completed yet, just put the - * transmit data in the pending transmission list. + * transmit data in the pending transmission list since we can not + * use the socket yet. */ - pj_lock_acquire(tcp->base.lock); - if (tcp->has_pending_connect) { - struct pending_tdata *pending_tdata; - /* Pust to list */ - pending_tdata = pj_pool_alloc(tdata->pool, sizeof(*pending_tdata)); - pending_tdata->tdata_op_key = &tdata->op_key; + /* + * Looks like connect() is still in progress. Check again (this time + * with holding the lock) to be sure. + */ + pj_lock_acquire(tcp->base.lock); - pj_list_push_back(&tcp->tx_list, pending_tdata); - status = PJ_EPENDING; + if (tcp->has_pending_connect) { + struct delayed_tdata *delayed_tdata; - } else { - /* send to ioqueue! */ + /* + * connect() is still in progress. Put the transmit data to + * the delayed list. + */ + delayed_tdata = pj_pool_alloc(tdata->pool, + sizeof(*delayed_tdata)); + delayed_tdata->tdata_op_key = &tdata->op_key; + + pj_list_push_back(&tcp->delayed_list, delayed_tdata); + status = PJ_EPENDING; + + /* Prevent pj_ioqueue_send() to be called below */ + delayed = PJ_TRUE; + } + + pj_lock_release(tcp->base.lock); + } + + if (!delayed) { + /* + * Transport is ready to go. Send the packet to ioqueue to be + * sent asynchronously. + */ size = tdata->buf.cur - tdata->buf.start; status = pj_ioqueue_send(tcp->key, (pj_ioqueue_op_key_t*)&tdata->op_key, tdata->buf.start, &size, 0); - if (status != PJ_EPENDING) + if (status != PJ_EPENDING) { + /* Not pending (could be immediate success or error) */ tdata->op_key.tdata = NULL; - } - pj_lock_release(tcp->base.lock); + /* Shutdown transport on closure/errors */ + if (size <= 0) { + + PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d", + size)); + + if (status == PJ_SUCCESS) + status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); + if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; + pjsip_transport_shutdown(&tcp->base); + } + } + } return status; } -/* This callback is called by transport manager to shutdown transport */ +/* + * This callback is called by transport manager to shutdown transport. + * This normally is only used by UDP transport. + */ static pj_status_t tcp_shutdown(pjsip_transport *transport) { @@ -779,7 +1046,9 @@ static pj_status_t tcp_shutdown(pjsip_transport *transport) } -/* Callback from ioqueue on incoming packet */ +/* + * Callback from ioqueue that an incoming data is received from the socket. + */ static void on_read_complete(pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read) @@ -787,13 +1056,14 @@ static void on_read_complete(pj_ioqueue_key_t *key, enum { MAX_IMMEDIATE_PACKET = 10 }; pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key; pjsip_rx_data *rdata = rdata_op_key->rdata; - struct tcp_transport *tp = (struct tcp_transport*)rdata->tp_info.transport; + struct tcp_transport *tcp = + (struct tcp_transport*)rdata->tp_info.transport; int i; pj_status_t status; /* Don't do anything if transport is closing. */ - if (tp->is_closing) { - tp->is_closing++; + if (tcp->is_closing) { + tcp->is_closing++; return; } @@ -806,7 +1076,9 @@ static void on_read_complete(pj_ioqueue_key_t *key, for (i=0;; ++i) { pj_uint32_t flags; - /* Report the packet to transport manager. */ + /* Houston, we have packet! Report the packet to transport manager + * to be parsed. + */ if (bytes_read > 0) { pj_size_t size_eaten; @@ -815,6 +1087,10 @@ static void on_read_complete(pj_ioqueue_key_t *key, rdata->pkt_info.zero = 0; pj_gettimeofday(&rdata->pkt_info.timestamp); + /* Report to transport manager. + * The transport manager will tell us how many bytes of the packet + * have been processed (as valid SIP message). + */ size_eaten = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata); @@ -833,24 +1109,45 @@ static void on_read_complete(pj_ioqueue_key_t *key, } else if (bytes_read == 0) { /* Transport is closed */ - PJ_LOG(4,(tp->base.obj_name, "tcp connection closed")); - tcp_destroy(&tp->base); + PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed")); + + /* We can not destroy the transport since high level objects may + * still keep reference to this transport. So we can only + * instruct transport manager to gracefully start the shutdown + * procedure for this transport. + */ + if (tcp->close_reason==PJ_SUCCESS) + tcp->close_reason = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN); + pjsip_transport_shutdown(&tcp->base); + return; - } else if (bytes_read < 0) { + //} else if (bytes_read < 0) { + } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && + -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && + -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) + { /* Report error to endpoint. */ PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, rdata->tp_info.transport->obj_name, - -bytes_read, "tcp recv() error")); + -bytes_read, "TCP recv() error")); + + /* We can not destroy the transport since high level objects may + * still keep reference to this transport. So we can only + * instruct transport manager to gracefully start the shutdown + * procedure for this transport. + */ + if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = -bytes_read; + pjsip_transport_shutdown(&tcp->base); - /* Transport error, close transport */ - tcp_destroy(&tp->base); return; } if (i >= MAX_IMMEDIATE_PACKET) { - /* Force ioqueue_recv() to return PJ_EPENDING */ + /* Receive quota reached. Force ioqueue_recv() to + * return PJ_EPENDING + */ flags = PJ_IOQUEUE_ALWAYS_ASYNC; } else { flags = 0; @@ -867,6 +1164,7 @@ static void on_read_complete(pj_ioqueue_key_t *key, &bytes_read, flags); if (status == PJ_SUCCESS) { + /* Continue loop. */ pj_assert(i < MAX_IMMEDIATE_PACKET); @@ -879,30 +1177,56 @@ static void on_read_complete(pj_ioqueue_key_t *key, rdata->tp_info.transport->obj_name, status, "tcp recv() error")); - /* Transport error, close transport */ - tcp_destroy(&tp->base); + /* We can not destroy the transport since high level objects may + * still keep reference to this transport. So we can only + * instruct transport manager to gracefully start the shutdown + * procedure for this transport. + */ + if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; + pjsip_transport_shutdown(&tcp->base); + return; } } } -/* Callback from ioqueue when connect completes */ +/* + * Callback from ioqueue when asynchronous connect() operation completes. + */ static void on_connect_complete(pj_ioqueue_key_t *key, pj_status_t status) { - struct pending_connect *connect_op = (struct pending_connect *)key; - struct tcp_transport *tcp = connect_op->transport; + struct tcp_transport *tcp; pj_sockaddr_in addr; int addrlen; + tcp = pj_ioqueue_get_user_data(key); + + PJ_LOG(4,(tcp->base.obj_name, + "TCP transport %.*s:%d is connected to %.*s:%d", + (int)tcp->base.local_name.host.slen, + tcp->base.local_name.host.ptr, + tcp->base.local_name.port, + (int)tcp->base.remote_name.host.slen, + tcp->base.remote_name.host.ptr, + tcp->base.remote_name.port)); + /* Mark that pending connect() operation has completed. */ tcp->has_pending_connect = PJ_FALSE; /* Check connect() status */ if (status != PJ_SUCCESS) { + tcp_perror(tcp->base.obj_name, "TCP connect() error", status); - tcp_destroy(&tcp->base); + + /* We can not destroy the transport since high level objects may + * still keep reference to this transport. So we can only + * instruct transport manager to gracefully start the shutdown + * procedure for this transport. + */ + if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; + pjsip_transport_shutdown(&tcp->base); return; } @@ -925,7 +1249,13 @@ static void on_connect_complete(pj_ioqueue_key_t *key, /* Start pending read */ status = tcp_start_read(tcp); if (status != PJ_SUCCESS) { - tcp_destroy(&tcp->base); + /* We can not destroy the transport since high level objects may + * still keep reference to this transport. So we can only + * instruct transport manager to gracefully start the shutdown + * procedure for this transport. + */ + if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status; + pjsip_transport_shutdown(&tcp->base); return; } |