summaryrefslogtreecommitdiff
path: root/pjsip/src
diff options
context:
space:
mode:
Diffstat (limited to 'pjsip/src')
-rw-r--r--pjsip/src/pjsip/sip_transport.c59
-rw-r--r--pjsip/src/pjsip/sip_transport_tcp.c220
-rw-r--r--pjsip/src/pjsip/sip_transport_udp.c16
-rw-r--r--pjsip/src/test-pjsip/transport_udp_test.c2
4 files changed, 288 insertions, 9 deletions
diff --git a/pjsip/src/pjsip/sip_transport.c b/pjsip/src/pjsip/sip_transport.c
index 10d11007..9583e049 100644
--- a/pjsip/src/pjsip/sip_transport.c
+++ b/pjsip/src/pjsip/sip_transport.c
@@ -516,7 +516,7 @@ static void transport_idle_callback(pj_timer_heap_t *timer_heap,
PJ_UNUSED_ARG(timer_heap);
entry->id = PJ_FALSE;
- pjsip_transport_unregister(tp->tpmgr, tp);
+ pjsip_transport_destroy(tp);
}
/*
@@ -554,7 +554,18 @@ PJ_DEF(pj_status_t) pjsip_transport_dec_ref( pjsip_transport *tp )
pj_lock_acquire(tp->tpmgr->lock);
/* Verify again. */
if (pj_atomic_get(tp->ref_cnt) == 0) {
- pj_time_val delay = { PJSIP_TRANSPORT_IDLE_TIME, 0 };
+ pj_time_val delay;
+
+ /* If transport is in graceful shutdown, then this is the
+ * last user who uses the transport. Schedule to destroy the
+ * transport immediately. Otherwise schedule idle timer.
+ */
+ if (tp->is_shutdown) {
+ delay.sec = delay.msec = 0;
+ } else {
+ delay.sec = PJSIP_TRANSPORT_IDLE_TIME;
+ delay.msec = 0;
+ }
pj_assert(tp->idle_timer.id == 0);
tp->idle_timer.id = PJ_TRUE;
@@ -623,17 +634,53 @@ static pj_status_t destroy_transport( pjsip_tpmgr *mgr,
return tp->destroy(tp);
}
+
+/*
+ * Start graceful shutdown procedure for this transport.
+ */
+PJ_DEF(pj_status_t) pjsip_transport_shutdown(pjsip_transport *tp)
+{
+ pjsip_tpmgr *mgr;
+ pj_status_t status;
+
+ pj_lock_acquire(tp->lock);
+
+ mgr = tp->tpmgr;
+ pj_lock_acquire(mgr->lock);
+
+ /* Do nothing if transport is being shutdown already */
+ if (tp->is_shutdown) {
+ pj_lock_release(tp->lock);
+ pj_lock_release(mgr->lock);
+ return PJ_SUCCESS;
+ }
+
+ status = PJ_SUCCESS;
+
+ /* Instruct transport to shutdown itself */
+ if (tp->do_shutdown)
+ status = tp->do_shutdown(tp);
+
+ if (status == PJ_SUCCESS)
+ tp->is_shutdown = PJ_TRUE;
+
+ pj_lock_release(tp->lock);
+ pj_lock_release(mgr->lock);
+
+ return status;
+}
+
+
/**
* Unregister transport.
*/
-PJ_DEF(pj_status_t) pjsip_transport_unregister( pjsip_tpmgr *mgr,
- pjsip_transport *tp)
+PJ_DEF(pj_status_t) pjsip_transport_destroy( pjsip_transport *tp)
{
/* Must have no user. */
PJ_ASSERT_RETURN(pj_atomic_get(tp->ref_cnt) == 0, PJSIP_EBUSY);
/* Destroy. */
- return destroy_transport(mgr, tp);
+ return destroy_transport(tp->tpmgr, tp);
}
@@ -1029,7 +1076,7 @@ PJ_DEF(pj_status_t) pjsip_tpmgr_acquire_transport(pjsip_tpmgr *mgr,
}
}
- if (transport != NULL) {
+ if (transport!=NULL && !transport->is_shutdown) {
/*
* Transport found!
*/
diff --git a/pjsip/src/pjsip/sip_transport_tcp.c b/pjsip/src/pjsip/sip_transport_tcp.c
new file mode 100644
index 00000000..8072ddd7
--- /dev/null
+++ b/pjsip/src/pjsip/sip_transport_tcp.c
@@ -0,0 +1,220 @@
+/* $Id$ */
+/*
+ * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#include <pjsip/sip_transport_tcp.h>
+#include <pjsip/sip_endpoint.h>
+#include <pj/assert.h>
+#include <pj/errno.h>
+#include <pj/ioqueue.h>
+#include <pj/lock.h>
+#include <pj/pool.h>
+#include <pj/string.h>
+
+
+#define MAX_ASYNC_CNT 16
+#define POOL_INIT 4000
+#define POOL_INC 4000
+
+
+struct tcp_listener;
+
+struct pending_accept
+{
+ pj_ioqueue_op_key_t op_key;
+ struct tcp_listener *listener;
+ pj_sock_t new_sock;
+ int addr_len;
+ pj_sockaddr_in local_addr;
+ pj_sockaddr_in remote_addr;
+};
+
+struct tcp_listener
+{
+ pjsip_tpfactory factory;
+ pj_bool_t active;
+ pjsip_tpmgr *tpmgr;
+ pj_sock_t sock;
+ pj_ioqueue_key_t *key;
+ unsigned async_cnt;
+ struct pending_accept accept_op[MAX_ASYNC_CNT];
+};
+
+
+struct tcp_transport
+{
+ pjsip_transport base;
+ pj_sock_t sock;
+};
+
+
+/*
+ * This callback is called when #pj_ioqueue_accept completes.
+ */
+static void on_accept_complete( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock,
+ pj_status_t status);
+
+static pj_status_t destroy_listener(struct tcp_listener *listener);
+
+static pj_status_t create_transport(pjsip_tpfactory *factory,
+ pjsip_tpmgr *mgr,
+ pjsip_endpoint *endpt,
+ const pj_sockaddr *rem_addr,
+ int addr_len,
+ pjsip_transport **transport);
+
+PJ_DEF(pj_status_t) pjsip_tcp_transport_start( pjsip_endpoint *endpt,
+ const pj_sockaddr_in *local,
+ unsigned async_cnt)
+{
+ pj_pool_t *pool;
+ struct tcp_listener *listener;
+ pj_ioqueue_callback listener_cb;
+ unsigned i;
+ pj_status_t status;
+
+ /* Sanity check */
+ PJ_ASSERT_RETURN(endpt && local && async_cnt, PJ_EINVAL);
+
+
+ pool = pjsip_endpt_create_pool(endpt, "tcplis", POOL_INIT, POOL_INC);
+ PJ_ASSERT_RETURN(pool, PJ_ENOMEM);
+
+
+ listener = pj_pool_zalloc(pool, sizeof(struct tcp_listener));
+ listener->factory.pool = pool;
+ listener->factory.type = PJSIP_TRANSPORT_TCP;
+ pj_ansi_strcpy(listener->factory.type_name, "tcp");
+ listener->factory.flag =
+ pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_TCP);
+ listener->sock = PJ_INVALID_SOCKET;
+
+ status = pj_lock_create_recursive_mutex(pool, "tcplis",
+ &listener->factory.lock);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+
+ /* Create and bind socket */
+ status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_STREAM, 0, &listener->sock);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ pj_memcpy(&listener->factory.local_addr, local, sizeof(pj_sockaddr_in));
+ status = pj_sock_bind(listener->sock, local, sizeof(*local));
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Register socket to ioqeuue */
+ pj_memset(&listener_cb, 0, sizeof(listener_cb));
+ listener_cb.on_accept_complete = &on_accept_complete;
+ status = pj_ioqueue_register_sock(pool, pjsip_endpt_get_ioqueue(endpt),
+ listener->sock, listener,
+ &listener_cb, &listener->key);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Start pending accept() operation */
+ if (async_cnt > MAX_ASYNC_CNT) async_cnt = MAX_ASYNC_CNT;
+ listener->async_cnt = async_cnt;
+
+ for (i=0; i<async_cnt; ++i) {
+ pj_ioqueue_op_key_init(&listener->accept_op[i].op_key,
+ sizeof(listener->accept_op[i].op_key));
+ listener->accept_op[i].listener = listener;
+
+ status = pj_ioqueue_accept(listener->key,
+ &listener->accept_op[i].op_key,
+ &listener->accept_op[i].new_sock,
+ &listener->accept_op[i].local_addr,
+ &listener->accept_op[i].remote_addr,
+ &listener->accept_op[i].addr_len);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+ }
+
+ /* Register to transport manager */
+ listener->tpmgr = pjsip_endpt_get_tpmgr(endpt);
+ status = pjsip_tpmgr_register_tpfactory(listener->tpmgr,
+ &listener->factory);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Done! */
+ listener->active = PJ_TRUE;
+
+ return PJ_SUCCESS;
+
+on_error:
+ destroy_listener(listener);
+ return status;
+}
+
+
+
+
+static pj_status_t destroy_listener(struct tcp_listener *listener)
+{
+ if (listener->active) {
+ pjsip_tpmgr_unregister_tpfactory(listener->tpmgr, &listener->factory);
+ listener->active = PJ_FALSE;
+ }
+
+ if (listener->key) {
+ pj_ioqueue_unregister(listener->key);
+ listener->key = NULL;
+ listener->sock = PJ_INVALID_SOCKET;
+ }
+
+ if (listener->sock != PJ_INVALID_SOCKET) {
+ pj_sock_close(listener->sock);
+ listener->sock = PJ_INVALID_SOCKET;
+ }
+
+ if (listener->factory.lock) {
+ pj_lock_destroy(listener->factory.lock);
+ listener->factory.lock = NULL;
+ }
+
+ if (listener->factory.pool) {
+ pj_pool_release(listener->factory.pool);
+ listener->factory.pool = NULL;
+ }
+
+ return PJ_SUCCESS;
+}
+
+
+static void on_accept_complete( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_sock_t sock,
+ pj_status_t status)
+{
+}
+
+
+static pj_status_t create_transport(pjsip_tpfactory *factory,
+ pjsip_tpmgr *mgr,
+ pjsip_endpoint *endpt,
+ const pj_sockaddr *rem_addr,
+ int addr_len,
+ pjsip_transport **transport)
+{
+}
+
diff --git a/pjsip/src/pjsip/sip_transport_udp.c b/pjsip/src/pjsip/sip_transport_udp.c
index 6312756c..1c72dca7 100644
--- a/pjsip/src/pjsip/sip_transport_udp.c
+++ b/pjsip/src/pjsip/sip_transport_udp.c
@@ -377,6 +377,17 @@ static pj_status_t udp_destroy( pjsip_transport *transport )
/*
+ * udp_shutdown()
+ *
+ * Start graceful UDP shutdown.
+ */
+static pj_status_t udp_shutdown(pjsip_transport *transport)
+{
+ return pjsip_transport_dec_ref(transport);
+}
+
+
+/*
* pjsip_udp_transport_attach()
*
* Attach UDP socket and start transport.
@@ -505,6 +516,7 @@ PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
/* Set functions. */
tp->base.send_msg = &udp_send_msg;
+ tp->base.do_shutdown = &udp_shutdown;
tp->base.destroy = &udp_destroy;
/* This is a permanent transport, so we initialize the ref count
@@ -530,7 +542,7 @@ PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
PJSIP_POOL_RDATA_INC);
if (!rdata_pool) {
pj_atomic_set(tp->base.ref_cnt, 0);
- pjsip_transport_unregister(tp->base.tpmgr, &tp->base);
+ pjsip_transport_destroy(&tp->base);
return PJ_ENOMEM;
}
@@ -556,7 +568,7 @@ PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
size);
} else if (status != PJ_EPENDING) {
/* Error! */
- pjsip_transport_unregister(tp->base.tpmgr, &tp->base);
+ pjsip_transport_destroy(&tp->base);
return status;
}
}
diff --git a/pjsip/src/test-pjsip/transport_udp_test.c b/pjsip/src/test-pjsip/transport_udp_test.c
index cf5f0ca5..4db0a7ca 100644
--- a/pjsip/src/test-pjsip/transport_udp_test.c
+++ b/pjsip/src/test-pjsip/transport_udp_test.c
@@ -102,7 +102,7 @@ int transport_udp_test(void)
pjsip_transport_dec_ref(udp_tp);
/* Force destroy this transport. */
- status = pjsip_transport_unregister( pjsip_endpt_get_tpmgr(endpt), udp_tp);
+ status = pjsip_transport_destroy(udp_tp);
if (status != PJ_SUCCESS)
return -90;