diff options
author | Henri Herscher <henri@oreka.org> | 2006-07-05 01:59:32 +0000 |
---|---|---|
committer | Henri Herscher <henri@oreka.org> | 2006-07-05 01:59:32 +0000 |
commit | c27745d1b387606b7e1a5869b7fe4b566410720d (patch) | |
tree | 978a3e5176ff6db2f6b0856d8031e662b68ff03d | |
parent | 0d758fbcf5a581ca5909245cef65c00652219283 (diff) |
* Tape processor interface becomes usable and used
* Reporting and BatchProcessing become "standard" tape processors
* Imediate processing kicks off the tape processor chain
* Object now references Serializer
* ConfigManager singleton not an ACE singleton anymore because ACE singletons are not unique across DLL boundaries.
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@296 09dcff7a-b715-0410-9601-b79a96267cd0
-rw-r--r-- | orkaudio/BatchProcessing.cpp | 284 | ||||
-rw-r--r-- | orkaudio/BatchProcessing.h | 28 | ||||
-rw-r--r-- | orkaudio/CapturePluginProxy.cpp | 2 | ||||
-rw-r--r-- | orkaudio/ImmediateProcessing.cpp | 10 | ||||
-rw-r--r-- | orkaudio/OrkAudio.cpp | 12 | ||||
-rw-r--r-- | orkaudio/Reporting.cpp | 46 | ||||
-rw-r--r-- | orkaudio/Reporting.h | 18 | ||||
-rw-r--r-- | orkbasecxx/AudioTape.cpp | 2 | ||||
-rw-r--r-- | orkbasecxx/AudioTape.h | 7 | ||||
-rw-r--r-- | orkbasecxx/Config.cpp | 1 | ||||
-rw-r--r-- | orkbasecxx/Config.h | 2 | ||||
-rw-r--r-- | orkbasecxx/ConfigManager.cpp | 6 | ||||
-rw-r--r-- | orkbasecxx/ConfigManager.h | 9 | ||||
-rw-r--r-- | orkbasecxx/Filter.h | 3 | ||||
-rw-r--r-- | orkbasecxx/Object.cpp | 36 | ||||
-rw-r--r-- | orkbasecxx/Object.h | 5 | ||||
-rw-r--r-- | orkbasecxx/OrkBase.dsp | 4 | ||||
-rw-r--r-- | orkbasecxx/TapeProcessor.cpp | 120 | ||||
-rw-r--r-- | orkbasecxx/TapeProcessor.h | 39 | ||||
-rw-r--r-- | orkbasecxx/messages/TapeMsg.h | 3 |
20 files changed, 405 insertions, 232 deletions
diff --git a/orkaudio/BatchProcessing.cpp b/orkaudio/BatchProcessing.cpp index 1769a01..2e3ce29 100644 --- a/orkaudio/BatchProcessing.cpp +++ b/orkaudio/BatchProcessing.cpp @@ -14,14 +14,19 @@ #include "ConfigManager.h" #include "BatchProcessing.h" -#include "Reporting.h" -#include "LogManager.h" #include "ace/OS_NS_unistd.h" #include "audiofile/LibSndFileFile.h" #include "Daemon.h" #include "Filter.h" -BatchProcessing BatchProcessing::m_batchProcessingSingleton; +TapeProcessorRef BatchProcessing::m_singleton; + +void BatchProcessing::Initialize() +{ + m_singleton.reset(new BatchProcessing()); + TapeProcessorRegistry::instance()->RegisterTapeProcessor(m_singleton); +} + BatchProcessing::BatchProcessing() { @@ -33,13 +38,17 @@ BatchProcessing::BatchProcessing() m_currentDay = date.tm_mday; } +CStdString __CDECL__ BatchProcessing::GetName() +{ + return "BatchProcessing"; +} -BatchProcessing* BatchProcessing::GetInstance() +TapeProcessorRef BatchProcessing::Instanciate() { - return &m_batchProcessingSingleton; + return m_singleton; } -void BatchProcessing::AddAudioTape(AudioTapeRef audioTapeRef) +void BatchProcessing::AddAudioTape(AudioTapeRef& audioTapeRef) { if (!m_audioTapeQueue.push(audioTapeRef)) { @@ -53,70 +62,17 @@ void BatchProcessing::SetQueueSize(int size) m_audioTapeQueue.setSize(size); } -void BatchProcessing::TapeDropRegistration(CStdString& filename) -{ - MutexSentinel sentinel(m_tapeDropMutex); - - CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + filename; - if (ACE_OS::unlink((PCSTR)absoluteFilename) != 0) - { - LOG4CXX_DEBUG(LOG.batchProcessingLog, "Could not delete tape: " + filename); - m_tapesToDrop.insert(std::make_pair(filename, time(NULL))); - } - else - { - LOG4CXX_INFO(LOG.batchProcessingLog, "Deleted tape: " + filename); - } -} - -bool BatchProcessing::DropTapeIfNeeded(CStdString& filename) -{ - bool shouldDrop = false; - - MutexSentinel sentinel(m_tapeDropMutex); - - std::map<CStdString, time_t>::iterator pair; - pair = m_tapesToDrop.find(filename); - if(pair != m_tapesToDrop.end()) - { - shouldDrop = true; - CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + filename; - if (ACE_OS::unlink((PCSTR)absoluteFilename) == 0) - { - LOG4CXX_INFO(LOG.batchProcessingLog, "Deleted tape: " + filename); - m_tapesToDrop.erase(filename); - } - else - { - LOG4CXX_DEBUG(LOG.batchProcessingLog, "Could not deleted tape: " + filename); - } - } - - TapeDropHousekeeping(); - - return shouldDrop; -} - -void BatchProcessing::TapeDropHousekeeping() -{ - struct tm date = {0}; - time_t now = time(NULL); - ACE_OS::localtime_r(&now, &date); - if(m_currentDay != date.tm_mday) - { - // another day has passed away ... clear possible leftovers - m_currentDay = date.tm_mday; - m_tapesToDrop.clear(); - } -} - - - void BatchProcessing::ThreadHandler(void *args) { CStdString debug; - BatchProcessing* pBatchProcessing = BatchProcessing::GetInstance(); + TapeProcessorRef batchProcessing = TapeProcessorRegistry::instance()->GetNewTapeProcessor(CStdString("BatchProcessing")); + if(batchProcessing.get() == NULL) + { + LOG4CXX_ERROR(LOG.batchProcessingLog, "Could not instanciate BatchProcessing"); + return; + } + BatchProcessing* pBatchProcessing = (BatchProcessing*)(batchProcessing->Instanciate().get()); pBatchProcessing->SetQueueSize(CONFIG.m_batchProcessingQueueSize); @@ -151,126 +107,114 @@ void BatchProcessing::ThreadHandler(void *args) fileRef = audioTapeRef->GetAudioFileRef(); CStdString filename = audioTapeRef->GetFilename(); - //if(pBatchProcessing->DropTapeIfNeeded(filename) == true) - //{ - // // The tape we have pulled has been dropped in the meantime. just delete the capture file - // if(CONFIG.m_deleteNativeFile) - // { - // fileRef->Delete(); - // } - //} - //else - //{ - // Let's work on the tape we have pulled - //CStdString threadIdString = IntToString(threadId); - LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " processing: " + audioTapeRef->GetIdentifier()); - - fileRef->MoveOrig(); - fileRef->Open(AudioFile::READ); - - AudioChunkRef chunkRef; - AudioChunkRef tmpChunkRef; - - switch(CONFIG.m_storageAudioFormat) - { - case FfUlaw: - outFileRef.reset(new LibSndFileFile(SF_FORMAT_ULAW | SF_FORMAT_WAV)); - break; - case FfAlaw: - outFileRef.reset(new LibSndFileFile(SF_FORMAT_ALAW | SF_FORMAT_WAV)); - break; - case FfGsm: - outFileRef.reset(new LibSndFileFile(SF_FORMAT_GSM610 | SF_FORMAT_WAV)); - break; - case FfPcmWav: - default: - outFileRef.reset(new LibSndFileFile(SF_FORMAT_PCM_16 | SF_FORMAT_WAV)); - } + // Let's work on the tape we have pulled + //CStdString threadIdString = IntToString(threadId); + LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " processing: " + audioTapeRef->GetIdentifier()); + + fileRef->MoveOrig(); + fileRef->Open(AudioFile::READ); + + AudioChunkRef chunkRef; + AudioChunkRef tmpChunkRef; + + switch(CONFIG.m_storageAudioFormat) + { + case FfUlaw: + outFileRef.reset(new LibSndFileFile(SF_FORMAT_ULAW | SF_FORMAT_WAV)); + break; + case FfAlaw: + outFileRef.reset(new LibSndFileFile(SF_FORMAT_ALAW | SF_FORMAT_WAV)); + break; + case FfGsm: + outFileRef.reset(new LibSndFileFile(SF_FORMAT_GSM610 | SF_FORMAT_WAV)); + break; + case FfPcmWav: + default: + outFileRef.reset(new LibSndFileFile(SF_FORMAT_PCM_16 | SF_FORMAT_WAV)); + } - FilterRef filter; - FilterRef decoder1; - FilterRef decoder2; + FilterRef filter; + FilterRef decoder1; + FilterRef decoder2; - bool firstChunk = true; - bool voIpSession = false; + bool firstChunk = true; + bool voIpSession = false; - while(fileRef->ReadChunkMono(chunkRef)) + while(fileRef->ReadChunkMono(chunkRef)) + { + AudioChunkDetails details = *chunkRef->GetDetails(); + if(firstChunk && details.m_rtpPayloadType != -1) { - AudioChunkDetails details = *chunkRef->GetDetails(); - if(firstChunk && details.m_rtpPayloadType != -1) + CStdString rtpPayloadType = IntToString(details.m_rtpPayloadType); + LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " RTP payload type:" + rtpPayloadType); + + CStdString filterName("RtpMixer"); + filter = FilterRegistry::instance()->GetNewFilter(filterName); + if(filter.get() == NULL) { - CStdString rtpPayloadType = IntToString(details.m_rtpPayloadType); - LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " RTP payload type:" + rtpPayloadType); - - CStdString filterName("RtpMixer"); - filter = FilterRegistry::instance()->GetNewFilter(filterName); - if(filter.get() == NULL) - { - debug = "Could not instanciate RTP mixer"; - throw(debug); - } - decoder1 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType); - decoder2 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType); - if(decoder1.get() == NULL || decoder2.get() == NULL) - { - debug.Format("Could not find decoder for RTP payload type:%u", chunkRef->GetDetails()->m_rtpPayloadType); - throw(debug); - } - voIpSession = true; + debug = "Could not instanciate RTP mixer"; + throw(debug); } - if(firstChunk) + decoder1 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType); + decoder2 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType); + if(decoder1.get() == NULL || decoder2.get() == NULL) { - firstChunk = false; - - // At this point, we know we have the right codec, open the output file - CStdString file = CONFIG.m_audioOutputPath + "/" + audioTapeRef->GetPath() + audioTapeRef->GetIdentifier(); - outFileRef->Open(file, AudioFile::WRITE, false, fileRef->GetSampleRate()); - } - if(voIpSession) - { - if(details.m_channel == 2) - { - decoder2->AudioChunkIn(chunkRef); - decoder2->AudioChunkOut(tmpChunkRef); - } - else - { - decoder1->AudioChunkIn(chunkRef); - decoder1->AudioChunkOut(tmpChunkRef); - } - filter->AudioChunkIn(tmpChunkRef); - filter->AudioChunkOut(tmpChunkRef); + debug.Format("Could not find decoder for RTP payload type:%u", chunkRef->GetDetails()->m_rtpPayloadType); + throw(debug); } - outFileRef->WriteChunk(tmpChunkRef); + voIpSession = true; + } + if(firstChunk) + { + firstChunk = false; - if(CONFIG.m_batchProcessingEnhancePriority == false) + // At this point, we know we have the right codec, open the output file + CStdString file = CONFIG.m_audioOutputPath + "/" + audioTapeRef->GetPath() + audioTapeRef->GetIdentifier(); + outFileRef->Open(file, AudioFile::WRITE, false, fileRef->GetSampleRate()); + } + if(voIpSession) + { + if(details.m_channel == 2) { - // Give up CPU between every audio buffer to make sure the actual recording always has priority - //ACE_Time_Value yield; - //yield.set(0,1); // 1 us - //ACE_OS::sleep(yield); - - // Use this instead, even if it still seems this holds the whole process under Linux instead of this thread only. - struct timespec ts; - ts.tv_sec = 0; - ts.tv_nsec = 1; - ACE_OS::nanosleep (&ts, NULL); + decoder2->AudioChunkIn(chunkRef); + decoder2->AudioChunkOut(tmpChunkRef); } + else + { + decoder1->AudioChunkIn(chunkRef); + decoder1->AudioChunkOut(tmpChunkRef); + } + filter->AudioChunkIn(tmpChunkRef); + filter->AudioChunkOut(tmpChunkRef); } + outFileRef->WriteChunk(tmpChunkRef); - fileRef->Close(); - outFileRef->Close(); - - if(CONFIG.m_deleteNativeFile) + if(CONFIG.m_batchProcessingEnhancePriority == false) { - fileRef->Delete(); - LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " deleting native: " + audioTapeRef->GetIdentifier()); + // Give up CPU between every audio buffer to make sure the actual recording always has priority + //ACE_Time_Value yield; + //yield.set(0,1); // 1 us + //ACE_OS::sleep(yield); + + // Use this instead, even if it still seems this holds the whole process under Linux instead of this thread only. + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 1; + ACE_OS::nanosleep (&ts, NULL); } - //CStdString filename = audioTapeRef->GetFilename(); - //pBatchProcessing->DropTapeIfNeeded(filename); // maybe the tape was dropped while we were processing it + } + + fileRef->Close(); + outFileRef->Close(); + + if(CONFIG.m_deleteNativeFile) + { + fileRef->Delete(); + LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " deleting native: " + audioTapeRef->GetIdentifier()); + } - Reporting::GetInstance()->AddAudioTape(audioTapeRef); - //} + // Finished processing the tape, pass on to next processor + pBatchProcessing->RunNextProcessor(audioTapeRef); } } catch (CStdString& e) diff --git a/orkaudio/BatchProcessing.h b/orkaudio/BatchProcessing.h index ec9c88b..09eb1da 100644 --- a/orkaudio/BatchProcessing.h +++ b/orkaudio/BatchProcessing.h @@ -15,35 +15,39 @@ #define __BATCHPROCESSING_H__ #include "ThreadSafeQueue.h" +#include "TapeProcessor.h" #include "AudioTape.h" #include "ace/Thread_Mutex.h" #include <map> -class BatchProcessing +class BatchProcessing; +typedef boost::shared_ptr<BatchProcessing> BatchProcessingRef; + +/** + * This tape processor handles the audio transcoding + */ +class BatchProcessing : public TapeProcessor { public: - static BatchProcessing* GetInstance(); + static void Initialize(); + + CStdString __CDECL__ GetName(); + TapeProcessorRef __CDECL__ Instanciate(); + void __CDECL__ AddAudioTape(AudioTapeRef& audioTapeRef); + + static void ThreadHandler(void *args); - void AddAudioTape(AudioTapeRef audioTapeRef); void SetQueueSize(int size); - /** Ask for a tape to be deleted from disk */ - void TapeDropRegistration(CStdString& filename); private: BatchProcessing(); + static TapeProcessorRef m_singleton; - bool DropTapeIfNeeded(CStdString&filename); - void TapeDropHousekeeping(); - - static BatchProcessing m_batchProcessingSingleton; ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue; size_t m_threadCount; ACE_Thread_Mutex m_mutex; - - ACE_Thread_Mutex m_tapeDropMutex; - std::map<CStdString, time_t> m_tapesToDrop; int m_currentDay; }; diff --git a/orkaudio/CapturePluginProxy.cpp b/orkaudio/CapturePluginProxy.cpp index c378d0c..ce6eb7d 100644 --- a/orkaudio/CapturePluginProxy.cpp +++ b/orkaudio/CapturePluginProxy.cpp @@ -93,7 +93,7 @@ bool CapturePluginProxy::Initialize() m_configureFunction = (ConfigureFunction)m_dll.symbol("Configure"); if (m_configureFunction) { - ConfigManagerSingleton::instance()->AddConfigureFunction(m_configureFunction); + ConfigManager::Instance()->AddConfigureFunction(m_configureFunction); m_initializeFunction = (InitializeFunction)m_dll.symbol("Initialize"); if (m_initializeFunction) diff --git a/orkaudio/ImmediateProcessing.cpp b/orkaudio/ImmediateProcessing.cpp index 5aeb1dc..181e659 100644 --- a/orkaudio/ImmediateProcessing.cpp +++ b/orkaudio/ImmediateProcessing.cpp @@ -12,12 +12,16 @@ */ #pragma warning( disable: 4786 ) +#define _WINSOCKAPI_ // prevents the inclusion of winsock.h + #include "ImmediateProcessing.h" #include "LogManager.h" #include "ace/OS_NS_unistd.h" #include "BatchProcessing.h" #include "Daemon.h" #include "ConfigManager.h" +#include "TapeProcessor.h" + ImmediateProcessing ImmediateProcessing::m_immediateProcessingSingleton; @@ -46,7 +50,6 @@ void ImmediateProcessing::ThreadHandler(void *args) CStdString logMsg; ImmediateProcessing* pImmediateProcessing = ImmediateProcessing::GetInstance(); - pImmediateProcessing->SetQueueSize(CONFIG.m_immediateProcessingQueueSize); logMsg.Format("thread starting - queue size:%d", CONFIG.m_immediateProcessingQueueSize); @@ -75,8 +78,8 @@ void ImmediateProcessing::ThreadHandler(void *args) if (audioTapeRef->IsReadyForBatchProcessing()) { - // Forward to batch processing thread - BatchProcessing::GetInstance()->AddAudioTape(audioTapeRef); + // Pass the tape to the tape processor chain + TapeProcessorRegistry::instance()->RunProcessingChain(audioTapeRef); } } } @@ -88,4 +91,3 @@ void ImmediateProcessing::ThreadHandler(void *args) LOG4CXX_INFO(LOG.immediateProcessingLog, CStdString("Exiting thread")); } - diff --git a/orkaudio/OrkAudio.cpp b/orkaudio/OrkAudio.cpp index 37a3f58..5a161b7 100644 --- a/orkaudio/OrkAudio.cpp +++ b/orkaudio/OrkAudio.cpp @@ -37,6 +37,7 @@ #include "CapturePluginProxy.h" #include "AudioCapturePlugin.h" #include "Filter.h" +#include "TapeProcessor.h" #include <list> @@ -135,7 +136,10 @@ void MainThread() objRef.reset(new TestMsg); ObjectFactorySingleton::instance()->RegisterObject(objRef); - ConfigManagerSingleton::instance()->Initialize(); + ConfigManager::Instance()->Initialize(); + + std::list<ACE_DLL> pluginDlls; + LoadPlugins(pluginDlls); // Register in-built filters FilterRef filter(new AlawToPcmFilter()); @@ -143,8 +147,10 @@ void MainThread() filter.reset(new UlawToPcmFilter()); FilterRegistry::instance()->RegisterFilter(filter); - std::list<ACE_DLL> pluginDlls; - LoadPlugins(pluginDlls); + // Register in-built tape processors and build the processing chain + BatchProcessing::Initialize(); + Reporting::Initialize(); + TapeProcessorRegistry::instance()->CreateProcessingChain(); if (!ACE_Thread_Manager::instance()->spawn(ACE_THR_FUNC(ImmediateProcessing::ThreadHandler))) { diff --git a/orkaudio/Reporting.cpp b/orkaudio/Reporting.cpp index bd71abf..72c3f49 100644 --- a/orkaudio/Reporting.cpp +++ b/orkaudio/Reporting.cpp @@ -19,22 +19,33 @@ #include "messages/TapeMsg.h" #include "OrkClient.h" #include "Daemon.h" -#include "BatchProcessing.h" -Reporting Reporting::m_reportingSingleton; +TapeProcessorRef Reporting::m_singleton; + +void Reporting::Initialize() +{ + m_singleton.reset(new Reporting()); + TapeProcessorRegistry::instance()->RegisterTapeProcessor(m_singleton); +} + Reporting::Reporting() { m_queueFullError = false; } -Reporting* Reporting::GetInstance() +CStdString __CDECL__ Reporting::GetName() +{ + return "Reporting"; +} + +TapeProcessorRef Reporting::Instanciate() { - return &m_reportingSingleton; + return m_singleton; } -void Reporting::AddAudioTape(AudioTapeRef audioTapeRef) +void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef) { if (m_audioTapeQueue.push(audioTapeRef)) { @@ -53,7 +64,14 @@ void Reporting::AddAudioTape(AudioTapeRef audioTapeRef) void Reporting::ThreadHandler(void *args) { - Reporting* pReporting = Reporting::GetInstance(); + TapeProcessorRef reporting = TapeProcessorRegistry::instance()->GetNewTapeProcessor(CStdString("Reporting")); + if(reporting.get() == NULL) + { + LOG4CXX_ERROR(LOG.reportingLog, "Could not instanciate Reporting"); + return; + } + Reporting* pReporting = (Reporting*)(reporting->Instanciate().get()); + bool stop = false; for(;stop == false;) @@ -80,21 +98,19 @@ void Reporting::ThreadHandler(void *args) LOG4CXX_INFO(LOG.reportingLog, msgAsSingleLineString); OrkHttpSingleLineClient c; - TapeResponse tr; + TapeResponseRef tr(new TapeResponse()); + audioTapeRef->m_tapeResponse = tr; bool success = false; bool firstError = true; while (!success) { - if (c.Execute((SyncMessage&)(*msgRef.get()), tr, CONFIG.m_trackerHostname, CONFIG.m_trackerTcpPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout)) + if (c.Execute((SyncMessage&)(*msgRef.get()), (AsyncMessage&)(*tr.get()), CONFIG.m_trackerHostname, CONFIG.m_trackerTcpPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout)) { success = true; - if(tr.m_deleteTape) + if(tr->m_deleteTape) { - //LOG4CXX_INFO(LOG.reportingLog, "Registered tape for removal: " + audioTapeRef->GetIdentifier()); - //BatchProcessing::GetInstance()->TapeDropRegistration(tapeFilename); - CStdString tapeFilename = audioTapeRef->GetFilename(); CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + tapeFilename; @@ -108,6 +124,11 @@ void Reporting::ThreadHandler(void *args) } } + else + { + // Pass the tape to the next processor + pReporting->RunNextProcessor(audioTapeRef); + } } else { @@ -131,3 +152,4 @@ void Reporting::ThreadHandler(void *args) } + diff --git a/orkaudio/Reporting.h b/orkaudio/Reporting.h index e2afb72..e16f647 100644 --- a/orkaudio/Reporting.h +++ b/orkaudio/Reporting.h @@ -15,18 +15,26 @@ #define __REPORTING_H__ #include "ThreadSafeQueue.h" +#include "TapeProcessor.h" #include "AudioTape.h" -class Reporting +class Reporting : public TapeProcessor { public: - Reporting(); - static Reporting* GetInstance(); + static void Initialize(); + + CStdString __CDECL__ GetName(); + TapeProcessorRef __CDECL__ Instanciate(); + void __CDECL__ AddAudioTape(AudioTapeRef& audioTapeRef); + + //static Reporting* GetInstance(); static void ThreadHandler(void *args); - void AddAudioTape(AudioTapeRef audioTapeRef); private: - static Reporting m_reportingSingleton; + Reporting(); + //static Reporting m_reportingSingleton; + static TapeProcessorRef m_singleton; + ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue; bool m_queueFullError; }; diff --git a/orkbasecxx/AudioTape.cpp b/orkbasecxx/AudioTape.cpp index 5195e2d..3cb81e5 100644 --- a/orkbasecxx/AudioTape.cpp +++ b/orkbasecxx/AudioTape.cpp @@ -12,6 +12,8 @@ */ #pragma warning( disable: 4786 ) +#define _WINSOCKAPI_ // prevents the inclusion of winsock.h + #include "ConfigManager.h" #include "AudioTape.h" #include "ace/OS_NS_time.h" diff --git a/orkbasecxx/AudioTape.h b/orkbasecxx/AudioTape.h index c335deb..cb0eaea 100644 --- a/orkbasecxx/AudioTape.h +++ b/orkbasecxx/AudioTape.h @@ -21,7 +21,8 @@ #include <queue> #include "AudioCapture.h" #include "audiofile/AudioFile.h" -#include "messages/Message.h" +#include "messages/TapeMsg.h" + class DLL_IMPORT_EXPORT_ORKBASE AudioTapeDescription : public Object { @@ -75,6 +76,7 @@ public: AudioFileRef GetAudioFileRef(); bool IsReadyForBatchProcessing(); + CStdString m_portId; CStdString m_localParty; CStdString m_localEntryPoint; @@ -85,6 +87,9 @@ public: time_t m_duration; CStdString m_localIp; CStdString m_remoteIp; + + TapeResponseRef m_tapeResponse; + private: void GenerateFilePathAndIdentifier(); diff --git a/orkbasecxx/Config.cpp b/orkbasecxx/Config.cpp index 7921218..41d8771 100644 --- a/orkbasecxx/Config.cpp +++ b/orkbasecxx/Config.cpp @@ -83,6 +83,7 @@ void Config::Define(Serializer* s) s->BoolValue(BATCH_PROCESSING_ENHANCE_PRIORITY_PARAM, m_batchProcessingEnhancePriority); s->BoolValue(DELETE_FAILED_CAPTURE_FILE_PARAM, m_deleteFailedCaptureFile); s->CsvValue(CAPTURE_PORT_FILTERS_PARAM, m_capturePortFilters); + s->CsvValue(TAPE_PROCESSORS_PARAM, m_tapeProcessors); } void Config::Validate() diff --git a/orkbasecxx/Config.h b/orkbasecxx/Config.h index adcab40..54a0787 100644 --- a/orkbasecxx/Config.h +++ b/orkbasecxx/Config.h @@ -70,6 +70,7 @@ #define DELETE_FAILED_CAPTURE_FILE_PARAM "DeleteFailedCaptureFile" #define DELETE_FAILED_CAPTURE_FILE_DEFAULT false #define CAPTURE_PORT_FILTERS_PARAM "CapturePortFilters" +#define TAPE_PROCESSORS_PARAM "TapeProcessors" class DLL_IMPORT_EXPORT_ORKBASE Config : public Object { @@ -109,6 +110,7 @@ public: bool m_batchProcessingEnhancePriority; bool m_deleteFailedCaptureFile; std::list<CStdString> m_capturePortFilters; + std::list<CStdString> m_tapeProcessors; private: log4cxx::LoggerPtr m_log; diff --git a/orkbasecxx/ConfigManager.cpp b/orkbasecxx/ConfigManager.cpp index c805599..e92312e 100644 --- a/orkbasecxx/ConfigManager.cpp +++ b/orkbasecxx/ConfigManager.cpp @@ -24,6 +24,12 @@ #define CONFIG_FILE_NAME "config.xml" #define ETC_CONFIG_FILE_NAME "/etc/orkaudio/config.xml" +ConfigManager ConfigManager::m_singleton; + +ConfigManager* ConfigManager::Instance() +{ + return &m_singleton; +} void ConfigManager::Initialize() { diff --git a/orkbasecxx/ConfigManager.h b/orkbasecxx/ConfigManager.h index 538dfbf..47efcae 100644 --- a/orkbasecxx/ConfigManager.h +++ b/orkbasecxx/ConfigManager.h @@ -15,25 +15,26 @@ #define __CONFIGMANAGER_H__ #include <list> -#include "ace/Singleton.h" #include "Config.h" #include "AudioCapturePlugin.h" +class ConfigManager; + class DLL_IMPORT_EXPORT_ORKBASE ConfigManager { public: + static ConfigManager* Instance(); void Initialize(); void AddConfigureFunction(ConfigureFunction); Config m_config; private: + static ConfigManager m_singleton; std::list<ConfigureFunction> m_configureFunctions; DOMNode* m_configTopNode; }; -typedef ACE_Singleton<ConfigManager, ACE_Thread_Mutex> ConfigManagerSingleton; - -#define CONFIG ConfigManagerSingleton::instance()->m_config +#define CONFIG ConfigManager::Instance()->m_config #endif diff --git a/orkbasecxx/Filter.h b/orkbasecxx/Filter.h index 48dc9a0..d6b0c2c 100644 --- a/orkbasecxx/Filter.h +++ b/orkbasecxx/Filter.h @@ -14,7 +14,6 @@ #define __FILTER_H__ #include <list> -#include "ace/Singleton.h" #include "AudioCapture.h" #include "dll.h" #include "OrkBase.h" @@ -81,7 +80,6 @@ private: }; //=================================================================== - /** Filter Registry */ class DLL_IMPORT_EXPORT_ORKBASE FilterRegistry @@ -99,6 +97,5 @@ private: static FilterRegistry* m_singleton; }; -//typedef ACE_Singleton<FilterRegistry, ACE_Thread_Mutex> FilterRegistrySingleton; #endif diff --git a/orkbasecxx/Object.cpp b/orkbasecxx/Object.cpp index 66ceeb9..cd6ed79 100644 --- a/orkbasecxx/Object.cpp +++ b/orkbasecxx/Object.cpp @@ -19,36 +19,48 @@ CStdString Object::SerializeSingleLine() { - SingleLineSerializer serializer(this); - return serializer.Serialize(); + SingleLineSerializer* serializer = new SingleLineSerializer(this); + m_serializer.reset(serializer); + return serializer->Serialize(); } void Object::DeSerializeSingleLine(CStdString& input) { - SingleLineSerializer serializer(this); - serializer.DeSerialize(input); + SingleLineSerializer* serializer = new SingleLineSerializer(this); + m_serializer.reset(serializer); + serializer->DeSerialize(input); } void Object::SerializeDom(XERCES_CPP_NAMESPACE::DOMDocument* doc) { - DomSerializer serializer(this); - serializer.Serialize(doc); + + DomSerializer* serializer = new DomSerializer(this); + m_serializer.reset(serializer); + serializer->Serialize(doc); } void Object::DeSerializeDom(DOMNode* doc) { - DomSerializer serializer(this); - serializer.DeSerialize(doc); + DomSerializer* serializer = new DomSerializer(this); + m_serializer.reset(serializer); + serializer->DeSerialize(doc); } CStdString Object::SerializeUrl() { - UrlSerializer serializer(this); - return serializer.Serialize(); + UrlSerializer* serializer = new UrlSerializer(this); + m_serializer.reset(serializer); + return serializer->Serialize(); } void Object::DeSerializeUrl(CStdString& input) { - UrlSerializer serializer(this); - serializer.DeSerialize(input); + UrlSerializer* serializer = new UrlSerializer(this); + m_serializer.reset(serializer); + serializer->DeSerialize(input); +} + +SerializerRef Object::GetSerializer() +{ + return m_serializer; } diff --git a/orkbasecxx/Object.h b/orkbasecxx/Object.h index 5674e55..64f5dc2 100644 --- a/orkbasecxx/Object.h +++ b/orkbasecxx/Object.h @@ -50,6 +50,11 @@ public: virtual ObjectRef NewInstance() = 0; virtual ObjectRef Process() = 0; + + boost::shared_ptr<Serializer> GetSerializer(); + +private: + boost::shared_ptr<Serializer> m_serializer; }; diff --git a/orkbasecxx/OrkBase.dsp b/orkbasecxx/OrkBase.dsp index d9dcc74..2b959ce 100644 --- a/orkbasecxx/OrkBase.dsp +++ b/orkbasecxx/OrkBase.dsp @@ -447,6 +447,10 @@ SOURCE=.\StdString.h # End Source File # Begin Source File +SOURCE=.\TapeProcessor.cpp +# End Source File +# Begin Source File + SOURCE=.\TapeProcessor.h # End Source File # Begin Source File diff --git a/orkbasecxx/TapeProcessor.cpp b/orkbasecxx/TapeProcessor.cpp new file mode 100644 index 0000000..a3d055b --- /dev/null +++ b/orkbasecxx/TapeProcessor.cpp @@ -0,0 +1,120 @@ +/*
+ * Oreka -- A media capture and retrieval platform
+ *
+ * Copyright (C) 2005, orecx LLC
+ *
+ * http://www.orecx.com
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License.
+ * Please refer to http://www.gnu.org/copyleft/gpl.html
+ *
+ */
+
+#pragma warning( disable: 4786 )
+
+#define _WINSOCKAPI_ // prevents the inclusion of winsock.h
+
+#include <log4cxx/logger.h>
+#include "TapeProcessor.h"
+#include "ConfigManager.h"
+
+using namespace log4cxx;
+
+TapeProcessor::TapeProcessor()
+{
+ ;
+}
+
+void TapeProcessor::SetNextProcessor(TapeProcessorRef& nextProcessor)
+{
+ m_nextProcessor = nextProcessor;
+}
+
+void TapeProcessor::RunNextProcessor(AudioTapeRef& tape)
+{
+ if(m_nextProcessor.get())
+ {
+ m_nextProcessor->AddAudioTape(tape);
+ }
+}
+
+
+//=====================================================
+LoggerPtr s_log;
+
+TapeProcessorRegistry::TapeProcessorRegistry()
+{
+ s_log = Logger::getLogger("tape.taperegistry");
+}
+
+
+void TapeProcessorRegistry::RegisterTapeProcessor(TapeProcessorRef& tapeProcessor)
+{
+ m_TapeProcessors.push_back(tapeProcessor);
+ LOG4CXX_INFO(s_log, CStdString("Registered processor: ") + tapeProcessor->GetName());
+}
+
+
+TapeProcessorRef TapeProcessorRegistry::GetNewTapeProcessor(CStdString& TapeProcessorName)
+{
+ for(std::list<TapeProcessorRef>::iterator it = m_TapeProcessors.begin(); it!=m_TapeProcessors.end(); it++)
+ {
+ TapeProcessorRef TapeProcessor = *it;
+
+ if( TapeProcessor->GetName().CompareNoCase(TapeProcessorName) == 0 )
+ {
+ return TapeProcessor->Instanciate();
+ }
+ }
+ return TapeProcessorRef(); // No TapeProcessor found
+}
+
+TapeProcessorRegistry* TapeProcessorRegistry::m_singleton = 0;
+
+TapeProcessorRegistry* TapeProcessorRegistry::instance()
+{
+ if(m_singleton == NULL)
+ {
+ m_singleton = new TapeProcessorRegistry();
+ }
+ return m_singleton;
+}
+
+void TapeProcessorRegistry::CreateProcessingChain()
+{
+ TapeProcessorRef previousProcessor;
+
+ //ConfigManager* cm = ConfigManagerSingleton::instance();
+
+ for(std::list<CStdString>::iterator it = CONFIG.m_tapeProcessors.begin(); it != CONFIG.m_tapeProcessors.end(); it++)
+ {
+ CStdString tapeProcessorName = *it;
+ TapeProcessorRef processor = GetNewTapeProcessor(tapeProcessorName);
+ if(processor.get())
+ {
+ if(m_firstTapeProcessor.get() == NULL)
+ {
+ m_firstTapeProcessor = processor;
+ }
+ if(previousProcessor.get())
+ {
+ previousProcessor->SetNextProcessor(processor);
+ }
+ previousProcessor = processor;
+ LOG4CXX_DEBUG(s_log, CStdString("Adding processor to chain:") + tapeProcessorName);
+ }
+ else
+ {
+ LOG4CXX_ERROR(s_log, CStdString("Processor:") + tapeProcessorName + " does not exist, please check <TapeProcessors> in config.xml");
+ }
+ }
+}
+
+void TapeProcessorRegistry::RunProcessingChain(AudioTapeRef& tape)
+{
+ if(m_firstTapeProcessor.get())
+ {
+ m_firstTapeProcessor->AddAudioTape(tape);
+ }
+}
diff --git a/orkbasecxx/TapeProcessor.h b/orkbasecxx/TapeProcessor.h index 2b60fc8..a177480 100644 --- a/orkbasecxx/TapeProcessor.h +++ b/orkbasecxx/TapeProcessor.h @@ -16,22 +16,53 @@ //#include <list>
//#include "ace/Singleton.h"
#include "AudioCapture.h"
+#include "AudioTape.h"
#include "dll.h"
#include "OrkBase.h"
-class TapeProcessor;
+
+class DLL_IMPORT_EXPORT_ORKBASE TapeProcessor;
typedef boost::shared_ptr<TapeProcessor> TapeProcessorRef;
/** TapeProcessor Interface
- * a filter is a black box that takes media chunks as an input and produces media chunks as an output
- * it can be translating between two encodings (codec) or just processing the signal
+ * a Tape Processor is a black box that takes Audio Tapes as an input and
+ * processes them.
*/
class DLL_IMPORT_EXPORT_ORKBASE TapeProcessor
{
public:
+ TapeProcessor();
+
+ virtual CStdString __CDECL__ GetName() = 0;
virtual TapeProcessorRef __CDECL__ Instanciate() = 0;
- virtual void __CDECL__ AddAudioTape(AudioTapeRef audioTapeRef) = 0;
+ virtual void __CDECL__ AddAudioTape(AudioTapeRef&) = 0;
+
+ void SetNextProcessor(TapeProcessorRef& nextProcessor);
+ void RunNextProcessor(AudioTapeRef&);
+
+protected:
+ TapeProcessorRef m_nextProcessor;
+};
+
+//===================================================================
+/** TapeProcessor Registry
+*/
+class DLL_IMPORT_EXPORT_ORKBASE TapeProcessorRegistry
+{
+public:
+ static TapeProcessorRegistry* instance();
+ void RegisterTapeProcessor(TapeProcessorRef& TapeProcessor);
+ TapeProcessorRef GetNewTapeProcessor(CStdString& TapeProcessorName);
+
+ void RunProcessingChain(AudioTapeRef&);
+ void CreateProcessingChain();
+private:
+ TapeProcessorRegistry();
+ static TapeProcessorRegistry* m_singleton;
+
+ std::list<TapeProcessorRef> m_TapeProcessors;
+ TapeProcessorRef m_firstTapeProcessor;
};
#endif
\ No newline at end of file diff --git a/orkbasecxx/messages/TapeMsg.h b/orkbasecxx/messages/TapeMsg.h index 491c456..f83805e 100644 --- a/orkbasecxx/messages/TapeMsg.h +++ b/orkbasecxx/messages/TapeMsg.h @@ -16,7 +16,6 @@ #include "messages/SyncMessage.h" #include "messages/AsyncMessage.h" -#include "AudioTape.h" #define TAPE_MESSAGE_NAME "tape" #define REC_ID_PARAM "recid" @@ -81,5 +80,7 @@ public: bool m_deleteTape; }; +typedef boost::shared_ptr<TapeResponse> TapeResponseRef; + #endif |