summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHenri Herscher <henri@oreka.org>2006-07-05 01:59:32 +0000
committerHenri Herscher <henri@oreka.org>2006-07-05 01:59:32 +0000
commitc27745d1b387606b7e1a5869b7fe4b566410720d (patch)
tree978a3e5176ff6db2f6b0856d8031e662b68ff03d
parent0d758fbcf5a581ca5909245cef65c00652219283 (diff)
* Tape processor interface becomes usable and used
* Reporting and BatchProcessing become "standard" tape processors * Imediate processing kicks off the tape processor chain * Object now references Serializer * ConfigManager singleton not an ACE singleton anymore because ACE singletons are not unique across DLL boundaries. git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@296 09dcff7a-b715-0410-9601-b79a96267cd0
-rw-r--r--orkaudio/BatchProcessing.cpp284
-rw-r--r--orkaudio/BatchProcessing.h28
-rw-r--r--orkaudio/CapturePluginProxy.cpp2
-rw-r--r--orkaudio/ImmediateProcessing.cpp10
-rw-r--r--orkaudio/OrkAudio.cpp12
-rw-r--r--orkaudio/Reporting.cpp46
-rw-r--r--orkaudio/Reporting.h18
-rw-r--r--orkbasecxx/AudioTape.cpp2
-rw-r--r--orkbasecxx/AudioTape.h7
-rw-r--r--orkbasecxx/Config.cpp1
-rw-r--r--orkbasecxx/Config.h2
-rw-r--r--orkbasecxx/ConfigManager.cpp6
-rw-r--r--orkbasecxx/ConfigManager.h9
-rw-r--r--orkbasecxx/Filter.h3
-rw-r--r--orkbasecxx/Object.cpp36
-rw-r--r--orkbasecxx/Object.h5
-rw-r--r--orkbasecxx/OrkBase.dsp4
-rw-r--r--orkbasecxx/TapeProcessor.cpp120
-rw-r--r--orkbasecxx/TapeProcessor.h39
-rw-r--r--orkbasecxx/messages/TapeMsg.h3
20 files changed, 405 insertions, 232 deletions
diff --git a/orkaudio/BatchProcessing.cpp b/orkaudio/BatchProcessing.cpp
index 1769a01..2e3ce29 100644
--- a/orkaudio/BatchProcessing.cpp
+++ b/orkaudio/BatchProcessing.cpp
@@ -14,14 +14,19 @@
#include "ConfigManager.h"
#include "BatchProcessing.h"
-#include "Reporting.h"
-#include "LogManager.h"
#include "ace/OS_NS_unistd.h"
#include "audiofile/LibSndFileFile.h"
#include "Daemon.h"
#include "Filter.h"
-BatchProcessing BatchProcessing::m_batchProcessingSingleton;
+TapeProcessorRef BatchProcessing::m_singleton;
+
+void BatchProcessing::Initialize()
+{
+ m_singleton.reset(new BatchProcessing());
+ TapeProcessorRegistry::instance()->RegisterTapeProcessor(m_singleton);
+}
+
BatchProcessing::BatchProcessing()
{
@@ -33,13 +38,17 @@ BatchProcessing::BatchProcessing()
m_currentDay = date.tm_mday;
}
+CStdString __CDECL__ BatchProcessing::GetName()
+{
+ return "BatchProcessing";
+}
-BatchProcessing* BatchProcessing::GetInstance()
+TapeProcessorRef BatchProcessing::Instanciate()
{
- return &m_batchProcessingSingleton;
+ return m_singleton;
}
-void BatchProcessing::AddAudioTape(AudioTapeRef audioTapeRef)
+void BatchProcessing::AddAudioTape(AudioTapeRef& audioTapeRef)
{
if (!m_audioTapeQueue.push(audioTapeRef))
{
@@ -53,70 +62,17 @@ void BatchProcessing::SetQueueSize(int size)
m_audioTapeQueue.setSize(size);
}
-void BatchProcessing::TapeDropRegistration(CStdString& filename)
-{
- MutexSentinel sentinel(m_tapeDropMutex);
-
- CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + filename;
- if (ACE_OS::unlink((PCSTR)absoluteFilename) != 0)
- {
- LOG4CXX_DEBUG(LOG.batchProcessingLog, "Could not delete tape: " + filename);
- m_tapesToDrop.insert(std::make_pair(filename, time(NULL)));
- }
- else
- {
- LOG4CXX_INFO(LOG.batchProcessingLog, "Deleted tape: " + filename);
- }
-}
-
-bool BatchProcessing::DropTapeIfNeeded(CStdString& filename)
-{
- bool shouldDrop = false;
-
- MutexSentinel sentinel(m_tapeDropMutex);
-
- std::map<CStdString, time_t>::iterator pair;
- pair = m_tapesToDrop.find(filename);
- if(pair != m_tapesToDrop.end())
- {
- shouldDrop = true;
- CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + filename;
- if (ACE_OS::unlink((PCSTR)absoluteFilename) == 0)
- {
- LOG4CXX_INFO(LOG.batchProcessingLog, "Deleted tape: " + filename);
- m_tapesToDrop.erase(filename);
- }
- else
- {
- LOG4CXX_DEBUG(LOG.batchProcessingLog, "Could not deleted tape: " + filename);
- }
- }
-
- TapeDropHousekeeping();
-
- return shouldDrop;
-}
-
-void BatchProcessing::TapeDropHousekeeping()
-{
- struct tm date = {0};
- time_t now = time(NULL);
- ACE_OS::localtime_r(&now, &date);
- if(m_currentDay != date.tm_mday)
- {
- // another day has passed away ... clear possible leftovers
- m_currentDay = date.tm_mday;
- m_tapesToDrop.clear();
- }
-}
-
-
-
void BatchProcessing::ThreadHandler(void *args)
{
CStdString debug;
- BatchProcessing* pBatchProcessing = BatchProcessing::GetInstance();
+ TapeProcessorRef batchProcessing = TapeProcessorRegistry::instance()->GetNewTapeProcessor(CStdString("BatchProcessing"));
+ if(batchProcessing.get() == NULL)
+ {
+ LOG4CXX_ERROR(LOG.batchProcessingLog, "Could not instanciate BatchProcessing");
+ return;
+ }
+ BatchProcessing* pBatchProcessing = (BatchProcessing*)(batchProcessing->Instanciate().get());
pBatchProcessing->SetQueueSize(CONFIG.m_batchProcessingQueueSize);
@@ -151,126 +107,114 @@ void BatchProcessing::ThreadHandler(void *args)
fileRef = audioTapeRef->GetAudioFileRef();
CStdString filename = audioTapeRef->GetFilename();
- //if(pBatchProcessing->DropTapeIfNeeded(filename) == true)
- //{
- // // The tape we have pulled has been dropped in the meantime. just delete the capture file
- // if(CONFIG.m_deleteNativeFile)
- // {
- // fileRef->Delete();
- // }
- //}
- //else
- //{
- // Let's work on the tape we have pulled
- //CStdString threadIdString = IntToString(threadId);
- LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " processing: " + audioTapeRef->GetIdentifier());
-
- fileRef->MoveOrig();
- fileRef->Open(AudioFile::READ);
-
- AudioChunkRef chunkRef;
- AudioChunkRef tmpChunkRef;
-
- switch(CONFIG.m_storageAudioFormat)
- {
- case FfUlaw:
- outFileRef.reset(new LibSndFileFile(SF_FORMAT_ULAW | SF_FORMAT_WAV));
- break;
- case FfAlaw:
- outFileRef.reset(new LibSndFileFile(SF_FORMAT_ALAW | SF_FORMAT_WAV));
- break;
- case FfGsm:
- outFileRef.reset(new LibSndFileFile(SF_FORMAT_GSM610 | SF_FORMAT_WAV));
- break;
- case FfPcmWav:
- default:
- outFileRef.reset(new LibSndFileFile(SF_FORMAT_PCM_16 | SF_FORMAT_WAV));
- }
+ // Let's work on the tape we have pulled
+ //CStdString threadIdString = IntToString(threadId);
+ LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " processing: " + audioTapeRef->GetIdentifier());
+
+ fileRef->MoveOrig();
+ fileRef->Open(AudioFile::READ);
+
+ AudioChunkRef chunkRef;
+ AudioChunkRef tmpChunkRef;
+
+ switch(CONFIG.m_storageAudioFormat)
+ {
+ case FfUlaw:
+ outFileRef.reset(new LibSndFileFile(SF_FORMAT_ULAW | SF_FORMAT_WAV));
+ break;
+ case FfAlaw:
+ outFileRef.reset(new LibSndFileFile(SF_FORMAT_ALAW | SF_FORMAT_WAV));
+ break;
+ case FfGsm:
+ outFileRef.reset(new LibSndFileFile(SF_FORMAT_GSM610 | SF_FORMAT_WAV));
+ break;
+ case FfPcmWav:
+ default:
+ outFileRef.reset(new LibSndFileFile(SF_FORMAT_PCM_16 | SF_FORMAT_WAV));
+ }
- FilterRef filter;
- FilterRef decoder1;
- FilterRef decoder2;
+ FilterRef filter;
+ FilterRef decoder1;
+ FilterRef decoder2;
- bool firstChunk = true;
- bool voIpSession = false;
+ bool firstChunk = true;
+ bool voIpSession = false;
- while(fileRef->ReadChunkMono(chunkRef))
+ while(fileRef->ReadChunkMono(chunkRef))
+ {
+ AudioChunkDetails details = *chunkRef->GetDetails();
+ if(firstChunk && details.m_rtpPayloadType != -1)
{
- AudioChunkDetails details = *chunkRef->GetDetails();
- if(firstChunk && details.m_rtpPayloadType != -1)
+ CStdString rtpPayloadType = IntToString(details.m_rtpPayloadType);
+ LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " RTP payload type:" + rtpPayloadType);
+
+ CStdString filterName("RtpMixer");
+ filter = FilterRegistry::instance()->GetNewFilter(filterName);
+ if(filter.get() == NULL)
{
- CStdString rtpPayloadType = IntToString(details.m_rtpPayloadType);
- LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " RTP payload type:" + rtpPayloadType);
-
- CStdString filterName("RtpMixer");
- filter = FilterRegistry::instance()->GetNewFilter(filterName);
- if(filter.get() == NULL)
- {
- debug = "Could not instanciate RTP mixer";
- throw(debug);
- }
- decoder1 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType);
- decoder2 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType);
- if(decoder1.get() == NULL || decoder2.get() == NULL)
- {
- debug.Format("Could not find decoder for RTP payload type:%u", chunkRef->GetDetails()->m_rtpPayloadType);
- throw(debug);
- }
- voIpSession = true;
+ debug = "Could not instanciate RTP mixer";
+ throw(debug);
}
- if(firstChunk)
+ decoder1 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType);
+ decoder2 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType);
+ if(decoder1.get() == NULL || decoder2.get() == NULL)
{
- firstChunk = false;
-
- // At this point, we know we have the right codec, open the output file
- CStdString file = CONFIG.m_audioOutputPath + "/" + audioTapeRef->GetPath() + audioTapeRef->GetIdentifier();
- outFileRef->Open(file, AudioFile::WRITE, false, fileRef->GetSampleRate());
- }
- if(voIpSession)
- {
- if(details.m_channel == 2)
- {
- decoder2->AudioChunkIn(chunkRef);
- decoder2->AudioChunkOut(tmpChunkRef);
- }
- else
- {
- decoder1->AudioChunkIn(chunkRef);
- decoder1->AudioChunkOut(tmpChunkRef);
- }
- filter->AudioChunkIn(tmpChunkRef);
- filter->AudioChunkOut(tmpChunkRef);
+ debug.Format("Could not find decoder for RTP payload type:%u", chunkRef->GetDetails()->m_rtpPayloadType);
+ throw(debug);
}
- outFileRef->WriteChunk(tmpChunkRef);
+ voIpSession = true;
+ }
+ if(firstChunk)
+ {
+ firstChunk = false;
- if(CONFIG.m_batchProcessingEnhancePriority == false)
+ // At this point, we know we have the right codec, open the output file
+ CStdString file = CONFIG.m_audioOutputPath + "/" + audioTapeRef->GetPath() + audioTapeRef->GetIdentifier();
+ outFileRef->Open(file, AudioFile::WRITE, false, fileRef->GetSampleRate());
+ }
+ if(voIpSession)
+ {
+ if(details.m_channel == 2)
{
- // Give up CPU between every audio buffer to make sure the actual recording always has priority
- //ACE_Time_Value yield;
- //yield.set(0,1); // 1 us
- //ACE_OS::sleep(yield);
-
- // Use this instead, even if it still seems this holds the whole process under Linux instead of this thread only.
- struct timespec ts;
- ts.tv_sec = 0;
- ts.tv_nsec = 1;
- ACE_OS::nanosleep (&ts, NULL);
+ decoder2->AudioChunkIn(chunkRef);
+ decoder2->AudioChunkOut(tmpChunkRef);
}
+ else
+ {
+ decoder1->AudioChunkIn(chunkRef);
+ decoder1->AudioChunkOut(tmpChunkRef);
+ }
+ filter->AudioChunkIn(tmpChunkRef);
+ filter->AudioChunkOut(tmpChunkRef);
}
+ outFileRef->WriteChunk(tmpChunkRef);
- fileRef->Close();
- outFileRef->Close();
-
- if(CONFIG.m_deleteNativeFile)
+ if(CONFIG.m_batchProcessingEnhancePriority == false)
{
- fileRef->Delete();
- LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " deleting native: " + audioTapeRef->GetIdentifier());
+ // Give up CPU between every audio buffer to make sure the actual recording always has priority
+ //ACE_Time_Value yield;
+ //yield.set(0,1); // 1 us
+ //ACE_OS::sleep(yield);
+
+ // Use this instead, even if it still seems this holds the whole process under Linux instead of this thread only.
+ struct timespec ts;
+ ts.tv_sec = 0;
+ ts.tv_nsec = 1;
+ ACE_OS::nanosleep (&ts, NULL);
}
- //CStdString filename = audioTapeRef->GetFilename();
- //pBatchProcessing->DropTapeIfNeeded(filename); // maybe the tape was dropped while we were processing it
+ }
+
+ fileRef->Close();
+ outFileRef->Close();
+
+ if(CONFIG.m_deleteNativeFile)
+ {
+ fileRef->Delete();
+ LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " deleting native: " + audioTapeRef->GetIdentifier());
+ }
- Reporting::GetInstance()->AddAudioTape(audioTapeRef);
- //}
+ // Finished processing the tape, pass on to next processor
+ pBatchProcessing->RunNextProcessor(audioTapeRef);
}
}
catch (CStdString& e)
diff --git a/orkaudio/BatchProcessing.h b/orkaudio/BatchProcessing.h
index ec9c88b..09eb1da 100644
--- a/orkaudio/BatchProcessing.h
+++ b/orkaudio/BatchProcessing.h
@@ -15,35 +15,39 @@
#define __BATCHPROCESSING_H__
#include "ThreadSafeQueue.h"
+#include "TapeProcessor.h"
#include "AudioTape.h"
#include "ace/Thread_Mutex.h"
#include <map>
-class BatchProcessing
+class BatchProcessing;
+typedef boost::shared_ptr<BatchProcessing> BatchProcessingRef;
+
+/**
+ * This tape processor handles the audio transcoding
+ */
+class BatchProcessing : public TapeProcessor
{
public:
- static BatchProcessing* GetInstance();
+ static void Initialize();
+
+ CStdString __CDECL__ GetName();
+ TapeProcessorRef __CDECL__ Instanciate();
+ void __CDECL__ AddAudioTape(AudioTapeRef& audioTapeRef);
+
+
static void ThreadHandler(void *args);
- void AddAudioTape(AudioTapeRef audioTapeRef);
void SetQueueSize(int size);
- /** Ask for a tape to be deleted from disk */
- void TapeDropRegistration(CStdString& filename);
private:
BatchProcessing();
+ static TapeProcessorRef m_singleton;
- bool DropTapeIfNeeded(CStdString&filename);
- void TapeDropHousekeeping();
-
- static BatchProcessing m_batchProcessingSingleton;
ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue;
size_t m_threadCount;
ACE_Thread_Mutex m_mutex;
-
- ACE_Thread_Mutex m_tapeDropMutex;
- std::map<CStdString, time_t> m_tapesToDrop;
int m_currentDay;
};
diff --git a/orkaudio/CapturePluginProxy.cpp b/orkaudio/CapturePluginProxy.cpp
index c378d0c..ce6eb7d 100644
--- a/orkaudio/CapturePluginProxy.cpp
+++ b/orkaudio/CapturePluginProxy.cpp
@@ -93,7 +93,7 @@ bool CapturePluginProxy::Initialize()
m_configureFunction = (ConfigureFunction)m_dll.symbol("Configure");
if (m_configureFunction)
{
- ConfigManagerSingleton::instance()->AddConfigureFunction(m_configureFunction);
+ ConfigManager::Instance()->AddConfigureFunction(m_configureFunction);
m_initializeFunction = (InitializeFunction)m_dll.symbol("Initialize");
if (m_initializeFunction)
diff --git a/orkaudio/ImmediateProcessing.cpp b/orkaudio/ImmediateProcessing.cpp
index 5aeb1dc..181e659 100644
--- a/orkaudio/ImmediateProcessing.cpp
+++ b/orkaudio/ImmediateProcessing.cpp
@@ -12,12 +12,16 @@
*/
#pragma warning( disable: 4786 )
+#define _WINSOCKAPI_ // prevents the inclusion of winsock.h
+
#include "ImmediateProcessing.h"
#include "LogManager.h"
#include "ace/OS_NS_unistd.h"
#include "BatchProcessing.h"
#include "Daemon.h"
#include "ConfigManager.h"
+#include "TapeProcessor.h"
+
ImmediateProcessing ImmediateProcessing::m_immediateProcessingSingleton;
@@ -46,7 +50,6 @@ void ImmediateProcessing::ThreadHandler(void *args)
CStdString logMsg;
ImmediateProcessing* pImmediateProcessing = ImmediateProcessing::GetInstance();
-
pImmediateProcessing->SetQueueSize(CONFIG.m_immediateProcessingQueueSize);
logMsg.Format("thread starting - queue size:%d", CONFIG.m_immediateProcessingQueueSize);
@@ -75,8 +78,8 @@ void ImmediateProcessing::ThreadHandler(void *args)
if (audioTapeRef->IsReadyForBatchProcessing())
{
- // Forward to batch processing thread
- BatchProcessing::GetInstance()->AddAudioTape(audioTapeRef);
+ // Pass the tape to the tape processor chain
+ TapeProcessorRegistry::instance()->RunProcessingChain(audioTapeRef);
}
}
}
@@ -88,4 +91,3 @@ void ImmediateProcessing::ThreadHandler(void *args)
LOG4CXX_INFO(LOG.immediateProcessingLog, CStdString("Exiting thread"));
}
-
diff --git a/orkaudio/OrkAudio.cpp b/orkaudio/OrkAudio.cpp
index 37a3f58..5a161b7 100644
--- a/orkaudio/OrkAudio.cpp
+++ b/orkaudio/OrkAudio.cpp
@@ -37,6 +37,7 @@
#include "CapturePluginProxy.h"
#include "AudioCapturePlugin.h"
#include "Filter.h"
+#include "TapeProcessor.h"
#include <list>
@@ -135,7 +136,10 @@ void MainThread()
objRef.reset(new TestMsg);
ObjectFactorySingleton::instance()->RegisterObject(objRef);
- ConfigManagerSingleton::instance()->Initialize();
+ ConfigManager::Instance()->Initialize();
+
+ std::list<ACE_DLL> pluginDlls;
+ LoadPlugins(pluginDlls);
// Register in-built filters
FilterRef filter(new AlawToPcmFilter());
@@ -143,8 +147,10 @@ void MainThread()
filter.reset(new UlawToPcmFilter());
FilterRegistry::instance()->RegisterFilter(filter);
- std::list<ACE_DLL> pluginDlls;
- LoadPlugins(pluginDlls);
+ // Register in-built tape processors and build the processing chain
+ BatchProcessing::Initialize();
+ Reporting::Initialize();
+ TapeProcessorRegistry::instance()->CreateProcessingChain();
if (!ACE_Thread_Manager::instance()->spawn(ACE_THR_FUNC(ImmediateProcessing::ThreadHandler)))
{
diff --git a/orkaudio/Reporting.cpp b/orkaudio/Reporting.cpp
index bd71abf..72c3f49 100644
--- a/orkaudio/Reporting.cpp
+++ b/orkaudio/Reporting.cpp
@@ -19,22 +19,33 @@
#include "messages/TapeMsg.h"
#include "OrkClient.h"
#include "Daemon.h"
-#include "BatchProcessing.h"
-Reporting Reporting::m_reportingSingleton;
+TapeProcessorRef Reporting::m_singleton;
+
+void Reporting::Initialize()
+{
+ m_singleton.reset(new Reporting());
+ TapeProcessorRegistry::instance()->RegisterTapeProcessor(m_singleton);
+}
+
Reporting::Reporting()
{
m_queueFullError = false;
}
-Reporting* Reporting::GetInstance()
+CStdString __CDECL__ Reporting::GetName()
+{
+ return "Reporting";
+}
+
+TapeProcessorRef Reporting::Instanciate()
{
- return &m_reportingSingleton;
+ return m_singleton;
}
-void Reporting::AddAudioTape(AudioTapeRef audioTapeRef)
+void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef)
{
if (m_audioTapeQueue.push(audioTapeRef))
{
@@ -53,7 +64,14 @@ void Reporting::AddAudioTape(AudioTapeRef audioTapeRef)
void Reporting::ThreadHandler(void *args)
{
- Reporting* pReporting = Reporting::GetInstance();
+ TapeProcessorRef reporting = TapeProcessorRegistry::instance()->GetNewTapeProcessor(CStdString("Reporting"));
+ if(reporting.get() == NULL)
+ {
+ LOG4CXX_ERROR(LOG.reportingLog, "Could not instanciate Reporting");
+ return;
+ }
+ Reporting* pReporting = (Reporting*)(reporting->Instanciate().get());
+
bool stop = false;
for(;stop == false;)
@@ -80,21 +98,19 @@ void Reporting::ThreadHandler(void *args)
LOG4CXX_INFO(LOG.reportingLog, msgAsSingleLineString);
OrkHttpSingleLineClient c;
- TapeResponse tr;
+ TapeResponseRef tr(new TapeResponse());
+ audioTapeRef->m_tapeResponse = tr;
bool success = false;
bool firstError = true;
while (!success)
{
- if (c.Execute((SyncMessage&)(*msgRef.get()), tr, CONFIG.m_trackerHostname, CONFIG.m_trackerTcpPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout))
+ if (c.Execute((SyncMessage&)(*msgRef.get()), (AsyncMessage&)(*tr.get()), CONFIG.m_trackerHostname, CONFIG.m_trackerTcpPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout))
{
success = true;
- if(tr.m_deleteTape)
+ if(tr->m_deleteTape)
{
- //LOG4CXX_INFO(LOG.reportingLog, "Registered tape for removal: " + audioTapeRef->GetIdentifier());
- //BatchProcessing::GetInstance()->TapeDropRegistration(tapeFilename);
-
CStdString tapeFilename = audioTapeRef->GetFilename();
CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + tapeFilename;
@@ -108,6 +124,11 @@ void Reporting::ThreadHandler(void *args)
}
}
+ else
+ {
+ // Pass the tape to the next processor
+ pReporting->RunNextProcessor(audioTapeRef);
+ }
}
else
{
@@ -131,3 +152,4 @@ void Reporting::ThreadHandler(void *args)
}
+
diff --git a/orkaudio/Reporting.h b/orkaudio/Reporting.h
index e2afb72..e16f647 100644
--- a/orkaudio/Reporting.h
+++ b/orkaudio/Reporting.h
@@ -15,18 +15,26 @@
#define __REPORTING_H__
#include "ThreadSafeQueue.h"
+#include "TapeProcessor.h"
#include "AudioTape.h"
-class Reporting
+class Reporting : public TapeProcessor
{
public:
- Reporting();
- static Reporting* GetInstance();
+ static void Initialize();
+
+ CStdString __CDECL__ GetName();
+ TapeProcessorRef __CDECL__ Instanciate();
+ void __CDECL__ AddAudioTape(AudioTapeRef& audioTapeRef);
+
+ //static Reporting* GetInstance();
static void ThreadHandler(void *args);
- void AddAudioTape(AudioTapeRef audioTapeRef);
private:
- static Reporting m_reportingSingleton;
+ Reporting();
+ //static Reporting m_reportingSingleton;
+ static TapeProcessorRef m_singleton;
+
ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue;
bool m_queueFullError;
};
diff --git a/orkbasecxx/AudioTape.cpp b/orkbasecxx/AudioTape.cpp
index 5195e2d..3cb81e5 100644
--- a/orkbasecxx/AudioTape.cpp
+++ b/orkbasecxx/AudioTape.cpp
@@ -12,6 +12,8 @@
*/
#pragma warning( disable: 4786 )
+#define _WINSOCKAPI_ // prevents the inclusion of winsock.h
+
#include "ConfigManager.h"
#include "AudioTape.h"
#include "ace/OS_NS_time.h"
diff --git a/orkbasecxx/AudioTape.h b/orkbasecxx/AudioTape.h
index c335deb..cb0eaea 100644
--- a/orkbasecxx/AudioTape.h
+++ b/orkbasecxx/AudioTape.h
@@ -21,7 +21,8 @@
#include <queue>
#include "AudioCapture.h"
#include "audiofile/AudioFile.h"
-#include "messages/Message.h"
+#include "messages/TapeMsg.h"
+
class DLL_IMPORT_EXPORT_ORKBASE AudioTapeDescription : public Object
{
@@ -75,6 +76,7 @@ public:
AudioFileRef GetAudioFileRef();
bool IsReadyForBatchProcessing();
+
CStdString m_portId;
CStdString m_localParty;
CStdString m_localEntryPoint;
@@ -85,6 +87,9 @@ public:
time_t m_duration;
CStdString m_localIp;
CStdString m_remoteIp;
+
+ TapeResponseRef m_tapeResponse;
+
private:
void GenerateFilePathAndIdentifier();
diff --git a/orkbasecxx/Config.cpp b/orkbasecxx/Config.cpp
index 7921218..41d8771 100644
--- a/orkbasecxx/Config.cpp
+++ b/orkbasecxx/Config.cpp
@@ -83,6 +83,7 @@ void Config::Define(Serializer* s)
s->BoolValue(BATCH_PROCESSING_ENHANCE_PRIORITY_PARAM, m_batchProcessingEnhancePriority);
s->BoolValue(DELETE_FAILED_CAPTURE_FILE_PARAM, m_deleteFailedCaptureFile);
s->CsvValue(CAPTURE_PORT_FILTERS_PARAM, m_capturePortFilters);
+ s->CsvValue(TAPE_PROCESSORS_PARAM, m_tapeProcessors);
}
void Config::Validate()
diff --git a/orkbasecxx/Config.h b/orkbasecxx/Config.h
index adcab40..54a0787 100644
--- a/orkbasecxx/Config.h
+++ b/orkbasecxx/Config.h
@@ -70,6 +70,7 @@
#define DELETE_FAILED_CAPTURE_FILE_PARAM "DeleteFailedCaptureFile"
#define DELETE_FAILED_CAPTURE_FILE_DEFAULT false
#define CAPTURE_PORT_FILTERS_PARAM "CapturePortFilters"
+#define TAPE_PROCESSORS_PARAM "TapeProcessors"
class DLL_IMPORT_EXPORT_ORKBASE Config : public Object
{
@@ -109,6 +110,7 @@ public:
bool m_batchProcessingEnhancePriority;
bool m_deleteFailedCaptureFile;
std::list<CStdString> m_capturePortFilters;
+ std::list<CStdString> m_tapeProcessors;
private:
log4cxx::LoggerPtr m_log;
diff --git a/orkbasecxx/ConfigManager.cpp b/orkbasecxx/ConfigManager.cpp
index c805599..e92312e 100644
--- a/orkbasecxx/ConfigManager.cpp
+++ b/orkbasecxx/ConfigManager.cpp
@@ -24,6 +24,12 @@
#define CONFIG_FILE_NAME "config.xml"
#define ETC_CONFIG_FILE_NAME "/etc/orkaudio/config.xml"
+ConfigManager ConfigManager::m_singleton;
+
+ConfigManager* ConfigManager::Instance()
+{
+ return &m_singleton;
+}
void ConfigManager::Initialize()
{
diff --git a/orkbasecxx/ConfigManager.h b/orkbasecxx/ConfigManager.h
index 538dfbf..47efcae 100644
--- a/orkbasecxx/ConfigManager.h
+++ b/orkbasecxx/ConfigManager.h
@@ -15,25 +15,26 @@
#define __CONFIGMANAGER_H__
#include <list>
-#include "ace/Singleton.h"
#include "Config.h"
#include "AudioCapturePlugin.h"
+class ConfigManager;
+
class DLL_IMPORT_EXPORT_ORKBASE ConfigManager
{
public:
+ static ConfigManager* Instance();
void Initialize();
void AddConfigureFunction(ConfigureFunction);
Config m_config;
private:
+ static ConfigManager m_singleton;
std::list<ConfigureFunction> m_configureFunctions;
DOMNode* m_configTopNode;
};
-typedef ACE_Singleton<ConfigManager, ACE_Thread_Mutex> ConfigManagerSingleton;
-
-#define CONFIG ConfigManagerSingleton::instance()->m_config
+#define CONFIG ConfigManager::Instance()->m_config
#endif
diff --git a/orkbasecxx/Filter.h b/orkbasecxx/Filter.h
index 48dc9a0..d6b0c2c 100644
--- a/orkbasecxx/Filter.h
+++ b/orkbasecxx/Filter.h
@@ -14,7 +14,6 @@
#define __FILTER_H__
#include <list>
-#include "ace/Singleton.h"
#include "AudioCapture.h"
#include "dll.h"
#include "OrkBase.h"
@@ -81,7 +80,6 @@ private:
};
//===================================================================
-
/** Filter Registry
*/
class DLL_IMPORT_EXPORT_ORKBASE FilterRegistry
@@ -99,6 +97,5 @@ private:
static FilterRegistry* m_singleton;
};
-//typedef ACE_Singleton<FilterRegistry, ACE_Thread_Mutex> FilterRegistrySingleton;
#endif
diff --git a/orkbasecxx/Object.cpp b/orkbasecxx/Object.cpp
index 66ceeb9..cd6ed79 100644
--- a/orkbasecxx/Object.cpp
+++ b/orkbasecxx/Object.cpp
@@ -19,36 +19,48 @@
CStdString Object::SerializeSingleLine()
{
- SingleLineSerializer serializer(this);
- return serializer.Serialize();
+ SingleLineSerializer* serializer = new SingleLineSerializer(this);
+ m_serializer.reset(serializer);
+ return serializer->Serialize();
}
void Object::DeSerializeSingleLine(CStdString& input)
{
- SingleLineSerializer serializer(this);
- serializer.DeSerialize(input);
+ SingleLineSerializer* serializer = new SingleLineSerializer(this);
+ m_serializer.reset(serializer);
+ serializer->DeSerialize(input);
}
void Object::SerializeDom(XERCES_CPP_NAMESPACE::DOMDocument* doc)
{
- DomSerializer serializer(this);
- serializer.Serialize(doc);
+
+ DomSerializer* serializer = new DomSerializer(this);
+ m_serializer.reset(serializer);
+ serializer->Serialize(doc);
}
void Object::DeSerializeDom(DOMNode* doc)
{
- DomSerializer serializer(this);
- serializer.DeSerialize(doc);
+ DomSerializer* serializer = new DomSerializer(this);
+ m_serializer.reset(serializer);
+ serializer->DeSerialize(doc);
}
CStdString Object::SerializeUrl()
{
- UrlSerializer serializer(this);
- return serializer.Serialize();
+ UrlSerializer* serializer = new UrlSerializer(this);
+ m_serializer.reset(serializer);
+ return serializer->Serialize();
}
void Object::DeSerializeUrl(CStdString& input)
{
- UrlSerializer serializer(this);
- serializer.DeSerialize(input);
+ UrlSerializer* serializer = new UrlSerializer(this);
+ m_serializer.reset(serializer);
+ serializer->DeSerialize(input);
+}
+
+SerializerRef Object::GetSerializer()
+{
+ return m_serializer;
}
diff --git a/orkbasecxx/Object.h b/orkbasecxx/Object.h
index 5674e55..64f5dc2 100644
--- a/orkbasecxx/Object.h
+++ b/orkbasecxx/Object.h
@@ -50,6 +50,11 @@ public:
virtual ObjectRef NewInstance() = 0;
virtual ObjectRef Process() = 0;
+
+ boost::shared_ptr<Serializer> GetSerializer();
+
+private:
+ boost::shared_ptr<Serializer> m_serializer;
};
diff --git a/orkbasecxx/OrkBase.dsp b/orkbasecxx/OrkBase.dsp
index d9dcc74..2b959ce 100644
--- a/orkbasecxx/OrkBase.dsp
+++ b/orkbasecxx/OrkBase.dsp
@@ -447,6 +447,10 @@ SOURCE=.\StdString.h
# End Source File
# Begin Source File
+SOURCE=.\TapeProcessor.cpp
+# End Source File
+# Begin Source File
+
SOURCE=.\TapeProcessor.h
# End Source File
# Begin Source File
diff --git a/orkbasecxx/TapeProcessor.cpp b/orkbasecxx/TapeProcessor.cpp
new file mode 100644
index 0000000..a3d055b
--- /dev/null
+++ b/orkbasecxx/TapeProcessor.cpp
@@ -0,0 +1,120 @@
+/*
+ * Oreka -- A media capture and retrieval platform
+ *
+ * Copyright (C) 2005, orecx LLC
+ *
+ * http://www.orecx.com
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License.
+ * Please refer to http://www.gnu.org/copyleft/gpl.html
+ *
+ */
+
+#pragma warning( disable: 4786 )
+
+#define _WINSOCKAPI_ // prevents the inclusion of winsock.h
+
+#include <log4cxx/logger.h>
+#include "TapeProcessor.h"
+#include "ConfigManager.h"
+
+using namespace log4cxx;
+
+TapeProcessor::TapeProcessor()
+{
+ ;
+}
+
+void TapeProcessor::SetNextProcessor(TapeProcessorRef& nextProcessor)
+{
+ m_nextProcessor = nextProcessor;
+}
+
+void TapeProcessor::RunNextProcessor(AudioTapeRef& tape)
+{
+ if(m_nextProcessor.get())
+ {
+ m_nextProcessor->AddAudioTape(tape);
+ }
+}
+
+
+//=====================================================
+LoggerPtr s_log;
+
+TapeProcessorRegistry::TapeProcessorRegistry()
+{
+ s_log = Logger::getLogger("tape.taperegistry");
+}
+
+
+void TapeProcessorRegistry::RegisterTapeProcessor(TapeProcessorRef& tapeProcessor)
+{
+ m_TapeProcessors.push_back(tapeProcessor);
+ LOG4CXX_INFO(s_log, CStdString("Registered processor: ") + tapeProcessor->GetName());
+}
+
+
+TapeProcessorRef TapeProcessorRegistry::GetNewTapeProcessor(CStdString& TapeProcessorName)
+{
+ for(std::list<TapeProcessorRef>::iterator it = m_TapeProcessors.begin(); it!=m_TapeProcessors.end(); it++)
+ {
+ TapeProcessorRef TapeProcessor = *it;
+
+ if( TapeProcessor->GetName().CompareNoCase(TapeProcessorName) == 0 )
+ {
+ return TapeProcessor->Instanciate();
+ }
+ }
+ return TapeProcessorRef(); // No TapeProcessor found
+}
+
+TapeProcessorRegistry* TapeProcessorRegistry::m_singleton = 0;
+
+TapeProcessorRegistry* TapeProcessorRegistry::instance()
+{
+ if(m_singleton == NULL)
+ {
+ m_singleton = new TapeProcessorRegistry();
+ }
+ return m_singleton;
+}
+
+void TapeProcessorRegistry::CreateProcessingChain()
+{
+ TapeProcessorRef previousProcessor;
+
+ //ConfigManager* cm = ConfigManagerSingleton::instance();
+
+ for(std::list<CStdString>::iterator it = CONFIG.m_tapeProcessors.begin(); it != CONFIG.m_tapeProcessors.end(); it++)
+ {
+ CStdString tapeProcessorName = *it;
+ TapeProcessorRef processor = GetNewTapeProcessor(tapeProcessorName);
+ if(processor.get())
+ {
+ if(m_firstTapeProcessor.get() == NULL)
+ {
+ m_firstTapeProcessor = processor;
+ }
+ if(previousProcessor.get())
+ {
+ previousProcessor->SetNextProcessor(processor);
+ }
+ previousProcessor = processor;
+ LOG4CXX_DEBUG(s_log, CStdString("Adding processor to chain:") + tapeProcessorName);
+ }
+ else
+ {
+ LOG4CXX_ERROR(s_log, CStdString("Processor:") + tapeProcessorName + " does not exist, please check <TapeProcessors> in config.xml");
+ }
+ }
+}
+
+void TapeProcessorRegistry::RunProcessingChain(AudioTapeRef& tape)
+{
+ if(m_firstTapeProcessor.get())
+ {
+ m_firstTapeProcessor->AddAudioTape(tape);
+ }
+}
diff --git a/orkbasecxx/TapeProcessor.h b/orkbasecxx/TapeProcessor.h
index 2b60fc8..a177480 100644
--- a/orkbasecxx/TapeProcessor.h
+++ b/orkbasecxx/TapeProcessor.h
@@ -16,22 +16,53 @@
//#include <list>
//#include "ace/Singleton.h"
#include "AudioCapture.h"
+#include "AudioTape.h"
#include "dll.h"
#include "OrkBase.h"
-class TapeProcessor;
+
+class DLL_IMPORT_EXPORT_ORKBASE TapeProcessor;
typedef boost::shared_ptr<TapeProcessor> TapeProcessorRef;
/** TapeProcessor Interface
- * a filter is a black box that takes media chunks as an input and produces media chunks as an output
- * it can be translating between two encodings (codec) or just processing the signal
+ * a Tape Processor is a black box that takes Audio Tapes as an input and
+ * processes them.
*/
class DLL_IMPORT_EXPORT_ORKBASE TapeProcessor
{
public:
+ TapeProcessor();
+
+ virtual CStdString __CDECL__ GetName() = 0;
virtual TapeProcessorRef __CDECL__ Instanciate() = 0;
- virtual void __CDECL__ AddAudioTape(AudioTapeRef audioTapeRef) = 0;
+ virtual void __CDECL__ AddAudioTape(AudioTapeRef&) = 0;
+
+ void SetNextProcessor(TapeProcessorRef& nextProcessor);
+ void RunNextProcessor(AudioTapeRef&);
+
+protected:
+ TapeProcessorRef m_nextProcessor;
+};
+
+//===================================================================
+/** TapeProcessor Registry
+*/
+class DLL_IMPORT_EXPORT_ORKBASE TapeProcessorRegistry
+{
+public:
+ static TapeProcessorRegistry* instance();
+ void RegisterTapeProcessor(TapeProcessorRef& TapeProcessor);
+ TapeProcessorRef GetNewTapeProcessor(CStdString& TapeProcessorName);
+
+ void RunProcessingChain(AudioTapeRef&);
+ void CreateProcessingChain();
+private:
+ TapeProcessorRegistry();
+ static TapeProcessorRegistry* m_singleton;
+
+ std::list<TapeProcessorRef> m_TapeProcessors;
+ TapeProcessorRef m_firstTapeProcessor;
};
#endif \ No newline at end of file
diff --git a/orkbasecxx/messages/TapeMsg.h b/orkbasecxx/messages/TapeMsg.h
index 491c456..f83805e 100644
--- a/orkbasecxx/messages/TapeMsg.h
+++ b/orkbasecxx/messages/TapeMsg.h
@@ -16,7 +16,6 @@
#include "messages/SyncMessage.h"
#include "messages/AsyncMessage.h"
-#include "AudioTape.h"
#define TAPE_MESSAGE_NAME "tape"
#define REC_ID_PARAM "recid"
@@ -81,5 +80,7 @@ public:
bool m_deleteTape;
};
+typedef boost::shared_ptr<TapeResponse> TapeResponseRef;
+
#endif