diff options
author | David M. Lee <dlee@digium.com> | 2013-01-07 14:24:28 -0600 |
---|---|---|
committer | David M. Lee <dlee@digium.com> | 2013-01-07 14:24:28 -0600 |
commit | f3ab456a17af1c89a6e3be4d20c5944853df1cb0 (patch) | |
tree | d00e1a332cd038a6d906a1ea0ac91e1a4458e617 /pjmedia/src/pjmedia/stream.c |
Import pjproject-2.0.1
Diffstat (limited to 'pjmedia/src/pjmedia/stream.c')
-rw-r--r-- | pjmedia/src/pjmedia/stream.c | 2803 |
1 files changed, 2803 insertions, 0 deletions
diff --git a/pjmedia/src/pjmedia/stream.c b/pjmedia/src/pjmedia/stream.c new file mode 100644 index 0000000..5c7c873 --- /dev/null +++ b/pjmedia/src/pjmedia/stream.c @@ -0,0 +1,2803 @@ +/* $Id: stream.c 4120 2012-05-12 07:18:09Z ming $ */ +/* + * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) + * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include <pjmedia/stream.h> +#include <pjmedia/errno.h> +#include <pjmedia/rtp.h> +#include <pjmedia/rtcp.h> +#include <pjmedia/jbuf.h> +#include <pjmedia/stream_common.h> +#include <pj/array.h> +#include <pj/assert.h> +#include <pj/ctype.h> +#include <pj/compat/socket.h> +#include <pj/errno.h> +#include <pj/ioqueue.h> +#include <pj/log.h> +#include <pj/os.h> +#include <pj/pool.h> +#include <pj/rand.h> +#include <pj/sock_select.h> +#include <pj/string.h> /* memcpy() */ + + +#define THIS_FILE "stream.c" +#define ERRLEVEL 1 +#define LOGERR_(expr) stream_perror expr +#define TRC_(expr) PJ_LOG(5,expr) + +#define BYTES_PER_SAMPLE 2 + +/* Limit the number of synthetic audio samples that are generated by PLC. + * Normally PLC should have it's own means to limit the number of + * synthetic frames, so we need to set this to a reasonably large value + * just as precaution + */ +#define MAX_PLC_MSEC PJMEDIA_MAX_PLC_DURATION_MSEC + + +/* Tracing jitter buffer operations in a stream session to a CSV file. + * The trace will contain JB operation timestamp, frame info, RTP info, and + * the JB state right after the operation. + */ +#define TRACE_JB 0 /* Enable/disable trace. */ +#define TRACE_JB_PATH_PREFIX "" /* Optional path/prefix + for the CSV filename. */ +#if TRACE_JB +# include <pj/file_io.h> +# define TRACE_JB_INVALID_FD ((pj_oshandle_t)-1) +# define TRACE_JB_OPENED(s) (s->trace_jb_fd != TRACE_JB_INVALID_FD) +#endif + +#ifndef PJMEDIA_STREAM_SIZE +# define PJMEDIA_STREAM_SIZE 1000 +#endif + +#ifndef PJMEDIA_STREAM_INC +# define PJMEDIA_STREAM_INC 1000 +#endif + + +/** + * Media channel. + */ +struct pjmedia_channel +{ + pjmedia_stream *stream; /**< Parent stream. */ + pjmedia_dir dir; /**< Channel direction. */ + unsigned pt; /**< Payload type. */ + pj_bool_t paused; /**< Paused?. */ + unsigned out_pkt_size; /**< Size of output buffer. */ + void *out_pkt; /**< Output buffer. */ + unsigned out_pkt_len; /**< Length of data in buffer. */ + pjmedia_rtp_session rtp; /**< RTP session. */ +}; + + +struct dtmf +{ + int event; + pj_uint32_t duration; +}; + +/** + * This structure describes media stream. + * A media stream is bidirectional media transmission between two endpoints. + * It consists of two channels, i.e. encoding and decoding channels. + * A media stream corresponds to a single "m=" line in a SDP session + * description. + */ +struct pjmedia_stream +{ + pjmedia_endpt *endpt; /**< Media endpoint. */ + pjmedia_codec_mgr *codec_mgr; /**< Codec manager instance. */ + pjmedia_stream_info si; /**< Creation parameter. */ + pjmedia_port port; /**< Port interface. */ + pjmedia_channel *enc; /**< Encoding channel. */ + pjmedia_channel *dec; /**< Decoding channel. */ + + pj_pool_t *own_pool; /**< Only created if not given */ + + pjmedia_dir dir; /**< Stream direction. */ + void *user_data; /**< User data. */ + pj_str_t cname; /**< SDES CNAME */ + + pjmedia_transport *transport; /**< Stream transport. */ + + pjmedia_codec *codec; /**< Codec instance being used. */ + pjmedia_codec_param codec_param; /**< Codec param. */ + pj_int16_t *enc_buf; /**< Encoding buffer, when enc's + ptime is different than dec. + Otherwise it's NULL. */ + + unsigned enc_samples_per_pkt; + unsigned enc_buf_size; /**< Encoding buffer size, in + samples. */ + unsigned enc_buf_pos; /**< First position in buf. */ + unsigned enc_buf_count; /**< Number of samples in the + encoding buffer. */ + + unsigned plc_cnt; /**< # of consecutive PLC frames*/ + unsigned max_plc_cnt; /**< Max # of PLC frames */ + + unsigned vad_enabled; /**< VAD enabled in param. */ + unsigned frame_size; /**< Size of encoded base frame.*/ + pj_bool_t is_streaming; /**< Currently streaming?. This + is used to put RTP marker + bit. */ + pj_uint32_t ts_vad_disabled;/**< TS when VAD was disabled. */ + pj_uint32_t tx_duration; /**< TX duration in timestamp. */ + + pj_mutex_t *jb_mutex; + pjmedia_jbuf *jb; /**< Jitter buffer. */ + char jb_last_frm; /**< Last frame type from jb */ + unsigned jb_last_frm_cnt;/**< Last JB frame type counter*/ + + pjmedia_rtcp_session rtcp; /**< RTCP for incoming RTP. */ + + pj_uint32_t rtcp_last_tx; /**< RTCP tx time in timestamp */ + pj_uint32_t rtcp_interval; /**< Interval, in timestamp. */ + pj_bool_t initial_rr; /**< Initial RTCP RR sent */ + pj_bool_t rtcp_sdes_bye_disabled;/**< Send RTCP SDES/BYE?*/ + void *out_rtcp_pkt; /**< Outgoing RTCP packet. */ + unsigned out_rtcp_pkt_size; + /**< Outgoing RTCP packet size. */ + + /* RFC 2833 DTMF transmission queue: */ + int tx_event_pt; /**< Outgoing pt for dtmf. */ + int tx_dtmf_count; /**< # of digits in tx dtmf buf.*/ + struct dtmf tx_dtmf_buf[32];/**< Outgoing dtmf queue. */ + + /* Incoming DTMF: */ + int rx_event_pt; /**< Incoming pt for dtmf. */ + int last_dtmf; /**< Current digit, or -1. */ + pj_uint32_t last_dtmf_dur; /**< Start ts for cur digit. */ + unsigned rx_dtmf_count; /**< # of digits in dtmf rx buf.*/ + char rx_dtmf_buf[32];/**< Incoming DTMF buffer. */ + + /* DTMF callback */ + void (*dtmf_cb)(pjmedia_stream*, void*, int); + void *dtmf_cb_user_data; + +#if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0) + /* Enable support to handle codecs with inconsistent clock rate + * between clock rate in SDP/RTP & the clock rate that is actually used. + * This happens for example with G.722 and MPEG audio codecs. + */ + pj_bool_t has_g722_mpeg_bug; + /**< Flag to specify whether + normalization process + is needed */ + unsigned rtp_tx_ts_len_per_pkt; + /**< Normalized ts length per packet + transmitted according to + 'erroneous' definition */ + unsigned rtp_rx_ts_len_per_frame; + /**< Normalized ts length per frame + received according to + 'erroneous' definition */ + unsigned rtp_rx_last_cnt;/**< Nb of frames in last pkt */ + unsigned rtp_rx_check_cnt; + /**< Counter of remote timestamp + checking */ +#endif + +#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0) + pj_uint32_t rtcp_xr_last_tx; /**< RTCP XR tx time + in timestamp. */ + pj_uint32_t rtcp_xr_interval; /**< Interval, in timestamp. */ + pj_sockaddr rtcp_xr_dest; /**< Additional remote RTCP XR + dest. If sin_family is + zero, it will be ignored*/ + unsigned rtcp_xr_dest_len; /**< Length of RTCP XR dest + address */ +#endif + +#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0 + pj_bool_t use_ka; /**< Stream keep-alive with non- + codec-VAD mechanism is + enabled? */ + pj_timestamp last_frm_ts_sent; /**< Timestamp of last sending + packet */ +#endif + +#if TRACE_JB + pj_oshandle_t trace_jb_fd; /**< Jitter tracing file handle.*/ + char *trace_jb_buf; /**< Jitter tracing buffer. */ +#endif + + pj_uint32_t rtp_rx_last_ts; /**< Last received RTP timestamp*/ +}; + + +/* RFC 2833 digit */ +static const char digitmap[16] = { '0', '1', '2', '3', + '4', '5', '6', '7', + '8', '9', '*', '#', + 'A', 'B', 'C', 'D'}; + +/* Zero audio frame samples */ +static pj_int16_t zero_frame[2 * 30 * 16000 / 1000]; + +/* + * Print error. + */ +static void stream_perror(const char *sender, const char *title, + pj_status_t status) +{ + char errmsg[PJ_ERR_MSG_SIZE]; + + pj_strerror(status, errmsg, sizeof(errmsg)); + PJ_LOG(4,(sender, "%s: %s [err:%d]", title, errmsg, status)); +} + + +static pj_status_t send_rtcp(pjmedia_stream *stream, + pj_bool_t with_sdes, + pj_bool_t with_bye, + pj_bool_t with_xr); + + +#if TRACE_JB + +PJ_INLINE(int) trace_jb_print_timestamp(char **buf, pj_ssize_t len) +{ + pj_time_val now; + pj_parsed_time ptime; + char *p = *buf; + + if (len < 14) + return -1; + + pj_gettimeofday(&now); + pj_time_decode(&now, &ptime); + p += pj_utoa_pad(ptime.hour, p, 2, '0'); + *p++ = ':'; + p += pj_utoa_pad(ptime.min, p, 2, '0'); + *p++ = ':'; + p += pj_utoa_pad(ptime.sec, p, 2, '0'); + *p++ = '.'; + p += pj_utoa_pad(ptime.msec, p, 3, '0'); + *p++ = ','; + + *buf = p; + + return 0; +} + +PJ_INLINE(int) trace_jb_print_state(pjmedia_stream *stream, + char **buf, pj_ssize_t len) +{ + char *p = *buf; + char *endp = *buf + len; + pjmedia_jb_state state; + + pjmedia_jbuf_get_state(stream->jb, &state); + + len = pj_ansi_snprintf(p, endp-p, "%d, %d, %d", + state.size, state.burst, state.prefetch); + if ((len < 0) || (len >= endp-p)) + return -1; + + p += len; + *buf = p; + return 0; +} + +static void trace_jb_get(pjmedia_stream *stream, pjmedia_jb_frame_type ft, + pj_size_t fsize) +{ + char *p = stream->trace_jb_buf; + char *endp = stream->trace_jb_buf + PJ_LOG_MAX_SIZE; + pj_ssize_t len = 0; + const char* ft_st; + + if (!TRACE_JB_OPENED(stream)) + return; + + /* Print timestamp. */ + if (trace_jb_print_timestamp(&p, endp-p)) + goto on_insuff_buffer; + + /* Print frame type and size */ + switch(ft) { + case PJMEDIA_JB_MISSING_FRAME: + ft_st = "missing"; + break; + case PJMEDIA_JB_NORMAL_FRAME: + ft_st = "normal"; + break; + case PJMEDIA_JB_ZERO_PREFETCH_FRAME: + ft_st = "prefetch"; + break; + case PJMEDIA_JB_ZERO_EMPTY_FRAME: + ft_st = "empty"; + break; + default: + ft_st = "unknown"; + break; + } + + /* Print operation, size, frame count, frame type */ + len = pj_ansi_snprintf(p, endp-p, "GET,%d,1,%s,,,,", fsize, ft_st); + if ((len < 0) || (len >= endp-p)) + goto on_insuff_buffer; + p += len; + + /* Print JB state */ + if (trace_jb_print_state(stream, &p, endp-p)) + goto on_insuff_buffer; + + /* Print end of line */ + if (endp-p < 2) + goto on_insuff_buffer; + *p++ = '\n'; + + /* Write and flush */ + len = p - stream->trace_jb_buf; + pj_file_write(stream->trace_jb_fd, stream->trace_jb_buf, &len); + pj_file_flush(stream->trace_jb_fd); + return; + +on_insuff_buffer: + pj_assert(!"Trace buffer too small, check PJ_LOG_MAX_SIZE!"); +} + +static void trace_jb_put(pjmedia_stream *stream, const pjmedia_rtp_hdr *hdr, + unsigned payloadlen, unsigned frame_cnt) +{ + char *p = stream->trace_jb_buf; + char *endp = stream->trace_jb_buf + PJ_LOG_MAX_SIZE; + pj_ssize_t len = 0; + + if (!TRACE_JB_OPENED(stream)) + return; + + /* Print timestamp. */ + if (trace_jb_print_timestamp(&p, endp-p)) + goto on_insuff_buffer; + + /* Print operation, size, frame count, RTP info */ + len = pj_ansi_snprintf(p, endp-p, + "PUT,%d,%d,,%d,%d,%d,", + payloadlen, frame_cnt, + pj_ntohs(hdr->seq), pj_ntohl(hdr->ts), hdr->m); + if ((len < 0) || (len >= endp-p)) + goto on_insuff_buffer; + p += len; + + /* Print JB state */ + if (trace_jb_print_state(stream, &p, endp-p)) + goto on_insuff_buffer; + + /* Print end of line */ + if (endp-p < 2) + goto on_insuff_buffer; + *p++ = '\n'; + + /* Write and flush */ + len = p - stream->trace_jb_buf; + pj_file_write(stream->trace_jb_fd, stream->trace_jb_buf, &len); + pj_file_flush(stream->trace_jb_fd); + return; + +on_insuff_buffer: + pj_assert(!"Trace buffer too small, check PJ_LOG_MAX_SIZE!"); +} + +#endif /* TRACE_JB */ + + +#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA != 0 +/* + * Send keep-alive packet using non-codec frame. + */ +static void send_keep_alive_packet(pjmedia_stream *stream) +{ +#if PJMEDIA_STREAM_ENABLE_KA == PJMEDIA_STREAM_KA_EMPTY_RTP + + /* Keep-alive packet is empty RTP */ + pj_status_t status; + void *pkt; + int pkt_len; + + TRC_((stream->port.info.name.ptr, + "Sending keep-alive (RTCP and empty RTP)")); + + /* Send RTP */ + status = pjmedia_rtp_encode_rtp( &stream->enc->rtp, + stream->enc->pt, 0, + 1, + 0, + (const void**)&pkt, + &pkt_len); + pj_assert(status == PJ_SUCCESS); + + pj_memcpy(stream->enc->out_pkt, pkt, pkt_len); + pjmedia_transport_send_rtp(stream->transport, stream->enc->out_pkt, + pkt_len); + + /* Send RTCP */ + send_rtcp(stream, PJ_TRUE, PJ_FALSE, PJ_FALSE); + +#elif PJMEDIA_STREAM_ENABLE_KA == PJMEDIA_STREAM_KA_USER + + /* Keep-alive packet is defined in PJMEDIA_STREAM_KA_USER_PKT */ + int pkt_len; + const pj_str_t str_ka = PJMEDIA_STREAM_KA_USER_PKT; + + TRC_((stream->port.info.name.ptr, + "Sending keep-alive (custom RTP/RTCP packets)")); + + /* Send to RTP port */ + pj_memcpy(stream->enc->out_pkt, str_ka.ptr, str_ka.slen); + pkt_len = str_ka.slen; + pjmedia_transport_send_rtp(stream->transport, stream->enc->out_pkt, + pkt_len); + + /* Send to RTCP port */ + pjmedia_transport_send_rtcp(stream->transport, stream->enc->out_pkt, + pkt_len); + +#else + + PJ_UNUSED_ARG(stream); + +#endif +} +#endif /* defined(PJMEDIA_STREAM_ENABLE_KA) */ + +/* + * play_callback() + * + * This callback is called by sound device's player thread when it + * needs to feed the player with some frames. + */ +static pj_status_t get_frame( pjmedia_port *port, pjmedia_frame *frame) +{ + pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata; + pjmedia_channel *channel = stream->dec; + unsigned samples_count, samples_per_frame, samples_required; + pj_int16_t *p_out_samp; + pj_status_t status; + + + /* Return no frame is channel is paused */ + if (channel->paused) { + frame->type = PJMEDIA_FRAME_TYPE_NONE; + return PJ_SUCCESS; + } + + /* Repeat get frame from the jitter buffer and decode the frame + * until we have enough frames according to codec's ptime. + */ + + /* Lock jitter buffer mutex first */ + pj_mutex_lock( stream->jb_mutex ); + + samples_required = PJMEDIA_PIA_SPF(&stream->port.info); + samples_per_frame = stream->codec_param.info.frm_ptime * + stream->codec_param.info.clock_rate * + stream->codec_param.info.channel_cnt / + 1000; + p_out_samp = (pj_int16_t*) frame->buf; + + for (samples_count=0; samples_count < samples_required; + samples_count += samples_per_frame) + { + char frame_type; + pj_size_t frame_size; + pj_uint32_t bit_info; + + /* Get frame from jitter buffer. */ + pjmedia_jbuf_get_frame2(stream->jb, channel->out_pkt, &frame_size, + &frame_type, &bit_info); + +#if TRACE_JB + trace_jb_get(stream, frame_type, frame_size); +#endif + + if (frame_type == PJMEDIA_JB_MISSING_FRAME) { + + /* Activate PLC */ + if (stream->codec->op->recover && + stream->codec_param.setting.plc && + stream->plc_cnt < stream->max_plc_cnt) + { + pjmedia_frame frame_out; + + frame_out.buf = p_out_samp + samples_count; + frame_out.size = frame->size - samples_count*2; + status = pjmedia_codec_recover(stream->codec, + frame_out.size, + &frame_out); + + ++stream->plc_cnt; + + } else { + status = -1; + } + + if (status != PJ_SUCCESS) { + /* Either PLC failed or PLC not supported/enabled */ + pjmedia_zero_samples(p_out_samp + samples_count, + samples_required - samples_count); + } + + if (frame_type != stream->jb_last_frm) { + /* Report changing frame type event */ + PJ_LOG(5,(stream->port.info.name.ptr, "Frame lost%s!", + (status == PJ_SUCCESS? ", recovered":""))); + + stream->jb_last_frm = frame_type; + stream->jb_last_frm_cnt = 1; + } else { + stream->jb_last_frm_cnt++; + } + + } else if (frame_type == PJMEDIA_JB_ZERO_EMPTY_FRAME) { + + const char *with_plc = ""; + + /* Jitter buffer is empty. If this is the first "empty" state, + * activate PLC to smoothen the fade-out, otherwise zero + * the frame. + */ + //Using this "if" will only invoke PLC for the first packet + //lost and not the subsequent ones. + //if (frame_type != stream->jb_last_frm) { + if (1) { + /* Activate PLC to smoothen the missing frame */ + if (stream->codec->op->recover && + stream->codec_param.setting.plc && + stream->plc_cnt < stream->max_plc_cnt) + { + pjmedia_frame frame_out; + + do { + frame_out.buf = p_out_samp + samples_count; + frame_out.size = frame->size - samples_count*2; + status = pjmedia_codec_recover(stream->codec, + frame_out.size, + &frame_out); + if (status != PJ_SUCCESS) + break; + + samples_count += samples_per_frame; + ++stream->plc_cnt; + + } while (samples_count < samples_required && + stream->plc_cnt < stream->max_plc_cnt); + + with_plc = ", plc invoked"; + } + } + + if (samples_count < samples_required) { + pjmedia_zero_samples(p_out_samp + samples_count, + samples_required - samples_count); + samples_count = samples_required; + } + + if (stream->jb_last_frm != frame_type) { + pjmedia_jb_state jb_state; + + /* Report changing frame type event */ + pjmedia_jbuf_get_state(stream->jb, &jb_state); + PJ_LOG(5,(stream->port.info.name.ptr, + "Jitter buffer empty (prefetch=%d)%s", + jb_state.prefetch, with_plc)); + + stream->jb_last_frm = frame_type; + stream->jb_last_frm_cnt = 1; + } else { + stream->jb_last_frm_cnt++; + } + break; + + } else if (frame_type != PJMEDIA_JB_NORMAL_FRAME) { + + const char *with_plc = ""; + + /* It can only be PJMEDIA_JB_ZERO_PREFETCH frame */ + pj_assert(frame_type == PJMEDIA_JB_ZERO_PREFETCH_FRAME); + + /* Always activate PLC when it's available.. */ + if (stream->codec->op->recover && + stream->codec_param.setting.plc && + stream->plc_cnt < stream->max_plc_cnt) + { + pjmedia_frame frame_out; + + do { + frame_out.buf = p_out_samp + samples_count; + frame_out.size = frame->size - samples_count*2; + status = pjmedia_codec_recover(stream->codec, + frame_out.size, + &frame_out); + if (status != PJ_SUCCESS) + break; + samples_count += samples_per_frame; + + ++stream->plc_cnt; + + } while (samples_count < samples_required && + stream->plc_cnt < stream->max_plc_cnt); + + with_plc = ", plc invoked"; + } + + if (samples_count < samples_required) { + pjmedia_zero_samples(p_out_samp + samples_count, + samples_required - samples_count); + samples_count = samples_required; + } + + if (stream->jb_last_frm != frame_type) { + pjmedia_jb_state jb_state; + + /* Report changing frame type event */ + pjmedia_jbuf_get_state(stream->jb, &jb_state); + PJ_LOG(5,(stream->port.info.name.ptr, + "Jitter buffer is bufferring (prefetch=%d)%s", + jb_state.prefetch, with_plc)); + + stream->jb_last_frm = frame_type; + stream->jb_last_frm_cnt = 1; + } else { + stream->jb_last_frm_cnt++; + } + break; + + } else { + /* Got "NORMAL" frame from jitter buffer */ + pjmedia_frame frame_in, frame_out; + + stream->plc_cnt = 0; + + /* Decode */ + frame_in.buf = channel->out_pkt; + frame_in.size = frame_size; + frame_in.bit_info = bit_info; + frame_in.type = PJMEDIA_FRAME_TYPE_AUDIO; /* ignored */ + + frame_out.buf = p_out_samp + samples_count; + frame_out.size = frame->size - samples_count*BYTES_PER_SAMPLE; + status = pjmedia_codec_decode( stream->codec, &frame_in, + frame_out.size, &frame_out); + if (status != 0) { + LOGERR_((port->info.name.ptr, "codec decode() error", + status)); + + pjmedia_zero_samples(p_out_samp + samples_count, + samples_per_frame); + } + + if (stream->jb_last_frm != frame_type) { + /* Report changing frame type event */ + PJ_LOG(5,(stream->port.info.name.ptr, + "Jitter buffer starts returning normal frames " + "(after %d empty/lost)", + stream->jb_last_frm_cnt, stream->jb_last_frm)); + + stream->jb_last_frm = frame_type; + stream->jb_last_frm_cnt = 1; + } else { + stream->jb_last_frm_cnt++; + } + } + } + + + /* Unlock jitter buffer mutex. */ + pj_mutex_unlock( stream->jb_mutex ); + + /* Return PJMEDIA_FRAME_TYPE_NONE if we have no frames at all + * (it can happen when jitter buffer returns PJMEDIA_JB_ZERO_EMPTY_FRAME). + */ + if (samples_count == 0) { + frame->type = PJMEDIA_FRAME_TYPE_NONE; + frame->size = 0; + } else { + frame->type = PJMEDIA_FRAME_TYPE_AUDIO; + frame->size = samples_count * BYTES_PER_SAMPLE; + frame->timestamp.u64 = 0; + } + + return PJ_SUCCESS; +} + + +/* The other version of get_frame callback used when stream port format + * is non linear PCM. + */ +static pj_status_t get_frame_ext( pjmedia_port *port, pjmedia_frame *frame) +{ + pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata; + pjmedia_channel *channel = stream->dec; + pjmedia_frame_ext *f = (pjmedia_frame_ext*)frame; + unsigned samples_per_frame, samples_required; + pj_status_t status; + + /* Return no frame if channel is paused */ + if (channel->paused) { + frame->type = PJMEDIA_FRAME_TYPE_NONE; + return PJ_SUCCESS; + } + + /* Repeat get frame from the jitter buffer and decode the frame + * until we have enough frames according to codec's ptime. + */ + + samples_required = PJMEDIA_PIA_SPF(&stream->port.info); + samples_per_frame = stream->codec_param.info.frm_ptime * + stream->codec_param.info.clock_rate * + stream->codec_param.info.channel_cnt / + 1000; + + pj_bzero(f, sizeof(pjmedia_frame_ext)); + f->base.type = PJMEDIA_FRAME_TYPE_EXTENDED; + + while (f->samples_cnt < samples_required) { + char frame_type; + pj_size_t frame_size; + pj_uint32_t bit_info; + + /* Lock jitter buffer mutex first */ + pj_mutex_lock( stream->jb_mutex ); + + /* Get frame from jitter buffer. */ + pjmedia_jbuf_get_frame2(stream->jb, channel->out_pkt, &frame_size, + &frame_type, &bit_info); + +#if TRACE_JB + trace_jb_get(stream, frame_type, frame_size); +#endif + + /* Unlock jitter buffer mutex. */ + pj_mutex_unlock( stream->jb_mutex ); + + if (frame_type == PJMEDIA_JB_NORMAL_FRAME) { + /* Got "NORMAL" frame from jitter buffer */ + pjmedia_frame frame_in; + + /* Decode */ + frame_in.buf = channel->out_pkt; + frame_in.size = frame_size; + frame_in.bit_info = bit_info; + frame_in.type = PJMEDIA_FRAME_TYPE_AUDIO; + + status = pjmedia_codec_decode( stream->codec, &frame_in, + 0, frame); + if (status != PJ_SUCCESS) { + LOGERR_((port->info.name.ptr, "codec decode() error", + status)); + pjmedia_frame_ext_append_subframe(f, NULL, 0, + (pj_uint16_t)samples_per_frame); + } + + if (stream->jb_last_frm != frame_type) { + /* Report changing frame type event */ + PJ_LOG(5,(stream->port.info.name.ptr, + "Jitter buffer starts returning normal frames " + "(after %d empty/lost)", + stream->jb_last_frm_cnt, stream->jb_last_frm)); + + stream->jb_last_frm = frame_type; + stream->jb_last_frm_cnt = 1; + } else { + stream->jb_last_frm_cnt++; + } + + } else { + + /* Try to generate frame by invoking PLC (when any) */ + status = PJ_SUCCESS; + if (stream->codec->op->recover) { + status = pjmedia_codec_recover(stream->codec, 0, frame); + } + + /* No PLC or PLC failed */ + if (!stream->codec->op->recover || status != PJ_SUCCESS) { + pjmedia_frame_ext_append_subframe(f, NULL, 0, + (pj_uint16_t)samples_per_frame); + } + + if (frame_type == PJMEDIA_JB_MISSING_FRAME) { + if (frame_type != stream->jb_last_frm) { + /* Report changing frame type event */ + PJ_LOG(5,(stream->port.info.name.ptr, "Frame lost!")); + + stream->jb_last_frm = frame_type; + stream->jb_last_frm_cnt = 1; + } else { + stream->jb_last_frm_cnt++; + } + } else if (frame_type == PJMEDIA_JB_ZERO_EMPTY_FRAME) { + if (frame_type != stream->jb_last_frm) { + pjmedia_jb_state jb_state; + + /* Report changing frame type event */ + pjmedia_jbuf_get_state(stream->jb, &jb_state); + PJ_LOG(5,(stream->port.info.name.ptr, + "Jitter buffer empty (prefetch=%d)", + jb_state.prefetch)); + + stream->jb_last_frm = frame_type; + stream->jb_last_frm_cnt = 1; + } else { + stream->jb_last_frm_cnt++; + } + } else { + + /* It can only be PJMEDIA_JB_ZERO_PREFETCH frame */ + pj_assert(frame_type == PJMEDIA_JB_ZERO_PREFETCH_FRAME); + + if (stream->jb_last_frm != frame_type) { + pjmedia_jb_state jb_state; + + /* Report changing frame type event */ + pjmedia_jbuf_get_state(stream->jb, &jb_state); + PJ_LOG(5,(stream->port.info.name.ptr, + "Jitter buffer is bufferring (prefetch=%d)", + jb_state.prefetch)); + + stream->jb_last_frm = frame_type; + stream->jb_last_frm_cnt = 1; + } else { + stream->jb_last_frm_cnt++; + } + } + } + } + + return PJ_SUCCESS; +} + + +/* + * Transmit DTMF + */ +static void create_dtmf_payload(pjmedia_stream *stream, + struct pjmedia_frame *frame_out, + int *first, int *last) +{ + pjmedia_rtp_dtmf_event *event; + struct dtmf *digit = &stream->tx_dtmf_buf[0]; + pj_uint32_t cur_ts; + + pj_assert(sizeof(pjmedia_rtp_dtmf_event) == 4); + + *first = *last = 0; + + event = (pjmedia_rtp_dtmf_event*) frame_out->buf; + cur_ts = pj_ntohl(stream->enc->rtp.out_hdr.ts); + + if (digit->duration == 0) { + PJ_LOG(5,(stream->port.info.name.ptr, "Sending DTMF digit id %c", + digitmap[digit->event])); + *first = 1; + } + + digit->duration += PJMEDIA_PIA_SPF(&stream->port.info); + + event->event = (pj_uint8_t)digit->event; + event->e_vol = 10; + event->duration = pj_htons((pj_uint16_t)digit->duration); + + + if (digit->duration >= PJMEDIA_DTMF_DURATION) { + + event->e_vol |= 0x80; + *last = 1; + + /* Prepare next digit. */ + pj_mutex_lock(stream->jb_mutex); + + pj_array_erase(stream->tx_dtmf_buf, sizeof(stream->tx_dtmf_buf[0]), + stream->tx_dtmf_count, 0); + --stream->tx_dtmf_count; + + pj_mutex_unlock(stream->jb_mutex); + } + + frame_out->size = 4; +} + + +static pj_status_t send_rtcp(pjmedia_stream *stream, + pj_bool_t with_sdes, + pj_bool_t with_bye, + pj_bool_t with_xr) +{ + void *sr_rr_pkt; + pj_uint8_t *pkt; + int len, max_len; + pj_status_t status; + + /* Build RTCP RR/SR packet */ + pjmedia_rtcp_build_rtcp(&stream->rtcp, &sr_rr_pkt, &len); + +#if !defined(PJMEDIA_HAS_RTCP_XR) || (PJMEDIA_HAS_RTCP_XR == 0) + with_xr = PJ_FALSE; +#endif + + if (with_sdes || with_bye || with_xr) { + pkt = (pj_uint8_t*) stream->out_rtcp_pkt; + pj_memcpy(pkt, sr_rr_pkt, len); + max_len = stream->out_rtcp_pkt_size; + } else { + pkt = (pj_uint8_t*)sr_rr_pkt; + max_len = len; + } + + /* Build RTCP SDES packet */ + if (with_sdes) { + pjmedia_rtcp_sdes sdes; + pj_size_t sdes_len; + + pj_bzero(&sdes, sizeof(sdes)); + sdes.cname = stream->cname; + sdes_len = max_len - len; + status = pjmedia_rtcp_build_rtcp_sdes(&stream->rtcp, pkt+len, + &sdes_len, &sdes); + if (status != PJ_SUCCESS) { + PJ_PERROR(4,(stream->port.info.name.ptr, status, + "Error generating RTCP SDES")); + } else { + len += (int)sdes_len; + } + } + + /* Build RTCP XR packet */ +#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0) + if (with_xr) { + int i; + pjmedia_jb_state jb_state; + void *xr_pkt; + int xr_len; + + /* Update RTCP XR with current JB states */ + pjmedia_jbuf_get_state(stream->jb, &jb_state); + + i = jb_state.avg_delay; + status = pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session, + PJMEDIA_RTCP_XR_INFO_JB_NOM, i); + pj_assert(status == PJ_SUCCESS); + + i = jb_state.max_delay; + status = pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session, + PJMEDIA_RTCP_XR_INFO_JB_MAX, i); + pj_assert(status == PJ_SUCCESS); + + pjmedia_rtcp_build_rtcp_xr(&stream->rtcp.xr_session, 0, + &xr_pkt, &xr_len); + + if (xr_len + len <= max_len) { + pj_memcpy(pkt+len, xr_pkt, xr_len); + len += xr_len; + + /* Send the RTCP XR to third-party destination if specified */ + if (stream->rtcp_xr_dest_len) { + pjmedia_transport_send_rtcp2(stream->transport, + &stream->rtcp_xr_dest, + stream->rtcp_xr_dest_len, + xr_pkt, xr_len); + } + + } else { + PJ_PERROR(4,(stream->port.info.name.ptr, PJ_ETOOBIG, + "Error generating RTCP-XR")); + } + } +#endif + + /* Build RTCP BYE packet */ + if (with_bye) { + pj_size_t bye_len; + + bye_len = max_len - len; + status = pjmedia_rtcp_build_rtcp_bye(&stream->rtcp, pkt+len, + &bye_len, NULL); + if (status != PJ_SUCCESS) { + PJ_PERROR(4,(stream->port.info.name.ptr, status, + "Error generating RTCP BYE")); + } else { + len += (int)bye_len; + } + } + + /* Send! */ + status = pjmedia_transport_send_rtcp(stream->transport, pkt, len); + + return status; +} + +/** + * check_tx_rtcp() + * + * This function is can be called by either put_frame() or get_frame(), + * to transmit periodic RTCP SR/RR report. + */ +static void check_tx_rtcp(pjmedia_stream *stream, pj_uint32_t timestamp) +{ + /* Note that timestamp may represent local or remote timestamp, + * depending on whether this function is called from put_frame() + * or get_frame(). + */ + + if (stream->rtcp_last_tx == 0) { + + stream->rtcp_last_tx = timestamp; + + } else if (timestamp - stream->rtcp_last_tx >= stream->rtcp_interval) { + pj_bool_t with_xr = PJ_FALSE; + pj_status_t status; + +#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0) + if (stream->rtcp.xr_enabled) { + if (stream->rtcp_xr_last_tx == 0) { + stream->rtcp_xr_last_tx = timestamp; + } else if (timestamp - stream->rtcp_xr_last_tx >= + stream->rtcp_xr_interval) + { + with_xr = PJ_TRUE; + + /* Update last tx RTCP XR */ + stream->rtcp_xr_last_tx = timestamp; + } + } +#endif + + status = send_rtcp(stream, !stream->rtcp_sdes_bye_disabled, PJ_FALSE, + with_xr); + if (status != PJ_SUCCESS) { + PJ_PERROR(4,(stream->port.info.name.ptr, status, + "Error sending RTCP")); + } + + stream->rtcp_last_tx = timestamp; + } +} + + +/** + * Rebuffer the frame when encoder and decoder has different ptime + * (such as when different iLBC modes are used by local and remote) + */ +static void rebuffer(pjmedia_stream *stream, + pjmedia_frame *frame) +{ + /* How many samples are needed */ + unsigned count; + + /* Normalize frame */ + if (frame->type != PJMEDIA_FRAME_TYPE_AUDIO) + frame->size = 0; + + /* Remove used frame from the buffer. */ + if (stream->enc_buf_pos) { + if (stream->enc_buf_count) { + pj_memmove(stream->enc_buf, + stream->enc_buf + stream->enc_buf_pos, + (stream->enc_buf_count << 1)); + } + stream->enc_buf_pos = 0; + } + + /* Make sure we have space to store the new frame */ + pj_assert(stream->enc_buf_count + (frame->size >> 1) < + stream->enc_buf_size); + + /* Append new frame to the buffer */ + if (frame->size) { + /* Handle case when there is no port transmitting to this port */ + if (frame->buf) { + pj_memcpy(stream->enc_buf + stream->enc_buf_count, + frame->buf, frame->size); + } else { + pj_bzero(stream->enc_buf + stream->enc_buf_count, frame->size); + } + stream->enc_buf_count += (frame->size >> 1); + } + + /* How many samples are needed */ + count = stream->codec_param.info.enc_ptime * + PJMEDIA_PIA_SRATE(&stream->port.info) / 1000; + + /* See if we have enough samples */ + if (stream->enc_buf_count >= count) { + + frame->type = PJMEDIA_FRAME_TYPE_AUDIO; + frame->buf = stream->enc_buf; + frame->size = (count << 1); + + stream->enc_buf_pos = count; + stream->enc_buf_count -= count; + + } else { + /* We don't have enough samples */ + frame->type = PJMEDIA_FRAME_TYPE_NONE; + } +} + + +/** + * put_frame_imp() + */ +static pj_status_t put_frame_imp( pjmedia_port *port, + pjmedia_frame *frame ) +{ + pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata; + pjmedia_channel *channel = stream->enc; + pj_status_t status = 0; + pjmedia_frame frame_out; + unsigned ts_len, rtp_ts_len, samples_per_frame; + void *rtphdr; + int rtphdrlen; + int inc_timestamp = 0; + + +#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA != 0 + /* If the interval since last sending packet is greater than + * PJMEDIA_STREAM_KA_INTERVAL, send keep-alive packet. + */ + if (stream->use_ka) + { + pj_uint32_t dtx_duration; + + dtx_duration = pj_timestamp_diff32(&stream->last_frm_ts_sent, + &frame->timestamp); + if (dtx_duration > + PJMEDIA_STREAM_KA_INTERVAL * PJMEDIA_PIA_SRATE(&stream->port.info)) + { + send_keep_alive_packet(stream); + stream->last_frm_ts_sent = frame->timestamp; + } + } +#endif + + /* Don't do anything if stream is paused */ + if (channel->paused) { + stream->enc_buf_pos = stream->enc_buf_count = 0; + return PJ_SUCCESS; + } + + /* Number of samples in the frame */ + if (frame->type == PJMEDIA_FRAME_TYPE_AUDIO) + ts_len = (frame->size >> 1) / stream->codec_param.info.channel_cnt; + else if (frame->type == PJMEDIA_FRAME_TYPE_EXTENDED) + ts_len = PJMEDIA_PIA_SPF(&stream->port.info) / + PJMEDIA_PIA_CCNT(&stream->port.info); + else + ts_len = 0; + + /* Increment transmit duration */ + stream->tx_duration += ts_len; + +#if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0) + /* Handle special case for audio codec with RTP timestamp inconsistence + * e.g: G722, MPEG audio. + */ + if (stream->has_g722_mpeg_bug) + rtp_ts_len = stream->rtp_tx_ts_len_per_pkt; + else + rtp_ts_len = ts_len; +#else + rtp_ts_len = ts_len; +#endif + + /* Init frame_out buffer. */ + frame_out.buf = ((char*)channel->out_pkt) + sizeof(pjmedia_rtp_hdr); + frame_out.size = 0; + + /* Calculate number of samples per frame */ + samples_per_frame = stream->enc_samples_per_pkt; + + + /* If we have DTMF digits in the queue, transmit the digits. + * Otherwise encode the PCM buffer. + */ + if (stream->tx_dtmf_count) { + int first=0, last=0; + + create_dtmf_payload(stream, &frame_out, &first, &last); + + /* Encapsulate into RTP packet. Note that: + * - RTP marker should be set on the beginning of a new event + * - RTP timestamp is constant for the same packet. + */ + status = pjmedia_rtp_encode_rtp( &channel->rtp, + stream->tx_event_pt, first, + frame_out.size, + (first ? rtp_ts_len : 0), + (const void**)&rtphdr, + &rtphdrlen); + + if (last) { + /* This is the last packet for the event. + * Increment the RTP timestamp of the RTP session, for next + * RTP packets. + */ + inc_timestamp = PJMEDIA_DTMF_DURATION - rtp_ts_len; + } + + + /* + * Special treatment for FRAME_TYPE_AUDIO but with frame->buf==NULL. + * This happens when stream input is disconnected from the bridge. + * In this case we periodically transmit RTP frame to keep NAT binding + * open, by giving zero PCM frame to the codec. + * + * This was originally done in http://trac.pjsip.org/repos/ticket/56, + * but then disabled in http://trac.pjsip.org/repos/ticket/439, but + * now it's enabled again. + */ + } else if (frame->type == PJMEDIA_FRAME_TYPE_AUDIO && + frame->buf == NULL && + stream->port.info.fmt.id == PJMEDIA_FORMAT_L16 && + (stream->dir & PJMEDIA_DIR_ENCODING) && + stream->codec_param.info.frm_ptime * + stream->codec_param.info.channel_cnt * + stream->codec_param.info.clock_rate/1000 < + PJ_ARRAY_SIZE(zero_frame)) + { + pjmedia_frame silence_frame; + + pj_bzero(&silence_frame, sizeof(silence_frame)); + silence_frame.buf = zero_frame; + silence_frame.size = stream->codec_param.info.frm_ptime * 2 * + stream->codec_param.info.channel_cnt * + stream->codec_param.info.clock_rate / 1000; + silence_frame.type = PJMEDIA_FRAME_TYPE_AUDIO; + silence_frame.timestamp.u32.lo = pj_ntohl(stream->enc->rtp.out_hdr.ts); + + /* Encode! */ + status = pjmedia_codec_encode( stream->codec, &silence_frame, + channel->out_pkt_size - + sizeof(pjmedia_rtp_hdr), + &frame_out); + if (status != PJ_SUCCESS) { + LOGERR_((stream->port.info.name.ptr, + "Codec encode() error", status)); + return status; + } + + /* Encapsulate. */ + status = pjmedia_rtp_encode_rtp( &channel->rtp, + channel->pt, 0, + frame_out.size, rtp_ts_len, + (const void**)&rtphdr, + &rtphdrlen); + + + /* Encode audio frame */ + } else if ((frame->type == PJMEDIA_FRAME_TYPE_AUDIO && + frame->buf != NULL) || + (frame->type == PJMEDIA_FRAME_TYPE_EXTENDED)) + { + /* Encode! */ + status = pjmedia_codec_encode( stream->codec, frame, + channel->out_pkt_size - + sizeof(pjmedia_rtp_hdr), + &frame_out); + if (status != PJ_SUCCESS) { + LOGERR_((stream->port.info.name.ptr, + "Codec encode() error", status)); + return status; + } + + /* Encapsulate. */ + status = pjmedia_rtp_encode_rtp( &channel->rtp, + channel->pt, 0, + frame_out.size, rtp_ts_len, + (const void**)&rtphdr, + &rtphdrlen); + + } else { + + /* Just update RTP session's timestamp. */ + status = pjmedia_rtp_encode_rtp( &channel->rtp, + 0, 0, + 0, rtp_ts_len, + (const void**)&rtphdr, + &rtphdrlen); + + } + + if (status != PJ_SUCCESS) { + LOGERR_((stream->port.info.name.ptr, + "RTP encode_rtp() error", status)); + return status; + } + + /* Check if now is the time to transmit RTCP SR/RR report. + * We only do this when stream direction is not "decoding only", because + * when it is, check_tx_rtcp() will be handled by get_frame(). + */ + if (stream->dir != PJMEDIA_DIR_DECODING) { + check_tx_rtcp(stream, pj_ntohl(channel->rtp.out_hdr.ts)); + } + + /* Do nothing if we have nothing to transmit */ + if (frame_out.size == 0) { + if (stream->is_streaming) { + PJ_LOG(5,(stream->port.info.name.ptr,"Starting silence")); + stream->is_streaming = PJ_FALSE; + } + + return PJ_SUCCESS; + } + + + /* Copy RTP header to the beginning of packet */ + pj_memcpy(channel->out_pkt, rtphdr, sizeof(pjmedia_rtp_hdr)); + + /* Special case for DTMF: timestamp remains constant for + * the same event, and is only updated after a complete event + * has been transmitted. + */ + if (inc_timestamp) { + pjmedia_rtp_encode_rtp( &channel->rtp, stream->tx_event_pt, 0, + 0, inc_timestamp, NULL, NULL); + } + + /* Set RTP marker bit if currently not streaming */ + if (stream->is_streaming == PJ_FALSE) { + pjmedia_rtp_hdr *rtp = (pjmedia_rtp_hdr*) channel->out_pkt; + + rtp->m = 1; + PJ_LOG(5,(stream->port.info.name.ptr,"Start talksprut..")); + } + + stream->is_streaming = PJ_TRUE; + + /* Send the RTP packet to the transport. */ + status = pjmedia_transport_send_rtp(stream->transport, channel->out_pkt, + frame_out.size + + sizeof(pjmedia_rtp_hdr)); + if (status != PJ_SUCCESS) { + PJ_PERROR(4,(stream->port.info.name.ptr, status, + "Error sending RTP")); + } + + /* Update stat */ + pjmedia_rtcp_tx_rtp(&stream->rtcp, frame_out.size); + stream->rtcp.stat.rtp_tx_last_ts = pj_ntohl(stream->enc->rtp.out_hdr.ts); + stream->rtcp.stat.rtp_tx_last_seq = pj_ntohs(stream->enc->rtp.out_hdr.seq); + +#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0 + /* Update timestamp of last sending packet. */ + stream->last_frm_ts_sent = frame->timestamp; +#endif + + return PJ_SUCCESS; +} + + +/** + * put_frame() + * + * This callback is called by upstream component when it has PCM frame + * to transmit. This function encodes the PCM frame, pack it into + * RTP packet, and transmit to peer. + */ +static pj_status_t put_frame( pjmedia_port *port, + pjmedia_frame *frame ) +{ + pjmedia_stream *stream = (pjmedia_stream*) port->port_data.pdata; + pjmedia_frame tmp_zero_frame; + unsigned samples_per_frame; + + samples_per_frame = stream->enc_samples_per_pkt; + + /* http://www.pjsip.org/trac/ticket/56: + * when input is PJMEDIA_FRAME_TYPE_NONE, feed zero PCM frame + * instead so that encoder can decide whether or not to transmit + * silence frame. + */ + if (frame->type == PJMEDIA_FRAME_TYPE_NONE) { + pj_memcpy(&tmp_zero_frame, frame, sizeof(pjmedia_frame)); + frame = &tmp_zero_frame; + + tmp_zero_frame.buf = NULL; + tmp_zero_frame.size = samples_per_frame * 2; + tmp_zero_frame.type = PJMEDIA_FRAME_TYPE_AUDIO; + } + +#if 0 + // This is no longer needed because each TYPE_NONE frame will + // be converted into zero frame above + + /* If VAD is temporarily disabled during creation, feed zero PCM frame + * to the codec. + */ + if (stream->vad_enabled != stream->codec_param.setting.vad && + stream->vad_enabled != 0 && + frame->type == PJMEDIA_FRAME_TYPE_NONE && + samples_per_frame <= ZERO_PCM_MAX_SIZE) + { + pj_memcpy(&tmp_in_frame, frame, sizeof(pjmedia_frame)); + frame = &tmp_in_frame; + + tmp_in_frame.buf = NULL; + tmp_in_frame.size = samples_per_frame * 2; + tmp_in_frame.type = PJMEDIA_FRAME_TYPE_AUDIO; + } +#endif + + /* If VAD is temporarily disabled during creation, enable it + * after transmitting for VAD_SUSPEND_SEC seconds. + */ + if (stream->vad_enabled != stream->codec_param.setting.vad && + (stream->tx_duration - stream->ts_vad_disabled) > + PJMEDIA_PIA_SRATE(&stream->port.info) * + PJMEDIA_STREAM_VAD_SUSPEND_MSEC / 1000) + { + stream->codec_param.setting.vad = stream->vad_enabled; + pjmedia_codec_modify(stream->codec, &stream->codec_param); + PJ_LOG(4,(stream->port.info.name.ptr,"VAD re-enabled")); + } + + + /* If encoder has different ptime than decoder, then the frame must + * be passed through the encoding buffer via rebuffer() function. + */ + if (stream->enc_buf != NULL) { + pjmedia_frame tmp_rebuffer_frame; + pj_status_t status = PJ_SUCCESS; + + /* Copy original frame to temporary frame since we need + * to modify it. + */ + pj_memcpy(&tmp_rebuffer_frame, frame, sizeof(pjmedia_frame)); + + /* Loop while we have full frame in enc_buffer */ + for (;;) { + pj_status_t st; + + /* Run rebuffer() */ + rebuffer(stream, &tmp_rebuffer_frame); + + /* Process this frame */ + st = put_frame_imp(port, &tmp_rebuffer_frame); + if (st != PJ_SUCCESS) + status = st; + + /* If we still have full frame in the buffer, re-run + * rebuffer() with NULL frame. + */ + if (stream->enc_buf_count >= stream->enc_samples_per_pkt) { + + tmp_rebuffer_frame.type = PJMEDIA_FRAME_TYPE_NONE; + + } else { + + /* Otherwise break */ + break; + } + } + + return status; + + } else { + return put_frame_imp(port, frame); + } +} + + +#if 0 +static void dump_bin(const char *buf, unsigned len) +{ + unsigned i; + + PJ_LOG(3,(THIS_FILE, "begin dump")); + for (i=0; i<len; ++i) { + int j; + char bits[9]; + unsigned val = buf[i] & 0xFF; + + bits[8] = '\0'; + for (j=0; j<8; ++j) { + if (val & (1 << (7-j))) + bits[j] = '1'; + else + bits[j] = '0'; + } + + PJ_LOG(3,(THIS_FILE, "%2d %s [%d]", i, bits, val)); + } + PJ_LOG(3,(THIS_FILE, "end dump")); +} +#endif + +/* + * Handle incoming DTMF digits. + */ +static void handle_incoming_dtmf( pjmedia_stream *stream, + const void *payload, unsigned payloadlen) +{ + pjmedia_rtp_dtmf_event *event = (pjmedia_rtp_dtmf_event*) payload; + + /* Check compiler packing. */ + pj_assert(sizeof(pjmedia_rtp_dtmf_event)==4); + + /* Must have sufficient length before we proceed. */ + if (payloadlen < sizeof(pjmedia_rtp_dtmf_event)) + return; + + //dump_bin(payload, payloadlen); + + /* Check if this is the same/current digit of the last packet. */ + if (stream->last_dtmf != -1 && + event->event == stream->last_dtmf && + pj_ntohs(event->duration) >= stream->last_dtmf_dur) + { + /* Yes, this is the same event. */ + stream->last_dtmf_dur = pj_ntohs(event->duration); + return; + } + + /* Ignore unknown event. */ + if (event->event > 15) { + PJ_LOG(5,(stream->port.info.name.ptr, + "Ignored RTP pkt with bad DTMF event %d", + event->event)); + return; + } + + /* New event! */ + PJ_LOG(5,(stream->port.info.name.ptr, "Received DTMF digit %c, vol=%d", + digitmap[event->event], + (event->e_vol & 0x3F))); + + stream->last_dtmf = event->event; + stream->last_dtmf_dur = pj_ntohs(event->duration); + + /* If DTMF callback is installed, call the callback, otherwise keep + * the DTMF digits in the buffer. + */ + if (stream->dtmf_cb) { + + stream->dtmf_cb(stream, stream->dtmf_cb_user_data, + digitmap[event->event]); + + } else { + /* By convention, we use jitter buffer's mutex to access shared + * DTMF variables. + */ + pj_mutex_lock(stream->jb_mutex); + if (stream->rx_dtmf_count >= PJ_ARRAY_SIZE(stream->rx_dtmf_buf)) { + /* DTMF digits overflow. Discard the oldest digit. */ + pj_array_erase(stream->rx_dtmf_buf, + sizeof(stream->rx_dtmf_buf[0]), + stream->rx_dtmf_count, 0); + --stream->rx_dtmf_count; + } + stream->rx_dtmf_buf[stream->rx_dtmf_count++] = digitmap[event->event]; + pj_mutex_unlock(stream->jb_mutex); + } +} + + +/* + * This callback is called by stream transport on receipt of packets + * in the RTP socket. + */ +static void on_rx_rtp( void *data, + void *pkt, + pj_ssize_t bytes_read) + +{ + pjmedia_stream *stream = (pjmedia_stream*) data; + pjmedia_channel *channel = stream->dec; + const pjmedia_rtp_hdr *hdr; + const void *payload; + unsigned payloadlen; + pjmedia_rtp_status seq_st; + pj_status_t status; + pj_bool_t pkt_discarded = PJ_FALSE; + + /* Check for errors */ + if (bytes_read < 0) { + LOGERR_((stream->port.info.name.ptr, "RTP recv() error", -bytes_read)); + return; + } + + /* Ignore keep-alive packets */ + if (bytes_read < (pj_ssize_t) sizeof(pjmedia_rtp_hdr)) + return; + + /* Update RTP and RTCP session. */ + status = pjmedia_rtp_decode_rtp(&channel->rtp, pkt, bytes_read, + &hdr, &payload, &payloadlen); + if (status != PJ_SUCCESS) { + LOGERR_((stream->port.info.name.ptr, "RTP decode error", status)); + stream->rtcp.stat.rx.discard++; + return; + } + + /* Ignore the packet if decoder is paused */ + if (channel->paused) + goto on_return; + + /* Update RTP session (also checks if RTP session can accept + * the incoming packet. + */ + pjmedia_rtp_session_update2(&channel->rtp, hdr, &seq_st, + hdr->pt != stream->rx_event_pt); + if (seq_st.status.value) { + TRC_ ((stream->port.info.name.ptr, + "RTP status: badpt=%d, badssrc=%d, dup=%d, " + "outorder=%d, probation=%d, restart=%d", + seq_st.status.flag.badpt, + seq_st.status.flag.badssrc, + seq_st.status.flag.dup, + seq_st.status.flag.outorder, + seq_st.status.flag.probation, + seq_st.status.flag.restart)); + + if (seq_st.status.flag.badpt) { + PJ_LOG(4,(stream->port.info.name.ptr, + "Bad RTP pt %d (expecting %d)", + hdr->pt, channel->rtp.out_pt)); + } + + if (seq_st.status.flag.badssrc) { + PJ_LOG(4,(stream->port.info.name.ptr, + "Changed RTP peer SSRC %d (previously %d)", + channel->rtp.peer_ssrc, stream->rtcp.peer_ssrc)); + stream->rtcp.peer_ssrc = channel->rtp.peer_ssrc; + } + + + } + + /* Skip bad RTP packet */ + if (seq_st.status.flag.bad) { + pkt_discarded = PJ_TRUE; + goto on_return; + } + + /* Ignore if payloadlen is zero */ + if (payloadlen == 0) { + pkt_discarded = PJ_TRUE; + goto on_return; + } + + /* Handle incoming DTMF. */ + if (hdr->pt == stream->rx_event_pt) { + /* Ignore out-of-order packet as it will be detected as new + * digit. Also ignore duplicate packet as it serves no use. + */ + if (seq_st.status.flag.outorder || seq_st.status.flag.dup) { + goto on_return; + } + + handle_incoming_dtmf(stream, payload, payloadlen); + goto on_return; + } + + /* Put "good" packet to jitter buffer, or reset the jitter buffer + * when RTP session is restarted. + */ + pj_mutex_lock( stream->jb_mutex ); + if (seq_st.status.flag.restart) { + status = pjmedia_jbuf_reset(stream->jb); + PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset")); + } else { + /* + * Packets may contain more than one frames, while the jitter + * buffer can only take one frame per "put" operation. So we need + * to ask the codec to "parse" the payload into multiple frames. + */ + enum { MAX = 16 }; + pj_timestamp ts; + unsigned i, count = MAX; + unsigned ts_span; + pjmedia_frame frames[MAX]; + + /* Get the timestamp of the first sample */ + ts.u64 = pj_ntohl(hdr->ts); + + /* Parse the payload. */ + status = pjmedia_codec_parse(stream->codec, (void*)payload, + payloadlen, &ts, &count, frames); + if (status != PJ_SUCCESS) { + LOGERR_((stream->port.info.name.ptr, + "Codec parse() error", + status)); + count = 0; + } + +#if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0) + /* This code is used to learn the samples per frame value that is put + * by remote endpoint, for codecs with inconsistent clock rate such + * as G.722 or MPEG audio. We need to learn the samples per frame + * value as it is used as divider when inserting frames into the + * jitter buffer. + */ + if (stream->has_g722_mpeg_bug) { + if (stream->rtp_rx_check_cnt) { + /* Make sure the detection performed only on two consecutive + * packets with valid RTP sequence and no wrapped timestamp. + */ + if (seq_st.diff == 1 && stream->rtp_rx_last_ts && + ts.u64 > stream->rtp_rx_last_ts && + stream->rtp_rx_last_cnt > 0) + { + unsigned peer_frm_ts_diff; + unsigned frm_ts_span; + + /* Calculate actual frame timestamp span */ + frm_ts_span = PJMEDIA_PIA_SPF(&stream->port.info) / + stream->codec_param.setting.frm_per_pkt/ + PJMEDIA_PIA_CCNT(&stream->port.info); + + /* Get remote frame timestamp span */ + peer_frm_ts_diff = + ((pj_uint32_t)ts.u64-stream->rtp_rx_last_ts) / + stream->rtp_rx_last_cnt; + + /* Possibilities remote's samples per frame for G.722 + * are only (frm_ts_span) and (frm_ts_span/2), this + * validation is needed to avoid wrong decision because + * of silence frames. + */ + if (stream->codec_param.info.pt == PJMEDIA_RTP_PT_G722 && + (peer_frm_ts_diff == frm_ts_span || + peer_frm_ts_diff == (frm_ts_span>>1))) + { + if (peer_frm_ts_diff < stream->rtp_rx_ts_len_per_frame) + stream->rtp_rx_ts_len_per_frame = peer_frm_ts_diff; + + if (--stream->rtp_rx_check_cnt == 0) { + PJ_LOG(4, (THIS_FILE, "G722 codec used, remote" + " samples per frame detected = %d", + stream->rtp_rx_ts_len_per_frame)); + + /* Reset jitter buffer once detection done */ + pjmedia_jbuf_reset(stream->jb); + } + } + } + + stream->rtp_rx_last_ts = (pj_uint32_t)ts.u64; + stream->rtp_rx_last_cnt = count; + } + + ts_span = stream->rtp_rx_ts_len_per_frame; + + /* Adjust the timestamp of the parsed frames */ + for (i=0; i<count; ++i) { + frames[i].timestamp.u64 = ts.u64 + ts_span * i; + } + + } else { + ts_span = stream->codec_param.info.frm_ptime * + stream->codec_param.info.clock_rate / + 1000; + } +#else + ts_span = stream->codec_param.info.frm_ptime * + stream->codec_param.info.clock_rate / + 1000; +#endif + + /* Put each frame to jitter buffer. */ + for (i=0; i<count; ++i) { + unsigned ext_seq; + pj_bool_t discarded; + + ext_seq = (unsigned)(frames[i].timestamp.u64 / ts_span); + pjmedia_jbuf_put_frame2(stream->jb, frames[i].buf, frames[i].size, + frames[i].bit_info, ext_seq, &discarded); + if (discarded) + pkt_discarded = PJ_TRUE; + } + +#if TRACE_JB + trace_jb_put(stream, hdr, payloadlen, count); +#endif + + } + pj_mutex_unlock( stream->jb_mutex ); + + + /* Check if now is the time to transmit RTCP SR/RR report. + * We only do this when stream direction is "decoding only", + * because otherwise check_tx_rtcp() will be handled by put_frame() + */ + if (stream->dir == PJMEDIA_DIR_DECODING) { + check_tx_rtcp(stream, pj_ntohl(hdr->ts)); + } + + if (status != 0) { + LOGERR_((stream->port.info.name.ptr, "Jitter buffer put() error", + status)); + pkt_discarded = PJ_TRUE; + goto on_return; + } + +on_return: + /* Update RTCP session */ + if (stream->rtcp.peer_ssrc == 0) + stream->rtcp.peer_ssrc = channel->rtp.peer_ssrc; + + pjmedia_rtcp_rx_rtp2(&stream->rtcp, pj_ntohs(hdr->seq), + pj_ntohl(hdr->ts), payloadlen, pkt_discarded); + + /* Send RTCP RR and SDES after we receive some RTP packets */ + if (stream->rtcp.received >= 10 && !stream->initial_rr) { + status = send_rtcp(stream, !stream->rtcp_sdes_bye_disabled, + PJ_FALSE, PJ_FALSE); + if (status != PJ_SUCCESS) { + PJ_PERROR(4,(stream->port.info.name.ptr, status, + "Error sending initial RTCP RR")); + } else { + stream->initial_rr = PJ_TRUE; + } + } +} + + +/* + * This callback is called by stream transport on receipt of packets + * in the RTCP socket. + */ +static void on_rx_rtcp( void *data, + void *pkt, + pj_ssize_t bytes_read) +{ + pjmedia_stream *stream = (pjmedia_stream*) data; + + /* Check for errors */ + if (bytes_read < 0) { + LOGERR_((stream->port.info.name.ptr, "RTCP recv() error", + -bytes_read)); + return; + } + + pjmedia_rtcp_rx_rtcp(&stream->rtcp, pkt, bytes_read); +} + + +/* + * Create media channel. + */ +static pj_status_t create_channel( pj_pool_t *pool, + pjmedia_stream *stream, + pjmedia_dir dir, + unsigned pt, + const pjmedia_stream_info *param, + pjmedia_channel **p_channel) +{ + pjmedia_channel *channel; + pj_status_t status; + + /* Allocate memory for channel descriptor */ + + channel = PJ_POOL_ZALLOC_T(pool, pjmedia_channel); + PJ_ASSERT_RETURN(channel != NULL, PJ_ENOMEM); + + /* Init channel info. */ + + channel->stream = stream; + channel->dir = dir; + channel->paused = 1; + channel->pt = pt; + + + /* Allocate buffer for outgoing packet. */ + + if (param->type == PJMEDIA_TYPE_AUDIO) { + channel->out_pkt_size = sizeof(pjmedia_rtp_hdr) + + stream->codec_param.info.max_bps * + PJMEDIA_MAX_FRAME_DURATION_MS / + 8 / 1000; + if (channel->out_pkt_size > PJMEDIA_MAX_MTU - + PJMEDIA_STREAM_RESV_PAYLOAD_LEN) + { + channel->out_pkt_size = PJMEDIA_MAX_MTU - + PJMEDIA_STREAM_RESV_PAYLOAD_LEN; + } + } else { + return PJ_ENOTSUP; + } + + channel->out_pkt = pj_pool_alloc(pool, channel->out_pkt_size); + PJ_ASSERT_RETURN(channel->out_pkt != NULL, PJ_ENOMEM); + + + + /* Create RTP and RTCP sessions: */ + + if (param->rtp_seq_ts_set == 0) { + status = pjmedia_rtp_session_init(&channel->rtp, pt, param->ssrc); + } else { + pjmedia_rtp_session_setting settings; + + settings.flags = (pj_uint8_t)((param->rtp_seq_ts_set << 2) | 3); + settings.default_pt = pt; + settings.sender_ssrc = param->ssrc; + settings.seq = param->rtp_seq; + settings.ts = param->rtp_ts; + status = pjmedia_rtp_session_init2(&channel->rtp, settings); + } + if (status != PJ_SUCCESS) + return status; + + /* Done. */ + *p_channel = channel; + return PJ_SUCCESS; +} + + +/* + * Create media stream. + */ +PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, + pj_pool_t *pool, + const pjmedia_stream_info *info, + pjmedia_transport *tp, + void *user_data, + pjmedia_stream **p_stream) + +{ + enum { M = 32 }; + pjmedia_stream *stream; + pj_str_t name; + unsigned jb_init, jb_max, jb_min_pre, jb_max_pre; + pjmedia_audio_format_detail *afd; + pj_pool_t *own_pool = NULL; + char *p; + pj_status_t status; + + PJ_ASSERT_RETURN(endpt && info && p_stream, PJ_EINVAL); + + if (pool == NULL) { + own_pool = pjmedia_endpt_create_pool( endpt, "strm%p", + PJMEDIA_STREAM_SIZE, + PJMEDIA_STREAM_INC); + PJ_ASSERT_RETURN(own_pool != NULL, PJ_ENOMEM); + pool = own_pool; + } + + /* Allocate the media stream: */ + + stream = PJ_POOL_ZALLOC_T(pool, pjmedia_stream); + PJ_ASSERT_RETURN(stream != NULL, PJ_ENOMEM); + stream->own_pool = own_pool; + pj_memcpy(&stream->si, info, sizeof(*info)); + + /* Init stream/port name */ + name.ptr = (char*) pj_pool_alloc(pool, M); + name.slen = pj_ansi_snprintf(name.ptr, M, "strm%p", stream); + + /* Init some port-info. Some parts of the info will be set later + * once we have more info about the codec. + */ + pjmedia_port_info_init(&stream->port.info, &name, + PJMEDIA_SIG_PORT_STREAM, + info->fmt.clock_rate, info->fmt.channel_cnt, + 16, 80); + afd = pjmedia_format_get_audio_format_detail(&stream->port.info.fmt, 1); + + /* Init port. */ + + //No longer there in 2.0 + //pj_strdup(pool, &stream->port.info.encoding_name, &info->fmt.encoding_name); + afd->clock_rate = info->fmt.clock_rate; + afd->channel_count = info->fmt.channel_cnt; + stream->port.port_data.pdata = stream; + + /* Init stream: */ + stream->endpt = endpt; + stream->codec_mgr = pjmedia_endpt_get_codec_mgr(endpt); + stream->dir = info->dir; + stream->user_data = user_data; + stream->rtcp_interval = (PJMEDIA_RTCP_INTERVAL-500 + (pj_rand()%1000)) * + info->fmt.clock_rate / 1000; + stream->rtcp_sdes_bye_disabled = info->rtcp_sdes_bye_disabled; + + stream->tx_event_pt = info->tx_event_pt ? info->tx_event_pt : -1; + stream->rx_event_pt = info->rx_event_pt ? info->rx_event_pt : -1; + stream->last_dtmf = -1; + stream->jb_last_frm = PJMEDIA_JB_NORMAL_FRAME; + +#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0 + stream->use_ka = info->use_ka; +#endif + + /* Build random RTCP CNAME. CNAME has user@host format */ + stream->cname.ptr = p = (char*) pj_pool_alloc(pool, 20); + pj_create_random_string(p, 5); + p += 5; + *p++ = '@'; *p++ = 'p'; *p++ = 'j'; + pj_create_random_string(p, 6); + p += 6; + *p++ = '.'; *p++ = 'o'; *p++ = 'r'; *p++ = 'g'; + stream->cname.slen = p - stream->cname.ptr; + + + /* Create mutex to protect jitter buffer: */ + + status = pj_mutex_create_simple(pool, NULL, &stream->jb_mutex); + if (status != PJ_SUCCESS) + goto err_cleanup; + + + /* Create and initialize codec: */ + + status = pjmedia_codec_mgr_alloc_codec( stream->codec_mgr, + &info->fmt, &stream->codec); + if (status != PJ_SUCCESS) + goto err_cleanup; + + + /* Get codec param: */ + if (info->param) + stream->codec_param = *info->param; + else { + status = pjmedia_codec_mgr_get_default_param(stream->codec_mgr, + &info->fmt, + &stream->codec_param); + if (status != PJ_SUCCESS) + goto err_cleanup; + } + + /* Check for invalid max_bps. */ + if (stream->codec_param.info.max_bps < stream->codec_param.info.avg_bps) + stream->codec_param.info.max_bps = stream->codec_param.info.avg_bps; + + /* Check for invalid frame per packet. */ + if (stream->codec_param.setting.frm_per_pkt < 1) + stream->codec_param.setting.frm_per_pkt = 1; + + /* Init the codec. */ + status = pjmedia_codec_init(stream->codec, pool); + if (status != PJ_SUCCESS) + goto err_cleanup; + + /* Open the codec. */ + status = pjmedia_codec_open(stream->codec, &stream->codec_param); + if (status != PJ_SUCCESS) + goto err_cleanup; + + /* Set additional info and callbacks. */ + afd->bits_per_sample = 16; + afd->frame_time_usec = stream->codec_param.info.frm_ptime * + stream->codec_param.setting.frm_per_pkt * 1000; + stream->port.info.fmt.id = stream->codec_param.info.fmt_id; + if (stream->codec_param.info.fmt_id == PJMEDIA_FORMAT_L16) { + /* Raw format */ + afd->avg_bps = afd->max_bps = afd->clock_rate * afd->channel_count * + afd->bits_per_sample; + + stream->port.put_frame = &put_frame; + stream->port.get_frame = &get_frame; + } else { + /* Encoded format */ + afd->avg_bps = stream->codec_param.info.avg_bps; + afd->max_bps = stream->codec_param.info.max_bps; + + /* Not applicable for 2.0 + if ((stream->codec_param.info.max_bps * + stream->codec_param.info.frm_ptime * + stream->codec_param.setting.frm_per_pkt) % 8000 != 0) + { + ++stream->port.info.bytes_per_frame; + } + stream->port.info.format.bitrate = stream->codec_param.info.avg_bps; + stream->port.info.format.vad = (stream->codec_param.setting.vad != 0); + */ + + stream->port.put_frame = &put_frame; + stream->port.get_frame = &get_frame_ext; + } + + /* If encoder and decoder's ptime are asymmetric, then we need to + * create buffer on the encoder side. This could happen for example + * with iLBC + */ + if (stream->codec_param.info.enc_ptime!=0 && + stream->codec_param.info.enc_ptime!=stream->codec_param.info.frm_ptime) + { + unsigned ptime; + + stream->enc_samples_per_pkt = stream->codec_param.info.enc_ptime * + stream->codec_param.info.channel_cnt * + afd->clock_rate / 1000; + + /* Set buffer size as twice the largest ptime value between + * stream's ptime, encoder ptime, or decoder ptime. + */ + + ptime = afd->frame_time_usec / 1000; + + if (stream->codec_param.info.enc_ptime > ptime) + ptime = stream->codec_param.info.enc_ptime; + + if (stream->codec_param.info.frm_ptime > ptime) + ptime = stream->codec_param.info.frm_ptime; + + ptime <<= 1; + + /* Allocate buffer */ + stream->enc_buf_size = afd->clock_rate * ptime / 1000; + stream->enc_buf = (pj_int16_t*) + pj_pool_alloc(pool, stream->enc_buf_size * 2); + + } else { + stream->enc_samples_per_pkt = PJMEDIA_AFD_SPF(afd); + } + + + /* Initially disable the VAD in the stream, to help traverse NAT better */ + stream->vad_enabled = stream->codec_param.setting.vad; + if (PJMEDIA_STREAM_VAD_SUSPEND_MSEC > 0 && stream->vad_enabled) { + stream->codec_param.setting.vad = 0; + stream->ts_vad_disabled = 0; + pjmedia_codec_modify(stream->codec, &stream->codec_param); + PJ_LOG(4,(stream->port.info.name.ptr,"VAD temporarily disabled")); + } + + /* Get the frame size */ + stream->frame_size = stream->codec_param.info.max_bps * + stream->codec_param.info.frm_ptime / 8 / 1000; + if ((stream->codec_param.info.max_bps * stream->codec_param.info.frm_ptime) + % 8000 != 0) + { + ++stream->frame_size; + } + + /* How many consecutive PLC frames can be generated */ + stream->max_plc_cnt = (MAX_PLC_MSEC+stream->codec_param.info.frm_ptime-1)/ + stream->codec_param.info.frm_ptime; + +#if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0) + stream->rtp_rx_check_cnt = 5; + stream->has_g722_mpeg_bug = PJ_FALSE; + stream->rtp_rx_last_ts = 0; + stream->rtp_rx_last_cnt = 0; + stream->rtp_tx_ts_len_per_pkt = stream->enc_samples_per_pkt / + stream->codec_param.info.channel_cnt; + stream->rtp_rx_ts_len_per_frame = PJMEDIA_AFD_SPF(afd) / + stream->codec_param.setting.frm_per_pkt / + stream->codec_param.info.channel_cnt; + + if (info->fmt.pt == PJMEDIA_RTP_PT_G722) { + stream->has_g722_mpeg_bug = PJ_TRUE; + /* RTP clock rate = 1/2 real clock rate */ + stream->rtp_tx_ts_len_per_pkt >>= 1; + } +#endif + + /* Init jitter buffer parameters: */ + if (info->jb_max >= stream->codec_param.info.frm_ptime) + jb_max = (info->jb_max + stream->codec_param.info.frm_ptime - 1) / + stream->codec_param.info.frm_ptime; + else + jb_max = 500 / stream->codec_param.info.frm_ptime; + + if (info->jb_min_pre >= stream->codec_param.info.frm_ptime) + jb_min_pre = info->jb_min_pre / stream->codec_param.info.frm_ptime; + else + //jb_min_pre = 60 / stream->codec_param.info.frm_ptime; + jb_min_pre = 1; + + if (info->jb_max_pre >= stream->codec_param.info.frm_ptime) + jb_max_pre = info->jb_max_pre / stream->codec_param.info.frm_ptime; + else + //jb_max_pre = 240 / stream->codec_param.info.frm_ptime; + jb_max_pre = jb_max * 4 / 5; + + if (info->jb_init >= stream->codec_param.info.frm_ptime) + jb_init = info->jb_init / stream->codec_param.info.frm_ptime; + else + //jb_init = (jb_min_pre + jb_max_pre) / 2; + jb_init = 0; + + /* Create jitter buffer */ + status = pjmedia_jbuf_create(pool, &stream->port.info.name, + stream->frame_size, + stream->codec_param.info.frm_ptime, + jb_max, &stream->jb); + if (status != PJ_SUCCESS) + goto err_cleanup; + + + /* Set up jitter buffer */ + pjmedia_jbuf_set_adaptive( stream->jb, jb_init, jb_min_pre, jb_max_pre); + + /* Create decoder channel: */ + + status = create_channel( pool, stream, PJMEDIA_DIR_DECODING, + info->rx_pt, info, &stream->dec); + if (status != PJ_SUCCESS) + goto err_cleanup; + + + /* Create encoder channel: */ + + status = create_channel( pool, stream, PJMEDIA_DIR_ENCODING, + info->tx_pt, info, &stream->enc); + if (status != PJ_SUCCESS) + goto err_cleanup; + + + /* Init RTCP session: */ + + { + pjmedia_rtcp_session_setting rtcp_setting; + + pjmedia_rtcp_session_setting_default(&rtcp_setting); + rtcp_setting.name = stream->port.info.name.ptr; + rtcp_setting.ssrc = info->ssrc; + rtcp_setting.rtp_ts_base = pj_ntohl(stream->enc->rtp.out_hdr.ts); + rtcp_setting.clock_rate = info->fmt.clock_rate; + rtcp_setting.samples_per_frame = PJMEDIA_AFD_SPF(afd); + +#if defined(PJMEDIA_HANDLE_G722_MPEG_BUG) && (PJMEDIA_HANDLE_G722_MPEG_BUG!=0) + /* Special case for G.722 */ + if (info->fmt.pt == PJMEDIA_RTP_PT_G722) { + rtcp_setting.clock_rate = 8000; + rtcp_setting.samples_per_frame = 160; + } +#endif + + pjmedia_rtcp_init2(&stream->rtcp, &rtcp_setting); + + if (info->rtp_seq_ts_set) { + stream->rtcp.stat.rtp_tx_last_seq = info->rtp_seq; + stream->rtcp.stat.rtp_tx_last_ts = info->rtp_ts; + } + } + + /* Allocate outgoing RTCP buffer, should be enough to hold SR/RR, SDES, + * BYE, and XR. + */ + stream->out_rtcp_pkt_size = sizeof(pjmedia_rtcp_sr_pkt) + + sizeof(pjmedia_rtcp_common) + + (4 + stream->cname.slen) + + 32; +#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0) + if (info->rtcp_xr_enabled) { + stream->out_rtcp_pkt_size += sizeof(pjmedia_rtcp_xr_pkt); + } +#endif + + if (stream->out_rtcp_pkt_size > PJMEDIA_MAX_MTU) + stream->out_rtcp_pkt_size = PJMEDIA_MAX_MTU; + + stream->out_rtcp_pkt = pj_pool_alloc(pool, stream->out_rtcp_pkt_size); + + /* Only attach transport when stream is ready. */ + status = pjmedia_transport_attach(tp, stream, &info->rem_addr, + &info->rem_rtcp, + pj_sockaddr_get_len(&info->rem_addr), + &on_rx_rtp, &on_rx_rtcp); + if (status != PJ_SUCCESS) + goto err_cleanup; + + stream->transport = tp; + +#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0) + /* Enable RTCP XR and update stream info/config to RTCP XR */ + if (info->rtcp_xr_enabled) { + int i; + + pjmedia_rtcp_enable_xr(&stream->rtcp, PJ_TRUE); + + /* Set RTCP XR TX interval */ + if (info->rtcp_xr_interval != 0) + stream->rtcp_xr_interval = info->rtcp_xr_interval; + else + stream->rtcp_xr_interval = (PJMEDIA_RTCP_INTERVAL + + (pj_rand() % 8000)) * + info->fmt.clock_rate / 1000; + + /* Additional third-party RTCP XR destination */ + if (info->rtcp_xr_dest.addr.sa_family != 0) { + stream->rtcp_xr_dest_len = pj_sockaddr_get_len(&info->rtcp_xr_dest); + pj_memcpy(&stream->rtcp_xr_dest, &info->rtcp_xr_dest, + stream->rtcp_xr_dest_len); + } + + /* jitter buffer adaptive info */ + i = PJMEDIA_RTCP_XR_JB_ADAPTIVE; + pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session, + PJMEDIA_RTCP_XR_INFO_CONF_JBA, + i); + + /* Jitter buffer aggressiveness info (estimated) */ + i = 7; + pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session, + PJMEDIA_RTCP_XR_INFO_CONF_JBR, + i); + + /* Jitter buffer absolute maximum delay */ + i = jb_max * stream->codec_param.info.frm_ptime; + pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session, + PJMEDIA_RTCP_XR_INFO_JB_ABS_MAX, + i); + + /* PLC info */ + if (stream->codec_param.setting.plc == 0) + i = PJMEDIA_RTCP_XR_PLC_DIS; + else +#if PJMEDIA_WSOLA_IMP==PJMEDIA_WSOLA_IMP_WSOLA + i = PJMEDIA_RTCP_XR_PLC_ENH; +#else + i = PJMEDIA_RTCP_XR_PLC_DIS; +#endif + pjmedia_rtcp_xr_update_info(&stream->rtcp.xr_session, + PJMEDIA_RTCP_XR_INFO_CONF_PLC, + i); + } +#endif + + /* Send RTCP SDES */ + if (!stream->rtcp_sdes_bye_disabled) { + pjmedia_stream_send_rtcp_sdes(stream); + } + +#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0 + /* NAT hole punching by sending KA packet via RTP transport. */ + if (stream->use_ka) + send_keep_alive_packet(stream); +#endif + +#if TRACE_JB + { + char trace_name[PJ_MAXPATH]; + pj_ssize_t len; + + pj_ansi_snprintf(trace_name, sizeof(trace_name), + TRACE_JB_PATH_PREFIX "%s.csv", + stream->port.info.name.ptr); + status = pj_file_open(pool, trace_name, PJ_O_WRONLY, &stream->trace_jb_fd); + if (status != PJ_SUCCESS) { + stream->trace_jb_fd = TRACE_JB_INVALID_FD; + PJ_LOG(3,(THIS_FILE, "Failed creating RTP trace file '%s'", + trace_name)); + } else { + stream->trace_jb_buf = (char*)pj_pool_alloc(pool, PJ_LOG_MAX_SIZE); + + /* Print column header */ + len = pj_ansi_snprintf(stream->trace_jb_buf, PJ_LOG_MAX_SIZE, + "Time, Operation, Size, Frame Count, " + "Frame type, RTP Seq, RTP TS, RTP M, " + "JB size, JB burst level, JB prefetch\n"); + pj_file_write(stream->trace_jb_fd, stream->trace_jb_buf, &len); + pj_file_flush(stream->trace_jb_fd); + } + } +#endif + + /* Success! */ + *p_stream = stream; + + PJ_LOG(5,(THIS_FILE, "Stream %s created", stream->port.info.name.ptr)); + + return PJ_SUCCESS; + + +err_cleanup: + pjmedia_stream_destroy(stream); + return status; +} + + +/* + * Destroy stream. + */ +PJ_DEF(pj_status_t) pjmedia_stream_destroy( pjmedia_stream *stream ) +{ + PJ_ASSERT_RETURN(stream != NULL, PJ_EINVAL); + + /* Send RTCP BYE (also SDES & XR) */ + if (!stream->rtcp_sdes_bye_disabled) { + send_rtcp(stream, PJ_TRUE, PJ_TRUE, PJ_TRUE); + } + + /* Detach from transport + * MUST NOT hold stream mutex while detaching from transport, as + * it may cause deadlock. See ticket #460 for the details. + */ + if (stream->transport) { + pjmedia_transport_detach(stream->transport, stream); + stream->transport = NULL; + } + + /* This function may be called when stream is partly initialized. */ + if (stream->jb_mutex) + pj_mutex_lock(stream->jb_mutex); + + + /* Free codec. */ + + if (stream->codec) { + pjmedia_codec_close(stream->codec); + pjmedia_codec_mgr_dealloc_codec(stream->codec_mgr, stream->codec); + stream->codec = NULL; + } + + /* Free mutex */ + + if (stream->jb_mutex) { + pj_mutex_destroy(stream->jb_mutex); + stream->jb_mutex = NULL; + } + + /* Destroy jitter buffer */ + if (stream->jb) + pjmedia_jbuf_destroy(stream->jb); + +#if TRACE_JB + if (TRACE_JB_OPENED(stream)) { + pj_file_close(stream->trace_jb_fd); + stream->trace_jb_fd = TRACE_JB_INVALID_FD; + } +#endif + + if (stream->own_pool) { + pj_pool_t *pool = stream->own_pool; + stream->own_pool = NULL; + pj_pool_release(pool); + } + return PJ_SUCCESS; +} + + +/* + * Get the last frame frame type retreived from the jitter buffer. + */ +PJ_DEF(char) pjmedia_stream_get_last_jb_frame_type(pjmedia_stream *stream) +{ + return stream->jb_last_frm; +} + + +/* + * Get the port interface. + */ +PJ_DEF(pj_status_t) pjmedia_stream_get_port( pjmedia_stream *stream, + pjmedia_port **p_port ) +{ + *p_port = &stream->port; + return PJ_SUCCESS; +} + + +/* + * Get the transport object + */ +PJ_DEF(pjmedia_transport*) pjmedia_stream_get_transport(pjmedia_stream *st) +{ + return st->transport; +} + + +/* + * Start stream. + */ +PJ_DEF(pj_status_t) pjmedia_stream_start(pjmedia_stream *stream) +{ + + PJ_ASSERT_RETURN(stream && stream->enc && stream->dec, PJ_EINVALIDOP); + + if (stream->enc && (stream->dir & PJMEDIA_DIR_ENCODING)) { + stream->enc->paused = 0; + //pjmedia_snd_stream_start(stream->enc->snd_stream); + PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream started")); + } else { + PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream paused")); + } + + if (stream->dec && (stream->dir & PJMEDIA_DIR_DECODING)) { + stream->dec->paused = 0; + //pjmedia_snd_stream_start(stream->dec->snd_stream); + PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream started")); + } else { + PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream paused")); + } + + return PJ_SUCCESS; +} + + +PJ_DEF(pj_status_t) pjmedia_stream_get_info( const pjmedia_stream *stream, + pjmedia_stream_info *info) +{ + PJ_ASSERT_RETURN(stream && info, PJ_EINVAL); + + pj_memcpy(info, &stream->si, sizeof(pjmedia_stream_info)); + return PJ_SUCCESS; +} + +/* + * Get stream statistics. + */ +PJ_DEF(pj_status_t) pjmedia_stream_get_stat( const pjmedia_stream *stream, + pjmedia_rtcp_stat *stat) +{ + PJ_ASSERT_RETURN(stream && stat, PJ_EINVAL); + + pj_memcpy(stat, &stream->rtcp.stat, sizeof(pjmedia_rtcp_stat)); + return PJ_SUCCESS; +} + + +/* + * Reset the stream statistics in the middle of a stream session. + */ +PJ_DEF(pj_status_t) pjmedia_stream_reset_stat(pjmedia_stream *stream) +{ + PJ_ASSERT_RETURN(stream, PJ_EINVAL); + + pjmedia_rtcp_init_stat(&stream->rtcp.stat); + + return PJ_SUCCESS; +} + + +#if defined(PJMEDIA_HAS_RTCP_XR) && (PJMEDIA_HAS_RTCP_XR != 0) +/* + * Get stream extended statistics. + */ +PJ_DEF(pj_status_t) pjmedia_stream_get_stat_xr( const pjmedia_stream *stream, + pjmedia_rtcp_xr_stat *stat) +{ + PJ_ASSERT_RETURN(stream && stat, PJ_EINVAL); + + if (stream->rtcp.xr_enabled) { + pj_memcpy(stat, &stream->rtcp.xr_session.stat, sizeof(pjmedia_rtcp_xr_stat)); + return PJ_SUCCESS; + } + return PJ_ENOTFOUND; +} +#endif + +/* + * Get jitter buffer state. + */ +PJ_DEF(pj_status_t) pjmedia_stream_get_stat_jbuf(const pjmedia_stream *stream, + pjmedia_jb_state *state) +{ + PJ_ASSERT_RETURN(stream && state, PJ_EINVAL); + return pjmedia_jbuf_get_state(stream->jb, state); +} + +/* + * Pause stream. + */ +PJ_DEF(pj_status_t) pjmedia_stream_pause( pjmedia_stream *stream, + pjmedia_dir dir) +{ + PJ_ASSERT_RETURN(stream, PJ_EINVAL); + + if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) { + stream->enc->paused = 1; + PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream paused")); + } + + if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) { + stream->dec->paused = 1; + + /* Also reset jitter buffer */ + pj_mutex_lock( stream->jb_mutex ); + pjmedia_jbuf_reset(stream->jb); + pj_mutex_unlock( stream->jb_mutex ); + + PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream paused")); + } + + return PJ_SUCCESS; +} + + +/* + * Resume stream + */ +PJ_DEF(pj_status_t) pjmedia_stream_resume( pjmedia_stream *stream, + pjmedia_dir dir) +{ + PJ_ASSERT_RETURN(stream, PJ_EINVAL); + + if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) { + stream->enc->paused = 0; + PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream resumed")); + } + + if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) { + stream->dec->paused = 0; + PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream resumed")); + } + + return PJ_SUCCESS; +} + +/* + * Dial DTMF + */ +PJ_DEF(pj_status_t) pjmedia_stream_dial_dtmf( pjmedia_stream *stream, + const pj_str_t *digit_char) +{ + pj_status_t status = PJ_SUCCESS; + + /* By convention we use jitter buffer mutex to access DTMF + * queue. + */ + PJ_ASSERT_RETURN(stream && digit_char, PJ_EINVAL); + + /* Check that remote can receive DTMF events. */ + if (stream->tx_event_pt < 0) { + return PJMEDIA_RTP_EREMNORFC2833; + } + + pj_mutex_lock(stream->jb_mutex); + + if (stream->tx_dtmf_count+digit_char->slen >= + (long)PJ_ARRAY_SIZE(stream->tx_dtmf_buf)) + { + status = PJ_ETOOMANY; + } else { + int i; + + /* convert ASCII digits into payload type first, to make sure + * that all digits are valid. + */ + for (i=0; i<digit_char->slen; ++i) { + unsigned pt; + int dig = pj_tolower(digit_char->ptr[i]); + + if (dig >= '0' && dig <= '9') + { + pt = dig - '0'; + } + else if (dig >= 'a' && dig <= 'd') + { + pt = dig - 'a' + 12; + } + else if (dig == '*') + { + pt = 10; + } + else if (dig == '#') + { + pt = 11; + } + else + { + status = PJMEDIA_RTP_EINDTMF; + break; + } + + stream->tx_dtmf_buf[stream->tx_dtmf_count+i].event = pt; + stream->tx_dtmf_buf[stream->tx_dtmf_count+i].duration = 0; + } + + if (status != PJ_SUCCESS) + goto on_return; + + /* Increment digit count only if all digits are valid. */ + stream->tx_dtmf_count += digit_char->slen; + } + +on_return: + pj_mutex_unlock(stream->jb_mutex); + + return status; +} + + +/* + * See if we have DTMF digits in the rx buffer. + */ +PJ_DEF(pj_bool_t) pjmedia_stream_check_dtmf(pjmedia_stream *stream) +{ + return stream->rx_dtmf_count != 0; +} + + +/* + * Retrieve incoming DTMF digits from the stream's DTMF buffer. + */ +PJ_DEF(pj_status_t) pjmedia_stream_get_dtmf( pjmedia_stream *stream, + char *digits, + unsigned *size) +{ + PJ_ASSERT_RETURN(stream && digits && size, PJ_EINVAL); + + pj_assert(sizeof(stream->rx_dtmf_buf[0]) == 0); + + /* By convention, we use jitter buffer's mutex to access DTMF + * digits resources. + */ + pj_mutex_lock(stream->jb_mutex); + + if (stream->rx_dtmf_count < *size) + *size = stream->rx_dtmf_count; + + if (*size) { + pj_memcpy(digits, stream->rx_dtmf_buf, *size); + stream->rx_dtmf_count -= *size; + if (stream->rx_dtmf_count) { + pj_memmove(stream->rx_dtmf_buf, + &stream->rx_dtmf_buf[*size], + stream->rx_dtmf_count); + } + } + + pj_mutex_unlock(stream->jb_mutex); + + return PJ_SUCCESS; +} + + +/* + * Set callback to be called upon receiving DTMF digits. + */ +PJ_DEF(pj_status_t) pjmedia_stream_set_dtmf_callback(pjmedia_stream *stream, + void (*cb)(pjmedia_stream*, + void *user_data, + int digit), + void *user_data) +{ + PJ_ASSERT_RETURN(stream, PJ_EINVAL); + + /* By convention, we use jitter buffer's mutex to access DTMF + * digits resources. + */ + pj_mutex_lock(stream->jb_mutex); + + stream->dtmf_cb = cb; + stream->dtmf_cb_user_data = user_data; + + pj_mutex_unlock(stream->jb_mutex); + + return PJ_SUCCESS; +} + +/* + * Send RTCP SDES. + */ +PJ_DEF(pj_status_t) +pjmedia_stream_send_rtcp_sdes( pjmedia_stream *stream ) +{ + PJ_ASSERT_RETURN(stream, PJ_EINVAL); + + return send_rtcp(stream, PJ_TRUE, PJ_FALSE, PJ_FALSE); +} + +/* + * Send RTCP BYE. + */ +PJ_DEF(pj_status_t) +pjmedia_stream_send_rtcp_bye( pjmedia_stream *stream ) +{ + PJ_ASSERT_RETURN(stream, PJ_EINVAL); + + if (stream->enc && stream->transport) { + return send_rtcp(stream, PJ_TRUE, PJ_TRUE, PJ_FALSE); + } + + return PJ_SUCCESS; +} |