summaryrefslogtreecommitdiff
path: root/orkbasecxx
diff options
context:
space:
mode:
authorGerald Begumisa <ben_g@users.sourceforge.net>2008-05-07 17:54:52 +0000
committerGerald Begumisa <ben_g@users.sourceforge.net>2008-05-07 17:54:52 +0000
commit233100139984e7ddf250d83166c778c84b917161 (patch)
treef652391c5bf5c72abe59de9758da0a412682addc /orkbasecxx
parent0ef1e5245dd3112c25001439b72386e9004cb545 (diff)
Corrected error in reporting system where if more than one tracking server is used, only the first server gets a correct message, the rest getting the wrong messages (depending on what is in the event queue on the AudioTape object provided)
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@535 09dcff7a-b715-0410-9601-b79a96267cd0
Diffstat (limited to 'orkbasecxx')
-rw-r--r--orkbasecxx/CapturePort.cpp22
-rw-r--r--orkbasecxx/Reporting.cpp195
-rw-r--r--orkbasecxx/Reporting.h4
3 files changed, 64 insertions, 157 deletions
diff --git a/orkbasecxx/CapturePort.cpp b/orkbasecxx/CapturePort.cpp
index 46faff2..13d70e2 100644
--- a/orkbasecxx/CapturePort.cpp
+++ b/orkbasecxx/CapturePort.cpp
@@ -230,11 +230,14 @@ void CapturePort::AddCaptureEvent(CaptureEventRef eventRef)
case CaptureEvent::EtStart:
break;
case CaptureEvent::EtStop:
-
+ {
m_capturing = false;
LOG4CXX_INFO(s_log, "[" + audioTapeRef->m_trackingId + "] #" + m_id + " stop");
audioTapeRef->AddCaptureEvent(eventRef, true);
- Reporting::Instance()->AddAudioTape(audioTapeRef);
+
+ MessageRef msgRef;
+ audioTapeRef->GetMessage(msgRef);
+ Reporting::Instance()->AddTapeMessage(msgRef);
if (m_audioTapeRef->GetAudioFileRef().get())
{
@@ -247,15 +250,26 @@ void CapturePort::AddCaptureEvent(CaptureEventRef eventRef)
LOG4CXX_WARN(s_log, "[" + audioTapeRef->m_trackingId + "] #" + m_id + " no audio reported between last start and stop");
}
break;
+ }
case CaptureEvent::EtEndMetadata:
+ {
// Now that all metadata has been acquired, we can generate the tape start message
- Reporting::Instance()->AddAudioTape(audioTapeRef);
+
+ MessageRef msgRef;
+ audioTapeRef->GetMessage(msgRef);
+ Reporting::Instance()->AddTapeMessage(msgRef);
+
break;
+ }
case CaptureEvent::EtUpdate:
+ {
audioTapeRef->AddCaptureEvent(eventRef, true);
// Generate tape update message
- Reporting::Instance()->AddAudioTape(audioTapeRef);
+ MessageRef msgRef;
+ audioTapeRef->GetMessage(msgRef);
+ Reporting::Instance()->AddTapeMessage(msgRef);
break;
+ }
case CaptureEvent::EtDirection:
case CaptureEvent::EtRemoteParty:
case CaptureEvent::EtLocalParty:
diff --git a/orkbasecxx/Reporting.cpp b/orkbasecxx/Reporting.cpp
index 66a2a8a..54db288 100644
--- a/orkbasecxx/Reporting.cpp
+++ b/orkbasecxx/Reporting.cpp
@@ -140,27 +140,45 @@ void __CDECL__ Reporting::SkipTapes(int number, CStdString trackingServer)
}
}
-
-void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef)
+void Reporting::AddTapeMessage(MessageRef& messageRef)
{
std::map<CStdString, ReportingThreadInfoRef>::iterator pair;
CStdString logMsg;
-
+ TapeMsg *pTapeMsg = (TapeMsg*)messageRef.get(), *pRptTapeMsg;
+ MessageRef reportingMsgRef;
+
for(pair = s_reportingThreads.begin(); pair != s_reportingThreads.end(); pair++)
{
ReportingThreadInfoRef reportingThread = pair->second;
- if(reportingThread->m_audioTapeQueue.push(audioTapeRef))
+ reportingMsgRef.reset(new TapeMsg);
+ pRptTapeMsg = (TapeMsg*)reportingMsgRef.get();
+
+ pRptTapeMsg->m_recId = pTapeMsg->m_recId;
+ pRptTapeMsg->m_fileName = pTapeMsg->m_fileName;
+ pRptTapeMsg->m_stage = pTapeMsg->m_stage;
+ pRptTapeMsg->m_capturePort = pTapeMsg->m_capturePort;
+ pRptTapeMsg->m_localParty = pTapeMsg->m_localParty;
+ pRptTapeMsg->m_localEntryPoint = pTapeMsg->m_localEntryPoint;
+ pRptTapeMsg->m_remoteParty = pTapeMsg->m_remoteParty;
+ pRptTapeMsg->m_direction = pTapeMsg->m_direction;
+ pRptTapeMsg->m_duration = pTapeMsg->m_duration;
+ pRptTapeMsg->m_timestamp = pTapeMsg->m_timestamp;
+ pRptTapeMsg->m_localIp = pTapeMsg->m_localIp;
+ pRptTapeMsg->m_remoteIp = pTapeMsg->m_remoteIp;
+ pRptTapeMsg->m_nativeCallId = pTapeMsg->m_nativeCallId;
+
+ if(reportingThread->m_messageQueue.push(reportingMsgRef))
{
reportingThread->m_queueFullError = false;
- logMsg.Format("[%s] added audiotape to queue: %s", reportingThread->m_threadId, audioTapeRef->GetIdentifier());
+ logMsg.Format("[%s] added %s tape message to queue: %s", reportingThread->m_threadId, pTapeMsg->m_stage, pTapeMsg->m_recId);
LOG4CXX_INFO(LOG.reportingLog, logMsg);
}
else
{
if(reportingThread->m_queueFullError == false)
{
- logMsg.Format("[%s] queue full, could not add audiotape %s", reportingThread->m_threadId, audioTapeRef->GetIdentifier());
+ logMsg.Format("[%s] queue full, could not add tape message %s", reportingThread->m_threadId, pTapeMsg->m_recId);
LOG4CXX_WARN(LOG.reportingLog, logMsg);
reportingThread->m_queueFullError = true;
}
@@ -168,150 +186,27 @@ void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef)
}
}
-void Reporting::ThreadHandler(void *args)
+void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef)
{
- int humptyDumptySatOnAWall;
-
- humptyDumptySatOnAWall = 0;
-
- return;
+ // What to do?
+ MessageRef msgRef;
+ audioTapeRef->GetMessage(msgRef);
+ audioTapeRef->GetMessage(msgRef);
+ audioTapeRef->GetMessage(msgRef);
+ audioTapeRef->GetMessage(msgRef);
+ audioTapeRef->GetMessage(msgRef);
+ audioTapeRef->GetMessage(msgRef);
+ audioTapeRef->GetMessage(msgRef);
+
+ TapeMsg *pTapeMsg = (TapeMsg*)msgRef.get();
+
+ AddTapeMessage(msgRef);
}
-#if 0
void Reporting::ThreadHandler(void *args)
{
- CStdString processorName("Reporting");
- TapeProcessorRef reporting = TapeProcessorRegistry::instance()->GetNewTapeProcessor(processorName);
- if(reporting.get() == NULL)
- {
- LOG4CXX_ERROR(LOG.reportingLog, "Could not instanciate Reporting");
- return;
- }
- Reporting* pReporting = (Reporting*)(reporting->Instanciate().get());
-
- bool stop = false;
- bool reportError = true;
- time_t reportErrorLastTime = 0;
- bool error = false;
-
- for(;stop == false;)
- {
- try
- {
- AudioTapeRef audioTapeRef = pReporting->m_audioTapeQueue.pop();
-
- if(audioTapeRef.get() == NULL)
- {
- if(Daemon::Singleton()->IsStopping())
- {
- stop = true;
- }
- }
- else
- {
-
- MessageRef msgRef;
- audioTapeRef->GetMessage(msgRef);
- TapeMsg* ptapeMsg = (TapeMsg*)msgRef.get();
- //bool startMsg = false;
- bool realtimeMessage = false;
-
- if(msgRef.get() && CONFIG.m_enableReporting)
- {
- //if(ptapeMsg->m_stage.Equals("START"))
- //{
- // startMsg = true;
- //}
- if(ptapeMsg->m_stage.Equals("start") || ptapeMsg->m_stage.Equals("stop"))
- {
- realtimeMessage = true;
- }
-
- CStdString msgAsSingleLineString = msgRef->SerializeSingleLine();
- LOG4CXX_INFO(LOG.reportingLog, msgAsSingleLineString);
-
- OrkHttpSingleLineClient c;
- TapeResponseRef tr(new TapeResponse());
- audioTapeRef->m_tapeResponse = tr;
-
- bool success = false;
-
- while (!success && !pReporting->IsSkip())
- {
- if (c.Execute((SyncMessage&)(*msgRef.get()), (AsyncMessage&)(*tr.get()), CONFIG.m_trackerHostname, CONFIG.m_trackerTcpPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout))
- {
- success = true;
- reportError = true; // reenable error reporting
- if(error)
- {
- error = false;
- LOG4CXX_ERROR(LOG.reportingLog, CStdString("Orktrack successfully contacted"));
- }
-
- if(tr->m_deleteTape)
- {
- CStdString tapeFilename = audioTapeRef->GetFilename();
-
- CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + tapeFilename;
- if (ACE_OS::unlink((PCSTR)absoluteFilename) == 0)
- {
- LOG4CXX_INFO(LOG.reportingLog, "Deleted tape: " + tapeFilename);
- }
- else
- {
- LOG4CXX_DEBUG(LOG.reportingLog, "Could not delete tape: " + tapeFilename);
- }
-
- }
- else
- {
- // Tape is wanted
- if(CONFIG.m_lookBackRecording == false && CONFIG.m_allowAutomaticRecording && ptapeMsg->m_stage.Equals("start"))
- {
- CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_localParty);
- CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_remoteParty);
- }
- }
- //else
- //{
- // if(!startMsg)
- // {
- // // Pass the tape to the next processor
- // pReporting->Runsftp NextProcessor(audioTapeRef);
- // }
- //}
- }
- else
- {
- error = true;
-
- if( reportError || ((time(NULL) - reportErrorLastTime) > 60) ) // at worst, one error is reported every minute
- {
- reportError = false;
- reportErrorLastTime = time(NULL);
- LOG4CXX_ERROR(LOG.reportingLog, CStdString("Could not contact orktrack"));
- }
- if(realtimeMessage)
- {
- success = true; // No need to resend realtime messages
- }
- else
- {
- ACE_OS::sleep(2); // Make sure orktrack is not flooded in case of a problem
- }
- }
- }
- }
- }
- }
- catch (CStdString& e)
- {
- LOG4CXX_ERROR(LOG.reportingLog, CStdString("Exception: ") + e);
- }
- }
- LOG4CXX_INFO(LOG.reportingLog, CStdString("Exiting thread"));
+ return;
}
-#endif
//=======================================================
#define REPORTING_SKIP_TAPE_CLASS "reportingskiptape"
@@ -321,7 +216,6 @@ ReportingSkipTapeMsg::ReportingSkipTapeMsg()
m_number = 1;
}
-
void ReportingSkipTapeMsg::Define(Serializer* s)
{
CStdString thisClass(REPORTING_SKIP_TAPE_CLASS);
@@ -396,9 +290,9 @@ void ReportingThread::Run()
{
try
{
- AudioTapeRef audioTapeRef = m_myInfo->m_audioTapeQueue.pop();
+ MessageRef msgRef = m_myInfo->m_messageQueue.pop();
- if(audioTapeRef.get() == NULL)
+ if(msgRef.get() == NULL)
{
if(Daemon::Singleton()->IsStopping())
{
@@ -407,9 +301,6 @@ void ReportingThread::Run()
}
else
{
-
- MessageRef msgRef;
- audioTapeRef->GetMessage(msgRef);
TapeMsg* ptapeMsg = (TapeMsg*)msgRef.get();
//bool startMsg = false;
bool realtimeMessage = false;
@@ -430,7 +321,7 @@ void ReportingThread::Run()
OrkHttpSingleLineClient c;
TapeResponseRef tr(new TapeResponse());
- audioTapeRef->m_tapeResponse = tr;
+ //audioTapeRef->m_tapeResponse = tr;
bool success = false;
@@ -448,7 +339,7 @@ void ReportingThread::Run()
if(tr->m_deleteTape)
{
- CStdString tapeFilename = audioTapeRef->GetFilename();
+ CStdString tapeFilename = ptapeMsg->m_fileName;
CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + tapeFilename;
if (ACE_OS::unlink((PCSTR)absoluteFilename) == 0)
diff --git a/orkbasecxx/Reporting.h b/orkbasecxx/Reporting.h
index 5446467..141a0c8 100644
--- a/orkbasecxx/Reporting.h
+++ b/orkbasecxx/Reporting.h
@@ -26,7 +26,8 @@ struct ReportingThreadInfo
int m_numTapesToSkip;
bool m_queueFullError;
char m_threadId[256];
- ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue;
+ //ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue;
+ ThreadSafeQueue<MessageRef> m_messageQueue;
ACE_Thread_Mutex m_mutex;
};
typedef boost::shared_ptr<ReportingThreadInfo> ReportingThreadInfoRef;
@@ -58,6 +59,7 @@ public:
CStdString __CDECL__ GetName();
TapeProcessorRef __CDECL__ Instanciate();
void __CDECL__ AddAudioTape(AudioTapeRef& audioTapeRef);
+ void __CDECL__ AddTapeMessage(MessageRef& messageRef);
void __CDECL__ SkipTapes(int number, CStdString trackingServer="");
//static Reporting* GetInstance();