summaryrefslogtreecommitdiff
path: root/orkaudio/BatchProcessing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'orkaudio/BatchProcessing.cpp')
-rw-r--r--orkaudio/BatchProcessing.cpp352
1 files changed, 0 insertions, 352 deletions
diff --git a/orkaudio/BatchProcessing.cpp b/orkaudio/BatchProcessing.cpp
deleted file mode 100644
index 3a720dd..0000000
--- a/orkaudio/BatchProcessing.cpp
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Oreka -- A media capture and retrieval platform
- *
- * Copyright (C) 2005, orecx LLC
- *
- * http://www.orecx.com
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License.
- * Please refer to http://www.gnu.org/copyleft/gpl.html
- *
- */
-#pragma warning( disable: 4786 )
-
-#define _WINSOCKAPI_ // prevents the inclusion of winsock.h
-
-#include <vector>
-#include <bitset>
-
-#include "ConfigManager.h"
-#include "BatchProcessing.h"
-#include "ace/OS_NS_unistd.h"
-#include "audiofile/LibSndFileFile.h"
-#include "Daemon.h"
-#include "Filter.h"
-#include "Reporting.h"
-
-TapeProcessorRef BatchProcessing::m_singleton;
-
-void BatchProcessing::Initialize()
-{
- if(m_singleton.get() == NULL)
- {
- m_singleton.reset(new BatchProcessing());
- TapeProcessorRegistry::instance()->RegisterTapeProcessor(m_singleton);
- }
-}
-
-
-BatchProcessing::BatchProcessing()
-{
- m_threadCount = 0;
-
- struct tm date = {0};
- time_t now = time(NULL);
- ACE_OS::localtime_r(&now, &date);
- m_currentDay = date.tm_mday;
-}
-
-CStdString __CDECL__ BatchProcessing::GetName()
-{
- return "BatchProcessing";
-}
-
-TapeProcessorRef BatchProcessing::Instanciate()
-{
- return m_singleton;
-}
-
-void BatchProcessing::AddAudioTape(AudioTapeRef& audioTapeRef)
-{
- if (!m_audioTapeQueue.push(audioTapeRef))
- {
- // Log error
- LOG4CXX_ERROR(LOG.batchProcessingLog, CStdString("queue full"));
- }
-}
-
-void BatchProcessing::SetQueueSize(int size)
-{
- m_audioTapeQueue.setSize(size);
-}
-
-void BatchProcessing::ThreadHandler(void *args)
-{
- CStdString debug;
- CStdString logMsg;
-
- CStdString processorName("BatchProcessing");
- TapeProcessorRef batchProcessing = TapeProcessorRegistry::instance()->GetNewTapeProcessor(processorName);
- if(batchProcessing.get() == NULL)
- {
- LOG4CXX_ERROR(LOG.batchProcessingLog, "Could not instanciate BatchProcessing");
- return;
- }
- BatchProcessing* pBatchProcessing = (BatchProcessing*)(batchProcessing->Instanciate().get());
-
- pBatchProcessing->SetQueueSize(CONFIG.m_batchProcessingQueueSize);
-
- int threadId = 0;
- {
- MutexSentinel sentinel(pBatchProcessing->m_mutex);
- threadId = pBatchProcessing->m_threadCount++;
- }
- CStdString threadIdString = IntToString(threadId);
- debug.Format("thread Th%s starting - queue size:%d", threadIdString, CONFIG.m_batchProcessingQueueSize);
- LOG4CXX_INFO(LOG.batchProcessingLog, debug);
-
- bool stop = false;
-
- for(;stop == false;)
- {
- AudioFileRef fileRef;
- AudioFileRef outFileRef;
- AudioTapeRef audioTapeRef;
- CStdString trackingId = "[no-trk]";
-
- try
- {
- audioTapeRef = pBatchProcessing->m_audioTapeQueue.pop();
- if(audioTapeRef.get() == NULL)
- {
- if(DaemonSingleton::instance()->IsStopping())
- {
- stop = true;
- }
- if(DaemonSingleton::instance()->GetShortLived())
- {
- DaemonSingleton::instance()->Stop();
- }
- }
- else
- {
- fileRef = audioTapeRef->GetAudioFileRef();
- trackingId = audioTapeRef->m_trackingId;
-
- // Let's work on the tape we have pulled
- //CStdString threadIdString = IntToString(threadId);
- LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " processing " + audioTapeRef->GetIdentifier());
-
- //fileRef->MoveOrig(); // #### could do this only when original and output file have the same extension. Irrelevant for now as everything is captured as mcf file
- fileRef->Open(AudioFile::READ);
-
- AudioChunkRef chunkRef;
- AudioChunkRef tmpChunkRef;
-
- switch(CONFIG.m_storageAudioFormat)
- {
- case FfUlaw:
- outFileRef.reset(new LibSndFileFile(SF_FORMAT_ULAW | SF_FORMAT_WAV));
- break;
- case FfAlaw:
- outFileRef.reset(new LibSndFileFile(SF_FORMAT_ALAW | SF_FORMAT_WAV));
- break;
- case FfGsm:
- outFileRef.reset(new LibSndFileFile(SF_FORMAT_GSM610 | SF_FORMAT_WAV));
- break;
- case FfPcmWav:
- default:
- outFileRef.reset(new LibSndFileFile(SF_FORMAT_PCM_16 | SF_FORMAT_WAV));
- }
-
- FilterRef filter;
- FilterRef decoder1;
- FilterRef decoder2;
- FilterRef decoder;
-
- std::bitset<RTP_PAYLOAD_TYPE_MAX> seenRtpPayloadTypes;
- std::vector<FilterRef> decoders1;
- std::vector<FilterRef> decoders2;
- for(int pt=0; pt<RTP_PAYLOAD_TYPE_MAX; pt++)
- {
- decoder1 = FilterRegistry::instance()->GetNewFilter(pt);
- decoders1.push_back(decoder1);
- decoder2 = FilterRegistry::instance()->GetNewFilter(pt);
- decoders2.push_back(decoder2);
- }
-
- bool firstChunk = true;
- bool voIpSession = false;
-
- size_t numSamplesS1 = 0;
- size_t numSamplesS2 = 0;
- size_t numSamplesOut = 0;
-
- while(fileRef->ReadChunkMono(chunkRef))
- {
- // ############ HACK
- //ACE_Time_Value yield;
- //yield.set(0,1);
- //ACE_OS::sleep(yield);
- // ############ HACK
-
- AudioChunkDetails details = *chunkRef->GetDetails();
- decoder.reset();
-
- if(details.m_rtpPayloadType < -1 || details.m_rtpPayloadType > RTP_PAYLOAD_TYPE_MAX)
- {
- logMsg.Format("RTP payload type out of bound:%d", details.m_rtpPayloadType);
- throw(logMsg);
- }
-
- // Instanciate any decoder we might need during a VoIP session
- if(details.m_rtpPayloadType != -1)
- {
- voIpSession = true;
-
- if(details.m_channel == 2)
- {
- decoder2 = decoders2.at(details.m_rtpPayloadType);
- decoder = decoder2;
- }
- else
- {
- decoder1 = decoders1.at(details.m_rtpPayloadType);
- decoder = decoder1;
- }
-
- bool ptAlreadySeen = seenRtpPayloadTypes.test(details.m_rtpPayloadType);
- seenRtpPayloadTypes.set(details.m_rtpPayloadType);
-
- if(decoder.get() == NULL)
- {
- if(ptAlreadySeen == false)
- {
- // First time we see a particular unsupported payload type in this session, log it
- CStdString rtpPayloadType = IntToString(details.m_rtpPayloadType);
- LOG4CXX_ERROR(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " unsupported RTP payload type:" + rtpPayloadType);
- }
- // We cannot decode this chunk due to unknown codec, go to next chunk
- continue;
- }
- else if(ptAlreadySeen == false)
- {
- // First time we see a particular supported payload type in this session, log it
- CStdString rtpPayloadType = IntToString(details.m_rtpPayloadType);
- LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " RTP payload type:" + rtpPayloadType);
- }
- }
- if(!voIpSession || (firstChunk && decoder.get()))
- {
- firstChunk = false;
-
- // At this point, we know we have a working codec, create an RTP mixer and open the output file
- if(voIpSession)
- {
- CStdString filterName("RtpMixer");
- filter = FilterRegistry::instance()->GetNewFilter(filterName);
- if(filter.get() == NULL)
- {
- debug = "Could not instanciate RTP mixer";
- throw(debug);
- }
- }
-
- CStdString path = CONFIG.m_audioOutputPath + "/" + audioTapeRef->GetPath();
- FileRecursiveMkdir(path);
-
- CStdString file = path + "/" + audioTapeRef->GetIdentifier();
- outFileRef->Open(file, AudioFile::WRITE, false, fileRef->GetSampleRate());
- }
- if(voIpSession)
- {
- if(details.m_channel == 2)
- {
- decoder2->AudioChunkIn(chunkRef);
- decoder2->AudioChunkOut(tmpChunkRef);
- if(tmpChunkRef.get())
- {
- numSamplesS2 += tmpChunkRef->GetNumSamples();
- }
- }
- else
- {
- decoder1->AudioChunkIn(chunkRef);
- decoder1->AudioChunkOut(tmpChunkRef);
- if(tmpChunkRef.get())
- {
- numSamplesS1 += tmpChunkRef->GetNumSamples();
- }
- }
- filter->AudioChunkIn(tmpChunkRef);
- filter->AudioChunkOut(tmpChunkRef);
- }
- outFileRef->WriteChunk(tmpChunkRef);
- if(tmpChunkRef.get())
- {
- numSamplesOut += tmpChunkRef->GetNumSamples();
- }
-
- if(CONFIG.m_batchProcessingEnhancePriority == false)
- {
- // Give up CPU between every audio buffer to make sure the actual recording always has priority
- //ACE_Time_Value yield;
- //yield.set(0,1); // 1 us
- //ACE_OS::sleep(yield);
-
- // Use this instead, even if it still seems this holds the whole process under Linux instead of this thread only.
- struct timespec ts;
- ts.tv_sec = 0;
- ts.tv_nsec = 1;
- ACE_OS::nanosleep (&ts, NULL);
- }
- }
-
- if(voIpSession && !firstChunk)
- {
- // Flush the RTP mixer
- AudioChunkRef stopChunk(new AudioChunk());
- stopChunk->GetDetails()->m_marker = MEDIA_CHUNK_EOS_MARKER;
- filter->AudioChunkIn(stopChunk);
- filter->AudioChunkOut(tmpChunkRef);
- outFileRef->WriteChunk(tmpChunkRef);
- if(tmpChunkRef.get())
- {
- numSamplesOut += tmpChunkRef->GetNumSamples();
- }
- }
-
- fileRef->Close();
- outFileRef->Close();
- logMsg.Format("[%s] Th%s stop: num samples: s1:%u s2:%u out:%u", trackingId, threadIdString, numSamplesS1, numSamplesS2, numSamplesOut);
- LOG4CXX_INFO(LOG.batchProcessingLog, logMsg);
-
- if(CONFIG.m_deleteNativeFile)
- {
- fileRef->Delete();
- LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " deleting native: " + audioTapeRef->GetIdentifier());
- }
-
- // Report tape ready message to orktrack
- CaptureEventRef readyEvent(new CaptureEvent);
- readyEvent->m_type = CaptureEvent::EtReady;
- readyEvent->m_timestamp = time(NULL);
- audioTapeRef->AddCaptureEvent(readyEvent, true);
- Reporting::Instance()->AddAudioTape(audioTapeRef);
-
- // Finished processing the tape, pass on to next processor
- pBatchProcessing->RunNextProcessor(audioTapeRef);
- }
- }
- catch (CStdString& e)
- {
- LOG4CXX_ERROR(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " " + e);
- if(fileRef.get()) {fileRef->Close();}
- if(outFileRef.get()) {outFileRef->Close();}
- if(CONFIG.m_deleteFailedCaptureFile && fileRef.get() != NULL)
- {
- LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " deleting native and transcoded");
- if(fileRef.get()) {fileRef->Delete();}
- if(outFileRef.get()) {outFileRef->Delete();}
- }
- }
- //catch(...)
- //{
- // LOG4CXX_ERROR(LOG.batchProcessingLog, CStdString("unknown exception"));
- //}
- }
- LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Exiting thread Th" + threadIdString));
-}
-
-