diff options
author | Gerald Begumisa <ben_g@users.sourceforge.net> | 2008-05-07 17:54:52 +0000 |
---|---|---|
committer | Gerald Begumisa <ben_g@users.sourceforge.net> | 2008-05-07 17:54:52 +0000 |
commit | 233100139984e7ddf250d83166c778c84b917161 (patch) | |
tree | f652391c5bf5c72abe59de9758da0a412682addc /orkbasecxx | |
parent | 0ef1e5245dd3112c25001439b72386e9004cb545 (diff) |
Corrected error in reporting system where if more than one tracking server is used, only the first server gets a correct message, the rest getting the wrong messages (depending on what is in the event queue on the AudioTape object provided)
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@535 09dcff7a-b715-0410-9601-b79a96267cd0
Diffstat (limited to 'orkbasecxx')
-rw-r--r-- | orkbasecxx/CapturePort.cpp | 22 | ||||
-rw-r--r-- | orkbasecxx/Reporting.cpp | 195 | ||||
-rw-r--r-- | orkbasecxx/Reporting.h | 4 |
3 files changed, 64 insertions, 157 deletions
diff --git a/orkbasecxx/CapturePort.cpp b/orkbasecxx/CapturePort.cpp index 46faff2..13d70e2 100644 --- a/orkbasecxx/CapturePort.cpp +++ b/orkbasecxx/CapturePort.cpp @@ -230,11 +230,14 @@ void CapturePort::AddCaptureEvent(CaptureEventRef eventRef) case CaptureEvent::EtStart: break; case CaptureEvent::EtStop: - + { m_capturing = false; LOG4CXX_INFO(s_log, "[" + audioTapeRef->m_trackingId + "] #" + m_id + " stop"); audioTapeRef->AddCaptureEvent(eventRef, true); - Reporting::Instance()->AddAudioTape(audioTapeRef); + + MessageRef msgRef; + audioTapeRef->GetMessage(msgRef); + Reporting::Instance()->AddTapeMessage(msgRef); if (m_audioTapeRef->GetAudioFileRef().get()) { @@ -247,15 +250,26 @@ void CapturePort::AddCaptureEvent(CaptureEventRef eventRef) LOG4CXX_WARN(s_log, "[" + audioTapeRef->m_trackingId + "] #" + m_id + " no audio reported between last start and stop"); } break; + } case CaptureEvent::EtEndMetadata: + { // Now that all metadata has been acquired, we can generate the tape start message - Reporting::Instance()->AddAudioTape(audioTapeRef); + + MessageRef msgRef; + audioTapeRef->GetMessage(msgRef); + Reporting::Instance()->AddTapeMessage(msgRef); + break; + } case CaptureEvent::EtUpdate: + { audioTapeRef->AddCaptureEvent(eventRef, true); // Generate tape update message - Reporting::Instance()->AddAudioTape(audioTapeRef); + MessageRef msgRef; + audioTapeRef->GetMessage(msgRef); + Reporting::Instance()->AddTapeMessage(msgRef); break; + } case CaptureEvent::EtDirection: case CaptureEvent::EtRemoteParty: case CaptureEvent::EtLocalParty: diff --git a/orkbasecxx/Reporting.cpp b/orkbasecxx/Reporting.cpp index 66a2a8a..54db288 100644 --- a/orkbasecxx/Reporting.cpp +++ b/orkbasecxx/Reporting.cpp @@ -140,27 +140,45 @@ void __CDECL__ Reporting::SkipTapes(int number, CStdString trackingServer) } } - -void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef) +void Reporting::AddTapeMessage(MessageRef& messageRef) { std::map<CStdString, ReportingThreadInfoRef>::iterator pair; CStdString logMsg; - + TapeMsg *pTapeMsg = (TapeMsg*)messageRef.get(), *pRptTapeMsg; + MessageRef reportingMsgRef; + for(pair = s_reportingThreads.begin(); pair != s_reportingThreads.end(); pair++) { ReportingThreadInfoRef reportingThread = pair->second; - if(reportingThread->m_audioTapeQueue.push(audioTapeRef)) + reportingMsgRef.reset(new TapeMsg); + pRptTapeMsg = (TapeMsg*)reportingMsgRef.get(); + + pRptTapeMsg->m_recId = pTapeMsg->m_recId; + pRptTapeMsg->m_fileName = pTapeMsg->m_fileName; + pRptTapeMsg->m_stage = pTapeMsg->m_stage; + pRptTapeMsg->m_capturePort = pTapeMsg->m_capturePort; + pRptTapeMsg->m_localParty = pTapeMsg->m_localParty; + pRptTapeMsg->m_localEntryPoint = pTapeMsg->m_localEntryPoint; + pRptTapeMsg->m_remoteParty = pTapeMsg->m_remoteParty; + pRptTapeMsg->m_direction = pTapeMsg->m_direction; + pRptTapeMsg->m_duration = pTapeMsg->m_duration; + pRptTapeMsg->m_timestamp = pTapeMsg->m_timestamp; + pRptTapeMsg->m_localIp = pTapeMsg->m_localIp; + pRptTapeMsg->m_remoteIp = pTapeMsg->m_remoteIp; + pRptTapeMsg->m_nativeCallId = pTapeMsg->m_nativeCallId; + + if(reportingThread->m_messageQueue.push(reportingMsgRef)) { reportingThread->m_queueFullError = false; - logMsg.Format("[%s] added audiotape to queue: %s", reportingThread->m_threadId, audioTapeRef->GetIdentifier()); + logMsg.Format("[%s] added %s tape message to queue: %s", reportingThread->m_threadId, pTapeMsg->m_stage, pTapeMsg->m_recId); LOG4CXX_INFO(LOG.reportingLog, logMsg); } else { if(reportingThread->m_queueFullError == false) { - logMsg.Format("[%s] queue full, could not add audiotape %s", reportingThread->m_threadId, audioTapeRef->GetIdentifier()); + logMsg.Format("[%s] queue full, could not add tape message %s", reportingThread->m_threadId, pTapeMsg->m_recId); LOG4CXX_WARN(LOG.reportingLog, logMsg); reportingThread->m_queueFullError = true; } @@ -168,150 +186,27 @@ void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef) } } -void Reporting::ThreadHandler(void *args) +void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef) { - int humptyDumptySatOnAWall; - - humptyDumptySatOnAWall = 0; - - return; + // What to do? + MessageRef msgRef; + audioTapeRef->GetMessage(msgRef); + audioTapeRef->GetMessage(msgRef); + audioTapeRef->GetMessage(msgRef); + audioTapeRef->GetMessage(msgRef); + audioTapeRef->GetMessage(msgRef); + audioTapeRef->GetMessage(msgRef); + audioTapeRef->GetMessage(msgRef); + + TapeMsg *pTapeMsg = (TapeMsg*)msgRef.get(); + + AddTapeMessage(msgRef); } -#if 0 void Reporting::ThreadHandler(void *args) { - CStdString processorName("Reporting"); - TapeProcessorRef reporting = TapeProcessorRegistry::instance()->GetNewTapeProcessor(processorName); - if(reporting.get() == NULL) - { - LOG4CXX_ERROR(LOG.reportingLog, "Could not instanciate Reporting"); - return; - } - Reporting* pReporting = (Reporting*)(reporting->Instanciate().get()); - - bool stop = false; - bool reportError = true; - time_t reportErrorLastTime = 0; - bool error = false; - - for(;stop == false;) - { - try - { - AudioTapeRef audioTapeRef = pReporting->m_audioTapeQueue.pop(); - - if(audioTapeRef.get() == NULL) - { - if(Daemon::Singleton()->IsStopping()) - { - stop = true; - } - } - else - { - - MessageRef msgRef; - audioTapeRef->GetMessage(msgRef); - TapeMsg* ptapeMsg = (TapeMsg*)msgRef.get(); - //bool startMsg = false; - bool realtimeMessage = false; - - if(msgRef.get() && CONFIG.m_enableReporting) - { - //if(ptapeMsg->m_stage.Equals("START")) - //{ - // startMsg = true; - //} - if(ptapeMsg->m_stage.Equals("start") || ptapeMsg->m_stage.Equals("stop")) - { - realtimeMessage = true; - } - - CStdString msgAsSingleLineString = msgRef->SerializeSingleLine(); - LOG4CXX_INFO(LOG.reportingLog, msgAsSingleLineString); - - OrkHttpSingleLineClient c; - TapeResponseRef tr(new TapeResponse()); - audioTapeRef->m_tapeResponse = tr; - - bool success = false; - - while (!success && !pReporting->IsSkip()) - { - if (c.Execute((SyncMessage&)(*msgRef.get()), (AsyncMessage&)(*tr.get()), CONFIG.m_trackerHostname, CONFIG.m_trackerTcpPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout)) - { - success = true; - reportError = true; // reenable error reporting - if(error) - { - error = false; - LOG4CXX_ERROR(LOG.reportingLog, CStdString("Orktrack successfully contacted")); - } - - if(tr->m_deleteTape) - { - CStdString tapeFilename = audioTapeRef->GetFilename(); - - CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + tapeFilename; - if (ACE_OS::unlink((PCSTR)absoluteFilename) == 0) - { - LOG4CXX_INFO(LOG.reportingLog, "Deleted tape: " + tapeFilename); - } - else - { - LOG4CXX_DEBUG(LOG.reportingLog, "Could not delete tape: " + tapeFilename); - } - - } - else - { - // Tape is wanted - if(CONFIG.m_lookBackRecording == false && CONFIG.m_allowAutomaticRecording && ptapeMsg->m_stage.Equals("start")) - { - CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_localParty); - CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_remoteParty); - } - } - //else - //{ - // if(!startMsg) - // { - // // Pass the tape to the next processor - // pReporting->Runsftp NextProcessor(audioTapeRef); - // } - //} - } - else - { - error = true; - - if( reportError || ((time(NULL) - reportErrorLastTime) > 60) ) // at worst, one error is reported every minute - { - reportError = false; - reportErrorLastTime = time(NULL); - LOG4CXX_ERROR(LOG.reportingLog, CStdString("Could not contact orktrack")); - } - if(realtimeMessage) - { - success = true; // No need to resend realtime messages - } - else - { - ACE_OS::sleep(2); // Make sure orktrack is not flooded in case of a problem - } - } - } - } - } - } - catch (CStdString& e) - { - LOG4CXX_ERROR(LOG.reportingLog, CStdString("Exception: ") + e); - } - } - LOG4CXX_INFO(LOG.reportingLog, CStdString("Exiting thread")); + return; } -#endif //======================================================= #define REPORTING_SKIP_TAPE_CLASS "reportingskiptape" @@ -321,7 +216,6 @@ ReportingSkipTapeMsg::ReportingSkipTapeMsg() m_number = 1; } - void ReportingSkipTapeMsg::Define(Serializer* s) { CStdString thisClass(REPORTING_SKIP_TAPE_CLASS); @@ -396,9 +290,9 @@ void ReportingThread::Run() { try { - AudioTapeRef audioTapeRef = m_myInfo->m_audioTapeQueue.pop(); + MessageRef msgRef = m_myInfo->m_messageQueue.pop(); - if(audioTapeRef.get() == NULL) + if(msgRef.get() == NULL) { if(Daemon::Singleton()->IsStopping()) { @@ -407,9 +301,6 @@ void ReportingThread::Run() } else { - - MessageRef msgRef; - audioTapeRef->GetMessage(msgRef); TapeMsg* ptapeMsg = (TapeMsg*)msgRef.get(); //bool startMsg = false; bool realtimeMessage = false; @@ -430,7 +321,7 @@ void ReportingThread::Run() OrkHttpSingleLineClient c; TapeResponseRef tr(new TapeResponse()); - audioTapeRef->m_tapeResponse = tr; + //audioTapeRef->m_tapeResponse = tr; bool success = false; @@ -448,7 +339,7 @@ void ReportingThread::Run() if(tr->m_deleteTape) { - CStdString tapeFilename = audioTapeRef->GetFilename(); + CStdString tapeFilename = ptapeMsg->m_fileName; CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + tapeFilename; if (ACE_OS::unlink((PCSTR)absoluteFilename) == 0) diff --git a/orkbasecxx/Reporting.h b/orkbasecxx/Reporting.h index 5446467..141a0c8 100644 --- a/orkbasecxx/Reporting.h +++ b/orkbasecxx/Reporting.h @@ -26,7 +26,8 @@ struct ReportingThreadInfo int m_numTapesToSkip; bool m_queueFullError; char m_threadId[256]; - ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue; + //ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue; + ThreadSafeQueue<MessageRef> m_messageQueue; ACE_Thread_Mutex m_mutex; }; typedef boost::shared_ptr<ReportingThreadInfo> ReportingThreadInfoRef; @@ -58,6 +59,7 @@ public: CStdString __CDECL__ GetName(); TapeProcessorRef __CDECL__ Instanciate(); void __CDECL__ AddAudioTape(AudioTapeRef& audioTapeRef); + void __CDECL__ AddTapeMessage(MessageRef& messageRef); void __CDECL__ SkipTapes(int number, CStdString trackingServer=""); //static Reporting* GetInstance(); |