From 5aceeee0352021d6f14a06e30c8dd89cc998822a Mon Sep 17 00:00:00 2001 From: "(no author)" <(no author)@09dcff7a-b715-0410-9601-b79a96267cd0> Date: Tue, 11 Apr 2006 20:51:39 +0000 Subject: This commit was manufactured by cvs2svn to create tag 'OREKA-0-5-8'. git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/tags/OREKA-0-5-8@215 09dcff7a-b715-0410-9601-b79a96267cd0 --- orkaudio/BatchProcessing.cpp | 12 +- orkaudio/CapturePort.cpp | 81 +++++++++-- orkaudio/CapturePort.h | 9 +- orkaudio/audiocaptureplugins/voip/RtpSession.cpp | 95 ++++++++----- orkaudio/audiocaptureplugins/voip/RtpSession.h | 4 +- orkaudio/audiocaptureplugins/voip/VoIp.cpp | 67 ++++++--- orkaudio/audiocaptureplugins/voip/VoIpConfig.cpp | 173 ++++++++++++++++++++++- orkaudio/audiocaptureplugins/voip/VoIpConfig.h | 11 ++ orkaudio/filters/rtpmixer/RtpMixer.cpp | 28 +++- orkbasecxx/Filter.cpp | 45 ++++-- 10 files changed, 435 insertions(+), 90 deletions(-) diff --git a/orkaudio/BatchProcessing.cpp b/orkaudio/BatchProcessing.cpp index 42816c3..a381958 100644 --- a/orkaudio/BatchProcessing.cpp +++ b/orkaudio/BatchProcessing.cpp @@ -236,9 +236,15 @@ void BatchProcessing::ThreadHandler(void *args) if(CONFIG.m_batchProcessingEnhancePriority == false) { // Give up CPU between every audio buffer to make sure the actual recording always has priority - ACE_Time_Value yield; - yield.set(0,1); // 1 us - ACE_OS::sleep(yield); + //ACE_Time_Value yield; + //yield.set(0,1); // 1 us + //ACE_OS::sleep(yield); + + // Use this instead, even if it still seems this holds the whole process under Linux instead of this thread only. + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 1; + ACE_OS::nanosleep (&ts, NULL); } } diff --git a/orkaudio/CapturePort.cpp b/orkaudio/CapturePort.cpp index 044520f..14e0e8a 100644 --- a/orkaudio/CapturePort.cpp +++ b/orkaudio/CapturePort.cpp @@ -20,12 +20,13 @@ #include "Reporting.h" #include "ConfigManager.h" -CapturePort::CapturePort(CStdString& Id) +CapturePort::CapturePort(CStdString& id) { - m_Id = Id; + m_id = id; m_vadBelowThresholdSec = 0.0; m_vadUp = false; m_capturing = false; + m_lastUpdated = 0; } CStdString CapturePort::ToString() @@ -34,9 +35,16 @@ CStdString CapturePort::ToString() return ret; } +CStdString CapturePort::GetId() +{ + return m_id; +} + + void CapturePort::AddAudioChunk(AudioChunkRef chunkRef) { time_t now = time(NULL); + m_lastUpdated = now; if(CONFIG.m_audioSegmentation) { @@ -51,7 +59,7 @@ void CapturePort::AddAudioChunk(AudioChunkRef chunkRef) AddCaptureEvent(eventRef); // create new tape - m_audioTapeRef.reset(new AudioTape(m_Id)); + m_audioTapeRef.reset(new AudioTape(m_id)); // signal new tape start event eventRef.reset(new CaptureEvent); @@ -63,7 +71,7 @@ void CapturePort::AddAudioChunk(AudioChunkRef chunkRef) else { // create new tape - m_audioTapeRef.reset(new AudioTape(m_Id)); + m_audioTapeRef.reset(new AudioTape(m_id)); // signal new tape start event CaptureEventRef eventRef(new CaptureEvent); @@ -112,7 +120,7 @@ void CapturePort::AddAudioChunk(AudioChunkRef chunkRef) m_vadUp = true; // create new tape - m_audioTapeRef.reset(new AudioTape(m_Id)); + m_audioTapeRef.reset(new AudioTape(m_id)); // signal new tape start event CaptureEventRef eventRef(new CaptureEvent); @@ -139,6 +147,8 @@ void CapturePort::AddAudioChunk(AudioChunkRef chunkRef) void CapturePort::AddCaptureEvent(CaptureEventRef eventRef) { + m_lastUpdated = time(NULL); + AudioTapeRef audioTapeRef = m_audioTapeRef; // First of all, handle tape start @@ -149,16 +159,16 @@ void CapturePort::AddCaptureEvent(CaptureEventRef eventRef) { audioTapeRef->SetShouldStop(); // force stop of previous tape } - audioTapeRef.reset(new AudioTape(m_Id)); // Create a new tape + audioTapeRef.reset(new AudioTape(m_id)); // Create a new tape audioTapeRef->AddCaptureEvent(eventRef, false); //Reporting::GetInstance()->AddAudioTape(audioTapeRef); m_audioTapeRef = audioTapeRef; - LOG4CXX_INFO(LOG.portLog, "#" + m_Id + ": start"); + LOG4CXX_INFO(LOG.portLog, "#" + m_id + ": start"); } if (!audioTapeRef.get()) { - LOG4CXX_WARN(LOG.portLog, "#" + m_Id + ": received unexpected capture event:" + LOG4CXX_WARN(LOG.portLog, "#" + m_id + ": received unexpected capture event:" + CaptureEvent::EventTypeToString(eventRef->m_type)); } else @@ -169,7 +179,7 @@ void CapturePort::AddCaptureEvent(CaptureEventRef eventRef) case CaptureEvent::EtStop: m_capturing = false; - LOG4CXX_INFO(LOG.portLog, "#" + m_Id + ": stop"); + LOG4CXX_INFO(LOG.portLog, "#" + m_id + ": stop"); audioTapeRef->AddCaptureEvent(eventRef, true); if (m_audioTapeRef->GetAudioFileRef().get()) @@ -182,7 +192,7 @@ void CapturePort::AddCaptureEvent(CaptureEventRef eventRef) else { // Received a stop but there is no valid audio file associated with the tape - LOG4CXX_WARN(LOG.portLog, "#" + m_Id + ": no audio reported between last start and stop"); + LOG4CXX_WARN(LOG.portLog, "#" + m_id + ": no audio reported between last start and stop"); } break; case CaptureEvent::EtDirection: @@ -195,16 +205,27 @@ void CapturePort::AddCaptureEvent(CaptureEventRef eventRef) } } +bool CapturePort::IsExpired(time_t now) +{ + if((now - m_lastUpdated) > 60) // 1 minute + { + return true; + } + return false; +} + //======================================= - -void CapturePorts::Initialize() +CapturePorts::CapturePorts() { m_ports.clear(); + m_lastHooveringTime = time(NULL); } CapturePortRef CapturePorts::GetPort(CStdString & portId) { + Hoover(); + std::map::iterator pair; pair = m_ports.find(portId); @@ -222,7 +243,7 @@ CapturePortRef CapturePorts::GetPort(CStdString & portId) CapturePortRef CapturePorts::AddAndReturnPort(CStdString & portId) { - //MutexGuard mutexGuard(m_mutex); // To make sure a channel cannot be created twice + //MutexGuard mutexGuard(m_mutex); // To make sure a channel cannot be created twice - not used for now. CapturePorts only ever gets interaction from capture single thread CapturePortRef portRef = GetPort(portId); if (portRef.get() == NULL) @@ -238,4 +259,38 @@ CapturePortRef CapturePorts::AddAndReturnPort(CStdString & portId) } } +void CapturePorts::Hoover() +{ + CStdString logMsg; + time_t now = time(NULL); + if( (now - m_lastHooveringTime) > 10) // Hoover every 10 seconds + { + m_lastHooveringTime = now; + int numPorts = m_ports.size(); + + // Go round and detect inactive ports + std::map::iterator pair; + std::list toDismiss; + + for(pair = m_ports.begin(); pair != m_ports.end(); pair++) + { + CapturePortRef port = pair->second; + if(port->IsExpired(now)) + { + toDismiss.push_back(port); + } + } + + // Discard inactive ports + for (std::list::iterator it = toDismiss.begin(); it != toDismiss.end() ; it++) + { + CapturePortRef port = *it; + m_ports.erase(port->GetId()); + LOG4CXX_DEBUG(LOG.portLog, port->GetId() + ": Expired"); + } + logMsg.Format("Hoovered %d ports. New number:%d", (numPorts - m_ports.size()), m_ports.size()); + LOG4CXX_DEBUG(LOG.portLog, logMsg); + } +} + diff --git a/orkaudio/CapturePort.h b/orkaudio/CapturePort.h index bd62731..69b4c0a 100644 --- a/orkaudio/CapturePort.h +++ b/orkaudio/CapturePort.h @@ -33,16 +33,19 @@ class CapturePort public: CapturePort(CStdString& Id); CStdString ToString(); + CStdString GetId(); void AddAudioChunk(AudioChunkRef chunkRef); void AddCaptureEvent(CaptureEventRef eventRef); + bool IsExpired(time_t now); private: - CStdString m_Id; + CStdString m_id; AudioTapeRef m_audioTapeRef; ACE_Thread_Mutex m_mutex; bool m_capturing; double m_vadBelowThresholdSec; bool m_vadUp; + time_t m_lastUpdated; }; typedef boost::shared_ptr CapturePortRef; @@ -51,13 +54,15 @@ typedef boost::shared_ptr CapturePortRef; class CapturePorts { public: - void Initialize(); + CapturePorts(); CapturePortRef GetPort(CStdString & portId); /** Tries to find a capture port from its ID. If unsuccessful, creates a new one and returns it */ CapturePortRef AddAndReturnPort(CStdString & portId); + void Hoover(); private: std::map m_ports; ACE_Thread_Mutex m_mutex; + time_t m_lastHooveringTime; }; typedef ACE_Singleton CapturePortsSingleton; diff --git a/orkaudio/audiocaptureplugins/voip/RtpSession.cpp b/orkaudio/audiocaptureplugins/voip/RtpSession.cpp index b7fe114..2e74786 100644 --- a/orkaudio/audiocaptureplugins/voip/RtpSession.cpp +++ b/orkaudio/audiocaptureplugins/voip/RtpSession.cpp @@ -30,12 +30,6 @@ extern VoIpConfigTopObjectRef g_VoIpConfigTopObjectRef; #define DLLCONFIG g_VoIpConfigTopObjectRef.get()->m_config -SipInviteInfo::SipInviteInfo() -{ - m_fromIp.s_addr = 0; -} - - RtpSession::RtpSession(CStdString& trackingId) { m_trackingId = trackingId; @@ -54,9 +48,10 @@ RtpSession::RtpSession(CStdString& trackingId) void RtpSession::Stop() { + LOG4CXX_INFO(m_log, m_trackingId + ": " + m_capturePort + " Session stop"); + if(m_started) { - LOG4CXX_INFO(m_log, m_trackingId + ": " + m_capturePort + " Session stop"); CaptureEventRef stopEvent(new CaptureEvent); stopEvent->m_type = CaptureEvent::EtStop; stopEvent->m_timestamp = time(NULL); @@ -152,13 +147,13 @@ void RtpSession::ProcessMetadataSip(RtpPacketInfoRef& rtpPacket) bool done = false; // work out invitee IP address - if(rtpPacket->m_sourceIp.s_addr == m_invitorIp.s_addr) + if((unsigned int)rtpPacket->m_sourceIp.s_addr == (unsigned int)m_invitorIp.s_addr) { m_inviteeIp = rtpPacket->m_destIp; m_inviteeTcpPort = rtpPacket->m_destPort; m_invitorTcpPort = rtpPacket->m_sourcePort; } - else if(rtpPacket->m_destIp.s_addr == m_invitorIp.s_addr) + else if((unsigned int)rtpPacket->m_destIp.s_addr == (unsigned int)m_invitorIp.s_addr) { m_inviteeIp = rtpPacket->m_sourceIp; m_inviteeTcpPort = rtpPacket->m_sourcePort; @@ -281,7 +276,7 @@ void RtpSession::AddRtpPacket(RtpPacketInfoRef& rtpPacket) } else { - if(rtpPacket->m_sourceIp.s_addr == m_lastRtpPacketSide1->m_sourceIp.s_addr) + if((unsigned int)rtpPacket->m_sourceIp.s_addr == (unsigned int)m_lastRtpPacketSide1->m_sourceIp.s_addr) { m_lastRtpPacketSide1 = rtpPacket; channel = 1; @@ -355,8 +350,7 @@ void RtpSession::AddRtpPacket(RtpPacketInfoRef& rtpPacket) details.m_encoding = AlawAudio; AudioChunkRef chunk(new AudioChunk()); chunk->SetBuffer(rtpPacket->m_payload, rtpPacket->m_payloadSize, details); - g_audioChunkCallBack(chunk, m_capturePort); // ##### after - //m_rtpRingBuffer.AddRtpPacket(rtpPacket); // ##### before + g_audioChunkCallBack(chunk, m_capturePort); m_lastUpdated = rtpPacket->m_arrivalTimestamp; } @@ -366,7 +360,7 @@ void RtpSession::AddRtpPacket(RtpPacketInfoRef& rtpPacket) void RtpSession::ReportSipInvite(SipInviteInfoRef& invite) { m_invite = invite; - m_invitorIp = invite->m_fromIp; + m_invitorIp = invite->m_fromRtpIp; } int RtpSession::ProtocolToEnum(CStdString& protocol) @@ -416,34 +410,36 @@ RtpSessions::RtpSessions() void RtpSessions::ReportSipInvite(SipInviteInfoRef& invite) { - char szFromIp[16]; - ACE_OS::inet_ntop(AF_INET, (void*)&invite->m_fromIp, szFromIp, sizeof(szFromIp)); + char szFromRtpIp[16]; + ACE_OS::inet_ntop(AF_INET, (void*)&invite->m_fromRtpIp, szFromRtpIp, sizeof(szFromRtpIp)); - CStdString ipAndPort = CStdString(szFromIp) + "," + invite->m_fromRtpPort; + CStdString ipAndPort = CStdString(szFromRtpIp) + "," + invite->m_fromRtpPort; std::map::iterator pair; pair = m_byIpAndPort.find(ipAndPort); if (pair != m_byIpAndPort.end()) { - // #### old behaviour - // A session exists ont the same IP+port, stop old session - //RtpSessionRef session = pair->second; - //Stop(session); - - // #### new behaviour // The session already exists, do nothing return; } pair = m_byCallId.find(invite->m_callId); if (pair != m_byCallId.end()) { - // #### old behaviour - // A session exists ont the same CallId, stop old session - //RtpSessionRef session = pair->second; - //Stop(session); + // The session already exists + RtpSessionRef session = pair->second; + if(!session->m_ipAndPort.Equals(ipAndPort)) + { + // The session RTP connection address has changed + // Remove session from IP and Port map + m_byIpAndPort.erase(session->m_ipAndPort); + // ... update + session->m_ipAndPort = ipAndPort; + session->ReportSipInvite(invite); + // ... and reinsert + m_byIpAndPort.insert(std::make_pair(session->m_ipAndPort, session)); - // #### new behaviour - // The session already exists, do nothing + LOG4CXX_INFO(m_log, session->m_trackingId + ": updated with new INVITE data"); + } return; } @@ -456,6 +452,11 @@ void RtpSessions::ReportSipInvite(SipInviteInfoRef& invite) session->ReportSipInvite(invite); m_byIpAndPort.insert(std::make_pair(session->m_ipAndPort, session)); m_byCallId.insert(std::make_pair(session->m_callId, session)); + + CStdString numSessions = IntToString(m_byIpAndPort.size()); + LOG4CXX_DEBUG(m_log, CStdString("ByIpAndPort: ") + numSessions); + + LOG4CXX_INFO(m_log, trackingId + ": created by SIP INVITE"); } void RtpSessions::ReportSipBye(SipByeInfo bye) @@ -515,6 +516,10 @@ void RtpSessions::ReportSkinnyCallInfo(SkCallInfoStruct* callInfo, IpHeaderStruc } m_byCallId.insert(std::make_pair(session->m_callId, session)); + + CStdString numSessions = IntToString(m_byIpAndPort.size()); + LOG4CXX_DEBUG(m_log, CStdString("ByIpAndPort: ") + numSessions); + } void RtpSessions::ReportSkinnyStartMediaTransmission(SkStartMediaTransmissionStruct* startMedia, IpHeaderStruct* ipHeader) @@ -532,7 +537,7 @@ void RtpSessions::ReportSkinnyStartMediaTransmission(SkStartMediaTransmissionStr { RtpSessionRef tmpSession = pair->second; - if(tmpSession->m_endPointIp.s_addr == ipHeader->ip_dest.s_addr) + if((unsigned int)tmpSession->m_endPointIp.s_addr == (unsigned int)ipHeader->ip_dest.s_addr) { if(tmpSession->m_ipAndPort.size() == 0) { @@ -608,6 +613,9 @@ void RtpSessions::ReportSkinnyStartMediaTransmission(SkStartMediaTransmissionStr session->m_ipAndPort = ipAndPort; m_byIpAndPort.insert(std::make_pair(session->m_ipAndPort, session)); + + CStdString numSessions = IntToString(m_byIpAndPort.size()); + LOG4CXX_DEBUG(m_log, CStdString("ByIpAndPort: ") + numSessions); } else { @@ -658,6 +666,9 @@ void RtpSessions::Stop(RtpSessionRef& session) if(session->m_ipAndPort.size() > 0) { m_byIpAndPort.erase(session->m_ipAndPort); + + CStdString numSessions = IntToString(m_byIpAndPort.size()); + LOG4CXX_DEBUG(m_log, CStdString("ByIpAndPort: ") + numSessions); } if(session->m_callId.size() > 0) { @@ -687,7 +698,7 @@ void RtpSessions::ReportRtpPacket(RtpPacketInfoRef& rtpPacket) if (pair != m_byIpAndPort.end()) { session1 = pair->second; - if (!session1.get() == NULL) + if (session1.get() != NULL) { // Found a session give it the RTP packet info session1->AddRtpPacket(rtpPacket); @@ -705,7 +716,7 @@ void RtpSessions::ReportRtpPacket(RtpPacketInfoRef& rtpPacket) if (pair != m_byIpAndPort.end()) { session2 = pair->second; - if (!session2.get() == NULL) + if (session2.get() != NULL) { // Found a session give it the RTP packet info session2->AddRtpPacket(rtpPacket); @@ -764,6 +775,11 @@ void RtpSessions::ReportRtpPacket(RtpPacketInfoRef& rtpPacket) session->m_ipAndPort = ipAndPort; session->AddRtpPacket(rtpPacket); m_byIpAndPort.insert(std::make_pair(ipAndPort, session)); + + CStdString numSessions = IntToString(m_byIpAndPort.size()); + LOG4CXX_DEBUG(m_log, CStdString("ByIpAndPort: ") + numSessions); + + LOG4CXX_INFO(m_log, trackingId + ": created by RTP packet"); } } @@ -820,12 +836,23 @@ void RtpSessions::Hoover(time_t now) } //========================================================== +SipInviteInfo::SipInviteInfo() +{ + m_fromRtpIp.s_addr = 0; +} + void SipInviteInfo::ToString(CStdString& string) { - char fromIp[16]; - ACE_OS::inet_ntop(AF_INET, (void*)&m_fromIp, fromIp, sizeof(fromIp)); + char fromRtpIp[16]; + ACE_OS::inet_ntop(AF_INET, (void*)&m_fromRtpIp, fromRtpIp, sizeof(fromRtpIp)); + + char senderIp[16]; + ACE_OS::inet_ntop(AF_INET, (void*)&m_senderIp, senderIp, sizeof(senderIp)); + + char receiverIp[16]; + ACE_OS::inet_ntop(AF_INET, (void*)&m_receiverIp, receiverIp, sizeof(receiverIp)); - string.Format("from:%s %s,%s to:%s callid:%s", m_from, fromIp, m_fromRtpPort, m_to, m_callId); + string.Format("sender:%s from:%s RTP:%s,%s to:%s rcvr:%s callid:%s", senderIp, m_from, fromRtpIp, m_fromRtpPort, m_to, receiverIp, m_callId); } diff --git a/orkaudio/audiocaptureplugins/voip/RtpSession.h b/orkaudio/audiocaptureplugins/voip/RtpSession.h index 50e979b..6e9c739 100644 --- a/orkaudio/audiocaptureplugins/voip/RtpSession.h +++ b/orkaudio/audiocaptureplugins/voip/RtpSession.h @@ -26,7 +26,9 @@ public: SipInviteInfo(); void ToString(CStdString& string); - struct in_addr m_fromIp; + struct in_addr m_senderIp; + struct in_addr m_receiverIp; + struct in_addr m_fromRtpIp; CStdString m_fromRtpPort; CStdString m_from; CStdString m_to; diff --git a/orkaudio/audiocaptureplugins/voip/VoIp.cpp b/orkaudio/audiocaptureplugins/voip/VoIp.cpp index f9124c8..e8702c0 100644 --- a/orkaudio/audiocaptureplugins/voip/VoIp.cpp +++ b/orkaudio/audiocaptureplugins/voip/VoIp.cpp @@ -44,6 +44,7 @@ static LoggerPtr s_rtpPacketLog; static LoggerPtr s_sipPacketLog; static LoggerPtr s_skinnyPacketLog; static LoggerPtr s_sipExtractionLog; +static LoggerPtr s_voipPluginLog; static time_t s_lastHooveringTime; static ACE_Thread_Mutex s_mutex; static bool s_liveCapture; @@ -212,9 +213,12 @@ bool TrySipBye(EthernetHeaderStruct* ethernetHeader, IpHeaderStruct* ipHeader, U if(callIdField) { GrabToken(callIdField, info.m_callId); - RtpSessionsSingleton::instance()->ReportSipBye(info); } LOG4CXX_INFO(s_sipPacketLog, "BYE: callid:" + info.m_callId); + if(callIdField) + { + RtpSessionsSingleton::instance()->ReportSipBye(info); + } } return result; } @@ -222,6 +226,7 @@ bool TrySipBye(EthernetHeaderStruct* ethernetHeader, IpHeaderStruct* ipHeader, U bool TrySipInvite(EthernetHeaderStruct* ethernetHeader, IpHeaderStruct* ipHeader, UdpHeaderStruct* udpHeader, u_char* udpPayload) { bool result = false; + bool drop = false; if (memcmp("INVITE", (void*)udpPayload, 6) == 0) { result = true; @@ -260,13 +265,9 @@ bool TrySipInvite(EthernetHeaderStruct* ethernetHeader, IpHeaderStruct* ipHeader } if(toField) { - char* toFieldEnd = NULL; - if(s_sipExtractionLog->isDebugEnabled()) - { - CStdString to; - toFieldEnd = GrabLine(toField, sipEnd, to); - LOG4CXX_DEBUG(s_sipExtractionLog, "to: " + to); - } + CStdString to; + char* toFieldEnd = GrabLine(toField, sipEnd, to); + LOG4CXX_DEBUG(s_sipExtractionLog, "to: " + to); char* sipUser = memFindAfter("sip:", toField, toFieldEnd); if(sipUser) @@ -297,25 +298,36 @@ bool TrySipInvite(EthernetHeaderStruct* ethernetHeader, IpHeaderStruct* ipHeader { if(ACE_OS::inet_aton((PCSTR)connectionAddress, &fromIp)) { - info->m_fromIp = fromIp; + info->m_fromRtpIp = fromIp; + + if (DLLCONFIG.m_sipDropIndirectInvite) + { + if((unsigned int)fromIp.s_addr != (unsigned int)ipHeader->ip_src.s_addr) + { + // SIP invite SDP connection address does not match with SIP packet origin + drop =true; + } + } } } } - if(info->m_fromIp.s_addr == 0) + if((unsigned int)info->m_fromRtpIp.s_addr == 0) { // In case connection address could not be extracted, use SIP invite sender IP address - info->m_fromIp = ipHeader->ip_src; - } - - if(info->m_fromRtpPort.size() && info->m_from.size() && info->m_to.size() && info->m_callId.size()) - { - RtpSessionsSingleton::instance()->ReportSipInvite(info); + info->m_fromRtpIp = ipHeader->ip_src; } + info->m_senderIp = ipHeader->ip_src; + info->m_receiverIp = ipHeader->ip_dest; CStdString logMsg; info->ToString(logMsg); logMsg = "INVITE: " + logMsg; LOG4CXX_INFO(s_sipPacketLog, logMsg); + + if(drop == false && info->m_fromRtpPort.size() && info->m_from.size() && info->m_to.size() && info->m_callId.size()) + { + RtpSessionsSingleton::instance()->ReportSipInvite(info); + } } return result; } @@ -378,9 +390,20 @@ void HandlePacket(u_char *param, const struct pcap_pkthdr *header, const u_char { // This is a pcap file replay, make sure Orkaudio won't be flooded by too many // packets at a time by yielding control to other threads. - ACE_Time_Value yield; - yield.set(0,1); // 1 us - ACE_OS::sleep(yield); + //ACE_Time_Value yield; + //yield.set(0,1); // 1 us + //ACE_OS::sleep(yield); + + // Use nanosleep instead + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 1; + ACE_OS::nanosleep (&ts, NULL); + } + + if(DLLCONFIG.IsPacketWanted(ipHeader) == false) + { + return; } if(ipHeader->ip_p == IPPROTO_UDP) @@ -496,6 +519,8 @@ VoIp::VoIp() void Configure(DOMNode* node) { + s_voipPluginLog = Logger::getLogger("voipplugin"); + if (node) { VoIpConfigTopObjectRef VoIpConfigTopObjectRef(new VoIpConfigTopObject); @@ -506,12 +531,12 @@ void Configure(DOMNode* node) } catch (CStdString& e) { - LOG4CXX_WARN(g_logManager->rootLog, "VoIp.dll: " + e); + LOG4CXX_ERROR(s_voipPluginLog, e); } } else { - LOG4CXX_WARN(g_logManager->rootLog, "VoIp.dll: got empty DOM tree"); + LOG4CXX_ERROR(s_voipPluginLog, "Got empty DOM tree"); } } diff --git a/orkaudio/audiocaptureplugins/voip/VoIpConfig.cpp b/orkaudio/audiocaptureplugins/voip/VoIpConfig.cpp index 7cb6dde..5a12214 100644 --- a/orkaudio/audiocaptureplugins/voip/VoIpConfig.cpp +++ b/orkaudio/audiocaptureplugins/voip/VoIpConfig.cpp @@ -24,6 +24,8 @@ VoIpConfig::VoIpConfig() m_asciiLanMasks.push_back("192.168.255.255"); m_asciiLanMasks.push_back("10.255.255.255"); m_asciiLanMasks.push_back("172.31.255.255"); + + m_sipDropIndirectInvite = false; } void VoIpConfig::Define(Serializer* s) @@ -32,8 +34,13 @@ void VoIpConfig::Define(Serializer* s) s->CsvValue("Devices", m_devices); s->CsvValue("LanMasks", m_asciiLanMasks); s->CsvValue("MediaGateways", m_asciiMediaGateways); + + s->CsvValue("BlockedIpRanges", m_asciiBlockedIpRanges); + s->CsvValue("AllowedIpRanges", m_asciiAllowedIpRanges); + s->StringValue("PcapFile", m_pcapFile); s->StringValue("PcapDirectory", m_pcapDirectory); + s->BoolValue("SipDropIndirectInvite", m_sipDropIndirectInvite); } void VoIpConfig::Validate() @@ -50,7 +57,7 @@ void VoIpConfig::Validate() } else { - throw (CStdString("VoIpConfig: invalid IP address in LanMasks:" + *it)); + throw (CStdString("VoIpConfig: invalid IP address in LanMasks:" + *it) + " please fix config.xml"); } } @@ -65,7 +72,100 @@ void VoIpConfig::Validate() } else { - throw (CStdString("VoIpConfig: invalid IP address in MediaGateways:" + *it)); + throw (CStdString("VoIpConfig: invalid IP address in MediaGateways:" + *it) + " please fix config.xml"); + } + } + + // Iterate over ascii allowed IP ranges and populate the bit width and prefix lists + m_allowedIpRangePrefixes.clear(); + m_allowedIpRangeBitWidths.clear(); + for(it = m_asciiAllowedIpRanges.begin(); it != m_asciiAllowedIpRanges.end(); it++) + { + CStdString cidrPrefixLengthString; + unsigned int cidrPrefixLength = 32; // by default, x.x.x.x/32 + CStdString cidrIpAddressString; + struct in_addr cidrIpAddress; + + CStdString entry = *it; + int slashPosition = entry.Find('/'); + if(slashPosition > 0) + { + cidrIpAddressString = entry.Left(slashPosition); + cidrPrefixLengthString = entry.Mid(slashPosition+1); + + bool notAnInt = false; + try + { + cidrPrefixLength = StringToInt(cidrPrefixLengthString); + } + catch (...) {notAnInt = true;} + if(cidrPrefixLength < 1 || cidrPrefixLength > 32 || notAnInt) + { + throw (CStdString("VoIpConfig: invalid CIDR prefix length in AllowedIpRanges:" + entry) + " please fix config.xml"); + } + } + else + { + cidrIpAddressString = entry; + } + + if(ACE_OS::inet_aton((PCSTR)cidrIpAddressString, &cidrIpAddress)) + { + unsigned int rangeBitWidth = 32-cidrPrefixLength; + unsigned int prefix = ntohl((unsigned int)cidrIpAddress.s_addr) >> (rangeBitWidth); + m_allowedIpRangePrefixes.push_back(prefix); + m_allowedIpRangeBitWidths.push_back(rangeBitWidth); + } + else + { + throw (CStdString("VoIpConfig: invalid IP range in AllowedIpRanges:" + entry) + " please fix config.xml"); + } + } + + + // Iterate over ascii blocked IP ranges and populate the bit width and prefix lists + m_blockedIpRangePrefixes.clear(); + m_blockedIpRangeBitWidths.clear(); + for(it = m_asciiBlockedIpRanges.begin(); it != m_asciiBlockedIpRanges.end(); it++) + { + CStdString cidrPrefixLengthString; + unsigned int cidrPrefixLength = 32; // by default, x.x.x.x/32 + CStdString cidrIpAddressString; + struct in_addr cidrIpAddress; + + CStdString entry = *it; + int slashPosition = entry.Find('/'); + if(slashPosition > 0) + { + cidrIpAddressString = entry.Left(slashPosition); + cidrPrefixLengthString = entry.Mid(slashPosition+1); + + bool notAnInt = false; + try + { + cidrPrefixLength = StringToInt(cidrPrefixLengthString); + } + catch (...) {notAnInt = true;} + if(cidrPrefixLength < 1 || cidrPrefixLength > 32 || notAnInt) + { + throw (CStdString("VoIpConfig: invalid CIDR prefix length in blockedIpRanges:" + entry) + " please fix config.xml"); + } + } + else + { + cidrIpAddressString = entry; + } + + if(ACE_OS::inet_aton((PCSTR)cidrIpAddressString, &cidrIpAddress)) + { + unsigned int rangeBitWidth = 32-cidrPrefixLength; + unsigned int prefix = ntohl((unsigned int)cidrIpAddress.s_addr) >> (rangeBitWidth); + m_blockedIpRangePrefixes.push_back(prefix); + m_blockedIpRangeBitWidths.push_back(rangeBitWidth); + } + else + { + throw (CStdString("VoIpConfig: invalid IP range in BlockedIpRanges:" + entry) + " please fix config.xml"); } } } @@ -94,6 +194,75 @@ bool VoIpConfig::IsMediaGateway(struct in_addr addr) return false; } +bool VoIpConfig::IsPacketWanted(IpHeaderStruct* ipHeader) +{ + bool wanted = true; // keep packet by default + + // If source or destination IP address does not match any existing allowing mask, drop packet + if(m_allowedIpRangePrefixes.size() > 0) + { + wanted = false; // Presence of allowing ranges -> drop packet by default + + bool sourceWanted = false; + std::list::iterator bitWidthIt = m_allowedIpRangeBitWidths.begin(); + std::list::iterator prefixIt = m_allowedIpRangePrefixes.begin(); + while(prefixIt != m_allowedIpRangePrefixes.end()) + { + unsigned int bitWidth = *bitWidthIt; + unsigned int prefix = *prefixIt; + unsigned int packetSourcePrefix = ntohl((unsigned int)ipHeader->ip_src.s_addr) >> bitWidth; + if(packetSourcePrefix == prefix) + { + sourceWanted = true; + break; + } + prefixIt++; + bitWidthIt++; + } + if(sourceWanted) + { + std::list::iterator bitWidthIt = m_allowedIpRangeBitWidths.begin(); + std::list::iterator prefixIt = m_allowedIpRangePrefixes.begin(); + while(prefixIt != m_allowedIpRangePrefixes.end()) + { + unsigned int bitWidth = *bitWidthIt; + unsigned int prefix = *prefixIt; + unsigned int packetDestPrefix = ntohl((unsigned int)ipHeader->ip_dest.s_addr) >> bitWidth; + if(packetDestPrefix == prefix) + { + wanted = true; + break; + } + prefixIt++; + bitWidthIt++; + } + } + } + // If source or destination IP address does match any existing blocking range, drop packet + std::list::iterator bitWidthIt = m_blockedIpRangeBitWidths.begin(); + std::list::iterator prefixIt = m_blockedIpRangePrefixes.begin(); + + while(prefixIt != m_blockedIpRangePrefixes.end() && wanted == true) + { + unsigned int bitWidth = *bitWidthIt; + unsigned int prefix = *prefixIt; + unsigned int packetSourcePrefix = ntohl((unsigned int)ipHeader->ip_src.s_addr) >> bitWidth; + unsigned int packetDestPrefix = ntohl((unsigned int)ipHeader->ip_dest.s_addr) >> bitWidth; + + if(packetSourcePrefix == prefix) + { + wanted = false; + } + if(packetDestPrefix == prefix) + { + wanted = false; + } + prefixIt++; + bitWidthIt++; + } + return wanted; +} + bool VoIpConfig::IsDeviceWanted(CStdString device) { if(device.Equals(m_device)) diff --git a/orkaudio/audiocaptureplugins/voip/VoIpConfig.h b/orkaudio/audiocaptureplugins/voip/VoIpConfig.h index 4a9ce83..5313389 100644 --- a/orkaudio/audiocaptureplugins/voip/VoIpConfig.h +++ b/orkaudio/audiocaptureplugins/voip/VoIpConfig.h @@ -18,6 +18,7 @@ #include "StdString.h" #include "Object.h" #include "boost/shared_ptr.hpp" +#include "PacketHeaderDefs.h" #define DEVICE_PARAM "Device" @@ -36,6 +37,7 @@ public: bool IsPartOfLan(struct in_addr); bool IsMediaGateway(struct in_addr); bool IsDeviceWanted(CStdString device); + bool IsPacketWanted(IpHeaderStruct* ipHeader); CStdString m_device; // old style but can still be used for specifying single device std::list m_devices; // new style devices csv @@ -43,8 +45,17 @@ public: std::list m_asciiMediaGateways; std::list m_lanMasks; std::list m_asciiLanMasks; + + std::list m_asciiAllowedIpRanges; // CIDR notation + std::list m_allowedIpRangePrefixes; + std::list m_allowedIpRangeBitWidths; + std::list m_asciiBlockedIpRanges; // CIDR notation + std::list m_blockedIpRangePrefixes; + std::list m_blockedIpRangeBitWidths; + CStdString m_pcapFile; CStdString m_pcapDirectory; + bool m_sipDropIndirectInvite; }; //======================================== diff --git a/orkaudio/filters/rtpmixer/RtpMixer.cpp b/orkaudio/filters/rtpmixer/RtpMixer.cpp index 6219b84..225af98 100644 --- a/orkaudio/filters/rtpmixer/RtpMixer.cpp +++ b/orkaudio/filters/rtpmixer/RtpMixer.cpp @@ -26,6 +26,7 @@ #include #include "Filter.h" #include "AudioCapture.h" +#include extern "C" { #include "g711.h" @@ -68,7 +69,7 @@ private: short* m_bufferEnd; short m_buffer[NUM_SAMPLES_CIRCULAR_BUFFER]; unsigned int m_shippedSamples; - + log4cxx::LoggerPtr m_log; }; RtpMixer::RtpMixer() @@ -78,7 +79,7 @@ RtpMixer::RtpMixer() m_bufferEnd = m_buffer + NUM_SAMPLES_CIRCULAR_BUFFER; m_writeTimestamp = 0; m_readTimestamp = 0; - //m_log = Logger::getLogger("rtpringbuffer"); + m_log = log4cxx::Logger::getLogger("rtpmixer"); m_shippedSamples = 0; } @@ -89,7 +90,20 @@ FilterRef RtpMixer::Instanciate() } void RtpMixer::AudioChunkIn(AudioChunkRef& chunk) -{ +{ + CStdString logMsg; + + if(chunk.get() == NULL) + { + LOG4CXX_DEBUG(m_log, "Null input chunk"); + return; + } + else if(chunk->GetNumSamples() == 0) + { + LOG4CXX_DEBUG(m_log, "Empty input chunk"); + return; + } + AudioChunkDetails* details = chunk->GetDetails(); if(details->m_encoding != PcmAudio) { @@ -144,9 +158,11 @@ void RtpMixer::AudioChunkIn(AudioChunkRef& chunk) { //LOG4CXX_DEBUG(m_log, m_capturePort + " packet too old, dropped"); } - CStdString debug; - debug.Format("free:%u used:%u wr:%x rd:%x wrts:%u rdts:%d", FreeSpace(), UsedSpace(), m_writePtr, m_readPtr, m_writeTimestamp, m_readTimestamp); - //LOG4CXX_DEBUG(m_log, debug); + if(m_log->isDebugEnabled()) + { + logMsg.Format("free:%u used:%u wr:%x rd:%x wrts:%u rdts:%d", FreeSpace(), UsedSpace(), m_writePtr, m_readPtr, m_writeTimestamp, m_readTimestamp); + LOG4CXX_DEBUG(m_log, logMsg); + } } void RtpMixer::AudioChunkOut(AudioChunkRef& chunk) diff --git a/orkbasecxx/Filter.cpp b/orkbasecxx/Filter.cpp index b1c0d4c..ae0c2c9 100644 --- a/orkbasecxx/Filter.cpp +++ b/orkbasecxx/Filter.cpp @@ -99,10 +99,25 @@ FilterRef AlawToPcmFilter::Instanciate() void AlawToPcmFilter::AudioChunkIn(AudioChunkRef& inputAudioChunk) { + m_outputAudioChunk.reset(); + + if(inputAudioChunk.get() == NULL) + { + return; + } + if(inputAudioChunk->GetNumSamples() == 0) + { + return; + } + AudioChunkDetails outputDetails = *inputAudioChunk->GetDetails(); + if(outputDetails.m_rtpPayloadType != GetInputRtpPayloadType()) + { + return; + } + // Create output buffer m_outputAudioChunk.reset(new AudioChunk()); - AudioChunkDetails outputDetails = *inputAudioChunk->GetDetails(); // pass through all details - outputDetails.m_rtpPayloadType = -1; // and override the ones that this filter changes + outputDetails.m_rtpPayloadType = -1; // Override details that this filter changes outputDetails.m_encoding = PcmAudio; int numSamples = inputAudioChunk->GetNumSamples(); @@ -114,7 +129,6 @@ void AlawToPcmFilter::AudioChunkIn(AudioChunkRef& inputAudioChunk) { outputBuffer[i] = (short)alaw2linear(inputBuffer[i]); } - } void AlawToPcmFilter::AudioChunkOut(AudioChunkRef& chunk) @@ -154,10 +168,26 @@ FilterRef UlawToPcmFilter::Instanciate() void UlawToPcmFilter::AudioChunkIn(AudioChunkRef& inputAudioChunk) { + m_outputAudioChunk.reset(); + + if(inputAudioChunk.get() == NULL) + { + return; + } + else if(inputAudioChunk->GetNumSamples() == 0) + { + return; + } + + AudioChunkDetails outputDetails = *inputAudioChunk->GetDetails(); + if(outputDetails.m_rtpPayloadType != GetInputRtpPayloadType()) + { + return; + } + // Create output buffer m_outputAudioChunk.reset(new AudioChunk()); - AudioChunkDetails outputDetails = *inputAudioChunk->GetDetails(); // pass through all details - outputDetails.m_rtpPayloadType = -1; // and override the ones that this filter changes + outputDetails.m_rtpPayloadType = -1; // Override details that this filter changes outputDetails.m_encoding = PcmAudio; int numSamples = inputAudioChunk->GetNumSamples(); @@ -168,8 +198,7 @@ void UlawToPcmFilter::AudioChunkIn(AudioChunkRef& inputAudioChunk) for(int i=0; i