diff options
author | Gerald Begumisa <ben_g@users.sourceforge.net> | 2007-12-02 00:09:47 +0000 |
---|---|---|
committer | Gerald Begumisa <ben_g@users.sourceforge.net> | 2007-12-02 00:09:47 +0000 |
commit | 292e5f35006a577f601e00bbbcddef17315c7b55 (patch) | |
tree | 9b261b170b89c6e1817db01bda7257dc905353d2 /orkbasecxx | |
parent | 90c2a7f79d23b6daf595152a9039685d78f05815 (diff) |
Added support for reporting to multiple orktrack servers - the config.xml parameter TrackerHostname has been changed to accept a comma-separated list of tracker servers
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@514 09dcff7a-b715-0410-9601-b79a96267cd0
Diffstat (limited to 'orkbasecxx')
-rw-r--r-- | orkbasecxx/Config.cpp | 5 | ||||
-rw-r--r-- | orkbasecxx/Config.h | 2 | ||||
-rw-r--r-- | orkbasecxx/Reporting.cpp | 285 | ||||
-rw-r--r-- | orkbasecxx/Reporting.h | 46 |
4 files changed, 306 insertions, 32 deletions
diff --git a/orkbasecxx/Config.cpp b/orkbasecxx/Config.cpp index c9f62dd..27adb30 100644 --- a/orkbasecxx/Config.cpp +++ b/orkbasecxx/Config.cpp @@ -37,7 +37,7 @@ Config::Config() m_vadHighThresholdDb = VAD_HIGH_THRESHOLD_DB_DEFAULT; m_vadLowThresholdDb = VAD_LOW_THRESHOLD_DB_DEFAULT; m_vadHoldOnSec = VAD_HOLD_ON_SEC_DEFAULT; - m_trackerHostname = TRACKER_HOSTNAME_DEFAULT; + m_trackerHostname.push_back(TRACKER_HOSTNAME_DEFAULT); m_trackerTcpPort = TRACKER_TCP_PORT_DEFAULT; m_trackerServicename = TRACKER_SERVICENAME_DEFAULT; m_audioOutputPath = AUDIO_OUTPUT_PATH_DEFAULT; @@ -87,7 +87,8 @@ void Config::Define(Serializer* s) s->DoubleValue(VAD_HIGH_THRESHOLD_DB_PARAM, m_vadHighThresholdDb); s->DoubleValue(VAD_LOW_THRESHOLD_DB_PARAM, m_vadLowThresholdDb); s->DoubleValue(VAD_HOLD_ON_SEC_PARAM, m_vadHoldOnSec); - s->StringValue(TRACKER_HOSTNAME_PARAM, m_trackerHostname); + //s->StringValue(TRACKER_HOSTNAME_PARAM, m_trackerHostname); + s->CsvValue(TRACKER_HOSTNAME_PARAM, m_trackerHostname); s->IntValue(TRACKER_TCP_PORT_PARAM, m_trackerTcpPort); s->StringValue(TRACKER_SERVICENAME_PARAM, m_trackerServicename); s->StringValue(SERVICE_NAME_PARAM, m_serviceName); diff --git a/orkbasecxx/Config.h b/orkbasecxx/Config.h index 5baf30d..fb98405 100644 --- a/orkbasecxx/Config.h +++ b/orkbasecxx/Config.h @@ -130,7 +130,7 @@ public: double m_vadHighThresholdDb; double m_vadLowThresholdDb; double m_vadHoldOnSec; - CStdString m_trackerHostname; + std::list<CStdString> m_trackerHostname; CStdString m_trackerServicename; int m_trackerTcpPort; CStdString m_serviceName; diff --git a/orkbasecxx/Reporting.cpp b/orkbasecxx/Reporting.cpp index bf0f674..66a2a8a 100644 --- a/orkbasecxx/Reporting.cpp +++ b/orkbasecxx/Reporting.cpp @@ -22,19 +22,69 @@ #include "OrkClient.h" #include "Daemon.h" #include "CapturePluginProxy.h" +#include "ace/Thread_Manager.h" +#ifdef WIN32 +# ifndef snprintf +# define snprintf _snprintf +# endif +#endif TapeProcessorRef Reporting::m_singleton; +static std::map<CStdString, ReportingThreadInfoRef> s_reportingThreads; void Reporting::Initialize() { + CStdString logMsg; + if(m_singleton.get() == NULL) { m_singleton.reset(new Reporting()); + + for(std::list<CStdString>::iterator it = CONFIG.m_trackerHostname.begin(); it != CONFIG.m_trackerHostname.end(); it++) + { + CStdString trackerHostname = *it; + ReportingThreadInfo *rtInfo = (ReportingThreadInfo *)malloc(sizeof(ReportingThreadInfo)); + + memset(rtInfo, 0, sizeof(ReportingThreadInfo)); + snprintf(rtInfo->m_serverHostname, sizeof(rtInfo->m_serverHostname), "%s", trackerHostname.c_str()); + rtInfo->m_serverPort = CONFIG.m_trackerTcpPort; + + if(!ACE_Thread_Manager::instance()->spawn(ACE_THR_FUNC(ReportingThreadEntryPoint), (void *)rtInfo)) + { + logMsg.Format("Failed to start thread reporting to %s,%d", rtInfo->m_serverHostname, rtInfo->m_serverPort); + LOG4CXX_WARN(LOG.reportingLog, logMsg); + free(rtInfo); + } + } + TapeProcessorRegistry::instance()->RegisterTapeProcessor(m_singleton); } } +void Reporting::ReportingThreadEntryPoint(void *args) +{ + ReportingThreadInfo *rtInfo = (ReportingThreadInfo *)args; + ReportingThreadInfoRef rtInfoRef(new ReportingThreadInfo()); + ReportingThread myRunInfo; + + myRunInfo.m_serverHostname.Format("%s", rtInfo->m_serverHostname); + myRunInfo.m_serverPort = rtInfo->m_serverPort; + myRunInfo.m_threadId.Format("%s,%d", myRunInfo.m_serverHostname, myRunInfo.m_serverPort); + + snprintf(rtInfoRef->m_serverHostname, sizeof(rtInfoRef->m_serverHostname), "%s", rtInfo->m_serverHostname); + rtInfoRef->m_serverPort = rtInfo->m_serverPort; + rtInfoRef->m_numTapesToSkip = 0; + rtInfoRef->m_queueFullError = false; + snprintf(rtInfoRef->m_threadId, sizeof(rtInfoRef->m_threadId), "%s,%d", rtInfoRef->m_serverHostname, rtInfoRef->m_serverPort); + myRunInfo.m_myInfo = rtInfoRef; + + s_reportingThreads.insert(std::make_pair(myRunInfo.m_serverHostname, rtInfoRef)); + free(rtInfo); + + myRunInfo.Run(); +} + Reporting* Reporting::Instance() { return (Reporting*)m_singleton.get(); @@ -42,8 +92,8 @@ Reporting* Reporting::Instance() Reporting::Reporting() { - m_queueFullError = false; - numTapesToSkip = 0; + //m_queueFullError = false; + //numTapesToSkip = 0; } CStdString __CDECL__ Reporting::GetName() @@ -56,42 +106,80 @@ TapeProcessorRef Reporting::Instanciate() return m_singleton; } -void __CDECL__ Reporting::SkipTapes(int number) +void __CDECL__ Reporting::SkipTapes(int number, CStdString trackingServer) { - MutexSentinel sentinel(m_mutex); - numTapesToSkip++; -} + std::map<CStdString, ReportingThreadInfoRef>::iterator pair; -bool Reporting::IsSkip() -{ - MutexSentinel sentinel(m_mutex); - if(numTapesToSkip) + if(!trackingServer.size()) { - numTapesToSkip--; - return true; + if(s_reportingThreads.size() == 1) + { + pair = s_reportingThreads.begin(); + ReportingThreadInfoRef reportingThread = pair->second; + { + MutexSentinel sentinel(reportingThread->m_mutex); + reportingThread->m_numTapesToSkip += number; + + return; + } + } + else + { + return; + } + } + + pair = s_reportingThreads.find(trackingServer); + if(pair != s_reportingThreads.end()) + { + ReportingThreadInfoRef reportingThread = pair->second; + { + MutexSentinel sentinel(reportingThread->m_mutex); + reportingThread->m_numTapesToSkip += number; + } } - return false; } + void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef) { - if (m_audioTapeQueue.push(audioTapeRef)) - { - LOG4CXX_DEBUG(LOG.reportingLog, CStdString("added audiotape to queue:") + audioTapeRef->GetIdentifier()); - m_queueFullError = false; - } - else + std::map<CStdString, ReportingThreadInfoRef>::iterator pair; + CStdString logMsg; + + for(pair = s_reportingThreads.begin(); pair != s_reportingThreads.end(); pair++) { - if(m_queueFullError == false) + ReportingThreadInfoRef reportingThread = pair->second; + + if(reportingThread->m_audioTapeQueue.push(audioTapeRef)) { - m_queueFullError = true; - LOG4CXX_ERROR(LOG.reportingLog, CStdString("queue full")); + reportingThread->m_queueFullError = false; + logMsg.Format("[%s] added audiotape to queue: %s", reportingThread->m_threadId, audioTapeRef->GetIdentifier()); + LOG4CXX_INFO(LOG.reportingLog, logMsg); + } + else + { + if(reportingThread->m_queueFullError == false) + { + logMsg.Format("[%s] queue full, could not add audiotape %s", reportingThread->m_threadId, audioTapeRef->GetIdentifier()); + LOG4CXX_WARN(LOG.reportingLog, logMsg); + reportingThread->m_queueFullError = true; + } } } } void Reporting::ThreadHandler(void *args) { + int humptyDumptySatOnAWall; + + humptyDumptySatOnAWall = 0; + + return; +} + +#if 0 +void Reporting::ThreadHandler(void *args) +{ CStdString processorName("Reporting"); TapeProcessorRef reporting = TapeProcessorRegistry::instance()->GetNewTapeProcessor(processorName); if(reporting.get() == NULL) @@ -223,6 +311,7 @@ void Reporting::ThreadHandler(void *args) } LOG4CXX_INFO(LOG.reportingLog, CStdString("Exiting thread")); } +#endif //======================================================= #define REPORTING_SKIP_TAPE_CLASS "reportingskiptape" @@ -238,6 +327,7 @@ void ReportingSkipTapeMsg::Define(Serializer* s) CStdString thisClass(REPORTING_SKIP_TAPE_CLASS); s->StringValue(OBJECT_TYPE_TAG, thisClass, true); s->IntValue("num", (int&)m_number, false); + s->StringValue("tracker", m_tracker, false); } @@ -259,7 +349,7 @@ ObjectRef ReportingSkipTapeMsg::Process() Reporting* reporting = Reporting::Instance(); if(reporting) { - reporting->SkipTapes(m_number); + reporting->SkipTapes(m_number, m_tracker); } SimpleResponseMsg* msg = new SimpleResponseMsg; @@ -270,4 +360,153 @@ ObjectRef ReportingSkipTapeMsg::Process() } +//======================================================= +ReportingThread::ReportingThread() +{ + m_serverPort = 0; + m_serverHostname = "0.0.0.0"; +} + +bool ReportingThread::IsSkip() +{ + MutexSentinel sentinel(m_myInfo->m_mutex); + + if(m_myInfo->m_numTapesToSkip > 0) + { + m_myInfo->m_numTapesToSkip--; + return true; + } + + return false; +} + +void ReportingThread::Run() +{ + CStdString logMsg; + + logMsg.Format("Thread reporting to %s started", m_threadId); + LOG4CXX_INFO(LOG.reportingLog, logMsg); + + bool stop = false; + bool reportError = true; + time_t reportErrorLastTime = 0; + bool error = false; + + for(;stop == false;) + { + try + { + AudioTapeRef audioTapeRef = m_myInfo->m_audioTapeQueue.pop(); + + if(audioTapeRef.get() == NULL) + { + if(Daemon::Singleton()->IsStopping()) + { + stop = true; + } + } + else + { + + MessageRef msgRef; + audioTapeRef->GetMessage(msgRef); + TapeMsg* ptapeMsg = (TapeMsg*)msgRef.get(); + //bool startMsg = false; + bool realtimeMessage = false; + + if(msgRef.get() && CONFIG.m_enableReporting) + { + //if(ptapeMsg->m_stage.Equals("START")) + //{ + // startMsg = true; + //} + if(ptapeMsg->m_stage.Equals("start") || ptapeMsg->m_stage.Equals("stop")) + { + realtimeMessage = true; + } + + CStdString msgAsSingleLineString = msgRef->SerializeSingleLine(); + LOG4CXX_INFO(LOG.reportingLog, "[" + m_threadId + "] " + msgAsSingleLineString); + + OrkHttpSingleLineClient c; + TapeResponseRef tr(new TapeResponse()); + audioTapeRef->m_tapeResponse = tr; + + bool success = false; + + while (!success && !IsSkip()) + { + if (c.Execute((SyncMessage&)(*msgRef.get()), (AsyncMessage&)(*tr.get()), m_serverHostname, m_serverPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout)) + { + success = true; + reportError = true; // reenable error reporting + if(error) + { + error = false; + LOG4CXX_ERROR(LOG.reportingLog, "[" + m_threadId + "] " + CStdString("Orktrack successfully contacted")); + } + + if(tr->m_deleteTape) + { + CStdString tapeFilename = audioTapeRef->GetFilename(); + + CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + tapeFilename; + if (ACE_OS::unlink((PCSTR)absoluteFilename) == 0) + { + LOG4CXX_INFO(LOG.reportingLog, "[" + m_threadId + "] " + "Deleted tape: " + tapeFilename); + } + else + { + LOG4CXX_DEBUG(LOG.reportingLog, "[" + m_threadId + "] " + "Could not delete tape: " + tapeFilename); + } + + } + else + { + // Tape is wanted + if(CONFIG.m_lookBackRecording == false && CONFIG.m_allowAutomaticRecording && ptapeMsg->m_stage.Equals("start")) + { + CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_localParty); + CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_remoteParty); + } + } + //else + //{ + // if(!startMsg) + // { + // // Pass the tape to the next processor + // pReporting->Runsftp NextProcessor(audioTapeRef); + // } + //} + } + else + { + error = true; + + if( reportError || ((time(NULL) - reportErrorLastTime) > 60) ) // at worst, one error is reported every minute + { + reportError = false; + reportErrorLastTime = time(NULL); + LOG4CXX_ERROR(LOG.reportingLog, "[" + m_threadId + "] " + CStdString("Could not contact orktrack")); + } + if(realtimeMessage) + { + success = true; // No need to resend realtime messages + } + else + { + ACE_OS::sleep(2); // Make sure orktrack is not flooded in case of a problem + } + } + } + } + } + } + catch (CStdString& e) + { + LOG4CXX_ERROR(LOG.reportingLog, "[" + m_threadId + "] " + CStdString("Exception: ") + e); + } + } + LOG4CXX_INFO(LOG.reportingLog, "[" + m_threadId + "] " + CStdString("Exiting thread")); +} diff --git a/orkbasecxx/Reporting.h b/orkbasecxx/Reporting.h index 7f7778d..5446467 100644 --- a/orkbasecxx/Reporting.h +++ b/orkbasecxx/Reporting.h @@ -18,6 +18,37 @@ #include "TapeProcessor.h" #include "AudioTape.h" + +struct ReportingThreadInfo +{ + char m_serverHostname[256]; + int m_serverPort; + int m_numTapesToSkip; + bool m_queueFullError; + char m_threadId[256]; + ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue; + ACE_Thread_Mutex m_mutex; +}; +typedef boost::shared_ptr<ReportingThreadInfo> ReportingThreadInfoRef; + + +class ReportingThread +{ +public: + ReportingThread(); + void Run(); + + CStdString m_serverHostname; + int m_serverPort; + + CStdString m_threadId; + ReportingThreadInfoRef m_myInfo; +private: + bool IsSkip(); +}; + +//======================================================= + class DLL_IMPORT_EXPORT_ORKBASE Reporting : public TapeProcessor { public: @@ -27,22 +58,23 @@ public: CStdString __CDECL__ GetName(); TapeProcessorRef __CDECL__ Instanciate(); void __CDECL__ AddAudioTape(AudioTapeRef& audioTapeRef); - void __CDECL__ SkipTapes(int number); + void __CDECL__ SkipTapes(int number, CStdString trackingServer=""); //static Reporting* GetInstance(); static void ThreadHandler(void *args); + static void ReportingThreadEntryPoint(void *args); private: Reporting(); - bool IsSkip(); + //bool IsSkip(); //static Reporting m_reportingSingleton; static TapeProcessorRef m_singleton; - ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue; - bool m_queueFullError; - int numTapesToSkip; - ACE_Thread_Mutex m_mutex; + //ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue; + //bool m_queueFullError; + //int numTapesToSkip; + //ACE_Thread_Mutex m_mutex; }; class DLL_IMPORT_EXPORT_ORKBASE ReportingSkipTapeMsg : public SyncMessage @@ -58,7 +90,9 @@ public: ObjectRef Process(); int m_number; + CStdString m_tracker; }; + #endif |