summaryrefslogtreecommitdiff
path: root/pjsip/src/pjsip/sip_transport_tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjsip/src/pjsip/sip_transport_tcp.c')
-rw-r--r--pjsip/src/pjsip/sip_transport_tcp.c674
1 files changed, 502 insertions, 172 deletions
diff --git a/pjsip/src/pjsip/sip_transport_tcp.c b/pjsip/src/pjsip/sip_transport_tcp.c
index c168ccb0..42339c25 100644
--- a/pjsip/src/pjsip/sip_transport_tcp.c
+++ b/pjsip/src/pjsip/sip_transport_tcp.c
@@ -20,6 +20,7 @@
#include <pjsip/sip_endpoint.h>
#include <pjsip/sip_errno.h>
#include <pj/compat/socket.h>
+#include <pj/addr_resolv.h>
#include <pj/assert.h>
#include <pj/ioqueue.h>
#include <pj/lock.h>
@@ -33,15 +34,21 @@
#define MAX_ASYNC_CNT 16
#define POOL_LIS_INIT 4000
-#define POOL_LIS_INC 4000
+#define POOL_LIS_INC 4001
#define POOL_TP_INIT 4000
-#define POOL_TP_INC 4000
+#define POOL_TP_INC 4002
struct tcp_listener;
struct tcp_transport;
+/*
+ * This structure is "descendant" of pj_ioqueue_op_key_t, and it is used to
+ * track pending/asynchronous accept() operation. TCP transport may have
+ * more than one pending accept() operations, depending on the value of
+ * async_cnt.
+ */
struct pending_accept
{
pj_ioqueue_op_key_t op_key;
@@ -52,18 +59,16 @@ struct pending_accept
pj_sockaddr_in remote_addr;
};
-struct pending_connect
-{
- pj_ioqueue_op_key_t op_key;
- struct tcp_transport *transport;
-};
-
+/*
+ * This is the TCP listener, which is a "descendant" of pjsip_tpfactory (the
+ * SIP transport factory).
+ */
struct tcp_listener
{
pjsip_tpfactory factory;
- char name[PJ_MAX_OBJ_NAME];
- pj_bool_t active;
+ char obj_name[PJ_MAX_OBJ_NAME];
+ pj_bool_t is_registered;
pjsip_endpoint *endpt;
pjsip_tpmgr *tpmgr;
pj_sock_t sock;
@@ -73,23 +78,35 @@ struct tcp_listener
};
-struct pending_tdata
+/*
+ * This structure is used to keep delayed transmit operation in a list.
+ * A delayed transmission occurs when application sends tx_data when
+ * the TCP connect/establishment is still in progress. These delayed
+ * transmission will be "flushed" once the socket is connected (either
+ * successfully or with errors).
+ */
+struct delayed_tdata
{
- PJ_DECL_LIST_MEMBER(struct pending_tdata);
+ PJ_DECL_LIST_MEMBER(struct delayed_tdata);
pjsip_tx_data_op_key *tdata_op_key;
};
+/*
+ * This structure describes the TCP transport, and it's descendant of
+ * pjsip_transport.
+ */
struct tcp_transport
{
pjsip_transport base;
+ pj_bool_t is_server;
struct tcp_listener *listener;
pj_bool_t is_registered;
pj_bool_t is_closing;
+ pj_status_t close_reason;
pj_sock_t sock;
pj_ioqueue_key_t *key;
pj_bool_t has_pending_connect;
- struct pending_connect connect_op;
/* TCP transport can only have one rdata!
@@ -99,19 +116,24 @@ struct tcp_transport
pjsip_rx_data rdata;
/* Pending transmission list. */
- struct pending_tdata tx_list;
+ struct delayed_tdata delayed_list;
};
-/*
- * This callback is called when #pj_ioqueue_accept completes.
+/****************************************************************************
+ * PROTOTYPES
*/
+
+/* This callback is called when pending accept() operation completes. */
static void on_accept_complete( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_sock_t sock,
pj_status_t status);
-static pj_status_t lis_destroy(struct tcp_listener *listener);
+/* This callback is called by transport manager to destroy listener */
+static pj_status_t lis_destroy(pjsip_tpfactory *factory);
+
+/* This callback is called by transport manager to create transport */
static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
pjsip_tpmgr *mgr,
pjsip_endpoint *endpt,
@@ -119,12 +141,12 @@ static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
int addr_len,
pjsip_transport **transport);
-
-static pj_status_t create_tcp_transport(struct tcp_listener *listener,
- pj_sock_t sock,
- const pj_sockaddr_in *local,
- const pj_sockaddr_in *remote,
- struct tcp_transport **p_tcp);
+/* Common function to create and initialize transport */
+static pj_status_t tcp_create(struct tcp_listener *listener,
+ pj_sock_t sock, pj_bool_t is_server,
+ const pj_sockaddr_in *local,
+ const pj_sockaddr_in *remote,
+ struct tcp_transport **p_tcp);
static void tcp_perror(const char *sender, const char *title,
@@ -138,18 +160,41 @@ static void tcp_perror(const char *sender, const char *title,
}
+static void sockaddr_to_host_port( pj_pool_t *pool,
+ pjsip_host_port *host_port,
+ const pj_sockaddr_in *addr )
+{
+ host_port->host.ptr = pj_pool_alloc(pool, 48);
+ host_port->host.slen = pj_ansi_sprintf( host_port->host.ptr, "%s",
+ pj_inet_ntoa(addr->sin_addr));
+ host_port->port = pj_ntohs(addr->sin_port);
+}
+
+
+
+/****************************************************************************
+ * The TCP listener/transport factory.
+ */
+
+/*
+ * This is the public API to create, initialize, register, and start the
+ * TCP listener.
+ */
PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
const pj_sockaddr_in *local,
- unsigned async_cnt)
+ unsigned async_cnt,
+ pjsip_tpfactory **p_factory)
{
pj_pool_t *pool;
struct tcp_listener *listener;
pj_ioqueue_callback listener_cb;
+ pj_sockaddr_in *listener_addr;
+ int addr_len;
unsigned i;
pj_status_t status;
/* Sanity check */
- PJ_ASSERT_RETURN(endpt && local && async_cnt, PJ_EINVAL);
+ PJ_ASSERT_RETURN(endpt && async_cnt, PJ_EINVAL);
pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_LIS_INIT,
@@ -158,14 +203,15 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
listener = pj_pool_zalloc(pool, sizeof(struct tcp_listener));
- pj_ansi_sprintf(listener->name, "tcp:%d", (int)pj_ntohs(local->sin_port));
listener->factory.pool = pool;
listener->factory.type = PJSIP_TRANSPORT_TCP;
- pj_ansi_strcpy(listener->factory.type_name, "tcp");
+ listener->factory.type_name = "tcp";
listener->factory.flag =
pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP);
listener->sock = PJ_INVALID_SOCKET;
+ pj_ansi_strcpy(listener->obj_name, "tcp");
+
status = pj_lock_create_recursive_mutex(pool, "tcplis",
&listener->factory.lock);
if (status != PJ_SUCCESS)
@@ -177,11 +223,52 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
if (status != PJ_SUCCESS)
goto on_error;
- pj_memcpy(&listener->factory.local_addr, local, sizeof(pj_sockaddr_in));
- status = pj_sock_bind(listener->sock, local, sizeof(*local));
+ listener_addr = (pj_sockaddr_in*)&listener->factory.local_addr;
+ if (local) {
+ pj_memcpy(listener_addr, local, sizeof(pj_sockaddr_in));
+ } else {
+ pj_sockaddr_in_init(listener_addr, NULL, 0);
+ }
+
+ status = pj_sock_bind(listener->sock, listener_addr,
+ sizeof(pj_sockaddr_in));
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Retrieve the bound address */
+ addr_len = sizeof(pj_sockaddr_in);
+ status = pj_sock_getsockname(listener->sock, listener_addr, &addr_len);
if (status != PJ_SUCCESS)
goto on_error;
+ /* If the address returns 0.0.0.0, use the first interface address
+ * as the transport's address.
+ */
+ if (listener_addr->sin_addr.s_addr == 0) {
+ const pj_str_t *hostname;
+ struct pj_hostent he;
+
+ hostname = pj_gethostname();
+ status = pj_gethostbyname(hostname, &he);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ listener_addr->sin_addr = *(pj_in_addr*)he.h_addr;
+ }
+
+ pj_ansi_sprintf(listener->obj_name, "tcp:%d",
+ (int)pj_ntohs(listener_addr->sin_port));
+
+ /* Save the address name */
+ sockaddr_to_host_port(listener->factory.pool,
+ &listener->factory.addr_name, listener_addr);
+
+ /* Start listening to the address */
+ status = pj_sock_listen(listener->sock, PJSIP_TCP_TRANSPORT_BACKLOG);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+
/* Register socket to ioqeuue */
pj_memset(&listener_cb, 0, sizeof(listener_cb));
listener_cb.on_accept_complete = &on_accept_complete;
@@ -191,7 +278,21 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
if (status != PJ_SUCCESS)
goto on_error;
- /* Start pending accept() operation */
+ /* Register to transport manager */
+ listener->endpt = endpt;
+ listener->tpmgr = pjsip_endpt_get_tpmgr(endpt);
+ listener->factory.create_transport = lis_create_transport;
+ listener->factory.destroy = lis_destroy;
+ listener->is_registered = PJ_TRUE;
+ status = pjsip_tpmgr_register_tpfactory(listener->tpmgr,
+ &listener->factory);
+ if (status != PJ_SUCCESS) {
+ listener->is_registered = PJ_FALSE;
+ goto on_error;
+ }
+
+
+ /* Start pending accept() operations */
if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT;
listener->async_cnt = async_cnt;
@@ -200,45 +301,34 @@ PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
sizeof(listener->accept_op[i].op_key));
listener->accept_op[i].listener = listener;
- status = pj_ioqueue_accept(listener->key,
- &listener->accept_op[i].op_key,
- &listener->accept_op[i].new_sock,
- &listener->accept_op[i].local_addr,
- &listener->accept_op[i].remote_addr,
- &listener->accept_op[i].addr_len);
- if (status != PJ_SUCCESS && status != PJ_EPENDING)
- goto on_error;
+ on_accept_complete(listener->key, &listener->accept_op[i].op_key,
+ listener->sock, PJ_EPENDING);
}
- /* Register to transport manager */
- listener->endpt = endpt;
- listener->tpmgr = pjsip_endpt_get_tpmgr(endpt);
- listener->factory.create_transport = lis_create_transport;
- status = pjsip_tpmgr_register_tpfactory(listener->tpmgr,
- &listener->factory);
- if (status != PJ_SUCCESS)
- goto on_error;
-
- /* Done! */
- listener->active = PJ_TRUE;
+ PJ_LOG(4,(listener->obj_name,
+ "SIP TCP listener ready for incoming connections at %s:%d",
+ pj_inet_ntoa(listener_addr->sin_addr),
+ (int)pj_ntohs(listener_addr->sin_port)));
- PJ_LOG(4,(listener->name,
- "SIP TCP transport listening for incoming connections at %s:%d",
- pj_inet_ntoa(local->sin_addr), (int)pj_ntohs(local->sin_port)));
+ /* Return the pointer to user */
+ if (p_factory) *p_factory = &listener->factory;
return PJ_SUCCESS;
on_error:
- lis_destroy(listener);
+ lis_destroy(&listener->factory);
return status;
}
-static pj_status_t lis_destroy(struct tcp_listener *listener)
+/* This callback is called by transport manager to destroy listener */
+static pj_status_t lis_destroy(pjsip_tpfactory *factory)
{
- if (listener->active) {
+ struct tcp_listener *listener = (struct tcp_listener *)factory;
+
+ if (listener->is_registered) {
pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory);
- listener->active = PJ_FALSE;
+ listener->is_registered = PJ_FALSE;
}
if (listener->key) {
@@ -258,9 +348,12 @@ static pj_status_t lis_destroy(struct tcp_listener *listener)
}
if (listener->factory.pool) {
- PJ_LOG(4,(listener->name, "SIP TCP transport destroyed"));
- pj_pool_release(listener->factory.pool);
+ pj_pool_t *pool = listener->factory.pool;
+
+ PJ_LOG(4,(listener->obj_name, "SIP TCP listener destroyed"));
+
listener->factory.pool = NULL;
+ pj_pool_release(pool);
}
return PJ_SUCCESS;
@@ -288,8 +381,12 @@ static pj_status_t tcp_send_msg(pjsip_transport *transport,
/* Called by transport manager to shutdown */
static pj_status_t tcp_shutdown(pjsip_transport *transport);
-/* Called by transport manager to destroy */
-static pj_status_t tcp_destroy(pjsip_transport *transport);
+/* Called by transport manager to destroy transport */
+static pj_status_t tcp_destroy_transport(pjsip_transport *transport);
+
+/* Utility to destroy transport */
+static pj_status_t tcp_destroy(pjsip_transport *transport,
+ pj_status_t reason);
/* Callback from ioqueue on incoming packet */
static void on_read_complete(pj_ioqueue_key_t *key,
@@ -306,25 +403,15 @@ static void on_connect_complete(pj_ioqueue_key_t *key,
pj_status_t status);
-static void sockaddr_to_host_port( pj_pool_t *pool,
- pjsip_host_port *host_port,
- const pj_sockaddr_in *addr )
-{
- host_port->host.ptr = pj_pool_alloc(pool, 48);
- host_port->host.slen = pj_ansi_sprintf( host_port->host.ptr, "%s",
- pj_inet_ntoa(addr->sin_addr));
- host_port->port = pj_ntohs(addr->sin_port);
-}
-
-
/*
- * Utilities to create TCP transport.
+ * Common function to create TCP transport, called when pending accept() and
+ * pending connect() complete.
*/
-static pj_status_t create_tcp_transport(struct tcp_listener *listener,
- pj_sock_t sock,
- const pj_sockaddr_in *local,
- const pj_sockaddr_in *remote,
- struct tcp_transport **p_tcp)
+static pj_status_t tcp_create( struct tcp_listener *listener,
+ pj_sock_t sock, pj_bool_t is_server,
+ const pj_sockaddr_in *local,
+ const pj_sockaddr_in *remote,
+ struct tcp_transport **p_tcp)
{
struct tcp_transport *tcp;
pj_pool_t *pool;
@@ -332,28 +419,37 @@ static pj_status_t create_tcp_transport(struct tcp_listener *listener,
pj_ioqueue_callback tcp_callback;
pj_status_t status;
- pool = pjsip_endpt_create_pool(listener->endpt, "tcp",
+
+ PJ_ASSERT_RETURN(sock != PJ_INVALID_SOCKET, PJ_EINVAL);
+
+
+ pool = pjsip_endpt_create_pool(listener->endpt, "tcp",
POOL_TP_INIT, POOL_TP_INC);
+ PJ_ASSERT_RETURN(pool != NULL, PJ_ENOMEM);
+
/*
* Create and initialize basic transport structure.
*/
tcp = pj_pool_zalloc(pool, sizeof(*tcp));
tcp->sock = sock;
+ tcp->is_server = is_server;
tcp->listener = listener;
- pj_list_init(&tcp->tx_list);
-
-
- pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME, "tcp%p", tcp);
+ pj_list_init(&tcp->delayed_list);
tcp->base.pool = pool;
+ pj_ansi_snprintf(tcp->base.obj_name, PJ_MAX_OBJ_NAME,
+ (is_server ? "tcps%p" :"tcpc%p"), tcp);
+
status = pj_atomic_create(pool, 0, &tcp->base.ref_cnt);
- if (status != PJ_SUCCESS)
+ if (status != PJ_SUCCESS) {
goto on_error;
+ }
status = pj_lock_create_recursive_mutex(pool, "tcp", &tcp->base.lock);
- if (status != PJ_SUCCESS)
+ if (status != PJ_SUCCESS) {
goto on_error;
+ }
tcp->base.key.type = PJSIP_TRANSPORT_TCP;
pj_memcpy(&tcp->base.key.rem_addr, remote, sizeof(pj_sockaddr_in));
@@ -374,7 +470,7 @@ static pj_status_t create_tcp_transport(struct tcp_listener *listener,
tcp->base.tpmgr = listener->tpmgr;
tcp->base.send_msg = &tcp_send_msg;
tcp->base.do_shutdown = &tcp_shutdown;
- tcp->base.destroy = &tcp_destroy;
+ tcp->base.destroy = &tcp_destroy_transport;
/* Register socket to ioqueue */
@@ -386,37 +482,44 @@ static pj_status_t create_tcp_transport(struct tcp_listener *listener,
ioqueue = pjsip_endpt_get_ioqueue(listener->endpt);
status = pj_ioqueue_register_sock(pool, ioqueue, sock,
tcp, &tcp_callback, &tcp->key);
- if (status != PJ_SUCCESS)
+ if (status != PJ_SUCCESS) {
goto on_error;
+ }
/* Register transport to transport manager */
status = pjsip_transport_register(listener->tpmgr, &tcp->base);
- if (status != PJ_SUCCESS)
+ if (status != PJ_SUCCESS) {
goto on_error;
+ }
tcp->is_registered = PJ_TRUE;
/* Done setting up basic transport. */
*p_tcp = tcp;
+ PJ_LOG(4,(tcp->base.obj_name, "TCP %s transport created",
+ (tcp->is_server ? "server" : "client")));
+
+ return PJ_SUCCESS;
+
on_error:
- tcp_destroy(&tcp->base);
+ tcp_destroy(&tcp->base, status);
return status;
}
-/* Flush all pending send operations */
-static tcp_flush_pending_tx(struct tcp_transport *tcp)
+/* Flush all delayed transmision once the socket is connected. */
+static void tcp_flush_pending_tx(struct tcp_transport *tcp)
{
pj_lock_acquire(tcp->base.lock);
- while (!pj_list_empty(&tcp->tx_list)) {
- struct pending_tdata *pending_tx;
+ while (!pj_list_empty(&tcp->delayed_list)) {
+ struct delayed_tdata *pending_tx;
pjsip_tx_data *tdata;
pj_ioqueue_op_key_t *op_key;
pj_ssize_t size;
pj_status_t status;
- pending_tx = tcp->tx_list.next;
+ pending_tx = tcp->delayed_list.next;
pj_list_erase(pending_tx);
tdata = pending_tx->tdata_op_key->tdata;
@@ -436,29 +539,53 @@ static tcp_flush_pending_tx(struct tcp_transport *tcp)
}
+/* Called by transport manager to destroy transport */
+static pj_status_t tcp_destroy_transport(pjsip_transport *transport)
+{
+ struct tcp_transport *tcp = (struct tcp_transport*)transport;
+
+ /* Transport would have been unregistered by now since this callback
+ * is called by transport manager.
+ */
+ tcp->is_registered = PJ_FALSE;
+
+ return tcp_destroy(transport, tcp->close_reason);
+}
+
/* Destroy TCP transport */
-static pj_status_t tcp_destroy(pjsip_transport *transport)
+static pj_status_t tcp_destroy(pjsip_transport *transport,
+ pj_status_t reason)
{
struct tcp_transport *tcp = (struct tcp_transport*)transport;
- /* Cancel all pending transmits */
- while (!pj_list_empty(&tcp->tx_list)) {
- struct pending_tdata *pending_tx;
+ if (tcp->close_reason == 0)
+ tcp->close_reason = reason;
+
+ if (tcp->is_registered) {
+ tcp->is_registered = PJ_FALSE;
+ pjsip_transport_destroy(transport);
+
+ /* pjsip_transport_destroy will recursively call this function
+ * again.
+ */
+ return PJ_SUCCESS;
+ }
+
+ /* Mark transport as closing */
+ tcp->is_closing = PJ_TRUE;
+
+ /* Cancel all delayed transmits */
+ while (!pj_list_empty(&tcp->delayed_list)) {
+ struct delayed_tdata *pending_tx;
pj_ioqueue_op_key_t *op_key;
- pending_tx = tcp->tx_list.next;
+ pending_tx = tcp->delayed_list.next;
pj_list_erase(pending_tx);
op_key = (pj_ioqueue_op_key_t*)pending_tx->tdata_op_key;
- on_write_complete(tcp->key, op_key,
- -PJ_RETURN_OS_ERROR(OSERR_ENOTCONN));
- }
-
- if (tcp->is_registered) {
- pjsip_transport_destroy(transport);
- tcp->is_registered = PJ_FALSE;
+ on_write_complete(tcp->key, op_key, -reason);
}
if (tcp->rdata.tp_info.pool) {
@@ -469,6 +596,12 @@ static pj_status_t tcp_destroy(pjsip_transport *transport)
if (tcp->key) {
pj_ioqueue_unregister(tcp->key);
tcp->key = NULL;
+ tcp->sock = PJ_INVALID_SOCKET;
+ }
+
+ if (tcp->sock != PJ_INVALID_SOCKET) {
+ pj_sock_close(tcp->sock);
+ tcp->sock = PJ_INVALID_SOCKET;
}
if (tcp->base.lock) {
@@ -482,9 +615,26 @@ static pj_status_t tcp_destroy(pjsip_transport *transport)
}
if (tcp->base.pool) {
- PJ_LOG(4,(tcp->base.obj_name, "TCP transport destroyed"));
- pj_pool_release(tcp->base.pool);
+ pj_pool_t *pool;
+
+ if (reason != PJ_SUCCESS) {
+ char errmsg[PJ_ERR_MSG_SIZE];
+
+ pj_strerror(reason, errmsg, sizeof(errmsg));
+ PJ_LOG(4,(tcp->base.obj_name,
+ "TCP transport destroyed with reason %d: %s",
+ reason, errmsg));
+
+ } else {
+
+ PJ_LOG(4,(tcp->base.obj_name,
+ "TCP transport destroyed normally"));
+
+ }
+
+ pool = tcp->base.pool;
tcp->base.pool = NULL;
+ pj_pool_release(pool);
}
return PJ_SUCCESS;
@@ -493,7 +643,8 @@ static pj_status_t tcp_destroy(pjsip_transport *transport)
/*
* This utility function creates receive data buffers and start
- * asynchronous recv() operations from the socket.
+ * asynchronous recv() operations from the socket. It is called after
+ * accept() or connect() operation complete.
*/
static pj_status_t tcp_start_read(struct tcp_transport *tcp)
{
@@ -531,7 +682,7 @@ static pj_status_t tcp_start_read(struct tcp_transport *tcp)
status = pj_ioqueue_recv(tcp->key, &tcp->rdata.tp_info.op_key.op_key,
tcp->rdata.pkt_info.packet, &size,
PJ_IOQUEUE_ALWAYS_ASYNC);
- if (status != PJ_SUCCESS) {
+ if (status != PJ_SUCCESS && status != PJ_EPENDING) {
tcp_perror(tcp->base.obj_name, "ioqueue recv() error", status);
return status;
}
@@ -593,19 +744,19 @@ static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
((pj_sockaddr_in*)&listener->factory.local_addr)->sin_addr.s_addr;
/* Create the transport descriptor */
- status = create_tcp_transport(listener, sock, &local_addr,
- (pj_sockaddr_in*)rem_addr, &tcp);
+ status = tcp_create(listener, sock, PJ_FALSE, &local_addr,
+ (pj_sockaddr_in*)rem_addr, &tcp);
if (status != PJ_SUCCESS)
return status;
-
+
+
/* Start asynchronous connect() operation */
tcp->has_pending_connect = PJ_TRUE;
- pj_ioqueue_op_key_init(&tcp->connect_op.op_key,
- sizeof(tcp->connect_op.op_key));
- tcp->connect_op.transport = tcp;
status = pj_ioqueue_connect(tcp->key, rem_addr, sizeof(pj_sockaddr_in));
- if (status != PJ_SUCCESS) {
- tcp_destroy(&tcp->base);
+ if (status == PJ_SUCCESS) {
+ tcp->has_pending_connect = PJ_FALSE;
+ } else if (status != PJ_EPENDING) {
+ tcp_destroy(&tcp->base, status);
return status;
}
@@ -629,6 +780,17 @@ static pj_status_t lis_create_transport(pjsip_tpfactory *factory,
}
}
+ if (tcp->has_pending_connect) {
+ PJ_LOG(4,(tcp->base.obj_name,
+ "TCP transport %.*s:%d is connecting to %.*s:%d...",
+ (int)tcp->base.local_name.host.slen,
+ tcp->base.local_name.host.ptr,
+ tcp->base.local_name.port,
+ (int)tcp->base.remote_name.host.slen,
+ tcp->base.remote_name.host.ptr,
+ tcp->base.remote_name.port));
+ }
+
/* Done */
*p_transport = &tcp->base;
@@ -653,31 +815,75 @@ static void on_accept_complete( pj_ioqueue_key_t *key,
listener = pj_ioqueue_get_user_data(key);
accept_op = (struct pending_accept*) op_key;
+ /*
+ * Loop while there is immediate connection or when there is error.
+ */
do {
- if (status != PJ_SUCCESS) {
- tcp_perror(listener->name, "Error in accept()", status);
-
+ if (status == PJ_EPENDING) {
+ /*
+ * This can only happen when this function is called during
+ * initialization to kick off asynchronous accept().
+ */
+
+ } else if (status != PJ_SUCCESS) {
+
+ /*
+ * Error in accept().
+ */
+ tcp_perror(listener->obj_name, "Error in accept()", status);
+
+ /*
+ * Prevent endless accept() error loop by limiting the
+ * number of consecutive errors. Once the number of errors
+ * is equal to maximum, we treat this as permanent error, and
+ * we stop the accept() operation.
+ */
++err_cnt;
- if (err_cnt >= 5) {
- PJ_LOG(1, (listener->name,
+ if (err_cnt >= 10) {
+ PJ_LOG(1, (listener->obj_name,
"Too many errors, listener stopping"));
}
- goto start_next_accept;
- }
+ } else {
- status = create_tcp_transport( listener, sock,
- &accept_op->local_addr,
- &accept_op->remote_addr, &tcp);
- if (status == PJ_SUCCESS) {
- status = tcp_start_read(tcp);
- if (status != PJ_SUCCESS) {
- PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));
- tcp_destroy(&tcp->base);
+ if (sock == PJ_INVALID_SOCKET) {
+ sock = accept_op->new_sock;
+ PJ_LOG(4,(listener->obj_name,
+ "Warning: ioqueue reports -1 in on_accept_complete()"
+ " sock argument"));
+ }
+
+ PJ_LOG(4,(listener->obj_name,
+ "TCP listener %.*s:%d: got incoming TCP connection "
+ "from %s:%d, sock=%d",
+ (int)listener->factory.addr_name.host.slen,
+ listener->factory.addr_name.host.ptr,
+ listener->factory.addr_name.port,
+ pj_inet_ntoa(accept_op->remote_addr.sin_addr),
+ pj_ntohs(accept_op->remote_addr.sin_port),
+ sock));
+
+ /*
+ * Incoming connections!
+ * Create TCP transport for the new socket.
+ */
+ status = tcp_create( listener, sock, PJ_TRUE,
+ &accept_op->local_addr,
+ &accept_op->remote_addr, &tcp);
+ if (status == PJ_SUCCESS) {
+ status = tcp_start_read(tcp);
+ if (status != PJ_SUCCESS) {
+ PJ_LOG(3,(tcp->base.obj_name, "New transport cancelled"));
+ tcp_destroy(&tcp->base, status);
+ }
}
}
-start_next_accept:
+ /*
+ * Start the next asynchronous accept() operation.
+ */
+ accept_op->addr_len = sizeof(pj_sockaddr_in);
+ accept_op->new_sock = PJ_INVALID_SOCKET;
status = pj_ioqueue_accept(listener->key,
&accept_op->op_key,
@@ -686,27 +892,51 @@ start_next_accept:
&accept_op->remote_addr,
&accept_op->addr_len);
+ /*
+ * Loop while we have immediate connection or when there is error.
+ */
+
} while (status != PJ_EPENDING);
}
-/* Callback from ioqueue when packet is sent */
+/*
+ * Callback from ioqueue when packet is sent.
+ */
static void on_write_complete(pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_ssize_t bytes_sent)
{
- struct tcp_transport *tp = pj_ioqueue_get_user_data(key);
+ struct tcp_transport *tcp = pj_ioqueue_get_user_data(key);
pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;
tdata_op_key->tdata = NULL;
+ /* Check for error/closure */
+ if (bytes_sent <= 0) {
+ pj_status_t status;
+
+ PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",
+ bytes_sent));
+
+ status = (bytes_sent == 0) ? PJ_RETURN_OS_ERROR(OSERR_ENOTCONN) :
+ -bytes_sent;
+ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
+ pjsip_transport_shutdown(&tcp->base);
+ }
+
if (tdata_op_key->callback) {
- tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent);
+ /*
+ * Notify sip_transport.c that packet has been sent.
+ */
+ tdata_op_key->callback(&tcp->base, tdata_op_key->token, bytes_sent);
}
}
-/* This callback is called by transport manager to send SIP message */
+/*
+ * This callback is called by transport manager to send SIP message
+ */
static pj_status_t tcp_send_msg(pjsip_transport *transport,
pjsip_tx_data *tdata,
const pj_sockaddr_t *rem_addr,
@@ -718,7 +948,8 @@ static pj_status_t tcp_send_msg(pjsip_transport *transport,
{
struct tcp_transport *tcp = (struct tcp_transport*)transport;
pj_ssize_t size;
- pj_status_t status;
+ pj_bool_t delayed = PJ_FALSE;
+ pj_status_t status = PJ_SUCCESS;
/* Sanity check */
PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
@@ -737,38 +968,74 @@ static pj_status_t tcp_send_msg(pjsip_transport *transport,
tdata->op_key.callback = callback;
/* If asynchronous connect() has not completed yet, just put the
- * transmit data in the pending transmission list.
+ * transmit data in the pending transmission list since we can not
+ * use the socket yet.
*/
- pj_lock_acquire(tcp->base.lock);
-
if (tcp->has_pending_connect) {
- struct pending_tdata *pending_tdata;
- /* Pust to list */
- pending_tdata = pj_pool_alloc(tdata->pool, sizeof(*pending_tdata));
- pending_tdata->tdata_op_key = &tdata->op_key;
+ /*
+ * Looks like connect() is still in progress. Check again (this time
+ * with holding the lock) to be sure.
+ */
+ pj_lock_acquire(tcp->base.lock);
- pj_list_push_back(&tcp->tx_list, pending_tdata);
- status = PJ_EPENDING;
+ if (tcp->has_pending_connect) {
+ struct delayed_tdata *delayed_tdata;
- } else {
- /* send to ioqueue! */
+ /*
+ * connect() is still in progress. Put the transmit data to
+ * the delayed list.
+ */
+ delayed_tdata = pj_pool_alloc(tdata->pool,
+ sizeof(*delayed_tdata));
+ delayed_tdata->tdata_op_key = &tdata->op_key;
+
+ pj_list_push_back(&tcp->delayed_list, delayed_tdata);
+ status = PJ_EPENDING;
+
+ /* Prevent pj_ioqueue_send() to be called below */
+ delayed = PJ_TRUE;
+ }
+
+ pj_lock_release(tcp->base.lock);
+ }
+
+ if (!delayed) {
+ /*
+ * Transport is ready to go. Send the packet to ioqueue to be
+ * sent asynchronously.
+ */
size = tdata->buf.cur - tdata->buf.start;
status = pj_ioqueue_send(tcp->key,
(pj_ioqueue_op_key_t*)&tdata->op_key,
tdata->buf.start, &size, 0);
- if (status != PJ_EPENDING)
+ if (status != PJ_EPENDING) {
+ /* Not pending (could be immediate success or error) */
tdata->op_key.tdata = NULL;
- }
- pj_lock_release(tcp->base.lock);
+ /* Shutdown transport on closure/errors */
+ if (size <= 0) {
+
+ PJ_LOG(5,(tcp->base.obj_name, "TCP send() error, sent=%d",
+ size));
+
+ if (status == PJ_SUCCESS)
+ status = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
+ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
+ pjsip_transport_shutdown(&tcp->base);
+ }
+ }
+ }
return status;
}
-/* This callback is called by transport manager to shutdown transport */
+/*
+ * This callback is called by transport manager to shutdown transport.
+ * This normally is only used by UDP transport.
+ */
static pj_status_t tcp_shutdown(pjsip_transport *transport)
{
@@ -779,7 +1046,9 @@ static pj_status_t tcp_shutdown(pjsip_transport *transport)
}
-/* Callback from ioqueue on incoming packet */
+/*
+ * Callback from ioqueue that an incoming data is received from the socket.
+ */
static void on_read_complete(pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_ssize_t bytes_read)
@@ -787,13 +1056,14 @@ static void on_read_complete(pj_ioqueue_key_t *key,
enum { MAX_IMMEDIATE_PACKET = 10 };
pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key;
pjsip_rx_data *rdata = rdata_op_key->rdata;
- struct tcp_transport *tp = (struct tcp_transport*)rdata->tp_info.transport;
+ struct tcp_transport *tcp =
+ (struct tcp_transport*)rdata->tp_info.transport;
int i;
pj_status_t status;
/* Don't do anything if transport is closing. */
- if (tp->is_closing) {
- tp->is_closing++;
+ if (tcp->is_closing) {
+ tcp->is_closing++;
return;
}
@@ -806,7 +1076,9 @@ static void on_read_complete(pj_ioqueue_key_t *key,
for (i=0;; ++i) {
pj_uint32_t flags;
- /* Report the packet to transport manager. */
+ /* Houston, we have packet! Report the packet to transport manager
+ * to be parsed.
+ */
if (bytes_read > 0) {
pj_size_t size_eaten;
@@ -815,6 +1087,10 @@ static void on_read_complete(pj_ioqueue_key_t *key,
rdata->pkt_info.zero = 0;
pj_gettimeofday(&rdata->pkt_info.timestamp);
+ /* Report to transport manager.
+ * The transport manager will tell us how many bytes of the packet
+ * have been processed (as valid SIP message).
+ */
size_eaten =
pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
rdata);
@@ -833,24 +1109,45 @@ static void on_read_complete(pj_ioqueue_key_t *key,
} else if (bytes_read == 0) {
/* Transport is closed */
- PJ_LOG(4,(tp->base.obj_name, "tcp connection closed"));
- tcp_destroy(&tp->base);
+ PJ_LOG(4,(tcp->base.obj_name, "TCP connection closed"));
+
+ /* We can not destroy the transport since high level objects may
+ * still keep reference to this transport. So we can only
+ * instruct transport manager to gracefully start the shutdown
+ * procedure for this transport.
+ */
+ if (tcp->close_reason==PJ_SUCCESS)
+ tcp->close_reason = PJ_RETURN_OS_ERROR(OSERR_ENOTCONN);
+ pjsip_transport_shutdown(&tcp->base);
+
return;
- } else if (bytes_read < 0) {
+ //} else if (bytes_read < 0) {
+ } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
+ -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
+ -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))
+ {
/* Report error to endpoint. */
PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
rdata->tp_info.transport->obj_name,
- -bytes_read, "tcp recv() error"));
+ -bytes_read, "TCP recv() error"));
+
+ /* We can not destroy the transport since high level objects may
+ * still keep reference to this transport. So we can only
+ * instruct transport manager to gracefully start the shutdown
+ * procedure for this transport.
+ */
+ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = -bytes_read;
+ pjsip_transport_shutdown(&tcp->base);
- /* Transport error, close transport */
- tcp_destroy(&tp->base);
return;
}
if (i >= MAX_IMMEDIATE_PACKET) {
- /* Force ioqueue_recv() to return PJ_EPENDING */
+ /* Receive quota reached. Force ioqueue_recv() to
+ * return PJ_EPENDING
+ */
flags = PJ_IOQUEUE_ALWAYS_ASYNC;
} else {
flags = 0;
@@ -867,6 +1164,7 @@ static void on_read_complete(pj_ioqueue_key_t *key,
&bytes_read, flags);
if (status == PJ_SUCCESS) {
+
/* Continue loop. */
pj_assert(i < MAX_IMMEDIATE_PACKET);
@@ -879,30 +1177,56 @@ static void on_read_complete(pj_ioqueue_key_t *key,
rdata->tp_info.transport->obj_name,
status, "tcp recv() error"));
- /* Transport error, close transport */
- tcp_destroy(&tp->base);
+ /* We can not destroy the transport since high level objects may
+ * still keep reference to this transport. So we can only
+ * instruct transport manager to gracefully start the shutdown
+ * procedure for this transport.
+ */
+ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
+ pjsip_transport_shutdown(&tcp->base);
+
return;
}
}
}
-/* Callback from ioqueue when connect completes */
+/*
+ * Callback from ioqueue when asynchronous connect() operation completes.
+ */
static void on_connect_complete(pj_ioqueue_key_t *key,
pj_status_t status)
{
- struct pending_connect *connect_op = (struct pending_connect *)key;
- struct tcp_transport *tcp = connect_op->transport;
+ struct tcp_transport *tcp;
pj_sockaddr_in addr;
int addrlen;
+ tcp = pj_ioqueue_get_user_data(key);
+
+ PJ_LOG(4,(tcp->base.obj_name,
+ "TCP transport %.*s:%d is connected to %.*s:%d",
+ (int)tcp->base.local_name.host.slen,
+ tcp->base.local_name.host.ptr,
+ tcp->base.local_name.port,
+ (int)tcp->base.remote_name.host.slen,
+ tcp->base.remote_name.host.ptr,
+ tcp->base.remote_name.port));
+
/* Mark that pending connect() operation has completed. */
tcp->has_pending_connect = PJ_FALSE;
/* Check connect() status */
if (status != PJ_SUCCESS) {
+
tcp_perror(tcp->base.obj_name, "TCP connect() error", status);
- tcp_destroy(&tcp->base);
+
+ /* We can not destroy the transport since high level objects may
+ * still keep reference to this transport. So we can only
+ * instruct transport manager to gracefully start the shutdown
+ * procedure for this transport.
+ */
+ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
+ pjsip_transport_shutdown(&tcp->base);
return;
}
@@ -925,7 +1249,13 @@ static void on_connect_complete(pj_ioqueue_key_t *key,
/* Start pending read */
status = tcp_start_read(tcp);
if (status != PJ_SUCCESS) {
- tcp_destroy(&tcp->base);
+ /* We can not destroy the transport since high level objects may
+ * still keep reference to this transport. So we can only
+ * instruct transport manager to gracefully start the shutdown
+ * procedure for this transport.
+ */
+ if (tcp->close_reason==PJ_SUCCESS) tcp->close_reason = status;
+ pjsip_transport_shutdown(&tcp->base);
return;
}