diff options
author | Benny Prijono <bennylp@teluu.com> | 2006-05-17 17:17:39 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2006-05-17 17:17:39 +0000 |
commit | 1b3863ac6dcae1a7bed7e0b0cb6a2f482c093989 (patch) | |
tree | 8ead72c61a60697246ab48fde099fb063c3fbaff | |
parent | 79e6d6ac5ae27d653d1724059f081a6be1c39b7e (diff) |
Major modification in pjmedia to split stream transport into separate functionality, to allow using custom transports with streams
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@452 74dad513-b988-da41-8d7b-12977e46ad98
-rw-r--r-- | pjmedia/build/Makefile | 3 | ||||
-rw-r--r-- | pjmedia/build/os-win32.mak | 3 | ||||
-rw-r--r-- | pjmedia/build/pjmedia.dsp | 8 | ||||
-rw-r--r-- | pjmedia/include/pjmedia.h | 1 | ||||
-rw-r--r-- | pjmedia/include/pjmedia/session.h | 18 | ||||
-rw-r--r-- | pjmedia/include/pjmedia/stream.h | 94 | ||||
-rw-r--r-- | pjmedia/include/pjmedia/transport_udp.h | 59 | ||||
-rw-r--r-- | pjmedia/src/pjmedia/session.c | 9 | ||||
-rw-r--r-- | pjmedia/src/pjmedia/stream.c | 434 | ||||
-rw-r--r-- | pjmedia/src/pjmedia/transport_udp.c | 487 | ||||
-rw-r--r-- | pjsip-apps/src/samples/simpleua.c | 15 | ||||
-rw-r--r-- | pjsip-apps/src/samples/siprtp.c | 2 | ||||
-rw-r--r-- | pjsip-apps/src/samples/streamutil.c | 56 | ||||
-rw-r--r-- | pjsip/include/pjsua-lib/pjsua.h | 1 | ||||
-rw-r--r-- | pjsip/src/pjsua-lib/pjsua_call.c | 44 | ||||
-rw-r--r-- | pjsip/src/pjsua-lib/pjsua_core.c | 15 |
16 files changed, 839 insertions, 410 deletions
diff --git a/pjmedia/build/Makefile b/pjmedia/build/Makefile index 332aa40b..9dfac98c 100644 --- a/pjmedia/build/Makefile +++ b/pjmedia/build/Makefile @@ -69,7 +69,8 @@ export PJMEDIA_OBJS += $(OS_OBJS) $(M_OBJS) $(CC_OBJS) $(HOST_OBJS) \ master_port.o null_port.o plc_common.o plc_g711.o \ port.o resample.o \ resample_port.o rtcp.o rtp.o sdp.o sdp_cmp.o sdp_neg.o \ - session.o silencedet.o sound_port.o stream.o wave.o \ + session.o silencedet.o sound_port.o stream.o \ + transport_udp.o wave.o \ $(SOUND_OBJS) $(NULLSOUND_OBJS) export PJMEDIA_CFLAGS += $(_CFLAGS) diff --git a/pjmedia/build/os-win32.mak b/pjmedia/build/os-win32.mak index 9a7c836b..78095336 100644 --- a/pjmedia/build/os-win32.mak +++ b/pjmedia/build/os-win32.mak @@ -10,7 +10,8 @@ export PJMEDIA_OBJS += $(PA_DIR)/pa_win_hostapis.o $(PA_DIR)/pa_win_util.o \ $(PA_DIR)/pa_win_wmme.o -export OS_CFLAGS += -DPA_NO_ASIO -DPA_NO_DS +export OS_CFLAGS += -DPA_NO_ASIO -DPA_NO_DS \ + -DPJMEDIA_SOUND_IMPLEMENTATION=PJMEDIA_SOUND_PORTAUDIO_SOUND # Example: # to activate Null sound, uncomment these two lines below. diff --git a/pjmedia/build/pjmedia.dsp b/pjmedia/build/pjmedia.dsp index af4d9d7d..9e998198 100644 --- a/pjmedia/build/pjmedia.dsp +++ b/pjmedia/build/pjmedia.dsp @@ -191,6 +191,10 @@ SOURCE=..\src\pjmedia\stream.c # End Source File
# Begin Source File
+SOURCE=..\src\pjmedia\transport_udp.c
+# End Source File
+# Begin Source File
+
SOURCE=..\src\pjmedia\wav_player.c
# End Source File
# Begin Source File
@@ -303,6 +307,10 @@ SOURCE=..\include\pjmedia\stream.h # End Source File
# Begin Source File
+SOURCE=..\include\pjmedia\transport_udp.h
+# End Source File
+# Begin Source File
+
SOURCE=..\include\pjmedia\types.h
# End Source File
# Begin Source File
diff --git a/pjmedia/include/pjmedia.h b/pjmedia/include/pjmedia.h index c0869e9d..9e363769 100644 --- a/pjmedia/include/pjmedia.h +++ b/pjmedia/include/pjmedia.h @@ -43,6 +43,7 @@ #include <pjmedia/sdp_neg.h> #include <pjmedia/silencedet.h> #include <pjmedia/session.h> +#include <pjmedia/transport_udp.h> #include <pjmedia/sound.h> #include <pjmedia/sound_port.h> #include <pjmedia/wav_port.h> diff --git a/pjmedia/include/pjmedia/session.h b/pjmedia/include/pjmedia/session.h index c93d4343..38fe1e2c 100644 --- a/pjmedia/include/pjmedia/session.h +++ b/pjmedia/include/pjmedia/session.h @@ -88,10 +88,6 @@ typedef struct pjmedia_session_info pjmedia_session_info; * @param endpt Pjmedia endpoint. * @param max_streams Maximum number of stream infos to be created. * @param si Session info structure to be initialized. - * @param skinfo Optional array of media socket info to be copied - * to the stream info. If this argument is specified, - * the array must contain sufficient elements for - * each stream to be initialized. * @param local Local SDP session descriptor. * @param remote Remote SDP session descriptor. * @param stream_idx Media stream index in the session descriptor. @@ -103,7 +99,6 @@ pjmedia_session_info_from_sdp( pj_pool_t *pool, pjmedia_endpt *endpt, unsigned max_streams, pjmedia_session_info *si, - const pjmedia_sock_info skinfo[], const pjmedia_sdp_session *local, const pjmedia_sdp_session *remote); @@ -118,7 +113,6 @@ pjmedia_session_info_from_sdp( pj_pool_t *pool, * @param si Stream info structure to be initialized. * @param pool Pool to allocate memory. * @param endpt PJMEDIA endpoint instance. - * @param skinfo Optional socket info to be copied to the stream info. * @param local Local SDP session descriptor. * @param remote Remote SDP session descriptor. * @param stream_idx Media stream index in the session descriptor. @@ -129,7 +123,6 @@ PJ_DECL(pj_status_t) pjmedia_stream_info_from_sdp( pjmedia_stream_info *si, pj_pool_t *pool, pjmedia_endpt *endpt, - const pjmedia_sock_info *skinfo, const pjmedia_sdp_session *local, const pjmedia_sdp_session *remote, unsigned stream_idx); @@ -145,11 +138,11 @@ pjmedia_stream_info_from_sdp( pjmedia_stream_info *si, * no media frames transmitted or received by the session. * * @param endpt The PJMEDIA endpoint instance. - * @param stream_cnt Maximum number of streams to be created. This - * also denotes the number of elements in the - * socket information. - * @param local_sdp The SDP describing local capability. - * @param rem_sdp The SDP describing remote capability. + * @param si Session info containing stream count and array of + * stream info. The stream count indicates how many + * streams to be created in the session. + * @param transports Array of media stream transports, with + * sufficient number of elements (one for each stream). * @param user_data Arbitrary user data to be kept in the session. * @param p_session Pointer to receive the media session. * @@ -159,6 +152,7 @@ pjmedia_stream_info_from_sdp( pjmedia_stream_info *si, PJ_DECL(pj_status_t) pjmedia_session_create( pjmedia_endpt *endpt, const pjmedia_session_info *si, + pjmedia_transport *transports[], void *user_data, pjmedia_session **p_session ); diff --git a/pjmedia/include/pjmedia/stream.h b/pjmedia/include/pjmedia/stream.h index 335a40e2..94101d46 100644 --- a/pjmedia/include/pjmedia/stream.h +++ b/pjmedia/include/pjmedia/stream.h @@ -69,7 +69,6 @@ struct pjmedia_stream_info { pjmedia_type type; /**< Media type (audio, video) */ pjmedia_dir dir; /**< Media direction. */ - pjmedia_sock_info sock_info; /**< Media transport (RTP/RTCP sockets) */ pj_sockaddr_in rem_addr; /**< Remote RTP address */ pjmedia_codec_info fmt; /**< Incoming codec format info. */ pjmedia_codec_param *param; /**< Optional codec param. */ @@ -100,6 +99,80 @@ typedef struct pjmedia_stream pjmedia_stream; /** + * @see pjmedia_transport_op. + */ +typedef struct pjmedia_transport pjmedia_transport; + + +/** + * This structure describes the operations for the stream transport. + */ +struct pjmedia_transport_op +{ + /** + * This function is called by the stream when the transport is about + * to be used by the stream for the first time, and it tells the transport + * about remote RTP address to send the packet and some callbacks to be + * called for incoming packets. + */ + pj_status_t (*attach)(pjmedia_transport *tp, + pjmedia_stream *strm, + const pj_sockaddr_t *rem_addr, + unsigned addr_len, + void (*rtp_cb)(pjmedia_stream*, + const void*, + pj_ssize_t), + void (*rtcp_cb)(pjmedia_stream*, + const void*, + pj_ssize_t)); + + /** + * This function is called by the stream when the stream is no longer + * need the transport (normally when the stream is about to be closed). + */ + void (*detach)(pjmedia_transport *tp, + pjmedia_stream *strm); + + /** + * This function is called by the stream to send RTP packet using the + * transport. + */ + pj_status_t (*send_rtp)(pjmedia_transport *tp, + const void *pkt, + pj_size_t size); + + /** + * This function is called by the stream to send RTCP packet using the + * transport. + */ + pj_status_t (*send_rtcp)(pjmedia_transport *tp, + const void *pkt, + pj_size_t size); + +}; + + +/** + * @see pjmedia_transport_op. + */ +typedef struct pjmedia_transport_op pjmedia_transport_op; + + +/** + * This structure declares stream transport. A stream transport is called + * by the stream to transmit a packet, and will notify stream when + * incoming packet is arrived. + */ +struct pjmedia_transport +{ + char name[PJ_MAX_OBJ_NAME]; + + pjmedia_transport_op *op; +}; + + + +/** * Create a media stream based on the specified parameter. After the stream * has been created, application normally would want to get the media port * interface of the streams, by calling pjmedia_stream_get_port(). The @@ -114,6 +187,9 @@ typedef struct pjmedia_stream pjmedia_stream; * number of memory may be needed because jitter * buffer needs to preallocate some storage. * @param info Stream information. + * @param tp Stream transport instance used to transmit + * and receive RTP/RTCP packets to/from the underlying + * transport. * @param user_data Arbitrary user data (for future callback feature). * @param p_stream Pointer to receive the media stream. * @@ -122,6 +198,7 @@ typedef struct pjmedia_stream pjmedia_stream; PJ_DECL(pj_status_t) pjmedia_stream_create(pjmedia_endpt *endpt, pj_pool_t *pool, const pjmedia_stream_info *info, + pjmedia_transport *tp, void *user_data, pjmedia_stream **p_stream); @@ -151,6 +228,16 @@ PJ_DECL(pj_status_t) pjmedia_stream_get_port(pjmedia_stream *stream, /** + * Get the media transport object associated with this stream. + * + * @param st The media stream. + * + * @return The transport object being used by the stream. + */ +PJ_DECL(pjmedia_transport*) pjmedia_stream_get_transport(pjmedia_stream *st); + + +/** * Start the media stream. This will start the appropriate channels * in the media stream, depending on the media direction that was set * when the stream was created. @@ -224,8 +311,9 @@ PJ_DECL(pj_bool_t) pjmedia_stream_check_dtmf(pjmedia_stream *stream); /** - * Retrieve the incoming DTMF digits from the stream. Note that the digits - * buffer will not be NULL terminated. + * Retrieve the incoming DTMF digits from the stream, and remove the digits + * from stream's DTMF buffer. Note that the digits buffer will not be NULL + * terminated. * * @param stream The media stream. * @param ascii_digits Buffer to receive the digits. The length of this diff --git a/pjmedia/include/pjmedia/transport_udp.h b/pjmedia/include/pjmedia/transport_udp.h new file mode 100644 index 00000000..a84da98c --- /dev/null +++ b/pjmedia/include/pjmedia/transport_udp.h @@ -0,0 +1,59 @@ +/* $Id$ */ +/* + * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#ifndef __PJMEDIA_TRANSPORT_UDP_H__ +#define __PJMEDIA_TRANSPORT_UDP_H__ + + +/** + * @file stream_transport_udp.h + * @brief Stream transport with UDP. + */ + +#include <pjmedia/stream.h> + + +/** + * Create UDP stream transport. + */ +PJ_DECL(pj_status_t) pjmedia_transport_udp_create(pjmedia_endpt *endpt, + const char *name, + int port, + pjmedia_transport **p_tp); + + +/** + * Create UDP stream transport from existing socket info. + */ +PJ_DECL(pj_status_t) pjmedia_transport_udp_attach(pjmedia_endpt *endpt, + const char *name, + const pjmedia_sock_info *si, + pjmedia_transport **p_tp); + + +/** + * Close UDP transport. + */ +PJ_DECL(pj_status_t) pjmedia_transport_udp_close(pjmedia_transport *tp); + + + + +#endif /* __PJMEDIA_TRANSPORT_UDP_H__ */ + + diff --git a/pjmedia/src/pjmedia/session.c b/pjmedia/src/pjmedia/session.c index 6bd5d020..9073bef1 100644 --- a/pjmedia/src/pjmedia/session.c +++ b/pjmedia/src/pjmedia/session.c @@ -64,7 +64,6 @@ PJ_DEF(pj_status_t) pjmedia_stream_info_from_sdp( pjmedia_stream_info *si, pj_pool_t *pool, pjmedia_endpt *endpt, - const pjmedia_sock_info *skinfo, const pjmedia_sdp_session *local, const pjmedia_sdp_session *remote, unsigned stream_idx) @@ -342,10 +341,6 @@ PJ_DEF(pj_status_t) pjmedia_stream_info_from_sdp( } } - /* Copy skinfo */ - if (skinfo) - si->sock_info = *skinfo; - /* Leave SSRC to random. */ si->ssrc = pj_rand(); @@ -364,7 +359,6 @@ pjmedia_session_info_from_sdp( pj_pool_t *pool, pjmedia_endpt *endpt, unsigned max_streams, pjmedia_session_info *si, - const pjmedia_sock_info skinfo[], const pjmedia_sdp_session *local, const pjmedia_sdp_session *remote) { @@ -381,7 +375,6 @@ pjmedia_session_info_from_sdp( pj_pool_t *pool, status = pjmedia_stream_info_from_sdp( &si->stream_info[i], pool, endpt, - (skinfo ? &skinfo[i] : NULL), local, remote, i); if (status != PJ_SUCCESS) return status; @@ -397,6 +390,7 @@ pjmedia_session_info_from_sdp( pj_pool_t *pool, */ PJ_DEF(pj_status_t) pjmedia_session_create( pjmedia_endpt *endpt, const pjmedia_session_info *si, + pjmedia_transport *transports[], void *user_data, pjmedia_session **p_session ) { @@ -432,6 +426,7 @@ PJ_DEF(pj_status_t) pjmedia_session_create( pjmedia_endpt *endpt, /* Create the stream */ status = pjmedia_stream_create(endpt, session->pool, &session->stream_info[i], + (transports?transports[i]:NULL), session, &session->stream[i]); if (status == PJ_SUCCESS) diff --git a/pjmedia/src/pjmedia/stream.c b/pjmedia/src/pjmedia/stream.c index 29430ccd..36acd1a4 100644 --- a/pjmedia/src/pjmedia/stream.c +++ b/pjmedia/src/pjmedia/stream.c @@ -85,32 +85,18 @@ struct pjmedia_stream pjmedia_dir dir; /**< Stream direction. */ void *user_data; /**< User data. */ + pjmedia_transport *transport; /**< Stream transport. */ + pjmedia_codec *codec; /**< Codec instance being used. */ pjmedia_codec_param codec_param; /**< Codec param. */ unsigned frame_size; /**< Size of encoded base frame.*/ pj_mutex_t *jb_mutex; pjmedia_jbuf *jb; /**< Jitter buffer. */ - pjmedia_sock_info skinfo; /**< Transport info. */ - pj_sockaddr_in rem_rtp_addr; /**< Remote RTP address. */ - pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address. */ - - pjmedia_rtcp_session rtcp; /**< RTCP for incoming RTP. */ - pj_ioqueue_key_t *rtp_key; /**< RTP ioqueue key. */ - pj_ioqueue_op_key_t rtp_op_key; /**< The pending read op key. */ - pj_sockaddr_in rtp_src_addr; /**< addr of src pkt from remote*/ - unsigned rtp_src_cnt; /**< if different, # of pkt rcv */ - int rtp_addrlen; /**< Address length. */ - - pj_ioqueue_key_t *rtcp_key; /**< RTCP ioqueue key. */ - pj_ioqueue_op_key_t rtcp_op_key; /**< The pending read op key. */ - pj_size_t rtcp_pkt_size; /**< Size of RTCP packet buf. */ - char rtcp_pkt[512]; /**< RTCP packet buffer. */ pj_uint32_t rtcp_last_tx; /**< RTCP tx time in timestamp */ pj_uint32_t rtcp_interval; /**< Interval, in timestamp. */ - int rtcp_addrlen; /**< Address length. */ /* RFC 2833 DTMF transmission queue: */ int tx_event_pt; /**< Outgoing pt for dtmf. */ @@ -423,24 +409,12 @@ static void check_tx_rtcp(pjmedia_stream *stream, pj_uint32_t timestamp) } else if (timestamp - stream->rtcp_last_tx >= stream->rtcp_interval) { pjmedia_rtcp_pkt *rtcp_pkt; - pj_ssize_t size; int len; - pj_status_t status; pjmedia_rtcp_build_rtcp(&stream->rtcp, &rtcp_pkt, &len); - size = len; - status = pj_sock_sendto(stream->skinfo.rtcp_sock, rtcp_pkt, &size, 0, - &stream->rem_rtcp_addr, - sizeof(stream->rem_rtcp_addr)); -#if 0 - if (status != PJ_SUCCESS) { - char errmsg[PJ_ERR_MSG_SIZE]; - - pj_strerror(status, errmsg, sizeof(errmsg)); - PJ_LOG(4,(port->info.name.ptr, "Error sending RTCP: %s [%d]", - errmsg, status)); - } -#endif + + (*stream->transport->op->send_rtcp)(stream->transport, + rtcp_pkt, len); stream->rtcp_last_tx = timestamp; } @@ -586,11 +560,10 @@ static pj_status_t put_frame( pjmedia_port *port, /* Send. */ sent = frame_out.size+sizeof(pjmedia_rtp_hdr); - status = pj_sock_sendto(stream->skinfo.rtp_sock, channel->out_pkt, - &sent, 0, &stream->rem_rtp_addr, - sizeof(stream->rem_rtp_addr)); - if (status != PJ_SUCCESS) - return status; + + (*stream->transport->op->send_rtp)(stream->transport, + channel->out_pkt, sent); + /* Update stat */ pjmedia_rtcp_tx_rtp(&stream->rtcp, frame_out.size); @@ -682,249 +655,150 @@ static void handle_incoming_dtmf( pjmedia_stream *stream, /* - * This callback is called by ioqueue framework on receipt of packets + * This callback is called by stream transport on receipt of packets * in the RTP socket. */ -static void on_rx_rtp( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, +static void on_rx_rtp( pjmedia_stream *stream, + const void *pkt, pj_ssize_t bytes_read) { - pjmedia_stream *stream = pj_ioqueue_get_user_data(key); pjmedia_channel *channel = stream->dec; + const pjmedia_rtp_hdr *hdr; + const void *payload; + unsigned payloadlen; + pjmedia_rtp_status seq_st; pj_status_t status; - - PJ_UNUSED_ARG(op_key); - - - /* - * Loop while we have packet. - */ - do { - const pjmedia_rtp_hdr *hdr; - const void *payload; - unsigned payloadlen; - pjmedia_rtp_status seq_st; - - /* Go straight to read next packet if bytes_read == 0. - */ - if (bytes_read == 0) - goto read_next_packet; - - if (bytes_read < 0) - goto read_next_packet; - - /* Update RTP and RTCP session. */ - status = pjmedia_rtp_decode_rtp(&channel->rtp, - channel->in_pkt, bytes_read, - &hdr, &payload, &payloadlen); - if (status != PJ_SUCCESS) { - LOGERR_((stream->port.info.name.ptr, "RTP decode error", status)); - goto read_next_packet; - } - - - /* Inform RTCP session */ - pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), - pj_ntohl(hdr->ts), payloadlen); - - /* Handle incoming DTMF. */ - if (hdr->pt == stream->rx_event_pt) { - handle_incoming_dtmf(stream, payload, payloadlen); - goto read_next_packet; - } - - /* Update RTP session (also checks if RTP session can accept - * the incoming packet. - */ - pjmedia_rtp_session_update(&channel->rtp, hdr, &seq_st); - if (seq_st.status.value) { - TRC_ ((stream->port.info.name.ptr, - "RTP status: badpt=%d, badssrc=%d, dup=%d, " - "outorder=%d, probation=%d, restart=%d", - seq_st.status.flag.badpt, - seq_st.status.flag.badssrc, - seq_st.status.flag.dup, - seq_st.status.flag.outorder, - seq_st.status.flag.probation, - seq_st.status.flag.restart)); - - if (seq_st.status.flag.badpt) { - PJ_LOG(4,(stream->port.info.name.ptr, - "Bad RTP pt %d (expecting %d)", - hdr->pt, channel->rtp.out_pt)); - } - } + /* Update RTP and RTCP session. */ + status = pjmedia_rtp_decode_rtp(&channel->rtp, pkt, bytes_read, + &hdr, &payload, &payloadlen); + if (status != PJ_SUCCESS) { + LOGERR_((stream->port.info.name.ptr, "RTP decode error", status)); + return; + } - /* Skip bad RTP packet */ - if (seq_st.status.flag.bad) - goto read_next_packet; + /* Inform RTCP session */ + pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), + pj_ntohl(hdr->ts), payloadlen); - /* See if source address of RTP packet is different than the - * configured address. - */ - if ((stream->rem_rtp_addr.sin_addr.s_addr != - stream->rtp_src_addr.sin_addr.s_addr) || - (stream->rem_rtp_addr.sin_port != stream->rtp_src_addr.sin_port)) - { - stream->rtp_src_cnt++; + /* Handle incoming DTMF. */ + if (hdr->pt == stream->rx_event_pt) { + handle_incoming_dtmf(stream, payload, payloadlen); + return; + } - if (stream->rtp_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) { - - stream->rem_rtp_addr = stream->rtp_src_addr; - stream->rtp_src_cnt = 0; - PJ_LOG(4,(stream->port.info.name.ptr, - "Remote RTP address switched to %s:%d", - pj_inet_ntoa(stream->rtp_src_addr.sin_addr), - pj_ntohs(stream->rtp_src_addr.sin_port))); - } + /* Update RTP session (also checks if RTP session can accept + * the incoming packet. + */ + pjmedia_rtp_session_update(&channel->rtp, hdr, &seq_st); + if (seq_st.status.value) { + TRC_ ((stream->port.info.name.ptr, + "RTP status: badpt=%d, badssrc=%d, dup=%d, " + "outorder=%d, probation=%d, restart=%d", + seq_st.status.flag.badpt, + seq_st.status.flag.badssrc, + seq_st.status.flag.dup, + seq_st.status.flag.outorder, + seq_st.status.flag.probation, + seq_st.status.flag.restart)); + + if (seq_st.status.flag.badpt) { + PJ_LOG(4,(stream->port.info.name.ptr, + "Bad RTP pt %d (expecting %d)", + hdr->pt, channel->rtp.out_pt)); } + } + /* Skip bad RTP packet */ + if (seq_st.status.flag.bad) + return; - /* Put "good" packet to jitter buffer, or reset the jitter buffer - * when RTP session is restarted. - */ - pj_mutex_lock( stream->jb_mutex ); - if (seq_st.status.flag.restart) { - status = pjmedia_jbuf_reset(stream->jb); - PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset")); - - } else { - /* - * Packets may contain more than one frames, while the jitter - * buffer can only take one frame per "put" operation. So we need - * to ask the codec to "parse" the payload into multiple frames. - */ - enum { MAX = 16 }; - pj_timestamp ts; - unsigned i, count = MAX; - unsigned samples_per_frame; - pjmedia_frame frames[MAX]; - - /* Get the timestamp of the first sample */ - ts.u64 = pj_ntohl(hdr->ts); - - /* Parse the payload. */ - status = (*stream->codec->op->parse)(stream->codec, - (void*)payload, - payloadlen, - &ts, - &count, - frames); - if (status != PJ_SUCCESS) { - LOGERR_((stream->port.info.name.ptr, - "Codec parse() error", - status)); - count = 0; - } - - /* Put each frame to jitter buffer. */ - samples_per_frame = stream->codec_param.info.frm_ptime * - stream->codec_param.info.clock_rate * - stream->codec_param.info.channel_cnt / - 1000; - - for (i=0; i<count; ++i) { - unsigned ext_seq; - - ext_seq = (unsigned)(frames[i].timestamp.u64 / - samples_per_frame); - pjmedia_jbuf_put_frame(stream->jb, frames[i].buf, - frames[i].size, ext_seq); - - } - } - pj_mutex_unlock( stream->jb_mutex ); - + /* Put "good" packet to jitter buffer, or reset the jitter buffer + * when RTP session is restarted. + */ + pj_mutex_lock( stream->jb_mutex ); + if (seq_st.status.flag.restart) { + status = pjmedia_jbuf_reset(stream->jb); + PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset")); - /* Check if now is the time to transmit RTCP SR/RR report. - * We only do this when stream direction is "decoding only", - * because otherwise check_tx_rtcp() will be handled by put_frame() + } else { + /* + * Packets may contain more than one frames, while the jitter + * buffer can only take one frame per "put" operation. So we need + * to ask the codec to "parse" the payload into multiple frames. */ - if (stream->dir == PJMEDIA_DIR_DECODING) { - check_tx_rtcp(stream, pj_ntohl(hdr->ts)); - } - - if (status != 0) { - LOGERR_((stream->port.info.name.ptr, "Jitter buffer put() error", - status)); - goto read_next_packet; + enum { MAX = 16 }; + pj_timestamp ts; + unsigned i, count = MAX; + unsigned samples_per_frame; + pjmedia_frame frames[MAX]; + + /* Get the timestamp of the first sample */ + ts.u64 = pj_ntohl(hdr->ts); + + /* Parse the payload. */ + status = (*stream->codec->op->parse)(stream->codec, + (void*)payload, + payloadlen, + &ts, + &count, + frames); + if (status != PJ_SUCCESS) { + LOGERR_((stream->port.info.name.ptr, + "Codec parse() error", + status)); + count = 0; } + /* Put each frame to jitter buffer. */ + samples_per_frame = stream->codec_param.info.frm_ptime * + stream->codec_param.info.clock_rate * + stream->codec_param.info.channel_cnt / + 1000; + + for (i=0; i<count; ++i) { + unsigned ext_seq; -read_next_packet: - bytes_read = channel->in_pkt_size; - stream->rtp_addrlen = sizeof(stream->rtp_src_addr); - status = pj_ioqueue_recvfrom( stream->rtp_key, - &stream->rtp_op_key, - channel->in_pkt, - &bytes_read, 0, - &stream->rtp_src_addr, - &stream->rtp_addrlen); + ext_seq = (unsigned)(frames[i].timestamp.u64 / + samples_per_frame); + pjmedia_jbuf_put_frame(stream->jb, frames[i].buf, + frames[i].size, ext_seq); - if (status != PJ_SUCCESS) { - bytes_read = -status; } + } + pj_mutex_unlock( stream->jb_mutex ); - } while (status == PJ_SUCCESS || - status == PJ_STATUS_FROM_OS(OSERR_ECONNRESET)); - if (status != PJ_SUCCESS && status != PJ_EPENDING) { - char errmsg[PJ_ERR_MSG_SIZE]; + /* Check if now is the time to transmit RTCP SR/RR report. + * We only do this when stream direction is "decoding only", + * because otherwise check_tx_rtcp() will be handled by put_frame() + */ + if (stream->dir == PJMEDIA_DIR_DECODING) { + check_tx_rtcp(stream, pj_ntohl(hdr->ts)); + } - pj_strerror(status, errmsg, sizeof(errmsg)); - PJ_LOG(4,(stream->port.info.name.ptr, - "Error reading RTP packet: %s [status=%d]. " - "RTP stream thread quitting!", - errmsg, status)); + if (status != 0) { + LOGERR_((stream->port.info.name.ptr, "Jitter buffer put() error", + status)); + return; } } /* - * This callback is called by ioqueue framework on receipt of packets + * This callback is called by stream transport on receipt of packets * in the RTCP socket. */ -static void on_rx_rtcp( pj_ioqueue_key_t *key, - pj_ioqueue_op_key_t *op_key, +static void on_rx_rtcp( pjmedia_stream *stream, + const void *pkt, pj_ssize_t bytes_read) { - pjmedia_stream *stream = pj_ioqueue_get_user_data(key); - pj_status_t status; - - PJ_UNUSED_ARG(op_key); - - do { - if (bytes_read > 0) { - pjmedia_rtcp_rx_rtcp(&stream->rtcp, stream->rtcp_pkt, - bytes_read); - } - - bytes_read = stream->rtcp_pkt_size; - stream->rtcp_addrlen = sizeof(stream->rem_rtcp_addr); - status = pj_ioqueue_recvfrom( stream->rtcp_key, - &stream->rtcp_op_key, - stream->rtcp_pkt, - &bytes_read, 0, - &stream->rem_rtcp_addr, - &stream->rtcp_addrlen); - - } while (status == PJ_SUCCESS); - - if (status != PJ_SUCCESS && status != PJ_EPENDING) { - char errmsg[PJ_ERR_MSG_SIZE]; - - pj_strerror(status, errmsg, sizeof(errmsg)); - PJ_LOG(4,(stream->port.info.name.ptr, - "Error reading RTCP packet: %s [status=%d]", - errmsg, status)); - } - + pjmedia_rtcp_rx_rtcp(&stream->rtcp, pkt, bytes_read); } @@ -993,13 +867,12 @@ static pj_status_t create_channel( pj_pool_t *pool, PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, pj_pool_t *pool, const pjmedia_stream_info *info, + pjmedia_transport *tp, void *user_data, pjmedia_stream **p_stream) { pjmedia_stream *stream; - pj_ioqueue_callback ioqueue_cb; - pj_uint16_t rtcp_port; unsigned jb_init, jb_max, jb_min_pre, jb_max_pre; pj_status_t status; @@ -1036,11 +909,6 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt); stream->dir = info->dir; stream->user_data = user_data; - stream->skinfo = info->sock_info; - stream->rem_rtp_addr = info->rem_addr; - rtcp_port = (pj_uint16_t) (pj_ntohs(info->rem_addr.sin_port)+1); - stream->rem_rtcp_addr = stream->rem_rtp_addr; - stream->rem_rtcp_addr.sin_port = pj_htons(rtcp_port); stream->rtcp_interval = (PJMEDIA_RTCP_INTERVAL + (pj_rand() % 8000)) * info->fmt.clock_rate / 1000; @@ -1048,6 +916,15 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, stream->rx_event_pt = info->rx_event_pt ? info->rx_event_pt : -1; stream->last_dtmf = -1; + /* Attach transport */ + status = (*tp->op->attach)(tp, stream, &info->rem_addr, + sizeof(info->rem_addr), &on_rx_rtp, + &on_rx_rtcp); + if (status != PJ_SUCCESS) + goto err_cleanup; + + stream->transport = tp; + /* Create mutex to protect jitter buffer: */ @@ -1160,45 +1037,7 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, if (status != PJ_SUCCESS) goto err_cleanup; - /* Register RTP socket to ioqueue */ - pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); - ioqueue_cb.on_read_complete = &on_rx_rtp; - - status = pj_ioqueue_register_sock( pool, - pjmedia_endpt_get_ioqueue(endpt), - stream->skinfo.rtp_sock, - stream, &ioqueue_cb, &stream->rtp_key); - if (status != PJ_SUCCESS) - goto err_cleanup; - - /* Init pending operation key. */ - pj_ioqueue_op_key_init(&stream->rtp_op_key, sizeof(stream->rtp_op_key)); - - /* Bootstrap the first recvfrom() operation. */ - on_rx_rtp( stream->rtp_key, &stream->rtp_op_key, 0); - - - /* Register RTCP socket to ioqueue. */ - if (stream->skinfo.rtcp_sock != PJ_INVALID_SOCKET) { - pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb)); - ioqueue_cb.on_read_complete = &on_rx_rtcp; - status = pj_ioqueue_register_sock( pool, - pjmedia_endpt_get_ioqueue(endpt), - stream->skinfo.rtcp_sock, - stream, &ioqueue_cb, - &stream->rtcp_key); - if (status != PJ_SUCCESS) - goto err_cleanup; - } - - /* Init pending operation key. */ - pj_ioqueue_op_key_init(&stream->rtcp_op_key, sizeof(stream->rtcp_op_key)); - - stream->rtcp_pkt_size = sizeof(stream->rtcp_pkt); - - /* Bootstrap the first recvfrom() operation. */ - on_rx_rtcp( stream->rtcp_key, &stream->rtcp_op_key, 0); /* Success! */ *p_stream = stream; @@ -1227,14 +1066,10 @@ PJ_DEF(pj_status_t) pjmedia_stream_destroy( pjmedia_stream *stream ) pj_mutex_lock(stream->jb_mutex); - /* Unregister from ioqueue. */ - if (stream->rtp_key) { - pj_ioqueue_unregister(stream->rtp_key); - stream->rtp_key = NULL; - } - if (stream->rtcp_key) { - pj_ioqueue_unregister(stream->rtcp_key); - stream->rtcp_key = NULL; + /* Detach from transport */ + if (stream->transport) { + (*stream->transport->op->detach)(stream->transport, stream); + stream->transport = NULL; } /* Free codec. */ @@ -1269,6 +1104,15 @@ PJ_DEF(pj_status_t) pjmedia_stream_get_port( pjmedia_stream *stream, /* + * Get the transport object + */ +PJ_DEF(pjmedia_transport*) pjmedia_stream_get_transport(pjmedia_stream *st) +{ + return st->transport; +} + + +/* * Start stream. */ PJ_DEF(pj_status_t) pjmedia_stream_start(pjmedia_stream *stream) diff --git a/pjmedia/src/pjmedia/transport_udp.c b/pjmedia/src/pjmedia/transport_udp.c new file mode 100644 index 00000000..c1936cfe --- /dev/null +++ b/pjmedia/src/pjmedia/transport_udp.c @@ -0,0 +1,487 @@ +/* $Id$ */ +/* + * Copyright (C) 2003-2006 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include <pjmedia/transport_udp.h> +#include <pj/assert.h> +#include <pj/errno.h> +#include <pj/ioqueue.h> +#include <pj/log.h> +#include <pj/string.h> + + +/* Maximum size of incoming RTP packet */ +#define RTP_LEN 1500 + +/* Maximum size of incoming RTCP packet */ +#define RTCP_LEN 600 + + +struct transport_udp +{ + pjmedia_transport base; /**< Base transport. */ + + pj_pool_t *pool; /**< Memory pool */ + + pjmedia_stream *stream; /**< Stream user (may be NULL) */ + pj_sockaddr_in rem_rtp_addr; /**< Remote RTP address */ + pj_sockaddr_in rem_rtcp_addr; /**< Remote RTCP address */ + void (*rtp_cb)( pjmedia_stream*,/**< To report incoming RTP. */ + const void*, + pj_ssize_t); + void (*rtcp_cb)( pjmedia_stream*,/**< To report incoming RTCP. */ + const void*, + pj_ssize_t); + + pj_sock_t rtp_sock; /**< RTP socket */ + pj_ioqueue_key_t *rtp_key; /**< RTP socket key in ioqueue */ + pj_ioqueue_op_key_t rtp_read_op; /**< Pending read operation */ + pj_ioqueue_op_key_t rtp_write_op; /**< Pending write operation */ + pj_sockaddr_in rtp_src_addr; /**< Actual packet src addr. */ + unsigned rtp_src_cnt; /**< How many pkt from this addr. */ + int rtp_addrlen; /**< Address length. */ + char rtp_pkt[RTP_LEN];/**< Incoming RTP packet buffer */ + + pj_sock_t rtcp_sock; /**< RTCP socket */ + pj_ioqueue_key_t *rtcp_key; /**< RTCP socket key in ioqueue */ + pj_ioqueue_op_key_t rtcp_read_op; /**< Pending read operation */ + pj_ioqueue_op_key_t rtcp_write_op; /**< Pending write operation */ + char rtcp_pkt[RTCP_LEN];/**< Incoming RTCP packet buffer */ +}; + + + +static void on_rx_rtp( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read); +static void on_rx_rtcp(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read); + +static pj_status_t transport_attach( pjmedia_transport *tp, + pjmedia_stream *strm, + const pj_sockaddr_t *rem_addr, + unsigned addr_len, + void (*rtp_cb)(pjmedia_stream*, + const void*, + pj_ssize_t), + void (*rtcp_cb)(pjmedia_stream*, + const void*, + pj_ssize_t)); +static void transport_detach( pjmedia_transport *tp, + pjmedia_stream *strm); +static pj_status_t transport_send_rtp( pjmedia_transport *tp, + const void *pkt, + pj_size_t size); +static pj_status_t transport_send_rtcp(pjmedia_transport *tp, + const void *pkt, + pj_size_t size); + + +static pjmedia_transport_op transport_udp_op = +{ + &transport_attach, + &transport_detach, + &transport_send_rtp, + &transport_send_rtcp, +}; + + +/** + * Create UDP stream transport. + */ +PJ_DEF(pj_status_t) pjmedia_transport_udp_create( pjmedia_endpt *endpt, + const char *name, + int port, + pjmedia_transport **p_tp) +{ + pjmedia_sock_info si; + pj_status_t status; + + + /* Sanity check */ + PJ_ASSERT_RETURN(endpt && port && p_tp, PJ_EINVAL); + + + pj_memset(&si, 0, sizeof(pjmedia_sock_info)); + si.rtp_sock = si.rtcp_sock = PJ_INVALID_SOCKET; + + /* Create RTP socket */ + status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &si.rtp_sock); + if (status != PJ_SUCCESS) + goto on_error; + + /* Bind RTP socket */ + si.rtp_addr_name.sin_family = PJ_AF_INET; + si.rtp_addr_name.sin_port = pj_htons((pj_uint16_t)port); + + status = pj_sock_bind(si.rtp_sock, &si.rtp_addr_name, + sizeof(si.rtp_addr_name)); + if (status != PJ_SUCCESS) + goto on_error; + + + /* Create RTCP socket */ + status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, &si.rtcp_sock); + if (status != PJ_SUCCESS) + goto on_error; + + /* Bind RTCP socket */ + si.rtcp_addr_name.sin_family = PJ_AF_INET; + si.rtcp_addr_name.sin_port = pj_htons((pj_uint16_t)(port+1)); + + status = pj_sock_bind(si.rtcp_sock, &si.rtcp_addr_name, + sizeof(si.rtcp_addr_name)); + if (status != PJ_SUCCESS) + goto on_error; + + + /* Create UDP transport by attaching socket info */ + return pjmedia_transport_udp_attach( endpt, name, &si, p_tp); + + +on_error: + if (si.rtp_sock != PJ_INVALID_SOCKET) + pj_sock_close(si.rtp_sock); + if (si.rtcp_sock != PJ_INVALID_SOCKET) + pj_sock_close(si.rtcp_sock); + return status; +} + + +/** + * Create UDP stream transport from existing socket info. + */ +PJ_DEF(pj_status_t) pjmedia_transport_udp_attach( pjmedia_endpt *endpt, + const char *name, + const pjmedia_sock_info *si, + pjmedia_transport **p_tp) +{ + struct transport_udp *tp; + pj_pool_t *pool; + pj_ioqueue_t *ioqueue; + pj_ioqueue_callback rtp_cb, rtcp_cb; + pj_ssize_t size; + pj_status_t status; + + + /* Sanity check */ + PJ_ASSERT_RETURN(endpt && si && p_tp, PJ_EINVAL); + + /* Check name */ + if (!name) + name = "udpmedia"; + + /* Get ioqueue instance */ + ioqueue = pjmedia_endpt_get_ioqueue(endpt); + + + /* Create transport structure */ + pool = pjmedia_endpt_create_pool(endpt, name, 4000, 4000); + if (!pool) + return PJ_ENOMEM; + + tp = pj_pool_zalloc(pool, sizeof(struct transport_udp)); + tp->pool = pool; + pj_ansi_strcpy(tp->base.name, name); + tp->base.op = &transport_udp_op; + + /* Copy socket infos */ + tp->rtp_sock = si->rtp_sock; + tp->rtcp_sock = si->rtcp_sock; + + + /* Setup RTP socket with the ioqueue */ + pj_memset(&rtp_cb, 0, sizeof(rtp_cb)); + rtp_cb.on_read_complete = &on_rx_rtp; + + status = pj_ioqueue_register_sock(pool, ioqueue, tp->rtp_sock, tp, + &rtp_cb, &tp->rtp_key); + if (status != PJ_SUCCESS) + goto on_error; + + pj_ioqueue_op_key_init(&tp->rtp_read_op, sizeof(tp->rtp_read_op)); + pj_ioqueue_op_key_init(&tp->rtcp_write_op, sizeof(tp->rtcp_write_op)); + + /* Kick of pending RTP read from the ioqueue */ + tp->rtp_addrlen = sizeof(tp->rtp_src_addr); + size = sizeof(tp->rtp_pkt); + status = pj_ioqueue_recvfrom(tp->rtp_key, &tp->rtp_read_op, + tp->rtp_pkt, &size, PJ_IOQUEUE_ALWAYS_ASYNC, + &tp->rtp_src_addr, &tp->rtp_addrlen); + if (status != PJ_EPENDING) + goto on_error; + + + /* Setup RTCP socket with ioqueue */ + pj_memset(&rtcp_cb, 0, sizeof(rtcp_cb)); + rtcp_cb.on_read_complete = &on_rx_rtcp; + + status = pj_ioqueue_register_sock(pool, ioqueue, tp->rtcp_sock, tp, + &rtcp_cb, &tp->rtcp_key); + if (status != PJ_SUCCESS) + goto on_error; + + pj_ioqueue_op_key_init(&tp->rtcp_read_op, sizeof(tp->rtcp_read_op)); + pj_ioqueue_op_key_init(&tp->rtcp_write_op, sizeof(tp->rtcp_write_op)); + + + /* Kick of pending RTCP read from the ioqueue */ + size = sizeof(tp->rtcp_pkt); + status = pj_ioqueue_recv(tp->rtcp_key, &tp->rtcp_read_op, + tp->rtcp_pkt, &size, PJ_IOQUEUE_ALWAYS_ASYNC); + if (status != PJ_EPENDING) + goto on_error; + + + /* Done */ + *p_tp = &tp->base; + return PJ_SUCCESS; + + +on_error: + pjmedia_transport_udp_close(&tp->base); + return status; +} + + +/** + * Close UDP transport. + */ +PJ_DEF(pj_status_t) pjmedia_transport_udp_close(pjmedia_transport *tp) +{ + struct transport_udp *udp = (struct transport_udp*) tp; + + /* Sanity check */ + PJ_ASSERT_RETURN(tp, PJ_EINVAL); + + /* Must not close while stream is using this */ + PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP); + + + if (udp->rtp_key) { + pj_ioqueue_unregister(udp->rtp_key); + udp->rtp_key = NULL; + } else if (udp->rtp_sock != PJ_INVALID_SOCKET) { + pj_sock_close(udp->rtp_sock); + udp->rtp_sock = PJ_INVALID_SOCKET; + } + + if (udp->rtcp_key) { + pj_ioqueue_unregister(udp->rtcp_key); + udp->rtcp_key = NULL; + } else if (udp->rtcp_sock != PJ_INVALID_SOCKET) { + pj_sock_close(udp->rtcp_sock); + udp->rtcp_sock = PJ_INVALID_SOCKET; + } + + pj_pool_release(udp->pool); + + return PJ_SUCCESS; +} + + +/* Notification from ioqueue about incoming RTP packet */ +static void on_rx_rtp( pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) +{ + struct transport_udp *udp; + pj_status_t status; + + PJ_UNUSED_ARG(op_key); + + udp = pj_ioqueue_get_user_data(key); + + do { + void (*cb)(pjmedia_stream*,const void*,pj_ssize_t); + pjmedia_stream *stream; + + cb = udp->rtp_cb; + stream = udp->stream; + + if (bytes_read > 0 && cb && stream) + (*cb)(stream, udp->rtp_pkt, bytes_read); + + /* See if source address of RTP packet is different than the + * configured address. + */ + if ((udp->rem_rtp_addr.sin_addr.s_addr != + udp->rtp_src_addr.sin_addr.s_addr) || + (udp->rem_rtp_addr.sin_port != + udp->rtp_src_addr.sin_port)) + { + udp->rtp_src_cnt++; + + if (udp->rtp_src_cnt >= PJMEDIA_RTP_NAT_PROBATION_CNT) { + + udp->rem_rtp_addr = udp->rtp_src_addr; + udp->rtp_src_cnt = 0; + + PJ_LOG(4,(udp->base.name, + "Remote RTP address switched to %s:%d", + pj_inet_ntoa(udp->rtp_src_addr.sin_addr), + pj_ntohs(udp->rtp_src_addr.sin_port))); + } + } + + bytes_read = sizeof(udp->rtp_pkt); + udp->rtp_addrlen = sizeof(pj_sockaddr_in); + status = pj_ioqueue_recvfrom(udp->rtp_key, &udp->rtp_read_op, + udp->rtp_pkt, &bytes_read, 0, + &udp->rtp_src_addr, + &udp->rtp_addrlen); + + } while (status == PJ_SUCCESS); +} + + +/* Notification from ioqueue about incoming RTCP packet */ +static void on_rx_rtcp(pj_ioqueue_key_t *key, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t bytes_read) +{ + struct transport_udp *udp; + pj_status_t status; + + PJ_UNUSED_ARG(op_key); + + udp = pj_ioqueue_get_user_data(key); + + do { + void (*cb)(pjmedia_stream*,const void*,pj_ssize_t); + pjmedia_stream *stream; + + cb = udp->rtcp_cb; + stream = udp->stream; + + if (bytes_read > 0 && cb && stream) + (*cb)(stream, udp->rtcp_pkt, bytes_read); + + bytes_read = sizeof(udp->rtcp_pkt); + status = pj_ioqueue_recv(udp->rtcp_key, &udp->rtcp_read_op, + udp->rtcp_pkt, &bytes_read, 0); + + } while (status == PJ_SUCCESS); +} + + +/* Called by stream to initialize the transport */ +static pj_status_t transport_attach( pjmedia_transport *tp, + pjmedia_stream *strm, + const pj_sockaddr_t *rem_addr, + unsigned addr_len, + void (*rtp_cb)(pjmedia_stream*, + const void*, + pj_ssize_t), + void (*rtcp_cb)(pjmedia_stream*, + const void*, + pj_ssize_t)) +{ + struct transport_udp *udp = (struct transport_udp*) tp; + + /* Validate arguments */ + PJ_ASSERT_RETURN(tp && strm && rem_addr && addr_len, PJ_EINVAL); + + /* Remote address must be Internet address */ + PJ_ASSERT_RETURN(addr_len == sizeof(pj_sockaddr_in) && + ((pj_sockaddr_in*)rem_addr)->sin_family == PJ_AF_INET, + PJ_EINVAL); + + /* Must not be "attached" to existing stream */ + PJ_ASSERT_RETURN(udp->stream == NULL, PJ_EINVALIDOP); + + /* "Attach" the stream: */ + + /* Copy remote RTP address */ + pj_memcpy(&udp->rem_rtp_addr, rem_addr, sizeof(pj_sockaddr_in)); + + /* Guess RTCP address from RTP address */ + pj_memcpy(&udp->rem_rtcp_addr, rem_addr, sizeof(pj_sockaddr_in)); + udp->rem_rtcp_addr.sin_port = (pj_uint16_t) pj_htons((pj_uint16_t)( + pj_ntohs(udp->rem_rtp_addr.sin_port)+1)); + + /* Save the callbacks */ + udp->rtp_cb = rtp_cb; + udp->rtcp_cb = rtcp_cb; + + /* Last, save the stream to mark that we have a "client" */ + udp->stream = strm; + + return PJ_SUCCESS; +} + + +/* Called by stream when it no longer needs the transport */ +static void transport_detach( pjmedia_transport *tp, + pjmedia_stream *strm) +{ + struct transport_udp *udp = (struct transport_udp*) tp; + + pj_assert(tp && strm); + + /* Clear up stream infos from transport */ + udp->stream = NULL; + udp->rtp_cb = NULL; + udp->rtcp_cb = NULL; +} + + +/* Called by stream to send RTP packet */ +static pj_status_t transport_send_rtp( pjmedia_transport *tp, + const void *pkt, + pj_size_t size) +{ + struct transport_udp *udp = (struct transport_udp*)tp; + pj_ssize_t sent; + pj_status_t status; + + PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP); + + sent = size; + status = pj_ioqueue_sendto( udp->rtp_key, &udp->rtp_write_op, + pkt, &sent, 0, + &udp->rem_rtp_addr, sizeof(pj_sockaddr_in)); + + if (status==PJ_SUCCESS || status==PJ_EPENDING) + return PJ_SUCCESS; + + return status; +} + +/* Called by stream to send RTCP packet */ +static pj_status_t transport_send_rtcp(pjmedia_transport *tp, + const void *pkt, + pj_size_t size) +{ + struct transport_udp *udp = (struct transport_udp*)tp; + pj_ssize_t sent; + pj_status_t status; + + PJ_ASSERT_RETURN(udp->stream, PJ_EINVALIDOP); + + sent = size; + status = pj_ioqueue_sendto( udp->rtcp_key, &udp->rtcp_write_op, + pkt, &sent, 0, + &udp->rem_rtcp_addr, sizeof(pj_sockaddr_in)); + + if (status==PJ_SUCCESS || status==PJ_EPENDING) + return PJ_SUCCESS; + + return status; +} + diff --git a/pjsip-apps/src/samples/simpleua.c b/pjsip-apps/src/samples/simpleua.c index c55c3e99..2cf7de7b 100644 --- a/pjsip-apps/src/samples/simpleua.c +++ b/pjsip-apps/src/samples/simpleua.c @@ -73,6 +73,7 @@ static pj_caching_pool cp; /* Global pool factory. */ static pjmedia_endpt *g_med_endpt; /* Media endpoint. */ static pjmedia_sock_info g_med_skinfo; /* Socket info for media */ +static pjmedia_transport *g_med_transport;/* Media stream transport */ /* Call variables: */ static pjsip_inv_session *g_inv; /* Current invite session. */ @@ -280,6 +281,14 @@ int main(int argc, char *argv[]) g_med_skinfo.rtcp_addr_name = g_med_skinfo.rtp_addr_name; + /* Create media transport */ + status = pjmedia_transport_udp_attach(g_med_endpt, NULL, &g_med_skinfo, + &g_med_transport); + if (status != PJ_SUCCESS) { + app_perror(THIS_FILE, "Unable to create media transport", status); + return 1; + } + /* * If URL is specified, then make call immediately. */ @@ -611,8 +620,8 @@ static void call_on_media_update( pjsip_inv_session *inv, /* Create session info based on the two SDPs. * We only support one stream per session for now. */ - status = pjmedia_session_info_from_sdp(inv->dlg->pool, g_med_endpt, 1, - &sess_info, &g_med_skinfo, + status = pjmedia_session_info_from_sdp(inv->dlg->pool, g_med_endpt, + 1, &sess_info, local_sdp, remote_sdp); if (status != PJ_SUCCESS) { app_perror( THIS_FILE, "Unable to create media session", status); @@ -629,7 +638,7 @@ static void call_on_media_update( pjsip_inv_session *inv, * The media session is active immediately. */ status = pjmedia_session_create( g_med_endpt, &sess_info, - NULL, &g_med_session ); + &g_med_transport, NULL, &g_med_session ); if (status != PJ_SUCCESS) { app_perror( THIS_FILE, "Unable to create media session", status); return; diff --git a/pjsip-apps/src/samples/siprtp.c b/pjsip-apps/src/samples/siprtp.c index 8b896b27..14830640 100644 --- a/pjsip-apps/src/samples/siprtp.c +++ b/pjsip-apps/src/samples/siprtp.c @@ -1276,7 +1276,7 @@ static void call_on_media_update( pjsip_inv_session *inv, pjmedia_sdp_neg_get_active_remote(inv->neg, &remote_sdp); status = pjmedia_stream_info_from_sdp(&audio->si, inv->pool, app.med_endpt, - NULL, local_sdp, remote_sdp, 0); + local_sdp, remote_sdp, 0); if (status != PJ_SUCCESS) { app_perror(THIS_FILE, "Error creating stream info from SDP", status); return; diff --git a/pjsip-apps/src/samples/streamutil.c b/pjsip-apps/src/samples/streamutil.c index d87dae74..aea56178 100644 --- a/pjsip-apps/src/samples/streamutil.c +++ b/pjsip-apps/src/samples/streamutil.c @@ -110,6 +110,7 @@ static pj_status_t create_stream( pj_pool_t *pool, pjmedia_stream **p_stream ) { pjmedia_stream_info info; + pjmedia_transport *transport; pj_status_t status; @@ -129,58 +130,23 @@ static pj_status_t create_stream( pj_pool_t *pool, pj_memcpy(&info.rem_addr, rem_addr, sizeof(pj_sockaddr_in)); - /* Create RTP socket */ - status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, - &info.sock_info.rtp_sock); - PJ_ASSERT_RETURN(status == PJ_SUCCESS, status); - - - /* Bind RTP socket to local port */ - info.sock_info.rtp_addr_name.sin_family = PJ_AF_INET; - info.sock_info.rtp_addr_name.sin_port = pj_htons(local_port); - - status = pj_sock_bind(info.sock_info.rtp_sock, - &info.sock_info.rtp_addr_name, - sizeof(pj_sockaddr_in)); - if (status != PJ_SUCCESS) { - app_perror(THIS_FILE, "Unable to bind RTP socket", status); - pj_sock_close(info.sock_info.rtp_sock); - return status; - } - - - /* Create RTCP socket */ - status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, - &info.sock_info.rtcp_sock); - PJ_ASSERT_RETURN(status == PJ_SUCCESS, status); - - - /* Bind RTP socket to local port + 1 */ - ++local_port; - info.sock_info.rtcp_addr_name.sin_family = PJ_AF_INET; - info.sock_info.rtcp_addr_name.sin_port = pj_htons(local_port); - - status = pj_sock_bind(info.sock_info.rtcp_sock, - &info.sock_info.rtcp_addr_name, - sizeof(pj_sockaddr_in)); - if (status != PJ_SUCCESS) { - app_perror(THIS_FILE, "Unable to bind RTCP socket", status); - pj_sock_close(info.sock_info.rtp_sock); - pj_sock_close(info.sock_info.rtcp_sock); + /* Create media transport */ + status = pjmedia_transport_udp_create(med_endpt, NULL, local_port, + &transport); + if (status != PJ_SUCCESS) return status; - } /* Now that the stream info is initialized, we can create the * stream. */ - status = pjmedia_stream_create( med_endpt, pool, &info, NULL, p_stream); + status = pjmedia_stream_create( med_endpt, pool, &info, + transport, NULL, p_stream); if (status != PJ_SUCCESS) { app_perror(THIS_FILE, "Error creating stream", status); - pj_sock_close(info.sock_info.rtp_sock); - pj_sock_close(info.sock_info.rtcp_sock); + pjmedia_transport_udp_close(transport); return status; } @@ -211,7 +177,7 @@ int main(int argc, char *argv[]) pjmedia_stream *stream = NULL; pjmedia_port *stream_port; char tmp[10]; - pj_status_t status; + pj_status_t status; /* Default values */ @@ -513,7 +479,11 @@ on_exit: /* Destroy stream */ if (stream) { + pjmedia_transport *tp; + + tp = pjmedia_stream_get_transport(stream); pjmedia_stream_destroy(stream); + pjmedia_transport_udp_close(tp); } /* Destroy file ports */ diff --git a/pjsip/include/pjsua-lib/pjsua.h b/pjsip/include/pjsua-lib/pjsua.h index 18467129..a3c0c3a4 100644 --- a/pjsip/include/pjsua-lib/pjsua.h +++ b/pjsip/include/pjsua-lib/pjsua.h @@ -102,6 +102,7 @@ struct pjsua_call pjsip_evsub *xfer_sub; /**< Xfer server subscription, if this call was triggered by xfer. */ pjmedia_sock_info skinfo; /**< Preallocated media sockets. */ + pjmedia_transport *med_tp; /**< Media transport. */ void *app_data; /**< Application data. */ pj_timer_entry refresh_tm;/**< Timer to send re-INVITE. */ pj_timer_entry hangup_tm; /**< Timer to hangup call. */ diff --git a/pjsip/src/pjsua-lib/pjsua_call.c b/pjsip/src/pjsua-lib/pjsua_call.c index 73f5aac1..4dbfb596 100644 --- a/pjsip/src/pjsua-lib/pjsua_call.c +++ b/pjsip/src/pjsua-lib/pjsua_call.c @@ -97,26 +97,6 @@ static void schedule_call_timer( pjsua_call *call, pj_timer_entry *e, } -/* Close and reopen socket. */ -static pj_status_t reopen_sock( pj_sock_t *sock, pj_sockaddr_in *addr) -{ - pj_status_t status; - - status = pj_sock_socket(PJ_AF_INET, PJ_SOCK_DGRAM, 0, sock); - if (status != PJ_SUCCESS) { - pjsua_perror(THIS_FILE, "Unable to create socket", status); - return status; - } - - status = pj_sock_bind(*sock, addr, sizeof(pj_sockaddr_in)); - if (status != PJ_SUCCESS) { - pjsua_perror(THIS_FILE, "Unable to re-bind RTP/RTCP socket", status); - return status; - } - - return PJ_SUCCESS; -} - /* * Destroy the call's media */ @@ -130,27 +110,8 @@ static pj_status_t call_destroy_media(int call_index) } if (call->session) { - pj_sockaddr_in rtp_addr, rtcp_addr; - int addrlen; - - addrlen = sizeof(rtp_addr); - pj_sock_getsockname(call->skinfo.rtp_sock, &rtp_addr, &addrlen); - - addrlen = sizeof(rtcp_addr); - pj_sock_getsockname(call->skinfo.rtcp_sock, &rtcp_addr, &addrlen); - /* Destroy session (this will also close RTP/RTCP sockets). */ pjmedia_session_destroy(call->session); - - /* Close and reopen RTP socket. - * This is necessary to get the socket unregistered from ioqueue, - * when IOCompletionPort is used. - */ - reopen_sock(&call->skinfo.rtp_sock, &rtp_addr); - - /* Close and reopen RTCP socket too. */ - reopen_sock(&call->skinfo.rtcp_sock, &rtcp_addr); - call->session = NULL; PJ_LOG(3,(THIS_FILE, "Media session for call %d is destroyed", @@ -1011,8 +972,8 @@ static void pjsua_call_on_media_update(pjsip_inv_session *inv, * We only support one stream per session at the moment */ status = pjmedia_session_info_from_sdp( call->inv->dlg->pool, - pjsua.med_endpt, 1, - &sess_info, &call->skinfo, + pjsua.med_endpt, + 1,&sess_info, local_sdp, remote_sdp); if (status != PJ_SUCCESS) { pjsua_perror(THIS_FILE, "Unable to create media session", @@ -1035,6 +996,7 @@ static void pjsua_call_on_media_update(pjsip_inv_session *inv, /* Create session based on session info. */ status = pjmedia_session_create( pjsua.med_endpt, &sess_info, + &call->med_tp, call, &call->session ); if (status != PJ_SUCCESS) { pjsua_perror(THIS_FILE, "Unable to create media session", diff --git a/pjsip/src/pjsua-lib/pjsua_core.c b/pjsip/src/pjsua-lib/pjsua_core.c index eafcd78b..5959d065 100644 --- a/pjsip/src/pjsua-lib/pjsua_core.c +++ b/pjsip/src/pjsua-lib/pjsua_core.c @@ -320,7 +320,7 @@ static pj_status_t init_sockets(pj_bool_t sip, PJ_LOG(4,(THIS_FILE, "RTP socket reachable at %s:%d", pj_inet_ntoa(skinfo->rtp_addr_name.sin_addr), pj_ntohs(skinfo->rtp_addr_name.sin_port))); - PJ_LOG(4,(THIS_FILE, "RTCP UDP socket reachable at %s:%d", + PJ_LOG(4,(THIS_FILE, "RTCP socket reachable at %s:%d", pj_inet_ntoa(skinfo->rtcp_addr_name.sin_addr), pj_ntohs(skinfo->rtcp_addr_name.sin_port))); @@ -779,6 +779,10 @@ pj_status_t pjsua_start(void) /* Init sockets (STUN etc): */ for (i=0; i<(int)pjsua.max_calls; ++i) { status = init_sockets(i==0, &pjsua.calls[i].skinfo); + if (status == PJ_SUCCESS) + status = pjmedia_transport_udp_attach(pjsua.med_endpt, NULL, + &pjsua.calls[i].skinfo, + &pjsua.calls[i].med_tp); if (status != PJ_SUCCESS) { pjsua_perror(THIS_FILE, "init_sockets() has returned error", status); @@ -786,8 +790,7 @@ pj_status_t pjsua_start(void) if (i >= 0) pj_sock_close(pjsua.sip_sock); while (i >= 0) { - pj_sock_close(pjsua.calls[i].skinfo.rtp_sock); - pj_sock_close(pjsua.calls[i].skinfo.rtcp_sock); + pjmedia_transport_udp_close(pjsua.calls[i].med_tp); } return status; } @@ -1048,6 +1051,12 @@ pj_status_t pjsua_destroy(void) pjmedia_codec_l16_deinit(); #endif /* PJMEDIA_HAS_L16_CODEC */ + + /* Close transports */ + for (i=0; i<pjsua.call_cnt; ++i) { + pjmedia_transport_udp_close(pjsua.calls[i].med_tp); + } + /* Destroy media endpoint. */ pjmedia_endpt_destroy(pjsua.med_endpt); |