summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--orkaudio/audiocaptureplugins/voip/RtpSession.cpp32
-rw-r--r--orkaudio/filters/rtpmixer/RtpMixer.cpp114
2 files changed, 89 insertions, 57 deletions
diff --git a/orkaudio/audiocaptureplugins/voip/RtpSession.cpp b/orkaudio/audiocaptureplugins/voip/RtpSession.cpp
index b51e6f8..2aac32f 100644
--- a/orkaudio/audiocaptureplugins/voip/RtpSession.cpp
+++ b/orkaudio/audiocaptureplugins/voip/RtpSession.cpp
@@ -389,38 +389,6 @@ bool RtpSession::AddRtpPacket(RtpPacketInfoRef& rtpPacket)
m_numRtpPackets++;
- // Compute the corrective delta
- if(m_rtpTimestampCorrectiveDelta == 0 && m_lastRtpPacketSide2.get() != NULL)
- {
- if(m_lastRtpPacketSide2->m_timestamp > m_lastRtpPacketSide1->m_timestamp)
- {
- m_rtpTimestampCorrectiveSign = true;
- m_rtpTimestampCorrectiveDelta = m_lastRtpPacketSide2->m_timestamp - m_lastRtpPacketSide1->m_timestamp;
- }
- else
- {
- m_rtpTimestampCorrectiveSign = false;
- m_rtpTimestampCorrectiveDelta = m_lastRtpPacketSide1->m_timestamp - m_lastRtpPacketSide2->m_timestamp;
- }
- if(m_log->isInfoEnabled())
- {
- CStdString timestampOffsetString = IntToString(m_rtpTimestampCorrectiveDelta);
- LOG4CXX_INFO(m_log, m_trackingId + ": " + m_capturePort + ": " + "Applying timestamp corrective delta:" + timestampOffsetString);
- }
- }
- // apply the corrective delta to packets of side 2
- if(channel == 2)
- {
- if(m_rtpTimestampCorrectiveSign)
- {
- correctedRtpTimestamp = rtpPacket->m_timestamp - m_rtpTimestampCorrectiveDelta;
- }
- else
- {
- correctedRtpTimestamp = rtpPacket->m_timestamp + m_rtpTimestampCorrectiveDelta;
- }
- }
-
if(m_log->isDebugEnabled())
{
CStdString debug;
diff --git a/orkaudio/filters/rtpmixer/RtpMixer.cpp b/orkaudio/filters/rtpmixer/RtpMixer.cpp
index a213cc3..a335a8f 100644
--- a/orkaudio/filters/rtpmixer/RtpMixer.cpp
+++ b/orkaudio/filters/rtpmixer/RtpMixer.cpp
@@ -46,8 +46,10 @@ private:
//AudioChunkRef m_outputAudioChunk;
std::queue<AudioChunkRef> m_outputQueue;
- void StoreRtpPacket(AudioChunkRef& chunk);
- void CreateShipment(size_t silenceSize);
+ void StoreRtpPacket(AudioChunkRef& chunk, unsigned int correctedTimestamp);
+ void ManageOutOfRangeTimestamp(AudioChunkRef& chunk);
+ void CreateShipment(size_t silenceSize = 0, bool force = false);
+ void Reset(unsigned int timestamp);
unsigned int FreeSpace();
unsigned int UsedSpace();
short* GetHoldOffPtr();
@@ -62,6 +64,9 @@ private:
short m_buffer[NUM_SAMPLES_CIRCULAR_BUFFER];
unsigned int m_shippedSamples;
log4cxx::LoggerPtr m_log;
+ double m_timestampCorrectiveDelta;
+
+ bool m_invalidChannelReported;
};
RtpMixer::RtpMixer()
@@ -73,6 +78,9 @@ RtpMixer::RtpMixer()
m_readTimestamp = 0;
m_log = log4cxx::Logger::getLogger("rtpmixer");
m_shippedSamples = 0;
+ m_timestampCorrectiveDelta = 0.0;
+
+ m_invalidChannelReported = false;
}
FilterRef RtpMixer::Instanciate()
@@ -102,63 +110,89 @@ void RtpMixer::AudioChunkIn(AudioChunkRef& chunk)
throw (CStdString("RtpMixer input audio must be PCM !"));
}
+ unsigned int correctedTimestamp = 0;
+
+ if(details->m_channel == 1)
+ {
+ correctedTimestamp = details->m_timestamp;
+ }
+ else if(details->m_channel == 2)
+ {
+ // Corrective delta always only applied to side 2.
+ double tmp = (double)details->m_timestamp - m_timestampCorrectiveDelta;
+ if(tmp < 0.0)
+ {
+ logMsg.Format("Corrected s2 timestamp is negative: ts:%u delta:%f wrts:%u", details->m_timestamp, m_timestampCorrectiveDelta, m_writeTimestamp);
+ LOG4CXX_ERROR(m_log, logMsg);
+ return;
+ }
+ correctedTimestamp = (unsigned int)tmp;
+ }
+ else
+ {
+ if(m_invalidChannelReported == false)
+ {
+ m_invalidChannelReported = true;
+ logMsg.Format("Invalid Channel:%d", details->m_channel);
+ LOG4CXX_ERROR(m_log, logMsg);
+ }
+ }
+ unsigned int rtpEndTimestamp = correctedTimestamp + chunk->GetNumSamples();
+
if(m_log->isDebugEnabled())
{
- logMsg.Format("New chunk, timestamp:%d", details->m_timestamp);
+ logMsg.Format("New chunk, s%d ts:%u corr-ts:%u", details->m_channel, details->m_timestamp, correctedTimestamp);
LOG4CXX_DEBUG(m_log, logMsg);
}
- unsigned int rtpEndTimestamp = details->m_timestamp + chunk->GetNumSamples();
-
if(m_writeTimestamp == 0)
{
// First RTP packet of the session
LOG4CXX_DEBUG(m_log, "first chunk");
- m_writeTimestamp = details->m_timestamp;
+ m_writeTimestamp = correctedTimestamp;
m_readTimestamp = m_writeTimestamp;
- StoreRtpPacket(chunk);
+ StoreRtpPacket(chunk, correctedTimestamp);
}
- else if (details->m_timestamp >= m_readTimestamp) // drop packets that are older than last shipment
+ else if (correctedTimestamp >= m_readTimestamp)
{
- if( (int)(rtpEndTimestamp - m_writeTimestamp) <= (int)FreeSpace() && (int)(m_writeTimestamp - details->m_timestamp) <= (int)UsedSpace())
+ if( (int)(rtpEndTimestamp - m_writeTimestamp) <= (int)FreeSpace() && (int)(m_writeTimestamp - correctedTimestamp) <= (int)UsedSpace())
{
// RTP packet fits into current buffer
- StoreRtpPacket(chunk);
+ StoreRtpPacket(chunk, correctedTimestamp);
if(UsedSpace() > NUM_SAMPLES_TRIGGER)
{
// We have enough stuff, make a shipment
- CreateShipment(0);
+ CreateShipment();
}
}
else
{
// RTP packet does not fit into current buffer
// work out how much silence we need to add to the current buffer when shipping
- size_t silenceSize = details->m_timestamp - m_writeTimestamp;
+ size_t silenceSize = correctedTimestamp - m_writeTimestamp;
- if(silenceSize < (8000*60) && (details->m_timestamp > m_writeTimestamp)) // sanity check, maximum silence is 60 seconds @8KHz, otherwise, drop the chunk
+ if(silenceSize < (8000*10) && (correctedTimestamp > m_writeTimestamp)) // maximum silence is 10 seconds @8KHz
{
CreateShipment(silenceSize);
// reset buffer
- m_writePtr = m_buffer;
- m_readPtr = m_buffer;
- m_writeTimestamp = details->m_timestamp;
- m_readTimestamp = m_writeTimestamp;
+ Reset(correctedTimestamp);
// Store new packet
- StoreRtpPacket(chunk);
+ StoreRtpPacket(chunk, correctedTimestamp);
}
else
{
- LOG4CXX_DEBUG(m_log, "Packet too distant in the future, dropped");
+ // This chunk is newer than the curent timestamp window
+ ManageOutOfRangeTimestamp(chunk);
}
}
}
else
{
- LOG4CXX_DEBUG(m_log, "Packet too old, dropped");
+ // This chunk is older than the current timestamp window
+ ManageOutOfRangeTimestamp(chunk);
}
if(m_log->isDebugEnabled())
{
@@ -167,6 +201,36 @@ void RtpMixer::AudioChunkIn(AudioChunkRef& chunk)
}
}
+void RtpMixer::ManageOutOfRangeTimestamp(AudioChunkRef& chunk)
+{
+ CStdString logMsg;
+
+ AudioChunkDetails* details = chunk->GetDetails();
+ if(details->m_channel == 1)
+ {
+ // 1. Ship what we have
+ CreateShipment(0, true);
+
+ // 2. Reset circular buffer and add this new chunk
+ Reset(details->m_timestamp);
+ StoreRtpPacket(chunk ,details->m_timestamp);
+ }
+ else if(details->m_channel == 2)
+ {
+ // Calculate timestamp corrective delta so that next channel-2 chunk
+ // will be in the circular buffer timestamp window.
+ m_timestampCorrectiveDelta = (double)details->m_timestamp - (double)m_writeTimestamp;
+ }
+}
+
+void RtpMixer::Reset(unsigned int timestamp)
+{
+ m_writePtr = m_buffer;
+ m_readPtr = m_buffer;
+ m_writeTimestamp = timestamp;
+ m_readTimestamp = m_writeTimestamp;
+}
+
void RtpMixer::AudioChunkOut(AudioChunkRef& chunk)
{
if(m_outputQueue.size() > 0)
@@ -202,13 +266,13 @@ int RtpMixer::GetInputRtpPayloadType(void) // does not link if not defined here
}
// Writes to the internal buffer without any size verification
-void RtpMixer::StoreRtpPacket(AudioChunkRef& audioChunk)
+void RtpMixer::StoreRtpPacket(AudioChunkRef& audioChunk, unsigned int correctedTimestamp)
{
CStdString debug;
AudioChunkDetails* details = audioChunk->GetDetails();
// 1. Silence from write pointer until end of RTP packet
- unsigned int endRtpTimestamp = details->m_timestamp + audioChunk->GetNumSamples();
+ unsigned int endRtpTimestamp = correctedTimestamp + audioChunk->GetNumSamples();
if (endRtpTimestamp > m_writeTimestamp)
{
for(int i=0; i<(endRtpTimestamp - m_writeTimestamp); i++)
@@ -227,7 +291,7 @@ void RtpMixer::StoreRtpPacket(AudioChunkRef& audioChunk)
}
// 2. Mix in the latest samples from this RTP packet
- unsigned int timestampDelta = m_writeTimestamp - details->m_timestamp;
+ unsigned int timestampDelta = m_writeTimestamp - correctedTimestamp;
ASSERT(timestampDelta>=0);
short* tempWritePtr = CicularPointerSubtractOffset(m_writePtr, timestampDelta);
short* payload = (short *)audioChunk->m_pBuffer;
@@ -278,13 +342,13 @@ short* RtpMixer::CicularPointerSubtractOffset(short *ptr, size_t offset)
}
}
-void RtpMixer::CreateShipment(size_t silenceSize)
+void RtpMixer::CreateShipment(size_t silenceSize, bool force)
{
// 1. ship from readPtr until stop pointer or until end of buffer if wrapped
bool bufferWrapped = false;
short* stopPtr = NULL;
short* wrappedStopPtr = NULL;
- if (silenceSize)
+ if (silenceSize || force)
{
// There is additional silence to ship, do not take holdoff into account
stopPtr = m_writePtr;