summaryrefslogtreecommitdiff
path: root/orkbasecxx
diff options
context:
space:
mode:
authorGerald Begumisa <ben_g@users.sourceforge.net>2007-12-02 00:09:47 +0000
committerGerald Begumisa <ben_g@users.sourceforge.net>2007-12-02 00:09:47 +0000
commit292e5f35006a577f601e00bbbcddef17315c7b55 (patch)
tree9b261b170b89c6e1817db01bda7257dc905353d2 /orkbasecxx
parent90c2a7f79d23b6daf595152a9039685d78f05815 (diff)
Added support for reporting to multiple orktrack servers - the config.xml parameter TrackerHostname has been changed to accept a comma-separated list of tracker servers
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@514 09dcff7a-b715-0410-9601-b79a96267cd0
Diffstat (limited to 'orkbasecxx')
-rw-r--r--orkbasecxx/Config.cpp5
-rw-r--r--orkbasecxx/Config.h2
-rw-r--r--orkbasecxx/Reporting.cpp285
-rw-r--r--orkbasecxx/Reporting.h46
4 files changed, 306 insertions, 32 deletions
diff --git a/orkbasecxx/Config.cpp b/orkbasecxx/Config.cpp
index c9f62dd..27adb30 100644
--- a/orkbasecxx/Config.cpp
+++ b/orkbasecxx/Config.cpp
@@ -37,7 +37,7 @@ Config::Config()
m_vadHighThresholdDb = VAD_HIGH_THRESHOLD_DB_DEFAULT;
m_vadLowThresholdDb = VAD_LOW_THRESHOLD_DB_DEFAULT;
m_vadHoldOnSec = VAD_HOLD_ON_SEC_DEFAULT;
- m_trackerHostname = TRACKER_HOSTNAME_DEFAULT;
+ m_trackerHostname.push_back(TRACKER_HOSTNAME_DEFAULT);
m_trackerTcpPort = TRACKER_TCP_PORT_DEFAULT;
m_trackerServicename = TRACKER_SERVICENAME_DEFAULT;
m_audioOutputPath = AUDIO_OUTPUT_PATH_DEFAULT;
@@ -87,7 +87,8 @@ void Config::Define(Serializer* s)
s->DoubleValue(VAD_HIGH_THRESHOLD_DB_PARAM, m_vadHighThresholdDb);
s->DoubleValue(VAD_LOW_THRESHOLD_DB_PARAM, m_vadLowThresholdDb);
s->DoubleValue(VAD_HOLD_ON_SEC_PARAM, m_vadHoldOnSec);
- s->StringValue(TRACKER_HOSTNAME_PARAM, m_trackerHostname);
+ //s->StringValue(TRACKER_HOSTNAME_PARAM, m_trackerHostname);
+ s->CsvValue(TRACKER_HOSTNAME_PARAM, m_trackerHostname);
s->IntValue(TRACKER_TCP_PORT_PARAM, m_trackerTcpPort);
s->StringValue(TRACKER_SERVICENAME_PARAM, m_trackerServicename);
s->StringValue(SERVICE_NAME_PARAM, m_serviceName);
diff --git a/orkbasecxx/Config.h b/orkbasecxx/Config.h
index 5baf30d..fb98405 100644
--- a/orkbasecxx/Config.h
+++ b/orkbasecxx/Config.h
@@ -130,7 +130,7 @@ public:
double m_vadHighThresholdDb;
double m_vadLowThresholdDb;
double m_vadHoldOnSec;
- CStdString m_trackerHostname;
+ std::list<CStdString> m_trackerHostname;
CStdString m_trackerServicename;
int m_trackerTcpPort;
CStdString m_serviceName;
diff --git a/orkbasecxx/Reporting.cpp b/orkbasecxx/Reporting.cpp
index bf0f674..66a2a8a 100644
--- a/orkbasecxx/Reporting.cpp
+++ b/orkbasecxx/Reporting.cpp
@@ -22,19 +22,69 @@
#include "OrkClient.h"
#include "Daemon.h"
#include "CapturePluginProxy.h"
+#include "ace/Thread_Manager.h"
+#ifdef WIN32
+# ifndef snprintf
+# define snprintf _snprintf
+# endif
+#endif
TapeProcessorRef Reporting::m_singleton;
+static std::map<CStdString, ReportingThreadInfoRef> s_reportingThreads;
void Reporting::Initialize()
{
+ CStdString logMsg;
+
if(m_singleton.get() == NULL)
{
m_singleton.reset(new Reporting());
+
+ for(std::list<CStdString>::iterator it = CONFIG.m_trackerHostname.begin(); it != CONFIG.m_trackerHostname.end(); it++)
+ {
+ CStdString trackerHostname = *it;
+ ReportingThreadInfo *rtInfo = (ReportingThreadInfo *)malloc(sizeof(ReportingThreadInfo));
+
+ memset(rtInfo, 0, sizeof(ReportingThreadInfo));
+ snprintf(rtInfo->m_serverHostname, sizeof(rtInfo->m_serverHostname), "%s", trackerHostname.c_str());
+ rtInfo->m_serverPort = CONFIG.m_trackerTcpPort;
+
+ if(!ACE_Thread_Manager::instance()->spawn(ACE_THR_FUNC(ReportingThreadEntryPoint), (void *)rtInfo))
+ {
+ logMsg.Format("Failed to start thread reporting to %s,%d", rtInfo->m_serverHostname, rtInfo->m_serverPort);
+ LOG4CXX_WARN(LOG.reportingLog, logMsg);
+ free(rtInfo);
+ }
+ }
+
TapeProcessorRegistry::instance()->RegisterTapeProcessor(m_singleton);
}
}
+void Reporting::ReportingThreadEntryPoint(void *args)
+{
+ ReportingThreadInfo *rtInfo = (ReportingThreadInfo *)args;
+ ReportingThreadInfoRef rtInfoRef(new ReportingThreadInfo());
+ ReportingThread myRunInfo;
+
+ myRunInfo.m_serverHostname.Format("%s", rtInfo->m_serverHostname);
+ myRunInfo.m_serverPort = rtInfo->m_serverPort;
+ myRunInfo.m_threadId.Format("%s,%d", myRunInfo.m_serverHostname, myRunInfo.m_serverPort);
+
+ snprintf(rtInfoRef->m_serverHostname, sizeof(rtInfoRef->m_serverHostname), "%s", rtInfo->m_serverHostname);
+ rtInfoRef->m_serverPort = rtInfo->m_serverPort;
+ rtInfoRef->m_numTapesToSkip = 0;
+ rtInfoRef->m_queueFullError = false;
+ snprintf(rtInfoRef->m_threadId, sizeof(rtInfoRef->m_threadId), "%s,%d", rtInfoRef->m_serverHostname, rtInfoRef->m_serverPort);
+ myRunInfo.m_myInfo = rtInfoRef;
+
+ s_reportingThreads.insert(std::make_pair(myRunInfo.m_serverHostname, rtInfoRef));
+ free(rtInfo);
+
+ myRunInfo.Run();
+}
+
Reporting* Reporting::Instance()
{
return (Reporting*)m_singleton.get();
@@ -42,8 +92,8 @@ Reporting* Reporting::Instance()
Reporting::Reporting()
{
- m_queueFullError = false;
- numTapesToSkip = 0;
+ //m_queueFullError = false;
+ //numTapesToSkip = 0;
}
CStdString __CDECL__ Reporting::GetName()
@@ -56,42 +106,80 @@ TapeProcessorRef Reporting::Instanciate()
return m_singleton;
}
-void __CDECL__ Reporting::SkipTapes(int number)
+void __CDECL__ Reporting::SkipTapes(int number, CStdString trackingServer)
{
- MutexSentinel sentinel(m_mutex);
- numTapesToSkip++;
-}
+ std::map<CStdString, ReportingThreadInfoRef>::iterator pair;
-bool Reporting::IsSkip()
-{
- MutexSentinel sentinel(m_mutex);
- if(numTapesToSkip)
+ if(!trackingServer.size())
{
- numTapesToSkip--;
- return true;
+ if(s_reportingThreads.size() == 1)
+ {
+ pair = s_reportingThreads.begin();
+ ReportingThreadInfoRef reportingThread = pair->second;
+ {
+ MutexSentinel sentinel(reportingThread->m_mutex);
+ reportingThread->m_numTapesToSkip += number;
+
+ return;
+ }
+ }
+ else
+ {
+ return;
+ }
+ }
+
+ pair = s_reportingThreads.find(trackingServer);
+ if(pair != s_reportingThreads.end())
+ {
+ ReportingThreadInfoRef reportingThread = pair->second;
+ {
+ MutexSentinel sentinel(reportingThread->m_mutex);
+ reportingThread->m_numTapesToSkip += number;
+ }
}
- return false;
}
+
void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef)
{
- if (m_audioTapeQueue.push(audioTapeRef))
- {
- LOG4CXX_DEBUG(LOG.reportingLog, CStdString("added audiotape to queue:") + audioTapeRef->GetIdentifier());
- m_queueFullError = false;
- }
- else
+ std::map<CStdString, ReportingThreadInfoRef>::iterator pair;
+ CStdString logMsg;
+
+ for(pair = s_reportingThreads.begin(); pair != s_reportingThreads.end(); pair++)
{
- if(m_queueFullError == false)
+ ReportingThreadInfoRef reportingThread = pair->second;
+
+ if(reportingThread->m_audioTapeQueue.push(audioTapeRef))
{
- m_queueFullError = true;
- LOG4CXX_ERROR(LOG.reportingLog, CStdString("queue full"));
+ reportingThread->m_queueFullError = false;
+ logMsg.Format("[%s] added audiotape to queue: %s", reportingThread->m_threadId, audioTapeRef->GetIdentifier());
+ LOG4CXX_INFO(LOG.reportingLog, logMsg);
+ }
+ else
+ {
+ if(reportingThread->m_queueFullError == false)
+ {
+ logMsg.Format("[%s] queue full, could not add audiotape %s", reportingThread->m_threadId, audioTapeRef->GetIdentifier());
+ LOG4CXX_WARN(LOG.reportingLog, logMsg);
+ reportingThread->m_queueFullError = true;
+ }
}
}
}
void Reporting::ThreadHandler(void *args)
{
+ int humptyDumptySatOnAWall;
+
+ humptyDumptySatOnAWall = 0;
+
+ return;
+}
+
+#if 0
+void Reporting::ThreadHandler(void *args)
+{
CStdString processorName("Reporting");
TapeProcessorRef reporting = TapeProcessorRegistry::instance()->GetNewTapeProcessor(processorName);
if(reporting.get() == NULL)
@@ -223,6 +311,7 @@ void Reporting::ThreadHandler(void *args)
}
LOG4CXX_INFO(LOG.reportingLog, CStdString("Exiting thread"));
}
+#endif
//=======================================================
#define REPORTING_SKIP_TAPE_CLASS "reportingskiptape"
@@ -238,6 +327,7 @@ void ReportingSkipTapeMsg::Define(Serializer* s)
CStdString thisClass(REPORTING_SKIP_TAPE_CLASS);
s->StringValue(OBJECT_TYPE_TAG, thisClass, true);
s->IntValue("num", (int&)m_number, false);
+ s->StringValue("tracker", m_tracker, false);
}
@@ -259,7 +349,7 @@ ObjectRef ReportingSkipTapeMsg::Process()
Reporting* reporting = Reporting::Instance();
if(reporting)
{
- reporting->SkipTapes(m_number);
+ reporting->SkipTapes(m_number, m_tracker);
}
SimpleResponseMsg* msg = new SimpleResponseMsg;
@@ -270,4 +360,153 @@ ObjectRef ReportingSkipTapeMsg::Process()
}
+//=======================================================
+ReportingThread::ReportingThread()
+{
+ m_serverPort = 0;
+ m_serverHostname = "0.0.0.0";
+}
+
+bool ReportingThread::IsSkip()
+{
+ MutexSentinel sentinel(m_myInfo->m_mutex);
+
+ if(m_myInfo->m_numTapesToSkip > 0)
+ {
+ m_myInfo->m_numTapesToSkip--;
+ return true;
+ }
+
+ return false;
+}
+
+void ReportingThread::Run()
+{
+ CStdString logMsg;
+
+ logMsg.Format("Thread reporting to %s started", m_threadId);
+ LOG4CXX_INFO(LOG.reportingLog, logMsg);
+
+ bool stop = false;
+ bool reportError = true;
+ time_t reportErrorLastTime = 0;
+ bool error = false;
+
+ for(;stop == false;)
+ {
+ try
+ {
+ AudioTapeRef audioTapeRef = m_myInfo->m_audioTapeQueue.pop();
+
+ if(audioTapeRef.get() == NULL)
+ {
+ if(Daemon::Singleton()->IsStopping())
+ {
+ stop = true;
+ }
+ }
+ else
+ {
+
+ MessageRef msgRef;
+ audioTapeRef->GetMessage(msgRef);
+ TapeMsg* ptapeMsg = (TapeMsg*)msgRef.get();
+ //bool startMsg = false;
+ bool realtimeMessage = false;
+
+ if(msgRef.get() && CONFIG.m_enableReporting)
+ {
+ //if(ptapeMsg->m_stage.Equals("START"))
+ //{
+ // startMsg = true;
+ //}
+ if(ptapeMsg->m_stage.Equals("start") || ptapeMsg->m_stage.Equals("stop"))
+ {
+ realtimeMessage = true;
+ }
+
+ CStdString msgAsSingleLineString = msgRef->SerializeSingleLine();
+ LOG4CXX_INFO(LOG.reportingLog, "[" + m_threadId + "] " + msgAsSingleLineString);
+
+ OrkHttpSingleLineClient c;
+ TapeResponseRef tr(new TapeResponse());
+ audioTapeRef->m_tapeResponse = tr;
+
+ bool success = false;
+
+ while (!success && !IsSkip())
+ {
+ if (c.Execute((SyncMessage&)(*msgRef.get()), (AsyncMessage&)(*tr.get()), m_serverHostname, m_serverPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout))
+ {
+ success = true;
+ reportError = true; // reenable error reporting
+ if(error)
+ {
+ error = false;
+ LOG4CXX_ERROR(LOG.reportingLog, "[" + m_threadId + "] " + CStdString("Orktrack successfully contacted"));
+ }
+
+ if(tr->m_deleteTape)
+ {
+ CStdString tapeFilename = audioTapeRef->GetFilename();
+
+ CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + tapeFilename;
+ if (ACE_OS::unlink((PCSTR)absoluteFilename) == 0)
+ {
+ LOG4CXX_INFO(LOG.reportingLog, "[" + m_threadId + "] " + "Deleted tape: " + tapeFilename);
+ }
+ else
+ {
+ LOG4CXX_DEBUG(LOG.reportingLog, "[" + m_threadId + "] " + "Could not delete tape: " + tapeFilename);
+ }
+
+ }
+ else
+ {
+ // Tape is wanted
+ if(CONFIG.m_lookBackRecording == false && CONFIG.m_allowAutomaticRecording && ptapeMsg->m_stage.Equals("start"))
+ {
+ CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_localParty);
+ CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_remoteParty);
+ }
+ }
+ //else
+ //{
+ // if(!startMsg)
+ // {
+ // // Pass the tape to the next processor
+ // pReporting->Runsftp NextProcessor(audioTapeRef);
+ // }
+ //}
+ }
+ else
+ {
+ error = true;
+
+ if( reportError || ((time(NULL) - reportErrorLastTime) > 60) ) // at worst, one error is reported every minute
+ {
+ reportError = false;
+ reportErrorLastTime = time(NULL);
+ LOG4CXX_ERROR(LOG.reportingLog, "[" + m_threadId + "] " + CStdString("Could not contact orktrack"));
+ }
+ if(realtimeMessage)
+ {
+ success = true; // No need to resend realtime messages
+ }
+ else
+ {
+ ACE_OS::sleep(2); // Make sure orktrack is not flooded in case of a problem
+ }
+ }
+ }
+ }
+ }
+ }
+ catch (CStdString& e)
+ {
+ LOG4CXX_ERROR(LOG.reportingLog, "[" + m_threadId + "] " + CStdString("Exception: ") + e);
+ }
+ }
+ LOG4CXX_INFO(LOG.reportingLog, "[" + m_threadId + "] " + CStdString("Exiting thread"));
+}
diff --git a/orkbasecxx/Reporting.h b/orkbasecxx/Reporting.h
index 7f7778d..5446467 100644
--- a/orkbasecxx/Reporting.h
+++ b/orkbasecxx/Reporting.h
@@ -18,6 +18,37 @@
#include "TapeProcessor.h"
#include "AudioTape.h"
+
+struct ReportingThreadInfo
+{
+ char m_serverHostname[256];
+ int m_serverPort;
+ int m_numTapesToSkip;
+ bool m_queueFullError;
+ char m_threadId[256];
+ ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue;
+ ACE_Thread_Mutex m_mutex;
+};
+typedef boost::shared_ptr<ReportingThreadInfo> ReportingThreadInfoRef;
+
+
+class ReportingThread
+{
+public:
+ ReportingThread();
+ void Run();
+
+ CStdString m_serverHostname;
+ int m_serverPort;
+
+ CStdString m_threadId;
+ ReportingThreadInfoRef m_myInfo;
+private:
+ bool IsSkip();
+};
+
+//=======================================================
+
class DLL_IMPORT_EXPORT_ORKBASE Reporting : public TapeProcessor
{
public:
@@ -27,22 +58,23 @@ public:
CStdString __CDECL__ GetName();
TapeProcessorRef __CDECL__ Instanciate();
void __CDECL__ AddAudioTape(AudioTapeRef& audioTapeRef);
- void __CDECL__ SkipTapes(int number);
+ void __CDECL__ SkipTapes(int number, CStdString trackingServer="");
//static Reporting* GetInstance();
static void ThreadHandler(void *args);
+ static void ReportingThreadEntryPoint(void *args);
private:
Reporting();
- bool IsSkip();
+ //bool IsSkip();
//static Reporting m_reportingSingleton;
static TapeProcessorRef m_singleton;
- ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue;
- bool m_queueFullError;
- int numTapesToSkip;
- ACE_Thread_Mutex m_mutex;
+ //ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue;
+ //bool m_queueFullError;
+ //int numTapesToSkip;
+ //ACE_Thread_Mutex m_mutex;
};
class DLL_IMPORT_EXPORT_ORKBASE ReportingSkipTapeMsg : public SyncMessage
@@ -58,7 +90,9 @@ public:
ObjectRef Process();
int m_number;
+ CStdString m_tracker;
};
+
#endif