From 30b72e6036e0dd2bbe85e5929dc4201357d258db Mon Sep 17 00:00:00 2001 From: Benny Prijono Date: Mon, 24 Apr 2006 23:13:00 +0000 Subject: Better support for continuing media when peer has restarted transmission/RTP session git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@408 74dad513-b988-da41-8d7b-12977e46ad98 --- pjmedia/include/pjmedia/jbuf.h | 24 ++++- pjmedia/include/pjmedia/rtcp.h | 4 + pjmedia/include/pjmedia/rtp.h | 1 + pjmedia/src/pjmedia/jbuf.c | 106 +++++++++++++------ pjmedia/src/pjmedia/rtcp.c | 93 ++++++++-------- pjmedia/src/pjmedia/rtp.c | 79 +++++++------- pjmedia/src/pjmedia/stream.c | 233 +++++++++++++++++++++++++++-------------- 7 files changed, 333 insertions(+), 207 deletions(-) (limited to 'pjmedia') diff --git a/pjmedia/include/pjmedia/jbuf.h b/pjmedia/include/pjmedia/jbuf.h index 9a0d5c6a..c289e337 100644 --- a/pjmedia/include/pjmedia/jbuf.h +++ b/pjmedia/include/pjmedia/jbuf.h @@ -63,6 +63,8 @@ enum pjmedia_jb_frame_type * memory to keep the frames in the buffer. * * @param pool The pool to allocate memory. + * @param name Name to identify the jitter buffer for logging + * purpose. * @param frame_size The size of each frame that will be kept in the * jitter buffer. The value here normaly corresponds * to the RTP payload size according to the codec @@ -73,11 +75,12 @@ enum pjmedia_jb_frame_type * * @return PJ_SUCCESS on success. */ -PJ_DECL(pj_status_t) pjmedia_jbuf_create(pj_pool_t *pool, - int frame_size, - int init_delay, - int max_count, - pjmedia_jbuf **p_jb); +PJ_DECL(pj_status_t) pjmedia_jbuf_create(pj_pool_t *pool, + const pj_str_t *name, + int frame_size, + int init_delay, + int max_count, + pjmedia_jbuf **p_jb); /** * Destroy jitter buffer instance. @@ -89,6 +92,17 @@ PJ_DECL(pj_status_t) pjmedia_jbuf_create(pj_pool_t *pool, PJ_DECL(pj_status_t) pjmedia_jbuf_destroy(pjmedia_jbuf *jb); +/** + * Restart jitter. This function flushes all packets in the buffer and + * reset the internal sequence number. + * + * @param jb The jitter buffer. + * + * @return PJ_SUCCESS on success. + */ +PJ_DECL(pj_status_t) pjmedia_jbuf_reset(pjmedia_jbuf *jb); + + /** * Put a frame to the jitter buffer. If the frame can be accepted (based * on the sequence number), the jitter buffer will copy the frame and put diff --git a/pjmedia/include/pjmedia/rtcp.h b/pjmedia/include/pjmedia/rtcp.h index 46cf13f2..51efe2e8 100644 --- a/pjmedia/include/pjmedia/rtcp.h +++ b/pjmedia/include/pjmedia/rtcp.h @@ -221,6 +221,7 @@ typedef struct pjmedia_rtcp_stat pjmedia_rtcp_stat; */ struct pjmedia_rtcp_session { + char *name; /**< Name identification. */ pjmedia_rtcp_pkt rtcp_pkt; /**< Cached RTCP packet. */ pjmedia_rtp_seq_session seq_ctrl; /**< RTCP sequence number control. */ @@ -253,11 +254,14 @@ typedef struct pjmedia_rtcp_session pjmedia_rtcp_session; * Initialize RTCP session. * * @param session The session + * @param name Optional name to identify the session (for + * logging purpose). * @param clock_rate Codec clock rate in samples per second. * @param samples_per_frame Average number of samples per frame. * @param ssrc The SSRC used in to identify the session. */ PJ_DECL(void) pjmedia_rtcp_init( pjmedia_rtcp_session *session, + char *name, unsigned clock_rate, unsigned samples_per_frame, pj_uint32_t ssrc ); diff --git a/pjmedia/include/pjmedia/rtp.h b/pjmedia/include/pjmedia/rtp.h index 87a73c81..29dad419 100644 --- a/pjmedia/include/pjmedia/rtp.h +++ b/pjmedia/include/pjmedia/rtp.h @@ -192,6 +192,7 @@ struct pjmedia_rtp_status this packet. More information will be given in other flags. */ int badpt:1; /**< Bad payload type. */ + int badssrc:1; /**< Bad SSRC */ int dup:1; /**< Indicates duplicate packet */ int outorder:1; /**< Indicates out of order packet */ int probation:1;/**< Indicates that session is in probation diff --git a/pjmedia/src/pjmedia/jbuf.c b/pjmedia/src/pjmedia/jbuf.c index 6f8865ba..32f4739b 100644 --- a/pjmedia/src/pjmedia/jbuf.c +++ b/pjmedia/src/pjmedia/jbuf.c @@ -47,6 +47,7 @@ typedef struct jb_framelist jb_framelist; struct pjmedia_jbuf { + pj_str_t name; // jitter buffer name jb_framelist jb_framelist; pj_size_t jb_frame_size; // frame size pj_size_t jb_max_count; // max frames in the jitter framelist->flist_buffer @@ -93,12 +94,11 @@ static pj_status_t jb_framelist_init( pj_pool_t *pool, framelist->flist_frame_size * framelist->flist_max_count); - framelist->flist_frame_type = pj_pool_zalloc(pool, - sizeof(framelist->flist_frame_type[0]) * - framelist->flist_max_count); + framelist->flist_frame_type = + pj_pool_zalloc(pool, sizeof(framelist->flist_frame_type[0]) * + framelist->flist_max_count); framelist->flist_empty = 1; - framelist->flist_head = framelist->flist_tail = framelist->flist_origin = 0; return PJ_SUCCESS; @@ -128,16 +128,20 @@ static pj_bool_t jb_framelist_get(jb_framelist *framelist, { if (!framelist->flist_empty) { pj_memcpy(frame, - framelist->flist_buffer + framelist->flist_head * framelist->flist_frame_size, + framelist->flist_buffer + + framelist->flist_head * framelist->flist_frame_size, framelist->flist_frame_size); - *p_type = (pjmedia_jb_frame_type) framelist->flist_frame_type[framelist->flist_head]; + *p_type = (pjmedia_jb_frame_type) + framelist->flist_frame_type[framelist->flist_head]; - pj_memset(framelist->flist_buffer + framelist->flist_head * framelist->flist_frame_size, + pj_memset(framelist->flist_buffer + + framelist->flist_head * framelist->flist_frame_size, 0, framelist->flist_frame_size); framelist->flist_frame_type[framelist->flist_head] = 0; framelist->flist_origin++; - framelist->flist_head = ++framelist->flist_head % framelist->flist_max_count; + framelist->flist_head = (framelist->flist_head + 1 ) % + framelist->flist_max_count; if (framelist->flist_head == framelist->flist_tail) framelist->flist_empty = PJ_TRUE; @@ -172,7 +176,8 @@ static void jb_framelist_remove_head( jb_framelist *framelist, step2 = 0; } - pj_memset(framelist->flist_buffer + framelist->flist_head * framelist->flist_frame_size, + pj_memset(framelist->flist_buffer + + framelist->flist_head * framelist->flist_frame_size, 0, step1*framelist->flist_frame_size); pj_memset(framelist->flist_frame_type+framelist->flist_head, @@ -190,7 +195,8 @@ static void jb_framelist_remove_head( jb_framelist *framelist, // update pointers framelist->flist_origin += count; - framelist->flist_head = (framelist->flist_head+count) % framelist->flist_max_count; + framelist->flist_head = (framelist->flist_head + count) % + framelist->flist_max_count; if (framelist->flist_head == framelist->flist_tail) framelist->flist_empty = PJ_TRUE; } @@ -204,31 +210,36 @@ static pj_bool_t jb_framelist_put_at(jb_framelist *framelist, { unsigned where; - // too late - if (index < framelist->flist_origin) - return PJ_FALSE; - - // too soon - if ((index > (framelist->flist_origin + framelist->flist_max_count - 1)) && !framelist->flist_empty) - return PJ_FALSE; - assert(frame_size <= framelist->flist_frame_size); if (!framelist->flist_empty) { + unsigned max_index; unsigned cur_size; - where = (index - framelist->flist_origin + framelist->flist_head) % framelist->flist_max_count; + // too late + if (index < framelist->flist_origin) + return PJ_FALSE; + + // too soon + max_index = framelist->flist_origin + framelist->flist_max_count - 1; + if (index > max_index) + return PJ_FALSE; + + where = (index - framelist->flist_origin + framelist->flist_head) % + framelist->flist_max_count; // update framelist->flist_tail pointer cur_size = jb_framelist_size(framelist); if (index >= framelist->flist_origin + cur_size) { unsigned diff = (index - (framelist->flist_origin + cur_size)); - framelist->flist_tail = (framelist->flist_tail + diff + 1) % framelist->flist_max_count; + framelist->flist_tail = (framelist->flist_tail + diff + 1) % + framelist->flist_max_count; } } else { where = framelist->flist_tail; framelist->flist_origin = index; - framelist->flist_tail = (++framelist->flist_tail % framelist->flist_max_count); + framelist->flist_tail = (framelist->flist_tail + 1) % + framelist->flist_max_count; framelist->flist_empty = PJ_FALSE; } @@ -251,6 +262,7 @@ enum pjmedia_jb_op PJ_DEF(pj_status_t) pjmedia_jbuf_create(pj_pool_t *pool, + const pj_str_t *name, int frame_size, int initial_prefetch, int max_count, @@ -265,6 +277,7 @@ PJ_DEF(pj_status_t) pjmedia_jbuf_create(pj_pool_t *pool, if (status != PJ_SUCCESS) return status; + pj_strdup_with_null(pool, &jb->name, name); jb->jb_frame_size = frame_size; jb->jb_last_seq_no = -1; jb->jb_level = 0; @@ -283,6 +296,24 @@ PJ_DEF(pj_status_t) pjmedia_jbuf_create(pj_pool_t *pool, } +PJ_DEF(pj_status_t) pjmedia_jbuf_reset(pjmedia_jbuf *jb) +{ + jb->jb_last_seq_no = -1; + jb->jb_level = 0; + jb->jb_last_level = 0; + jb->jb_last_jitter = 0; + jb->jb_last_op = JB_OP_INIT; + jb->jb_prefetch_cnt = 0; + jb->jb_stable_hist = 0; + jb->jb_status = JB_STATUS_INITIALIZING; + jb->jb_max_hist_jitter = 0; + + jb_framelist_remove_head(&jb->jb_framelist, + jb_framelist_size(&jb->jb_framelist)); + return PJ_SUCCESS; +} + + PJ_DEF(pj_status_t) pjmedia_jbuf_destroy(pjmedia_jbuf *jb) { return jb_framelist_destroy(&jb->jb_framelist); @@ -311,29 +342,33 @@ static void jbuf_calculate_jitter(pjmedia_jbuf *jb) jb->jb_max_hist_jitter = 0; if (jb->jb_op_count >= 100 && - (int)jb_framelist_size(&jb->jb_framelist) > jb->jb_prefetch+2) + (int)jb_framelist_size(&jb->jb_framelist) > jb->jb_prefetch+2) { jb_framelist_remove_head(&jb->jb_framelist,1); - PJ_LOG(5,(THIS_FILE, "jbuf prefetch: %d, size=%d", - jb->jb_prefetch, - jb_framelist_size(&jb->jb_framelist))); + PJ_LOG(5,(jb->name.ptr, + "jbuf optimizing, prefetch: %d, size=%d", + jb->jb_prefetch, + jb_framelist_size(&jb->jb_framelist))); jb->jb_op_count = 0; } } } else { - jb->jb_prefetch = PJ_MIN(jb->jb_last_jitter,(int)(jb->jb_max_count*4/5)); + jb->jb_prefetch = PJ_MIN(jb->jb_last_jitter, + (int)(jb->jb_max_count*4/5)); jb->jb_stable_hist = 0; jb->jb_max_hist_jitter = 0; if (jb->jb_op_count >= 100) { - if ((int)jb_framelist_size(&jb->jb_framelist) > jb->jb_prefetch+2) { + if ((int)jb_framelist_size(&jb->jb_framelist) > jb->jb_prefetch+2) + { jb_framelist_remove_head(&jb->jb_framelist,1); - PJ_LOG(5,(THIS_FILE, "jbuf prefetch: %d, size=%d", - jb->jb_prefetch, - jb_framelist_size(&jb->jb_framelist))); + PJ_LOG(5,(jb->name.ptr, + "jbuf optimizing prefetch: %d, size=%d", + jb->jb_prefetch, + jb_framelist_size(&jb->jb_framelist))); } jb->jb_op_count = 0; @@ -377,8 +412,11 @@ PJ_DEF(pj_status_t) pjmedia_jbuf_put_frame(pjmedia_jbuf *jb, min_frame_size = PJ_MIN(frame_size, jb->jb_frame_size); if (seq_diff > 0) { - while (!jb_framelist_put_at(&jb->jb_framelist,frame_seq,frame,min_frame_size)) { - jb_framelist_remove_head(&jb->jb_framelist,PJ_MAX(jb->jb_max_count/4,1)); + while (jb_framelist_put_at(&jb->jb_framelist, + frame_seq,frame,min_frame_size) ==PJ_FALSE) + { + jb_framelist_remove_head(&jb->jb_framelist, + PJ_MAX(jb->jb_max_count/4,1) ); } if (jb->jb_prefetch_cnt < jb->jb_prefetch) @@ -407,7 +445,9 @@ PJ_DEF(pj_status_t) pjmedia_jbuf_get_frame( pjmedia_jbuf *jb, jb->jb_prefetch_cnt = 0; } - if ((jb->jb_prefetch_cnt < jb->jb_prefetch) || !jb_framelist_get(&jb->jb_framelist,frame,&ftype)) { + if ((jb->jb_prefetch_cnt < jb->jb_prefetch) || + jb_framelist_get(&jb->jb_framelist,frame,&ftype) == PJ_FALSE) + { pj_memset(frame, 0, jb->jb_frame_size); *p_frame_type = PJMEDIA_JB_ZERO_FRAME; return PJ_SUCCESS; diff --git a/pjmedia/src/pjmedia/rtcp.c b/pjmedia/src/pjmedia/rtcp.c index 54e8dbd7..8d548ccf 100644 --- a/pjmedia/src/pjmedia/rtcp.c +++ b/pjmedia/src/pjmedia/rtcp.c @@ -109,7 +109,7 @@ PJ_DEF(pj_status_t) pjmedia_rtcp_get_ntp_time(const pjmedia_rtcp_session *sess, if (PJ_TIME_VAL_MSEC(diff) >= MIN_DIFF) { - TRACE_((THIS_FILE, "NTP timestamp corrected by %d ms", + TRACE_((sess->name, "RTCP NTP timestamp corrected by %d ms", PJ_TIME_VAL_MSEC(diff))); @@ -125,6 +125,7 @@ PJ_DEF(pj_status_t) pjmedia_rtcp_get_ntp_time(const pjmedia_rtcp_session *sess, PJ_DEF(void) pjmedia_rtcp_init(pjmedia_rtcp_session *sess, + char *name, unsigned clock_rate, unsigned samples_per_frame, pj_uint32_t ssrc) @@ -134,6 +135,9 @@ PJ_DEF(void) pjmedia_rtcp_init(pjmedia_rtcp_session *sess, pj_memset(rtcp_pkt, 0, sizeof(pjmedia_rtcp_pkt)); + /* Name */ + sess->name = name ? name : THIS_FILE, + /* Set clock rate */ sess->clock_rate = clock_rate; sess->pkt_size = samples_per_frame; @@ -237,47 +241,48 @@ PJ_DEF(void) pjmedia_rtcp_rx_rtp(pjmedia_rtcp_session *sess, /* - * Calculate jitter (see RFC 3550 section A.8) + * Calculate jitter only when sequence is good (see RFC 3550 section A.8) */ - - /* Get arrival time and convert timestamp to samples */ - pj_get_timestamp(&ts); - ts.u64 = ts.u64 * sess->clock_rate / sess->ts_freq.u64; - arrival = ts.u32.lo; + if (seq_st.diff == 1) { + /* Get arrival time and convert timestamp to samples */ + pj_get_timestamp(&ts); + ts.u64 = ts.u64 * sess->clock_rate / sess->ts_freq.u64; + arrival = ts.u32.lo; - transit = arrival - rtp_ts; + transit = arrival - rtp_ts; - /* Ignore the first N packets as they normally have bad jitter - * due to other threads working to establish the call - */ - if (sess->transit == 0 || sess->received < 25 ) { - sess->transit = transit; - sess->stat.rx.jitter.min = 2000; - } else { - pj_int32_t d; - pj_uint32_t jitter; - - d = transit - sess->transit; - sess->transit = transit; - if (d < 0) - d = -d; - - sess->jitter += d - ((sess->jitter + 8) >> 4); - - /* Get jitter in usec */ - if (d < 4294) - jitter = d * 1000000 / sess->clock_rate; - else { - jitter = d * 1000 / sess->clock_rate; - jitter *= 1000; - } + /* Ignore the first N packets as they normally have bad jitter + * due to other threads working to establish the call + */ + if (sess->transit == 0 || sess->received < 25 ) { + sess->transit = transit; + sess->stat.rx.jitter.min = 2000; + } else { + pj_int32_t d; + pj_uint32_t jitter; + + d = transit - sess->transit; + sess->transit = transit; + if (d < 0) + d = -d; + + sess->jitter += d - ((sess->jitter + 8) >> 4); + + /* Get jitter in usec */ + if (d < 4294) + jitter = d * 1000000 / sess->clock_rate; + else { + jitter = d * 1000 / sess->clock_rate; + jitter *= 1000; + } - /* Update jitter stat */ - if (jitter < sess->stat.rx.jitter.min) - sess->stat.rx.jitter.min = jitter; - if (jitter > sess->stat.rx.jitter.max) - sess->stat.rx.jitter.max = jitter; - sess->stat.rx.jitter.last = jitter; + /* Update jitter stat */ + if (jitter < sess->stat.rx.jitter.min) + sess->stat.rx.jitter.min = jitter; + if (jitter > sess->stat.rx.jitter.max) + sess->stat.rx.jitter.max = jitter; + sess->stat.rx.jitter.last = jitter; + } } } @@ -307,7 +312,7 @@ PJ_DEF(void) pjmedia_rtcp_rx_rtcp( pjmedia_rtcp_session *sess, /* Calculate SR arrival time for DLSR */ pj_get_timestamp(&sess->rx_lsr_time); - TRACE_((THIS_FILE, "Rx RTCP SR: ntp_ts=%p", + TRACE_((sess->name, "Rx RTCP SR: ntp_ts=%p", sess->rx_lsr, (pj_uint32_t)(sess->rx_lsr_time.u64*65536/sess->ts_freq.u64))); @@ -404,7 +409,7 @@ PJ_DEF(void) pjmedia_rtcp_rx_rtcp( pjmedia_rtcp_session *sess, eedelay *= 1000; } - TRACE_((THIS_FILE, "Rx RTCP RR: lsr=%p, dlsr=%p (%d:%03dms), " + TRACE_((sess->name, "Rx RTCP RR: lsr=%p, dlsr=%p (%d:%03dms), " "now=%p, rtt=%p", lsr, dlsr, dlsr/65536, (dlsr%65536)*1000/65536, now, (pj_uint32_t)eedelay)); @@ -415,7 +420,7 @@ PJ_DEF(void) pjmedia_rtcp_rx_rtcp( pjmedia_rtcp_session *sess, if (now-dlsr >= lsr) { unsigned rtt = (pj_uint32_t)eedelay; - TRACE_((THIS_FILE, "RTT is set to %d usec", rtt)); + TRACE_((sess->name, "RTCP RTT is set to %d usec", rtt)); if (rtt >= 1000000) { pjmedia_rtcp_ntp_rec ntp2; @@ -440,7 +445,7 @@ PJ_DEF(void) pjmedia_rtcp_rx_rtcp( pjmedia_rtcp_session *sess, sess->stat.rtt_update_cnt++; } else { - PJ_LOG(5, (THIS_FILE, "Internal NTP clock skew detected: " + PJ_LOG(5, (sess->name, "Internal RTCP NTP clock skew detected: " "lsr=%p, now=%p, dlsr=%p (%d:%03dms), " "diff=%d", lsr, now, dlsr, dlsr/65536, @@ -525,7 +530,7 @@ PJ_DEF(void) pjmedia_rtcp_build_rtcp(pjmedia_rtcp_session *sess, rtcp_pkt->sr.ntp_sec = pj_htonl(ntp.hi); rtcp_pkt->sr.ntp_frac = pj_htonl(ntp.lo); - TRACE_((THIS_FILE, "TX RTCP SR: ntp_ts=%p", + TRACE_((sess->name, "TX RTCP SR: ntp_ts=%p", ((ntp.hi & 0xFFFF) << 16) + ((ntp.lo & 0xFFFF0000) >> 16))); @@ -558,7 +563,7 @@ PJ_DEF(void) pjmedia_rtcp_build_rtcp(pjmedia_rtcp_session *sess, dlsr = (pj_uint32_t)(ts.u64 - lsr_time); rtcp_pkt->rr.dlsr = pj_htonl(dlsr); - TRACE_((THIS_FILE, "Tx RTCP RR: lsr=%p, lsr_time=%p, now=%p, dlsr=%p" + TRACE_((sess->name,"Tx RTCP RR: lsr=%p, lsr_time=%p, now=%p, dlsr=%p" "(%ds:%03dms)", lsr, (pj_uint32_t)lsr_time, diff --git a/pjmedia/src/pjmedia/rtp.c b/pjmedia/src/pjmedia/rtp.c index 5103c5ac..4539ee59 100644 --- a/pjmedia/src/pjmedia/rtp.c +++ b/pjmedia/src/pjmedia/rtp.c @@ -19,9 +19,9 @@ #include #include #include -#include /* pj_gettimeofday() */ #include /* pj_htonx, pj_htonx */ #include +#include #include @@ -39,9 +39,11 @@ static void pjmedia_rtp_seq_restart(pjmedia_rtp_seq_session *seq_ctrl, PJ_DEF(pj_status_t) pjmedia_rtp_session_init( pjmedia_rtp_session *ses, - int default_pt, pj_uint32_t sender_ssrc ) + int default_pt, + pj_uint32_t sender_ssrc ) { - PJ_LOG(5, (THIS_FILE, "pjmedia_rtp_session_init: ses=%p, default_pt=%d, ssrc=0x%x", + PJ_LOG(5, (THIS_FILE, + "pjmedia_rtp_session_init: ses=%p, default_pt=%d, ssrc=0x%x", ses, default_pt, sender_ssrc)); /* Check RTP header packing. */ @@ -50,21 +52,22 @@ PJ_DEF(pj_status_t) pjmedia_rtp_session_init( pjmedia_rtp_session *ses, return PJMEDIA_RTP_EINPACK; } - /* If sender_ssrc is not specified, create from time value. */ + /* If sender_ssrc is not specified, create from random value. */ if (sender_ssrc == 0 || sender_ssrc == (pj_uint32_t)-1) { - pj_time_val tv; - - pj_gettimeofday(&tv); - sender_ssrc = (pj_uint32_t) pj_htonl(tv.sec); + sender_ssrc = pj_htonl(pj_rand()); } else { sender_ssrc = pj_htonl(sender_ssrc); } - /* Initialize session. */ - ses->out_extseq = 0; + /* Initialize session. + * Initial sequence number SHOULD be random, according to RFC 3550. + */ + ses->out_extseq = pj_rand(); ses->peer_ssrc = 0; - /* Sequence number will be initialized when the first RTP packet is receieved. */ + /* Sequence number will be initialized when the first RTP packet + * is receieved. + */ /* Build default header for outgoing RTP packet. */ pj_memset(ses, 0, sizeof(*ses)); @@ -85,15 +88,12 @@ PJ_DEF(pj_status_t) pjmedia_rtp_session_init( pjmedia_rtp_session *ses, } -PJ_DEF(pj_status_t) pjmedia_rtp_encode_rtp( pjmedia_rtp_session *ses, int pt, int m, - int payload_len, int ts_len, - const void **rtphdr, int *hdrlen ) +PJ_DEF(pj_status_t) pjmedia_rtp_encode_rtp( pjmedia_rtp_session *ses, + int pt, int m, + int payload_len, int ts_len, + const void **rtphdr, int *hdrlen ) { - PJ_UNUSED_ARG(payload_len) - - PJ_LOG(6, (THIS_FILE, - "pjmedia_rtp_encode_rtp: ses=%p, pt=%d, m=%d, pt_len=%d, ts_len=%d", - ses, pt, m, payload_len, ts_len)); + PJ_UNUSED_ARG(payload_len); /* Update timestamp */ ses->out_hdr.ts = pj_htonl(pj_ntohl(ses->out_hdr.ts)+ts_len); @@ -121,25 +121,20 @@ PJ_DEF(pj_status_t) pjmedia_rtp_encode_rtp( pjmedia_rtp_session *ses, int pt, in PJ_DEF(pj_status_t) pjmedia_rtp_decode_rtp( pjmedia_rtp_session *ses, - const void *pkt, int pkt_len, - const pjmedia_rtp_hdr **hdr, - const void **payload, - unsigned *payloadlen) + const void *pkt, int pkt_len, + const pjmedia_rtp_hdr **hdr, + const void **payload, + unsigned *payloadlen) { int offset; - PJ_UNUSED_ARG(ses) - - PJ_LOG(6, (THIS_FILE, - "pjmedia_rtp_decode_rtp: ses=%p, pkt=%p, pkt_len=%d", - ses, pkt, pkt_len)); + PJ_UNUSED_ARG(ses); /* Assume RTP header at the start of packet. We'll verify this later. */ *hdr = (pjmedia_rtp_hdr*)pkt; /* Check RTP header sanity. */ if ((*hdr)->v != RTP_VERSION) { - PJ_LOG(4, (THIS_FILE, " invalid RTP version!")); return PJMEDIA_RTP_EINVER; } @@ -148,7 +143,8 @@ PJ_DEF(pj_status_t) pjmedia_rtp_decode_rtp( pjmedia_rtp_session *ses, /* Adjust offset if RTP extension is used. */ if ((*hdr)->x) { - pjmedia_rtp_ext_hdr *ext = (pjmedia_rtp_ext_hdr*) (((pj_uint8_t*)pkt) + offset); + pjmedia_rtp_ext_hdr *ext = (pjmedia_rtp_ext_hdr*) + (((pj_uint8_t*)pkt) + offset); offset += (pj_ntohs(ext->length) * sizeof(pj_uint32_t)); } @@ -170,27 +166,22 @@ PJ_DEF(void) pjmedia_rtp_session_update( pjmedia_rtp_session *ses, { pjmedia_rtp_status seq_st; - /* Check SSRC. */ - if (ses->peer_ssrc == 0) ses->peer_ssrc = pj_ntohl(hdr->ssrc); - /* - if (pj_ntohl(ses->peer_ssrc) != hdr->ssrc) { - PJ_LOG(4, (THIS_FILE, "pjmedia_rtp_session_update: ses=%p, invalid ssrc 0x%p (!=0x%p)", - ses, pj_ntohl(hdr->ssrc), ses->peer_ssrc)); - return PJMEDIA_RTP_EINSSRC; - } - */ - /* Init status */ seq_st.status.value = 0; seq_st.diff = 0; + /* Check SSRC. */ + if (ses->peer_ssrc == 0) ses->peer_ssrc = pj_ntohl(hdr->ssrc); + + if (pj_ntohl(hdr->ssrc) != ses->peer_ssrc) { + seq_st.status.flag.badssrc = 1; + ses->peer_ssrc = pj_ntohl(hdr->ssrc); + } + /* Check payload type. */ if (hdr->pt != ses->out_pt) { - PJ_LOG(4, (THIS_FILE, - "pjmedia_rtp_session_update: ses=%p, invalid payload " - "type %d (expecting %d)", - ses, hdr->pt, ses->out_pt)); if (p_seq_st) { + p_seq_st->status.value = seq_st.status.value; p_seq_st->status.flag.bad = 1; p_seq_st->status.flag.badpt = 1; } diff --git a/pjmedia/src/pjmedia/stream.c b/pjmedia/src/pjmedia/stream.c index f63d2abb..fb0bfb5a 100644 --- a/pjmedia/src/pjmedia/stream.c +++ b/pjmedia/src/pjmedia/stream.c @@ -37,8 +37,8 @@ #define THIS_FILE "stream.c" #define ERRLEVEL 1 -#define TRACE_(expr) stream_perror expr -#define TRC_(expr) PJ_LOG(4,expr) +#define LOGERR_(expr) stream_perror expr +#define TRC_(expr) PJ_LOG(5,expr) /** * Media channel. @@ -106,7 +106,8 @@ struct pjmedia_stream pj_ioqueue_op_key_t rtcp_op_key; /**< The pending read op key. */ pj_size_t rtcp_pkt_size; /**< Size of RTCP packet buf. */ char rtcp_pkt[512]; /**< RTCP packet buffer. */ - pj_uint32_t rtcp_tx_time; /**< RTCP tx time in timestamp */ + pj_uint32_t rtcp_last_tx; /**< RTCP tx time in timestamp */ + pj_uint32_t rtcp_interval; /**< Interval, in timestamp. */ int rtcp_addrlen; /**< Address length. */ /* RFC 2833 DTMF transmission queue: */ @@ -183,7 +184,7 @@ static pj_status_t get_frame( pjmedia_port *port, pjmedia_frame *frame) status = stream->codec->op->decode( stream->codec, &frame_in, channel->pcm_buf_size, &frame_out); if (status != 0) { - TRACE_((THIS_FILE, "codec decode() error", status)); + LOGERR_((port->info.name.ptr, "codec decode() error", status)); frame->type = PJMEDIA_FRAME_TYPE_NONE; return PJ_SUCCESS; @@ -191,7 +192,8 @@ static pj_status_t get_frame( pjmedia_port *port, pjmedia_frame *frame) /* Put in sound buffer. */ if (frame_out.size > frame->size) { - PJ_LOG(4,(THIS_FILE, "Sound playout buffer truncated %d bytes", + PJ_LOG(4,(port->info.name.ptr, + "Sound playout buffer truncated %d bytes", frame_out.size - frame->size)); frame_out.size = frame->size; } @@ -239,11 +241,12 @@ static void create_dtmf_payload(pjmedia_stream *stream, pj_mutex_unlock(stream->jb_mutex); if (stream->tx_dtmf_count) - PJ_LOG(5,(THIS_FILE,"Sending DTMF digit id %c", + PJ_LOG(5,(stream->port.info.name.ptr, + "Sending DTMF digit id %c", digitmap[stream->tx_dtmf_buf[0].event])); } else if (duration == 0) { - PJ_LOG(5,(THIS_FILE,"Sending DTMF digit id %c", + PJ_LOG(5,(stream->port.info.name.ptr, "Sending DTMF digit id %c", digitmap[digit->event])); } @@ -251,12 +254,59 @@ static void create_dtmf_payload(pjmedia_stream *stream, frame_out->size = 4; } + /** - * rec_callback() + * check_tx_rtcp() * - * This callback is called when the mic device has gathered - * enough audio samples. We will encode the audio samples and - * send it to remote. + * This function is can be called by either put_frame() or get_frame(), + * to transmit periodic RTCP SR/RR report. + */ +static void check_tx_rtcp(pjmedia_stream *stream, pj_uint32_t timestamp) +{ + /* Note that timestamp may represent local or remote timestamp, + * depending on whether this function is called from put_frame() + * or get_frame(). + */ + + + if (stream->rtcp_last_tx == 0) { + + stream->rtcp_last_tx = timestamp; + + } else if (timestamp - stream->rtcp_last_tx >= stream->rtcp_interval) { + + pjmedia_rtcp_pkt *rtcp_pkt; + pj_ssize_t size; + int len; + pj_status_t status; + + pjmedia_rtcp_build_rtcp(&stream->rtcp, &rtcp_pkt, &len); + size = len; + status = pj_sock_sendto(stream->skinfo.rtcp_sock, rtcp_pkt, &size, 0, + &stream->rem_rtcp_addr, + sizeof(stream->rem_rtcp_addr)); +#if 0 + if (status != PJ_SUCCESS) { + char errmsg[PJ_ERR_MSG_SIZE]; + + pj_strerror(status, errmsg, sizeof(errmsg)); + PJ_LOG(4,(port->info.name.ptr, "Error sending RTCP: %s [%d]", + errmsg, status)); + } +#endif + + stream->rtcp_last_tx = timestamp; + } + +} + + +/** + * put_frame() + * + * This callback is called by upstream component when it has PCM frame + * to transmit. This function encodes the PCM frame, pack it into + * RTP packet, and transmit to peer. */ static pj_status_t put_frame( pjmedia_port *port, const pjmedia_frame *frame ) @@ -304,7 +354,8 @@ static pj_status_t put_frame( pjmedia_port *port, max_size, &frame_out); if (status != 0) { - TRACE_((THIS_FILE, "Codec encode() error", status)); + LOGERR_((stream->port.info.name.ptr, + "Codec encode() error", status)); return status; } @@ -329,44 +380,17 @@ static pj_status_t put_frame( pjmedia_port *port, } if (status != PJ_SUCCESS) { - TRACE_((THIS_FILE, "RTP encode_rtp() error", status)); + LOGERR_((stream->port.info.name.ptr, + "RTP encode_rtp() error", status)); return status; } - /* Check if this is the time to transmit RTCP packet */ - if (stream->rtcp_tx_time == 0) { - unsigned first_interval; - - first_interval = PJMEDIA_RTCP_INTERVAL + (pj_rand() % 2000); - stream->rtcp_tx_time = pj_ntohl(channel->rtp.out_hdr.ts) + - first_interval* stream->port.info.sample_rate / - 1000; - } else if (pj_ntohl(channel->rtp.out_hdr.ts) >= stream->rtcp_tx_time) { - - pjmedia_rtcp_pkt *rtcp_pkt; - pj_ssize_t size; - unsigned interval; - int len; - - pjmedia_rtcp_build_rtcp(&stream->rtcp, &rtcp_pkt, &len); - size = len; - status = pj_sock_sendto(stream->skinfo.rtcp_sock, rtcp_pkt, &size, 0, - &stream->rem_rtcp_addr, - sizeof(stream->rem_rtcp_addr)); -#if 0 - if (status != PJ_SUCCESS) { - char errmsg[PJ_ERR_MSG_SIZE]; - - pj_strerror(status, errmsg, sizeof(errmsg)); - PJ_LOG(4,(THIS_FILE, "Error sending RTCP: %s [%d]", - errmsg, status)); - } -#endif - - interval = PJMEDIA_RTCP_INTERVAL + (pj_rand() % 500); - stream->rtcp_tx_time = pj_ntohl(channel->rtp.out_hdr.ts) + - interval * stream->port.info.sample_rate / - 1000; + /* Check if now is the time to transmit RTCP SR/RR report. + * We only do this when stream direction is not "decoding only", because + * when it is, check_tx_rtcp() will be handled by get_frame(). + */ + if (stream->dir != PJMEDIA_DIR_DECODING) { + check_tx_rtcp(stream, pj_ntohl(channel->rtp.out_hdr.ts)); } /* Do nothing if we have nothing to transmit */ @@ -449,15 +473,16 @@ static void handle_incoming_dtmf( pjmedia_stream *stream, /* Ignore unknown event. */ if (event->event > 15) { - PJ_LOG(5,(THIS_FILE, "Ignored RTP pkt with bad DTMF event %d", - event->event)); + PJ_LOG(5,(stream->port.info.name.ptr, + "Ignored RTP pkt with bad DTMF event %d", + event->event)); return; } /* New event! */ - PJ_LOG(5,(THIS_FILE, "Received DTMF digit %c, vol=%d", - digitmap[event->event], - (event->e_vol & 0x3F))); + PJ_LOG(5,(stream->port.info.name.ptr, "Received DTMF digit %c, vol=%d", + digitmap[event->event], + (event->e_vol & 0x3F))); stream->last_dtmf = event->event; stream->last_dtmf_dur = pj_ntohs(event->duration); @@ -516,7 +541,7 @@ static void on_rx_rtp( pj_ioqueue_key_t *key, channel->in_pkt, bytes_read, &hdr, &payload, &payloadlen); if (status != PJ_SUCCESS) { - TRACE_((THIS_FILE, "RTP decode error", status)); + LOGERR_((stream->port.info.name.ptr, "RTP decode error", status)); goto read_next_packet; } @@ -536,18 +561,28 @@ static void on_rx_rtp( pj_ioqueue_key_t *key, * the incoming packet. */ pjmedia_rtp_session_update(&channel->rtp, hdr, &seq_st); - if (seq_st.status.flag.bad) { - TRC_ ((THIS_FILE, - "RTP session_update error: badpt=%d, dup=%d, outorder=%d, " - "probation=%d, restart=%d", + if (seq_st.status.value) { + TRC_ ((stream->port.info.name.ptr, + "RTP status: badpt=%d, badssrc=%d, dup=%d, " + "outorder=%d, probation=%d, restart=%d", seq_st.status.flag.badpt, + seq_st.status.flag.badssrc, seq_st.status.flag.dup, seq_st.status.flag.outorder, seq_st.status.flag.probation, seq_st.status.flag.restart)); - goto read_next_packet; + + if (seq_st.status.flag.badpt) { + PJ_LOG(4,(stream->port.info.name.ptr, + "Bad RTP pt %d (expecting %d)", + hdr->pt, channel->rtp.out_pt)); + } } + /* Skip bad RTP packet */ + if (seq_st.status.flag.bad) + goto read_next_packet; + /* See if source address of RTP packet is different than the * configured address. @@ -563,24 +598,45 @@ static void on_rx_rtp( pj_ioqueue_key_t *key, stream->rem_rtp_addr = stream->rtp_src_addr; stream->rtp_src_cnt = 0; - PJ_LOG(4,(THIS_FILE,"Remote RTP address switched to %s:%d", + PJ_LOG(4,(stream->port.info.name.ptr, + "Remote RTP address switched to %s:%d", pj_inet_ntoa(stream->rtp_src_addr.sin_addr), pj_ntohs(stream->rtp_src_addr.sin_port))); } } - /* Put to jitter buffer. */ + + /* Put "good" packet to jitter buffer, or reset the jitter buffer + * when RTP session is restarted. + */ pj_mutex_lock( stream->jb_mutex ); - status = pjmedia_jbuf_put_frame(stream->jb, payload, payloadlen, - pj_ntohs(hdr->seq)); + if (seq_st.status.flag.restart) { + status = pjmedia_jbuf_reset(stream->jb); + PJ_LOG(4,(stream->port.info.name.ptr, "Jitter buffer reset")); + + } else { + status = pjmedia_jbuf_put_frame(stream->jb, payload, payloadlen, + pj_ntohs(hdr->seq)); + } pj_mutex_unlock( stream->jb_mutex ); + + /* Check if now is the time to transmit RTCP SR/RR report. + * We only do this when stream direction is "decoding only", + * because otherwise check_tx_rtcp() will be handled by put_frame() + */ + if (stream->dir == PJMEDIA_DIR_DECODING) { + check_tx_rtcp(stream, pj_ntohl(hdr->ts)); + } + if (status != 0) { - TRACE_((THIS_FILE, "Jitter buffer put() error", status)); + LOGERR_((stream->port.info.name.ptr, "Jitter buffer put() error", + status)); goto read_next_packet; } + read_next_packet: bytes_read = channel->in_pkt_size; stream->rtp_addrlen = sizeof(stream->rtp_src_addr); @@ -602,9 +658,10 @@ read_next_packet: char errmsg[PJ_ERR_MSG_SIZE]; pj_strerror(status, errmsg, sizeof(errmsg)); - PJ_LOG(4,(THIS_FILE, "Error reading RTP packet: %s [status=%d]. " - "RTP stream thread quitting!", - errmsg, status)); + PJ_LOG(4,(stream->port.info.name.ptr, + "Error reading RTP packet: %s [status=%d]. " + "RTP stream thread quitting!", + errmsg, status)); } } @@ -643,8 +700,9 @@ static void on_rx_rtcp( pj_ioqueue_key_t *key, char errmsg[PJ_ERR_MSG_SIZE]; pj_strerror(status, errmsg, sizeof(errmsg)); - PJ_LOG(4,(THIS_FILE, "Error reading RTCP packet: %s [status=%d]", - errmsg, status)); + PJ_LOG(4,(stream->port.info.name.ptr, + "Error reading RTCP packet: %s [status=%d]", + errmsg, status)); } } @@ -742,8 +800,13 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, stream = pj_pool_zalloc(pool, sizeof(pjmedia_stream)); PJ_ASSERT_RETURN(stream != NULL, PJ_ENOMEM); + /* Init stream/port name */ + stream->port.info.name.ptr = pj_pool_alloc(pool, 24); + pj_ansi_sprintf(stream->port.info.name.ptr, + "strm%p", stream); + stream->port.info.name.slen = pj_ansi_strlen(stream->port.info.name.ptr); + /* Init port. */ - stream->port.info.name = pj_str("stream"); stream->port.info.signature = ('S'<<3 | 'T'<<2 | 'R'<<1 | 'M'); stream->port.info.type = PJMEDIA_TYPE_AUDIO; stream->port.info.has_info = 1; @@ -767,10 +830,14 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, rtcp_port = (pj_uint16_t) (pj_ntohs(info->rem_addr.sin_port)+1); stream->rem_rtcp_addr = stream->rem_rtp_addr; stream->rem_rtcp_addr.sin_port = pj_htons(rtcp_port); - stream->tx_event_pt = info->tx_event_pt; - stream->rx_event_pt = info->rx_event_pt; + stream->rtcp_interval = (PJMEDIA_RTCP_INTERVAL + (pj_rand() % 8000)) * + info->fmt.sample_rate / 1000; + + stream->tx_event_pt = info->tx_event_pt ? info->tx_event_pt : -1; + stream->rx_event_pt = info->rx_event_pt ? info->rx_event_pt : -1; stream->last_dtmf = -1; + /* Create mutex to protect jitter buffer: */ status = pj_mutex_create_simple(pool, NULL, &stream->jb_mutex); @@ -812,14 +879,16 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, /* Init RTCP session: */ - pjmedia_rtcp_init(&stream->rtcp, info->fmt.sample_rate, + pjmedia_rtcp_init(&stream->rtcp, stream->port.info.name.ptr, + info->fmt.sample_rate, stream->port.info.samples_per_frame, info->ssrc); /* Create jitter buffer: */ - status = pjmedia_jbuf_create(pool, stream->frame_size, 15, 100, + status = pjmedia_jbuf_create(pool, &stream->port.info.name, + stream->frame_size, 15, 100, &stream->jb); if (status != PJ_SUCCESS) goto err_cleanup; @@ -882,6 +951,8 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, /* Success! */ *p_stream = stream; + + PJ_LOG(5,(THIS_FILE, "Stream %s created", stream->port.info.name.ptr)); return PJ_SUCCESS; @@ -957,17 +1028,17 @@ PJ_DEF(pj_status_t) pjmedia_stream_start(pjmedia_stream *stream) if (stream->enc && (stream->dir & PJMEDIA_DIR_ENCODING)) { stream->enc->paused = 0; //pjmedia_snd_stream_start(stream->enc->snd_stream); - PJ_LOG(4,(THIS_FILE, "Encoder stream started")); + PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream started")); } else { - PJ_LOG(4,(THIS_FILE, "Encoder stream paused")); + PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream paused")); } if (stream->dec && (stream->dir & PJMEDIA_DIR_DECODING)) { stream->dec->paused = 0; //pjmedia_snd_stream_start(stream->dec->snd_stream); - PJ_LOG(4,(THIS_FILE, "Decoder stream started")); + PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream started")); } else { - PJ_LOG(4,(THIS_FILE, "Decoder stream paused")); + PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream paused")); } return PJ_SUCCESS; @@ -997,12 +1068,12 @@ PJ_DEF(pj_status_t) pjmedia_stream_pause( pjmedia_stream *stream, if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) { stream->enc->paused = 1; - PJ_LOG(4,(THIS_FILE, "Encoder stream paused")); + PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream paused")); } if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) { stream->dec->paused = 1; - PJ_LOG(4,(THIS_FILE, "Decoder stream paused")); + PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream paused")); } return PJ_SUCCESS; @@ -1019,12 +1090,12 @@ PJ_DEF(pj_status_t) pjmedia_stream_resume( pjmedia_stream *stream, if ((dir & PJMEDIA_DIR_ENCODING) && stream->enc) { stream->enc->paused = 1; - PJ_LOG(4,(THIS_FILE, "Encoder stream resumed")); + PJ_LOG(4,(stream->port.info.name.ptr, "Encoder stream resumed")); } if ((dir & PJMEDIA_DIR_DECODING) && stream->dec) { stream->dec->paused = 1; - PJ_LOG(4,(THIS_FILE, "Decoder stream resumed")); + PJ_LOG(4,(stream->port.info.name.ptr, "Decoder stream resumed")); } return PJ_SUCCESS; -- cgit v1.2.3