summaryrefslogtreecommitdiff
path: root/pjsip/src
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2007-06-22 11:32:49 +0000
committerBenny Prijono <bennylp@teluu.com>2007-06-22 11:32:49 +0000
commiteaaaec7b572fbeee95cfe783cfa27aabecc08b7f (patch)
treed5d5993245ca3e9abb1ef41fabd4fb4c6f3c0ead /pjsip/src
parentb58523bcdf6f34f4b00c4a5e81719c63ae58e570 (diff)
Committed ticket #337: Ability to restart PJSIP UDP transport
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@1382 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjsip/src')
-rw-r--r--pjsip/src/pjsip/sip_errno.c1
-rw-r--r--pjsip/src/pjsip/sip_transport_udp.c452
2 files changed, 359 insertions, 94 deletions
diff --git a/pjsip/src/pjsip/sip_errno.c b/pjsip/src/pjsip/sip_errno.c
index aeb0cbd6..dd2724e9 100644
--- a/pjsip/src/pjsip/sip_errno.c
+++ b/pjsip/src/pjsip/sip_errno.c
@@ -68,6 +68,7 @@ static const struct
PJ_BUILD_ERR( PJSIP_ERXOVERFLOW, "Rx buffer overflow"),
PJ_BUILD_ERR( PJSIP_EBUFDESTROYED, "Buffer destroyed"),
PJ_BUILD_ERR( PJSIP_ETPNOTSUITABLE, "Unsuitable transport selected"),
+ PJ_BUILD_ERR( PJSIP_ETPNOTAVAIL, "Transport not available for use"),
/* Transaction errors */
PJ_BUILD_ERR( PJSIP_ETSXDESTROYED, "Transaction has been destroyed"),
diff --git a/pjsip/src/pjsip/sip_transport_udp.c b/pjsip/src/pjsip/sip_transport_udp.c
index 48da26f9..223c6659 100644
--- a/pjsip/src/pjsip/sip_transport_udp.c
+++ b/pjsip/src/pjsip/sip_transport_udp.c
@@ -67,6 +67,7 @@ struct udp_transport
int rdata_cnt;
pjsip_rx_data **rdata;
int is_closing;
+ pj_bool_t is_paused;
};
@@ -122,6 +123,10 @@ static void udp_on_read_complete( pj_ioqueue_key_t *key,
return;
}
+ /* Don't do anything if transport is being paused. */
+ if (tp->is_paused)
+ return;
+
/*
* The idea of the loop is to process immediate data received by
* pj_ioqueue_recvfrom(), as long as i < MAX_IMMEDIATE_PACKET. When
@@ -205,6 +210,13 @@ static void udp_on_read_complete( pj_ioqueue_key_t *key,
op_key = &rdata->tp_info.op_key.op_key;
}
+ /* Only read next packet if transport is not being paused. This
+ * check handles the case where transport is paused while endpoint
+ * is still processing a SIP message.
+ */
+ if (tp->is_paused)
+ return;
+
/* Read next packet. */
bytes_read = sizeof(rdata->pkt_info.packet);
rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
@@ -295,6 +307,10 @@ static pj_status_t udp_send_msg( pjsip_transport *transport,
PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);
+ /* Return error if transport is paused */
+ if (tp->is_paused)
+ return PJSIP_ETPNOTAVAIL;
+
/* Init op key. */
tdata->op_key.tdata = tdata;
tdata->op_key.token = token;
@@ -395,29 +411,100 @@ static pj_status_t udp_shutdown(pjsip_transport *transport)
}
-/*
- * pjsip_udp_transport_attach()
- *
- * Attach UDP socket and start transport.
- */
-PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
- pj_sock_t sock,
- const pjsip_host_port *a_name,
- unsigned async_cnt,
- pjsip_transport **p_transport)
+/* Create socket */
+static pj_status_t create_socket(const pj_sockaddr_in *local_a,
+ pj_sock_t *p_sock)
{
- enum { M = 80 };
- pj_pool_t *pool;
- struct udp_transport *tp;
- pj_ioqueue_t *ioqueue;
- pj_ioqueue_callback ioqueue_cb;
- long sobuf_size;
- unsigned i;
+ pj_sock_t sock;
+ pj_sockaddr_in tmp_addr;
pj_status_t status;
- PJ_ASSERT_RETURN(endpt && sock!=PJ_INVALID_SOCKET && a_name && async_cnt>0,
- PJ_EINVAL);
+ status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &sock);
+ if (status != PJ_SUCCESS)
+ return status;
+
+ if (local_a == NULL) {
+ pj_sockaddr_in_init(&tmp_addr, NULL, 0);
+ local_a = &tmp_addr;
+ }
+
+ status = pj_sock_bind(sock, local_a, sizeof(*local_a));
+ if (status != PJ_SUCCESS) {
+ pj_sock_close(sock);
+ return status;
+ }
+
+ *p_sock = sock;
+ return PJ_SUCCESS;
+}
+
+
+/* Generate transport's published address */
+static pj_status_t get_published_name(pj_sock_t sock,
+ char hostbuf[],
+ pjsip_host_port *bound_name)
+{
+ pj_sockaddr_in tmp_addr;
+ int addr_len;
+ pj_status_t status;
+
+ addr_len = sizeof(tmp_addr);
+ status = pj_sock_getsockname(sock, &tmp_addr, &addr_len);
+ if (status != PJ_SUCCESS)
+ return status;
+
+ bound_name->host.ptr = hostbuf;
+ bound_name->port = pj_ntohs(tmp_addr.sin_port);
+
+ /* If bound address specifies "0.0.0.0", get the IP address
+ * of local hostname.
+ */
+ if (tmp_addr.sin_addr.s_addr == PJ_INADDR_ANY) {
+ pj_in_addr hostip;
+
+ status = pj_gethostip(&hostip);
+ if (status != PJ_SUCCESS)
+ return status;
+
+ pj_strcpy2(&bound_name->host, pj_inet_ntoa(hostip));
+ } else {
+ /* Otherwise use bound address. */
+ pj_strcpy2(&bound_name->host, pj_inet_ntoa(tmp_addr.sin_addr));
+ }
+
+ return PJ_SUCCESS;
+}
+
+/* Set the published address of the transport */
+static void udp_set_pub_name(struct udp_transport *tp,
+ const pjsip_host_port *a_name)
+{
+ enum { INFO_LEN = 80 };
+
+ pj_assert(a_name->host.slen != 0);
+ pj_strdup_with_null(tp->base.pool, &tp->base.local_name.host,
+ &a_name->host);
+ tp->base.local_name.port = a_name->port;
+
+ /* Update transport info. */
+ if (tp->base.info == NULL) {
+ tp->base.info = (char*) pj_pool_alloc(tp->base.pool, INFO_LEN);
+ }
+ pj_ansi_snprintf(
+ tp->base.info, INFO_LEN, "udp %s:%d [published as %s:%d]",
+ pj_inet_ntoa(((pj_sockaddr_in*)&tp->base.local_addr)->sin_addr),
+ pj_ntohs(((pj_sockaddr_in*)&tp->base.local_addr)->sin_port),
+ tp->base.local_name.host.ptr,
+ tp->base.local_name.port);
+}
+/* Set the socket handle of the transport */
+static void udp_set_socket(struct udp_transport *tp,
+ pj_sock_t sock,
+ const pjsip_host_port *a_name)
+{
+ long sobuf_size;
+ pj_status_t status;
/* Adjust socket rcvbuf size */
sobuf_size = PJSIP_UDP_SO_RCVBUF_SIZE;
@@ -441,6 +528,83 @@ PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
status));
}
+ /* Set the socket. */
+ tp->sock = sock;
+
+ /* Init address name (published address) */
+ udp_set_pub_name(tp, a_name);
+}
+
+/* Register socket to ioqueue */
+static pj_status_t register_to_ioqueue(struct udp_transport *tp)
+{
+ pj_ioqueue_t *ioqueue;
+ pj_ioqueue_callback ioqueue_cb;
+
+ /* Register to ioqueue. */
+ ioqueue = pjsip_endpt_get_ioqueue(tp->base.endpt);
+ pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
+ ioqueue_cb.on_read_complete = &udp_on_read_complete;
+ ioqueue_cb.on_write_complete = &udp_on_write_complete;
+
+ return pj_ioqueue_register_sock(tp->base.pool, ioqueue, tp->sock, tp,
+ &ioqueue_cb, &tp->key);
+}
+
+/* Start ioqueue asynchronous reading to all rdata */
+static pj_status_t start_async_read(struct udp_transport *tp)
+{
+ pj_ioqueue_t *ioqueue;
+ int i;
+ pj_status_t status;
+
+ ioqueue = pjsip_endpt_get_ioqueue(tp->base.endpt);
+
+ /* Start reading the ioqueue. */
+ for (i=0; i<tp->rdata_cnt; ++i) {
+ pj_ssize_t size;
+
+ size = sizeof(tp->rdata[i]->pkt_info.packet);
+ tp->rdata[i]->pkt_info.src_addr_len = sizeof(tp->rdata[i]->pkt_info.src_addr);
+ status = pj_ioqueue_recvfrom(tp->key,
+ &tp->rdata[i]->tp_info.op_key.op_key,
+ tp->rdata[i]->pkt_info.packet,
+ &size, PJ_IOQUEUE_ALWAYS_ASYNC,
+ &tp->rdata[i]->pkt_info.src_addr,
+ &tp->rdata[i]->pkt_info.src_addr_len);
+ if (status == PJ_SUCCESS) {
+ pj_assert(!"Shouldn't happen because PJ_IOQUEUE_ALWAYS_ASYNC!");
+ udp_on_read_complete(tp->key, &tp->rdata[i]->tp_info.op_key.op_key,
+ size);
+ } else if (status != PJ_EPENDING) {
+ /* Error! */
+ return status;
+ }
+ }
+
+ return PJ_SUCCESS;
+}
+
+
+/*
+ * pjsip_udp_transport_attach()
+ *
+ * Attach UDP socket and start transport.
+ */
+PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
+ pj_sock_t sock,
+ const pjsip_host_port *a_name,
+ unsigned async_cnt,
+ pjsip_transport **p_transport)
+{
+ pj_pool_t *pool;
+ struct udp_transport *tp;
+ unsigned i;
+ pj_status_t status;
+
+ PJ_ASSERT_RETURN(endpt && sock!=PJ_INVALID_SOCKET && a_name && async_cnt>0,
+ PJ_EINVAL);
+
/* Create pool. */
pool = pjsip_endpt_create_pool(endpt, "udp%p", PJSIP_POOL_LEN_TRANSPORT,
PJSIP_POOL_INC_TRANSPORT);
@@ -489,38 +653,20 @@ PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
if (status != PJ_SUCCESS)
goto on_error;
- /* Init address name (published address) */
- pj_strdup_with_null(pool, &tp->base.local_name.host, &a_name->host);
- tp->base.local_name.port = a_name->port;
-
/* Init remote name. */
tp->base.remote_name.host = pj_str("0.0.0.0");
tp->base.remote_name.port = 0;
- /* Transport info. */
- tp->base.info = (char*) pj_pool_alloc(pool, M);
- pj_ansi_snprintf(
- tp->base.info, M, "udp %s:%d [published as %s:%d]",
- pj_inet_ntoa(((pj_sockaddr_in*)&tp->base.local_addr)->sin_addr),
- pj_ntohs(((pj_sockaddr_in*)&tp->base.local_addr)->sin_port),
- tp->base.local_name.host.ptr,
- tp->base.local_name.port);
-
/* Set endpoint. */
tp->base.endpt = endpt;
/* Transport manager and timer will be initialized by tpmgr */
- /* Attach socket. */
- tp->sock = sock;
+ /* Attach socket and assign name. */
+ udp_set_socket(tp, sock, a_name);
- /* Register to ioqueue. */
- ioqueue = pjsip_endpt_get_ioqueue(endpt);
- pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
- ioqueue_cb.on_read_complete = &udp_on_read_complete;
- ioqueue_cb.on_write_complete = &udp_on_write_complete;
- status = pj_ioqueue_register_sock(pool, ioqueue, tp->sock, tp,
- &ioqueue_cb, &tp->key);
+ /* Register to ioqueue */
+ status = register_to_ioqueue(tp);
if (status != PJ_SUCCESS)
goto on_error;
@@ -562,26 +708,10 @@ PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
}
/* Start reading the ioqueue. */
- for (i=0; i<async_cnt; ++i) {
- pj_ssize_t size;
-
- size = sizeof(tp->rdata[i]->pkt_info.packet);
- tp->rdata[i]->pkt_info.src_addr_len = sizeof(tp->rdata[i]->pkt_info.src_addr);
- status = pj_ioqueue_recvfrom(tp->key,
- &tp->rdata[i]->tp_info.op_key.op_key,
- tp->rdata[i]->pkt_info.packet,
- &size, PJ_IOQUEUE_ALWAYS_ASYNC,
- &tp->rdata[i]->pkt_info.src_addr,
- &tp->rdata[i]->pkt_info.src_addr_len);
- if (status == PJ_SUCCESS) {
- pj_assert(!"Shouldn't happen because PJ_IOQUEUE_ALWAYS_ASYNC!");
- udp_on_read_complete(tp->key, &tp->rdata[i]->tp_info.op_key.op_key,
- size);
- } else if (status != PJ_EPENDING) {
- /* Error! */
- pjsip_transport_destroy(&tp->base);
- return status;
- }
+ status = start_async_read(tp);
+ if (status != PJ_SUCCESS) {
+ pjsip_transport_destroy(&tp->base);
+ return status;
}
/* Done. */
@@ -615,63 +745,197 @@ PJ_DEF(pj_status_t) pjsip_udp_transport_start( pjsip_endpoint *endpt,
pj_sock_t sock;
pj_status_t status;
char addr_buf[16];
- pj_sockaddr_in tmp_addr;
pjsip_host_port bound_name;
PJ_ASSERT_RETURN(endpt && async_cnt, PJ_EINVAL);
- status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &sock);
+ status = create_socket(local_a, &sock);
if (status != PJ_SUCCESS)
return status;
- if (local_a == NULL) {
- pj_sockaddr_in_init(&tmp_addr, NULL, 0);
- local_a = &tmp_addr;
- }
-
- status = pj_sock_bind(sock, local_a, sizeof(*local_a));
- if (status != PJ_SUCCESS) {
- pj_sock_close(sock);
- return status;
- }
-
if (a_name == NULL) {
/* Address name is not specified.
* Build a name based on bound address.
*/
- int addr_len;
-
- addr_len = sizeof(tmp_addr);
- status = pj_sock_getsockname(sock, &tmp_addr, &addr_len);
+ status = get_published_name(sock, addr_buf, &bound_name);
if (status != PJ_SUCCESS) {
pj_sock_close(sock);
return status;
}
a_name = &bound_name;
- bound_name.host.ptr = addr_buf;
- bound_name.port = pj_ntohs(tmp_addr.sin_port);
+ }
- /* If bound address specifies "0.0.0.0", get the IP address
- * of local hostname.
- */
- if (tmp_addr.sin_addr.s_addr == PJ_INADDR_ANY) {
- pj_in_addr hostip;
+ return pjsip_udp_transport_attach( endpt, sock, a_name, async_cnt,
+ p_transport );
+}
+
+
+/*
+ * Retrieve the internal socket handle used by the UDP transport.
+ */
+PJ_DEF(pj_sock_t) pjsip_udp_transport_get_socket(pjsip_transport *transport)
+{
+ struct udp_transport *tp;
+
+ PJ_ASSERT_RETURN(transport != NULL, PJ_INVALID_SOCKET);
+
+ tp = (struct udp_transport*) transport;
+
+ return tp->sock;
+}
- status = pj_gethostip(&hostip);
- if (status != PJ_SUCCESS)
- return status;
- pj_strcpy2(&bound_name.host, pj_inet_ntoa(hostip));
+/*
+ * Temporarily pause or shutdown the transport.
+ */
+PJ_DEF(pj_status_t) pjsip_udp_transport_pause(pjsip_transport *transport,
+ unsigned option)
+{
+ struct udp_transport *tp;
+ unsigned i;
+
+ PJ_ASSERT_RETURN(transport != NULL, PJ_EINVAL);
+
+ /* Flag must be specified */
+ PJ_ASSERT_RETURN((option & 0x03) != 0, PJ_EINVAL);
+
+ tp = (struct udp_transport*) transport;
+
+ /* Transport must not have been paused */
+ PJ_ASSERT_RETURN(tp->is_paused==0, PJ_EINVALIDOP);
+
+ /* Set transport to paused first, so that when the read callback is
+ * called by pj_ioqueue_post_completion() it will not try to
+ * re-register the rdata.
+ */
+ tp->is_paused = PJ_TRUE;
+
+ /* Cancel the ioqueue operation. */
+ for (i=0; i<(unsigned)tp->rdata_cnt; ++i) {
+ pj_ioqueue_post_completion(tp->key,
+ &tp->rdata[i]->tp_info.op_key.op_key, -1);
+ }
+
+ /* Destroy the socket? */
+ if (option & PJSIP_UDP_TRANSPORT_DESTROY_SOCKET) {
+ if (tp->key) {
+ /* This implicitly closes the socket */
+ pj_ioqueue_unregister(tp->key);
+ tp->key = NULL;
} else {
- /* Otherwise use bound address. */
- pj_strcpy2(&bound_name.host, pj_inet_ntoa(tmp_addr.sin_addr));
+ /* Close socket. */
+ if (tp->sock && tp->sock != PJ_INVALID_SOCKET) {
+ pj_sock_close(tp->sock);
+ tp->sock = PJ_INVALID_SOCKET;
+ }
}
-
+ tp->sock = PJ_INVALID_SOCKET;
}
- return pjsip_udp_transport_attach( endpt, sock, a_name, async_cnt,
- p_transport );
+ PJ_LOG(4,(tp->base.obj_name, "SIP UDP transport paused"));
+
+ return PJ_SUCCESS;
}
+/*
+ * Restart transport.
+ *
+ * If option is KEEP_SOCKET, just re-activate ioqueue operation.
+ *
+ * If option is DESTROY_SOCKET:
+ * - if socket is specified, replace.
+ * - if socket is not specified, create and replace.
+ */
+PJ_DEF(pj_status_t) pjsip_udp_transport_restart(pjsip_transport *transport,
+ unsigned option,
+ pj_sock_t sock,
+ const pj_sockaddr_in *local,
+ const pjsip_host_port *a_name)
+{
+ struct udp_transport *tp;
+ pj_status_t status;
+
+ PJ_ASSERT_RETURN(transport != NULL, PJ_EINVAL);
+ /* Flag must be specified */
+ PJ_ASSERT_RETURN((option & 0x03) != 0, PJ_EINVAL);
+
+ tp = (struct udp_transport*) transport;
+
+ if (option & PJSIP_UDP_TRANSPORT_DESTROY_SOCKET) {
+ char addr_buf[16];
+ pjsip_host_port bound_name;
+
+ /* Request to recreate transport */
+
+ /* Destroy existing socket, if any. */
+ if (tp->key) {
+ /* This implicitly closes the socket */
+ pj_ioqueue_unregister(tp->key);
+ tp->key = NULL;
+ } else {
+ /* Close socket. */
+ if (tp->sock && tp->sock != PJ_INVALID_SOCKET) {
+ pj_sock_close(tp->sock);
+ tp->sock = PJ_INVALID_SOCKET;
+ }
+ }
+ tp->sock = PJ_INVALID_SOCKET;
+
+ /* Create the socket if it's not specified */
+ if (sock == PJ_INVALID_SOCKET) {
+ status = create_socket(local, &sock);
+ if (status != PJ_SUCCESS)
+ return status;
+ }
+
+ /* If transport published name is not specified, calculate it
+ * from the bound address.
+ */
+ if (a_name == NULL) {
+ status = get_published_name(sock, addr_buf, &bound_name);
+ if (status != PJ_SUCCESS) {
+ pj_sock_close(sock);
+ return status;
+ }
+
+ a_name = &bound_name;
+ }
+
+ /* Assign the socket and published address to transport. */
+ udp_set_socket(tp, sock, a_name);
+
+ } else {
+
+ /* For KEEP_SOCKET, transport must have been paused before */
+ PJ_ASSERT_RETURN(tp->is_paused, PJ_EINVALIDOP);
+
+ /* If address name is specified, update it */
+ if (a_name != NULL)
+ udp_set_pub_name(tp, a_name);
+ }
+
+ /* Re-register new or existing socket to ioqueue. */
+ status = register_to_ioqueue(tp);
+ if (status != PJ_SUCCESS) {
+ return status;
+ }
+
+ /* Restart async read operation. */
+ status = start_async_read(tp);
+ if (status != PJ_SUCCESS)
+ return status;
+
+ /* Everything has been set up */
+ tp->is_paused = PJ_FALSE;
+
+ PJ_LOG(4,(tp->base.obj_name,
+ "SIP UDP transport restarted, published address is %.*s:%d",
+ (int)tp->base.local_name.host.slen,
+ tp->base.local_name.host.ptr,
+ tp->base.local_name.port));
+
+ return PJ_SUCCESS;
+}
+