summaryrefslogtreecommitdiff
path: root/pjmedia/src/pjmedia/transport_udp.c
diff options
context:
space:
mode:
Diffstat (limited to 'pjmedia/src/pjmedia/transport_udp.c')
-rw-r--r--pjmedia/src/pjmedia/transport_udp.c487
1 files changed, 487 insertions, 0 deletions
diff --git a/pjmedia/src/pjmedia/transport_udp.c b/pjmedia/src/pjmedia/transport_udp.c
new file mode 100644
index 00000000..c1936cfe
--- /dev/null
+++ b/pjmedia/src/pjmedia/transport_udp.c
@@ -0,0 +1,487 @@
+/* $Id$ */
+/*
+ * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#include <pjmedia/transport_udp.h>
+#include <pj/assert.h>
+#include <pj/errno.h>
+#include <pj/ioqueue.h>
+#include <pj/log.h>
+#include <pj/string.h>
+
+
+/* Maximum size of incoming RTP packet */
+#define RTP_LEN 1500
+
+/* Maximum size of incoming RTCP packet */
+#define RTCP_LEN 600
+
+
+struct transport_udp
+{
+ pjmedia_transport base; /**< Base transport. */
+
+ pj_pool_t *pool; /**< Memory pool */
+
+ pjmedia_stream *stream; /**< Stream user (may be NULL) */
+ pj_sockaddr_in rem_rtp_addr; /**< Remote RTP address */
+ pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address */
+ void (*rtp_cb)( pjmedia_stream*,/**< To report incoming RTP. */
+ const void*,
+ pj_ssize_t);
+ void (*rtcp_cb)( pjmedia_stream*,/**< To report incoming RTCP. */
+ const void*,
+ pj_ssize_t);
+
+ pj_sock_t rtp_sock; /**< RTP socket */
+ pj_ioqueue_key_t *rtp_key; /**< RTP socket key in ioqueue */
+ pj_ioqueue_op_key_t rtp_read_op; /**< Pending read operation */
+ pj_ioqueue_op_key_t rtp_write_op; /**< Pending write operation */
+ pj_sockaddr_in rtp_src_addr; /**< Actual packet src addr. */
+ unsigned rtp_src_cnt; /**< How many pkt from this addr. */
+ int rtp_addrlen; /**< Address length. */
+ char rtp_pkt[RTP_LEN];/**< Incoming RTP packet buffer */
+
+ pj_sock_t rtcp_sock; /**< RTCP socket */
+ pj_ioqueue_key_t *rtcp_key; /**< RTCP socket key in ioqueue */
+ pj_ioqueue_op_key_t rtcp_read_op; /**< Pending read operation */
+ pj_ioqueue_op_key_t rtcp_write_op; /**< Pending write operation */
+ char rtcp_pkt[RTCP_LEN];/**< Incoming RTCP packet buffer */
+};
+
+
+
+static void on_rx_rtp( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read);
+static void on_rx_rtcp(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read);
+
+static pj_status_t transport_attach( pjmedia_transport *tp,
+ pjmedia_stream *strm,
+ const pj_sockaddr_t *rem_addr,
+ unsigned addr_len,
+ void (*rtp_cb)(pjmedia_stream*,
+ const void*,
+ pj_ssize_t),
+ void (*rtcp_cb)(pjmedia_stream*,
+ const void*,
+ pj_ssize_t));
+static void transport_detach( pjmedia_transport *tp,
+ pjmedia_stream *strm);
+static pj_status_t transport_send_rtp( pjmedia_transport *tp,
+ const void *pkt,
+ pj_size_t size);
+static pj_status_t transport_send_rtcp(pjmedia_transport *tp,
+ const void *pkt,
+ pj_size_t size);
+
+
+static pjmedia_transport_op transport_udp_op =
+{
+ &transport_attach,
+ &transport_detach,
+ &transport_send_rtp,
+ &transport_send_rtcp,
+};
+
+
+/**
+ * Create UDP stream transport.
+ */
+PJ_DEF(pj_status_t) pjmedia_transport_udp_create( pjmedia_endpt *endpt,
+ const char *name,
+ int port,
+ pjmedia_transport **p_tp)
+{
+ pjmedia_sock_info si;
+ pj_status_t status;
+
+
+ /* Sanity check */
+ PJ_ASSERT_RETURN(endpt && port && p_tp, PJ_EINVAL);
+
+
+ pj_memset(&si, 0, sizeof(pjmedia_sock_info));
+ si.rtp_sock = si.rtcp_sock = PJ_INVALID_SOCKET;
+
+ /* Create RTP socket */
+ status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &si.rtp_sock);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Bind RTP socket */
+ si.rtp_addr_name.sin_family = PJ_AF_INET;
+ si.rtp_addr_name.sin_port = pj_htons((pj_uint16_t)port);
+
+ status = pj_sock_bind(si.rtp_sock, &si.rtp_addr_name,
+ sizeof(si.rtp_addr_name));
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+
+ /* Create RTCP socket */
+ status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &si.rtcp_sock);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Bind RTCP socket */
+ si.rtcp_addr_name.sin_family = PJ_AF_INET;
+ si.rtcp_addr_name.sin_port = pj_htons((pj_uint16_t)(port+1));
+
+ status = pj_sock_bind(si.rtcp_sock, &si.rtcp_addr_name,
+ sizeof(si.rtcp_addr_name));
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+
+ /* Create UDP transport by attaching socket info */
+ return pjmedia_transport_udp_attach( endpt, name, &si, p_tp);
+
+
+on_error:
+ if (si.rtp_sock != PJ_INVALID_SOCKET)
+ pj_sock_close(si.rtp_sock);
+ if (si.rtcp_sock != PJ_INVALID_SOCKET)
+ pj_sock_close(si.rtcp_sock);
+ return status;
+}
+
+
+/**
+ * Create UDP stream transport from existing socket info.
+ */
+PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt,
+ const char *name,
+ const pjmedia_sock_info *si,
+ pjmedia_transport **p_tp)
+{
+ struct transport_udp *tp;
+ pj_pool_t *pool;
+ pj_ioqueue_t *ioqueue;
+ pj_ioqueue_callback rtp_cb, rtcp_cb;
+ pj_ssize_t size;
+ pj_status_t status;
+
+
+ /* Sanity check */
+ PJ_ASSERT_RETURN(endpt && si && p_tp, PJ_EINVAL);
+
+ /* Check name */
+ if (!name)
+ name = "udpmedia";
+
+ /* Get ioqueue instance */
+ ioqueue = pjmedia_endpt_get_ioqueue(endpt);
+
+
+ /* Create transport structure */
+ pool = pjmedia_endpt_create_pool(endpt, name, 4000, 4000);
+ if (!pool)
+ return PJ_ENOMEM;
+
+ tp = pj_pool_zalloc(pool, sizeof(struct transport_udp));
+ tp->pool = pool;
+ pj_ansi_strcpy(tp->base.name, name);
+ tp->base.op = &transport_udp_op;
+
+ /* Copy socket infos */
+ tp->rtp_sock = si->rtp_sock;
+ tp->rtcp_sock = si->rtcp_sock;
+
+
+ /* Setup RTP socket with the ioqueue */
+ pj_memset(&rtp_cb, 0, sizeof(rtp_cb));
+ rtp_cb.on_read_complete = &on_rx_rtp;
+
+ status = pj_ioqueue_register_sock(pool, ioqueue, tp->rtp_sock, tp,
+ &rtp_cb, &tp->rtp_key);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ pj_ioqueue_op_key_init(&tp->rtp_read_op, sizeof(tp->rtp_read_op));
+ pj_ioqueue_op_key_init(&tp->rtcp_write_op, sizeof(tp->rtcp_write_op));
+
+ /* Kick of pending RTP read from the ioqueue */
+ tp->rtp_addrlen = sizeof(tp->rtp_src_addr);
+ size = sizeof(tp->rtp_pkt);
+ status = pj_ioqueue_recvfrom(tp->rtp_key, &tp->rtp_read_op,
+ tp->rtp_pkt, &size, PJ_IOQUEUE_ALWAYS_ASYNC,
+ &tp->rtp_src_addr, &tp->rtp_addrlen);
+ if (status != PJ_EPENDING)
+ goto on_error;
+
+
+ /* Setup RTCP socket with ioqueue */
+ pj_memset(&rtcp_cb, 0, sizeof(rtcp_cb));
+ rtcp_cb.on_read_complete = &on_rx_rtcp;
+
+ status = pj_ioqueue_register_sock(pool, ioqueue, tp->rtcp_sock, tp,
+ &rtcp_cb, &tp->rtcp_key);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ pj_ioqueue_op_key_init(&tp->rtcp_read_op, sizeof(tp->rtcp_read_op));
+ pj_ioqueue_op_key_init(&tp->rtcp_write_op, sizeof(tp->rtcp_write_op));
+
+
+ /* Kick of pending RTCP read from the ioqueue */
+ size = sizeof(tp->rtcp_pkt);
+ status = pj_ioqueue_recv(tp->rtcp_key, &tp->rtcp_read_op,
+ tp->rtcp_pkt, &size, PJ_IOQUEUE_ALWAYS_ASYNC);
+ if (status != PJ_EPENDING)
+ goto on_error;
+
+
+ /* Done */
+ *p_tp = &tp->base;
+ return PJ_SUCCESS;
+
+
+on_error:
+ pjmedia_transport_udp_close(&tp->base);
+ return status;
+}
+
+
+/**
+ * Close UDP transport.
+ */
+PJ_DEF(pj_status_t) pjmedia_transport_udp_close(pjmedia_transport *tp)
+{
+ struct transport_udp *udp = (struct transport_udp*) tp;
+
+ /* Sanity check */
+ PJ_ASSERT_RETURN(tp, PJ_EINVAL);
+
+ /* Must not close while stream is using this */
+ PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP);
+
+
+ if (udp->rtp_key) {
+ pj_ioqueue_unregister(udp->rtp_key);
+ udp->rtp_key = NULL;
+ } else if (udp->rtp_sock != PJ_INVALID_SOCKET) {
+ pj_sock_close(udp->rtp_sock);
+ udp->rtp_sock = PJ_INVALID_SOCKET;
+ }
+
+ if (udp->rtcp_key) {
+ pj_ioqueue_unregister(udp->rtcp_key);
+ udp->rtcp_key = NULL;
+ } else if (udp->rtcp_sock != PJ_INVALID_SOCKET) {
+ pj_sock_close(udp->rtcp_sock);
+ udp->rtcp_sock = PJ_INVALID_SOCKET;
+ }
+
+ pj_pool_release(udp->pool);
+
+ return PJ_SUCCESS;
+}
+
+
+/* Notification from ioqueue about incoming RTP packet */
+static void on_rx_rtp( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+{
+ struct transport_udp *udp;
+ pj_status_t status;
+
+ PJ_UNUSED_ARG(op_key);
+
+ udp = pj_ioqueue_get_user_data(key);
+
+ do {
+ void (*cb)(pjmedia_stream*,const void*,pj_ssize_t);
+ pjmedia_stream *stream;
+
+ cb = udp->rtp_cb;
+ stream = udp->stream;
+
+ if (bytes_read > 0 && cb && stream)
+ (*cb)(stream, udp->rtp_pkt, bytes_read);
+
+ /* See if source address of RTP packet is different than the
+ * configured address.
+ */
+ if ((udp->rem_rtp_addr.sin_addr.s_addr !=
+ udp->rtp_src_addr.sin_addr.s_addr) ||
+ (udp->rem_rtp_addr.sin_port !=
+ udp->rtp_src_addr.sin_port))
+ {
+ udp->rtp_src_cnt++;
+
+ if (udp->rtp_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) {
+
+ udp->rem_rtp_addr = udp->rtp_src_addr;
+ udp->rtp_src_cnt = 0;
+
+ PJ_LOG(4,(udp->base.name,
+ "Remote RTP address switched to %s:%d",
+ pj_inet_ntoa(udp->rtp_src_addr.sin_addr),
+ pj_ntohs(udp->rtp_src_addr.sin_port)));
+ }
+ }
+
+ bytes_read = sizeof(udp->rtp_pkt);
+ udp->rtp_addrlen = sizeof(pj_sockaddr_in);
+ status = pj_ioqueue_recvfrom(udp->rtp_key, &udp->rtp_read_op,
+ udp->rtp_pkt, &bytes_read, 0,
+ &udp->rtp_src_addr,
+ &udp->rtp_addrlen);
+
+ } while (status == PJ_SUCCESS);
+}
+
+
+/* Notification from ioqueue about incoming RTCP packet */
+static void on_rx_rtcp(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+{
+ struct transport_udp *udp;
+ pj_status_t status;
+
+ PJ_UNUSED_ARG(op_key);
+
+ udp = pj_ioqueue_get_user_data(key);
+
+ do {
+ void (*cb)(pjmedia_stream*,const void*,pj_ssize_t);
+ pjmedia_stream *stream;
+
+ cb = udp->rtcp_cb;
+ stream = udp->stream;
+
+ if (bytes_read > 0 && cb && stream)
+ (*cb)(stream, udp->rtcp_pkt, bytes_read);
+
+ bytes_read = sizeof(udp->rtcp_pkt);
+ status = pj_ioqueue_recv(udp->rtcp_key, &udp->rtcp_read_op,
+ udp->rtcp_pkt, &bytes_read, 0);
+
+ } while (status == PJ_SUCCESS);
+}
+
+
+/* Called by stream to initialize the transport */
+static pj_status_t transport_attach( pjmedia_transport *tp,
+ pjmedia_stream *strm,
+ const pj_sockaddr_t *rem_addr,
+ unsigned addr_len,
+ void (*rtp_cb)(pjmedia_stream*,
+ const void*,
+ pj_ssize_t),
+ void (*rtcp_cb)(pjmedia_stream*,
+ const void*,
+ pj_ssize_t))
+{
+ struct transport_udp *udp = (struct transport_udp*) tp;
+
+ /* Validate arguments */
+ PJ_ASSERT_RETURN(tp && strm && rem_addr && addr_len, PJ_EINVAL);
+
+ /* Remote address must be Internet address */
+ PJ_ASSERT_RETURN(addr_len == sizeof(pj_sockaddr_in) &&
+ ((pj_sockaddr_in*)rem_addr)->sin_family == PJ_AF_INET,
+ PJ_EINVAL);
+
+ /* Must not be "attached" to existing stream */
+ PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP);
+
+ /* "Attach" the stream: */
+
+ /* Copy remote RTP address */
+ pj_memcpy(&udp->rem_rtp_addr, rem_addr, sizeof(pj_sockaddr_in));
+
+ /* Guess RTCP address from RTP address */
+ pj_memcpy(&udp->rem_rtcp_addr, rem_addr, sizeof(pj_sockaddr_in));
+ udp->rem_rtcp_addr.sin_port = (pj_uint16_t) pj_htons((pj_uint16_t)(
+ pj_ntohs(udp->rem_rtp_addr.sin_port)+1));
+
+ /* Save the callbacks */
+ udp->rtp_cb = rtp_cb;
+ udp->rtcp_cb = rtcp_cb;
+
+ /* Last, save the stream to mark that we have a "client" */
+ udp->stream = strm;
+
+ return PJ_SUCCESS;
+}
+
+
+/* Called by stream when it no longer needs the transport */
+static void transport_detach( pjmedia_transport *tp,
+ pjmedia_stream *strm)
+{
+ struct transport_udp *udp = (struct transport_udp*) tp;
+
+ pj_assert(tp && strm);
+
+ /* Clear up stream infos from transport */
+ udp->stream = NULL;
+ udp->rtp_cb = NULL;
+ udp->rtcp_cb = NULL;
+}
+
+
+/* Called by stream to send RTP packet */
+static pj_status_t transport_send_rtp( pjmedia_transport *tp,
+ const void *pkt,
+ pj_size_t size)
+{
+ struct transport_udp *udp = (struct transport_udp*)tp;
+ pj_ssize_t sent;
+ pj_status_t status;
+
+ PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP);
+
+ sent = size;
+ status = pj_ioqueue_sendto( udp->rtp_key, &udp->rtp_write_op,
+ pkt, &sent, 0,
+ &udp->rem_rtp_addr, sizeof(pj_sockaddr_in));
+
+ if (status==PJ_SUCCESS || status==PJ_EPENDING)
+ return PJ_SUCCESS;
+
+ return status;
+}
+
+/* Called by stream to send RTCP packet */
+static pj_status_t transport_send_rtcp(pjmedia_transport *tp,
+ const void *pkt,
+ pj_size_t size)
+{
+ struct transport_udp *udp = (struct transport_udp*)tp;
+ pj_ssize_t sent;
+ pj_status_t status;
+
+ PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP);
+
+ sent = size;
+ status = pj_ioqueue_sendto( udp->rtcp_key, &udp->rtcp_write_op,
+ pkt, &sent, 0,
+ &udp->rem_rtcp_addr, sizeof(pj_sockaddr_in));
+
+ if (status==PJ_SUCCESS || status==PJ_EPENDING)
+ return PJ_SUCCESS;
+
+ return status;
+}
+