summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenny Prijono <bennylp@teluu.com>2005-11-19 13:20:08 +0000
committerBenny Prijono <bennylp@teluu.com>2005-11-19 13:20:08 +0000
commit87b05ce12596910229d12c59aebfe566d00e8a96 (patch)
tree925aad4e6c71629d1a72529839ced9f4255ae280
parente5dbd1fd5564952a7ad242d887b643ef962ec90b (diff)
Added UDP transport implementation
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@57 74dad513-b988-da41-8d7b-12977e46ad98
-rw-r--r--pjsip/build/pjsip_core.dsp4
-rw-r--r--pjsip/include/pjsip/sip_endpoint.h19
-rw-r--r--pjsip/include/pjsip/sip_transport.h44
-rw-r--r--pjsip/src/pjsip/sip_endpoint.c38
-rw-r--r--pjsip/src/pjsip/sip_transport.c16
-rw-r--r--pjsip/src/pjsip/sip_transport_udp.c379
6 files changed, 476 insertions, 24 deletions
diff --git a/pjsip/build/pjsip_core.dsp b/pjsip/build/pjsip_core.dsp
index 2204a6f3..7f110d5d 100644
--- a/pjsip/build/pjsip_core.dsp
+++ b/pjsip/build/pjsip_core.dsp
@@ -127,6 +127,10 @@ SOURCE=..\src\pjsip\sip_transport.c
# End Source File
# Begin Source File
+SOURCE=..\src\pjsip\sip_transport_udp.c
+# End Source File
+# Begin Source File
+
SOURCE=..\src\pjsip\sip_uri.c
# End Source File
# End Group
diff --git a/pjsip/include/pjsip/sip_endpoint.h b/pjsip/include/pjsip/sip_endpoint.h
index 24eca24f..37f4acee 100644
--- a/pjsip/include/pjsip/sip_endpoint.h
+++ b/pjsip/include/pjsip/sip_endpoint.h
@@ -80,6 +80,7 @@ PJ_BEGIN_DECL
* @return PJ_SUCCESS on success.
*/
PJ_DECL(pj_status_t) pjsip_endpt_create(pj_pool_factory *pf,
+ const char *name,
pjsip_endpoint **endpt);
/**
@@ -252,6 +253,24 @@ PJ_DECL(void) pjsip_endpt_resolve( pjsip_endpoint *endpt,
pjsip_resolver_callback *cb);
/**
+ * Get transport manager instance.
+ *
+ * @param endpt The endpoint.
+ *
+ * @return Transport manager instance.
+ */
+PJ_DECL(pjsip_tpmgr*) pjsip_endpt_get_tpmgr(pjsip_endpoint *endpt);
+
+/**
+ * Get ioqueue instance.
+ *
+ * @param endpt The endpoint.
+ *
+ * @return The ioqueue.
+ */
+PJ_DECL(pj_ioqueue_t*) pjsip_endpt_get_ioqueue(pjsip_endpoint *endpt);
+
+/**
* Find a SIP transport suitable for sending SIP message to the specified
* address. This function will complete asynchronously when the transport is
* ready (for example, when TCP socket is connected), and when it completes,
diff --git a/pjsip/include/pjsip/sip_transport.h b/pjsip/include/pjsip/sip_transport.h
index 36cf863e..741f5478 100644
--- a/pjsip/include/pjsip/sip_transport.h
+++ b/pjsip/include/pjsip/sip_transport.h
@@ -116,6 +116,17 @@ pjsip_transport_get_default_port_for_type(pjsip_transport_type_e type);
*
*****************************************************************************/
+/**
+ * A customized ioqueue async operation key which is used by transport
+ * to locate rdata when a pending read operation completes.
+ */
+typedef struct pjsip_rx_data_op_key
+{
+ pj_ioqueue_op_key_t op_key;
+ pjsip_rx_data *rdata;
+} pjsip_rx_data_op_key;
+
+
/**
* Incoming message buffer.
* This structure keep all the information regarding the received message. This
@@ -139,7 +150,7 @@ struct pjsip_rx_data
pjsip_transport *transport;
/** Ioqueue key. */
- pj_ioqueue_op_key_t op_key;
+ pjsip_rx_data_op_key op_key;
} tp_info;
@@ -249,6 +260,18 @@ struct pjsip_rx_data
*
*****************************************************************************/
+/** Customized ioqueue async operation key, used by transport to keep
+ * callback parameters.
+ */
+typedef struct pjsip_tx_data_op_key
+{
+ pj_ioqueue_op_key_t key;
+ pjsip_tx_data *tdata;
+ void *token;
+ void (*callback)(pjsip_transport*,void*,pj_ssize_t);
+} pjsip_tx_data_op_key;
+
+
/**
* Data structure for sending outgoing message. Application normally creates
* this buffer by calling #pjsip_endpt_create_tdata.
@@ -265,6 +288,7 @@ struct pjsip_rx_data
*/
struct pjsip_tx_data
{
+ /** This is for transmission queue; it's managed by transports. */
PJ_DECL_LIST_MEMBER(struct pjsip_tx_data);
/** Memory pool for this buffer. */
@@ -284,7 +308,7 @@ struct pjsip_tx_data
pjsip_tpmgr *mgr;
/** Ioqueue asynchronous operation key. */
- pj_ioqueue_op_key_t op_key;
+ pjsip_tx_data_op_key op_key;
/** Lock object. */
pj_lock_t *lock;
@@ -303,10 +327,10 @@ struct pjsip_tx_data
/** Reference counter. */
pj_atomic_t *ref_cnt;
- /** Being sent? */
+ /** Being processed by transport? */
int is_pending;
- /** Transport internal. */
+ /** Transport manager internal. */
void *token;
void (*cb)(void*, pjsip_tx_data*, pj_status_t);
};
@@ -391,6 +415,7 @@ typedef struct pjsip_transport
pj_sockaddr_in public_addr; /**< STUN addres. */
pj_sockaddr_in rem_addr; /**< Remote addr (zero for UDP) */
+ pjsip_endpoint *endpt; /**< Endpoint instance. */
pjsip_tpmgr *tpmgr; /**< Transport manager. */
pj_timer_entry idle_timer; /**< Timer when ref cnt is zero.*/
@@ -420,14 +445,12 @@ typedef struct pjsip_transport
* Other return values indicate the error code.
*/
pj_status_t (*send_msg)(pjsip_transport *transport,
- const void *packet,
- pj_size_t length,
- pj_ioqueue_op_key_t *op_key,
+ pjsip_tx_data *tdata,
const pj_sockaddr_in *rem_addr,
void *token,
void (*callback)(pjsip_transport *transport,
void *token,
- pj_status_t status));
+ pj_ssize_t sent_bytes));
/**
* Destroy this transport.
@@ -508,7 +531,6 @@ struct pjsip_tpfactory
pj_status_t (*create_transport)(pjsip_tpfactory *factory,
pjsip_tpmgr *mgr,
pjsip_endpoint *endpt,
- pj_ioqueue_t *ioqueue,
const pj_sockaddr_in *rem_addr,
pjsip_transport **transport);
@@ -555,8 +577,6 @@ PJ_DECL(pj_status_t) pjsip_tpmgr_unregister_tpfactory(pjsip_tpmgr *mgr,
*/
PJ_DECL(pj_status_t) pjsip_tpmgr_create( pj_pool_t *pool,
pjsip_endpoint * endpt,
- pj_ioqueue_t *ioqueue,
- pj_timer_heap_t *timer_heap,
void (*cb)(pjsip_endpoint*,
pj_status_t,
pjsip_rx_data *),
@@ -601,7 +621,7 @@ PJ_DECL(pj_status_t) pjsip_transport_send( pjsip_transport *tr,
void *token,
void (*cb)(void *token,
pjsip_tx_data *tdata,
- pj_status_t));
+ pj_ssize_t bytes_sent));
/**
diff --git a/pjsip/src/pjsip/sip_endpoint.c b/pjsip/src/pjsip/sip_endpoint.c
index 398c1c55..0d80b994 100644
--- a/pjsip/src/pjsip/sip_endpoint.c
+++ b/pjsip/src/pjsip/sip_endpoint.c
@@ -53,6 +53,9 @@ struct pjsip_endpoint
/** Pool factory. */
pj_pool_factory *pf;
+ /** Name. */
+ pj_str_t name;
+
/** Transaction table. */
pj_hash_table_t *tsx_table;
@@ -352,6 +355,7 @@ PJ_DEF(const pjsip_route_hdr*) pjsip_endpt_get_routing( pjsip_endpoint *endpt )
* Initialize endpoint.
*/
PJ_DEF(pj_status_t) pjsip_endpt_create(pj_pool_factory *pf,
+ const char *name,
pjsip_endpoint **p_endpt)
{
pj_status_t status;
@@ -376,6 +380,14 @@ PJ_DEF(pj_status_t) pjsip_endpt_create(pj_pool_factory *pf,
endpt->pool = pool;
endpt->pf = pf;
+ /* Get name. */
+ if (name != NULL) {
+ pj_str_t temp;
+ pj_strdup_with_null(endpt->pool, &endpt->name, pj_cstr(&temp, name));
+ } else {
+ pj_strdup_with_null(endpt->pool, &endpt->name, pj_gethostname());
+ }
+
/* Create mutex for the events, etc. */
status = pj_mutex_create_recursive( endpt->pool, "ept%p", &endpt->mutex );
if (status != PJ_SUCCESS) {
@@ -422,7 +434,6 @@ PJ_DEF(pj_status_t) pjsip_endpt_create(pj_pool_factory *pf,
/* Create transport manager. */
status = pjsip_tpmgr_create( endpt->pool, endpt,
- endpt->ioqueue, endpt->timer_heap,
&endpt_transport_callback,
&endpt->transport_mgr);
if (status != PJ_SUCCESS) {
@@ -505,6 +516,15 @@ PJ_DEF(void) pjsip_endpt_destroy(pjsip_endpoint *endpt)
}
/*
+ * Get endpoint name.
+ */
+PJ_DEF(const pj_str_t*) pjsip_endpt_name(const pjsip_endpoint *endpt)
+{
+ return &endpt->name;
+}
+
+
+/*
* Create new pool.
*/
PJ_DEF(pj_pool_t*) pjsip_endpt_create_pool( pjsip_endpoint *endpt,
@@ -942,6 +962,22 @@ PJ_DEF(void) pjsip_endpt_resolve( pjsip_endpoint *endpt,
}
/*
+ * Get transport manager.
+ */
+PJ_DEF(pjsip_tpmgr*) pjsip_endpt_get_tpmgr(pjsip_endpoint *endpt)
+{
+ return endpt->transport_mgr;
+}
+
+/*
+ * Get ioqueue instance.
+ */
+PJ_DEF(pj_ioqueue_t*) pjsip_endpt_get_ioqueue(pjsip_endpoint *endpt)
+{
+ return endpt->ioqueue;
+}
+
+/*
* Find/create transport.
*/
PJ_DECL(pj_status_t) pjsip_endpt_alloc_transport( pjsip_endpoint *endpt,
diff --git a/pjsip/src/pjsip/sip_transport.c b/pjsip/src/pjsip/sip_transport.c
index cd9b564f..013e45ea 100644
--- a/pjsip/src/pjsip/sip_transport.c
+++ b/pjsip/src/pjsip/sip_transport.c
@@ -42,8 +42,6 @@ struct pjsip_tpmgr
pj_hash_table_t *table;
pj_lock_t *lock;
pjsip_endpoint *endpt;
- pj_ioqueue_t *ioqueue;
- pj_timer_heap_t *timer_heap;
pjsip_tpfactory factory_list;
void (*msg_cb)(pjsip_endpoint*, pj_status_t, pjsip_rx_data*);
};
@@ -356,7 +354,7 @@ PJ_DEF(pj_status_t) pjsip_transport_add_ref( pjsip_transport *tp )
/* Verify again. */
if (pj_atomic_get(tp->ref_cnt) == 1) {
if (tp->idle_timer.id != PJ_FALSE) {
- pj_timer_heap_cancel(tp->tpmgr->timer_heap, &tp->idle_timer);
+ pjsip_endpt_cancel_timer(tp->tpmgr->endpt, &tp->idle_timer);
tp->idle_timer.id = PJ_FALSE;
}
}
@@ -383,7 +381,8 @@ PJ_DEF(pj_status_t) pjsip_transport_dec_ref( pjsip_transport *tp )
pj_assert(tp->idle_timer.id == 0);
tp->idle_timer.id = PJ_TRUE;
- pj_timer_heap_schedule(tp->tpmgr->timer_heap, &tp->idle_timer, &delay);
+ pjsip_endpt_schedule_timer(tp->tpmgr->endpt, &tp->idle_timer,
+ &delay);
}
pj_lock_release(tp->tpmgr->lock);
}
@@ -440,7 +439,7 @@ PJ_DEF(pj_status_t) pjsip_transport_unregister( pjsip_tpmgr *mgr,
*/
pj_assert(tp->idle_timer.id == PJ_FALSE);
if (tp->idle_timer.id != PJ_FALSE) {
- pj_timer_heap_cancel(mgr->timer_heap, &tp->idle_timer);
+ pjsip_endpt_cancel_timer(mgr->endpt, &tp->idle_timer);
tp->idle_timer.id = PJ_FALSE;
}
@@ -531,8 +530,6 @@ PJ_DEF(pj_status_t) pjsip_tpmgr_unregister_tpfactory( pjsip_tpmgr *mgr,
*/
PJ_DEF(pj_status_t) pjsip_tpmgr_create( pj_pool_t *pool,
pjsip_endpoint *endpt,
- pj_ioqueue_t *ioqueue,
- pj_timer_heap_t *timer_heap,
void (*cb)(pjsip_endpoint*,
pj_status_t,
pjsip_rx_data *),
@@ -548,8 +545,6 @@ PJ_DEF(pj_status_t) pjsip_tpmgr_create( pj_pool_t *pool,
mgr = pj_pool_zalloc(pool, sizeof(*mgr));
mgr->endpt = endpt;
mgr->msg_cb = cb;
- mgr->ioqueue = ioqueue;
- mgr->timer_heap = timer_heap;
pj_list_init(&mgr->factory_list);
mgr->table = pj_hash_create(pool, PJSIP_TPMGR_HTABLE_SIZE);
@@ -592,7 +587,6 @@ PJ_DEF(pj_status_t) pjsip_tpmgr_destroy( pjsip_tpmgr *mgr )
itr = next;
}
- pj_ioqueue_destroy(mgr->ioqueue);
pj_lock_release(mgr->lock);
@@ -801,7 +795,7 @@ PJ_DEF(pj_status_t) pjsip_tpmgr_alloc_transport( pjsip_tpmgr *mgr,
/* Request factory to create transport. */
status = factory->create_transport(factory, mgr, mgr->endpt,
- mgr->ioqueue, remote, p_transport);
+ remote, p_transport);
pj_lock_release(mgr->lock);
return status;
diff --git a/pjsip/src/pjsip/sip_transport_udp.c b/pjsip/src/pjsip/sip_transport_udp.c
new file mode 100644
index 00000000..dfe8492f
--- /dev/null
+++ b/pjsip/src/pjsip/sip_transport_udp.c
@@ -0,0 +1,379 @@
+/* $Id: $ */
+/*
+ * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#include <pjsip/sip_transport.h>
+#include <pjsip/sip_endpoint.h>
+#include <pjsip/sip_errno.h>
+#include <pj/pool.h>
+#include <pj/sock.h>
+#include <pj/os.h>
+#include <pj/lock.h>
+#include <pj/string.h>
+#include <pj/assert.h>
+
+
+/* Struct udp_transport "inherits" struct pjsip_transport */
+struct udp_transport
+{
+ pjsip_transport base;
+ pj_sock_t sock;
+ pj_ioqueue_key_t *key;
+ int rdata_cnt;
+ pjsip_rx_data **rdata;
+ int is_closing;
+};
+
+
+/*
+ * on_read_complete()
+ *
+ * This is callback notification from ioqueue that a pending recvfrom()
+ * operation has completed.
+ */
+static void on_read_complete( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+{
+ enum { MAX_IMMEDIATE_PACKET = 10 };
+ pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key;
+ pjsip_rx_data *rdata = rdata_op_key->rdata;
+ struct udp_transport *tp = (struct udp_transport*)rdata->tp_info.transport;
+ int i;
+ pj_status_t status;
+
+ /* Don't do anything if transport is closing. */
+ if (tp->is_closing)
+ return;
+
+ /*
+ * The idea of the loop is to process immediate data received by
+ * pj_ioqueue_recvfrom(), as long as i < MAX_IMMEDIATE_PACKET. When
+ * i is >= MAX_IMMEDIATE_PACKET, we force the recvfrom() operation to
+ * complete asynchronously, to allow other sockets to get their data.
+ */
+ for (i=0;; ++i) {
+ pj_uint32_t flags;
+
+ /* Report the packet to transport manager. */
+ if (bytes_read > 0) {
+ pj_size_t size_eaten;
+
+ rdata->pkt_info.len = bytes_read;
+ rdata->pkt_info.zero = 0;
+ pj_gettimeofday(&rdata->pkt_info.timestamp);
+
+ size_eaten =
+ pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
+ rdata);
+
+ if (size_eaten < 0) {
+ pj_assert(!"It shouldn't happen!");
+ size_eaten = rdata->pkt_info.len;
+ }
+
+ /* Since this is UDP, the whole buffer is the message. */
+ rdata->pkt_info.len = 0;
+
+ } else if (bytes_read == 0) {
+ /* TODO: */
+ } else {
+ /* Report error to endpoint. */
+ PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
+ rdata->tp_info.transport->obj_name,
+ -bytes_read,
+ "Warning: pj_ioqueue_recvfrom()"
+ " callback error"));
+ }
+
+ if (i >= MAX_IMMEDIATE_PACKET) {
+ /* Force ioqueue_recvfrom() to return PJ_EPENDING */
+ flags = PJ_IOQUEUE_ALWAYS_ASYNC;
+ } else {
+ flags = 0;
+ }
+
+ /* Read next packet. */
+ bytes_read = sizeof(rdata->pkt_info.packet);
+ status = pj_ioqueue_recvfrom(key, op_key,
+ rdata->pkt_info.packet,
+ &bytes_read, flags,
+ &rdata->pkt_info.addr,
+ &rdata->pkt_info.addr_len);
+
+ if (status == PJ_SUCCESS) {
+ /* Continue loop. */
+ pj_assert(i < MAX_IMMEDIATE_PACKET);
+
+ } else if (status == PJ_EPENDING) {
+ break;
+
+ } else {
+
+ if (i < MAX_IMMEDIATE_PACKET) {
+ /* Report error to endpoint. */
+ PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
+ rdata->tp_info.transport->obj_name,
+ status,
+ "Warning: pj_ioqueue_recvfrom error"));
+ /* Continue loop. */
+ bytes_read = 0;
+ } else {
+ /* This is fatal error.
+ * Ioqueue operation will stop for this transport!
+ */
+ PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
+ rdata->tp_info.transport->obj_name,
+ status,
+ "FATAL: pj_ioqueue_recvfrom() error, "
+ "UDP transport stopping! Error"));
+ break;
+ }
+ }
+ }
+}
+
+/*
+ * on_write_complete()
+ *
+ * This is callback notification from ioqueue that a pending sendto()
+ * operation has completed.
+ */
+static void on_write_complete( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_sent)
+{
+ struct udp_transport *tp = pj_ioqueue_get_user_data(key);
+ pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;
+
+ if (tdata_op_key->callback) {
+ tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent);
+ }
+}
+
+/*
+ * transport_send_msg()
+ *
+ * This function is called by transport manager (by transport->send_msg())
+ * to send outgoing message.
+ */
+static pj_status_t transport_send_msg( pjsip_transport *transport,
+ pjsip_tx_data *tdata,
+ const pj_sockaddr_in *rem_addr,
+ void *token,
+ void (*callback)(pjsip_transport*,
+ void *token,
+ pj_ssize_t))
+{
+ struct udp_transport *tp = (struct udp_transport*)transport;
+ pj_ssize_t size;
+
+ PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
+
+ /* Init op key. */
+ tdata->op_key.tdata = tdata;
+ tdata->op_key.token = token;
+ tdata->op_key.callback = callback;
+
+ /* Send to ioqueue! */
+ size = tdata->buf.cur - tdata->buf.start;
+ return pj_ioqueue_sendto(tp->key, (pj_ioqueue_op_key_t*)&tdata->op_key,
+ tdata->buf.start, &size, 0,
+ rem_addr, (rem_addr ? sizeof(pj_sockaddr_in):0));
+}
+
+/*
+ * transport_destroy()
+ *
+ * This function is called by transport manager (by transport->destroy()).
+ */
+static pj_status_t transport_destroy( pjsip_transport *transport )
+{
+ struct udp_transport *tp = (struct udp_transport*)transport;
+ int i;
+
+ /* Mark this transport as closing. */
+ tp->is_closing = 1;
+
+ /* Cancel all pending operations. */
+ for (i=0; i<tp->rdata_cnt; ++i) {
+ pj_ioqueue_post_completion(tp->key,
+ &tp->rdata[i]->tp_info.op_key.op_key, -1);
+ }
+
+ /* Unregister from ioqueue. */
+ if (tp->key)
+ pj_ioqueue_unregister(tp->key);
+
+ /* Close socket. */
+ if (tp->sock && tp->sock != PJ_INVALID_SOCKET)
+ pj_sock_close(tp->sock);
+
+ /* Destroy reference counter. */
+ if (tp->base.ref_cnt)
+ pj_atomic_destroy(tp->base.ref_cnt);
+
+ /* Destroy lock */
+ if (tp->base.lock)
+ pj_lock_destroy(tp->base.lock);
+
+ /* Destroy pool. */
+ pjsip_endpt_destroy_pool(tp->base.endpt, tp->base.pool);
+
+ return PJ_SUCCESS;
+}
+
+
+/*
+ * pjsip_udp_transport_start()
+ *
+ * Start an UDP transport/listener.
+ */
+PJ_DEF(pj_status_t) pjsip_udp_transport_start( pjsip_endpoint *endpt,
+ const pj_sockaddr_in *local,
+ const pj_sockaddr_in *pub_addr,
+ unsigned async_cnt,
+ pjsip_transport **p_transport)
+{
+ pj_pool_t *pool;
+ struct udp_transport *tp;
+ pj_ioqueue_t *ioqueue;
+ pj_ioqueue_callback ioqueue_cb;
+ unsigned i;
+ pj_status_t status;
+
+ /* Create pool. */
+ pool = pjsip_endpt_create_pool(endpt, "udp%p", PJSIP_POOL_LEN_TRANSPORT,
+ PJSIP_POOL_INC_TRANSPORT);
+ if (!pool)
+ return PJ_ENOMEM;
+
+ tp = pj_pool_zalloc(pool, sizeof(struct udp_transport));
+ tp->base.pool = pool;
+ tp->base.endpt = endpt;
+
+ /* Init type, type_name, and flag */
+ tp->base.type = PJSIP_TRANSPORT_UDP;
+ pj_native_strcpy(tp->base.type_name, "UDP");
+ tp->base.flag = pjsip_transport_get_flag_from_type(PJSIP_TRANSPORT_UDP);
+
+ /* Init addresses. */
+ pj_memcpy(&tp->base.local_addr, local, sizeof(pj_sockaddr_in));
+ pj_memcpy(&tp->base.public_addr, pub_addr, sizeof(pj_sockaddr_in));
+ tp->base.rem_addr.sin_family = PJ_AF_INET;
+
+ /* Init reference counter. */
+ status = pj_atomic_create(pool, 0, &tp->base.ref_cnt);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Init lock. */
+ status = pj_lock_create_recursive_mutex(pool, "udp%p", &tp->base.lock);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Create socket. */
+ status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &tp->sock);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Bind socket. */
+ status = pj_sock_bind(tp->sock, local, sizeof(pj_sockaddr_in));
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Register to ioqueue. */
+ ioqueue = pjsip_endpt_get_ioqueue(endpt);
+ pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
+ ioqueue_cb.on_read_complete = &on_read_complete;
+ ioqueue_cb.on_write_complete = &on_write_complete;
+ status = pj_ioqueue_register_sock(pool, ioqueue, tp->sock, tp,
+ &ioqueue_cb, &tp->key);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Set functions. */
+ tp->base.send_msg = &transport_send_msg;
+ tp->base.destroy = &transport_destroy;
+
+ /* This is a permanent transport, so we initialize the ref count
+ * to one so that transport manager don't destroy this transport
+ * when there's no user!
+ */
+ pj_atomic_inc(tp->base.ref_cnt);
+
+ /* Register to transport manager. */
+ tp->base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
+ status = pjsip_transport_register( tp->base.tpmgr, (pjsip_transport*)tp);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+
+ /* Create rdata and put it in the array. */
+ tp->rdata_cnt = 0;
+ for (i=0; i<async_cnt; ++i) {
+ pj_pool_t *rdata_pool = pjsip_endpt_create_pool(endpt, "rtd%p",
+ PJSIP_POOL_LEN_RDATA,
+ PJSIP_POOL_INC_RDATA);
+ if (!rdata_pool) {
+ pj_atomic_set(tp->base.ref_cnt, 0);
+ pjsip_transport_unregister(tp->base.tpmgr, &tp->base);
+ return PJ_ENOMEM;
+ }
+
+ tp->rdata[i] = pj_pool_zalloc(rdata_pool, sizeof(pjsip_rx_data));
+ tp->rdata[i]->tp_info.pool = rdata_pool;
+ tp->rdata[i]->tp_info.transport = &tp->base;
+ pj_ioqueue_op_key_init(&tp->rdata[i]->tp_info.op_key.op_key,
+ sizeof(pj_ioqueue_op_key_t));
+
+ tp->rdata_cnt++;
+ }
+
+ /* Start reading the ioqueue. */
+ for (i=0; i<async_cnt; ++i) {
+ pj_ssize_t size;
+
+ size = sizeof(tp->rdata[i]->pkt_info.packet);
+ tp->rdata[i]->pkt_info.addr_len = sizeof(tp->rdata[i]->pkt_info.addr);
+ status = pj_ioqueue_recvfrom(tp->key,
+ &tp->rdata[i]->tp_info.op_key.op_key,
+ tp->rdata[i]->pkt_info.packet,
+ &size, PJ_IOQUEUE_ALWAYS_ASYNC,
+ &tp->rdata[i]->pkt_info.addr,
+ &tp->rdata[i]->pkt_info.addr_len);
+ if (status == PJ_SUCCESS) {
+ pj_assert(!"Shouldn't happen because PJ_IOQUEUE_ALWAYS_ASYNC!");
+ on_read_complete(tp->key, &tp->rdata[i]->tp_info.op_key.op_key,
+ size);
+ } else if (status != PJ_EPENDING) {
+ /* Error! */
+ pjsip_transport_unregister(tp->base.tpmgr, &tp->base);
+ return status;
+ }
+ }
+
+ /* Done. */
+ *p_transport = &tp->base;
+ return PJ_SUCCESS;
+
+on_error:
+ transport_destroy((pjsip_transport*)tp);
+ return status;
+}
+
+