diff options
Diffstat (limited to 'pjmedia/src/pjmedia/vid_stream.c')
-rw-r--r-- | pjmedia/src/pjmedia/vid_stream.c | 1940 |
1 files changed, 1940 insertions, 0 deletions
diff --git a/pjmedia/src/pjmedia/vid_stream.c b/pjmedia/src/pjmedia/vid_stream.c new file mode 100644 index 00000000..216674b5 --- /dev/null +++ b/pjmedia/src/pjmedia/vid_stream.c @@ -0,0 +1,1940 @@ +/* $Id$ */ +/* + * Copyright (C) 2011 Teluu Inc. (http://www.teluu.com) + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include <pjmedia/vid_stream.h> +#include <pjmedia/errno.h> +#include <pjmedia/event.h> +#include <pjmedia/rtp.h> +#include <pjmedia/rtcp.h> +#include <pjmedia/jbuf.h> +#include <pjmedia/sdp_neg.h> +#include <pjmedia/stream_common.h> +#include <pj/array.h> +#include <pj/assert.h> +#include <pj/ctype.h> +#include <pj/compat/socket.h> +#include <pj/errno.h> +#include <pj/ioqueue.h> +#include <pj/log.h> +#include <pj/os.h> +#include <pj/pool.h> +#include <pj/rand.h> +#include <pj/sock_select.h> +#include <pj/string.h> /* memcpy() */ + + +#define THIS_FILE "vid_stream.c" +#define ERRLEVEL 1 +#define LOGERR_(expr) stream_perror expr +#define TRC_(expr) PJ_LOG(5,expr) +#define SIGNATURE PJMEDIA_SIG_PORT_VID_STREAM + +/* Tracing jitter buffer operations in a stream session to a CSV file. + * The trace will contain JB operation timestamp, frame info, RTP info, and + * the JB state right after the operation. + */ +#define TRACE_JB 0 /* Enable/disable trace. */ +#define TRACE_JB_PATH_PREFIX "" /* Optional path/prefix + for the CSV filename. */ +#if TRACE_JB +# include <pj/file_io.h> +# define TRACE_JB_INVALID_FD ((pj_oshandle_t)-1) +# define TRACE_JB_OPENED(s) (s->trace_jb_fd != TRACE_JB_INVALID_FD) +#endif + +#ifndef PJMEDIA_VSTREAM_SIZE +# define PJMEDIA_VSTREAM_SIZE 1000 +#endif + +#ifndef PJMEDIA_VSTREAM_INC +# define PJMEDIA_VSTREAM_INC 1000 +#endif + + +/** + * Media channel. + */ +typedef struct pjmedia_vid_channel +{ + pjmedia_vid_stream *stream; /**< Parent stream. */ + pjmedia_dir dir; /**< Channel direction. */ + pjmedia_port port; /**< Port interface. */ + unsigned pt; /**< Payload type. */ + pj_bool_t paused; /**< Paused?. */ + void *buf; /**< Output buffer. */ + unsigned buf_size; /**< Size of output buffer. */ + unsigned buf_len; /**< Length of data in buffer. */ + pjmedia_rtp_session rtp; /**< RTP session. */ +} pjmedia_vid_channel; + + +/** + * This structure describes media stream. + * A media stream is bidirectional media transmission between two endpoints. + * It consists of two channels, i.e. encoding and decoding channels. + * A media stream corresponds to a single "m=" line in a SDP session + * description. + */ +struct pjmedia_vid_stream +{ + pj_pool_t *own_pool; /**< Internal pool. */ + pjmedia_endpt *endpt; /**< Media endpoint. */ + pjmedia_vid_codec_mgr *codec_mgr; /**< Codec manager. */ + pjmedia_vid_stream_info info; /**< Stream info. */ + + pjmedia_vid_channel *enc; /**< Encoding channel. */ + pjmedia_vid_channel *dec; /**< Decoding channel. */ + + pjmedia_dir dir; /**< Stream direction. */ + void *user_data; /**< User data. */ + pj_str_t name; /**< Stream name */ + pj_str_t cname; /**< SDES CNAME */ + + pjmedia_transport *transport; /**< Stream transport. */ + + pj_mutex_t *jb_mutex; + pjmedia_jbuf *jb; /**< Jitter buffer. */ + char jb_last_frm; /**< Last frame type from jb */ + unsigned jb_last_frm_cnt;/**< Last JB frame type counter*/ + + pjmedia_rtcp_session rtcp; /**< RTCP for incoming RTP. */ + pj_uint32_t rtcp_last_tx; /**< RTCP tx time in timestamp */ + pj_uint32_t rtcp_interval; /**< Interval, in timestamp. */ + pj_bool_t initial_rr; /**< Initial RTCP RR sent */ + + unsigned frame_size; /**< Size of encoded base frame.*/ + unsigned frame_ts_len; /**< Frame length in timestamp. */ + +#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0 + pj_bool_t use_ka; /**< Stream keep-alive with non- + codec-VAD mechanism is + enabled? */ + pj_timestamp last_frm_ts_sent; /**< Timestamp of last sending + packet */ +#endif + +#if TRACE_JB + pj_oshandle_t trace_jb_fd; /**< Jitter tracing file handle.*/ + char *trace_jb_buf; /**< Jitter tracing buffer. */ +#endif + + pjmedia_vid_codec *codec; /**< Codec instance being used. */ + pj_uint32_t last_dec_ts; /**< Last decoded timestamp. */ + int last_dec_seq; /**< Last decoded sequence. */ + + pjmedia_event_subscription esub_codec; /**< To subscribe codec events */ + pjmedia_event_publisher epub; /**< To publish events */ +}; + + +/* + * Print error. + */ +static void stream_perror(const char *sender, const char *title, + pj_status_t status) +{ + char errmsg[PJ_ERR_MSG_SIZE]; + + pj_strerror(status, errmsg, sizeof(errmsg)); + PJ_LOG(4,(sender, "%s: %s [err:%d]", title, errmsg, status)); +} + + +#if TRACE_JB + +PJ_INLINE(int) trace_jb_print_timestamp(char **buf, pj_ssize_t len) +{ + pj_time_val now; + pj_parsed_time ptime; + char *p = *buf; + + if (len < 14) + return -1; + + pj_gettimeofday(&now); + pj_time_decode(&now, &ptime); + p += pj_utoa_pad(ptime.hour, p, 2, '0'); + *p++ = ':'; + p += pj_utoa_pad(ptime.min, p, 2, '0'); + *p++ = ':'; + p += pj_utoa_pad(ptime.sec, p, 2, '0'); + *p++ = '.'; + p += pj_utoa_pad(ptime.msec, p, 3, '0'); + *p++ = ','; + + *buf = p; + + return 0; +} + +PJ_INLINE(int) trace_jb_print_state(pjmedia_vid_stream *stream, + char **buf, pj_ssize_t len) +{ + char *p = *buf; + char *endp = *buf + len; + pjmedia_jb_state state; + + pjmedia_jbuf_get_state(stream->jb, &state); + + len = pj_ansi_snprintf(p, endp-p, "%d, %d, %d", + state.size, state.burst, state.prefetch); + if ((len < 0) || (len >= endp-p)) + return -1; + + p += len; + *buf = p; + return 0; +} + +static void trace_jb_get(pjmedia_vid_stream *stream, pjmedia_jb_frame_type ft, + pj_size_t fsize) +{ + char *p = stream->trace_jb_buf; + char *endp = stream->trace_jb_buf + PJ_LOG_MAX_SIZE; + pj_ssize_t len = 0; + const char* ft_st; + + if (!TRACE_JB_OPENED(stream)) + return; + + /* Print timestamp. */ + if (trace_jb_print_timestamp(&p, endp-p)) + goto on_insuff_buffer; + + /* Print frame type and size */ + switch(ft) { + case PJMEDIA_JB_MISSING_FRAME: + ft_st = "missing"; + break; + case PJMEDIA_JB_NORMAL_FRAME: + ft_st = "normal"; + break; + case PJMEDIA_JB_ZERO_PREFETCH_FRAME: + ft_st = "prefetch"; + break; + case PJMEDIA_JB_ZERO_EMPTY_FRAME: + ft_st = "empty"; + break; + default: + ft_st = "unknown"; + break; + } + + /* Print operation, size, frame count, frame type */ + len = pj_ansi_snprintf(p, endp-p, "GET,%d,1,%s,,,,", fsize, ft_st); + if ((len < 0) || (len >= endp-p)) + goto on_insuff_buffer; + p += len; + + /* Print JB state */ + if (trace_jb_print_state(stream, &p, endp-p)) + goto on_insuff_buffer; + + /* Print end of line */ + if (endp-p < 2) + goto on_insuff_buffer; + *p++ = '\n'; + + /* Write and flush */ + len = p - stream->trace_jb_buf; + pj_file_write(stream->trace_jb_fd, stream->trace_jb_buf, &len); + pj_file_flush(stream->trace_jb_fd); + return; + +on_insuff_buffer: + pj_assert(!"Trace buffer too small, check PJ_LOG_MAX_SIZE!"); +} + +static void trace_jb_put(pjmedia_vid_stream *stream, + const pjmedia_rtp_hdr *hdr, + unsigned payloadlen, unsigned frame_cnt) +{ + char *p = stream->trace_jb_buf; + char *endp = stream->trace_jb_buf + PJ_LOG_MAX_SIZE; + pj_ssize_t len = 0; + + if (!TRACE_JB_OPENED(stream)) + return; + + /* Print timestamp. */ + if (trace_jb_print_timestamp(&p, endp-p)) + goto on_insuff_buffer; + + /* Print operation, size, frame count, RTP info */ + len = pj_ansi_snprintf(p, endp-p, + "PUT,%d,%d,,%d,%d,%d,", + payloadlen, frame_cnt, + pj_ntohs(hdr->seq), pj_ntohl(hdr->ts), hdr->m); + if ((len < 0) || (len >= endp-p)) + goto on_insuff_buffer; + p += len; + + /* Print JB state */ + if (trace_jb_print_state(stream, &p, endp-p)) + goto on_insuff_buffer; + + /* Print end of line */ + if (endp-p < 2) + goto on_insuff_buffer; + *p++ = '\n'; + + /* Write and flush */ + len = p - stream->trace_jb_buf; + pj_file_write(stream->trace_jb_fd, stream->trace_jb_buf, &len); + pj_file_flush(stream->trace_jb_fd); + return; + +on_insuff_buffer: + pj_assert(!"Trace buffer too small, check PJ_LOG_MAX_SIZE!"); +} + +#endif /* TRACE_JB */ + +static void dump_port_info(const pjmedia_vid_channel *chan, + const char *event_name) +{ + const pjmedia_port_info *pi = &chan->port.info; + char fourcc_name[5]; + + PJ_LOG(5, (pi->name.ptr, + " %s format %s: %dx%d %s%s %d/%d(~%d)fps", + (chan->dir==PJMEDIA_DIR_DECODING? "Decoding":"Encoding"), + event_name, + pi->fmt.det.vid.size.w, pi->fmt.det.vid.size.h, + pjmedia_fourcc_name(pi->fmt.id, fourcc_name), + (chan->dir==PJMEDIA_DIR_ENCODING?"->":"<-"), + pi->fmt.det.vid.fps.num, pi->fmt.det.vid.fps.denum, + pi->fmt.det.vid.fps.num/pi->fmt.det.vid.fps.denum)); +} + +/* + * Handle events from stream components. + */ +static pj_status_t stream_event_cb(pjmedia_event_subscription *esub, + pjmedia_event *event) +{ + pjmedia_vid_stream *stream = (pjmedia_vid_stream*)esub->user_data; + + if (esub == &stream->esub_codec) { + /* This is codec event */ + switch (event->type) { + case PJMEDIA_EVENT_FMT_CHANGED: + /* Update param from codec */ + pjmedia_vid_codec_get_param(stream->codec, stream->info.codec_param); + + /* Update decoding channel port info */ + pjmedia_format_copy(&stream->dec->port.info.fmt, + &stream->info.codec_param->dec_fmt); + + /* we process the event */ + ++event->proc_cnt; + + dump_port_info(event->data.fmt_changed.dir==PJMEDIA_DIR_DECODING ? + stream->dec : stream->enc, + "changed"); + break; + default: + break; + } + } + + return pjmedia_event_publish(&stream->epub, event); +} + +static pjmedia_event_publisher *port_get_epub(pjmedia_port *port) +{ + pjmedia_vid_stream *stream = (pjmedia_vid_stream*) port->port_data.pdata; + return &stream->epub; +} + +#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA != 0 +/* + * Send keep-alive packet using non-codec frame. + */ +static void send_keep_alive_packet(pjmedia_vid_stream *stream) +{ +#if PJMEDIA_STREAM_ENABLE_KA == PJMEDIA_STREAM_KA_EMPTY_RTP + + /* Keep-alive packet is empty RTP */ + pj_status_t status; + void *pkt; + int pkt_len; + + TRC_((channel->port.info.name.ptr, + "Sending keep-alive (RTCP and empty RTP)")); + + /* Send RTP */ + status = pjmedia_rtp_encode_rtp( &stream->enc->rtp, + stream->enc->pt, 0, + 1, + 0, + (const void**)&pkt, + &pkt_len); + pj_assert(status == PJ_SUCCESS); + + pj_memcpy(stream->enc->buf, pkt, pkt_len); + pjmedia_transport_send_rtp(stream->transport, stream->enc->buf, + pkt_len); + + /* Send RTCP */ + pjmedia_rtcp_build_rtcp(&stream->rtcp, &pkt, &pkt_len); + pjmedia_transport_send_rtcp(stream->transport, pkt, pkt_len); + +#elif PJMEDIA_STREAM_ENABLE_KA == PJMEDIA_STREAM_KA_USER + + /* Keep-alive packet is defined in PJMEDIA_STREAM_KA_USER_PKT */ + int pkt_len; + const pj_str_t str_ka = PJMEDIA_STREAM_KA_USER_PKT; + + TRC_((channel->port.info.name.ptr, + "Sending keep-alive (custom RTP/RTCP packets)")); + + /* Send to RTP port */ + pj_memcpy(stream->enc->buf, str_ka.ptr, str_ka.slen); + pkt_len = str_ka.slen; + pjmedia_transport_send_rtp(stream->transport, stream->enc->buf, + pkt_len); + + /* Send to RTCP port */ + pjmedia_transport_send_rtcp(stream->transport, stream->enc->buf, + pkt_len); + +#else + + PJ_UNUSED_ARG(stream); + +#endif +} +#endif /* defined(PJMEDIA_STREAM_ENABLE_KA) */ + + +/** + * check_tx_rtcp() + * + * This function is can be called by either put_frame() or get_frame(), + * to transmit periodic RTCP SR/RR report. + */ +static void check_tx_rtcp(pjmedia_vid_stream *stream, pj_uint32_t timestamp) +{ + /* Note that timestamp may represent local or remote timestamp, + * depending on whether this function is called from put_frame() + * or get_frame(). + */ + + + if (stream->rtcp_last_tx == 0) { + + stream->rtcp_last_tx = timestamp; + + } else if (timestamp - stream->rtcp_last_tx >= stream->rtcp_interval) { + + void *rtcp_pkt; + int len; + + pjmedia_rtcp_build_rtcp(&stream->rtcp, &rtcp_pkt, &len); + + pjmedia_transport_send_rtcp(stream->transport, rtcp_pkt, len); + + stream->rtcp_last_tx = timestamp; + } +} + +/* Build RTCP SDES packet */ +static unsigned create_rtcp_sdes(pjmedia_vid_stream *stream, pj_uint8_t *pkt, + unsigned max_len) +{ + pjmedia_rtcp_common hdr; + pj_uint8_t *p = pkt; + + /* SDES header */ + hdr.version = 2; + hdr.p = 0; + hdr.count = 1; + hdr.pt = 202; + hdr.length = 2 + (4+stream->cname.slen+3)/4 - 1; + if (max_len < (hdr.length << 2)) { + pj_assert(!"Not enough buffer for SDES packet"); + return 0; + } + hdr.length = pj_htons((pj_uint16_t)hdr.length); + hdr.ssrc = stream->enc->rtp.out_hdr.ssrc; + pj_memcpy(p, &hdr, sizeof(hdr)); + p += sizeof(hdr); + + /* CNAME item */ + *p++ = 1; + *p++ = (pj_uint8_t)stream->cname.slen; + pj_memcpy(p, stream->cname.ptr, stream->cname.slen); + p += stream->cname.slen; + + /* END */ + *p++ = '\0'; + *p++ = '\0'; + + /* Pad to 32bit */ + while ((p-pkt) % 4) + *p++ = '\0'; + + return (p - pkt); +} + +/* Build RTCP BYE packet */ +static unsigned create_rtcp_bye(pjmedia_vid_stream *stream, pj_uint8_t *pkt, + unsigned max_len) +{ + pjmedia_rtcp_common hdr; + + /* BYE header */ + hdr.version = 2; + hdr.p = 0; + hdr.count = 1; + hdr.pt = 203; + hdr.length = 1; + if (max_len < (hdr.length << 2)) { + pj_assert(!"Not enough buffer for SDES packet"); + return 0; + } + hdr.length = pj_htons((pj_uint16_t)hdr.length); + hdr.ssrc = stream->enc->rtp.out_hdr.ssrc; + pj_memcpy(pkt, &hdr, sizeof(hdr)); + + return sizeof(hdr); +} + + +#if 0 +static void dump_bin(const char *buf, unsigned len) +{ + unsigned i; + + PJ_LOG(3,(THIS_FILE, "begin dump")); + for (i=0; i<len; ++i) { + int j; + char bits[9]; + unsigned val = buf[i] & 0xFF; + + bits[8] = '\0'; + for (j=0; j<8; ++j) { + if (val & (1 << (7-j))) + bits[j] = '1'; + else + bits[j] = '0'; + } + + PJ_LOG(3,(THIS_FILE, "%2d %s [%d]", i, bits, val)); + } + PJ_LOG(3,(THIS_FILE, "end dump")); +} +#endif + + +/* + * This callback is called by stream transport on receipt of packets + * in the RTP socket. + */ +static void on_rx_rtp( void *data, + void *pkt, + pj_ssize_t bytes_read) + +{ + pjmedia_vid_stream *stream = (pjmedia_vid_stream*) data; + pjmedia_vid_channel *channel = stream->dec; + const pjmedia_rtp_hdr *hdr; + const void *payload; + unsigned payloadlen; + pjmedia_rtp_status seq_st; + pj_status_t status; + pj_bool_t pkt_discarded = PJ_FALSE; + + /* Check for errors */ + if (bytes_read < 0) { + LOGERR_((channel->port.info.name.ptr, "RTP recv() error", -bytes_read)); + return; + } + + /* Ignore keep-alive packets */ + if (bytes_read < (pj_ssize_t) sizeof(pjmedia_rtp_hdr)) + return; + + /* Update RTP and RTCP session. */ + status = pjmedia_rtp_decode_rtp(&channel->rtp, pkt, bytes_read, + &hdr, &payload, &payloadlen); + if (status != PJ_SUCCESS) { + LOGERR_((channel->port.info.name.ptr, "RTP decode error", status)); + stream->rtcp.stat.rx.discard++; + return; + } + + /* Ignore the packet if decoder is paused */ + if (channel->paused) + goto on_return; + + /* Update RTP session (also checks if RTP session can accept + * the incoming packet. + */ + pjmedia_rtp_session_update2(&channel->rtp, hdr, &seq_st, PJ_TRUE); + if (seq_st.status.value) { + TRC_ ((channel->port.info.name.ptr, + "RTP status: badpt=%d, badssrc=%d, dup=%d, " + "outorder=%d, probation=%d, restart=%d", + seq_st.status.flag.badpt, + seq_st.status.flag.badssrc, + seq_st.status.flag.dup, + seq_st.status.flag.outorder, + seq_st.status.flag.probation, + seq_st.status.flag.restart)); + + if (seq_st.status.flag.badpt) { + PJ_LOG(4,(channel->port.info.name.ptr, + "Bad RTP pt %d (expecting %d)", + hdr->pt, channel->rtp.out_pt)); + } + + if (seq_st.status.flag.badssrc) { + PJ_LOG(4,(channel->port.info.name.ptr, + "Changed RTP peer SSRC %d (previously %d)", + channel->rtp.peer_ssrc, stream->rtcp.peer_ssrc)); + stream->rtcp.peer_ssrc = channel->rtp.peer_ssrc; + } + + + } + + /* Skip bad RTP packet */ + if (seq_st.status.flag.bad) { + pkt_discarded = PJ_TRUE; + goto on_return; + } + + /* Ignore if payloadlen is zero */ + if (payloadlen == 0) { + pkt_discarded = PJ_TRUE; + goto on_return; + } + + + /* Put "good" packet to jitter buffer, or reset the jitter buffer + * when RTP session is restarted. + */ + pj_mutex_lock( stream->jb_mutex ); + if (seq_st.status.flag.restart) { + status = pjmedia_jbuf_reset(stream->jb); + PJ_LOG(4,(channel->port.info.name.ptr, "Jitter buffer reset")); + } else { + /* Just put the payload into jitter buffer */ + pjmedia_jbuf_put_frame3(stream->jb, payload, payloadlen, 0, + pj_ntohs(hdr->seq), pj_ntohl(hdr->ts), NULL); + +#if TRACE_JB + trace_jb_put(stream, hdr, payloadlen, count); +#endif + + } + pj_mutex_unlock( stream->jb_mutex ); + + + /* Check if now is the time to transmit RTCP SR/RR report. + * We only do this when stream direction is "decoding only", + * because otherwise check_tx_rtcp() will be handled by put_frame() + */ + if (stream->dir == PJMEDIA_DIR_DECODING) { + check_tx_rtcp(stream, pj_ntohl(hdr->ts)); + } + + if (status != 0) { + LOGERR_((channel->port.info.name.ptr, "Jitter buffer put() error", + status)); + pkt_discarded = PJ_TRUE; + goto on_return; + } + +on_return: + /* Update RTCP session */ + if (stream->rtcp.peer_ssrc == 0) + stream->rtcp.peer_ssrc = channel->rtp.peer_ssrc; + + pjmedia_rtcp_rx_rtp2(&stream->rtcp, pj_ntohs(hdr->seq), + pj_ntohl(hdr->ts), payloadlen, pkt_discarded); + + /* Send RTCP RR and SDES after we receive some RTP packets */ + if (stream->rtcp.received >= 10 && !stream->initial_rr) { + void *sr_rr_pkt; + pj_uint8_t *pkt; + int len; + + /* Build RR or SR */ + pjmedia_rtcp_build_rtcp(&stream->rtcp, &sr_rr_pkt, &len); + pkt = (pj_uint8_t*) stream->enc->buf; + pj_memcpy(pkt, sr_rr_pkt, len); + pkt += len; + + /* Append SDES */ + len = create_rtcp_sdes(stream, (pj_uint8_t*)pkt, + stream->enc->buf_size - len); + if (len > 0) { + pkt += len; + len = ((pj_uint8_t*)pkt) - ((pj_uint8_t*)stream->enc->buf); + pjmedia_transport_send_rtcp(stream->transport, + stream->enc->buf, len); + } + + stream->initial_rr = PJ_TRUE; + } +} + + +/* + * This callback is called by stream transport on receipt of packets + * in the RTCP socket. + */ +static void on_rx_rtcp( void *data, + void *pkt, + pj_ssize_t bytes_read) +{ + pjmedia_vid_stream *stream = (pjmedia_vid_stream*) data; + + /* Check for errors */ + if (bytes_read < 0) { + LOGERR_((stream->cname.ptr, "RTCP recv() error", + -bytes_read)); + return; + } + + pjmedia_rtcp_rx_rtcp(&stream->rtcp, pkt, bytes_read); +} + +static pj_status_t put_frame(pjmedia_port *port, + pjmedia_frame *frame) +{ + pjmedia_vid_stream *stream = (pjmedia_vid_stream*) port->port_data.pdata; + pjmedia_vid_channel *channel = stream->enc; + pj_status_t status = 0; + pjmedia_frame frame_out; + unsigned rtp_ts_len; + void *rtphdr; + int rtphdrlen; + unsigned processed = 0; + + +#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA != 0 + /* If the interval since last sending packet is greater than + * PJMEDIA_STREAM_KA_INTERVAL, send keep-alive packet. + */ + if (stream->use_ka) + { + pj_uint32_t dtx_duration; + + dtx_duration = pj_timestamp_diff32(&stream->last_frm_ts_sent, + &frame->timestamp); + if (dtx_duration > + PJMEDIA_STREAM_KA_INTERVAL * channel->port.info.clock_rate) + { + send_keep_alive_packet(stream); + stream->last_frm_ts_sent = frame->timestamp; + } + } +#endif + + /* Don't do anything if stream is paused */ + if (channel->paused) { + return PJ_SUCCESS; + } + + /* Get frame length in timestamp unit */ + rtp_ts_len = stream->frame_ts_len; + + /* Init frame_out buffer. */ + frame_out.buf = ((char*)channel->buf) + sizeof(pjmedia_rtp_hdr); + frame_out.size = 0; + + /* Encode! */ + status = pjmedia_vid_codec_encode(stream->codec, frame, + channel->buf_size - + sizeof(pjmedia_rtp_hdr), + &frame_out); + if (status != PJ_SUCCESS) { + LOGERR_((channel->port.info.name.ptr, + "Codec encode() error", status)); + + /* Update RTP timestamp */ + pjmedia_rtp_encode_rtp(&channel->rtp, channel->pt, 1, 0, + rtp_ts_len, (const void**)&rtphdr, &rtphdrlen); + return status; + } + + + while (processed < frame_out.size) { + pj_uint8_t *payload; + pj_uint8_t *rtp_pkt; + pj_size_t payload_len; + + /* Generate RTP payload */ + status = pjmedia_vid_codec_packetize(stream->codec, + (pj_uint8_t*)frame_out.buf, + frame_out.size, + &processed, + (const pj_uint8_t**)&payload, + &payload_len); + if (status != PJ_SUCCESS) { + LOGERR_((channel->port.info.name.ptr, + "Codec pack() error", status)); + + /* Update RTP timestamp */ + pjmedia_rtp_encode_rtp(&channel->rtp, channel->pt, 1, 0, + rtp_ts_len, (const void**)&rtphdr, + &rtphdrlen); + return status; + } + + /* Encapsulate. */ + status = pjmedia_rtp_encode_rtp( &channel->rtp, + channel->pt, + (processed==frame_out.size?1:0), + payload_len, + rtp_ts_len, + (const void**)&rtphdr, + &rtphdrlen); + + if (status != PJ_SUCCESS) { + LOGERR_((channel->port.info.name.ptr, + "RTP encode_rtp() error", status)); + return status; + } + + /* Next packets use same timestamp */ + rtp_ts_len = 0; + + rtp_pkt = payload - sizeof(pjmedia_rtp_hdr); + + /* Copy RTP header to the beginning of packet */ + pj_memcpy(rtp_pkt, rtphdr, sizeof(pjmedia_rtp_hdr)); + + /* Send the RTP packet to the transport. */ + pjmedia_transport_send_rtp(stream->transport, rtp_pkt, + payload_len + sizeof(pjmedia_rtp_hdr)); + } + + /* Check if now is the time to transmit RTCP SR/RR report. + * We only do this when stream direction is not "decoding only", because + * when it is, check_tx_rtcp() will be handled by get_frame(). + */ + if (stream->dir != PJMEDIA_DIR_DECODING) { + check_tx_rtcp(stream, pj_ntohl(channel->rtp.out_hdr.ts)); + } + + /* Do nothing if we have nothing to transmit */ + if (frame_out.size == 0) { + return PJ_SUCCESS; + } + + /* Update stat */ + pjmedia_rtcp_tx_rtp(&stream->rtcp, frame_out.size); + stream->rtcp.stat.rtp_tx_last_ts = pj_ntohl(stream->enc->rtp.out_hdr.ts); + stream->rtcp.stat.rtp_tx_last_seq = pj_ntohs(stream->enc->rtp.out_hdr.seq); + +#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0 + /* Update timestamp of last sending packet. */ + stream->last_frm_ts_sent = frame->timestamp; +#endif + + return PJ_SUCCESS; +} + +static pj_status_t get_frame(pjmedia_port *port, + pjmedia_frame *frame) +{ + pjmedia_vid_stream *stream = (pjmedia_vid_stream*) port->port_data.pdata; + pjmedia_vid_channel *channel = stream->dec; + pjmedia_frame frame_in; + pj_uint32_t last_ts = 0; + int frm_first_seq = 0, frm_last_seq = 0; + pj_status_t status; + + /* Return no frame is channel is paused */ + if (channel->paused) { + frame->type = PJMEDIA_FRAME_TYPE_NONE; + return PJ_SUCCESS; + } + + /* Repeat get payload from the jitter buffer until all payloads with same + * timestamp are collected (a complete frame unpacketized). + */ + { + pj_bool_t got_frame; + unsigned cnt; + + channel->buf_len = 0; + got_frame = PJ_FALSE; + + /* Lock jitter buffer mutex first */ + pj_mutex_lock( stream->jb_mutex ); + + /* Check if we got a decodable frame */ + for (cnt=0; ; ++cnt) { + char ptype; + pj_uint32_t ts; + int seq; + + /* Peek frame from jitter buffer. */ + pjmedia_jbuf_peek_frame(stream->jb, cnt, NULL, NULL, + &ptype, NULL, &ts, &seq); + if (ptype == PJMEDIA_JB_NORMAL_FRAME) { + if (last_ts == 0) { + last_ts = ts; + frm_first_seq = seq; + } + if (ts != last_ts) { + got_frame = PJ_TRUE; + break; + } + frm_last_seq = seq; + } else if (ptype == PJMEDIA_JB_ZERO_EMPTY_FRAME) { + /* No more packet in the jitter buffer */ + break; + } + } + + if (got_frame) { + unsigned i; + + /* Generate frame bitstream from the payload */ + channel->buf_len = 0; + for (i = 0; i < cnt; ++i) { + const pj_uint8_t *p; + pj_size_t psize; + char ptype; + + /* We use jbuf_peek_frame() as it will returns the pointer of + * the payload (no buffer and memcpy needed), just as we need. + */ + pjmedia_jbuf_peek_frame(stream->jb, i, (const void**)&p, + &psize, &ptype, NULL, NULL, NULL); + + if (ptype != PJMEDIA_JB_NORMAL_FRAME) { + /* Packet lost, must set payload to NULL and keep going */ + p = NULL; + psize = 0; + } + + status = pjmedia_vid_codec_unpacketize( + stream->codec, + p, psize, + (pj_uint8_t*)channel->buf, + channel->buf_size, + &channel->buf_len); + if (status != PJ_SUCCESS) { + LOGERR_((channel->port.info.name.ptr, + "Codec unpack() error", status)); + /* Just ignore this unpack error */ + } + } + + pjmedia_jbuf_remove_frame(stream->jb, cnt); + } + + /* Unlock jitter buffer mutex. */ + pj_mutex_unlock( stream->jb_mutex ); + + if (!got_frame) { + frame->type = PJMEDIA_FRAME_TYPE_NONE; + frame->size = 0; + return PJ_SUCCESS; + } + } + + /* Decode */ + frame_in.buf = channel->buf; + frame_in.size = channel->buf_len; + frame_in.bit_info = 0; + frame_in.type = PJMEDIA_FRAME_TYPE_VIDEO; + frame_in.timestamp.u64 = last_ts; + + status = pjmedia_vid_codec_decode(stream->codec, &frame_in, + frame->size, frame); + if (status != PJ_SUCCESS) { + LOGERR_((port->info.name.ptr, "codec decode() error", + status)); + frame->type = PJMEDIA_FRAME_TYPE_NONE; + frame->size = 0; + } + + /* Learn remote frame rate after successful decoding */ + if (0 && frame->type == PJMEDIA_FRAME_TYPE_VIDEO && frame->size) + { + /* Only check remote frame rate when timestamp is not wrapping and + * sequence is increased by 1. + */ + if (last_ts > stream->last_dec_ts && + frm_first_seq - stream->last_dec_seq == 1) + { + pj_uint32_t ts_diff; + pjmedia_video_format_detail *vfd; + + ts_diff = last_ts - stream->last_dec_ts; + vfd = pjmedia_format_get_video_format_detail( + &channel->port.info.fmt, PJ_TRUE); + if ((int)(stream->info.codec_info.clock_rate / ts_diff) != + vfd->fps.num / vfd->fps.denum) + { + /* Frame rate changed, update decoding port info */ + vfd->fps.num = stream->info.codec_info.clock_rate; + vfd->fps.denum = ts_diff; + + /* Update stream info */ + stream->info.codec_param->dec_fmt.det.vid.fps = vfd->fps; + + PJ_LOG(5, (channel->port.info.name.ptr, + "Frame rate changed to %d/%d(~%d)fps", + vfd->fps.num, vfd->fps.denum, + vfd->fps.num / vfd->fps.denum)); + + /* Publish PJMEDIA_EVENT_FMT_CHANGED event */ + if (pjmedia_event_publisher_has_sub(&stream->epub)) { + pjmedia_event event; + + dump_port_info(stream->dec, "changed"); + + pjmedia_event_init(&event, PJMEDIA_EVENT_FMT_CHANGED, + &frame_in.timestamp, &stream->epub); + event.data.fmt_changed.dir = PJMEDIA_DIR_DECODING; + pj_memcpy(&event.data.fmt_changed.new_fmt, + &stream->info.codec_param->dec_fmt, + sizeof(pjmedia_format)); + pjmedia_event_publish(&stream->epub, &event); + } + } + } + + /* Update last frame seq and timestamp */ + stream->last_dec_seq = frm_last_seq; + stream->last_dec_ts = last_ts; + } + + return PJ_SUCCESS; +} + + +/* + * Create media channel. + */ +static pj_status_t create_channel( pj_pool_t *pool, + pjmedia_vid_stream *stream, + pjmedia_dir dir, + unsigned pt, + const pjmedia_vid_stream_info *info, + pjmedia_vid_channel **p_channel) +{ + enum { M = 32 }; + pjmedia_vid_channel *channel; + pj_status_t status; + unsigned min_out_pkt_size; + pj_str_t name; + const char *type_name; + pjmedia_format *fmt; + char fourcc_name[5]; + pjmedia_port_info *pi; + + pj_assert(info->type == PJMEDIA_TYPE_VIDEO); + pj_assert(dir == PJMEDIA_DIR_DECODING || dir == PJMEDIA_DIR_ENCODING); + + /* Allocate memory for channel descriptor */ + channel = PJ_POOL_ZALLOC_T(pool, pjmedia_vid_channel); + PJ_ASSERT_RETURN(channel != NULL, PJ_ENOMEM); + + /* Init vars */ + if (dir==PJMEDIA_DIR_DECODING) { + type_name = "vstdec"; + fmt = &info->codec_param->dec_fmt; + } else { + type_name = "vstenc"; + fmt = &info->codec_param->enc_fmt; + } + name.ptr = (char*) pj_pool_alloc(pool, M); + name.slen = pj_ansi_snprintf(name.ptr, M, "%s%p", type_name, stream); + pi = &channel->port.info; + + /* Init channel info. */ + channel->stream = stream; + channel->dir = dir; + channel->paused = 1; + channel->pt = pt; + + /* Allocate buffer for outgoing packet. */ + channel->buf_size = sizeof(pjmedia_rtp_hdr) + stream->frame_size; + + /* It should big enough to hold (minimally) RTCP SR with an SDES. */ + min_out_pkt_size = sizeof(pjmedia_rtcp_sr_pkt) + + sizeof(pjmedia_rtcp_common) + + (4 + stream->cname.slen) + + 32; + + if (channel->buf_size < min_out_pkt_size) + channel->buf_size = min_out_pkt_size; + + channel->buf = pj_pool_alloc(pool, channel->buf_size); + PJ_ASSERT_RETURN(channel->buf != NULL, PJ_ENOMEM); + + /* Create RTP and RTCP sessions: */ + if (info->rtp_seq_ts_set == 0) { + status = pjmedia_rtp_session_init(&channel->rtp, pt, info->ssrc); + } else { + pjmedia_rtp_session_setting settings; + + settings.flags = (pj_uint8_t)((info->rtp_seq_ts_set << 2) | 3); + settings.default_pt = pt; + settings.sender_ssrc = info->ssrc; + settings.seq = info->rtp_seq; + settings.ts = info->rtp_ts; + status = pjmedia_rtp_session_init2(&channel->rtp, settings); + } + if (status != PJ_SUCCESS) + return status; + + /* Init port. */ + pjmedia_port_info_init2(pi, &name, SIGNATURE, dir, fmt); + if (dir == PJMEDIA_DIR_DECODING) { + channel->port.get_frame = &get_frame; + } else { + pi->fmt.id = info->codec_param->dec_fmt.id; + channel->port.put_frame = &put_frame; + } + + /* Init port. */ + channel->port.port_data.pdata = stream; + channel->port.get_event_pub = &port_get_epub; + + PJ_LOG(5, (name.ptr, + "%s channel created %dx%d %s%s%.*s %d/%d(~%d)fps", + (dir==PJMEDIA_DIR_ENCODING?"Encoding":"Decoding"), + pi->fmt.det.vid.size.w, pi->fmt.det.vid.size.h, + pjmedia_fourcc_name(pi->fmt.id, fourcc_name), + (dir==PJMEDIA_DIR_ENCODING?"->":"<-"), + info->codec_info.encoding_name.slen, + info->codec_info.encoding_name.ptr, + pi->fmt.det.vid.fps.num, pi->fmt.det.vid.fps.denum, + pi->fmt.det.vid.fps.num/pi->fmt.det.vid.fps.denum)); + + /* Done. */ + *p_channel = channel; + return PJ_SUCCESS; +} + + +/* + * Create stream. + */ +PJ_DEF(pj_status_t) pjmedia_vid_stream_create( + pjmedia_endpt *endpt, + pj_pool_t *pool, + pjmedia_vid_stream_info *info, + pjmedia_transport *tp, + void *user_data, + pjmedia_vid_stream **p_stream) +{ + enum { M = 32 }; + pj_pool_t *own_pool = NULL; + pjmedia_vid_stream *stream; + unsigned jb_init, jb_max, jb_min_pre, jb_max_pre, len; + int frm_ptime, chunks_per_frm; + pjmedia_video_format_detail *vfd_enc; + char *p; + pj_status_t status; + + if (!pool) { + own_pool = pjmedia_endpt_create_pool( endpt, "vstrm%p", + PJMEDIA_VSTREAM_SIZE, + PJMEDIA_VSTREAM_INC); + PJ_ASSERT_RETURN(own_pool != NULL, PJ_ENOMEM); + pool = own_pool; + } + + /* Allocate stream */ + stream = PJ_POOL_ZALLOC_T(pool, pjmedia_vid_stream); + PJ_ASSERT_RETURN(stream != NULL, PJ_ENOMEM); + stream->own_pool = own_pool; + + /* Get codec manager */ + stream->codec_mgr = pjmedia_vid_codec_mgr_instance(); + PJ_ASSERT_RETURN(stream->codec_mgr, PJMEDIA_CODEC_EFAILED); + + /* Init stream/port name */ + stream->name.ptr = (char*) pj_pool_alloc(pool, M); + stream->name.slen = pj_ansi_snprintf(stream->name.ptr, M, + "vstrm%p", stream); + + /* Create and initialize codec: */ + status = pjmedia_vid_codec_mgr_alloc_codec(stream->codec_mgr, + &info->codec_info, + &stream->codec); + if (status != PJ_SUCCESS) + return status; + + + /* Get codec param: */ + if (!info->codec_param) { + pjmedia_vid_codec_param def_param; + + status = pjmedia_vid_codec_mgr_get_default_param(stream->codec_mgr, + &info->codec_info, + &def_param); + if (status != PJ_SUCCESS) + return status; + + info->codec_param = pjmedia_vid_codec_param_clone(pool, &def_param); + pj_assert(info->codec_param); + } + + vfd_enc = pjmedia_format_get_video_format_detail( + &info->codec_param->enc_fmt, PJ_TRUE); + + /* Init stream: */ + stream->endpt = endpt; + stream->dir = info->dir; + stream->user_data = user_data; + stream->rtcp_interval = (PJMEDIA_RTCP_INTERVAL-500 + (pj_rand()%1000)) * + info->codec_info.clock_rate / 1000; + + stream->jb_last_frm = PJMEDIA_JB_NORMAL_FRAME; + +#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0 + stream->use_ka = info->use_ka; +#endif + + /* Build random RTCP CNAME. CNAME has user@host format */ + stream->cname.ptr = p = (char*) pj_pool_alloc(pool, 20); + pj_create_random_string(p, 5); + p += 5; + *p++ = '@'; *p++ = 'p'; *p++ = 'j'; + pj_create_random_string(p, 6); + p += 6; + *p++ = '.'; *p++ = 'o'; *p++ = 'r'; *p++ = 'g'; + stream->cname.slen = p - stream->cname.ptr; + + + /* Create mutex to protect jitter buffer: */ + + status = pj_mutex_create_simple(pool, NULL, &stream->jb_mutex); + if (status != PJ_SUCCESS) + return status; + + /* Init codec param */ + info->codec_param->dir = info->dir; + info->codec_param->enc_mtu = PJMEDIA_MAX_MTU - sizeof(pjmedia_rtp_hdr) - + PJMEDIA_STREAM_RESV_PAYLOAD_LEN; + + /* Init and open the codec. */ + status = pjmedia_vid_codec_init(stream->codec, pool); + if (status != PJ_SUCCESS) + return status; + status = pjmedia_vid_codec_open(stream->codec, info->codec_param); + if (status != PJ_SUCCESS) + return status; + + /* Init event publisher and subscribe to codec events */ + pjmedia_event_publisher_init(&stream->epub, SIGNATURE); + pjmedia_event_subscription_init(&stream->esub_codec, &stream_event_cb, + stream); + pjmedia_event_subscribe(&stream->codec->epub, &stream->esub_codec); + + /* Estimate the maximum frame size */ + stream->frame_size = vfd_enc->size.w * vfd_enc->size.h * 4; + +#if 0 + stream->frame_size = vfd_enc->max_bps/8 * vfd_enc->fps.denum / + vfd_enc->fps.num; + + /* As the maximum frame_size is not represented directly by maximum bps + * (which includes intra and predicted frames), let's increase the + * frame size value for safety. + */ + stream->frame_size <<= 4; +#endif + + /* Validate the frame size */ + if (stream->frame_size == 0 || + stream->frame_size > PJMEDIA_MAX_VIDEO_ENC_FRAME_SIZE) + { + stream->frame_size = PJMEDIA_MAX_VIDEO_ENC_FRAME_SIZE; + } + + /* Get frame length in timestamp unit */ + stream->frame_ts_len = info->codec_info.clock_rate * + vfd_enc->fps.denum / vfd_enc->fps.num; + + /* Create decoder channel */ + status = create_channel( pool, stream, PJMEDIA_DIR_DECODING, + info->rx_pt, info, &stream->dec); + if (status != PJ_SUCCESS) + return status; + + + /* Create encoder channel */ + status = create_channel( pool, stream, PJMEDIA_DIR_ENCODING, + info->tx_pt, info, &stream->enc); + if (status != PJ_SUCCESS) + return status; + + /* Init jitter buffer parameters: */ + frm_ptime = 1000 * vfd_enc->fps.denum / vfd_enc->fps.num; + chunks_per_frm = stream->frame_size / PJMEDIA_MAX_MTU; + + /* JB max count, default 500ms */ + if (info->jb_max >= frm_ptime) + jb_max = info->jb_max * chunks_per_frm / frm_ptime; + else + jb_max = 500 * chunks_per_frm / frm_ptime; + + /* JB min prefetch, default 1 frame */ + if (info->jb_min_pre >= frm_ptime) + jb_min_pre = info->jb_min_pre * chunks_per_frm / frm_ptime; + else + jb_min_pre = 1; + + /* JB max prefetch, default 4/5 JB max count */ + if (info->jb_max_pre >= frm_ptime) + jb_max_pre = info->jb_max_pre * chunks_per_frm / frm_ptime; + else + jb_max_pre = jb_max * 4 / 5; + + /* JB init prefetch, default 0 */ + if (info->jb_init >= frm_ptime) + jb_init = info->jb_init * chunks_per_frm / frm_ptime; + else + jb_init = 0; + + /* Create jitter buffer */ + status = pjmedia_jbuf_create(pool, &stream->dec->port.info.name, + PJMEDIA_MAX_MTU, + 1000 * vfd_enc->fps.denum / vfd_enc->fps.num, + jb_max, &stream->jb); + if (status != PJ_SUCCESS) + return status; + + + /* Set up jitter buffer */ + pjmedia_jbuf_set_adaptive( stream->jb, jb_init, jb_min_pre, jb_max_pre); + //pjmedia_jbuf_enable_discard(stream->jb, PJ_FALSE); + + /* Init RTCP session: */ + { + pjmedia_rtcp_session_setting rtcp_setting; + + pjmedia_rtcp_session_setting_default(&rtcp_setting); + rtcp_setting.name = stream->name.ptr; + rtcp_setting.ssrc = info->ssrc; + rtcp_setting.rtp_ts_base = pj_ntohl(stream->enc->rtp.out_hdr.ts); + rtcp_setting.clock_rate = info->codec_info.clock_rate; + rtcp_setting.samples_per_frame = 1; + + pjmedia_rtcp_init2(&stream->rtcp, &rtcp_setting); + } + + /* Only attach transport when stream is ready. */ + status = pjmedia_transport_attach(tp, stream, &info->rem_addr, + &info->rem_rtcp, + pj_sockaddr_get_len(&info->rem_addr), + &on_rx_rtp, &on_rx_rtcp); + if (status != PJ_SUCCESS) + return status; + + stream->transport = tp; + + /* Send RTCP SDES */ + len = create_rtcp_sdes(stream, (pj_uint8_t*)stream->enc->buf, + stream->enc->buf_size); + if (len != 0) { + pjmedia_transport_send_rtcp(stream->transport, + stream->enc->buf, len); + } + +#if defined(PJMEDIA_STREAM_ENABLE_KA) && PJMEDIA_STREAM_ENABLE_KA!=0 + /* NAT hole punching by sending KA packet via RTP transport. */ + if (stream->use_ka) + send_keep_alive_packet(stream); +#endif + +#if TRACE_JB + { + char trace_name[PJ_MAXPATH]; + pj_ssize_t len; + + pj_ansi_snprintf(trace_name, sizeof(trace_name), + TRACE_JB_PATH_PREFIX "%s.csv", + channel->port.info.name.ptr); + status = pj_file_open(pool, trace_name, PJ_O_RDWR, + &stream->trace_jb_fd); + if (status != PJ_SUCCESS) { + stream->trace_jb_fd = TRACE_JB_INVALID_FD; + PJ_LOG(3,(THIS_FILE, "Failed creating RTP trace file '%s'", + trace_name)); + } else { + stream->trace_jb_buf = (char*)pj_pool_alloc(pool, PJ_LOG_MAX_SIZE); + + /* Print column header */ + len = pj_ansi_snprintf(stream->trace_jb_buf, PJ_LOG_MAX_SIZE, + "Time, Operation, Size, Frame Count, " + "Frame type, RTP Seq, RTP TS, RTP M, " + "JB size, JB burst level, JB prefetch\n"); + pj_file_write(stream->trace_jb_fd, stream->trace_jb_buf, &len); + pj_file_flush(stream->trace_jb_fd); + } + } +#endif + + /* Save the stream info */ + pj_memcpy(&stream->info, info, sizeof(*info)); + stream->info.codec_param = pjmedia_vid_codec_param_clone( + pool, info->codec_param); + + /* Success! */ + *p_stream = stream; + + PJ_LOG(5,(THIS_FILE, "Video stream %s created", stream->name.ptr)); + + return PJ_SUCCESS; +} + + +/* + * Destroy stream. + */ +PJ_DEF(pj_status_t) pjmedia_vid_stream_destroy( pjmedia_vid_stream *stream ) +{ + unsigned len; + PJ_ASSERT_RETURN(stream != NULL, PJ_EINVAL); + + /* Send RTCP BYE */ + if (stream->enc && stream->transport) { + len = create_rtcp_bye(stream, (pj_uint8_t*)stream->enc->buf, + stream->enc->buf_size); + if (len != 0) { + pjmedia_transport_send_rtcp(stream->transport, + stream->enc->buf, len); + } + } + + /* Detach from transport + * MUST NOT hold stream mutex while detaching from transport, as + * it may cause deadlock. See ticket #460 for the details. + */ + if (stream->transport) { + pjmedia_transport_detach(stream->transport, stream); + stream->transport = NULL; + } + + /* This function may be called when stream is partly initialized. */ + if (stream->jb_mutex) + pj_mutex_lock(stream->jb_mutex); + + + /* Free codec. */ + if (stream->codec) { + pjmedia_vid_codec_close(stream->codec); + pjmedia_vid_codec_mgr_dealloc_codec(stream->codec_mgr, stream->codec); + stream->codec = NULL; + } + + /* Free mutex */ + + if (stream->jb_mutex) { + pj_mutex_destroy(stream->jb_mutex); + stream->jb_mutex = NULL; + } + + /* Destroy jitter buffer */ + if (stream->jb) + pjmedia_jbuf_destroy(stream->jb); + +#if TRACE_JB + if (TRACE_JB_OPENED(stream)) { + pj_file_close(stream->trace_jb_fd); + stream->trace_jb_fd = TRACE_JB_INVALID_FD; + } +#endif + + if (stream->own_pool) { + pj_pool_t *pool = stream->own_pool; + stream->own_pool = NULL; + pj_pool_release(pool); + } + + return PJ_SUCCESS; +} + + +/* + * Get the port interface. + */ +PJ_DEF(pj_status_t) pjmedia_vid_stream_get_port(pjmedia_vid_stream *stream, + pjmedia_dir dir, + pjmedia_port **p_port ) +{ + PJ_ASSERT_RETURN(dir==PJMEDIA_DIR_ENCODING || dir==PJMEDIA_DIR_DECODING, + PJ_EINVAL); + + if (dir == PJMEDIA_DIR_ENCODING) + *p_port = &stream->enc->port; + else + *p_port = &stream->dec->port; + + return PJ_SUCCESS; +} + + +/* + * Get the transport object + */ +PJ_DEF(pjmedia_transport*) pjmedia_vid_stream_get_transport( + pjmedia_vid_stream *st) +{ + return st->transport; +} + + +/* + * Get stream statistics. + */ +PJ_DEF(pj_status_t) pjmedia_vid_stream_get_stat( + const pjmedia_vid_stream *stream, + pjmedia_rtcp_stat *stat) +{ + PJ_ASSERT_RETURN(stream && stat, PJ_EINVAL); + + pj_memcpy(stat, &stream->rtcp.stat, sizeof(pjmedia_rtcp_stat)); + return PJ_SUCCESS; +} + + +/* + * Reset the stream statistics in the middle of a stream session. + */ +PJ_DEF(pj_status_t) pjmedia_vid_stream_reset_stat(pjmedia_vid_stream *stream) +{ + PJ_ASSERT_RETURN(stream, PJ_EINVAL); + + pjmedia_rtcp_init_stat(&stream->rtcp.stat); + + return PJ_SUCCESS; +} + + +/* + * Get jitter buffer state. + */ +PJ_DEF(pj_status_t) pjmedia_vid_stream_get_stat_jbuf( + const pjmedia_vid_stream *stream, + pjmedia_jb_state *state) +{ + PJ_ASSERT_RETURN(stream && state, PJ_EINVAL); + return pjmedia_jbuf_get_state(stream->jb, state); +} + + +/* + * Get the stream info. + */ +PJ_DEF(pj_status_t) pjmedia_vid_stream_get_info( + const pjmedia_vid_stream *stream, + pjmedia_vid_stream_info *info) +{ + PJ_ASSERT_RETURN(stream && info, PJ_EINVAL); + pj_memcpy(info, &stream->info, sizeof(*info)); + return PJ_SUCCESS; +} + + +/* + * Start stream. + */ +PJ_DEF(pj_status_t) pjmedia_vid_stream_start(pjmedia_vid_stream *stream) +{ + + PJ_ASSERT_RETURN(stream && stream->enc && stream->dec, PJ_EINVALIDOP); + + if (stream->enc && (stream->dir & PJMEDIA_DIR_ENCODING)) { + stream->enc->paused = 0; + //pjmedia_snd_stream_start(stream->enc->snd_stream); + PJ_LOG(4,(stream->enc->port.info.name.ptr, "Encoder stream started")); + } else { + PJ_LOG(4,(stream->enc->port.info.name.ptr, "Encoder stream paused")); + } + + if (stream->dec && (stream->dir & PJMEDIA_DIR_DECODING)) { + stream->dec->paused = 0; + //pjmedia_snd_stream_start(stream->dec->snd_stream); + PJ_LOG(4,(stream->dec->port.info.name.ptr, "Decoder stream started")); + } else { + PJ_LOG(4,(stream->dec->port.info.name.ptr, "Decoder stream paused")); + } + + return PJ_SUCCESS; +} + + +/* + * Pause stream. + */ +PJ_DEF(pj_status_t) pjmedia_vid_stream_pause(pjmedia_vid_stream *stream, + pjmedia_dir dir) +{ + PJ_ASSERT_RETURN(stream, PJ_EINVAL); + + if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) { + stream->enc->paused = 1; + PJ_LOG(4,(stream->enc->port.info.name.ptr, "Encoder stream paused")); + } + + if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) { + stream->dec->paused = 1; + + /* Also reset jitter buffer */ + pj_mutex_lock( stream->jb_mutex ); + pjmedia_jbuf_reset(stream->jb); + pj_mutex_unlock( stream->jb_mutex ); + + PJ_LOG(4,(stream->dec->port.info.name.ptr, "Decoder stream paused")); + } + + return PJ_SUCCESS; +} + + +/* + * Resume stream + */ +PJ_DEF(pj_status_t) pjmedia_vid_stream_resume(pjmedia_vid_stream *stream, + pjmedia_dir dir) +{ + PJ_ASSERT_RETURN(stream, PJ_EINVAL); + + if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) { + stream->enc->paused = 0; + PJ_LOG(4,(stream->enc->port.info.name.ptr, "Encoder stream resumed")); + } + + if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) { + stream->dec->paused = 0; + PJ_LOG(4,(stream->dec->port.info.name.ptr, "Decoder stream resumed")); + } + + return PJ_SUCCESS; +} + + +static const pj_str_t ID_VIDEO = { "video", 5}; +static const pj_str_t ID_IN = { "IN", 2 }; +static const pj_str_t ID_IP4 = { "IP4", 3}; +static const pj_str_t ID_IP6 = { "IP6", 3}; +static const pj_str_t ID_RTP_AVP = { "RTP/AVP", 7 }; +static const pj_str_t ID_RTP_SAVP = { "RTP/SAVP", 8 }; +//static const pj_str_t ID_SDP_NAME = { "pjmedia", 7 }; +static const pj_str_t ID_RTPMAP = { "rtpmap", 6 }; + +static const pj_str_t STR_INACTIVE = { "inactive", 8 }; +static const pj_str_t STR_SENDRECV = { "sendrecv", 8 }; +static const pj_str_t STR_SENDONLY = { "sendonly", 8 }; +static const pj_str_t STR_RECVONLY = { "recvonly", 8 }; + + +/* + * Internal function for collecting codec info and param from the SDP media. + */ +static pj_status_t get_video_codec_info_param(pjmedia_vid_stream_info *si, + pj_pool_t *pool, + pjmedia_vid_codec_mgr *mgr, + const pjmedia_sdp_media *local_m, + const pjmedia_sdp_media *rem_m) +{ + unsigned pt = 0; + const pjmedia_vid_codec_info *p_info; + pj_status_t status; + + pt = pj_strtoul(&local_m->desc.fmt[0]); + + /* Get codec info. */ + status = pjmedia_vid_codec_mgr_get_codec_info(mgr, pt, &p_info); + if (status != PJ_SUCCESS) + return status; + + si->codec_info = *p_info; + + /* Get payload type for receiving direction */ + si->rx_pt = pt; + + /* Get payload type for transmitting direction */ + if (pt < 96) { + /* For static payload type, pt's are symetric */ + si->tx_pt = pt; + + } else { + unsigned i; + + /* Determine payload type for outgoing channel, by finding + * dynamic payload type in remote SDP that matches the answer. + */ + si->tx_pt = 0xFFFF; + for (i=0; i<rem_m->desc.fmt_count; ++i) { + if (pjmedia_sdp_neg_fmt_match(NULL, + (pjmedia_sdp_media*)local_m, 0, + (pjmedia_sdp_media*)rem_m, i, 0) == + PJ_SUCCESS) + { + /* Found matched codec. */ + si->tx_pt = pj_strtoul(&rem_m->desc.fmt[i]); + break; + } + } + + if (si->tx_pt == 0xFFFF) + return PJMEDIA_EMISSINGRTPMAP; + } + + + /* Now that we have codec info, get the codec param. */ + si->codec_param = PJ_POOL_ALLOC_T(pool, pjmedia_vid_codec_param); + status = pjmedia_vid_codec_mgr_get_default_param(mgr, + &si->codec_info, + si->codec_param); + + /* Get remote fmtp for our encoder. */ + pjmedia_stream_info_parse_fmtp(pool, rem_m, si->tx_pt, + &si->codec_param->enc_fmtp); + + /* Get local fmtp for our decoder. */ + pjmedia_stream_info_parse_fmtp(pool, local_m, si->rx_pt, + &si->codec_param->dec_fmtp); + + /* When direction is NONE (it means SDP negotiation has failed) we don't + * need to return a failure here, as returning failure will cause + * the whole SDP to be rejected. See ticket #: + * http:// + * + * Thanks Alain Totouom + */ + if (status != PJ_SUCCESS && si->dir != PJMEDIA_DIR_NONE) + return status; + + return PJ_SUCCESS; +} + + + +/* + * Create stream info from SDP media line. + */ +PJ_DEF(pj_status_t) pjmedia_vid_stream_info_from_sdp( + pjmedia_vid_stream_info *si, + pj_pool_t *pool, + pjmedia_endpt *endpt, + const pjmedia_sdp_session *local, + const pjmedia_sdp_session *remote, + unsigned stream_idx) +{ + const pjmedia_sdp_attr *attr; + const pjmedia_sdp_media *local_m; + const pjmedia_sdp_media *rem_m; + const pjmedia_sdp_conn *local_conn; + const pjmedia_sdp_conn *rem_conn; + int rem_af, local_af; + pj_sockaddr local_addr; + pj_status_t status; + + PJ_UNUSED_ARG(endpt); + + /* Validate arguments: */ + PJ_ASSERT_RETURN(pool && si && local && remote, PJ_EINVAL); + PJ_ASSERT_RETURN(stream_idx < local->media_count, PJ_EINVAL); + PJ_ASSERT_RETURN(stream_idx < remote->media_count, PJ_EINVAL); + + /* Keep SDP shortcuts */ + local_m = local->media[stream_idx]; + rem_m = remote->media[stream_idx]; + + local_conn = local_m->conn ? local_m->conn : local->conn; + if (local_conn == NULL) + return PJMEDIA_SDP_EMISSINGCONN; + + rem_conn = rem_m->conn ? rem_m->conn : remote->conn; + if (rem_conn == NULL) + return PJMEDIA_SDP_EMISSINGCONN; + + /* Media type must be video */ + if (pj_stricmp(&local_m->desc.media, &ID_VIDEO) != 0) + return PJMEDIA_EINVALIMEDIATYPE; + + + /* Reset: */ + + pj_bzero(si, sizeof(*si)); + + /* Media type: */ + si->type = PJMEDIA_TYPE_VIDEO; + + /* Transport protocol */ + + /* At this point, transport type must be compatible, + * the transport instance will do more validation later. + */ + status = pjmedia_sdp_transport_cmp(&rem_m->desc.transport, + &local_m->desc.transport); + if (status != PJ_SUCCESS) + return PJMEDIA_SDPNEG_EINVANSTP; + + if (pj_stricmp(&local_m->desc.transport, &ID_RTP_AVP) == 0) { + + si->proto = PJMEDIA_TP_PROTO_RTP_AVP; + + } else if (pj_stricmp(&local_m->desc.transport, &ID_RTP_SAVP) == 0) { + + si->proto = PJMEDIA_TP_PROTO_RTP_SAVP; + + } else { + + si->proto = PJMEDIA_TP_PROTO_UNKNOWN; + return PJ_SUCCESS; + } + + + /* Check address family in remote SDP */ + rem_af = pj_AF_UNSPEC(); + if (pj_stricmp(&rem_conn->net_type, &ID_IN)==0) { + if (pj_stricmp(&rem_conn->addr_type, &ID_IP4)==0) { + rem_af = pj_AF_INET(); + } else if (pj_stricmp(&rem_conn->addr_type, &ID_IP6)==0) { + rem_af = pj_AF_INET6(); + } + } + + if (rem_af==pj_AF_UNSPEC()) { + /* Unsupported address family */ + return PJ_EAFNOTSUP; + } + + /* Set remote address: */ + status = pj_sockaddr_init(rem_af, &si->rem_addr, &rem_conn->addr, + rem_m->desc.port); + if (status != PJ_SUCCESS) { + /* Invalid IP address. */ + return PJMEDIA_EINVALIDIP; + } + + /* Check address family of local info */ + local_af = pj_AF_UNSPEC(); + if (pj_stricmp(&local_conn->net_type, &ID_IN)==0) { + if (pj_stricmp(&local_conn->addr_type, &ID_IP4)==0) { + local_af = pj_AF_INET(); + } else if (pj_stricmp(&local_conn->addr_type, &ID_IP6)==0) { + local_af = pj_AF_INET6(); + } + } + + if (local_af==pj_AF_UNSPEC()) { + /* Unsupported address family */ + return PJ_SUCCESS; + } + + /* Set remote address: */ + status = pj_sockaddr_init(local_af, &local_addr, &local_conn->addr, + local_m->desc.port); + if (status != PJ_SUCCESS) { + /* Invalid IP address. */ + return PJMEDIA_EINVALIDIP; + } + + /* Local and remote address family must match */ + if (local_af != rem_af) + return PJ_EAFNOTSUP; + + /* Media direction: */ + + if (local_m->desc.port == 0 || + pj_sockaddr_has_addr(&local_addr)==PJ_FALSE || + pj_sockaddr_has_addr(&si->rem_addr)==PJ_FALSE || + pjmedia_sdp_media_find_attr(local_m, &STR_INACTIVE, NULL)!=NULL) + { + /* Inactive stream. */ + + si->dir = PJMEDIA_DIR_NONE; + + } else if (pjmedia_sdp_media_find_attr(local_m, &STR_SENDONLY, NULL)!=NULL) { + + /* Send only stream. */ + + si->dir = PJMEDIA_DIR_ENCODING; + + } else if (pjmedia_sdp_media_find_attr(local_m, &STR_RECVONLY, NULL)!=NULL) { + + /* Recv only stream. */ + + si->dir = PJMEDIA_DIR_DECODING; + + } else { + + /* Send and receive stream. */ + + si->dir = PJMEDIA_DIR_ENCODING_DECODING; + + } + + /* No need to do anything else if stream is rejected */ + if (local_m->desc.port == 0) { + return PJ_SUCCESS; + } + + /* If "rtcp" attribute is present in the SDP, set the RTCP address + * from that attribute. Otherwise, calculate from RTP address. + */ + attr = pjmedia_sdp_attr_find2(rem_m->attr_count, rem_m->attr, + "rtcp", NULL); + if (attr) { + pjmedia_sdp_rtcp_attr rtcp; + status = pjmedia_sdp_attr_get_rtcp(attr, &rtcp); + if (status == PJ_SUCCESS) { + if (rtcp.addr.slen) { + status = pj_sockaddr_init(rem_af, &si->rem_rtcp, &rtcp.addr, + (pj_uint16_t)rtcp.port); + } else { + pj_sockaddr_init(rem_af, &si->rem_rtcp, NULL, + (pj_uint16_t)rtcp.port); + pj_memcpy(pj_sockaddr_get_addr(&si->rem_rtcp), + pj_sockaddr_get_addr(&si->rem_addr), + pj_sockaddr_get_addr_len(&si->rem_addr)); + } + } + } + + if (!pj_sockaddr_has_addr(&si->rem_rtcp)) { + int rtcp_port; + + pj_memcpy(&si->rem_rtcp, &si->rem_addr, sizeof(pj_sockaddr)); + rtcp_port = pj_sockaddr_get_port(&si->rem_addr) + 1; + pj_sockaddr_set_port(&si->rem_rtcp, (pj_uint16_t)rtcp_port); + } + + /* Get codec info and param */ + status = get_video_codec_info_param(si, pool, NULL, local_m, rem_m); + + /* Leave SSRC to random. */ + si->ssrc = pj_rand(); + + /* Set default jitter buffer parameter. */ + si->jb_init = si->jb_max = si->jb_min_pre = si->jb_max_pre = -1; + + return status; +} + |