diff options
author | Benny Prijono <bennylp@teluu.com> | 2006-04-06 19:29:03 +0000 |
---|---|---|
committer | Benny Prijono <bennylp@teluu.com> | 2006-04-06 19:29:03 +0000 |
commit | 2c1984946280641641349515c08e4970afdf9eff (patch) | |
tree | 1002e70bb23bb37f00ca4179817c371cb9a0c6fa /pjmedia/src | |
parent | 303d33a1dbf4bc43a62720c2e673591916325e33 (diff) |
Integrate (stream) quality monitoring into RTCP framework, and update all RTCP clients accordingly
git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@390 74dad513-b988-da41-8d7b-12977e46ad98
Diffstat (limited to 'pjmedia/src')
-rw-r--r-- | pjmedia/src/pjmedia/rtcp.c | 315 | ||||
-rw-r--r-- | pjmedia/src/pjmedia/rtp.c | 135 | ||||
-rw-r--r-- | pjmedia/src/pjmedia/session.c | 2 | ||||
-rw-r--r-- | pjmedia/src/pjmedia/stream.c | 142 |
4 files changed, 417 insertions, 177 deletions
diff --git a/pjmedia/src/pjmedia/rtcp.c b/pjmedia/src/pjmedia/rtcp.c index 6fc3a752..8755af52 100644 --- a/pjmedia/src/pjmedia/rtcp.c +++ b/pjmedia/src/pjmedia/rtcp.c @@ -43,7 +43,7 @@ /* * Get NTP time. */ -static void rtcp_get_ntp_time(const pjmedia_rtcp_session *s, +static void rtcp_get_ntp_time(const pjmedia_rtcp_session *sess, struct pjmedia_rtcp_ntp_rec *ntp) { pj_time_val tv; @@ -56,28 +56,30 @@ static void rtcp_get_ntp_time(const pjmedia_rtcp_session *s, ntp->hi = tv.sec; /* Calculate second fractions */ - ts.u64 %= s->ts_freq.u64; - ts.u64 = (ts.u64 << 32) / s->ts_freq.u64; + ts.u64 %= sess->ts_freq.u64; + ts.u64 = (ts.u64 << 32) / sess->ts_freq.u64; /* Fill up the low 32bit part */ ntp->lo = ts.u32.lo; } -PJ_DEF(void) pjmedia_rtcp_init(pjmedia_rtcp_session *s, +PJ_DEF(void) pjmedia_rtcp_init(pjmedia_rtcp_session *sess, unsigned clock_rate, + unsigned samples_per_frame, pj_uint32_t ssrc) { - pjmedia_rtcp_pkt *rtcp_pkt = &s->rtcp_pkt; + pjmedia_rtcp_pkt *rtcp_pkt = &sess->rtcp_pkt; pj_memset(rtcp_pkt, 0, sizeof(pjmedia_rtcp_pkt)); /* Set clock rate */ - s->clock_rate = clock_rate; + sess->clock_rate = clock_rate; + sess->pkt_size = samples_per_frame; /* Init time */ - s->rx_lsr = 0; - s->rx_lsr_time.u64 = 0; + sess->rx_lsr = 0; + sess->rx_lsr_time.u64 = 0; /* Init common RTCP header */ rtcp_pkt->common.version = 2; @@ -89,49 +91,62 @@ PJ_DEF(void) pjmedia_rtcp_init(pjmedia_rtcp_session *s, rtcp_pkt->sr.ssrc = pj_htonl(ssrc); /* Get timestamp frequency */ - pj_get_timestamp_freq(&s->ts_freq); + pj_get_timestamp_freq(&sess->ts_freq); /* RR will be initialized on receipt of the first RTP packet. */ } -PJ_DEF(void) pjmedia_rtcp_fini(pjmedia_rtcp_session *session) +PJ_DEF(void) pjmedia_rtcp_fini(pjmedia_rtcp_session *sess) { /* Nothing to do. */ - PJ_UNUSED_ARG(session); + PJ_UNUSED_ARG(sess); } -static void rtcp_init_seq(pjmedia_rtcp_session *s, pj_uint16_t seq) +static void rtcp_init_seq(pjmedia_rtcp_session *sess) { - s->received = 0; - s->exp_prior = 0; - s->rx_prior = 0; - s->transit = 0; - s->jitter = 0; - - pjmedia_rtp_seq_restart(&s->seq_ctrl, seq); + sess->received = 0; + sess->exp_prior = 0; + sess->rx_prior = 0; + sess->transit = 0; + sess->jitter = 0; } -PJ_DEF(void) pjmedia_rtcp_rx_rtp(pjmedia_rtcp_session *s, - pj_uint16_t seq, - pj_uint32_t rtp_ts) +PJ_DEF(void) pjmedia_rtcp_rx_rtp(pjmedia_rtcp_session *sess, + unsigned seq, + unsigned rtp_ts, + unsigned payload) { pj_timestamp ts; pj_uint32_t arrival; pj_int32_t transit; - int status; + pjmedia_rtp_status seq_st; + unsigned last_seq; + + sess->stat.rx.pkt++; + sess->stat.rx.bytes += payload; - /* Update sequence numbers (received, lost, etc). */ - status = pjmedia_rtp_seq_update(&s->seq_ctrl, seq); - if (status == PJMEDIA_RTP_ESESSRESTART) { - rtcp_init_seq(s, seq); - status = 0; + /* Update sequence numbers. */ + last_seq = sess->seq_ctrl.max_seq; + pjmedia_rtp_seq_update(&sess->seq_ctrl, (pj_uint16_t)seq, &seq_st); + if (seq_st.status.flag.restart) { + rtcp_init_seq(sess); } - if (status != 0) + if (seq_st.status.flag.dup) + sess->stat.rx.dup++; + if (seq_st.status.flag.outorder) + sess->stat.rx.reorder++; + + if (seq_st.status.flag.bad) { + sess->stat.rx.discard++; return; + } + + + /* Only mark "good" packets */ + ++sess->received; - ++s->received; /* * Calculate jitter (see RFC 3550 section A.8) @@ -139,41 +154,55 @@ PJ_DEF(void) pjmedia_rtcp_rx_rtp(pjmedia_rtcp_session *s, /* Get arrival time and convert timestamp to samples */ pj_get_timestamp(&ts); - ts.u64 = ts.u64 * s->clock_rate / s->ts_freq.u64; + ts.u64 = ts.u64 * sess->clock_rate / sess->ts_freq.u64; arrival = ts.u32.lo; transit = arrival - rtp_ts; - if (s->transit == 0) { - s->transit = transit; + /* Ignore the first N packets as they normally have bad jitter + * due to other threads working to establish the call + */ + if (sess->transit == 0 || sess->received < 25 ) { + sess->transit = transit; + sess->stat.rx.jitter.min = 2000; } else { pj_int32_t d; + pj_uint32_t jitter; - d = transit - s->transit; - s->transit = transit; + d = transit - sess->transit; + sess->transit = transit; if (d < 0) d = -d; - s->jitter += d - ((s->jitter + 8) >> 4); + sess->jitter += d - ((sess->jitter + 8) >> 4); + + /* Get jitter in usec */ + if (d < 4294) + jitter = d * 1000000 / sess->clock_rate; + else { + jitter = d * 1000 / sess->clock_rate; + jitter *= 1000; + } + + /* Update jitter stat */ + if (jitter < sess->stat.rx.jitter.min) + sess->stat.rx.jitter.min = jitter; + if (jitter > sess->stat.rx.jitter.max) + sess->stat.rx.jitter.max = jitter; + sess->stat.rx.jitter.last = jitter; } } -PJ_DEF(void) pjmedia_rtcp_tx_rtp(pjmedia_rtcp_session *s, - pj_uint16_t bytes_payload_size) +PJ_DEF(void) pjmedia_rtcp_tx_rtp(pjmedia_rtcp_session *sess, + unsigned bytes_payload_size) { - pjmedia_rtcp_pkt *rtcp_pkt = &s->rtcp_pkt; - - /* Update number of packets */ - rtcp_pkt->sr.sender_pcount = - pj_htonl( pj_ntohl(rtcp_pkt->sr.sender_pcount) + 1); - - /* Update number of bytes */ - rtcp_pkt->sr.sender_bcount = - pj_htonl( pj_ntohl(rtcp_pkt->sr.sender_bcount) + bytes_payload_size ); + /* Update statistics */ + sess->stat.tx.pkt++; + sess->stat.tx.bytes += bytes_payload_size; } -PJ_DEF(void) pjmedia_rtcp_rx_rtcp( pjmedia_rtcp_session *session, +PJ_DEF(void) pjmedia_rtcp_rx_rtcp( pjmedia_rtcp_session *sess, const void *pkt, pj_size_t size) { @@ -183,19 +212,77 @@ PJ_DEF(void) pjmedia_rtcp_rx_rtcp( pjmedia_rtcp_session *session, pj_assert(size >= sizeof(pjmedia_rtcp_common)+sizeof(pjmedia_rtcp_sr)); /* Save LSR from NTP timestamp of RTCP packet */ - session->rx_lsr = ((pj_ntohl(rtcp->sr.ntp_sec) & 0x0000FFFF) << 16) | - ((pj_ntohl(rtcp->sr.ntp_frac) >> 16) & 0xFFFF); + sess->rx_lsr = ((pj_ntohl(rtcp->sr.ntp_sec) & 0x0000FFFF) << 16) | + ((pj_ntohl(rtcp->sr.ntp_frac) >> 16) & 0xFFFF); /* Calculate SR arrival time for DLSR */ - pj_get_timestamp(&session->rx_lsr_time); + pj_get_timestamp(&sess->rx_lsr_time); TRACE_((THIS_FILE, "Rx RTCP SR: ntp-ts=%p, time=%p", - session->rx_lsr, - (pj_uint32_t)(session->rx_lsr_time.u64*65536/session->ts_freq.u64))); + sess->rx_lsr, + (pj_uint32_t)(sess->rx_lsr_time.u64*65536/sess->ts_freq.u64))); /* Calculate RTT if it has RR */ if (size >= sizeof(pjmedia_rtcp_pkt)) { + pj_uint32_t last_loss, jitter_samp, jitter; + + last_loss = sess->stat.tx.loss; + + /* Get packet loss */ + sess->stat.tx.loss = (rtcp->rr.total_lost_2 << 16) + + (rtcp->rr.total_lost_1 << 8) + + rtcp->rr.total_lost_0; + + /* We can't calculate the exact loss period for TX, so just give the + * best estimation. + */ + if (sess->stat.tx.loss > last_loss) { + unsigned period; + + /* Loss period in msec */ + period = (sess->stat.tx.loss - last_loss) * sess->pkt_size * + 1000 / sess->clock_rate; + + /* Loss period in usec */ + period *= 1000; + + if (sess->stat.tx.update_cnt==0||sess->stat.tx.loss_period.min==0) + sess->stat.tx.loss_period.min = period; + if (period < sess->stat.tx.loss_period.min) + sess->stat.tx.loss_period.min = period; + if (period > sess->stat.tx.loss_period.max) + sess->stat.tx.loss_period.max = period; + + sess->stat.tx.loss_period.avg = + (sess->stat.tx.loss_period.avg*sess->stat.tx.update_cnt+period) + / (sess->stat.tx.update_cnt + 1); + sess->stat.tx.loss_period.last = period; + } + + /* Get jitter value in usec */ + jitter_samp = pj_ntohl(rtcp->rr.jitter); + /* Calculate jitter in usec, avoiding overflows */ + if (jitter_samp <= 4294) + jitter = jitter_samp * 1000000 / sess->clock_rate; + else { + jitter = jitter_samp * 1000 / sess->clock_rate; + jitter *= 1000; + } + + /* Update jitter statistics */ + if (sess->stat.tx.update_cnt == 0) + sess->stat.tx.jitter.min = jitter; + if (jitter < sess->stat.tx.jitter.min && jitter) + sess->stat.tx.jitter.min = jitter; + if (jitter > sess->stat.tx.jitter.max) + sess->stat.tx.jitter.max = jitter; + sess->stat.tx.jitter.avg = + (sess->stat.tx.jitter.avg * sess->stat.tx.update_cnt + jitter) / + (sess->stat.tx.update_cnt + 1); + sess->stat.tx.jitter.last = jitter; + + /* Can only calculate if LSR and DLSR is present in RR */ if (rtcp->rr.lsr && rtcp->rr.dlsr) { pj_uint32_t lsr, now, dlsr; @@ -211,7 +298,7 @@ PJ_DEF(void) pjmedia_rtcp_rx_rtcp( pjmedia_rtcp_session *session, dlsr = pj_ntohl(rtcp->rr.dlsr); /* Get current time, and convert to 1/65536 resolution */ - rtcp_get_ntp_time(session, &ntp); + rtcp_get_ntp_time(sess, &ntp); now = ((ntp.hi & 0xFFFF) << 16) + (ntp.lo >> 16); @@ -220,7 +307,7 @@ PJ_DEF(void) pjmedia_rtcp_rx_rtcp( pjmedia_rtcp_session *session, /* Convert end to end delay to usec (keeping the calculation in * 64bit space):: - * session->ee_delay = (eedelay * 1000) / 65536; + * sess->ee_delay = (eedelay * 1000) / 65536; */ eedelay = (eedelay * 1000000) >> 16; @@ -233,48 +320,90 @@ PJ_DEF(void) pjmedia_rtcp_rx_rtcp( pjmedia_rtcp_session *session, * otherwise rtt will be invalid */ if (now-dlsr >= lsr) { - session->rtt_us = (pj_uint32_t)eedelay; + unsigned rtt = (pj_uint32_t)eedelay; + + if (sess->stat.rtt_update_cnt == 0) + sess->stat.rtt.min = rtt; + + if (rtt < sess->stat.rtt.min && rtt) + sess->stat.rtt.min = rtt; + if (rtt > sess->stat.rtt.max) + sess->stat.rtt.max = rtt; + + sess->stat.rtt.avg = + (sess->stat.rtt.avg * sess->stat.rtt_update_cnt + rtt) / + (sess->stat.rtt_update_cnt + 1); + + sess->stat.rtt.last = rtt; + sess->stat.rtt_update_cnt++; + } else { - PJ_LOG(3, (THIS_FILE, "Internal NTP clock skew detected")); + PJ_LOG(3, (THIS_FILE, "Internal NTP clock skew detected: " + "lsr=%p, now=%p, dlsr=%p (%d:%03dms)", + lsr, now, dlsr, dlsr/65536, + (dlsr%65536)*1000/65536)); } } + + pj_gettimeofday(&sess->stat.tx.update); + sess->stat.tx.update_cnt++; } } -static void rtcp_build_rtcp(pjmedia_rtcp_session *s, - pj_uint32_t receiver_ssrc) -{ - pj_uint32_t expected; - pj_uint32_t u32; - pj_uint32_t expected_interval, received_interval, lost_interval; - pjmedia_rtcp_pkt *rtcp_pkt = &s->rtcp_pkt; +PJ_DEF(void) pjmedia_rtcp_build_rtcp(pjmedia_rtcp_session *sess, + pjmedia_rtcp_pkt **ret_p_pkt, + int *len) +{ + pj_uint32_t expected, expected_interval, received_interval, lost_interval; + pj_uint32_t jitter_samp, jitter; + pjmedia_rtcp_pkt *rtcp_pkt = &sess->rtcp_pkt; + pjmedia_rtcp_ntp_rec ntp; + + /* Packet count */ + rtcp_pkt->sr.sender_pcount = pj_htonl(sess->stat.tx.pkt); + + /* Octets count */ + rtcp_pkt->sr.sender_bcount = pj_htonl(sess->stat.tx.bytes); /* SSRC and last_seq */ - rtcp_pkt->rr.ssrc = pj_htonl(receiver_ssrc); - rtcp_pkt->rr.last_seq = (s->seq_ctrl.cycles & 0xFFFF0000L); - rtcp_pkt->rr.last_seq += s->seq_ctrl.max_seq; + rtcp_pkt->rr.ssrc = pj_htonl(sess->peer_ssrc); + rtcp_pkt->rr.last_seq = (sess->seq_ctrl.cycles & 0xFFFF0000L); + rtcp_pkt->rr.last_seq += sess->seq_ctrl.max_seq; rtcp_pkt->rr.last_seq = pj_htonl(rtcp_pkt->rr.last_seq); + /* Jitter */ - rtcp_pkt->rr.jitter = pj_htonl(s->jitter >> 4); + jitter_samp = (sess->jitter >> 4); + rtcp_pkt->rr.jitter = pj_htonl(jitter_samp); + + /* Calculate jitter in usec, avoiding overflows */ + if (jitter_samp <= 4294) + jitter = jitter_samp * 1000000 / sess->clock_rate; + else { + jitter = jitter_samp * 1000 / sess->clock_rate; + jitter *= 1000; + } + + /* Update jitter statistics */ + sess->stat.rx.jitter.avg = + (sess->stat.rx.jitter.avg * sess->stat.rx.update_cnt + jitter) / + (sess->stat.rx.update_cnt + 1); /* Total lost. */ - expected = pj_ntohl(rtcp_pkt->rr.last_seq) - s->seq_ctrl.base_seq; - if (expected >= s->received) - u32 = expected - s->received; - else - u32 = 0; - rtcp_pkt->rr.total_lost_2 = (u32 >> 16) & 0x00FF; - rtcp_pkt->rr.total_lost_1 = (u32 >> 8) & 0x00FF; - rtcp_pkt->rr.total_lost_0 = u32 & 0x00FF; + expected = pj_ntohl(rtcp_pkt->rr.last_seq) - sess->seq_ctrl.base_seq; + if (expected >= sess->received) + sess->stat.rx.loss = expected - sess->received; + rtcp_pkt->rr.total_lost_2 = (sess->stat.rx.loss >> 16) & 0xFF; + rtcp_pkt->rr.total_lost_1 = (sess->stat.rx.loss >> 8) & 0xFF; + rtcp_pkt->rr.total_lost_0 = (sess->stat.rx.loss & 0xFF); /* Fraction lost calculation */ - expected_interval = expected - s->exp_prior; - s->exp_prior = expected; + expected_interval = expected - sess->exp_prior; + sess->exp_prior = expected; - received_interval = s->received - s->rx_prior; - s->rx_prior = s->received; + received_interval = sess->received - sess->rx_prior; + sess->rx_prior = sess->received; lost_interval = expected_interval - received_interval; @@ -283,35 +412,25 @@ static void rtcp_build_rtcp(pjmedia_rtcp_session *s, } else { rtcp_pkt->rr.fract_lost = (lost_interval << 8) / expected_interval; } -} - -PJ_DEF(void) pjmedia_rtcp_build_rtcp(pjmedia_rtcp_session *session, - pjmedia_rtcp_pkt **ret_p_pkt, - int *len) -{ - pjmedia_rtcp_pkt *rtcp_pkt = &session->rtcp_pkt; - pjmedia_rtcp_ntp_rec ntp; - - rtcp_build_rtcp(session, session->peer_ssrc); /* Get current NTP time. */ - rtcp_get_ntp_time(session, &ntp); + rtcp_get_ntp_time(sess, &ntp); /* Fill in NTP timestamp in SR. */ rtcp_pkt->sr.ntp_sec = pj_htonl(ntp.hi); rtcp_pkt->sr.ntp_frac = pj_htonl(ntp.lo); - if (session->rx_lsr_time.u64 == 0 || session->rx_lsr == 0) { + if (sess->rx_lsr_time.u64 == 0 || sess->rx_lsr == 0) { rtcp_pkt->rr.lsr = 0; rtcp_pkt->rr.dlsr = 0; } else { pj_timestamp ts; - pj_uint32_t lsr = session->rx_lsr; - pj_uint64_t lsr_time = session->rx_lsr_time.u64; + pj_uint32_t lsr = sess->rx_lsr; + pj_uint64_t lsr_time = sess->rx_lsr_time.u64; pj_uint32_t dlsr; /* Convert LSR time to 1/65536 seconds resolution */ - lsr_time = (lsr_time << 16) / session->ts_freq.u64; + lsr_time = (lsr_time << 16) / sess->ts_freq.u64; /* Fill in LSR. LSR is the middle 32bit of the last SR NTP time received. @@ -324,7 +443,7 @@ PJ_DEF(void) pjmedia_rtcp_build_rtcp(pjmedia_rtcp_session *session, pj_get_timestamp(&ts); /* Convert interval to 1/65536 seconds value */ - ts.u64 = (ts.u64 << 16) / session->ts_freq.u64; + ts.u64 = (ts.u64 << 16) / sess->ts_freq.u64; /* Get DLSR */ dlsr = (pj_uint32_t)(ts.u64 - lsr_time); @@ -340,6 +459,10 @@ PJ_DEF(void) pjmedia_rtcp_build_rtcp(pjmedia_rtcp_session *session, (dlsr%65536)*1000/65536 )); } + /* Update counter */ + pj_gettimeofday(&sess->stat.rx.update); + sess->stat.rx.update_cnt++; + /* Return pointer. */ *ret_p_pkt = rtcp_pkt; diff --git a/pjmedia/src/pjmedia/rtp.c b/pjmedia/src/pjmedia/rtp.c index 17b4e712..5103c5ac 100644 --- a/pjmedia/src/pjmedia/rtp.c +++ b/pjmedia/src/pjmedia/rtp.c @@ -34,6 +34,9 @@ #define MAX_MISORDER ((pj_int16_t)100) #define MIN_SEQUENTIAL ((pj_int16_t)2) +static void pjmedia_rtp_seq_restart(pjmedia_rtp_seq_session *seq_ctrl, + pj_uint16_t seq); + PJ_DEF(pj_status_t) pjmedia_rtp_session_init( pjmedia_rtp_session *ses, int default_pt, pj_uint32_t sender_ssrc ) @@ -161,9 +164,11 @@ PJ_DEF(pj_status_t) pjmedia_rtp_decode_rtp( pjmedia_rtp_session *ses, } -PJ_DEF(pj_status_t) pjmedia_rtp_session_update( pjmedia_rtp_session *ses, const pjmedia_rtp_hdr *hdr) +PJ_DEF(void) pjmedia_rtp_session_update( pjmedia_rtp_session *ses, + const pjmedia_rtp_hdr *hdr, + pjmedia_rtp_status *p_seq_st) { - int status; + pjmedia_rtp_status seq_st; /* Check SSRC. */ if (ses->peer_ssrc == 0) ses->peer_ssrc = pj_ntohl(hdr->ssrc); @@ -175,11 +180,21 @@ PJ_DEF(pj_status_t) pjmedia_rtp_session_update( pjmedia_rtp_session *ses, const } */ + /* Init status */ + seq_st.status.value = 0; + seq_st.diff = 0; + /* Check payload type. */ if (hdr->pt != ses->out_pt) { - PJ_LOG(4, (THIS_FILE, "pjmedia_rtp_session_update: ses=%p, invalid payload type %d (!=%d)", + PJ_LOG(4, (THIS_FILE, + "pjmedia_rtp_session_update: ses=%p, invalid payload " + "type %d (expecting %d)", ses, hdr->pt, ses->out_pt)); - return PJMEDIA_RTP_EINPT; + if (p_seq_st) { + p_seq_st->status.flag.bad = 1; + p_seq_st->status.flag.badpt = 1; + } + return; } /* Initialize sequence number on first packet received. */ @@ -187,87 +202,125 @@ PJ_DEF(pj_status_t) pjmedia_rtp_session_update( pjmedia_rtp_session *ses, const pjmedia_rtp_seq_init( &ses->seq_ctrl, pj_ntohs(hdr->seq) ); /* Check sequence number to see if remote session has been restarted. */ - status = pjmedia_rtp_seq_update( &ses->seq_ctrl, pj_ntohs(hdr->seq)); - if (status == PJMEDIA_RTP_ESESSRESTART) { - pjmedia_rtp_seq_restart( &ses->seq_ctrl, pj_ntohs(hdr->seq)); + pjmedia_rtp_seq_update( &ses->seq_ctrl, pj_ntohs(hdr->seq), &seq_st); + if (seq_st.status.flag.restart) { ++ses->received; - } else if (status == 0 || status == PJMEDIA_RTP_ESESSPROBATION) { + + } else if (!seq_st.status.flag.bad) { ++ses->received; } - - return status; + if (p_seq_st) { + p_seq_st->status.value = seq_st.status.value; + p_seq_st->diff = seq_st.diff; + } } -void pjmedia_rtp_seq_restart(pjmedia_rtp_seq_session *sctrl, pj_uint16_t seq) +void pjmedia_rtp_seq_restart(pjmedia_rtp_seq_session *sess, pj_uint16_t seq) { - sctrl->base_seq = seq; - sctrl->max_seq = seq; - sctrl->bad_seq = RTP_SEQ_MOD + 1; - sctrl->cycles = 0; + sess->base_seq = seq; + sess->max_seq = seq; + sess->bad_seq = RTP_SEQ_MOD + 1; + sess->cycles = 0; } -void pjmedia_rtp_seq_init(pjmedia_rtp_seq_session *sctrl, pj_uint16_t seq) +void pjmedia_rtp_seq_init(pjmedia_rtp_seq_session *sess, pj_uint16_t seq) { - pjmedia_rtp_seq_restart(sctrl, seq); + pjmedia_rtp_seq_restart(sess, seq); - sctrl->max_seq = (pj_uint16_t) (seq - 1); - sctrl->probation = MIN_SEQUENTIAL; + sess->max_seq = (pj_uint16_t) (seq - 1); + sess->probation = MIN_SEQUENTIAL; } -pj_status_t pjmedia_rtp_seq_update(pjmedia_rtp_seq_session *sctrl, - pj_uint16_t seq) +void pjmedia_rtp_seq_update( pjmedia_rtp_seq_session *sess, + pj_uint16_t seq, + pjmedia_rtp_status *seq_status) { - pj_uint16_t udelta = (pj_uint16_t) (seq - sctrl->max_seq); + pj_uint16_t udelta = (pj_uint16_t) (seq - sess->max_seq); + pjmedia_rtp_status st; + /* Init status */ + st.status.value = 0; + st.diff = 0; + /* * Source is not valid until MIN_SEQUENTIAL packets with * sequential sequence numbers have been received. */ - if (sctrl->probation) { - /* packet is in sequence */ - if (seq == sctrl->max_seq+ 1) { - sctrl->probation--; - sctrl->max_seq = seq; - if (sctrl->probation == 0) { - return PJMEDIA_RTP_ESESSRESTART; + if (sess->probation) { + + st.status.flag.probation = 1; + + if (seq == sess->max_seq+ 1) { + /* packet is in sequence */ + st.diff = 1; + sess->probation--; + sess->max_seq = seq; + if (sess->probation == 0) { + st.status.flag.probation = 0; } } else { - sctrl->probation = MIN_SEQUENTIAL - 1; - sctrl->max_seq = seq; + + st.diff = 0; + + st.status.flag.bad = 1; + if (seq == sess->max_seq) + st.status.flag.dup = 1; + else + st.status.flag.outorder = 1; + + sess->probation = MIN_SEQUENTIAL - 1; + sess->max_seq = seq; } - return PJMEDIA_RTP_ESESSPROBATION; + + + } else if (udelta == 0) { + + st.status.flag.dup = 1; } else if (udelta < MAX_DROPOUT) { /* in order, with permissible gap */ - if (seq < sctrl->max_seq) { + if (seq < sess->max_seq) { /* Sequence number wrapped - count another 64K cycle. */ - sctrl->cycles += RTP_SEQ_MOD; + sess->cycles += RTP_SEQ_MOD; } - sctrl->max_seq = seq; + sess->max_seq = seq; + + st.diff = udelta; } else if (udelta <= (RTP_SEQ_MOD - MAX_MISORDER)) { /* the sequence number made a very large jump */ - if (seq == sctrl->bad_seq) { + if (seq == sess->bad_seq) { /* * Two sequential packets -- assume that the other side * restarted without telling us so just re-sync * (i.e., pretend this was the first packet). */ - return PJMEDIA_RTP_ESESSRESTART; + pjmedia_rtp_seq_restart(sess, seq); + st.status.flag.restart = 1; + st.status.flag.probation = 1; + st.diff = 1; } else { - sctrl->bad_seq = (seq + 1) & (RTP_SEQ_MOD-1); - return PJMEDIA_RTP_EBADSEQ; + sess->bad_seq = (seq + 1) & (RTP_SEQ_MOD-1); + st.status.flag.bad = 1; + st.status.flag.outorder = 1; } } else { - /* duplicate or reordered packet */ + /* old duplicate or reordered packet. + * Not necessarily bad packet (?) + */ + st.status.flag.outorder = 1; } - return PJ_SUCCESS; + + if (seq_status) { + seq_status->diff = st.diff; + seq_status->status.value = st.status.value; + } } diff --git a/pjmedia/src/pjmedia/session.c b/pjmedia/src/pjmedia/session.c index 5f00df53..2b50485e 100644 --- a/pjmedia/src/pjmedia/session.c +++ b/pjmedia/src/pjmedia/session.c @@ -525,7 +525,7 @@ PJ_DEF(pj_status_t) pjmedia_session_get_port( pjmedia_session *session, */ PJ_DEF(pj_status_t) pjmedia_session_get_stream_stat( pjmedia_session *session, unsigned index, - pjmedia_stream_stat *stat) + pjmedia_rtcp_stat *stat) { PJ_ASSERT_RETURN(session && stat && index < session->stream_cnt, PJ_EINVAL); diff --git a/pjmedia/src/pjmedia/stream.c b/pjmedia/src/pjmedia/stream.c index 6f0d4881..378d3670 100644 --- a/pjmedia/src/pjmedia/stream.c +++ b/pjmedia/src/pjmedia/stream.c @@ -37,12 +37,13 @@ #define THIS_FILE "stream.c" #define ERRLEVEL 1 #define TRACE_(expr) stream_perror expr - +#define TRC_(expr) PJ_LOG(4,expr) #define PJMEDIA_MAX_FRAME_DURATION_MS 200 #define PJMEDIA_MAX_BUFFER_SIZE_MS 2000 #define PJMEDIA_MAX_MTU 1500 #define PJMEDIA_DTMF_DURATION 1600 /* in timestamp */ #define PJMEDIA_RTP_NAT_PROBATION_CNT 10 +#define PJMEDIA_RTCP_INTERVAL 5 /* seconds */ /** @@ -87,7 +88,6 @@ struct pjmedia_stream pjmedia_channel *dec; /**< Decoding channel. */ pjmedia_dir dir; /**< Stream direction. */ - pjmedia_stream_stat stat; /**< Stream statistics. */ void *user_data; /**< User data. */ pjmedia_codec *codec; /**< Codec instance being used. */ @@ -110,7 +110,10 @@ struct pjmedia_stream pj_ioqueue_key_t *rtcp_key; /**< RTCP ioqueue key. */ pj_ioqueue_op_key_t rtcp_op_key; /**< The pending read op key. */ - + pj_size_t rtcp_pkt_size; /**< Size of RTCP packet buf. */ + char rtcp_pkt[512]; /**< RTCP packet buffer. */ + pj_uint32_t rtcp_tx_time; /**< RTCP tx time in timestamp */ + int rtcp_addrlen; /**< Address length. */ /* RFC 2833 DTMF transmission queue: */ int tx_event_pt; /**< Outgoing pt for dtmf. */ @@ -269,6 +272,7 @@ static pj_status_t put_frame( pjmedia_port *port, pj_status_t status = 0; struct pjmedia_frame frame_out; int ts_len; + pj_bool_t has_tx; void *rtphdr; int rtphdrlen; pj_ssize_t sent; @@ -279,11 +283,15 @@ static pj_status_t put_frame( pjmedia_port *port, /* Init frame_out buffer. */ frame_out.buf = ((char*)channel->out_pkt) + sizeof(pjmedia_rtp_hdr); + /* Make compiler happy */ + frame_out.size = 0; + /* If we have DTMF digits in the queue, transmit the digits. * Otherwise encode the PCM buffer. */ if (stream->tx_dtmf_count) { + has_tx = PJ_TRUE; create_dtmf_payload(stream, &frame_out); /* Encapsulate. */ @@ -296,6 +304,7 @@ static pj_status_t put_frame( pjmedia_port *port, } else if (frame->type != PJMEDIA_FRAME_TYPE_NONE) { unsigned max_size; + has_tx = PJ_TRUE; max_size = channel->out_pkt_size - sizeof(pjmedia_rtp_hdr); status = stream->codec->op->encode( stream->codec, frame, max_size, @@ -316,39 +325,67 @@ static pj_status_t put_frame( pjmedia_port *port, } else { /* Just update RTP session's timestamp. */ + has_tx = PJ_FALSE; status = pjmedia_rtp_encode_rtp( &channel->rtp, 0, 0, 0, ts_len, (const void**)&rtphdr, &rtphdrlen); - return PJ_SUCCESS; } - if (status != 0) { + if (status != PJ_SUCCESS) { TRACE_((THIS_FILE, "RTP encode_rtp() error", status)); return status; } + /* Check if this is the time to transmit RTCP packet */ + if (stream->rtcp_tx_time == 0) { + stream->rtcp_tx_time = pj_ntohl(channel->rtp.out_hdr.ts) + + PJMEDIA_RTCP_INTERVAL * + stream->port.info.sample_rate; + } else if (pj_ntohl(channel->rtp.out_hdr.ts) >= stream->rtcp_tx_time) { + + pjmedia_rtcp_pkt *rtcp_pkt; + pj_ssize_t size; + int len; + + pjmedia_rtcp_build_rtcp(&stream->rtcp, &rtcp_pkt, &len); + size = len; + status = pj_sock_sendto(stream->skinfo.rtcp_sock, rtcp_pkt, &size, 0, + &stream->rem_rtcp_addr, + sizeof(stream->rem_rtcp_addr)); + if (status != PJ_SUCCESS) { + ; + } + + stream->rtcp_tx_time = pj_ntohl(channel->rtp.out_hdr.ts) + + PJMEDIA_RTCP_INTERVAL * + stream->port.info.sample_rate; + } + + /* Do nothing if we have nothing to transmit */ + if (!has_tx) + return PJ_SUCCESS; + if (rtphdrlen != sizeof(pjmedia_rtp_hdr)) { /* We don't support RTP with extended header yet. */ PJ_TODO(SUPPORT_SENDING_RTP_WITH_EXTENDED_HEADER); - //TRACE_((THIS_FILE, "Unsupported extended RTP header for transmission")); - return 0; + return PJ_SUCCESS; } pj_memcpy(channel->out_pkt, rtphdr, sizeof(pjmedia_rtp_hdr)); /* Send. */ sent = frame_out.size+sizeof(pjmedia_rtp_hdr); - status = pj_sock_sendto(stream->skinfo.rtp_sock, channel->out_pkt, &sent, 0, - &stream->rem_rtp_addr, sizeof(stream->rem_rtp_addr)); + status = pj_sock_sendto(stream->skinfo.rtp_sock, channel->out_pkt, + &sent, 0, &stream->rem_rtp_addr, + sizeof(stream->rem_rtp_addr)); if (status != PJ_SUCCESS) return status; /* Update stat */ - stream->stat.enc.pkt++; - stream->stat.enc.bytes += frame_out.size+sizeof(pjmedia_rtp_hdr); + pjmedia_rtcp_tx_rtp(&stream->rtcp, frame_out.size); return PJ_SUCCESS; } @@ -459,6 +496,7 @@ static void on_rx_rtp( pj_ioqueue_key_t *key, const pjmedia_rtp_hdr *hdr; const void *payload; unsigned payloadlen; + pjmedia_rtp_status seq_st; /* Go straight to read next packet if bytes_read == 0. */ @@ -476,6 +514,10 @@ static void on_rx_rtp( pj_ioqueue_key_t *key, } + /* Inform RTCP session */ + pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), + pj_ntohl(hdr->ts), payloadlen); + /* Handle incoming DTMF. */ if (hdr->pt == stream->rx_event_pt) { handle_incoming_dtmf(stream, payload, payloadlen); @@ -486,29 +528,20 @@ static void on_rx_rtp( pj_ioqueue_key_t *key, /* Update RTP session (also checks if RTP session can accept * the incoming packet. */ - status = pjmedia_rtp_session_update(&channel->rtp, hdr); - if (status != 0 && - status != PJMEDIA_RTP_ESESSPROBATION && - status != PJMEDIA_RTP_ESESSRESTART) - { - TRACE_((THIS_FILE, "RTP session_update error (details follows)", - status)); - PJ_LOG(4,(THIS_FILE,"RTP packet detail: pt=%d, seq=%d", - hdr->pt, pj_ntohs(hdr->seq))); + pjmedia_rtp_session_update(&channel->rtp, hdr, &seq_st); + if (seq_st.status.flag.bad) { + TRC_ ((THIS_FILE, + "RTP session_update error: badpt=%d, dup=%d, outorder=%d, " + "probation=%d, restart=%d", + seq_st.status.flag.badpt, + seq_st.status.flag.dup, + seq_st.status.flag.outorder, + seq_st.status.flag.probation, + seq_st.status.flag.restart)); goto read_next_packet; } - /* Update the RTCP session. */ - pjmedia_rtcp_rx_rtp(&stream->rtcp, pj_ntohs(hdr->seq), - pj_ntohl(hdr->ts)); - - - /* Update stat */ - stream->stat.dec.pkt++; - stream->stat.dec.bytes += bytes_read; - - /* See if source address of RTP packet is different than the * configured address. */ @@ -571,9 +604,36 @@ static void on_rx_rtcp( pj_ioqueue_key_t *key, pj_ioqueue_op_key_t *op_key, pj_ssize_t bytes_read) { - PJ_UNUSED_ARG(key); + pjmedia_stream *stream = pj_ioqueue_get_user_data(key); + pj_status_t status; + PJ_UNUSED_ARG(op_key); - PJ_UNUSED_ARG(bytes_read); + + do { + if (bytes_read > 0) { + pjmedia_rtcp_rx_rtcp(&stream->rtcp, stream->rtcp_pkt, + bytes_read); + } + + bytes_read = stream->rtcp_pkt_size; + stream->rtcp_addrlen = sizeof(stream->rem_rtcp_addr); + status = pj_ioqueue_recvfrom( stream->rtcp_key, + &stream->rtcp_op_key, + stream->rtcp_pkt, + &bytes_read, 0, + &stream->rem_rtcp_addr, + &stream->rtcp_addrlen); + + } while (status == PJ_SUCCESS); + + if (status != PJ_SUCCESS && status != PJ_EPENDING) { + char errmsg[PJ_ERR_MSG_SIZE]; + + pj_strerror(status, errmsg, sizeof(errmsg)); + PJ_LOG(4,(THIS_FILE, "Error reading RTCP packet: %s [status=%d]", + errmsg, status)); + } + } @@ -658,6 +718,7 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, pjmedia_stream *stream; pjmedia_codec_param codec_param; pj_ioqueue_callback ioqueue_cb; + pj_uint16_t rtcp_port; pj_status_t status; PJ_ASSERT_RETURN(pool && info && p_stream, PJ_EINVAL); @@ -690,13 +751,13 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, stream->user_data = user_data; stream->skinfo = info->sock_info; stream->rem_rtp_addr = info->rem_addr; + rtcp_port = (pj_uint16_t) (pj_ntohs(info->rem_addr.sin_port)+1); + stream->rem_rtcp_addr = stream->rem_rtp_addr; + stream->rem_rtcp_addr.sin_port = pj_htons(rtcp_port); stream->tx_event_pt = info->tx_event_pt; stream->rx_event_pt = info->rx_event_pt; stream->last_dtmf = -1; - - PJ_TODO(INITIALIZE_RTCP_REMOTE_ADDRESS); - /* Create mutex to protect jitter buffer: */ status = pj_mutex_create_simple(pool, NULL, &stream->jb_mutex); @@ -738,7 +799,9 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, /* Init RTCP session: */ - pjmedia_rtcp_init(&stream->rtcp, info->fmt.sample_rate, info->ssrc); + pjmedia_rtcp_init(&stream->rtcp, info->fmt.sample_rate, + stream->port.info.samples_per_frame, + info->ssrc); /* Create jitter buffer: */ @@ -799,6 +862,8 @@ PJ_DEF(pj_status_t) pjmedia_stream_create( pjmedia_endpt *endpt, /* Init pending operation key. */ pj_ioqueue_op_key_init(&stream->rtcp_op_key, sizeof(stream->rtcp_op_key)); + stream->rtcp_pkt_size = sizeof(stream->rtcp_pkt); + /* Bootstrap the first recvfrom() operation. */ on_rx_rtcp( stream->rtcp_key, &stream->rtcp_op_key, 0); @@ -900,12 +965,11 @@ PJ_DEF(pj_status_t) pjmedia_stream_start(pjmedia_stream *stream) * Get stream statistics. */ PJ_DEF(pj_status_t) pjmedia_stream_get_stat( const pjmedia_stream *stream, - pjmedia_stream_stat *stat) + pjmedia_rtcp_stat *stat) { PJ_ASSERT_RETURN(stream && stat, PJ_EINVAL); - pj_memcpy(stat, &stream->stat, sizeof(pjmedia_stream_stat)); - + pj_memcpy(stat, &stream->rtcp.stat, sizeof(pjmedia_rtcp_stat)); return PJ_SUCCESS; } |