diff options
Diffstat (limited to 'pjsip/src/pjsip/sip_transport_udp.c')
-rw-r--r-- | pjsip/src/pjsip/sip_transport_udp.c | 1095 |
1 files changed, 1095 insertions, 0 deletions
diff --git a/pjsip/src/pjsip/sip_transport_udp.c b/pjsip/src/pjsip/sip_transport_udp.c new file mode 100644 index 0000000..60b3175 --- /dev/null +++ b/pjsip/src/pjsip/sip_transport_udp.c @@ -0,0 +1,1095 @@ +/* $Id: sip_transport_udp.c 3553 2011-05-05 06:14:19Z nanang $ */ +/* + * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) + * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include <pjsip/sip_transport_udp.h> +#include <pjsip/sip_endpoint.h> +#include <pjsip/sip_errno.h> +#include <pj/addr_resolv.h> +#include <pj/assert.h> +#include <pj/lock.h> +#include <pj/log.h> +#include <pj/os.h> +#include <pj/pool.h> +#include <pj/sock.h> +#include <pj/compat/socket.h> +#include <pj/string.h> + + +#define THIS_FILE "sip_transport_udp.c" + +/** + * These are the target values for socket send and receive buffer sizes, + * respectively. They will be applied to UDP socket with setsockopt(). + * When transport failed to set these size, it will decrease it until + * sufficiently large number has been successfully set. + * + * The buffer size is important, especially in WinXP/2000 machines. + * Basicly the lower the size, the more packets will be lost (dropped?) + * when we're sending (receiving?) packets in large volumes. + * + * The figure here is taken based on my experiment on WinXP/2000 machine, + * and with this value, the rate of dropped packet is about 8% when + * sending 1800 requests simultaneously (percentage taken as average + * after 50K requests or so). + * + * More experiments are needed probably. + */ +/* 2010/01/14 + * Too many people complained about seeing "Error setting SNDBUF" log, + * so lets just remove this. People who want to have SNDBUF set can + * still do so by declaring these two macros in config_site.h + */ +#ifndef PJSIP_UDP_SO_SNDBUF_SIZE +/*# define PJSIP_UDP_SO_SNDBUF_SIZE (24*1024*1024)*/ +# define PJSIP_UDP_SO_SNDBUF_SIZE 0 +#endif + +#ifndef PJSIP_UDP_SO_RCVBUF_SIZE +/*# define PJSIP_UDP_SO_RCVBUF_SIZE (24*1024*1024)*/ +# define PJSIP_UDP_SO_RCVBUF_SIZE 0 +#endif + + +/* Struct udp_transport "inherits" struct pjsip_transport */ +struct udp_transport +{ + pjsip_transport base; + pj_sock_t sock; + pj_ioqueue_key_t *key; + int rdata_cnt; + pjsip_rx_data **rdata; + int is_closing; + pj_bool_t is_paused; +}; + + +/* + * Initialize transport's receive buffer from the specified pool. + */ +static void init_rdata(struct udp_transport *tp, unsigned rdata_index, + pj_pool_t *pool, pjsip_rx_data **p_rdata) +{ + pjsip_rx_data *rdata; + + /* Reset pool. */ + //note: already done by caller + //pj_pool_reset(pool); + + rdata = PJ_POOL_ZALLOC_T(pool, pjsip_rx_data); + + /* Init tp_info part. */ + rdata->tp_info.pool = pool; + rdata->tp_info.transport = &tp->base; + rdata->tp_info.tp_data = (void*)(long)rdata_index; + rdata->tp_info.op_key.rdata = rdata; + pj_ioqueue_op_key_init(&rdata->tp_info.op_key.op_key, + sizeof(pj_ioqueue_op_key_t)); + + tp->rdata[rdata_index] = rdata; + + if (p_rdata) + *p_rdata = rdata; +} + + +/* + * udp_on_read_complete() + * + * This is callback notification from ioqueue that a pending recvfrom() + * operation has completed. + */ +static void udp_on_read_complete( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) +{ + /* See https://trac.pjsip.org/repos/ticket/1197 */ + enum { MAX_IMMEDIATE_PACKET = 50 }; + pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key; + pjsip_rx_data *rdata = rdata_op_key->rdata; + struct udp_transport *tp = (struct udp_transport*)rdata->tp_info.transport; + int i; + pj_status_t status; + + /* Don't do anything if transport is closing. */ + if (tp->is_closing) { + tp->is_closing++; + return; + } + + /* Don't do anything if transport is being paused. */ + if (tp->is_paused) + return; + + /* + * The idea of the loop is to process immediate data received by + * pj_ioqueue_recvfrom(), as long as i < MAX_IMMEDIATE_PACKET. When + * i is >= MAX_IMMEDIATE_PACKET, we force the recvfrom() operation to + * complete asynchronously, to allow other sockets to get their data. + */ + for (i=0;; ++i) { + enum { MIN_SIZE = 32 }; + pj_uint32_t flags; + + /* Report the packet to transport manager. Only do so if packet size + * is relatively big enough for a SIP packet. + */ + if (bytes_read > MIN_SIZE) { + pj_size_t size_eaten; + const pj_sockaddr *src_addr = &rdata->pkt_info.src_addr; + + /* Init pkt_info part. */ + rdata->pkt_info.len = bytes_read; + rdata->pkt_info.zero = 0; + pj_gettimeofday(&rdata->pkt_info.timestamp); + if (src_addr->addr.sa_family == pj_AF_INET()) { + pj_ansi_strcpy(rdata->pkt_info.src_name, + pj_inet_ntoa(src_addr->ipv4.sin_addr)); + rdata->pkt_info.src_port = pj_ntohs(src_addr->ipv4.sin_port); + } else { + pj_inet_ntop(pj_AF_INET6(), + pj_sockaddr_get_addr(&rdata->pkt_info.src_addr), + rdata->pkt_info.src_name, + sizeof(rdata->pkt_info.src_name)); + rdata->pkt_info.src_port = pj_ntohs(src_addr->ipv6.sin6_port); + } + + size_eaten = + pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, + rdata); + + if (size_eaten < 0) { + pj_assert(!"It shouldn't happen!"); + size_eaten = rdata->pkt_info.len; + } + + /* Since this is UDP, the whole buffer is the message. */ + rdata->pkt_info.len = 0; + + } else if (bytes_read <= MIN_SIZE) { + + /* TODO: */ + + } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && + -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && + -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) + { + + /* Report error to endpoint. */ + PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, + rdata->tp_info.transport->obj_name, + -bytes_read, + "Warning: pj_ioqueue_recvfrom()" + " callback error")); + } + + if (i >= MAX_IMMEDIATE_PACKET) { + /* Force ioqueue_recvfrom() to return PJ_EPENDING */ + flags = PJ_IOQUEUE_ALWAYS_ASYNC; + } else { + flags = 0; + } + + /* Reset pool. + * Need to copy rdata fields to temp variable because they will + * be invalid after pj_pool_reset(). + */ + { + pj_pool_t *rdata_pool = rdata->tp_info.pool; + struct udp_transport *rdata_tp ; + unsigned rdata_index; + + rdata_tp = (struct udp_transport*)rdata->tp_info.transport; + rdata_index = (unsigned)(unsigned long)rdata->tp_info.tp_data; + + pj_pool_reset(rdata_pool); + init_rdata(rdata_tp, rdata_index, rdata_pool, &rdata); + + /* Change some vars to point to new location after + * pool reset. + */ + op_key = &rdata->tp_info.op_key.op_key; + } + + /* Only read next packet if transport is not being paused. This + * check handles the case where transport is paused while endpoint + * is still processing a SIP message. + */ + if (tp->is_paused) + return; + + /* Read next packet. */ + bytes_read = sizeof(rdata->pkt_info.packet); + rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr); + status = pj_ioqueue_recvfrom(key, op_key, + rdata->pkt_info.packet, + &bytes_read, flags, + &rdata->pkt_info.src_addr, + &rdata->pkt_info.src_addr_len); + + if (status == PJ_SUCCESS) { + /* Continue loop. */ + pj_assert(i < MAX_IMMEDIATE_PACKET); + + } else if (status == PJ_EPENDING) { + break; + + } else { + + if (i < MAX_IMMEDIATE_PACKET) { + + /* Report error to endpoint if this is not EWOULDBLOCK error.*/ + if (status != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) && + status != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) && + status != PJ_STATUS_FROM_OS(OSERR_ECONNRESET)) + { + + PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, + rdata->tp_info.transport->obj_name, + status, + "Warning: pj_ioqueue_recvfrom")); + } + + /* Continue loop. */ + bytes_read = 0; + } else { + /* This is fatal error. + * Ioqueue operation will stop for this transport! + */ + PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt, + rdata->tp_info.transport->obj_name, + status, + "FATAL: pj_ioqueue_recvfrom() error, " + "UDP transport stopping! Error")); + break; + } + } + } +} + +/* + * udp_on_write_complete() + * + * This is callback notification from ioqueue that a pending sendto() + * operation has completed. + */ +static void udp_on_write_complete( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_sent) +{ + struct udp_transport *tp = (struct udp_transport*) + pj_ioqueue_get_user_data(key); + pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key; + + tdata_op_key->tdata = NULL; + + if (tdata_op_key->callback) { + tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent); + } +} + +/* + * udp_send_msg() + * + * This function is called by transport manager (by transport->send_msg()) + * to send outgoing message. + */ +static pj_status_t udp_send_msg( pjsip_transport *transport, + pjsip_tx_data *tdata, + const pj_sockaddr_t *rem_addr, + int addr_len, + void *token, + pjsip_transport_callback callback) +{ + struct udp_transport *tp = (struct udp_transport*)transport; + pj_ssize_t size; + pj_status_t status; + + PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL); + PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX); + + /* Return error if transport is paused */ + if (tp->is_paused) + return PJSIP_ETPNOTAVAIL; + + /* Init op key. */ + tdata->op_key.tdata = tdata; + tdata->op_key.token = token; + tdata->op_key.callback = callback; + + /* Send to ioqueue! */ + size = tdata->buf.cur - tdata->buf.start; + status = pj_ioqueue_sendto(tp->key, (pj_ioqueue_op_key_t*)&tdata->op_key, + tdata->buf.start, &size, 0, + rem_addr, addr_len); + + if (status != PJ_EPENDING) + tdata->op_key.tdata = NULL; + + return status; +} + +/* + * udp_destroy() + * + * This function is called by transport manager (by transport->destroy()). + */ +static pj_status_t udp_destroy( pjsip_transport *transport ) +{ + struct udp_transport *tp = (struct udp_transport*)transport; + int i; + + /* Mark this transport as closing. */ + tp->is_closing = 1; + + /* Cancel all pending operations. */ + /* blp: NO NO NO... + * No need to post queued completion as we poll the ioqueue until + * we've got events anyway. Posting completion will only cause + * callback to be called twice with IOCP: one for the post completion + * and another one for closing the socket. + * + for (i=0; i<tp->rdata_cnt; ++i) { + pj_ioqueue_post_completion(tp->key, + &tp->rdata[i]->tp_info.op_key.op_key, -1); + } + */ + + /* Unregister from ioqueue. */ + if (tp->key) { + pj_ioqueue_unregister(tp->key); + tp->key = NULL; + } else { + /* Close socket. */ + if (tp->sock && tp->sock != PJ_INVALID_SOCKET) { + pj_sock_close(tp->sock); + tp->sock = PJ_INVALID_SOCKET; + } + } + + /* Must poll ioqueue because IOCP calls the callback when socket + * is closed. We poll the ioqueue until all pending callbacks + * have been called. + */ + for (i=0; i<50 && tp->is_closing < 1+tp->rdata_cnt; ++i) { + int cnt; + pj_time_val timeout = {0, 1}; + + cnt = pj_ioqueue_poll(pjsip_endpt_get_ioqueue(transport->endpt), + &timeout); + if (cnt == 0) + break; + } + + /* Destroy rdata */ + for (i=0; i<tp->rdata_cnt; ++i) { + pj_pool_release(tp->rdata[i]->tp_info.pool); + } + + /* Destroy reference counter. */ + if (tp->base.ref_cnt) + pj_atomic_destroy(tp->base.ref_cnt); + + /* Destroy lock */ + if (tp->base.lock) + pj_lock_destroy(tp->base.lock); + + /* Destroy pool. */ + pjsip_endpt_release_pool(tp->base.endpt, tp->base.pool); + + return PJ_SUCCESS; +} + + +/* + * udp_shutdown() + * + * Start graceful UDP shutdown. + */ +static pj_status_t udp_shutdown(pjsip_transport *transport) +{ + return pjsip_transport_dec_ref(transport); +} + + +/* Create socket */ +static pj_status_t create_socket(int af, const pj_sockaddr_t *local_a, + int addr_len, pj_sock_t *p_sock) +{ + pj_sock_t sock; + pj_sockaddr_in tmp_addr; + pj_sockaddr_in6 tmp_addr6; + pj_status_t status; + + status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &sock); + if (status != PJ_SUCCESS) + return status; + + if (local_a == NULL) { + if (af == pj_AF_INET6()) { + pj_bzero(&tmp_addr6, sizeof(tmp_addr6)); + tmp_addr6.sin6_family = (pj_uint16_t)af; + local_a = &tmp_addr6; + addr_len = sizeof(tmp_addr6); + } else { + pj_sockaddr_in_init(&tmp_addr, NULL, 0); + local_a = &tmp_addr; + addr_len = sizeof(tmp_addr); + } + } + + status = pj_sock_bind(sock, local_a, addr_len); + if (status != PJ_SUCCESS) { + pj_sock_close(sock); + return status; + } + + *p_sock = sock; + return PJ_SUCCESS; +} + + +/* Generate transport's published address */ +static pj_status_t get_published_name(pj_sock_t sock, + char hostbuf[], + int hostbufsz, + pjsip_host_port *bound_name) +{ + pj_sockaddr tmp_addr; + int addr_len; + pj_status_t status; + + addr_len = sizeof(tmp_addr); + status = pj_sock_getsockname(sock, &tmp_addr, &addr_len); + if (status != PJ_SUCCESS) + return status; + + bound_name->host.ptr = hostbuf; + if (tmp_addr.addr.sa_family == pj_AF_INET()) { + bound_name->port = pj_ntohs(tmp_addr.ipv4.sin_port); + + /* If bound address specifies "0.0.0.0", get the IP address + * of local hostname. + */ + if (tmp_addr.ipv4.sin_addr.s_addr == PJ_INADDR_ANY) { + pj_sockaddr hostip; + + status = pj_gethostip(pj_AF_INET(), &hostip); + if (status != PJ_SUCCESS) + return status; + + pj_strcpy2(&bound_name->host, pj_inet_ntoa(hostip.ipv4.sin_addr)); + } else { + /* Otherwise use bound address. */ + pj_strcpy2(&bound_name->host, + pj_inet_ntoa(tmp_addr.ipv4.sin_addr)); + status = PJ_SUCCESS; + } + + } else { + /* If bound address specifies "INADDR_ANY" (IPv6), get the + * IP address of local hostname + */ + pj_uint32_t loop6[4] = { 0, 0, 0, 0}; + + bound_name->port = pj_ntohs(tmp_addr.ipv6.sin6_port); + + if (pj_memcmp(&tmp_addr.ipv6.sin6_addr, loop6, sizeof(loop6))==0) { + status = pj_gethostip(tmp_addr.addr.sa_family, &tmp_addr); + if (status != PJ_SUCCESS) + return status; + } + + status = pj_inet_ntop(tmp_addr.addr.sa_family, + pj_sockaddr_get_addr(&tmp_addr), + hostbuf, hostbufsz); + if (status == PJ_SUCCESS) { + bound_name->host.slen = pj_ansi_strlen(hostbuf); + } + } + + + return status; +} + +/* Set the published address of the transport */ +static void udp_set_pub_name(struct udp_transport *tp, + const pjsip_host_port *a_name) +{ + enum { INFO_LEN = 80 }; + char local_addr[PJ_INET6_ADDRSTRLEN+10]; + + pj_assert(a_name->host.slen != 0); + pj_strdup_with_null(tp->base.pool, &tp->base.local_name.host, + &a_name->host); + tp->base.local_name.port = a_name->port; + + /* Update transport info. */ + if (tp->base.info == NULL) { + tp->base.info = (char*) pj_pool_alloc(tp->base.pool, INFO_LEN); + } + + pj_sockaddr_print(&tp->base.local_addr, local_addr, sizeof(local_addr), 3); + + pj_ansi_snprintf( + tp->base.info, INFO_LEN, "udp %s [published as %s:%d]", + local_addr, + tp->base.local_name.host.ptr, + tp->base.local_name.port); +} + +/* Set the socket handle of the transport */ +static void udp_set_socket(struct udp_transport *tp, + pj_sock_t sock, + const pjsip_host_port *a_name) +{ +#if PJSIP_UDP_SO_RCVBUF_SIZE || PJSIP_UDP_SO_SNDBUF_SIZE + long sobuf_size; + pj_status_t status; +#endif + + /* Adjust socket rcvbuf size */ +#if PJSIP_UDP_SO_RCVBUF_SIZE + sobuf_size = PJSIP_UDP_SO_RCVBUF_SIZE; + status = pj_sock_setsockopt(sock, pj_SOL_SOCKET(), pj_SO_RCVBUF(), + &sobuf_size, sizeof(sobuf_size)); + if (status != PJ_SUCCESS) { + char errmsg[PJ_ERR_MSG_SIZE]; + pj_strerror(status, errmsg, sizeof(errmsg)); + PJ_LOG(4,(THIS_FILE, "Error setting SO_RCVBUF: %s [%d]", errmsg, + status)); + } +#endif + + /* Adjust socket sndbuf size */ +#if PJSIP_UDP_SO_SNDBUF_SIZE + sobuf_size = PJSIP_UDP_SO_SNDBUF_SIZE; + status = pj_sock_setsockopt(sock, pj_SOL_SOCKET(), pj_SO_SNDBUF(), + &sobuf_size, sizeof(sobuf_size)); + if (status != PJ_SUCCESS) { + char errmsg[PJ_ERR_MSG_SIZE]; + pj_strerror(status, errmsg, sizeof(errmsg)); + PJ_LOG(4,(THIS_FILE, "Error setting SO_SNDBUF: %s [%d]", errmsg, + status)); + } +#endif + + /* Set the socket. */ + tp->sock = sock; + + /* Init address name (published address) */ + udp_set_pub_name(tp, a_name); +} + +/* Register socket to ioqueue */ +static pj_status_t register_to_ioqueue(struct udp_transport *tp) +{ + pj_ioqueue_t *ioqueue; + pj_ioqueue_callback ioqueue_cb; + + /* Ignore if already registered */ + if (tp->key != NULL) + return PJ_SUCCESS; + + /* Register to ioqueue. */ + ioqueue = pjsip_endpt_get_ioqueue(tp->base.endpt); + pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); + ioqueue_cb.on_read_complete = &udp_on_read_complete; + ioqueue_cb.on_write_complete = &udp_on_write_complete; + + return pj_ioqueue_register_sock(tp->base.pool, ioqueue, tp->sock, tp, + &ioqueue_cb, &tp->key); +} + +/* Start ioqueue asynchronous reading to all rdata */ +static pj_status_t start_async_read(struct udp_transport *tp) +{ + pj_ioqueue_t *ioqueue; + int i; + pj_status_t status; + + ioqueue = pjsip_endpt_get_ioqueue(tp->base.endpt); + + /* Start reading the ioqueue. */ + for (i=0; i<tp->rdata_cnt; ++i) { + pj_ssize_t size; + + size = sizeof(tp->rdata[i]->pkt_info.packet); + tp->rdata[i]->pkt_info.src_addr_len = sizeof(tp->rdata[i]->pkt_info.src_addr); + status = pj_ioqueue_recvfrom(tp->key, + &tp->rdata[i]->tp_info.op_key.op_key, + tp->rdata[i]->pkt_info.packet, + &size, PJ_IOQUEUE_ALWAYS_ASYNC, + &tp->rdata[i]->pkt_info.src_addr, + &tp->rdata[i]->pkt_info.src_addr_len); + if (status == PJ_SUCCESS) { + pj_assert(!"Shouldn't happen because PJ_IOQUEUE_ALWAYS_ASYNC!"); + udp_on_read_complete(tp->key, &tp->rdata[i]->tp_info.op_key.op_key, + size); + } else if (status != PJ_EPENDING) { + /* Error! */ + return status; + } + } + + return PJ_SUCCESS; +} + + +/* + * pjsip_udp_transport_attach() + * + * Attach UDP socket and start transport. + */ +static pj_status_t transport_attach( pjsip_endpoint *endpt, + pjsip_transport_type_e type, + pj_sock_t sock, + const pjsip_host_port *a_name, + unsigned async_cnt, + pjsip_transport **p_transport) +{ + pj_pool_t *pool; + struct udp_transport *tp; + const char *format, *ipv6_quoteb, *ipv6_quotee; + unsigned i; + pj_status_t status; + + PJ_ASSERT_RETURN(endpt && sock!=PJ_INVALID_SOCKET && a_name && async_cnt>0, + PJ_EINVAL); + + /* Object name. */ + if (type & PJSIP_TRANSPORT_IPV6) { + format = "udpv6%p"; + ipv6_quoteb = "["; + ipv6_quotee = "]"; + } else { + format = "udp%p"; + ipv6_quoteb = ipv6_quotee = ""; + } + + /* Create pool. */ + pool = pjsip_endpt_create_pool(endpt, format, PJSIP_POOL_LEN_TRANSPORT, + PJSIP_POOL_INC_TRANSPORT); + if (!pool) + return PJ_ENOMEM; + + /* Create the UDP transport object. */ + tp = PJ_POOL_ZALLOC_T(pool, struct udp_transport); + + /* Save pool. */ + tp->base.pool = pool; + + pj_memcpy(tp->base.obj_name, pool->obj_name, PJ_MAX_OBJ_NAME); + + /* Init reference counter. */ + status = pj_atomic_create(pool, 0, &tp->base.ref_cnt); + if (status != PJ_SUCCESS) + goto on_error; + + /* Init lock. */ + status = pj_lock_create_recursive_mutex(pool, pool->obj_name, + &tp->base.lock); + if (status != PJ_SUCCESS) + goto on_error; + + /* Set type. */ + tp->base.key.type = type; + + /* Remote address is left zero (except the family) */ + tp->base.key.rem_addr.addr.sa_family = (pj_uint16_t) + ((type & PJSIP_TRANSPORT_IPV6) ? pj_AF_INET6() : pj_AF_INET()); + + /* Type name. */ + tp->base.type_name = "UDP"; + + /* Transport flag */ + tp->base.flag = pjsip_transport_get_flag_from_type(type); + + + /* Length of addressess. */ + tp->base.addr_len = sizeof(tp->base.local_addr); + + /* Init local address. */ + status = pj_sock_getsockname(sock, &tp->base.local_addr, + &tp->base.addr_len); + if (status != PJ_SUCCESS) + goto on_error; + + /* Init remote name. */ + if (type == PJSIP_TRANSPORT_UDP) + tp->base.remote_name.host = pj_str("0.0.0.0"); + else + tp->base.remote_name.host = pj_str("::0"); + tp->base.remote_name.port = 0; + + /* Init direction */ + tp->base.dir = PJSIP_TP_DIR_NONE; + + /* Set endpoint. */ + tp->base.endpt = endpt; + + /* Transport manager and timer will be initialized by tpmgr */ + + /* Attach socket and assign name. */ + udp_set_socket(tp, sock, a_name); + + /* Register to ioqueue */ + status = register_to_ioqueue(tp); + if (status != PJ_SUCCESS) + goto on_error; + + /* Set functions. */ + tp->base.send_msg = &udp_send_msg; + tp->base.do_shutdown = &udp_shutdown; + tp->base.destroy = &udp_destroy; + + /* This is a permanent transport, so we initialize the ref count + * to one so that transport manager don't destroy this transport + * when there's no user! + */ + pj_atomic_inc(tp->base.ref_cnt); + + /* Register to transport manager. */ + tp->base.tpmgr = pjsip_endpt_get_tpmgr(endpt); + status = pjsip_transport_register( tp->base.tpmgr, (pjsip_transport*)tp); + if (status != PJ_SUCCESS) + goto on_error; + + + /* Create rdata and put it in the array. */ + tp->rdata_cnt = 0; + tp->rdata = (pjsip_rx_data**) + pj_pool_calloc(tp->base.pool, async_cnt, + sizeof(pjsip_rx_data*)); + for (i=0; i<async_cnt; ++i) { + pj_pool_t *rdata_pool = pjsip_endpt_create_pool(endpt, "rtd%p", + PJSIP_POOL_RDATA_LEN, + PJSIP_POOL_RDATA_INC); + if (!rdata_pool) { + pj_atomic_set(tp->base.ref_cnt, 0); + pjsip_transport_destroy(&tp->base); + return PJ_ENOMEM; + } + + init_rdata(tp, i, rdata_pool, NULL); + tp->rdata_cnt++; + } + + /* Start reading the ioqueue. */ + status = start_async_read(tp); + if (status != PJ_SUCCESS) { + pjsip_transport_destroy(&tp->base); + return status; + } + + /* Done. */ + if (p_transport) + *p_transport = &tp->base; + + PJ_LOG(4,(tp->base.obj_name, + "SIP %s started, published address is %s%.*s%s:%d", + pjsip_transport_get_type_desc((pjsip_transport_type_e)tp->base.key.type), + ipv6_quoteb, + (int)tp->base.local_name.host.slen, + tp->base.local_name.host.ptr, + ipv6_quotee, + tp->base.local_name.port)); + + return PJ_SUCCESS; + +on_error: + udp_destroy((pjsip_transport*)tp); + return status; +} + + +PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt, + pj_sock_t sock, + const pjsip_host_port *a_name, + unsigned async_cnt, + pjsip_transport **p_transport) +{ + return transport_attach(endpt, PJSIP_TRANSPORT_UDP, sock, a_name, + async_cnt, p_transport); +} + +PJ_DEF(pj_status_t) pjsip_udp_transport_attach2( pjsip_endpoint *endpt, + pjsip_transport_type_e type, + pj_sock_t sock, + const pjsip_host_port *a_name, + unsigned async_cnt, + pjsip_transport **p_transport) +{ + return transport_attach(endpt, type, sock, a_name, + async_cnt, p_transport); +} + +/* + * pjsip_udp_transport_start() + * + * Create a UDP socket in the specified address and start a transport. + */ +PJ_DEF(pj_status_t) pjsip_udp_transport_start( pjsip_endpoint *endpt, + const pj_sockaddr_in *local_a, + const pjsip_host_port *a_name, + unsigned async_cnt, + pjsip_transport **p_transport) +{ + pj_sock_t sock; + pj_status_t status; + char addr_buf[PJ_INET6_ADDRSTRLEN]; + pjsip_host_port bound_name; + + PJ_ASSERT_RETURN(endpt && async_cnt, PJ_EINVAL); + + status = create_socket(pj_AF_INET(), local_a, sizeof(pj_sockaddr_in), + &sock); + if (status != PJ_SUCCESS) + return status; + + if (a_name == NULL) { + /* Address name is not specified. + * Build a name based on bound address. + */ + status = get_published_name(sock, addr_buf, sizeof(addr_buf), + &bound_name); + if (status != PJ_SUCCESS) { + pj_sock_close(sock); + return status; + } + + a_name = &bound_name; + } + + return pjsip_udp_transport_attach( endpt, sock, a_name, async_cnt, + p_transport ); +} + + +/* + * pjsip_udp_transport_start() + * + * Create a UDP socket in the specified address and start a transport. + */ +PJ_DEF(pj_status_t) pjsip_udp_transport_start6(pjsip_endpoint *endpt, + const pj_sockaddr_in6 *local_a, + const pjsip_host_port *a_name, + unsigned async_cnt, + pjsip_transport **p_transport) +{ + pj_sock_t sock; + pj_status_t status; + char addr_buf[PJ_INET6_ADDRSTRLEN]; + pjsip_host_port bound_name; + + PJ_ASSERT_RETURN(endpt && async_cnt, PJ_EINVAL); + + status = create_socket(pj_AF_INET6(), local_a, sizeof(pj_sockaddr_in6), + &sock); + if (status != PJ_SUCCESS) + return status; + + if (a_name == NULL) { + /* Address name is not specified. + * Build a name based on bound address. + */ + status = get_published_name(sock, addr_buf, sizeof(addr_buf), + &bound_name); + if (status != PJ_SUCCESS) { + pj_sock_close(sock); + return status; + } + + a_name = &bound_name; + } + + return pjsip_udp_transport_attach2(endpt, PJSIP_TRANSPORT_UDP6, + sock, a_name, async_cnt, p_transport); +} + +/* + * Retrieve the internal socket handle used by the UDP transport. + */ +PJ_DEF(pj_sock_t) pjsip_udp_transport_get_socket(pjsip_transport *transport) +{ + struct udp_transport *tp; + + PJ_ASSERT_RETURN(transport != NULL, PJ_INVALID_SOCKET); + + tp = (struct udp_transport*) transport; + + return tp->sock; +} + + +/* + * Temporarily pause or shutdown the transport. + */ +PJ_DEF(pj_status_t) pjsip_udp_transport_pause(pjsip_transport *transport, + unsigned option) +{ + struct udp_transport *tp; + unsigned i; + + PJ_ASSERT_RETURN(transport != NULL, PJ_EINVAL); + + /* Flag must be specified */ + PJ_ASSERT_RETURN((option & 0x03) != 0, PJ_EINVAL); + + tp = (struct udp_transport*) transport; + + /* Transport must not have been paused */ + PJ_ASSERT_RETURN(tp->is_paused==0, PJ_EINVALIDOP); + + /* Set transport to paused first, so that when the read callback is + * called by pj_ioqueue_post_completion() it will not try to + * re-register the rdata. + */ + tp->is_paused = PJ_TRUE; + + /* Cancel the ioqueue operation. */ + for (i=0; i<(unsigned)tp->rdata_cnt; ++i) { + pj_ioqueue_post_completion(tp->key, + &tp->rdata[i]->tp_info.op_key.op_key, -1); + } + + /* Destroy the socket? */ + if (option & PJSIP_UDP_TRANSPORT_DESTROY_SOCKET) { + if (tp->key) { + /* This implicitly closes the socket */ + pj_ioqueue_unregister(tp->key); + tp->key = NULL; + } else { + /* Close socket. */ + if (tp->sock && tp->sock != PJ_INVALID_SOCKET) { + pj_sock_close(tp->sock); + tp->sock = PJ_INVALID_SOCKET; + } + } + tp->sock = PJ_INVALID_SOCKET; + + } + + PJ_LOG(4,(tp->base.obj_name, "SIP UDP transport paused")); + + return PJ_SUCCESS; +} + + +/* + * Restart transport. + * + * If option is KEEP_SOCKET, just re-activate ioqueue operation. + * + * If option is DESTROY_SOCKET: + * - if socket is specified, replace. + * - if socket is not specified, create and replace. + */ +PJ_DEF(pj_status_t) pjsip_udp_transport_restart(pjsip_transport *transport, + unsigned option, + pj_sock_t sock, + const pj_sockaddr_in *local, + const pjsip_host_port *a_name) +{ + struct udp_transport *tp; + pj_status_t status; + + PJ_ASSERT_RETURN(transport != NULL, PJ_EINVAL); + /* Flag must be specified */ + PJ_ASSERT_RETURN((option & 0x03) != 0, PJ_EINVAL); + + tp = (struct udp_transport*) transport; + + if (option & PJSIP_UDP_TRANSPORT_DESTROY_SOCKET) { + char addr_buf[PJ_INET6_ADDRSTRLEN]; + pjsip_host_port bound_name; + + /* Request to recreate transport */ + + /* Destroy existing socket, if any. */ + if (tp->key) { + /* This implicitly closes the socket */ + pj_ioqueue_unregister(tp->key); + tp->key = NULL; + } else { + /* Close socket. */ + if (tp->sock && tp->sock != PJ_INVALID_SOCKET) { + pj_sock_close(tp->sock); + tp->sock = PJ_INVALID_SOCKET; + } + } + tp->sock = PJ_INVALID_SOCKET; + + /* Create the socket if it's not specified */ + if (sock == PJ_INVALID_SOCKET) { + status = create_socket(pj_AF_INET(), local, + sizeof(pj_sockaddr_in), &sock); + if (status != PJ_SUCCESS) + return status; + } + + /* If transport published name is not specified, calculate it + * from the bound address. + */ + if (a_name == NULL) { + status = get_published_name(sock, addr_buf, sizeof(addr_buf), + &bound_name); + if (status != PJ_SUCCESS) { + pj_sock_close(sock); + return status; + } + + a_name = &bound_name; + } + + /* Assign the socket and published address to transport. */ + udp_set_socket(tp, sock, a_name); + + } else { + + /* For KEEP_SOCKET, transport must have been paused before */ + PJ_ASSERT_RETURN(tp->is_paused, PJ_EINVALIDOP); + + /* If address name is specified, update it */ + if (a_name != NULL) + udp_set_pub_name(tp, a_name); + } + + /* Re-register new or existing socket to ioqueue. */ + status = register_to_ioqueue(tp); + if (status != PJ_SUCCESS) { + return status; + } + + /* Restart async read operation. */ + status = start_async_read(tp); + if (status != PJ_SUCCESS) + return status; + + /* Everything has been set up */ + tp->is_paused = PJ_FALSE; + + PJ_LOG(4,(tp->base.obj_name, + "SIP UDP transport restarted, published address is %.*s:%d", + (int)tp->base.local_name.host.slen, + tp->base.local_name.host.ptr, + tp->base.local_name.port)); + + return PJ_SUCCESS; +} + |