summaryrefslogtreecommitdiff
path: root/orkbasecxx/BatchProcessing.cpp
diff options
context:
space:
mode:
authorHenri Herscher <henri@oreka.org>2007-07-30 14:32:19 +0000
committerHenri Herscher <henri@oreka.org>2007-07-30 14:32:19 +0000
commit72fda6ebe7d6245b57178441c6355eb9d2402747 (patch)
treed5683a93b1e4d0efee26995caeeccd55faae0d8c /orkbasecxx/BatchProcessing.cpp
parent483b0c94e1754d01c934dc3421527fc6eefa3ebd (diff)
Added non-lookback recording mode.
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@458 09dcff7a-b715-0410-9601-b79a96267cd0
Diffstat (limited to 'orkbasecxx/BatchProcessing.cpp')
-rw-r--r--orkbasecxx/BatchProcessing.cpp352
1 files changed, 352 insertions, 0 deletions
diff --git a/orkbasecxx/BatchProcessing.cpp b/orkbasecxx/BatchProcessing.cpp
new file mode 100644
index 0000000..1700595
--- /dev/null
+++ b/orkbasecxx/BatchProcessing.cpp
@@ -0,0 +1,352 @@
+/*
+ * Oreka -- A media capture and retrieval platform
+ *
+ * Copyright (C) 2005, orecx LLC
+ *
+ * http://www.orecx.com
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License.
+ * Please refer to http://www.gnu.org/copyleft/gpl.html
+ *
+ */
+#pragma warning( disable: 4786 )
+
+#define _WINSOCKAPI_ // prevents the inclusion of winsock.h
+
+#include <vector>
+#include <bitset>
+
+#include "ConfigManager.h"
+#include "BatchProcessing.h"
+#include "ace/OS_NS_unistd.h"
+#include "audiofile/LibSndFileFile.h"
+#include "Daemon.h"
+#include "Filter.h"
+#include "Reporting.h"
+
+TapeProcessorRef BatchProcessing::m_singleton;
+
+void BatchProcessing::Initialize()
+{
+ if(m_singleton.get() == NULL)
+ {
+ m_singleton.reset(new BatchProcessing());
+ TapeProcessorRegistry::instance()->RegisterTapeProcessor(m_singleton);
+ }
+}
+
+
+BatchProcessing::BatchProcessing()
+{
+ m_threadCount = 0;
+
+ struct tm date = {0};
+ time_t now = time(NULL);
+ ACE_OS::localtime_r(&now, &date);
+ m_currentDay = date.tm_mday;
+}
+
+CStdString __CDECL__ BatchProcessing::GetName()
+{
+ return "BatchProcessing";
+}
+
+TapeProcessorRef BatchProcessing::Instanciate()
+{
+ return m_singleton;
+}
+
+void BatchProcessing::AddAudioTape(AudioTapeRef& audioTapeRef)
+{
+ if (!m_audioTapeQueue.push(audioTapeRef))
+ {
+ // Log error
+ LOG4CXX_ERROR(LOG.batchProcessingLog, CStdString("queue full"));
+ }
+}
+
+void BatchProcessing::SetQueueSize(int size)
+{
+ m_audioTapeQueue.setSize(size);
+}
+
+void BatchProcessing::ThreadHandler(void *args)
+{
+ CStdString debug;
+ CStdString logMsg;
+
+ CStdString processorName("BatchProcessing");
+ TapeProcessorRef batchProcessing = TapeProcessorRegistry::instance()->GetNewTapeProcessor(processorName);
+ if(batchProcessing.get() == NULL)
+ {
+ LOG4CXX_ERROR(LOG.batchProcessingLog, "Could not instanciate BatchProcessing");
+ return;
+ }
+ BatchProcessing* pBatchProcessing = (BatchProcessing*)(batchProcessing->Instanciate().get());
+
+ pBatchProcessing->SetQueueSize(CONFIG.m_batchProcessingQueueSize);
+
+ int threadId = 0;
+ {
+ MutexSentinel sentinel(pBatchProcessing->m_mutex);
+ threadId = pBatchProcessing->m_threadCount++;
+ }
+ CStdString threadIdString = IntToString(threadId);
+ debug.Format("thread Th%s starting - queue size:%d", threadIdString, CONFIG.m_batchProcessingQueueSize);
+ LOG4CXX_INFO(LOG.batchProcessingLog, debug);
+
+ bool stop = false;
+
+ for(;stop == false;)
+ {
+ AudioFileRef fileRef;
+ AudioFileRef outFileRef;
+ AudioTapeRef audioTapeRef;
+ CStdString trackingId = "[no-trk]";
+
+ try
+ {
+ audioTapeRef = pBatchProcessing->m_audioTapeQueue.pop();
+ if(audioTapeRef.get() == NULL)
+ {
+ if(Daemon::Singleton()->IsStopping())
+ {
+ stop = true;
+ }
+ if(Daemon::Singleton()->GetShortLived())
+ {
+ Daemon::Singleton()->Stop();
+ }
+ }
+ else
+ {
+ fileRef = audioTapeRef->GetAudioFileRef();
+ trackingId = audioTapeRef->m_trackingId;
+
+ // Let's work on the tape we have pulled
+ //CStdString threadIdString = IntToString(threadId);
+ LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " processing " + audioTapeRef->GetIdentifier());
+
+ //fileRef->MoveOrig(); // #### could do this only when original and output file have the same extension. Irrelevant for now as everything is captured as mcf file
+ fileRef->Open(AudioFile::READ);
+
+ AudioChunkRef chunkRef;
+ AudioChunkRef tmpChunkRef;
+
+ switch(CONFIG.m_storageAudioFormat)
+ {
+ case FfUlaw:
+ outFileRef.reset(new LibSndFileFile(SF_FORMAT_ULAW | SF_FORMAT_WAV));
+ break;
+ case FfAlaw:
+ outFileRef.reset(new LibSndFileFile(SF_FORMAT_ALAW | SF_FORMAT_WAV));
+ break;
+ case FfGsm:
+ outFileRef.reset(new LibSndFileFile(SF_FORMAT_GSM610 | SF_FORMAT_WAV));
+ break;
+ case FfPcmWav:
+ default:
+ outFileRef.reset(new LibSndFileFile(SF_FORMAT_PCM_16 | SF_FORMAT_WAV));
+ }
+
+ FilterRef filter;
+ FilterRef decoder1;
+ FilterRef decoder2;
+ FilterRef decoder;
+
+ std::bitset<RTP_PAYLOAD_TYPE_MAX> seenRtpPayloadTypes;
+ std::vector<FilterRef> decoders1;
+ std::vector<FilterRef> decoders2;
+ for(int pt=0; pt<RTP_PAYLOAD_TYPE_MAX; pt++)
+ {
+ decoder1 = FilterRegistry::instance()->GetNewFilter(pt);
+ decoders1.push_back(decoder1);
+ decoder2 = FilterRegistry::instance()->GetNewFilter(pt);
+ decoders2.push_back(decoder2);
+ }
+
+ bool firstChunk = true;
+ bool voIpSession = false;
+
+ size_t numSamplesS1 = 0;
+ size_t numSamplesS2 = 0;
+ size_t numSamplesOut = 0;
+
+ while(fileRef->ReadChunkMono(chunkRef))
+ {
+ // ############ HACK
+ //ACE_Time_Value yield;
+ //yield.set(0,1);
+ //ACE_OS::sleep(yield);
+ // ############ HACK
+
+ AudioChunkDetails details = *chunkRef->GetDetails();
+ decoder.reset();
+
+ if(details.m_rtpPayloadType < -1 || details.m_rtpPayloadType > RTP_PAYLOAD_TYPE_MAX)
+ {
+ logMsg.Format("RTP payload type out of bound:%d", details.m_rtpPayloadType);
+ throw(logMsg);
+ }
+
+ // Instanciate any decoder we might need during a VoIP session
+ if(details.m_rtpPayloadType != -1)
+ {
+ voIpSession = true;
+
+ if(details.m_channel == 2)
+ {
+ decoder2 = decoders2.at(details.m_rtpPayloadType);
+ decoder = decoder2;
+ }
+ else
+ {
+ decoder1 = decoders1.at(details.m_rtpPayloadType);
+ decoder = decoder1;
+ }
+
+ bool ptAlreadySeen = seenRtpPayloadTypes.test(details.m_rtpPayloadType);
+ seenRtpPayloadTypes.set(details.m_rtpPayloadType);
+
+ if(decoder.get() == NULL)
+ {
+ if(ptAlreadySeen == false)
+ {
+ // First time we see a particular unsupported payload type in this session, log it
+ CStdString rtpPayloadType = IntToString(details.m_rtpPayloadType);
+ LOG4CXX_ERROR(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " unsupported RTP payload type:" + rtpPayloadType);
+ }
+ // We cannot decode this chunk due to unknown codec, go to next chunk
+ continue;
+ }
+ else if(ptAlreadySeen == false)
+ {
+ // First time we see a particular supported payload type in this session, log it
+ CStdString rtpPayloadType = IntToString(details.m_rtpPayloadType);
+ LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " RTP payload type:" + rtpPayloadType);
+ }
+ }
+ if(!voIpSession || (firstChunk && decoder.get()))
+ {
+ firstChunk = false;
+
+ // At this point, we know we have a working codec, create an RTP mixer and open the output file
+ if(voIpSession)
+ {
+ CStdString filterName("RtpMixer");
+ filter = FilterRegistry::instance()->GetNewFilter(filterName);
+ if(filter.get() == NULL)
+ {
+ debug = "Could not instanciate RTP mixer";
+ throw(debug);
+ }
+ }
+
+ CStdString path = CONFIG.m_audioOutputPath + "/" + audioTapeRef->GetPath();
+ FileRecursiveMkdir(path);
+
+ CStdString file = path + "/" + audioTapeRef->GetIdentifier();
+ outFileRef->Open(file, AudioFile::WRITE, false, fileRef->GetSampleRate());
+ }
+ if(voIpSession)
+ {
+ if(details.m_channel == 2)
+ {
+ decoder2->AudioChunkIn(chunkRef);
+ decoder2->AudioChunkOut(tmpChunkRef);
+ if(tmpChunkRef.get())
+ {
+ numSamplesS2 += tmpChunkRef->GetNumSamples();
+ }
+ }
+ else
+ {
+ decoder1->AudioChunkIn(chunkRef);
+ decoder1->AudioChunkOut(tmpChunkRef);
+ if(tmpChunkRef.get())
+ {
+ numSamplesS1 += tmpChunkRef->GetNumSamples();
+ }
+ }
+ filter->AudioChunkIn(tmpChunkRef);
+ filter->AudioChunkOut(tmpChunkRef);
+ }
+ outFileRef->WriteChunk(tmpChunkRef);
+ if(tmpChunkRef.get())
+ {
+ numSamplesOut += tmpChunkRef->GetNumSamples();
+ }
+
+ if(CONFIG.m_batchProcessingEnhancePriority == false)
+ {
+ // Give up CPU between every audio buffer to make sure the actual recording always has priority
+ //ACE_Time_Value yield;
+ //yield.set(0,1); // 1 us
+ //ACE_OS::sleep(yield);
+
+ // Use this instead, even if it still seems this holds the whole process under Linux instead of this thread only.
+ struct timespec ts;
+ ts.tv_sec = 0;
+ ts.tv_nsec = 1;
+ ACE_OS::nanosleep (&ts, NULL);
+ }
+ }
+
+ if(voIpSession && !firstChunk)
+ {
+ // Flush the RTP mixer
+ AudioChunkRef stopChunk(new AudioChunk());
+ stopChunk->GetDetails()->m_marker = MEDIA_CHUNK_EOS_MARKER;
+ filter->AudioChunkIn(stopChunk);
+ filter->AudioChunkOut(tmpChunkRef);
+ outFileRef->WriteChunk(tmpChunkRef);
+ if(tmpChunkRef.get())
+ {
+ numSamplesOut += tmpChunkRef->GetNumSamples();
+ }
+ }
+
+ fileRef->Close();
+ outFileRef->Close();
+ logMsg.Format("[%s] Th%s stop: num samples: s1:%u s2:%u out:%u", trackingId, threadIdString, numSamplesS1, numSamplesS2, numSamplesOut);
+ LOG4CXX_INFO(LOG.batchProcessingLog, logMsg);
+
+ if(CONFIG.m_deleteNativeFile)
+ {
+ fileRef->Delete();
+ LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " deleting native: " + audioTapeRef->GetIdentifier());
+ }
+
+ // Report tape ready message to orktrack
+ CaptureEventRef readyEvent(new CaptureEvent);
+ readyEvent->m_type = CaptureEvent::EtReady;
+ readyEvent->m_timestamp = time(NULL);
+ audioTapeRef->AddCaptureEvent(readyEvent, true);
+ Reporting::Instance()->AddAudioTape(audioTapeRef);
+
+ // Finished processing the tape, pass on to next processor
+ pBatchProcessing->RunNextProcessor(audioTapeRef);
+ }
+ }
+ catch (CStdString& e)
+ {
+ LOG4CXX_ERROR(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " " + e);
+ if(fileRef.get()) {fileRef->Close();}
+ if(outFileRef.get()) {outFileRef->Close();}
+ if(CONFIG.m_deleteFailedCaptureFile && fileRef.get() != NULL)
+ {
+ LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " deleting native and transcoded");
+ if(fileRef.get()) {fileRef->Delete();}
+ if(outFileRef.get()) {outFileRef->Delete();}
+ }
+ }
+ //catch(...)
+ //{
+ // LOG4CXX_ERROR(LOG.batchProcessingLog, CStdString("unknown exception"));
+ //}
+ }
+ LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Exiting thread Th" + threadIdString));
+}
+
+