summaryrefslogtreecommitdiff
path: root/pjsip/src/pjsip/sip_transport_udp.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjsip/src/pjsip/sip_transport_udp.c')
-rw-r--r--pjsip/src/pjsip/sip_transport_udp.c820
1 files changed, 410 insertions, 410 deletions
diff --git a/pjsip/src/pjsip/sip_transport_udp.c b/pjsip/src/pjsip/sip_transport_udp.c
index a6729ffa..2dc835a3 100644
--- a/pjsip/src/pjsip/sip_transport_udp.c
+++ b/pjsip/src/pjsip/sip_transport_udp.c
@@ -1,410 +1,410 @@
-/* $Id: $ */
-/*
- * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- */
-#include <pjsip/sip_transport.h>
-#include <pjsip/sip_endpoint.h>
-#include <pjsip/sip_errno.h>
-#include <pj/pool.h>
-#include <pj/sock.h>
-#include <pj/os.h>
-#include <pj/lock.h>
-#include <pj/string.h>
-#include <pj/assert.h>
-
-
-/* Struct udp_transport "inherits" struct pjsip_transport */
-struct udp_transport
-{
- pjsip_transport base;
- pj_sock_t sock;
- pj_ioqueue_key_t *key;
- int rdata_cnt;
- pjsip_rx_data **rdata;
- int is_closing;
-};
-
-
-/*
- * on_read_complete()
- *
- * This is callback notification from ioqueue that a pending recvfrom()
- * operation has completed.
- */
-static void on_read_complete( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_read)
-{
- enum { MAX_IMMEDIATE_PACKET = 10 };
- pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key;
- pjsip_rx_data *rdata = rdata_op_key->rdata;
- struct udp_transport *tp = (struct udp_transport*)rdata->tp_info.transport;
- int i;
- pj_status_t status;
-
- /* Don't do anything if transport is closing. */
- if (tp->is_closing)
- return;
-
- /*
- * The idea of the loop is to process immediate data received by
- * pj_ioqueue_recvfrom(), as long as i < MAX_IMMEDIATE_PACKET. When
- * i is >= MAX_IMMEDIATE_PACKET, we force the recvfrom() operation to
- * complete asynchronously, to allow other sockets to get their data.
- */
- for (i=0;; ++i) {
- pj_uint32_t flags;
-
- /* Report the packet to transport manager. */
- if (bytes_read > 0) {
- pj_size_t size_eaten;
-
- rdata->pkt_info.len = bytes_read;
- rdata->pkt_info.zero = 0;
- pj_gettimeofday(&rdata->pkt_info.timestamp);
-
- size_eaten =
- pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
- rdata);
-
- if (size_eaten < 0) {
- pj_assert(!"It shouldn't happen!");
- size_eaten = rdata->pkt_info.len;
- }
-
- /* Since this is UDP, the whole buffer is the message. */
- rdata->pkt_info.len = 0;
-
- } else if (bytes_read == 0) {
- /* TODO: */
- } else {
- /* Report error to endpoint. */
- PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
- rdata->tp_info.transport->obj_name,
- -bytes_read,
- "Warning: pj_ioqueue_recvfrom()"
- " callback error"));
- }
-
- if (i >= MAX_IMMEDIATE_PACKET) {
- /* Force ioqueue_recvfrom() to return PJ_EPENDING */
- flags = PJ_IOQUEUE_ALWAYS_ASYNC;
- } else {
- flags = 0;
- }
-
- /* Read next packet. */
- bytes_read = sizeof(rdata->pkt_info.packet);
- rdata->pkt_info.addr_len = sizeof(rdata->pkt_info.addr);
- status = pj_ioqueue_recvfrom(key, op_key,
- rdata->pkt_info.packet,
- &bytes_read, flags,
- &rdata->pkt_info.addr,
- &rdata->pkt_info.addr_len);
-
- if (status == PJ_SUCCESS) {
- /* Continue loop. */
- pj_assert(i < MAX_IMMEDIATE_PACKET);
-
- } else if (status == PJ_EPENDING) {
- break;
-
- } else {
-
- if (i < MAX_IMMEDIATE_PACKET) {
- /* Report error to endpoint. */
- PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
- rdata->tp_info.transport->obj_name,
- status,
- "Warning: pj_ioqueue_recvfrom error"));
- /* Continue loop. */
- bytes_read = 0;
- } else {
- /* This is fatal error.
- * Ioqueue operation will stop for this transport!
- */
- PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
- rdata->tp_info.transport->obj_name,
- status,
- "FATAL: pj_ioqueue_recvfrom() error, "
- "UDP transport stopping! Error"));
- break;
- }
- }
- }
-}
-
-/*
- * on_write_complete()
- *
- * This is callback notification from ioqueue that a pending sendto()
- * operation has completed.
- */
-static void on_write_complete( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
- pj_ssize_t bytes_sent)
-{
- struct udp_transport *tp = pj_ioqueue_get_user_data(key);
- pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;
-
- tdata_op_key->tdata = NULL;
-
- if (tdata_op_key->callback) {
- tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent);
- }
-}
-
-/*
- * transport_send_msg()
- *
- * This function is called by transport manager (by transport->send_msg())
- * to send outgoing message.
- */
-static pj_status_t transport_send_msg( pjsip_transport *transport,
- pjsip_tx_data *tdata,
- const pj_sockaddr_in *rem_addr,
- void *token,
- void (*callback)(pjsip_transport*,
- void *token,
- pj_ssize_t))
-{
- struct udp_transport *tp = (struct udp_transport*)transport;
- pj_ssize_t size;
-
- PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
- PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);
-
- /* Init op key. */
- tdata->op_key.tdata = tdata;
- tdata->op_key.token = token;
- tdata->op_key.callback = callback;
-
- /* Send to ioqueue! */
- size = tdata->buf.cur - tdata->buf.start;
- return pj_ioqueue_sendto(tp->key, (pj_ioqueue_op_key_t*)&tdata->op_key,
- tdata->buf.start, &size, 0,
- rem_addr, (rem_addr ? sizeof(pj_sockaddr_in):0));
-}
-
-/*
- * transport_destroy()
- *
- * This function is called by transport manager (by transport->destroy()).
- */
-static pj_status_t transport_destroy( pjsip_transport *transport )
-{
- struct udp_transport *tp = (struct udp_transport*)transport;
- int i;
-
- /* Mark this transport as closing. */
- tp->is_closing = 1;
-
- /* Cancel all pending operations. */
- for (i=0; i<tp->rdata_cnt; ++i) {
- pj_ioqueue_post_completion(tp->key,
- &tp->rdata[i]->tp_info.op_key.op_key, -1);
- }
-
- /* Unregister from ioqueue. */
- if (tp->key)
- pj_ioqueue_unregister(tp->key);
-
- /* Close socket. */
- if (tp->sock && tp->sock != PJ_INVALID_SOCKET)
- pj_sock_close(tp->sock);
-
- /* Destroy reference counter. */
- if (tp->base.ref_cnt)
- pj_atomic_destroy(tp->base.ref_cnt);
-
- /* Destroy lock */
- if (tp->base.lock)
- pj_lock_destroy(tp->base.lock);
-
- /* Destroy pool. */
- pjsip_endpt_destroy_pool(tp->base.endpt, tp->base.pool);
-
- return PJ_SUCCESS;
-}
-
-
-/*
- * pjsip_udp_transport_attach()
- *
- * Attach UDP socket and start transport.
- */
-PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
- pj_sock_t sock,
- const pj_sockaddr_in *pub_addr,
- unsigned async_cnt,
- pjsip_transport **p_transport)
-{
- pj_pool_t *pool;
- struct udp_transport *tp;
- pj_ioqueue_t *ioqueue;
- pj_ioqueue_callback ioqueue_cb;
- unsigned i;
- int addrlen;
- pj_status_t status;
-
- /* Create pool. */
- pool = pjsip_endpt_create_pool(endpt, "udp%p", PJSIP_POOL_LEN_TRANSPORT,
- PJSIP_POOL_INC_TRANSPORT);
- if (!pool)
- return PJ_ENOMEM;
-
- tp = pj_pool_zalloc(pool, sizeof(struct udp_transport));
- tp->base.pool = pool;
- tp->base.endpt = endpt;
-
- /* Init type, type_name, and flag */
- tp->base.type = PJSIP_TRANSPORT_UDP;
- pj_native_strcpy(tp->base.type_name, "UDP");
- tp->base.flag = pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_UDP);
-
- /* Init addresses. */
- addrlen = sizeof(tp->base.local_addr);
- status = pj_sock_getsockname(sock, &tp->base.local_addr, &addrlen);
- if (status != PJ_SUCCESS) {
- pjsip_endpt_destroy_pool(endpt, pool);
- return status;
- }
- pj_memcpy(&tp->base.public_addr, pub_addr, sizeof(pj_sockaddr_in));
- tp->base.rem_addr.sin_family = PJ_AF_INET;
-
- /* Init reference counter. */
- status = pj_atomic_create(pool, 0, &tp->base.ref_cnt);
- if (status != PJ_SUCCESS)
- goto on_error;
-
- /* Init lock. */
- status = pj_lock_create_recursive_mutex(pool, "udp%p", &tp->base.lock);
- if (status != PJ_SUCCESS)
- goto on_error;
-
- /* Attach socket. */
- tp->sock = sock;
-
- /* Register to ioqueue. */
- ioqueue = pjsip_endpt_get_ioqueue(endpt);
- pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
- ioqueue_cb.on_read_complete = &on_read_complete;
- ioqueue_cb.on_write_complete = &on_write_complete;
- status = pj_ioqueue_register_sock(pool, ioqueue, tp->sock, tp,
- &ioqueue_cb, &tp->key);
- if (status != PJ_SUCCESS)
- goto on_error;
-
- /* Set functions. */
- tp->base.send_msg = &transport_send_msg;
- tp->base.destroy = &transport_destroy;
-
- /* This is a permanent transport, so we initialize the ref count
- * to one so that transport manager don't destroy this transport
- * when there's no user!
- */
- pj_atomic_inc(tp->base.ref_cnt);
-
- /* Register to transport manager. */
- tp->base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
- status = pjsip_transport_register( tp->base.tpmgr, (pjsip_transport*)tp);
- if (status != PJ_SUCCESS)
- goto on_error;
-
-
- /* Create rdata and put it in the array. */
- tp->rdata_cnt = 0;
- for (i=0; i<async_cnt; ++i) {
- pj_pool_t *rdata_pool = pjsip_endpt_create_pool(endpt, "rtd%p",
- PJSIP_POOL_LEN_RDATA,
- PJSIP_POOL_INC_RDATA);
- if (!rdata_pool) {
- pj_atomic_set(tp->base.ref_cnt, 0);
- pjsip_transport_unregister(tp->base.tpmgr, &tp->base);
- return PJ_ENOMEM;
- }
-
- tp->rdata[i] = pj_pool_zalloc(rdata_pool, sizeof(pjsip_rx_data));
- tp->rdata[i]->tp_info.pool = rdata_pool;
- tp->rdata[i]->tp_info.transport = &tp->base;
- pj_ioqueue_op_key_init(&tp->rdata[i]->tp_info.op_key.op_key,
- sizeof(pj_ioqueue_op_key_t));
-
- tp->rdata_cnt++;
- }
-
- /* Start reading the ioqueue. */
- for (i=0; i<async_cnt; ++i) {
- pj_ssize_t size;
-
- size = sizeof(tp->rdata[i]->pkt_info.packet);
- tp->rdata[i]->pkt_info.addr_len = sizeof(tp->rdata[i]->pkt_info.addr);
- status = pj_ioqueue_recvfrom(tp->key,
- &tp->rdata[i]->tp_info.op_key.op_key,
- tp->rdata[i]->pkt_info.packet,
- &size, PJ_IOQUEUE_ALWAYS_ASYNC,
- &tp->rdata[i]->pkt_info.addr,
- &tp->rdata[i]->pkt_info.addr_len);
- if (status == PJ_SUCCESS) {
- pj_assert(!"Shouldn't happen because PJ_IOQUEUE_ALWAYS_ASYNC!");
- on_read_complete(tp->key, &tp->rdata[i]->tp_info.op_key.op_key,
- size);
- } else if (status != PJ_EPENDING) {
- /* Error! */
- pjsip_transport_unregister(tp->base.tpmgr, &tp->base);
- return status;
- }
- }
-
- /* Done. */
- *p_transport = &tp->base;
- return PJ_SUCCESS;
-
-on_error:
- transport_destroy((pjsip_transport*)tp);
- return status;
-}
-
-/*
- * pjsip_udp_transport_start()
- *
- * Start an UDP transport/listener.
- */
-PJ_DEF(pj_status_t) pjsip_udp_transport_start( pjsip_endpoint *endpt,
- const pj_sockaddr_in *local,
- const pj_sockaddr_in *pub_addr,
- unsigned async_cnt,
- pjsip_transport **p_transport)
-{
- pj_sock_t sock;
- pj_status_t status;
-
- status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &sock);
- if (status != PJ_SUCCESS)
- return status;
-
- status = pj_sock_bind(sock, local, sizeof(*local));
- if (status != PJ_SUCCESS) {
- pj_sock_close(sock);
- return status;
- }
-
- return pjsip_udp_transport_attach( endpt, sock, pub_addr, async_cnt,
- p_transport );
-}
-
-
+/* $Id: $ */
+/*
+ * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#include <pjsip/sip_transport.h>
+#include <pjsip/sip_endpoint.h>
+#include <pjsip/sip_errno.h>
+#include <pj/pool.h>
+#include <pj/sock.h>
+#include <pj/os.h>
+#include <pj/lock.h>
+#include <pj/string.h>
+#include <pj/assert.h>
+
+
+/* Struct udp_transport "inherits" struct pjsip_transport */
+struct udp_transport
+{
+ pjsip_transport base;
+ pj_sock_t sock;
+ pj_ioqueue_key_t *key;
+ int rdata_cnt;
+ pjsip_rx_data **rdata;
+ int is_closing;
+};
+
+
+/*
+ * on_read_complete()
+ *
+ * This is callback notification from ioqueue that a pending recvfrom()
+ * operation has completed.
+ */
+static void on_read_complete( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+{
+ enum { MAX_IMMEDIATE_PACKET = 10 };
+ pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key;
+ pjsip_rx_data *rdata = rdata_op_key->rdata;
+ struct udp_transport *tp = (struct udp_transport*)rdata->tp_info.transport;
+ int i;
+ pj_status_t status;
+
+ /* Don't do anything if transport is closing. */
+ if (tp->is_closing)
+ return;
+
+ /*
+ * The idea of the loop is to process immediate data received by
+ * pj_ioqueue_recvfrom(), as long as i < MAX_IMMEDIATE_PACKET. When
+ * i is >= MAX_IMMEDIATE_PACKET, we force the recvfrom() operation to
+ * complete asynchronously, to allow other sockets to get their data.
+ */
+ for (i=0;; ++i) {
+ pj_uint32_t flags;
+
+ /* Report the packet to transport manager. */
+ if (bytes_read > 0) {
+ pj_size_t size_eaten;
+
+ rdata->pkt_info.len = bytes_read;
+ rdata->pkt_info.zero = 0;
+ pj_gettimeofday(&rdata->pkt_info.timestamp);
+
+ size_eaten =
+ pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
+ rdata);
+
+ if (size_eaten < 0) {
+ pj_assert(!"It shouldn't happen!");
+ size_eaten = rdata->pkt_info.len;
+ }
+
+ /* Since this is UDP, the whole buffer is the message. */
+ rdata->pkt_info.len = 0;
+
+ } else if (bytes_read == 0) {
+ /* TODO: */
+ } else {
+ /* Report error to endpoint. */
+ PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
+ rdata->tp_info.transport->obj_name,
+ -bytes_read,
+ "Warning: pj_ioqueue_recvfrom()"
+ " callback error"));
+ }
+
+ if (i >= MAX_IMMEDIATE_PACKET) {
+ /* Force ioqueue_recvfrom() to return PJ_EPENDING */
+ flags = PJ_IOQUEUE_ALWAYS_ASYNC;
+ } else {
+ flags = 0;
+ }
+
+ /* Read next packet. */
+ bytes_read = sizeof(rdata->pkt_info.packet);
+ rdata->pkt_info.addr_len = sizeof(rdata->pkt_info.addr);
+ status = pj_ioqueue_recvfrom(key, op_key,
+ rdata->pkt_info.packet,
+ &bytes_read, flags,
+ &rdata->pkt_info.addr,
+ &rdata->pkt_info.addr_len);
+
+ if (status == PJ_SUCCESS) {
+ /* Continue loop. */
+ pj_assert(i < MAX_IMMEDIATE_PACKET);
+
+ } else if (status == PJ_EPENDING) {
+ break;
+
+ } else {
+
+ if (i < MAX_IMMEDIATE_PACKET) {
+ /* Report error to endpoint. */
+ PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
+ rdata->tp_info.transport->obj_name,
+ status,
+ "Warning: pj_ioqueue_recvfrom error"));
+ /* Continue loop. */
+ bytes_read = 0;
+ } else {
+ /* This is fatal error.
+ * Ioqueue operation will stop for this transport!
+ */
+ PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
+ rdata->tp_info.transport->obj_name,
+ status,
+ "FATAL: pj_ioqueue_recvfrom() error, "
+ "UDP transport stopping! Error"));
+ break;
+ }
+ }
+ }
+}
+
+/*
+ * on_write_complete()
+ *
+ * This is callback notification from ioqueue that a pending sendto()
+ * operation has completed.
+ */
+static void on_write_complete( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent)
+{
+ struct udp_transport *tp = pj_ioqueue_get_user_data(key);
+ pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;
+
+ tdata_op_key->tdata = NULL;
+
+ if (tdata_op_key->callback) {
+ tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent);
+ }
+}
+
+/*
+ * transport_send_msg()
+ *
+ * This function is called by transport manager (by transport->send_msg())
+ * to send outgoing message.
+ */
+static pj_status_t transport_send_msg( pjsip_transport *transport,
+ pjsip_tx_data *tdata,
+ const pj_sockaddr_in *rem_addr,
+ void *token,
+ void (*callback)(pjsip_transport*,
+ void *token,
+ pj_ssize_t))
+{
+ struct udp_transport *tp = (struct udp_transport*)transport;
+ pj_ssize_t size;
+
+ PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
+ PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);
+
+ /* Init op key. */
+ tdata->op_key.tdata = tdata;
+ tdata->op_key.token = token;
+ tdata->op_key.callback = callback;
+
+ /* Send to ioqueue! */
+ size = tdata->buf.cur - tdata->buf.start;
+ return pj_ioqueue_sendto(tp->key, (pj_ioqueue_op_key_t*)&tdata->op_key,
+ tdata->buf.start, &size, 0,
+ rem_addr, (rem_addr ? sizeof(pj_sockaddr_in):0));
+}
+
+/*
+ * transport_destroy()
+ *
+ * This function is called by transport manager (by transport->destroy()).
+ */
+static pj_status_t transport_destroy( pjsip_transport *transport )
+{
+ struct udp_transport *tp = (struct udp_transport*)transport;
+ int i;
+
+ /* Mark this transport as closing. */
+ tp->is_closing = 1;
+
+ /* Cancel all pending operations. */
+ for (i=0; i<tp->rdata_cnt; ++i) {
+ pj_ioqueue_post_completion(tp->key,
+ &tp->rdata[i]->tp_info.op_key.op_key, -1);
+ }
+
+ /* Unregister from ioqueue. */
+ if (tp->key)
+ pj_ioqueue_unregister(tp->key);
+
+ /* Close socket. */
+ if (tp->sock && tp->sock != PJ_INVALID_SOCKET)
+ pj_sock_close(tp->sock);
+
+ /* Destroy reference counter. */
+ if (tp->base.ref_cnt)
+ pj_atomic_destroy(tp->base.ref_cnt);
+
+ /* Destroy lock */
+ if (tp->base.lock)
+ pj_lock_destroy(tp->base.lock);
+
+ /* Destroy pool. */
+ pjsip_endpt_destroy_pool(tp->base.endpt, tp->base.pool);
+
+ return PJ_SUCCESS;
+}
+
+
+/*
+ * pjsip_udp_transport_attach()
+ *
+ * Attach UDP socket and start transport.
+ */
+PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
+ pj_sock_t sock,
+ const pj_sockaddr_in *pub_addr,
+ unsigned async_cnt,
+ pjsip_transport **p_transport)
+{
+ pj_pool_t *pool;
+ struct udp_transport *tp;
+ pj_ioqueue_t *ioqueue;
+ pj_ioqueue_callback ioqueue_cb;
+ unsigned i;
+ int addrlen;
+ pj_status_t status;
+
+ /* Create pool. */
+ pool = pjsip_endpt_create_pool(endpt, "udp%p", PJSIP_POOL_LEN_TRANSPORT,
+ PJSIP_POOL_INC_TRANSPORT);
+ if (!pool)
+ return PJ_ENOMEM;
+
+ tp = pj_pool_zalloc(pool, sizeof(struct udp_transport));
+ tp->base.pool = pool;
+ tp->base.endpt = endpt;
+
+ /* Init type, type_name, and flag */
+ tp->base.type = PJSIP_TRANSPORT_UDP;
+ pj_native_strcpy(tp->base.type_name, "UDP");
+ tp->base.flag = pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_UDP);
+
+ /* Init addresses. */
+ addrlen = sizeof(tp->base.local_addr);
+ status = pj_sock_getsockname(sock, &tp->base.local_addr, &addrlen);
+ if (status != PJ_SUCCESS) {
+ pjsip_endpt_destroy_pool(endpt, pool);
+ return status;
+ }
+ pj_memcpy(&tp->base.public_addr, pub_addr, sizeof(pj_sockaddr_in));
+ tp->base.rem_addr.sin_family = PJ_AF_INET;
+
+ /* Init reference counter. */
+ status = pj_atomic_create(pool, 0, &tp->base.ref_cnt);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Init lock. */
+ status = pj_lock_create_recursive_mutex(pool, "udp%p", &tp->base.lock);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Attach socket. */
+ tp->sock = sock;
+
+ /* Register to ioqueue. */
+ ioqueue = pjsip_endpt_get_ioqueue(endpt);
+ pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
+ ioqueue_cb.on_read_complete = &on_read_complete;
+ ioqueue_cb.on_write_complete = &on_write_complete;
+ status = pj_ioqueue_register_sock(pool, ioqueue, tp->sock, tp,
+ &ioqueue_cb, &tp->key);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Set functions. */
+ tp->base.send_msg = &transport_send_msg;
+ tp->base.destroy = &transport_destroy;
+
+ /* This is a permanent transport, so we initialize the ref count
+ * to one so that transport manager don't destroy this transport
+ * when there's no user!
+ */
+ pj_atomic_inc(tp->base.ref_cnt);
+
+ /* Register to transport manager. */
+ tp->base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
+ status = pjsip_transport_register( tp->base.tpmgr, (pjsip_transport*)tp);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+
+ /* Create rdata and put it in the array. */
+ tp->rdata_cnt = 0;
+ for (i=0; i<async_cnt; ++i) {
+ pj_pool_t *rdata_pool = pjsip_endpt_create_pool(endpt, "rtd%p",
+ PJSIP_POOL_LEN_RDATA,
+ PJSIP_POOL_INC_RDATA);
+ if (!rdata_pool) {
+ pj_atomic_set(tp->base.ref_cnt, 0);
+ pjsip_transport_unregister(tp->base.tpmgr, &tp->base);
+ return PJ_ENOMEM;
+ }
+
+ tp->rdata[i] = pj_pool_zalloc(rdata_pool, sizeof(pjsip_rx_data));
+ tp->rdata[i]->tp_info.pool = rdata_pool;
+ tp->rdata[i]->tp_info.transport = &tp->base;
+ pj_ioqueue_op_key_init(&tp->rdata[i]->tp_info.op_key.op_key,
+ sizeof(pj_ioqueue_op_key_t));
+
+ tp->rdata_cnt++;
+ }
+
+ /* Start reading the ioqueue. */
+ for (i=0; i<async_cnt; ++i) {
+ pj_ssize_t size;
+
+ size = sizeof(tp->rdata[i]->pkt_info.packet);
+ tp->rdata[i]->pkt_info.addr_len = sizeof(tp->rdata[i]->pkt_info.addr);
+ status = pj_ioqueue_recvfrom(tp->key,
+ &tp->rdata[i]->tp_info.op_key.op_key,
+ tp->rdata[i]->pkt_info.packet,
+ &size, PJ_IOQUEUE_ALWAYS_ASYNC,
+ &tp->rdata[i]->pkt_info.addr,
+ &tp->rdata[i]->pkt_info.addr_len);
+ if (status == PJ_SUCCESS) {
+ pj_assert(!"Shouldn't happen because PJ_IOQUEUE_ALWAYS_ASYNC!");
+ on_read_complete(tp->key, &tp->rdata[i]->tp_info.op_key.op_key,
+ size);
+ } else if (status != PJ_EPENDING) {
+ /* Error! */
+ pjsip_transport_unregister(tp->base.tpmgr, &tp->base);
+ return status;
+ }
+ }
+
+ /* Done. */
+ *p_transport = &tp->base;
+ return PJ_SUCCESS;
+
+on_error:
+ transport_destroy((pjsip_transport*)tp);
+ return status;
+}
+
+/*
+ * pjsip_udp_transport_start()
+ *
+ * Start an UDP transport/listener.
+ */
+PJ_DEF(pj_status_t) pjsip_udp_transport_start( pjsip_endpoint *endpt,
+ const pj_sockaddr_in *local,
+ const pj_sockaddr_in *pub_addr,
+ unsigned async_cnt,
+ pjsip_transport **p_transport)
+{
+ pj_sock_t sock;
+ pj_status_t status;
+
+ status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &sock);
+ if (status != PJ_SUCCESS)
+ return status;
+
+ status = pj_sock_bind(sock, local, sizeof(*local));
+ if (status != PJ_SUCCESS) {
+ pj_sock_close(sock);
+ return status;
+ }
+
+ return pjsip_udp_transport_attach( endpt, sock, pub_addr, async_cnt,
+ p_transport );
+}
+
+