diff options
author | Gerald Begumisa <ben_g@users.sourceforge.net> | 2007-12-02 00:09:47 +0000 |
---|---|---|
committer | Gerald Begumisa <ben_g@users.sourceforge.net> | 2007-12-02 00:09:47 +0000 |
commit | 292e5f35006a577f601e00bbbcddef17315c7b55 (patch) | |
tree | 9b261b170b89c6e1817db01bda7257dc905353d2 /orkbasecxx/Reporting.cpp | |
parent | 90c2a7f79d23b6daf595152a9039685d78f05815 (diff) |
Added support for reporting to multiple orktrack servers - the config.xml parameter TrackerHostname has been changed to accept a comma-separated list of tracker servers
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@514 09dcff7a-b715-0410-9601-b79a96267cd0
Diffstat (limited to 'orkbasecxx/Reporting.cpp')
-rw-r--r-- | orkbasecxx/Reporting.cpp | 285 |
1 files changed, 262 insertions, 23 deletions
diff --git a/orkbasecxx/Reporting.cpp b/orkbasecxx/Reporting.cpp index bf0f674..66a2a8a 100644 --- a/orkbasecxx/Reporting.cpp +++ b/orkbasecxx/Reporting.cpp @@ -22,19 +22,69 @@ #include "OrkClient.h" #include "Daemon.h" #include "CapturePluginProxy.h" +#include "ace/Thread_Manager.h" +#ifdef WIN32 +# ifndef snprintf +# define snprintf _snprintf +# endif +#endif TapeProcessorRef Reporting::m_singleton; +static std::map<CStdString, ReportingThreadInfoRef> s_reportingThreads; void Reporting::Initialize() { + CStdString logMsg; + if(m_singleton.get() == NULL) { m_singleton.reset(new Reporting()); + + for(std::list<CStdString>::iterator it = CONFIG.m_trackerHostname.begin(); it != CONFIG.m_trackerHostname.end(); it++) + { + CStdString trackerHostname = *it; + ReportingThreadInfo *rtInfo = (ReportingThreadInfo *)malloc(sizeof(ReportingThreadInfo)); + + memset(rtInfo, 0, sizeof(ReportingThreadInfo)); + snprintf(rtInfo->m_serverHostname, sizeof(rtInfo->m_serverHostname), "%s", trackerHostname.c_str()); + rtInfo->m_serverPort = CONFIG.m_trackerTcpPort; + + if(!ACE_Thread_Manager::instance()->spawn(ACE_THR_FUNC(ReportingThreadEntryPoint), (void *)rtInfo)) + { + logMsg.Format("Failed to start thread reporting to %s,%d", rtInfo->m_serverHostname, rtInfo->m_serverPort); + LOG4CXX_WARN(LOG.reportingLog, logMsg); + free(rtInfo); + } + } + TapeProcessorRegistry::instance()->RegisterTapeProcessor(m_singleton); } } +void Reporting::ReportingThreadEntryPoint(void *args) +{ + ReportingThreadInfo *rtInfo = (ReportingThreadInfo *)args; + ReportingThreadInfoRef rtInfoRef(new ReportingThreadInfo()); + ReportingThread myRunInfo; + + myRunInfo.m_serverHostname.Format("%s", rtInfo->m_serverHostname); + myRunInfo.m_serverPort = rtInfo->m_serverPort; + myRunInfo.m_threadId.Format("%s,%d", myRunInfo.m_serverHostname, myRunInfo.m_serverPort); + + snprintf(rtInfoRef->m_serverHostname, sizeof(rtInfoRef->m_serverHostname), "%s", rtInfo->m_serverHostname); + rtInfoRef->m_serverPort = rtInfo->m_serverPort; + rtInfoRef->m_numTapesToSkip = 0; + rtInfoRef->m_queueFullError = false; + snprintf(rtInfoRef->m_threadId, sizeof(rtInfoRef->m_threadId), "%s,%d", rtInfoRef->m_serverHostname, rtInfoRef->m_serverPort); + myRunInfo.m_myInfo = rtInfoRef; + + s_reportingThreads.insert(std::make_pair(myRunInfo.m_serverHostname, rtInfoRef)); + free(rtInfo); + + myRunInfo.Run(); +} + Reporting* Reporting::Instance() { return (Reporting*)m_singleton.get(); @@ -42,8 +92,8 @@ Reporting* Reporting::Instance() Reporting::Reporting() { - m_queueFullError = false; - numTapesToSkip = 0; + //m_queueFullError = false; + //numTapesToSkip = 0; } CStdString __CDECL__ Reporting::GetName() @@ -56,42 +106,80 @@ TapeProcessorRef Reporting::Instanciate() return m_singleton; } -void __CDECL__ Reporting::SkipTapes(int number) +void __CDECL__ Reporting::SkipTapes(int number, CStdString trackingServer) { - MutexSentinel sentinel(m_mutex); - numTapesToSkip++; -} + std::map<CStdString, ReportingThreadInfoRef>::iterator pair; -bool Reporting::IsSkip() -{ - MutexSentinel sentinel(m_mutex); - if(numTapesToSkip) + if(!trackingServer.size()) { - numTapesToSkip--; - return true; + if(s_reportingThreads.size() == 1) + { + pair = s_reportingThreads.begin(); + ReportingThreadInfoRef reportingThread = pair->second; + { + MutexSentinel sentinel(reportingThread->m_mutex); + reportingThread->m_numTapesToSkip += number; + + return; + } + } + else + { + return; + } + } + + pair = s_reportingThreads.find(trackingServer); + if(pair != s_reportingThreads.end()) + { + ReportingThreadInfoRef reportingThread = pair->second; + { + MutexSentinel sentinel(reportingThread->m_mutex); + reportingThread->m_numTapesToSkip += number; + } } - return false; } + void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef) { - if (m_audioTapeQueue.push(audioTapeRef)) - { - LOG4CXX_DEBUG(LOG.reportingLog, CStdString("added audiotape to queue:") + audioTapeRef->GetIdentifier()); - m_queueFullError = false; - } - else + std::map<CStdString, ReportingThreadInfoRef>::iterator pair; + CStdString logMsg; + + for(pair = s_reportingThreads.begin(); pair != s_reportingThreads.end(); pair++) { - if(m_queueFullError == false) + ReportingThreadInfoRef reportingThread = pair->second; + + if(reportingThread->m_audioTapeQueue.push(audioTapeRef)) { - m_queueFullError = true; - LOG4CXX_ERROR(LOG.reportingLog, CStdString("queue full")); + reportingThread->m_queueFullError = false; + logMsg.Format("[%s] added audiotape to queue: %s", reportingThread->m_threadId, audioTapeRef->GetIdentifier()); + LOG4CXX_INFO(LOG.reportingLog, logMsg); + } + else + { + if(reportingThread->m_queueFullError == false) + { + logMsg.Format("[%s] queue full, could not add audiotape %s", reportingThread->m_threadId, audioTapeRef->GetIdentifier()); + LOG4CXX_WARN(LOG.reportingLog, logMsg); + reportingThread->m_queueFullError = true; + } } } } void Reporting::ThreadHandler(void *args) { + int humptyDumptySatOnAWall; + + humptyDumptySatOnAWall = 0; + + return; +} + +#if 0 +void Reporting::ThreadHandler(void *args) +{ CStdString processorName("Reporting"); TapeProcessorRef reporting = TapeProcessorRegistry::instance()->GetNewTapeProcessor(processorName); if(reporting.get() == NULL) @@ -223,6 +311,7 @@ void Reporting::ThreadHandler(void *args) } LOG4CXX_INFO(LOG.reportingLog, CStdString("Exiting thread")); } +#endif //======================================================= #define REPORTING_SKIP_TAPE_CLASS "reportingskiptape" @@ -238,6 +327,7 @@ void ReportingSkipTapeMsg::Define(Serializer* s) CStdString thisClass(REPORTING_SKIP_TAPE_CLASS); s->StringValue(OBJECT_TYPE_TAG, thisClass, true); s->IntValue("num", (int&)m_number, false); + s->StringValue("tracker", m_tracker, false); } @@ -259,7 +349,7 @@ ObjectRef ReportingSkipTapeMsg::Process() Reporting* reporting = Reporting::Instance(); if(reporting) { - reporting->SkipTapes(m_number); + reporting->SkipTapes(m_number, m_tracker); } SimpleResponseMsg* msg = new SimpleResponseMsg; @@ -270,4 +360,153 @@ ObjectRef ReportingSkipTapeMsg::Process() } +//======================================================= +ReportingThread::ReportingThread() +{ + m_serverPort = 0; + m_serverHostname = "0.0.0.0"; +} + +bool ReportingThread::IsSkip() +{ + MutexSentinel sentinel(m_myInfo->m_mutex); + + if(m_myInfo->m_numTapesToSkip > 0) + { + m_myInfo->m_numTapesToSkip--; + return true; + } + + return false; +} + +void ReportingThread::Run() +{ + CStdString logMsg; + + logMsg.Format("Thread reporting to %s started", m_threadId); + LOG4CXX_INFO(LOG.reportingLog, logMsg); + + bool stop = false; + bool reportError = true; + time_t reportErrorLastTime = 0; + bool error = false; + + for(;stop == false;) + { + try + { + AudioTapeRef audioTapeRef = m_myInfo->m_audioTapeQueue.pop(); + + if(audioTapeRef.get() == NULL) + { + if(Daemon::Singleton()->IsStopping()) + { + stop = true; + } + } + else + { + + MessageRef msgRef; + audioTapeRef->GetMessage(msgRef); + TapeMsg* ptapeMsg = (TapeMsg*)msgRef.get(); + //bool startMsg = false; + bool realtimeMessage = false; + + if(msgRef.get() && CONFIG.m_enableReporting) + { + //if(ptapeMsg->m_stage.Equals("START")) + //{ + // startMsg = true; + //} + if(ptapeMsg->m_stage.Equals("start") || ptapeMsg->m_stage.Equals("stop")) + { + realtimeMessage = true; + } + + CStdString msgAsSingleLineString = msgRef->SerializeSingleLine(); + LOG4CXX_INFO(LOG.reportingLog, "[" + m_threadId + "] " + msgAsSingleLineString); + + OrkHttpSingleLineClient c; + TapeResponseRef tr(new TapeResponse()); + audioTapeRef->m_tapeResponse = tr; + + bool success = false; + + while (!success && !IsSkip()) + { + if (c.Execute((SyncMessage&)(*msgRef.get()), (AsyncMessage&)(*tr.get()), m_serverHostname, m_serverPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout)) + { + success = true; + reportError = true; // reenable error reporting + if(error) + { + error = false; + LOG4CXX_ERROR(LOG.reportingLog, "[" + m_threadId + "] " + CStdString("Orktrack successfully contacted")); + } + + if(tr->m_deleteTape) + { + CStdString tapeFilename = audioTapeRef->GetFilename(); + + CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + tapeFilename; + if (ACE_OS::unlink((PCSTR)absoluteFilename) == 0) + { + LOG4CXX_INFO(LOG.reportingLog, "[" + m_threadId + "] " + "Deleted tape: " + tapeFilename); + } + else + { + LOG4CXX_DEBUG(LOG.reportingLog, "[" + m_threadId + "] " + "Could not delete tape: " + tapeFilename); + } + + } + else + { + // Tape is wanted + if(CONFIG.m_lookBackRecording == false && CONFIG.m_allowAutomaticRecording && ptapeMsg->m_stage.Equals("start")) + { + CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_localParty); + CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_remoteParty); + } + } + //else + //{ + // if(!startMsg) + // { + // // Pass the tape to the next processor + // pReporting->Runsftp NextProcessor(audioTapeRef); + // } + //} + } + else + { + error = true; + + if( reportError || ((time(NULL) - reportErrorLastTime) > 60) ) // at worst, one error is reported every minute + { + reportError = false; + reportErrorLastTime = time(NULL); + LOG4CXX_ERROR(LOG.reportingLog, "[" + m_threadId + "] " + CStdString("Could not contact orktrack")); + } + if(realtimeMessage) + { + success = true; // No need to resend realtime messages + } + else + { + ACE_OS::sleep(2); // Make sure orktrack is not flooded in case of a problem + } + } + } + } + } + } + catch (CStdString& e) + { + LOG4CXX_ERROR(LOG.reportingLog, "[" + m_threadId + "] " + CStdString("Exception: ") + e); + } + } + LOG4CXX_INFO(LOG.reportingLog, "[" + m_threadId + "] " + CStdString("Exiting thread")); +} |