diff options
author | Henri Herscher <henri@oreka.org> | 2006-10-11 16:59:19 +0000 |
---|---|---|
committer | Henri Herscher <henri@oreka.org> | 2006-10-11 16:59:19 +0000 |
commit | a7f1e318b8cea70aea786580ef9b15e5e3afcd23 (patch) | |
tree | 56a0e52bf83c880184ba346646d56e72a77f3ab5 | |
parent | 6bc7a0c92eefd1d9374348f3d014549f78bc713c (diff) |
There is now an orkaudio wide tracking ID for recording sessions. All logging related to a recording session includes this tracking ID. The recording file name now uses this tracking ID instead of the "port" name which used to be IP+TcpPort of one endpoint. Also introduced the notion of OrkUid which is an unique identifier based on timestamp + tracking ID. OrkUid is not yet reported to orktrack.
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@342 09dcff7a-b715-0410-9601-b79a96267cd0
-rw-r--r-- | orkaudio/BatchProcessing.cpp | 17 | ||||
-rw-r--r-- | orkaudio/CapturePort.cpp | 6 | ||||
-rw-r--r-- | orkaudio/audiocaptureplugins/voip/RtpSession.cpp | 58 | ||||
-rw-r--r-- | orkaudio/audiocaptureplugins/voip/RtpSession.h | 3 | ||||
-rw-r--r-- | orkbasecxx/AudioCapture.cpp | 6 | ||||
-rw-r--r-- | orkbasecxx/AudioCapture.h | 6 | ||||
-rw-r--r-- | orkbasecxx/AudioTape.cpp | 22 | ||||
-rw-r--r-- | orkbasecxx/AudioTape.h | 2 |
8 files changed, 86 insertions, 34 deletions
diff --git a/orkaudio/BatchProcessing.cpp b/orkaudio/BatchProcessing.cpp index 4cb9214..207ec7d 100644 --- a/orkaudio/BatchProcessing.cpp +++ b/orkaudio/BatchProcessing.cpp @@ -98,10 +98,12 @@ void BatchProcessing::ThreadHandler(void *args) { AudioFileRef fileRef; AudioFileRef outFileRef; + AudioTapeRef audioTapeRef; + CStdString trackingId = "[no-trk]"; try { - AudioTapeRef audioTapeRef = pBatchProcessing->m_audioTapeQueue.pop(); + audioTapeRef = pBatchProcessing->m_audioTapeQueue.pop(); if(audioTapeRef.get() == NULL) { if(DaemonSingleton::instance()->IsStopping()) @@ -116,10 +118,11 @@ void BatchProcessing::ThreadHandler(void *args) else { fileRef = audioTapeRef->GetAudioFileRef(); + trackingId = audioTapeRef->m_trackingId; // Let's work on the tape we have pulled //CStdString threadIdString = IntToString(threadId); - LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " processing: " + audioTapeRef->GetIdentifier()); + LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " processing " + audioTapeRef->GetIdentifier()); //fileRef->MoveOrig(); // #### could do this only when original and output file have the same extension. Irrelevant for now as everything is captured as mcf file fileRef->Open(AudioFile::READ); @@ -166,7 +169,7 @@ void BatchProcessing::ThreadHandler(void *args) if(firstChunk && details.m_rtpPayloadType != -1) { CStdString rtpPayloadType = IntToString(details.m_rtpPayloadType); - LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " RTP payload type:" + rtpPayloadType); + LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " RTP payload type:" + rtpPayloadType); CStdString filterName("RtpMixer"); filter = FilterRegistry::instance()->GetNewFilter(filterName); @@ -238,13 +241,13 @@ void BatchProcessing::ThreadHandler(void *args) fileRef->Close(); outFileRef->Close(); - logMsg.Format("Th%s stop, num samples: s1:%u s2:%u out:%u", threadIdString, numSamplesS1, numSamplesS2, numSamplesOut); + logMsg.Format("[%s] Th%s stop: num samples: s1:%u s2:%u out:%u", trackingId, threadIdString, numSamplesS1, numSamplesS2, numSamplesOut); LOG4CXX_INFO(LOG.batchProcessingLog, logMsg); if(CONFIG.m_deleteNativeFile) { fileRef->Delete(); - LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " deleting native: " + audioTapeRef->GetIdentifier()); + LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " deleting native: " + audioTapeRef->GetIdentifier()); } // Finished processing the tape, pass on to next processor @@ -253,12 +256,12 @@ void BatchProcessing::ThreadHandler(void *args) } catch (CStdString& e) { - LOG4CXX_ERROR(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " " + e); + LOG4CXX_ERROR(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " " + e); if(fileRef.get()) {fileRef->Close();} if(outFileRef.get()) {outFileRef->Close();} if(CONFIG.m_deleteFailedCaptureFile && fileRef.get() != NULL) { - LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " deleting native and transcoded"); + LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " deleting native and transcoded"); if(fileRef.get()) {fileRef->Delete();} if(outFileRef.get()) {outFileRef->Delete();} } diff --git a/orkaudio/CapturePort.cpp b/orkaudio/CapturePort.cpp index a1a761c..005b68d 100644 --- a/orkaudio/CapturePort.cpp +++ b/orkaudio/CapturePort.cpp @@ -214,7 +214,7 @@ void CapturePort::AddCaptureEvent(CaptureEventRef eventRef) audioTapeRef->AddCaptureEvent(eventRef, true); m_audioTapeRef = audioTapeRef; - LOG4CXX_INFO(s_log, "#" + m_id + ": start"); + LOG4CXX_INFO(s_log, "[" + m_audioTapeRef->m_trackingId + "] #" + m_id + " start"); } if (!audioTapeRef.get()) @@ -232,7 +232,7 @@ void CapturePort::AddCaptureEvent(CaptureEventRef eventRef) case CaptureEvent::EtStop: m_capturing = false; - LOG4CXX_INFO(s_log, "#" + m_id + ": stop"); + LOG4CXX_INFO(s_log, "[" + audioTapeRef->m_trackingId + "] #" + m_id + " stop"); audioTapeRef->AddCaptureEvent(eventRef, true); if (m_audioTapeRef->GetAudioFileRef().get()) @@ -243,7 +243,7 @@ void CapturePort::AddCaptureEvent(CaptureEventRef eventRef) else { // Received a stop but there is no valid audio file associated with the tape - LOG4CXX_WARN(s_log, "#" + m_id + ": no audio reported between last start and stop"); + LOG4CXX_WARN(s_log, "[" + audioTapeRef->m_trackingId + "] #" + m_id + " no audio reported between last start and stop"); } break; case CaptureEvent::EtEndMetadata: diff --git a/orkaudio/audiocaptureplugins/voip/RtpSession.cpp b/orkaudio/audiocaptureplugins/voip/RtpSession.cpp index 2aac32f..6536946 100644 --- a/orkaudio/audiocaptureplugins/voip/RtpSession.cpp +++ b/orkaudio/audiocaptureplugins/voip/RtpSession.cpp @@ -47,12 +47,13 @@ RtpSession::RtpSession(CStdString& trackingId) m_started = false; m_stopped = false; m_rtpTimestampCorrectiveDelta = 0; + m_beginDate = 0; } void RtpSession::Stop() { CStdString logMsg; - logMsg.Format("%s: %s Session stop, num RTP packets:%d, last updated:%u", m_trackingId, m_capturePort, m_numRtpPackets, m_lastUpdated); + logMsg.Format("[%s] %s Session stop, num RTP packets:%d, last updated:%u", m_trackingId, m_capturePort, m_numRtpPackets, m_lastUpdated); LOG4CXX_INFO(m_log, logMsg); if(m_started && !m_stopped) @@ -68,15 +69,26 @@ void RtpSession::Stop() void RtpSession::Start() { m_started = true; - //m_rtpRingBuffer.SetCapturePort(m_capturePort); + m_beginDate = time(NULL); + GenerateOrkUid(); CaptureEventRef startEvent(new CaptureEvent); startEvent->m_type = CaptureEvent::EtStart; - startEvent->m_timestamp = time(NULL); + startEvent->m_timestamp = m_beginDate; + startEvent->m_value = m_trackingId; CStdString timestamp = IntToString(startEvent->m_timestamp); - LOG4CXX_INFO(m_log, m_trackingId + ": " + m_capturePort + " " + ProtocolToString(m_protocol) + " Session start, timestamp:" + timestamp); + LOG4CXX_INFO(m_log, "[" + m_trackingId + "] " + m_capturePort + " " + ProtocolToString(m_protocol) + " Session start, timestamp:" + timestamp); g_captureEventCallBack(startEvent, m_capturePort); } +void RtpSession::GenerateOrkUid() +{ + struct tm date = {0}; + ACE_OS::localtime_r(&m_beginDate, &date); + int month = date.tm_mon + 1; // january=0, decembre=11 + int year = date.tm_year + 1900; + m_orkUid.Format("%.4d%.2d%.2d_%.2d%.2d%.2d_%s", year, month, date.tm_mday, date.tm_hour, date.tm_min, date.tm_sec, m_trackingId); +} + void RtpSession::ProcessMetadataSipIncoming() { m_remoteParty = m_invite->m_from; @@ -179,7 +191,7 @@ void RtpSession::ProcessMetadataSip(RtpPacketInfoRef& rtpPacket) } else { - LOG4CXX_ERROR(m_log, m_trackingId + ": " + m_ipAndPort + " alien RTP packet"); + LOG4CXX_ERROR(m_log, "[" + m_trackingId + "] " + m_ipAndPort + " alien RTP packet"); } // work out capture port and direction @@ -296,6 +308,12 @@ void RtpSession::ReportMetadata() event->m_value = szRemoteIp; g_captureEventCallBack(event, m_capturePort); + // Report OrkUid + event.reset(new CaptureEvent()); + event->m_type = CaptureEvent::EtOrkUid; + event->m_value = m_orkUid; + g_captureEventCallBack(event, m_capturePort); + // Report end of metadata event.reset(new CaptureEvent()); event->m_type = CaptureEvent::EtEndMetadata; @@ -348,7 +366,7 @@ bool RtpSession::AddRtpPacket(RtpPacketInfoRef& rtpPacket) if(m_log->isInfoEnabled()) { rtpPacket->ToString(logMsg); - logMsg = m_trackingId + ": " + "1st packet s1: " + logMsg; + logMsg = "[" + m_trackingId + "] 1st packet s1: " + logMsg; LOG4CXX_INFO(m_log, logMsg); } } @@ -371,7 +389,7 @@ bool RtpSession::AddRtpPacket(RtpPacketInfoRef& rtpPacket) if(m_log->isInfoEnabled()) { rtpPacket->ToString(logMsg); - logMsg = m_trackingId + ": " + "1st packet s2: " + logMsg; + logMsg = "[" + m_trackingId + "] 1st packet s2: " + logMsg; LOG4CXX_INFO(m_log, logMsg); } } @@ -392,7 +410,7 @@ bool RtpSession::AddRtpPacket(RtpPacketInfoRef& rtpPacket) if(m_log->isDebugEnabled()) { CStdString debug; - debug.Format("%s: %s: Add RTP packet ts:%u, corrected ts:%u, arrival:%u, channel:%d", m_trackingId, m_capturePort, rtpPacket->m_timestamp, correctedRtpTimestamp, rtpPacket->m_arrivalTimestamp, channel); + debug.Format("[%s] %s: Add RTP packet ts:%u, corrected ts:%u, arrival:%u, channel:%d", m_trackingId, m_capturePort, rtpPacket->m_timestamp, correctedRtpTimestamp, rtpPacket->m_arrivalTimestamp, channel); LOG4CXX_DEBUG(m_log, debug); } @@ -519,7 +537,7 @@ void RtpSessions::ReportSipInvite(SipInviteInfoRef& invite) // ... and reinsert m_byIpAndPort.insert(std::make_pair(session->m_ipAndPort, session)); - LOG4CXX_INFO(m_log, session->m_trackingId + ": updated with new INVITE data"); + LOG4CXX_INFO(m_log, "[" + session->m_trackingId + "] updated with new INVITE data"); } return; } @@ -537,7 +555,7 @@ void RtpSessions::ReportSipInvite(SipInviteInfoRef& invite) CStdString numSessions = IntToString(m_byIpAndPort.size()); LOG4CXX_DEBUG(m_log, CStdString("ByIpAndPort: ") + numSessions); - LOG4CXX_INFO(m_log, trackingId + ": created by SIP INVITE"); + LOG4CXX_INFO(m_log, "[" + trackingId + "] created by SIP INVITE"); } void RtpSessions::ReportSipBye(SipByeInfo bye) @@ -595,7 +613,7 @@ void RtpSessions::ReportSkinnyCallInfo(SkCallInfoStruct* callInfo, IpHeaderStruc char szEndPointIp[16]; ACE_OS::inet_ntop(AF_INET, (void*)&session->m_endPointIp, szEndPointIp, sizeof(szEndPointIp)); - logMsg.Format("%s: Skinny CallInfo callId %s local:%s remote:%s dir:%s endpoint:%s", session->m_trackingId, + logMsg.Format("[%s] Skinny CallInfo callId %s local:%s remote:%s dir:%s endpoint:%s", session->m_trackingId, session->m_callId, session->m_localParty, session->m_remoteParty, dir, szEndPointIp); LOG4CXX_INFO(m_log, logMsg); } @@ -639,7 +657,7 @@ void RtpSessions::ChangeCallId(RtpSessionRef& session, unsigned int newId) if(m_log->isInfoEnabled()) { CStdString logMsg; - logMsg.Format("%s: callId %s becomes %s", session->m_trackingId, oldCallId, newCallId); + logMsg.Format("[%s] callId %s becomes %s", session->m_trackingId, oldCallId, newCallId); LOG4CXX_INFO(m_log, logMsg); } } @@ -666,7 +684,7 @@ void RtpSessions::SetMediaAddress(RtpSessionRef& session, struct in_addr mediaIp char szEndPointIp[16]; ACE_OS::inet_ntop(AF_INET, (void*)&session->m_endPointIp, szEndPointIp, sizeof(szEndPointIp)); CStdString logMsg; - logMsg.Format("%s: callId %s media address:%s endpoint:%s", session->m_trackingId, session->m_callId, ipAndPort, szEndPointIp); + logMsg.Format("[%s] callId %s media address:%s endpoint:%s", session->m_trackingId, session->m_callId, ipAndPort, szEndPointIp); LOG4CXX_INFO(m_log, logMsg); } @@ -698,7 +716,7 @@ void RtpSessions::ReportSkinnyOpenReceiveChannelAck(SkOpenReceiveChannelAckStruc } else { - LOG4CXX_DEBUG(m_log, session->m_trackingId + ": OpenReceiveChannelAck: session already got media address signalling"); + LOG4CXX_DEBUG(m_log, "[" + session->m_trackingId + "] OpenReceiveChannelAck: session already got media address signalling"); } } else @@ -721,7 +739,7 @@ void RtpSessions::ReportSkinnyStartMediaTransmission(SkStartMediaTransmissionStr } else { - LOG4CXX_DEBUG(m_log, session->m_trackingId + ": StartMediaTransmission: session already got media address signalling"); + LOG4CXX_DEBUG(m_log, "[" + session->m_trackingId + "] StartMediaTransmission: session already got media address signalling"); } } else @@ -760,7 +778,7 @@ void RtpSessions::ReportSkinnyStopMediaTransmission(SkStopMediaTransmissionStruc if(m_log->isInfoEnabled()) { CStdString logMsg; - logMsg.Format("%s: Skinny StopMedia conferenceId:%s passThruPartyId:%s", session->m_trackingId, conferenceId, passThruPartyId); + logMsg.Format("[%s] Skinny StopMedia conferenceId:%s passThruPartyId:%s", session->m_trackingId, conferenceId, passThruPartyId); LOG4CXX_INFO(m_log, logMsg); } @@ -925,7 +943,7 @@ void RtpSessions::ReportRtpPacket(RtpPacketInfoRef& rtpPacket) if(m_log->isInfoEnabled()) { CStdString debug; - debug.Format("Merging session %s %s with callid:%s into session %s %s with callid:%s", + debug.Format("Merging [%s] %s with callid:%s into [%s] %s with callid:%s", mergeeSession->m_trackingId, mergeeSession->m_ipAndPort, mergeeSession->m_callId, mergerSession->m_trackingId, mergerSession->m_ipAndPort, mergerSession->m_callId); LOG4CXX_INFO(m_log, debug); @@ -946,7 +964,7 @@ void RtpSessions::ReportRtpPacket(RtpPacketInfoRef& rtpPacket) CStdString numSessions = IntToString(m_byIpAndPort.size()); LOG4CXX_DEBUG(m_log, CStdString("ByIpAndPort: ") + numSessions); - LOG4CXX_INFO(m_log, trackingId + ": created by RTP packet"); + LOG4CXX_INFO(m_log, "[" + trackingId + "] created by RTP packet"); } } @@ -987,7 +1005,7 @@ void RtpSessions::Hoover(time_t now) for (std::list<RtpSessionRef>::iterator it = toDismiss.begin(); it != toDismiss.end() ; it++) { RtpSessionRef session = *it; - LOG4CXX_INFO(m_log, session->m_trackingId + ": " + session->m_ipAndPort + " Expired"); + LOG4CXX_INFO(m_log, "[" + session->m_trackingId + "] " + session->m_ipAndPort + " Expired"); Stop(session); } @@ -1006,7 +1024,7 @@ void RtpSessions::Hoover(time_t now) for (std::list<RtpSessionRef>::iterator it2 = toDismiss.begin(); it2 != toDismiss.end() ; it2++) { RtpSessionRef session = *it2; - LOG4CXX_INFO(m_log, session->m_trackingId + ": " + session->m_ipAndPort + " Expired"); + LOG4CXX_INFO(m_log, "[" + session->m_trackingId + "] " + session->m_ipAndPort + " Expired"); Stop(session); } } diff --git a/orkaudio/audiocaptureplugins/voip/RtpSession.h b/orkaudio/audiocaptureplugins/voip/RtpSession.h index 96ed18c..5166cc1 100644 --- a/orkaudio/audiocaptureplugins/voip/RtpSession.h +++ b/orkaudio/audiocaptureplugins/voip/RtpSession.h @@ -93,6 +93,7 @@ private: void ProcessMetadataRawRtp(RtpPacketInfoRef&); void ProcessMetadataSkinny(RtpPacketInfoRef& rtpPacket); void ReportMetadata(); + void GenerateOrkUid(); RtpPacketInfoRef m_lastRtpPacket; RtpPacketInfoRef m_lastRtpPacketSide1; @@ -112,6 +113,8 @@ private: bool m_stopped; unsigned int m_rtpTimestampCorrectiveDelta; bool m_rtpTimestampCorrectiveSign; + time_t m_beginDate; + CStdString m_orkUid; }; typedef boost::shared_ptr<RtpSession> RtpSessionRef; diff --git a/orkbasecxx/AudioCapture.cpp b/orkbasecxx/AudioCapture.cpp index dc72935..93830c7 100644 --- a/orkbasecxx/AudioCapture.cpp +++ b/orkbasecxx/AudioCapture.cpp @@ -233,6 +233,8 @@ CStdString CaptureEvent::EventTypeToString(int eventTypeEnum) return ET_LOCALMAC; case EtRemoteMac: return ET_REMOTEMAC; + case EtOrkUid: + return ET_ORKUID; case EtEndMetadata: return ET_ENDMETADATA; } @@ -286,6 +288,10 @@ int CaptureEvent::EventTypeToEnum(CStdString& eventTypeString) { eventTypeEnum = EtRemoteMac; } + else if (eventTypeString.CompareNoCase(ET_ORKUID) == 0) + { + eventTypeEnum = EtOrkUid; + } else if (eventTypeString.CompareNoCase(ET_ENDMETADATA) == 0) { eventTypeEnum = EtEndMetadata; diff --git a/orkbasecxx/AudioCapture.h b/orkbasecxx/AudioCapture.h index 1d6e8e9..6c257ba 100644 --- a/orkbasecxx/AudioCapture.h +++ b/orkbasecxx/AudioCapture.h @@ -120,6 +120,7 @@ public: #define ET_REMOTEIP "remoteip" #define ET_LOCALMAC "localmac" #define ET_REMOTEMAC "remotemac" +#define ET_ORKUID "orkuid" #define ET_ENDMETADATA "endmetadata" #define ET_INVALID "invalid" typedef enum @@ -136,8 +137,9 @@ public: EtRemoteIp = 9, EtLocalMac = 10, EtRemoteMac = 11, - EtEndMetadata = 12, - EtInvalid = 13 + EtOrkUid = 12, + EtEndMetadata = 13, + EtInvalid = 14 } EventTypeEnum; static CStdString EventTypeToString(int eventTypeEnum); static int EventTypeToEnum(CStdString&); diff --git a/orkbasecxx/AudioTape.cpp b/orkbasecxx/AudioTape.cpp index 9bb7239..30cd9ef 100644 --- a/orkbasecxx/AudioTape.cpp +++ b/orkbasecxx/AudioTape.cpp @@ -79,6 +79,7 @@ AudioTape::AudioTape(CStdString &portId) m_direction = CaptureEvent::DirUnkn; m_shouldStop = false; m_readyForBatchProcessing = false; + m_trackingId = portId; // to make sure this has a value before we get the capture tracking Id. GenerateFilePathAndIdentifier(); } @@ -171,13 +172,13 @@ void AudioTape::Write() // Compute RMS, RMS dB and log CStdString rmsString; rmsString.Format("%.1f dB:%.1f", chunkRef.get()->ComputeRms(), chunkRef.get()->ComputeRmsDb()); - LOG4CXX_INFO(LOG.portLog, m_portId + " RMS: " + rmsString); + LOG4CXX_INFO(LOG.portLog, "[" + m_trackingId + "] RMS: " + rmsString); } } } catch (CStdString& e) { - LOG4CXX_INFO(LOG.portLog, "#" + m_portId + ": " + e); + LOG4CXX_INFO(LOG.portLog, "[" + m_trackingId + "] " + e); m_state = StateError; } } @@ -208,6 +209,15 @@ void AudioTape::AddCaptureEvent(CaptureEventRef eventRef, bool send) // Extract useful info from well known events switch(eventRef->m_type) { + case CaptureEvent::EtStart: + m_trackingId = eventRef->m_value; + if (m_state == StateCreated) + { + // Media chunk stream not yet started, we can update begin date with the actual capture begin date + m_beginDate = eventRef->m_timestamp; + GenerateFilePathAndIdentifier(); + } + break; case CaptureEvent::EtStop: m_shouldStop = true; @@ -247,6 +257,14 @@ void AudioTape::AddCaptureEvent(CaptureEventRef eventRef, bool send) case CaptureEvent::EtRemoteIp: m_remoteIp = eventRef->m_value; break; + case CaptureEvent::EtOrkUid: + m_orkUid = eventRef->m_value; + if (m_state == StateCreated) + { + // Media chunk stream not yet started, we can set the mcf file name to be the Oreka Unique ID + m_fileIdentifier = m_orkUid; + } + break; } // Store the capture event locally diff --git a/orkbasecxx/AudioTape.h b/orkbasecxx/AudioTape.h index 2e212a2..fc93249 100644 --- a/orkbasecxx/AudioTape.h +++ b/orkbasecxx/AudioTape.h @@ -89,6 +89,7 @@ public: time_t m_duration; CStdString m_localIp; CStdString m_remoteIp; + CStdString m_trackingId; TapeResponseRef m_tapeResponse; @@ -109,6 +110,7 @@ private: StateEnum m_state; bool m_shouldStop; bool m_readyForBatchProcessing; + CStdString m_orkUid; }; typedef boost::shared_ptr<AudioTape> AudioTapeRef; |