From 122bb97947035e7fb84fbad5d648d7a88e86e8fb Mon Sep 17 00:00:00 2001 From: Henri Herscher Date: Thu, 7 Sep 2006 14:06:56 +0000 Subject: Timestamp correction is now managed by the RTP mixer. This also fixes some single-sided audio problems. git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@332 09dcff7a-b715-0410-9601-b79a96267cd0 --- orkaudio/audiocaptureplugins/voip/RtpSession.cpp | 32 ------- orkaudio/filters/rtpmixer/RtpMixer.cpp | 114 ++++++++++++++++++----- 2 files changed, 89 insertions(+), 57 deletions(-) diff --git a/orkaudio/audiocaptureplugins/voip/RtpSession.cpp b/orkaudio/audiocaptureplugins/voip/RtpSession.cpp index b51e6f8..2aac32f 100644 --- a/orkaudio/audiocaptureplugins/voip/RtpSession.cpp +++ b/orkaudio/audiocaptureplugins/voip/RtpSession.cpp @@ -389,38 +389,6 @@ bool RtpSession::AddRtpPacket(RtpPacketInfoRef& rtpPacket) m_numRtpPackets++; - // Compute the corrective delta - if(m_rtpTimestampCorrectiveDelta == 0 && m_lastRtpPacketSide2.get() != NULL) - { - if(m_lastRtpPacketSide2->m_timestamp > m_lastRtpPacketSide1->m_timestamp) - { - m_rtpTimestampCorrectiveSign = true; - m_rtpTimestampCorrectiveDelta = m_lastRtpPacketSide2->m_timestamp - m_lastRtpPacketSide1->m_timestamp; - } - else - { - m_rtpTimestampCorrectiveSign = false; - m_rtpTimestampCorrectiveDelta = m_lastRtpPacketSide1->m_timestamp - m_lastRtpPacketSide2->m_timestamp; - } - if(m_log->isInfoEnabled()) - { - CStdString timestampOffsetString = IntToString(m_rtpTimestampCorrectiveDelta); - LOG4CXX_INFO(m_log, m_trackingId + ": " + m_capturePort + ": " + "Applying timestamp corrective delta:" + timestampOffsetString); - } - } - // apply the corrective delta to packets of side 2 - if(channel == 2) - { - if(m_rtpTimestampCorrectiveSign) - { - correctedRtpTimestamp = rtpPacket->m_timestamp - m_rtpTimestampCorrectiveDelta; - } - else - { - correctedRtpTimestamp = rtpPacket->m_timestamp + m_rtpTimestampCorrectiveDelta; - } - } - if(m_log->isDebugEnabled()) { CStdString debug; diff --git a/orkaudio/filters/rtpmixer/RtpMixer.cpp b/orkaudio/filters/rtpmixer/RtpMixer.cpp index a213cc3..a335a8f 100644 --- a/orkaudio/filters/rtpmixer/RtpMixer.cpp +++ b/orkaudio/filters/rtpmixer/RtpMixer.cpp @@ -46,8 +46,10 @@ private: //AudioChunkRef m_outputAudioChunk; std::queue m_outputQueue; - void StoreRtpPacket(AudioChunkRef& chunk); - void CreateShipment(size_t silenceSize); + void StoreRtpPacket(AudioChunkRef& chunk, unsigned int correctedTimestamp); + void ManageOutOfRangeTimestamp(AudioChunkRef& chunk); + void CreateShipment(size_t silenceSize = 0, bool force = false); + void Reset(unsigned int timestamp); unsigned int FreeSpace(); unsigned int UsedSpace(); short* GetHoldOffPtr(); @@ -62,6 +64,9 @@ private: short m_buffer[NUM_SAMPLES_CIRCULAR_BUFFER]; unsigned int m_shippedSamples; log4cxx::LoggerPtr m_log; + double m_timestampCorrectiveDelta; + + bool m_invalidChannelReported; }; RtpMixer::RtpMixer() @@ -73,6 +78,9 @@ RtpMixer::RtpMixer() m_readTimestamp = 0; m_log = log4cxx::Logger::getLogger("rtpmixer"); m_shippedSamples = 0; + m_timestampCorrectiveDelta = 0.0; + + m_invalidChannelReported = false; } FilterRef RtpMixer::Instanciate() @@ -102,63 +110,89 @@ void RtpMixer::AudioChunkIn(AudioChunkRef& chunk) throw (CStdString("RtpMixer input audio must be PCM !")); } + unsigned int correctedTimestamp = 0; + + if(details->m_channel == 1) + { + correctedTimestamp = details->m_timestamp; + } + else if(details->m_channel == 2) + { + // Corrective delta always only applied to side 2. + double tmp = (double)details->m_timestamp - m_timestampCorrectiveDelta; + if(tmp < 0.0) + { + logMsg.Format("Corrected s2 timestamp is negative: ts:%u delta:%f wrts:%u", details->m_timestamp, m_timestampCorrectiveDelta, m_writeTimestamp); + LOG4CXX_ERROR(m_log, logMsg); + return; + } + correctedTimestamp = (unsigned int)tmp; + } + else + { + if(m_invalidChannelReported == false) + { + m_invalidChannelReported = true; + logMsg.Format("Invalid Channel:%d", details->m_channel); + LOG4CXX_ERROR(m_log, logMsg); + } + } + unsigned int rtpEndTimestamp = correctedTimestamp + chunk->GetNumSamples(); + if(m_log->isDebugEnabled()) { - logMsg.Format("New chunk, timestamp:%d", details->m_timestamp); + logMsg.Format("New chunk, s%d ts:%u corr-ts:%u", details->m_channel, details->m_timestamp, correctedTimestamp); LOG4CXX_DEBUG(m_log, logMsg); } - unsigned int rtpEndTimestamp = details->m_timestamp + chunk->GetNumSamples(); - if(m_writeTimestamp == 0) { // First RTP packet of the session LOG4CXX_DEBUG(m_log, "first chunk"); - m_writeTimestamp = details->m_timestamp; + m_writeTimestamp = correctedTimestamp; m_readTimestamp = m_writeTimestamp; - StoreRtpPacket(chunk); + StoreRtpPacket(chunk, correctedTimestamp); } - else if (details->m_timestamp >= m_readTimestamp) // drop packets that are older than last shipment + else if (correctedTimestamp >= m_readTimestamp) { - if( (int)(rtpEndTimestamp - m_writeTimestamp) <= (int)FreeSpace() && (int)(m_writeTimestamp - details->m_timestamp) <= (int)UsedSpace()) + if( (int)(rtpEndTimestamp - m_writeTimestamp) <= (int)FreeSpace() && (int)(m_writeTimestamp - correctedTimestamp) <= (int)UsedSpace()) { // RTP packet fits into current buffer - StoreRtpPacket(chunk); + StoreRtpPacket(chunk, correctedTimestamp); if(UsedSpace() > NUM_SAMPLES_TRIGGER) { // We have enough stuff, make a shipment - CreateShipment(0); + CreateShipment(); } } else { // RTP packet does not fit into current buffer // work out how much silence we need to add to the current buffer when shipping - size_t silenceSize = details->m_timestamp - m_writeTimestamp; + size_t silenceSize = correctedTimestamp - m_writeTimestamp; - if(silenceSize < (8000*60) && (details->m_timestamp > m_writeTimestamp)) // sanity check, maximum silence is 60 seconds @8KHz, otherwise, drop the chunk + if(silenceSize < (8000*10) && (correctedTimestamp > m_writeTimestamp)) // maximum silence is 10 seconds @8KHz { CreateShipment(silenceSize); // reset buffer - m_writePtr = m_buffer; - m_readPtr = m_buffer; - m_writeTimestamp = details->m_timestamp; - m_readTimestamp = m_writeTimestamp; + Reset(correctedTimestamp); // Store new packet - StoreRtpPacket(chunk); + StoreRtpPacket(chunk, correctedTimestamp); } else { - LOG4CXX_DEBUG(m_log, "Packet too distant in the future, dropped"); + // This chunk is newer than the curent timestamp window + ManageOutOfRangeTimestamp(chunk); } } } else { - LOG4CXX_DEBUG(m_log, "Packet too old, dropped"); + // This chunk is older than the current timestamp window + ManageOutOfRangeTimestamp(chunk); } if(m_log->isDebugEnabled()) { @@ -167,6 +201,36 @@ void RtpMixer::AudioChunkIn(AudioChunkRef& chunk) } } +void RtpMixer::ManageOutOfRangeTimestamp(AudioChunkRef& chunk) +{ + CStdString logMsg; + + AudioChunkDetails* details = chunk->GetDetails(); + if(details->m_channel == 1) + { + // 1. Ship what we have + CreateShipment(0, true); + + // 2. Reset circular buffer and add this new chunk + Reset(details->m_timestamp); + StoreRtpPacket(chunk ,details->m_timestamp); + } + else if(details->m_channel == 2) + { + // Calculate timestamp corrective delta so that next channel-2 chunk + // will be in the circular buffer timestamp window. + m_timestampCorrectiveDelta = (double)details->m_timestamp - (double)m_writeTimestamp; + } +} + +void RtpMixer::Reset(unsigned int timestamp) +{ + m_writePtr = m_buffer; + m_readPtr = m_buffer; + m_writeTimestamp = timestamp; + m_readTimestamp = m_writeTimestamp; +} + void RtpMixer::AudioChunkOut(AudioChunkRef& chunk) { if(m_outputQueue.size() > 0) @@ -202,13 +266,13 @@ int RtpMixer::GetInputRtpPayloadType(void) // does not link if not defined here } // Writes to the internal buffer without any size verification -void RtpMixer::StoreRtpPacket(AudioChunkRef& audioChunk) +void RtpMixer::StoreRtpPacket(AudioChunkRef& audioChunk, unsigned int correctedTimestamp) { CStdString debug; AudioChunkDetails* details = audioChunk->GetDetails(); // 1. Silence from write pointer until end of RTP packet - unsigned int endRtpTimestamp = details->m_timestamp + audioChunk->GetNumSamples(); + unsigned int endRtpTimestamp = correctedTimestamp + audioChunk->GetNumSamples(); if (endRtpTimestamp > m_writeTimestamp) { for(int i=0; i<(endRtpTimestamp - m_writeTimestamp); i++) @@ -227,7 +291,7 @@ void RtpMixer::StoreRtpPacket(AudioChunkRef& audioChunk) } // 2. Mix in the latest samples from this RTP packet - unsigned int timestampDelta = m_writeTimestamp - details->m_timestamp; + unsigned int timestampDelta = m_writeTimestamp - correctedTimestamp; ASSERT(timestampDelta>=0); short* tempWritePtr = CicularPointerSubtractOffset(m_writePtr, timestampDelta); short* payload = (short *)audioChunk->m_pBuffer; @@ -278,13 +342,13 @@ short* RtpMixer::CicularPointerSubtractOffset(short *ptr, size_t offset) } } -void RtpMixer::CreateShipment(size_t silenceSize) +void RtpMixer::CreateShipment(size_t silenceSize, bool force) { // 1. ship from readPtr until stop pointer or until end of buffer if wrapped bool bufferWrapped = false; short* stopPtr = NULL; short* wrappedStopPtr = NULL; - if (silenceSize) + if (silenceSize || force) { // There is additional silence to ship, do not take holdoff into account stopPtr = m_writePtr; -- cgit v1.2.3