summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pjmedia/build/Makefile3
-rw-r--r--pjmedia/build/os-win32.mak3
-rw-r--r--pjmedia/build/pjmedia.dsp8
-rw-r--r--pjmedia/include/pjmedia.h1
-rw-r--r--pjmedia/include/pjmedia/session.h18
-rw-r--r--pjmedia/include/pjmedia/stream.h94
-rw-r--r--pjmedia/include/pjmedia/transport_udp.h59
-rw-r--r--pjmedia/src/pjmedia/session.c9
-rw-r--r--pjmedia/src/pjmedia/stream.c434
-rw-r--r--pjmedia/src/pjmedia/transport_udp.c487
-rw-r--r--pjsip-apps/src/samples/simpleua.c15
-rw-r--r--pjsip-apps/src/samples/siprtp.c2
-rw-r--r--pjsip-apps/src/samples/streamutil.c56
-rw-r--r--pjsip/include/pjsua-lib/pjsua.h1
-rw-r--r--pjsip/src/pjsua-lib/pjsua_call.c44
-rw-r--r--pjsip/src/pjsua-lib/pjsua_core.c15
16 files changed, 839 insertions, 410 deletions
diff --git a/pjmedia/build/Makefile b/pjmedia/build/Makefile
index 332aa40b..9dfac98c 100644
--- a/pjmedia/build/Makefile
+++ b/pjmedia/build/Makefile
@@ -69,7 +69,8 @@ export PJMEDIA_OBJS += $(OS_OBJS) $(M_OBJS) $(CC_OBJS) $(HOST_OBJS) \
master_port.o null_port.o plc_common.o plc_g711.o \
port.o resample.o \
resample_port.o rtcp.o rtp.o sdp.o sdp_cmp.o sdp_neg.o \
- session.o silencedet.o sound_port.o stream.o wave.o \
+ session.o silencedet.o sound_port.o stream.o \
+ transport_udp.o wave.o \
$(SOUND_OBJS) $(NULLSOUND_OBJS)
export PJMEDIA_CFLAGS += $(_CFLAGS)
diff --git a/pjmedia/build/os-win32.mak b/pjmedia/build/os-win32.mak
index 9a7c836b..78095336 100644
--- a/pjmedia/build/os-win32.mak
+++ b/pjmedia/build/os-win32.mak
@@ -10,7 +10,8 @@
export PJMEDIA_OBJS += $(PA_DIR)/pa_win_hostapis.o $(PA_DIR)/pa_win_util.o \
$(PA_DIR)/pa_win_wmme.o
-export OS_CFLAGS += -DPA_NO_ASIO -DPA_NO_DS
+export OS_CFLAGS += -DPA_NO_ASIO -DPA_NO_DS \
+ -DPJMEDIA_SOUND_IMPLEMENTATION=PJMEDIA_SOUND_PORTAUDIO_SOUND
# Example:
# to activate Null sound, uncomment these two lines below.
diff --git a/pjmedia/build/pjmedia.dsp b/pjmedia/build/pjmedia.dsp
index af4d9d7d..9e998198 100644
--- a/pjmedia/build/pjmedia.dsp
+++ b/pjmedia/build/pjmedia.dsp
@@ -191,6 +191,10 @@ SOURCE=..\src\pjmedia\stream.c
# End Source File
# Begin Source File
+SOURCE=..\src\pjmedia\transport_udp.c
+# End Source File
+# Begin Source File
+
SOURCE=..\src\pjmedia\wav_player.c
# End Source File
# Begin Source File
@@ -303,6 +307,10 @@ SOURCE=..\include\pjmedia\stream.h
# End Source File
# Begin Source File
+SOURCE=..\include\pjmedia\transport_udp.h
+# End Source File
+# Begin Source File
+
SOURCE=..\include\pjmedia\types.h
# End Source File
# Begin Source File
diff --git a/pjmedia/include/pjmedia.h b/pjmedia/include/pjmedia.h
index c0869e9d..9e363769 100644
--- a/pjmedia/include/pjmedia.h
+++ b/pjmedia/include/pjmedia.h
@@ -43,6 +43,7 @@
#include <pjmedia/sdp_neg.h>
#include <pjmedia/silencedet.h>
#include <pjmedia/session.h>
+#include <pjmedia/transport_udp.h>
#include <pjmedia/sound.h>
#include <pjmedia/sound_port.h>
#include <pjmedia/wav_port.h>
diff --git a/pjmedia/include/pjmedia/session.h b/pjmedia/include/pjmedia/session.h
index c93d4343..38fe1e2c 100644
--- a/pjmedia/include/pjmedia/session.h
+++ b/pjmedia/include/pjmedia/session.h
@@ -88,10 +88,6 @@ typedef struct pjmedia_session_info pjmedia_session_info;
* @param endpt Pjmedia endpoint.
* @param max_streams Maximum number of stream infos to be created.
* @param si Session info structure to be initialized.
- * @param skinfo Optional array of media socket info to be copied
- * to the stream info. If this argument is specified,
- * the array must contain sufficient elements for
- * each stream to be initialized.
* @param local Local SDP session descriptor.
* @param remote Remote SDP session descriptor.
* @param stream_idx Media stream index in the session descriptor.
@@ -103,7 +99,6 @@ pjmedia_session_info_from_sdp( pj_pool_t *pool,
pjmedia_endpt *endpt,
unsigned max_streams,
pjmedia_session_info *si,
- const pjmedia_sock_info skinfo[],
const pjmedia_sdp_session *local,
const pjmedia_sdp_session *remote);
@@ -118,7 +113,6 @@ pjmedia_session_info_from_sdp( pj_pool_t *pool,
* @param si Stream info structure to be initialized.
* @param pool Pool to allocate memory.
* @param endpt PJMEDIA endpoint instance.
- * @param skinfo Optional socket info to be copied to the stream info.
* @param local Local SDP session descriptor.
* @param remote Remote SDP session descriptor.
* @param stream_idx Media stream index in the session descriptor.
@@ -129,7 +123,6 @@ PJ_DECL(pj_status_t)
pjmedia_stream_info_from_sdp( pjmedia_stream_info *si,
pj_pool_t *pool,
pjmedia_endpt *endpt,
- const pjmedia_sock_info *skinfo,
const pjmedia_sdp_session *local,
const pjmedia_sdp_session *remote,
unsigned stream_idx);
@@ -145,11 +138,11 @@ pjmedia_stream_info_from_sdp( pjmedia_stream_info *si,
* no media frames transmitted or received by the session.
*
* @param endpt The PJMEDIA endpoint instance.
- * @param stream_cnt Maximum number of streams to be created. This
- * also denotes the number of elements in the
- * socket information.
- * @param local_sdp The SDP describing local capability.
- * @param rem_sdp The SDP describing remote capability.
+ * @param si Session info containing stream count and array of
+ * stream info. The stream count indicates how many
+ * streams to be created in the session.
+ * @param transports Array of media stream transports, with
+ * sufficient number of elements (one for each stream).
* @param user_data Arbitrary user data to be kept in the session.
* @param p_session Pointer to receive the media session.
*
@@ -159,6 +152,7 @@ pjmedia_stream_info_from_sdp( pjmedia_stream_info *si,
PJ_DECL(pj_status_t)
pjmedia_session_create( pjmedia_endpt *endpt,
const pjmedia_session_info *si,
+ pjmedia_transport *transports[],
void *user_data,
pjmedia_session **p_session );
diff --git a/pjmedia/include/pjmedia/stream.h b/pjmedia/include/pjmedia/stream.h
index 335a40e2..94101d46 100644
--- a/pjmedia/include/pjmedia/stream.h
+++ b/pjmedia/include/pjmedia/stream.h
@@ -69,7 +69,6 @@ struct pjmedia_stream_info
{
pjmedia_type type; /**< Media type (audio, video) */
pjmedia_dir dir; /**< Media direction. */
- pjmedia_sock_info sock_info; /**< Media transport (RTP/RTCP sockets) */
pj_sockaddr_in rem_addr; /**< Remote RTP address */
pjmedia_codec_info fmt; /**< Incoming codec format info. */
pjmedia_codec_param *param; /**< Optional codec param. */
@@ -100,6 +99,80 @@ typedef struct pjmedia_stream pjmedia_stream;
/**
+ * @see pjmedia_transport_op.
+ */
+typedef struct pjmedia_transport pjmedia_transport;
+
+
+/**
+ * This structure describes the operations for the stream transport.
+ */
+struct pjmedia_transport_op
+{
+ /**
+ * This function is called by the stream when the transport is about
+ * to be used by the stream for the first time, and it tells the transport
+ * about remote RTP address to send the packet and some callbacks to be
+ * called for incoming packets.
+ */
+ pj_status_t (*attach)(pjmedia_transport *tp,
+ pjmedia_stream *strm,
+ const pj_sockaddr_t *rem_addr,
+ unsigned addr_len,
+ void (*rtp_cb)(pjmedia_stream*,
+ const void*,
+ pj_ssize_t),
+ void (*rtcp_cb)(pjmedia_stream*,
+ const void*,
+ pj_ssize_t));
+
+ /**
+ * This function is called by the stream when the stream is no longer
+ * need the transport (normally when the stream is about to be closed).
+ */
+ void (*detach)(pjmedia_transport *tp,
+ pjmedia_stream *strm);
+
+ /**
+ * This function is called by the stream to send RTP packet using the
+ * transport.
+ */
+ pj_status_t (*send_rtp)(pjmedia_transport *tp,
+ const void *pkt,
+ pj_size_t size);
+
+ /**
+ * This function is called by the stream to send RTCP packet using the
+ * transport.
+ */
+ pj_status_t (*send_rtcp)(pjmedia_transport *tp,
+ const void *pkt,
+ pj_size_t size);
+
+};
+
+
+/**
+ * @see pjmedia_transport_op.
+ */
+typedef struct pjmedia_transport_op pjmedia_transport_op;
+
+
+/**
+ * This structure declares stream transport. A stream transport is called
+ * by the stream to transmit a packet, and will notify stream when
+ * incoming packet is arrived.
+ */
+struct pjmedia_transport
+{
+ char name[PJ_MAX_OBJ_NAME];
+
+ pjmedia_transport_op *op;
+};
+
+
+
+/**
* Create a media stream based on the specified parameter. After the stream
* has been created, application normally would want to get the media port
* interface of the streams, by calling pjmedia_stream_get_port(). The
@@ -114,6 +187,9 @@ typedef struct pjmedia_stream pjmedia_stream;
* number of memory may be needed because jitter
* buffer needs to preallocate some storage.
* @param info Stream information.
+ * @param tp Stream transport instance used to transmit
+ * and receive RTP/RTCP packets to/from the underlying
+ * transport.
* @param user_data Arbitrary user data (for future callback feature).
* @param p_stream Pointer to receive the media stream.
*
@@ -122,6 +198,7 @@ typedef struct pjmedia_stream pjmedia_stream;
PJ_DECL(pj_status_t) pjmedia_stream_create(pjmedia_endpt *endpt,
pj_pool_t *pool,
const pjmedia_stream_info *info,
+ pjmedia_transport *tp,
void *user_data,
pjmedia_stream **p_stream);
@@ -151,6 +228,16 @@ PJ_DECL(pj_status_t) pjmedia_stream_get_port(pjmedia_stream *stream,
/**
+ * Get the media transport object associated with this stream.
+ *
+ * @param st The media stream.
+ *
+ * @return The transport object being used by the stream.
+ */
+PJ_DECL(pjmedia_transport*) pjmedia_stream_get_transport(pjmedia_stream *st);
+
+
+/**
* Start the media stream. This will start the appropriate channels
* in the media stream, depending on the media direction that was set
* when the stream was created.
@@ -224,8 +311,9 @@ PJ_DECL(pj_bool_t) pjmedia_stream_check_dtmf(pjmedia_stream *stream);
/**
- * Retrieve the incoming DTMF digits from the stream. Note that the digits
- * buffer will not be NULL terminated.
+ * Retrieve the incoming DTMF digits from the stream, and remove the digits
+ * from stream's DTMF buffer. Note that the digits buffer will not be NULL
+ * terminated.
*
* @param stream The media stream.
* @param ascii_digits Buffer to receive the digits. The length of this
diff --git a/pjmedia/include/pjmedia/transport_udp.h b/pjmedia/include/pjmedia/transport_udp.h
new file mode 100644
index 00000000..a84da98c
--- /dev/null
+++ b/pjmedia/include/pjmedia/transport_udp.h
@@ -0,0 +1,59 @@
+/* $Id$ */
+/*
+ * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#ifndef __PJMEDIA_TRANSPORT_UDP_H__
+#define __PJMEDIA_TRANSPORT_UDP_H__
+
+
+/**
+ * @file stream_transport_udp.h
+ * @brief Stream transport with UDP.
+ */
+
+#include <pjmedia/stream.h>
+
+
+/**
+ * Create UDP stream transport.
+ */
+PJ_DECL(pj_status_t) pjmedia_transport_udp_create(pjmedia_endpt *endpt,
+ const char *name,
+ int port,
+ pjmedia_transport **p_tp);
+
+
+/**
+ * Create UDP stream transport from existing socket info.
+ */
+PJ_DECL(pj_status_t) pjmedia_transport_udp_attach(pjmedia_endpt *endpt,
+ const char *name,
+ const pjmedia_sock_info *si,
+ pjmedia_transport **p_tp);
+
+
+/**
+ * Close UDP transport.
+ */
+PJ_DECL(pj_status_t) pjmedia_transport_udp_close(pjmedia_transport *tp);
+
+
+
+
+#endif /* __PJMEDIA_TRANSPORT_UDP_H__ */
+
+
diff --git a/pjmedia/src/pjmedia/session.c b/pjmedia/src/pjmedia/session.c
index 6bd5d020..9073bef1 100644
--- a/pjmedia/src/pjmedia/session.c
+++ b/pjmedia/src/pjmedia/session.c
@@ -64,7 +64,6 @@ PJ_DEF(pj_status_t) pjmedia_stream_info_from_sdp(
pjmedia_stream_info *si,
pj_pool_t *pool,
pjmedia_endpt *endpt,
- const pjmedia_sock_info *skinfo,
const pjmedia_sdp_session *local,
const pjmedia_sdp_session *remote,
unsigned stream_idx)
@@ -342,10 +341,6 @@ PJ_DEF(pj_status_t) pjmedia_stream_info_from_sdp(
}
}
- /* Copy skinfo */
- if (skinfo)
- si->sock_info = *skinfo;
-
/* Leave SSRC to random. */
si->ssrc = pj_rand();
@@ -364,7 +359,6 @@ pjmedia_session_info_from_sdp( pj_pool_t *pool,
pjmedia_endpt *endpt,
unsigned max_streams,
pjmedia_session_info *si,
- const pjmedia_sock_info skinfo[],
const pjmedia_sdp_session *local,
const pjmedia_sdp_session *remote)
{
@@ -381,7 +375,6 @@ pjmedia_session_info_from_sdp( pj_pool_t *pool,
status = pjmedia_stream_info_from_sdp( &si->stream_info[i], pool,
endpt,
- (skinfo ? &skinfo[i] : NULL),
local, remote, i);
if (status != PJ_SUCCESS)
return status;
@@ -397,6 +390,7 @@ pjmedia_session_info_from_sdp( pj_pool_t *pool,
*/
PJ_DEF(pj_status_t) pjmedia_session_create( pjmedia_endpt *endpt,
const pjmedia_session_info *si,
+ pjmedia_transport *transports[],
void *user_data,
pjmedia_session **p_session )
{
@@ -432,6 +426,7 @@ PJ_DEF(pj_status_t) pjmedia_session_create( pjmedia_endpt *endpt,
/* Create the stream */
status = pjmedia_stream_create(endpt, session->pool,
&session->stream_info[i],
+ (transports?transports[i]:NULL),
session,
&session->stream[i]);
if (status == PJ_SUCCESS)
diff --git a/pjmedia/src/pjmedia/stream.c b/pjmedia/src/pjmedia/stream.c
index 29430ccd..36acd1a4 100644
--- a/pjmedia/src/pjmedia/stream.c
+++ b/pjmedia/src/pjmedia/stream.c
@@ -85,32 +85,18 @@ struct pjmedia_stream
pjmedia_dir dir; /**< Stream direction. */
void *user_data; /**< User data. */
+ pjmedia_transport *transport; /**< Stream transport. */
+
pjmedia_codec *codec; /**< Codec instance being used. */
pjmedia_codec_param codec_param; /**< Codec param. */
unsigned frame_size; /**< Size of encoded base frame.*/
pj_mutex_t *jb_mutex;
pjmedia_jbuf *jb; /**< Jitter buffer. */
- pjmedia_sock_info skinfo; /**< Transport info. */
- pj_sockaddr_in rem_rtp_addr; /**< Remote RTP address. */
- pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address. */
-
-
pjmedia_rtcp_session rtcp; /**< RTCP for incoming RTP. */
- pj_ioqueue_key_t *rtp_key; /**< RTP ioqueue key. */
- pj_ioqueue_op_key_t rtp_op_key; /**< The pending read op key. */
- pj_sockaddr_in rtp_src_addr; /**< addr of src pkt from remote*/
- unsigned rtp_src_cnt; /**< if different, # of pkt rcv */
- int rtp_addrlen; /**< Address length. */
-
- pj_ioqueue_key_t *rtcp_key; /**< RTCP ioqueue key. */
- pj_ioqueue_op_key_t rtcp_op_key; /**< The pending read op key. */
- pj_size_t rtcp_pkt_size; /**< Size of RTCP packet buf. */
- char rtcp_pkt[512]; /**< RTCP packet buffer. */
pj_uint32_t rtcp_last_tx; /**< RTCP tx time in timestamp */
pj_uint32_t rtcp_interval; /**< Interval, in timestamp. */
- int rtcp_addrlen; /**< Address length. */
/* RFC 2833 DTMF transmission queue: */
int tx_event_pt; /**< Outgoing pt for dtmf. */
@@ -423,24 +409,12 @@ static void check_tx_rtcp(pjmedia_stream *stream, pj_uint32_t timestamp)
} else if (timestamp - stream->rtcp_last_tx >= stream->rtcp_interval) {
pjmedia_rtcp_pkt *rtcp_pkt;
- pj_ssize_t size;
int len;
- pj_status_t status;
pjmedia_rtcp_build_rtcp(&stream->rtcp, &rtcp_pkt, &len);
- size = len;
- status = pj_sock_sendto(stream->skinfo.rtcp_sock, rtcp_pkt, &size, 0,
- &stream->rem_rtcp_addr,
- sizeof(stream->rem_rtcp_addr));
-#if 0
- if (status != PJ_SUCCESS) {
- char errmsg[PJ_ERR_MSG_SIZE];
-
- pj_strerror(status, errmsg, sizeof(errmsg));
- PJ_LOG(4,(port->info.name.ptr, "Error sending RTCP: %s [%d]",
- errmsg, status));
- }
-#endif
+
+ (*stream->transport->op->send_rtcp)(stream->transport,
+ rtcp_pkt, len);
stream->rtcp_last_tx = timestamp;
}
@@ -586,11 +560,10 @@ static pj_status_t put_frame( pjmedia_port *port,
/* Send. */
sent = frame_out.size+sizeof(pjmedia_rtp_hdr);
- status = pj_sock_sendto(stream->skinfo.rtp_sock, channel->out_pkt,
- &sent, 0, &stream->rem_rtp_addr,
- sizeof(stream->rem_rtp_addr));
- if (status != PJ_SUCCESS)
- return status;
+
+ (*stream->transport->op->send_rtp)(stream->transport,
+ channel->out_pkt, sent);
+
/* Update stat */
pjmedia_rtcp_tx_rtp(&stream->rtcp, frame_out.size);
@@ -682,249 +655,150 @@ static void handle_incoming_dtmf( pjmedia_stream *stream,
/*
- * This callback is called by ioqueue framework on receipt of packets
+ * This callback is called by stream transport on receipt of packets
* in the RTP socket.
*/
-static void on_rx_rtp( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
+static void on_rx_rtp( pjmedia_stream *stream,
+ const void *pkt,
pj_ssize_t bytes_read)
{
- pjmedia_stream *stream = pj_ioqueue_get_user_data(key);
pjmedia_channel *channel = stream->dec;
+ const pjmedia_rtp_hdr *hdr;
+ const void *payload;
+ unsigned payloadlen;
+ pjmedia_rtp_status seq_st;
pj_status_t status;
-
- PJ_UNUSED_ARG(op_key);
-
-
- /*
- * Loop while we have packet.
- */
- do {
- const pjmedia_rtp_hdr *hdr;
- const void *payload;
- unsigned payloadlen;
- pjmedia_rtp_status seq_st;
-
- /* Go straight to read next packet if bytes_read == 0.
- */
- if (bytes_read == 0)
- goto read_next_packet;
-
- if (bytes_read < 0)
- goto read_next_packet;
-
- /* Update RTP and RTCP session. */
- status = pjmedia_rtp_decode_rtp(&channel->rtp,
- channel->in_pkt, bytes_read,
- &hdr, &payload, &payloadlen);
- if (status != PJ_SUCCESS) {
- LOGERR_((stream->port.info.name.ptr, "RTP decode error", status));
- goto read_next_packet;
- }
-
-
- /* Inform RTCP session */
- pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq),
- pj_ntohl(hdr->ts), payloadlen);
-
- /* Handle incoming DTMF. */
- if (hdr->pt == stream->rx_event_pt) {
- handle_incoming_dtmf(stream, payload, payloadlen);
- goto read_next_packet;
- }
-
- /* Update RTP session (also checks if RTP session can accept
- * the incoming packet.
- */
- pjmedia_rtp_session_update(&channel->rtp, hdr, &seq_st);
- if (seq_st.status.value) {
- TRC_ ((stream->port.info.name.ptr,
- "RTP status: badpt=%d, badssrc=%d, dup=%d, "
- "outorder=%d, probation=%d, restart=%d",
- seq_st.status.flag.badpt,
- seq_st.status.flag.badssrc,
- seq_st.status.flag.dup,
- seq_st.status.flag.outorder,
- seq_st.status.flag.probation,
- seq_st.status.flag.restart));
-
- if (seq_st.status.flag.badpt) {
- PJ_LOG(4,(stream->port.info.name.ptr,
- "Bad RTP pt %d (expecting %d)",
- hdr->pt, channel->rtp.out_pt));
- }
- }
+ /* Update RTP and RTCP session. */
+ status = pjmedia_rtp_decode_rtp(&channel->rtp, pkt, bytes_read,
+ &hdr, &payload, &payloadlen);
+ if (status != PJ_SUCCESS) {
+ LOGERR_((stream->port.info.name.ptr, "RTP decode error", status));
+ return;
+ }
- /* Skip bad RTP packet */
- if (seq_st.status.flag.bad)
- goto read_next_packet;
+ /* Inform RTCP session */
+ pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq),
+ pj_ntohl(hdr->ts), payloadlen);
- /* See if source address of RTP packet is different than the
- * configured address.
- */
- if ((stream->rem_rtp_addr.sin_addr.s_addr !=
- stream->rtp_src_addr.sin_addr.s_addr) ||
- (stream->rem_rtp_addr.sin_port != stream->rtp_src_addr.sin_port))
- {
- stream->rtp_src_cnt++;
+ /* Handle incoming DTMF. */
+ if (hdr->pt == stream->rx_event_pt) {
+ handle_incoming_dtmf(stream, payload, payloadlen);
+ return;
+ }
- if (stream->rtp_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) {
-
- stream->rem_rtp_addr = stream->rtp_src_addr;
- stream->rtp_src_cnt = 0;
- PJ_LOG(4,(stream->port.info.name.ptr,
- "Remote RTP address switched to %s:%d",
- pj_inet_ntoa(stream->rtp_src_addr.sin_addr),
- pj_ntohs(stream->rtp_src_addr.sin_port)));
- }
+ /* Update RTP session (also checks if RTP session can accept
+ * the incoming packet.
+ */
+ pjmedia_rtp_session_update(&channel->rtp, hdr, &seq_st);
+ if (seq_st.status.value) {
+ TRC_ ((stream->port.info.name.ptr,
+ "RTP status: badpt=%d, badssrc=%d, dup=%d, "
+ "outorder=%d, probation=%d, restart=%d",
+ seq_st.status.flag.badpt,
+ seq_st.status.flag.badssrc,
+ seq_st.status.flag.dup,
+ seq_st.status.flag.outorder,
+ seq_st.status.flag.probation,
+ seq_st.status.flag.restart));
+
+ if (seq_st.status.flag.badpt) {
+ PJ_LOG(4,(stream->port.info.name.ptr,
+ "Bad RTP pt %d (expecting %d)",
+ hdr->pt, channel->rtp.out_pt));
}
+ }
+ /* Skip bad RTP packet */
+ if (seq_st.status.flag.bad)
+ return;
- /* Put "good" packet to jitter buffer, or reset the jitter buffer
- * when RTP session is restarted.
- */
- pj_mutex_lock( stream->jb_mutex );
- if (seq_st.status.flag.restart) {
- status = pjmedia_jbuf_reset(stream->jb);
- PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset"));
-
- } else {
- /*
- * Packets may contain more than one frames, while the jitter
- * buffer can only take one frame per "put" operation. So we need
- * to ask the codec to "parse" the payload into multiple frames.
- */
- enum { MAX = 16 };
- pj_timestamp ts;
- unsigned i, count = MAX;
- unsigned samples_per_frame;
- pjmedia_frame frames[MAX];
-
- /* Get the timestamp of the first sample */
- ts.u64 = pj_ntohl(hdr->ts);
-
- /* Parse the payload. */
- status = (*stream->codec->op->parse)(stream->codec,
- (void*)payload,
- payloadlen,
- &ts,
- &count,
- frames);
- if (status != PJ_SUCCESS) {
- LOGERR_((stream->port.info.name.ptr,
- "Codec parse() error",
- status));
- count = 0;
- }
-
- /* Put each frame to jitter buffer. */
- samples_per_frame = stream->codec_param.info.frm_ptime *
- stream->codec_param.info.clock_rate *
- stream->codec_param.info.channel_cnt /
- 1000;
-
- for (i=0; i<count; ++i) {
- unsigned ext_seq;
-
- ext_seq = (unsigned)(frames[i].timestamp.u64 /
- samples_per_frame);
- pjmedia_jbuf_put_frame(stream->jb, frames[i].buf,
- frames[i].size, ext_seq);
-
- }
- }
- pj_mutex_unlock( stream->jb_mutex );
-
+ /* Put "good" packet to jitter buffer, or reset the jitter buffer
+ * when RTP session is restarted.
+ */
+ pj_mutex_lock( stream->jb_mutex );
+ if (seq_st.status.flag.restart) {
+ status = pjmedia_jbuf_reset(stream->jb);
+ PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset"));
- /* Check if now is the time to transmit RTCP SR/RR report.
- * We only do this when stream direction is "decoding only",
- * because otherwise check_tx_rtcp() will be handled by put_frame()
+ } else {
+ /*
+ * Packets may contain more than one frames, while the jitter
+ * buffer can only take one frame per "put" operation. So we need
+ * to ask the codec to "parse" the payload into multiple frames.
*/
- if (stream->dir == PJMEDIA_DIR_DECODING) {
- check_tx_rtcp(stream, pj_ntohl(hdr->ts));
- }
-
- if (status != 0) {
- LOGERR_((stream->port.info.name.ptr, "Jitter buffer put() error",
- status));
- goto read_next_packet;
+ enum { MAX = 16 };
+ pj_timestamp ts;
+ unsigned i, count = MAX;
+ unsigned samples_per_frame;
+ pjmedia_frame frames[MAX];
+
+ /* Get the timestamp of the first sample */
+ ts.u64 = pj_ntohl(hdr->ts);
+
+ /* Parse the payload. */
+ status = (*stream->codec->op->parse)(stream->codec,
+ (void*)payload,
+ payloadlen,
+ &ts,
+ &count,
+ frames);
+ if (status != PJ_SUCCESS) {
+ LOGERR_((stream->port.info.name.ptr,
+ "Codec parse() error",
+ status));
+ count = 0;
}
+ /* Put each frame to jitter buffer. */
+ samples_per_frame = stream->codec_param.info.frm_ptime *
+ stream->codec_param.info.clock_rate *
+ stream->codec_param.info.channel_cnt /
+ 1000;
+
+ for (i=0; i<count; ++i) {
+ unsigned ext_seq;
-read_next_packet:
- bytes_read = channel->in_pkt_size;
- stream->rtp_addrlen = sizeof(stream->rtp_src_addr);
- status = pj_ioqueue_recvfrom( stream->rtp_key,
- &stream->rtp_op_key,
- channel->in_pkt,
- &bytes_read, 0,
- &stream->rtp_src_addr,
- &stream->rtp_addrlen);
+ ext_seq = (unsigned)(frames[i].timestamp.u64 /
+ samples_per_frame);
+ pjmedia_jbuf_put_frame(stream->jb, frames[i].buf,
+ frames[i].size, ext_seq);
- if (status != PJ_SUCCESS) {
- bytes_read = -status;
}
+ }
+ pj_mutex_unlock( stream->jb_mutex );
- } while (status == PJ_SUCCESS ||
- status == PJ_STATUS_FROM_OS(OSERR_ECONNRESET));
- if (status != PJ_SUCCESS && status != PJ_EPENDING) {
- char errmsg[PJ_ERR_MSG_SIZE];
+ /* Check if now is the time to transmit RTCP SR/RR report.
+ * We only do this when stream direction is "decoding only",
+ * because otherwise check_tx_rtcp() will be handled by put_frame()
+ */
+ if (stream->dir == PJMEDIA_DIR_DECODING) {
+ check_tx_rtcp(stream, pj_ntohl(hdr->ts));
+ }
- pj_strerror(status, errmsg, sizeof(errmsg));
- PJ_LOG(4,(stream->port.info.name.ptr,
- "Error reading RTP packet: %s [status=%d]. "
- "RTP stream thread quitting!",
- errmsg, status));
+ if (status != 0) {
+ LOGERR_((stream->port.info.name.ptr, "Jitter buffer put() error",
+ status));
+ return;
}
}
/*
- * This callback is called by ioqueue framework on receipt of packets
+ * This callback is called by stream transport on receipt of packets
* in the RTCP socket.
*/
-static void on_rx_rtcp( pj_ioqueue_key_t *key,
- pj_ioqueue_op_key_t *op_key,
+static void on_rx_rtcp( pjmedia_stream *stream,
+ const void *pkt,
pj_ssize_t bytes_read)
{
- pjmedia_stream *stream = pj_ioqueue_get_user_data(key);
- pj_status_t status;
-
- PJ_UNUSED_ARG(op_key);
-
- do {
- if (bytes_read > 0) {
- pjmedia_rtcp_rx_rtcp(&stream->rtcp, stream->rtcp_pkt,
- bytes_read);
- }
-
- bytes_read = stream->rtcp_pkt_size;
- stream->rtcp_addrlen = sizeof(stream->rem_rtcp_addr);
- status = pj_ioqueue_recvfrom( stream->rtcp_key,
- &stream->rtcp_op_key,
- stream->rtcp_pkt,
- &bytes_read, 0,
- &stream->rem_rtcp_addr,
- &stream->rtcp_addrlen);
-
- } while (status == PJ_SUCCESS);
-
- if (status != PJ_SUCCESS && status != PJ_EPENDING) {
- char errmsg[PJ_ERR_MSG_SIZE];
-
- pj_strerror(status, errmsg, sizeof(errmsg));
- PJ_LOG(4,(stream->port.info.name.ptr,
- "Error reading RTCP packet: %s [status=%d]",
- errmsg, status));
- }
-
+ pjmedia_rtcp_rx_rtcp(&stream->rtcp, pkt, bytes_read);
}
@@ -993,13 +867,12 @@ static pj_status_t create_channel( pj_pool_t *pool,
PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
pj_pool_t *pool,
const pjmedia_stream_info *info,
+ pjmedia_transport *tp,
void *user_data,
pjmedia_stream **p_stream)
{
pjmedia_stream *stream;
- pj_ioqueue_callback ioqueue_cb;
- pj_uint16_t rtcp_port;
unsigned jb_init, jb_max, jb_min_pre, jb_max_pre;
pj_status_t status;
@@ -1036,11 +909,6 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt);
stream->dir = info->dir;
stream->user_data = user_data;
- stream->skinfo = info->sock_info;
- stream->rem_rtp_addr = info->rem_addr;
- rtcp_port = (pj_uint16_t) (pj_ntohs(info->rem_addr.sin_port)+1);
- stream->rem_rtcp_addr = stream->rem_rtp_addr;
- stream->rem_rtcp_addr.sin_port = pj_htons(rtcp_port);
stream->rtcp_interval = (PJMEDIA_RTCP_INTERVAL + (pj_rand() % 8000)) *
info->fmt.clock_rate / 1000;
@@ -1048,6 +916,15 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
stream->rx_event_pt = info->rx_event_pt ? info->rx_event_pt : -1;
stream->last_dtmf = -1;
+ /* Attach transport */
+ status = (*tp->op->attach)(tp, stream, &info->rem_addr,
+ sizeof(info->rem_addr), &on_rx_rtp,
+ &on_rx_rtcp);
+ if (status != PJ_SUCCESS)
+ goto err_cleanup;
+
+ stream->transport = tp;
+
/* Create mutex to protect jitter buffer: */
@@ -1160,45 +1037,7 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt,
if (status != PJ_SUCCESS)
goto err_cleanup;
- /* Register RTP socket to ioqueue */
- pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
- ioqueue_cb.on_read_complete = &on_rx_rtp;
-
- status = pj_ioqueue_register_sock( pool,
- pjmedia_endpt_get_ioqueue(endpt),
- stream->skinfo.rtp_sock,
- stream, &ioqueue_cb, &stream->rtp_key);
- if (status != PJ_SUCCESS)
- goto err_cleanup;
-
- /* Init pending operation key. */
- pj_ioqueue_op_key_init(&stream->rtp_op_key, sizeof(stream->rtp_op_key));
-
- /* Bootstrap the first recvfrom() operation. */
- on_rx_rtp( stream->rtp_key, &stream->rtp_op_key, 0);
-
-
- /* Register RTCP socket to ioqueue. */
- if (stream->skinfo.rtcp_sock != PJ_INVALID_SOCKET) {
- pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
- ioqueue_cb.on_read_complete = &on_rx_rtcp;
- status = pj_ioqueue_register_sock( pool,
- pjmedia_endpt_get_ioqueue(endpt),
- stream->skinfo.rtcp_sock,
- stream, &ioqueue_cb,
- &stream->rtcp_key);
- if (status != PJ_SUCCESS)
- goto err_cleanup;
- }
-
- /* Init pending operation key. */
- pj_ioqueue_op_key_init(&stream->rtcp_op_key, sizeof(stream->rtcp_op_key));
-
- stream->rtcp_pkt_size = sizeof(stream->rtcp_pkt);
-
- /* Bootstrap the first recvfrom() operation. */
- on_rx_rtcp( stream->rtcp_key, &stream->rtcp_op_key, 0);
/* Success! */
*p_stream = stream;
@@ -1227,14 +1066,10 @@ PJ_DEF(pj_status_t) pjmedia_stream_destroy( pjmedia_stream *stream )
pj_mutex_lock(stream->jb_mutex);
- /* Unregister from ioqueue. */
- if (stream->rtp_key) {
- pj_ioqueue_unregister(stream->rtp_key);
- stream->rtp_key = NULL;
- }
- if (stream->rtcp_key) {
- pj_ioqueue_unregister(stream->rtcp_key);
- stream->rtcp_key = NULL;
+ /* Detach from transport */
+ if (stream->transport) {
+ (*stream->transport->op->detach)(stream->transport, stream);
+ stream->transport = NULL;
}
/* Free codec. */
@@ -1269,6 +1104,15 @@ PJ_DEF(pj_status_t) pjmedia_stream_get_port( pjmedia_stream *stream,
/*
+ * Get the transport object
+ */
+PJ_DEF(pjmedia_transport*) pjmedia_stream_get_transport(pjmedia_stream *st)
+{
+ return st->transport;
+}
+
+
+/*
* Start stream.
*/
PJ_DEF(pj_status_t) pjmedia_stream_start(pjmedia_stream *stream)
diff --git a/pjmedia/src/pjmedia/transport_udp.c b/pjmedia/src/pjmedia/transport_udp.c
new file mode 100644
index 00000000..c1936cfe
--- /dev/null
+++ b/pjmedia/src/pjmedia/transport_udp.c
@@ -0,0 +1,487 @@
+/* $Id$ */
+/*
+ * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+#include <pjmedia/transport_udp.h>
+#include <pj/assert.h>
+#include <pj/errno.h>
+#include <pj/ioqueue.h>
+#include <pj/log.h>
+#include <pj/string.h>
+
+
+/* Maximum size of incoming RTP packet */
+#define RTP_LEN 1500
+
+/* Maximum size of incoming RTCP packet */
+#define RTCP_LEN 600
+
+
+struct transport_udp
+{
+ pjmedia_transport base; /**< Base transport. */
+
+ pj_pool_t *pool; /**< Memory pool */
+
+ pjmedia_stream *stream; /**< Stream user (may be NULL) */
+ pj_sockaddr_in rem_rtp_addr; /**< Remote RTP address */
+ pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address */
+ void (*rtp_cb)( pjmedia_stream*,/**< To report incoming RTP. */
+ const void*,
+ pj_ssize_t);
+ void (*rtcp_cb)( pjmedia_stream*,/**< To report incoming RTCP. */
+ const void*,
+ pj_ssize_t);
+
+ pj_sock_t rtp_sock; /**< RTP socket */
+ pj_ioqueue_key_t *rtp_key; /**< RTP socket key in ioqueue */
+ pj_ioqueue_op_key_t rtp_read_op; /**< Pending read operation */
+ pj_ioqueue_op_key_t rtp_write_op; /**< Pending write operation */
+ pj_sockaddr_in rtp_src_addr; /**< Actual packet src addr. */
+ unsigned rtp_src_cnt; /**< How many pkt from this addr. */
+ int rtp_addrlen; /**< Address length. */
+ char rtp_pkt[RTP_LEN];/**< Incoming RTP packet buffer */
+
+ pj_sock_t rtcp_sock; /**< RTCP socket */
+ pj_ioqueue_key_t *rtcp_key; /**< RTCP socket key in ioqueue */
+ pj_ioqueue_op_key_t rtcp_read_op; /**< Pending read operation */
+ pj_ioqueue_op_key_t rtcp_write_op; /**< Pending write operation */
+ char rtcp_pkt[RTCP_LEN];/**< Incoming RTCP packet buffer */
+};
+
+
+
+static void on_rx_rtp( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read);
+static void on_rx_rtcp(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read);
+
+static pj_status_t transport_attach( pjmedia_transport *tp,
+ pjmedia_stream *strm,
+ const pj_sockaddr_t *rem_addr,
+ unsigned addr_len,
+ void (*rtp_cb)(pjmedia_stream*,
+ const void*,
+ pj_ssize_t),
+ void (*rtcp_cb)(pjmedia_stream*,
+ const void*,
+ pj_ssize_t));
+static void transport_detach( pjmedia_transport *tp,
+ pjmedia_stream *strm);
+static pj_status_t transport_send_rtp( pjmedia_transport *tp,
+ const void *pkt,
+ pj_size_t size);
+static pj_status_t transport_send_rtcp(pjmedia_transport *tp,
+ const void *pkt,
+ pj_size_t size);
+
+
+static pjmedia_transport_op transport_udp_op =
+{
+ &transport_attach,
+ &transport_detach,
+ &transport_send_rtp,
+ &transport_send_rtcp,
+};
+
+
+/**
+ * Create UDP stream transport.
+ */
+PJ_DEF(pj_status_t) pjmedia_transport_udp_create( pjmedia_endpt *endpt,
+ const char *name,
+ int port,
+ pjmedia_transport **p_tp)
+{
+ pjmedia_sock_info si;
+ pj_status_t status;
+
+
+ /* Sanity check */
+ PJ_ASSERT_RETURN(endpt && port && p_tp, PJ_EINVAL);
+
+
+ pj_memset(&si, 0, sizeof(pjmedia_sock_info));
+ si.rtp_sock = si.rtcp_sock = PJ_INVALID_SOCKET;
+
+ /* Create RTP socket */
+ status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &si.rtp_sock);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Bind RTP socket */
+ si.rtp_addr_name.sin_family = PJ_AF_INET;
+ si.rtp_addr_name.sin_port = pj_htons((pj_uint16_t)port);
+
+ status = pj_sock_bind(si.rtp_sock, &si.rtp_addr_name,
+ sizeof(si.rtp_addr_name));
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+
+ /* Create RTCP socket */
+ status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &si.rtcp_sock);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ /* Bind RTCP socket */
+ si.rtcp_addr_name.sin_family = PJ_AF_INET;
+ si.rtcp_addr_name.sin_port = pj_htons((pj_uint16_t)(port+1));
+
+ status = pj_sock_bind(si.rtcp_sock, &si.rtcp_addr_name,
+ sizeof(si.rtcp_addr_name));
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+
+ /* Create UDP transport by attaching socket info */
+ return pjmedia_transport_udp_attach( endpt, name, &si, p_tp);
+
+
+on_error:
+ if (si.rtp_sock != PJ_INVALID_SOCKET)
+ pj_sock_close(si.rtp_sock);
+ if (si.rtcp_sock != PJ_INVALID_SOCKET)
+ pj_sock_close(si.rtcp_sock);
+ return status;
+}
+
+
+/**
+ * Create UDP stream transport from existing socket info.
+ */
+PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt,
+ const char *name,
+ const pjmedia_sock_info *si,
+ pjmedia_transport **p_tp)
+{
+ struct transport_udp *tp;
+ pj_pool_t *pool;
+ pj_ioqueue_t *ioqueue;
+ pj_ioqueue_callback rtp_cb, rtcp_cb;
+ pj_ssize_t size;
+ pj_status_t status;
+
+
+ /* Sanity check */
+ PJ_ASSERT_RETURN(endpt && si && p_tp, PJ_EINVAL);
+
+ /* Check name */
+ if (!name)
+ name = "udpmedia";
+
+ /* Get ioqueue instance */
+ ioqueue = pjmedia_endpt_get_ioqueue(endpt);
+
+
+ /* Create transport structure */
+ pool = pjmedia_endpt_create_pool(endpt, name, 4000, 4000);
+ if (!pool)
+ return PJ_ENOMEM;
+
+ tp = pj_pool_zalloc(pool, sizeof(struct transport_udp));
+ tp->pool = pool;
+ pj_ansi_strcpy(tp->base.name, name);
+ tp->base.op = &transport_udp_op;
+
+ /* Copy socket infos */
+ tp->rtp_sock = si->rtp_sock;
+ tp->rtcp_sock = si->rtcp_sock;
+
+
+ /* Setup RTP socket with the ioqueue */
+ pj_memset(&rtp_cb, 0, sizeof(rtp_cb));
+ rtp_cb.on_read_complete = &on_rx_rtp;
+
+ status = pj_ioqueue_register_sock(pool, ioqueue, tp->rtp_sock, tp,
+ &rtp_cb, &tp->rtp_key);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ pj_ioqueue_op_key_init(&tp->rtp_read_op, sizeof(tp->rtp_read_op));
+ pj_ioqueue_op_key_init(&tp->rtcp_write_op, sizeof(tp->rtcp_write_op));
+
+ /* Kick of pending RTP read from the ioqueue */
+ tp->rtp_addrlen = sizeof(tp->rtp_src_addr);
+ size = sizeof(tp->rtp_pkt);
+ status = pj_ioqueue_recvfrom(tp->rtp_key, &tp->rtp_read_op,
+ tp->rtp_pkt, &size, PJ_IOQUEUE_ALWAYS_ASYNC,
+ &tp->rtp_src_addr, &tp->rtp_addrlen);
+ if (status != PJ_EPENDING)
+ goto on_error;
+
+
+ /* Setup RTCP socket with ioqueue */
+ pj_memset(&rtcp_cb, 0, sizeof(rtcp_cb));
+ rtcp_cb.on_read_complete = &on_rx_rtcp;
+
+ status = pj_ioqueue_register_sock(pool, ioqueue, tp->rtcp_sock, tp,
+ &rtcp_cb, &tp->rtcp_key);
+ if (status != PJ_SUCCESS)
+ goto on_error;
+
+ pj_ioqueue_op_key_init(&tp->rtcp_read_op, sizeof(tp->rtcp_read_op));
+ pj_ioqueue_op_key_init(&tp->rtcp_write_op, sizeof(tp->rtcp_write_op));
+
+
+ /* Kick of pending RTCP read from the ioqueue */
+ size = sizeof(tp->rtcp_pkt);
+ status = pj_ioqueue_recv(tp->rtcp_key, &tp->rtcp_read_op,
+ tp->rtcp_pkt, &size, PJ_IOQUEUE_ALWAYS_ASYNC);
+ if (status != PJ_EPENDING)
+ goto on_error;
+
+
+ /* Done */
+ *p_tp = &tp->base;
+ return PJ_SUCCESS;
+
+
+on_error:
+ pjmedia_transport_udp_close(&tp->base);
+ return status;
+}
+
+
+/**
+ * Close UDP transport.
+ */
+PJ_DEF(pj_status_t) pjmedia_transport_udp_close(pjmedia_transport *tp)
+{
+ struct transport_udp *udp = (struct transport_udp*) tp;
+
+ /* Sanity check */
+ PJ_ASSERT_RETURN(tp, PJ_EINVAL);
+
+ /* Must not close while stream is using this */
+ PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP);
+
+
+ if (udp->rtp_key) {
+ pj_ioqueue_unregister(udp->rtp_key);
+ udp->rtp_key = NULL;
+ } else if (udp->rtp_sock != PJ_INVALID_SOCKET) {
+ pj_sock_close(udp->rtp_sock);
+ udp->rtp_sock = PJ_INVALID_SOCKET;
+ }
+
+ if (udp->rtcp_key) {
+ pj_ioqueue_unregister(udp->rtcp_key);
+ udp->rtcp_key = NULL;
+ } else if (udp->rtcp_sock != PJ_INVALID_SOCKET) {
+ pj_sock_close(udp->rtcp_sock);
+ udp->rtcp_sock = PJ_INVALID_SOCKET;
+ }
+
+ pj_pool_release(udp->pool);
+
+ return PJ_SUCCESS;
+}
+
+
+/* Notification from ioqueue about incoming RTP packet */
+static void on_rx_rtp( pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+{
+ struct transport_udp *udp;
+ pj_status_t status;
+
+ PJ_UNUSED_ARG(op_key);
+
+ udp = pj_ioqueue_get_user_data(key);
+
+ do {
+ void (*cb)(pjmedia_stream*,const void*,pj_ssize_t);
+ pjmedia_stream *stream;
+
+ cb = udp->rtp_cb;
+ stream = udp->stream;
+
+ if (bytes_read > 0 && cb && stream)
+ (*cb)(stream, udp->rtp_pkt, bytes_read);
+
+ /* See if source address of RTP packet is different than the
+ * configured address.
+ */
+ if ((udp->rem_rtp_addr.sin_addr.s_addr !=
+ udp->rtp_src_addr.sin_addr.s_addr) ||
+ (udp->rem_rtp_addr.sin_port !=
+ udp->rtp_src_addr.sin_port))
+ {
+ udp->rtp_src_cnt++;
+
+ if (udp->rtp_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) {
+
+ udp->rem_rtp_addr = udp->rtp_src_addr;
+ udp->rtp_src_cnt = 0;
+
+ PJ_LOG(4,(udp->base.name,
+ "Remote RTP address switched to %s:%d",
+ pj_inet_ntoa(udp->rtp_src_addr.sin_addr),
+ pj_ntohs(udp->rtp_src_addr.sin_port)));
+ }
+ }
+
+ bytes_read = sizeof(udp->rtp_pkt);
+ udp->rtp_addrlen = sizeof(pj_sockaddr_in);
+ status = pj_ioqueue_recvfrom(udp->rtp_key, &udp->rtp_read_op,
+ udp->rtp_pkt, &bytes_read, 0,
+ &udp->rtp_src_addr,
+ &udp->rtp_addrlen);
+
+ } while (status == PJ_SUCCESS);
+}
+
+
+/* Notification from ioqueue about incoming RTCP packet */
+static void on_rx_rtcp(pj_ioqueue_key_t *key,
+ pj_ioqueue_op_key_t *op_key,
+ pj_ssize_t bytes_read)
+{
+ struct transport_udp *udp;
+ pj_status_t status;
+
+ PJ_UNUSED_ARG(op_key);
+
+ udp = pj_ioqueue_get_user_data(key);
+
+ do {
+ void (*cb)(pjmedia_stream*,const void*,pj_ssize_t);
+ pjmedia_stream *stream;
+
+ cb = udp->rtcp_cb;
+ stream = udp->stream;
+
+ if (bytes_read > 0 && cb && stream)
+ (*cb)(stream, udp->rtcp_pkt, bytes_read);
+
+ bytes_read = sizeof(udp->rtcp_pkt);
+ status = pj_ioqueue_recv(udp->rtcp_key, &udp->rtcp_read_op,
+ udp->rtcp_pkt, &bytes_read, 0);
+
+ } while (status == PJ_SUCCESS);
+}
+
+
+/* Called by stream to initialize the transport */
+static pj_status_t transport_attach( pjmedia_transport *tp,
+ pjmedia_stream *strm,
+ const pj_sockaddr_t *rem_addr,
+ unsigned addr_len,
+ void (*rtp_cb)(pjmedia_stream*,
+ const void*,
+ pj_ssize_t),
+ void (*rtcp_cb)(pjmedia_stream*,
+ const void*,
+ pj_ssize_t))
+{
+ struct transport_udp *udp = (struct transport_udp*) tp;
+
+ /* Validate arguments */
+ PJ_ASSERT_RETURN(tp && strm && rem_addr && addr_len, PJ_EINVAL);
+
+ /* Remote address must be Internet address */
+ PJ_ASSERT_RETURN(addr_len == sizeof(pj_sockaddr_in) &&
+ ((pj_sockaddr_in*)rem_addr)->sin_family == PJ_AF_INET,
+ PJ_EINVAL);
+
+ /* Must not be "attached" to existing stream */
+ PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP);
+
+ /* "Attach" the stream: */
+
+ /* Copy remote RTP address */
+ pj_memcpy(&udp->rem_rtp_addr, rem_addr, sizeof(pj_sockaddr_in));
+
+ /* Guess RTCP address from RTP address */
+ pj_memcpy(&udp->rem_rtcp_addr, rem_addr, sizeof(pj_sockaddr_in));
+ udp->rem_rtcp_addr.sin_port = (pj_uint16_t) pj_htons((pj_uint16_t)(
+ pj_ntohs(udp->rem_rtp_addr.sin_port)+1));
+
+ /* Save the callbacks */
+ udp->rtp_cb = rtp_cb;
+ udp->rtcp_cb = rtcp_cb;
+
+ /* Last, save the stream to mark that we have a "client" */
+ udp->stream = strm;
+
+ return PJ_SUCCESS;
+}
+
+
+/* Called by stream when it no longer needs the transport */
+static void transport_detach( pjmedia_transport *tp,
+ pjmedia_stream *strm)
+{
+ struct transport_udp *udp = (struct transport_udp*) tp;
+
+ pj_assert(tp && strm);
+
+ /* Clear up stream infos from transport */
+ udp->stream = NULL;
+ udp->rtp_cb = NULL;
+ udp->rtcp_cb = NULL;
+}
+
+
+/* Called by stream to send RTP packet */
+static pj_status_t transport_send_rtp( pjmedia_transport *tp,
+ const void *pkt,
+ pj_size_t size)
+{
+ struct transport_udp *udp = (struct transport_udp*)tp;
+ pj_ssize_t sent;
+ pj_status_t status;
+
+ PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP);
+
+ sent = size;
+ status = pj_ioqueue_sendto( udp->rtp_key, &udp->rtp_write_op,
+ pkt, &sent, 0,
+ &udp->rem_rtp_addr, sizeof(pj_sockaddr_in));
+
+ if (status==PJ_SUCCESS || status==PJ_EPENDING)
+ return PJ_SUCCESS;
+
+ return status;
+}
+
+/* Called by stream to send RTCP packet */
+static pj_status_t transport_send_rtcp(pjmedia_transport *tp,
+ const void *pkt,
+ pj_size_t size)
+{
+ struct transport_udp *udp = (struct transport_udp*)tp;
+ pj_ssize_t sent;
+ pj_status_t status;
+
+ PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP);
+
+ sent = size;
+ status = pj_ioqueue_sendto( udp->rtcp_key, &udp->rtcp_write_op,
+ pkt, &sent, 0,
+ &udp->rem_rtcp_addr, sizeof(pj_sockaddr_in));
+
+ if (status==PJ_SUCCESS || status==PJ_EPENDING)
+ return PJ_SUCCESS;
+
+ return status;
+}
+
diff --git a/pjsip-apps/src/samples/simpleua.c b/pjsip-apps/src/samples/simpleua.c
index c55c3e99..2cf7de7b 100644
--- a/pjsip-apps/src/samples/simpleua.c
+++ b/pjsip-apps/src/samples/simpleua.c
@@ -73,6 +73,7 @@ static pj_caching_pool cp; /* Global pool factory. */
static pjmedia_endpt *g_med_endpt; /* Media endpoint. */
static pjmedia_sock_info g_med_skinfo; /* Socket info for media */
+static pjmedia_transport *g_med_transport;/* Media stream transport */
/* Call variables: */
static pjsip_inv_session *g_inv; /* Current invite session. */
@@ -280,6 +281,14 @@ int main(int argc, char *argv[])
g_med_skinfo.rtcp_addr_name = g_med_skinfo.rtp_addr_name;
+ /* Create media transport */
+ status = pjmedia_transport_udp_attach(g_med_endpt, NULL, &g_med_skinfo,
+ &g_med_transport);
+ if (status != PJ_SUCCESS) {
+ app_perror(THIS_FILE, "Unable to create media transport", status);
+ return 1;
+ }
+
/*
* If URL is specified, then make call immediately.
*/
@@ -611,8 +620,8 @@ static void call_on_media_update( pjsip_inv_session *inv,
/* Create session info based on the two SDPs.
* We only support one stream per session for now.
*/
- status = pjmedia_session_info_from_sdp(inv->dlg->pool, g_med_endpt, 1,
- &sess_info, &g_med_skinfo,
+ status = pjmedia_session_info_from_sdp(inv->dlg->pool, g_med_endpt,
+ 1, &sess_info,
local_sdp, remote_sdp);
if (status != PJ_SUCCESS) {
app_perror( THIS_FILE, "Unable to create media session", status);
@@ -629,7 +638,7 @@ static void call_on_media_update( pjsip_inv_session *inv,
* The media session is active immediately.
*/
status = pjmedia_session_create( g_med_endpt, &sess_info,
- NULL, &g_med_session );
+ &g_med_transport, NULL, &g_med_session );
if (status != PJ_SUCCESS) {
app_perror( THIS_FILE, "Unable to create media session", status);
return;
diff --git a/pjsip-apps/src/samples/siprtp.c b/pjsip-apps/src/samples/siprtp.c
index 8b896b27..14830640 100644
--- a/pjsip-apps/src/samples/siprtp.c
+++ b/pjsip-apps/src/samples/siprtp.c
@@ -1276,7 +1276,7 @@ static void call_on_media_update( pjsip_inv_session *inv,
pjmedia_sdp_neg_get_active_remote(inv->neg, &remote_sdp);
status = pjmedia_stream_info_from_sdp(&audio->si, inv->pool, app.med_endpt,
- NULL, local_sdp, remote_sdp, 0);
+ local_sdp, remote_sdp, 0);
if (status != PJ_SUCCESS) {
app_perror(THIS_FILE, "Error creating stream info from SDP", status);
return;
diff --git a/pjsip-apps/src/samples/streamutil.c b/pjsip-apps/src/samples/streamutil.c
index d87dae74..aea56178 100644
--- a/pjsip-apps/src/samples/streamutil.c
+++ b/pjsip-apps/src/samples/streamutil.c
@@ -110,6 +110,7 @@ static pj_status_t create_stream( pj_pool_t *pool,
pjmedia_stream **p_stream )
{
pjmedia_stream_info info;
+ pjmedia_transport *transport;
pj_status_t status;
@@ -129,58 +130,23 @@ static pj_status_t create_stream( pj_pool_t *pool,
pj_memcpy(&info.rem_addr, rem_addr, sizeof(pj_sockaddr_in));
- /* Create RTP socket */
- status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0,
- &info.sock_info.rtp_sock);
- PJ_ASSERT_RETURN(status == PJ_SUCCESS, status);
-
-
- /* Bind RTP socket to local port */
- info.sock_info.rtp_addr_name.sin_family = PJ_AF_INET;
- info.sock_info.rtp_addr_name.sin_port = pj_htons(local_port);
-
- status = pj_sock_bind(info.sock_info.rtp_sock,
- &info.sock_info.rtp_addr_name,
- sizeof(pj_sockaddr_in));
- if (status != PJ_SUCCESS) {
- app_perror(THIS_FILE, "Unable to bind RTP socket", status);
- pj_sock_close(info.sock_info.rtp_sock);
- return status;
- }
-
-
- /* Create RTCP socket */
- status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0,
- &info.sock_info.rtcp_sock);
- PJ_ASSERT_RETURN(status == PJ_SUCCESS, status);
-
-
- /* Bind RTP socket to local port + 1 */
- ++local_port;
- info.sock_info.rtcp_addr_name.sin_family = PJ_AF_INET;
- info.sock_info.rtcp_addr_name.sin_port = pj_htons(local_port);
-
- status = pj_sock_bind(info.sock_info.rtcp_sock,
- &info.sock_info.rtcp_addr_name,
- sizeof(pj_sockaddr_in));
- if (status != PJ_SUCCESS) {
- app_perror(THIS_FILE, "Unable to bind RTCP socket", status);
- pj_sock_close(info.sock_info.rtp_sock);
- pj_sock_close(info.sock_info.rtcp_sock);
+ /* Create media transport */
+ status = pjmedia_transport_udp_create(med_endpt, NULL, local_port,
+ &transport);
+ if (status != PJ_SUCCESS)
return status;
- }
/* Now that the stream info is initialized, we can create the
* stream.
*/
- status = pjmedia_stream_create( med_endpt, pool, &info, NULL, p_stream);
+ status = pjmedia_stream_create( med_endpt, pool, &info,
+ transport, NULL, p_stream);
if (status != PJ_SUCCESS) {
app_perror(THIS_FILE, "Error creating stream", status);
- pj_sock_close(info.sock_info.rtp_sock);
- pj_sock_close(info.sock_info.rtcp_sock);
+ pjmedia_transport_udp_close(transport);
return status;
}
@@ -211,7 +177,7 @@ int main(int argc, char *argv[])
pjmedia_stream *stream = NULL;
pjmedia_port *stream_port;
char tmp[10];
- pj_status_t status;
+ pj_status_t status;
/* Default values */
@@ -513,7 +479,11 @@ on_exit:
/* Destroy stream */
if (stream) {
+ pjmedia_transport *tp;
+
+ tp = pjmedia_stream_get_transport(stream);
pjmedia_stream_destroy(stream);
+ pjmedia_transport_udp_close(tp);
}
/* Destroy file ports */
diff --git a/pjsip/include/pjsua-lib/pjsua.h b/pjsip/include/pjsua-lib/pjsua.h
index 18467129..a3c0c3a4 100644
--- a/pjsip/include/pjsua-lib/pjsua.h
+++ b/pjsip/include/pjsua-lib/pjsua.h
@@ -102,6 +102,7 @@ struct pjsua_call
pjsip_evsub *xfer_sub; /**< Xfer server subscription, if this
call was triggered by xfer. */
pjmedia_sock_info skinfo; /**< Preallocated media sockets. */
+ pjmedia_transport *med_tp; /**< Media transport. */
void *app_data; /**< Application data. */
pj_timer_entry refresh_tm;/**< Timer to send re-INVITE. */
pj_timer_entry hangup_tm; /**< Timer to hangup call. */
diff --git a/pjsip/src/pjsua-lib/pjsua_call.c b/pjsip/src/pjsua-lib/pjsua_call.c
index 73f5aac1..4dbfb596 100644
--- a/pjsip/src/pjsua-lib/pjsua_call.c
+++ b/pjsip/src/pjsua-lib/pjsua_call.c
@@ -97,26 +97,6 @@ static void schedule_call_timer( pjsua_call *call, pj_timer_entry *e,
}
-/* Close and reopen socket. */
-static pj_status_t reopen_sock( pj_sock_t *sock, pj_sockaddr_in *addr)
-{
- pj_status_t status;
-
- status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, sock);
- if (status != PJ_SUCCESS) {
- pjsua_perror(THIS_FILE, "Unable to create socket", status);
- return status;
- }
-
- status = pj_sock_bind(*sock, addr, sizeof(pj_sockaddr_in));
- if (status != PJ_SUCCESS) {
- pjsua_perror(THIS_FILE, "Unable to re-bind RTP/RTCP socket", status);
- return status;
- }
-
- return PJ_SUCCESS;
-}
-
/*
* Destroy the call's media
*/
@@ -130,27 +110,8 @@ static pj_status_t call_destroy_media(int call_index)
}
if (call->session) {
- pj_sockaddr_in rtp_addr, rtcp_addr;
- int addrlen;
-
- addrlen = sizeof(rtp_addr);
- pj_sock_getsockname(call->skinfo.rtp_sock, &rtp_addr, &addrlen);
-
- addrlen = sizeof(rtcp_addr);
- pj_sock_getsockname(call->skinfo.rtcp_sock, &rtcp_addr, &addrlen);
-
/* Destroy session (this will also close RTP/RTCP sockets). */
pjmedia_session_destroy(call->session);
-
- /* Close and reopen RTP socket.
- * This is necessary to get the socket unregistered from ioqueue,
- * when IOCompletionPort is used.
- */
- reopen_sock(&call->skinfo.rtp_sock, &rtp_addr);
-
- /* Close and reopen RTCP socket too. */
- reopen_sock(&call->skinfo.rtcp_sock, &rtcp_addr);
-
call->session = NULL;
PJ_LOG(3,(THIS_FILE, "Media session for call %d is destroyed",
@@ -1011,8 +972,8 @@ static void pjsua_call_on_media_update(pjsip_inv_session *inv,
* We only support one stream per session at the moment
*/
status = pjmedia_session_info_from_sdp( call->inv->dlg->pool,
- pjsua.med_endpt, 1,
- &sess_info, &call->skinfo,
+ pjsua.med_endpt,
+ 1,&sess_info,
local_sdp, remote_sdp);
if (status != PJ_SUCCESS) {
pjsua_perror(THIS_FILE, "Unable to create media session",
@@ -1035,6 +996,7 @@ static void pjsua_call_on_media_update(pjsip_inv_session *inv,
/* Create session based on session info. */
status = pjmedia_session_create( pjsua.med_endpt, &sess_info,
+ &call->med_tp,
call, &call->session );
if (status != PJ_SUCCESS) {
pjsua_perror(THIS_FILE, "Unable to create media session",
diff --git a/pjsip/src/pjsua-lib/pjsua_core.c b/pjsip/src/pjsua-lib/pjsua_core.c
index eafcd78b..5959d065 100644
--- a/pjsip/src/pjsua-lib/pjsua_core.c
+++ b/pjsip/src/pjsua-lib/pjsua_core.c
@@ -320,7 +320,7 @@ static pj_status_t init_sockets(pj_bool_t sip,
PJ_LOG(4,(THIS_FILE, "RTP socket reachable at %s:%d",
pj_inet_ntoa(skinfo->rtp_addr_name.sin_addr),
pj_ntohs(skinfo->rtp_addr_name.sin_port)));
- PJ_LOG(4,(THIS_FILE, "RTCP UDP socket reachable at %s:%d",
+ PJ_LOG(4,(THIS_FILE, "RTCP socket reachable at %s:%d",
pj_inet_ntoa(skinfo->rtcp_addr_name.sin_addr),
pj_ntohs(skinfo->rtcp_addr_name.sin_port)));
@@ -779,6 +779,10 @@ pj_status_t pjsua_start(void)
/* Init sockets (STUN etc): */
for (i=0; i<(int)pjsua.max_calls; ++i) {
status = init_sockets(i==0, &pjsua.calls[i].skinfo);
+ if (status == PJ_SUCCESS)
+ status = pjmedia_transport_udp_attach(pjsua.med_endpt, NULL,
+ &pjsua.calls[i].skinfo,
+ &pjsua.calls[i].med_tp);
if (status != PJ_SUCCESS) {
pjsua_perror(THIS_FILE, "init_sockets() has returned error",
status);
@@ -786,8 +790,7 @@ pj_status_t pjsua_start(void)
if (i >= 0)
pj_sock_close(pjsua.sip_sock);
while (i >= 0) {
- pj_sock_close(pjsua.calls[i].skinfo.rtp_sock);
- pj_sock_close(pjsua.calls[i].skinfo.rtcp_sock);
+ pjmedia_transport_udp_close(pjsua.calls[i].med_tp);
}
return status;
}
@@ -1048,6 +1051,12 @@ pj_status_t pjsua_destroy(void)
pjmedia_codec_l16_deinit();
#endif /* PJMEDIA_HAS_L16_CODEC */
+
+ /* Close transports */
+ for (i=0; i<pjsua.call_cnt; ++i) {
+ pjmedia_transport_udp_close(pjsua.calls[i].med_tp);
+ }
+
/* Destroy media endpoint. */
pjmedia_endpt_destroy(pjsua.med_endpt);