summaryrefslogtreecommitdiff
path: root/orkbasecxx/Reporting.cpp
diff options
context:
space:
mode:
authorHenri Herscher <henri@oreka.org>2007-07-30 14:32:19 +0000
committerHenri Herscher <henri@oreka.org>2007-07-30 14:32:19 +0000
commit72fda6ebe7d6245b57178441c6355eb9d2402747 (patch)
treed5683a93b1e4d0efee26995caeeccd55faae0d8c /orkbasecxx/Reporting.cpp
parent483b0c94e1754d01c934dc3421527fc6eefa3ebd (diff)
Added non-lookback recording mode.
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@458 09dcff7a-b715-0410-9601-b79a96267cd0
Diffstat (limited to 'orkbasecxx/Reporting.cpp')
-rw-r--r--orkbasecxx/Reporting.cpp263
1 files changed, 263 insertions, 0 deletions
diff --git a/orkbasecxx/Reporting.cpp b/orkbasecxx/Reporting.cpp
new file mode 100644
index 0000000..9aa053d
--- /dev/null
+++ b/orkbasecxx/Reporting.cpp
@@ -0,0 +1,263 @@
+/*
+ * Oreka -- A media capture and retrieval platform
+ *
+ * Copyright (C) 2005, orecx LLC
+ *
+ * http://www.orecx.com
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License.
+ * Please refer to http://www.gnu.org/copyleft/gpl.html
+ *
+ */
+#pragma warning( disable: 4786 )
+
+#define _WINSOCKAPI_ // prevents the inclusion of winsock.h
+
+#include "ConfigManager.h"
+#include "Reporting.h"
+#include "LogManager.h"
+#include "messages/Message.h"
+#include "messages/TapeMsg.h"
+#include "OrkClient.h"
+#include "Daemon.h"
+
+
+TapeProcessorRef Reporting::m_singleton;
+
+void Reporting::Initialize()
+{
+ if(m_singleton.get() == NULL)
+ {
+ m_singleton.reset(new Reporting());
+ TapeProcessorRegistry::instance()->RegisterTapeProcessor(m_singleton);
+ }
+}
+
+Reporting* Reporting::Instance()
+{
+ return (Reporting*)m_singleton.get();
+}
+
+Reporting::Reporting()
+{
+ m_queueFullError = false;
+ numTapesToSkip = 0;
+}
+
+CStdString __CDECL__ Reporting::GetName()
+{
+ return "Reporting";
+}
+
+TapeProcessorRef Reporting::Instanciate()
+{
+ return m_singleton;
+}
+
+void __CDECL__ Reporting::SkipTapes(int number)
+{
+ MutexSentinel sentinel(m_mutex);
+ numTapesToSkip++;
+}
+
+bool Reporting::IsSkip()
+{
+ MutexSentinel sentinel(m_mutex);
+ if(numTapesToSkip)
+ {
+ numTapesToSkip--;
+ return true;
+ }
+ return false;
+}
+
+void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef)
+{
+ if (m_audioTapeQueue.push(audioTapeRef))
+ {
+ LOG4CXX_DEBUG(LOG.reportingLog, CStdString("added audiotape to queue:") + audioTapeRef->GetIdentifier());
+ m_queueFullError = false;
+ }
+ else
+ {
+ if(m_queueFullError == false)
+ {
+ m_queueFullError = true;
+ LOG4CXX_ERROR(LOG.reportingLog, CStdString("queue full"));
+ }
+ }
+}
+
+void Reporting::ThreadHandler(void *args)
+{
+ CStdString processorName("Reporting");
+ TapeProcessorRef reporting = TapeProcessorRegistry::instance()->GetNewTapeProcessor(processorName);
+ if(reporting.get() == NULL)
+ {
+ LOG4CXX_ERROR(LOG.reportingLog, "Could not instanciate Reporting");
+ return;
+ }
+ Reporting* pReporting = (Reporting*)(reporting->Instanciate().get());
+
+ bool stop = false;
+ bool reportError = true;
+ time_t reportErrorLastTime = 0;
+ bool error = false;
+
+ for(;stop == false;)
+ {
+ try
+ {
+ AudioTapeRef audioTapeRef = pReporting->m_audioTapeQueue.pop();
+
+ if(audioTapeRef.get() == NULL)
+ {
+ if(Daemon::Singleton()->IsStopping())
+ {
+ stop = true;
+ }
+ }
+ else
+ {
+
+ MessageRef msgRef;
+ audioTapeRef->GetMessage(msgRef);
+ TapeMsg* ptapeMsg = (TapeMsg*)msgRef.get();
+ //bool startMsg = false;
+ bool realtimeMessage = false;
+
+ if(msgRef.get() && CONFIG.m_enableReporting)
+ {
+ //if(ptapeMsg->m_stage.Equals("START"))
+ //{
+ // startMsg = true;
+ //}
+ if(ptapeMsg->m_stage.Equals("start") || ptapeMsg->m_stage.Equals("stop"))
+ {
+ realtimeMessage = true;
+ }
+
+ CStdString msgAsSingleLineString = msgRef->SerializeSingleLine();
+ LOG4CXX_INFO(LOG.reportingLog, msgAsSingleLineString);
+
+ OrkHttpSingleLineClient c;
+ TapeResponseRef tr(new TapeResponse());
+ audioTapeRef->m_tapeResponse = tr;
+
+ bool success = false;
+
+ while (!success && !pReporting->IsSkip())
+ {
+ if (c.Execute((SyncMessage&)(*msgRef.get()), (AsyncMessage&)(*tr.get()), CONFIG.m_trackerHostname, CONFIG.m_trackerTcpPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout))
+ {
+ success = true;
+ reportError = true; // reenable error reporting
+ if(error)
+ {
+ error = false;
+ LOG4CXX_ERROR(LOG.reportingLog, CStdString("Orktrack successfully contacted"));
+ }
+
+ if(tr->m_deleteTape)
+ {
+ CStdString tapeFilename = audioTapeRef->GetFilename();
+
+ CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + tapeFilename;
+ if (ACE_OS::unlink((PCSTR)absoluteFilename) == 0)
+ {
+ LOG4CXX_INFO(LOG.reportingLog, "Deleted tape: " + tapeFilename);
+ }
+ else
+ {
+ LOG4CXX_DEBUG(LOG.reportingLog, "Could not delete tape: " + tapeFilename);
+ }
+
+ }
+ //else
+ //{
+ // if(!startMsg)
+ // {
+ // // Pass the tape to the next processor
+ // pReporting->Runsftp NextProcessor(audioTapeRef);
+ // }
+ //}
+ }
+ else
+ {
+ error = true;
+
+ if( reportError || ((time(NULL) - reportErrorLastTime) > 60) ) // at worst, one error is reported every minute
+ {
+ reportError = false;
+ reportErrorLastTime = time(NULL);
+ LOG4CXX_ERROR(LOG.reportingLog, CStdString("Could not contact orktrack"));
+ }
+ if(realtimeMessage)
+ {
+ success = true; // No need to resend realtime messages
+ }
+ else
+ {
+ ACE_OS::sleep(2); // Make sure orktrack is not flooded in case of a problem
+ }
+ }
+ }
+ }
+ }
+ }
+ catch (CStdString& e)
+ {
+ LOG4CXX_ERROR(LOG.reportingLog, CStdString("Exception: ") + e);
+ }
+ }
+ LOG4CXX_INFO(LOG.reportingLog, CStdString("Exiting thread"));
+}
+
+//=======================================================
+#define REPORTING_SKIP_TAPE_CLASS "reportingskiptape"
+
+ReportingSkipTapeMsg::ReportingSkipTapeMsg()
+{
+ m_number = 1;
+}
+
+
+void ReportingSkipTapeMsg::Define(Serializer* s)
+{
+ CStdString thisClass(REPORTING_SKIP_TAPE_CLASS);
+ s->StringValue(OBJECT_TYPE_TAG, thisClass, true);
+ s->IntValue("num", (int&)m_number, false);
+}
+
+
+CStdString ReportingSkipTapeMsg::GetClassName()
+{
+ return CStdString(REPORTING_SKIP_TAPE_CLASS);
+}
+
+ObjectRef ReportingSkipTapeMsg::NewInstance()
+{
+ return ObjectRef(new ReportingSkipTapeMsg);
+}
+
+ObjectRef ReportingSkipTapeMsg::Process()
+{
+ bool success = true;
+ CStdString logMsg;
+
+ Reporting* reporting = Reporting::Instance();
+ if(reporting)
+ {
+ reporting->SkipTapes(m_number);
+ }
+
+ SimpleResponseMsg* msg = new SimpleResponseMsg;
+ ObjectRef ref(msg);
+ msg->m_success = success;
+ msg->m_comment = logMsg;
+ return ref;
+}
+
+
+