From 72fda6ebe7d6245b57178441c6355eb9d2402747 Mon Sep 17 00:00:00 2001 From: Henri Herscher Date: Mon, 30 Jul 2007 14:32:19 +0000 Subject: Added non-lookback recording mode. git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@458 09dcff7a-b715-0410-9601-b79a96267cd0 --- orkbasecxx/BatchProcessing.cpp | 352 ++++++++++++++++++++++++++++++++++++ orkbasecxx/BatchProcessing.h | 55 ++++++ orkbasecxx/CapturePluginProxy.cpp | 230 ++++++++++++++++++++++++ orkbasecxx/CapturePluginProxy.h | 55 ++++++ orkbasecxx/CapturePort.cpp | 358 +++++++++++++++++++++++++++++++++++++ orkbasecxx/CapturePort.h | 78 ++++++++ orkbasecxx/Daemon.cpp | 284 +++++++++++++++++++++++++++++ orkbasecxx/Daemon.h | 58 ++++++ orkbasecxx/ImmediateProcessing.cpp | 101 +++++++++++ orkbasecxx/ImmediateProcessing.h | 37 ++++ orkbasecxx/OrkBase.dsp | 56 ++++++ orkbasecxx/Reporting.cpp | 263 +++++++++++++++++++++++++++ orkbasecxx/Reporting.h | 64 +++++++ orkbasecxx/messages/Makefile.am | 4 +- orkbasecxx/messages/Message.h | 2 + orkbasecxx/messages/RecordMsg.cpp | 51 ++++++ orkbasecxx/messages/RecordMsg.h | 34 ++++ 17 files changed, 2080 insertions(+), 2 deletions(-) create mode 100644 orkbasecxx/BatchProcessing.cpp create mode 100644 orkbasecxx/BatchProcessing.h create mode 100644 orkbasecxx/CapturePluginProxy.cpp create mode 100644 orkbasecxx/CapturePluginProxy.h create mode 100644 orkbasecxx/CapturePort.cpp create mode 100644 orkbasecxx/CapturePort.h create mode 100644 orkbasecxx/Daemon.cpp create mode 100644 orkbasecxx/Daemon.h create mode 100644 orkbasecxx/ImmediateProcessing.cpp create mode 100644 orkbasecxx/ImmediateProcessing.h create mode 100644 orkbasecxx/Reporting.cpp create mode 100644 orkbasecxx/Reporting.h create mode 100644 orkbasecxx/messages/RecordMsg.cpp create mode 100644 orkbasecxx/messages/RecordMsg.h (limited to 'orkbasecxx') diff --git a/orkbasecxx/BatchProcessing.cpp b/orkbasecxx/BatchProcessing.cpp new file mode 100644 index 0000000..1700595 --- /dev/null +++ b/orkbasecxx/BatchProcessing.cpp @@ -0,0 +1,352 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ +#pragma warning( disable: 4786 ) + +#define _WINSOCKAPI_ // prevents the inclusion of winsock.h + +#include +#include + +#include "ConfigManager.h" +#include "BatchProcessing.h" +#include "ace/OS_NS_unistd.h" +#include "audiofile/LibSndFileFile.h" +#include "Daemon.h" +#include "Filter.h" +#include "Reporting.h" + +TapeProcessorRef BatchProcessing::m_singleton; + +void BatchProcessing::Initialize() +{ + if(m_singleton.get() == NULL) + { + m_singleton.reset(new BatchProcessing()); + TapeProcessorRegistry::instance()->RegisterTapeProcessor(m_singleton); + } +} + + +BatchProcessing::BatchProcessing() +{ + m_threadCount = 0; + + struct tm date = {0}; + time_t now = time(NULL); + ACE_OS::localtime_r(&now, &date); + m_currentDay = date.tm_mday; +} + +CStdString __CDECL__ BatchProcessing::GetName() +{ + return "BatchProcessing"; +} + +TapeProcessorRef BatchProcessing::Instanciate() +{ + return m_singleton; +} + +void BatchProcessing::AddAudioTape(AudioTapeRef& audioTapeRef) +{ + if (!m_audioTapeQueue.push(audioTapeRef)) + { + // Log error + LOG4CXX_ERROR(LOG.batchProcessingLog, CStdString("queue full")); + } +} + +void BatchProcessing::SetQueueSize(int size) +{ + m_audioTapeQueue.setSize(size); +} + +void BatchProcessing::ThreadHandler(void *args) +{ + CStdString debug; + CStdString logMsg; + + CStdString processorName("BatchProcessing"); + TapeProcessorRef batchProcessing = TapeProcessorRegistry::instance()->GetNewTapeProcessor(processorName); + if(batchProcessing.get() == NULL) + { + LOG4CXX_ERROR(LOG.batchProcessingLog, "Could not instanciate BatchProcessing"); + return; + } + BatchProcessing* pBatchProcessing = (BatchProcessing*)(batchProcessing->Instanciate().get()); + + pBatchProcessing->SetQueueSize(CONFIG.m_batchProcessingQueueSize); + + int threadId = 0; + { + MutexSentinel sentinel(pBatchProcessing->m_mutex); + threadId = pBatchProcessing->m_threadCount++; + } + CStdString threadIdString = IntToString(threadId); + debug.Format("thread Th%s starting - queue size:%d", threadIdString, CONFIG.m_batchProcessingQueueSize); + LOG4CXX_INFO(LOG.batchProcessingLog, debug); + + bool stop = false; + + for(;stop == false;) + { + AudioFileRef fileRef; + AudioFileRef outFileRef; + AudioTapeRef audioTapeRef; + CStdString trackingId = "[no-trk]"; + + try + { + audioTapeRef = pBatchProcessing->m_audioTapeQueue.pop(); + if(audioTapeRef.get() == NULL) + { + if(Daemon::Singleton()->IsStopping()) + { + stop = true; + } + if(Daemon::Singleton()->GetShortLived()) + { + Daemon::Singleton()->Stop(); + } + } + else + { + fileRef = audioTapeRef->GetAudioFileRef(); + trackingId = audioTapeRef->m_trackingId; + + // Let's work on the tape we have pulled + //CStdString threadIdString = IntToString(threadId); + LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " processing " + audioTapeRef->GetIdentifier()); + + //fileRef->MoveOrig(); // #### could do this only when original and output file have the same extension. Irrelevant for now as everything is captured as mcf file + fileRef->Open(AudioFile::READ); + + AudioChunkRef chunkRef; + AudioChunkRef tmpChunkRef; + + switch(CONFIG.m_storageAudioFormat) + { + case FfUlaw: + outFileRef.reset(new LibSndFileFile(SF_FORMAT_ULAW | SF_FORMAT_WAV)); + break; + case FfAlaw: + outFileRef.reset(new LibSndFileFile(SF_FORMAT_ALAW | SF_FORMAT_WAV)); + break; + case FfGsm: + outFileRef.reset(new LibSndFileFile(SF_FORMAT_GSM610 | SF_FORMAT_WAV)); + break; + case FfPcmWav: + default: + outFileRef.reset(new LibSndFileFile(SF_FORMAT_PCM_16 | SF_FORMAT_WAV)); + } + + FilterRef filter; + FilterRef decoder1; + FilterRef decoder2; + FilterRef decoder; + + std::bitset seenRtpPayloadTypes; + std::vector decoders1; + std::vector decoders2; + for(int pt=0; ptGetNewFilter(pt); + decoders1.push_back(decoder1); + decoder2 = FilterRegistry::instance()->GetNewFilter(pt); + decoders2.push_back(decoder2); + } + + bool firstChunk = true; + bool voIpSession = false; + + size_t numSamplesS1 = 0; + size_t numSamplesS2 = 0; + size_t numSamplesOut = 0; + + while(fileRef->ReadChunkMono(chunkRef)) + { + // ############ HACK + //ACE_Time_Value yield; + //yield.set(0,1); + //ACE_OS::sleep(yield); + // ############ HACK + + AudioChunkDetails details = *chunkRef->GetDetails(); + decoder.reset(); + + if(details.m_rtpPayloadType < -1 || details.m_rtpPayloadType > RTP_PAYLOAD_TYPE_MAX) + { + logMsg.Format("RTP payload type out of bound:%d", details.m_rtpPayloadType); + throw(logMsg); + } + + // Instanciate any decoder we might need during a VoIP session + if(details.m_rtpPayloadType != -1) + { + voIpSession = true; + + if(details.m_channel == 2) + { + decoder2 = decoders2.at(details.m_rtpPayloadType); + decoder = decoder2; + } + else + { + decoder1 = decoders1.at(details.m_rtpPayloadType); + decoder = decoder1; + } + + bool ptAlreadySeen = seenRtpPayloadTypes.test(details.m_rtpPayloadType); + seenRtpPayloadTypes.set(details.m_rtpPayloadType); + + if(decoder.get() == NULL) + { + if(ptAlreadySeen == false) + { + // First time we see a particular unsupported payload type in this session, log it + CStdString rtpPayloadType = IntToString(details.m_rtpPayloadType); + LOG4CXX_ERROR(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " unsupported RTP payload type:" + rtpPayloadType); + } + // We cannot decode this chunk due to unknown codec, go to next chunk + continue; + } + else if(ptAlreadySeen == false) + { + // First time we see a particular supported payload type in this session, log it + CStdString rtpPayloadType = IntToString(details.m_rtpPayloadType); + LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " RTP payload type:" + rtpPayloadType); + } + } + if(!voIpSession || (firstChunk && decoder.get())) + { + firstChunk = false; + + // At this point, we know we have a working codec, create an RTP mixer and open the output file + if(voIpSession) + { + CStdString filterName("RtpMixer"); + filter = FilterRegistry::instance()->GetNewFilter(filterName); + if(filter.get() == NULL) + { + debug = "Could not instanciate RTP mixer"; + throw(debug); + } + } + + CStdString path = CONFIG.m_audioOutputPath + "/" + audioTapeRef->GetPath(); + FileRecursiveMkdir(path); + + CStdString file = path + "/" + audioTapeRef->GetIdentifier(); + outFileRef->Open(file, AudioFile::WRITE, false, fileRef->GetSampleRate()); + } + if(voIpSession) + { + if(details.m_channel == 2) + { + decoder2->AudioChunkIn(chunkRef); + decoder2->AudioChunkOut(tmpChunkRef); + if(tmpChunkRef.get()) + { + numSamplesS2 += tmpChunkRef->GetNumSamples(); + } + } + else + { + decoder1->AudioChunkIn(chunkRef); + decoder1->AudioChunkOut(tmpChunkRef); + if(tmpChunkRef.get()) + { + numSamplesS1 += tmpChunkRef->GetNumSamples(); + } + } + filter->AudioChunkIn(tmpChunkRef); + filter->AudioChunkOut(tmpChunkRef); + } + outFileRef->WriteChunk(tmpChunkRef); + if(tmpChunkRef.get()) + { + numSamplesOut += tmpChunkRef->GetNumSamples(); + } + + if(CONFIG.m_batchProcessingEnhancePriority == false) + { + // Give up CPU between every audio buffer to make sure the actual recording always has priority + //ACE_Time_Value yield; + //yield.set(0,1); // 1 us + //ACE_OS::sleep(yield); + + // Use this instead, even if it still seems this holds the whole process under Linux instead of this thread only. + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 1; + ACE_OS::nanosleep (&ts, NULL); + } + } + + if(voIpSession && !firstChunk) + { + // Flush the RTP mixer + AudioChunkRef stopChunk(new AudioChunk()); + stopChunk->GetDetails()->m_marker = MEDIA_CHUNK_EOS_MARKER; + filter->AudioChunkIn(stopChunk); + filter->AudioChunkOut(tmpChunkRef); + outFileRef->WriteChunk(tmpChunkRef); + if(tmpChunkRef.get()) + { + numSamplesOut += tmpChunkRef->GetNumSamples(); + } + } + + fileRef->Close(); + outFileRef->Close(); + logMsg.Format("[%s] Th%s stop: num samples: s1:%u s2:%u out:%u", trackingId, threadIdString, numSamplesS1, numSamplesS2, numSamplesOut); + LOG4CXX_INFO(LOG.batchProcessingLog, logMsg); + + if(CONFIG.m_deleteNativeFile) + { + fileRef->Delete(); + LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " deleting native: " + audioTapeRef->GetIdentifier()); + } + + // Report tape ready message to orktrack + CaptureEventRef readyEvent(new CaptureEvent); + readyEvent->m_type = CaptureEvent::EtReady; + readyEvent->m_timestamp = time(NULL); + audioTapeRef->AddCaptureEvent(readyEvent, true); + Reporting::Instance()->AddAudioTape(audioTapeRef); + + // Finished processing the tape, pass on to next processor + pBatchProcessing->RunNextProcessor(audioTapeRef); + } + } + catch (CStdString& e) + { + LOG4CXX_ERROR(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " " + e); + if(fileRef.get()) {fileRef->Close();} + if(outFileRef.get()) {outFileRef->Close();} + if(CONFIG.m_deleteFailedCaptureFile && fileRef.get() != NULL) + { + LOG4CXX_INFO(LOG.batchProcessingLog, "[" + trackingId + "] Th" + threadIdString + " deleting native and transcoded"); + if(fileRef.get()) {fileRef->Delete();} + if(outFileRef.get()) {outFileRef->Delete();} + } + } + //catch(...) + //{ + // LOG4CXX_ERROR(LOG.batchProcessingLog, CStdString("unknown exception")); + //} + } + LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Exiting thread Th" + threadIdString)); +} + + diff --git a/orkbasecxx/BatchProcessing.h b/orkbasecxx/BatchProcessing.h new file mode 100644 index 0000000..e3579b5 --- /dev/null +++ b/orkbasecxx/BatchProcessing.h @@ -0,0 +1,55 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ + +#ifndef __BATCHPROCESSING_H__ +#define __BATCHPROCESSING_H__ + +#include "ThreadSafeQueue.h" +#include "TapeProcessor.h" +#include "AudioTape.h" +#include "ace/Thread_Mutex.h" +#include + +class BatchProcessing; +typedef boost::shared_ptr BatchProcessingRef; + +/** + * This tape processor handles the audio transcoding + */ +class DLL_IMPORT_EXPORT_ORKBASE BatchProcessing : public TapeProcessor +{ +public: + static void Initialize(); + + CStdString __CDECL__ GetName(); + TapeProcessorRef __CDECL__ Instanciate(); + void __CDECL__ AddAudioTape(AudioTapeRef& audioTapeRef); + + + static void ThreadHandler(void *args); + + void SetQueueSize(int size); + +private: + BatchProcessing(); + static TapeProcessorRef m_singleton; + + ThreadSafeQueue m_audioTapeQueue; + + size_t m_threadCount; + ACE_Thread_Mutex m_mutex; + int m_currentDay; +}; + +#endif + diff --git a/orkbasecxx/CapturePluginProxy.cpp b/orkbasecxx/CapturePluginProxy.cpp new file mode 100644 index 0000000..fbfc3da --- /dev/null +++ b/orkbasecxx/CapturePluginProxy.cpp @@ -0,0 +1,230 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ + +#include "CapturePluginProxy.h" +#include "ace/OS_NS_dirent.h" +#include "ace/OS_NS_string.h" +#include "ace/Thread_Manager.h" +#include "ConfigManager.h" +#include "CapturePort.h" + +CapturePluginProxy* CapturePluginProxy::m_singleton; + +CapturePluginProxy::CapturePluginProxy() +{ + m_configureFunction = NULL; + m_registerCallBacksFunction = NULL; + m_initializeFunction = NULL; + m_runFunction = NULL; + m_startCaptureFunction = NULL; + m_stopCaptureFunction = NULL; + + m_loaded = false; +} + +CapturePluginProxy* CapturePluginProxy::Singleton() +{ + return m_singleton; +} + +bool CapturePluginProxy::Initialize() +{ + m_singleton = new CapturePluginProxy(); + return m_singleton->Init(); +} + +bool CapturePluginProxy::Init() +{ + // Get the desired capture plugin from the config file, or else, use the first dll encountered. + CStdString pluginDirectory = CONFIG.m_capturePluginPath + "/"; + CStdString pluginPath; + if (!CONFIG.m_capturePlugin.IsEmpty()) + { + // A specific plugin was specified in the config file + pluginPath = pluginDirectory + CONFIG.m_capturePlugin; + } + else + { + // No plugin specified, find the first one in the plugin directory + ACE_DIR* dir = ACE_OS::opendir((PCSTR)pluginDirectory); + if (!dir) + { + LOG4CXX_ERROR(LOG.rootLog, CStdString("Capture plugin directory could not be found:" + pluginDirectory)); + } + else + { + dirent* dirEntry = NULL; + bool found = false; + bool done = false; + while(!found && !done) + { + dirEntry = ACE_OS::readdir(dir); + if(dirEntry) + { + if (ACE_OS::strstr(dirEntry->d_name, ".dll")) + { + found = true; + done = true; + pluginPath = pluginDirectory + dirEntry->d_name; + } + } + else + { + done = true; + } + } + ACE_OS::closedir(dir); + } + } + if (!pluginPath.IsEmpty()) + { + m_dll.open((PCSTR)pluginPath); + ACE_TCHAR* error = m_dll.error(); + if(error) + { + LOG4CXX_ERROR(LOG.rootLog, CStdString("Failed to load the following plugin: ") + pluginPath); + } + else + { + // Ok, the dll has been successfully loaded + LOG4CXX_INFO(LOG.rootLog, CStdString("Loaded plugin: ") + pluginPath); + + RegisterCallBacksFunction registerCallBacks; + registerCallBacks = (RegisterCallBacksFunction)m_dll.symbol("RegisterCallBacks"); + registerCallBacks(AudioChunkCallBack, CaptureEventCallBack, OrkLogManager::Instance()); + + m_configureFunction = (ConfigureFunction)m_dll.symbol("Configure"); + if (m_configureFunction) + { + ConfigManager::Instance()->AddConfigureFunction(m_configureFunction); + + m_initializeFunction = (InitializeFunction)m_dll.symbol("Initialize"); + if (m_initializeFunction) + { + m_initializeFunction(); + + m_runFunction = (RunFunction)m_dll.symbol("Run"); + if (m_runFunction) + { + m_startCaptureFunction = (StartCaptureFunction)m_dll.symbol("StartCapture"); + if (m_startCaptureFunction) + { + m_stopCaptureFunction = (StopCaptureFunction)m_dll.symbol("StopCapture"); + if (m_stopCaptureFunction) + { + m_loaded = true; + } + else + { + LOG4CXX_ERROR(LOG.rootLog, CStdString("Could not find StopCapture function in ") + pluginPath); + } + } + else + { + LOG4CXX_ERROR(LOG.rootLog, CStdString("Could not find StartCapture function in ") + pluginPath); + } + } + else + { + LOG4CXX_ERROR(LOG.rootLog, CStdString("Could not find Run function in ") + pluginPath); + } + } + else + { + LOG4CXX_ERROR(LOG.rootLog, CStdString("Could not find Initialize function in ") + pluginPath); + } + } + else + { + LOG4CXX_ERROR(LOG.rootLog, CStdString("Could not find Configure function in ") + pluginPath); + } + } + } + else + { + LOG4CXX_ERROR(LOG.rootLog, CStdString("Failed to find any capture plugin in: ") + pluginDirectory); + } + + return m_loaded; +} + +void CapturePluginProxy::Run() +{ + if (!ACE_Thread_Manager::instance()->spawn(ACE_THR_FUNC(m_runFunction))) + { + LOG4CXX_INFO(LOG.rootLog, CStdString("Failed to create capture thread")); + } +} + +void CapturePluginProxy::Shutdown() +{ + ShutdownFunction shutdownFunction = (ShutdownFunction)m_dll.symbol("Shutdown"); + if (shutdownFunction) + { + LOG4CXX_INFO(LOG.rootLog, CStdString("Shutting down")); + shutdownFunction(); + } + else + { + LOG4CXX_INFO(LOG.rootLog, CStdString("Could not find DLL Shutdown function")); + } +} + +void CapturePluginProxy::StartCapture(CStdString& party) +{ + if(m_loaded) + { + m_startCaptureFunction(party); + } + else + { + throw(CStdString("StartCapture: Capture plugin not yet loaded")); + } +} + +void CapturePluginProxy::StopCapture(CStdString& party) +{ + if(m_loaded) + { + m_stopCaptureFunction(party); + } + else + { + throw(CStdString("StopCapture: Capture plugin not yet loaded")); + } +} + +void __CDECL__ CapturePluginProxy::AudioChunkCallBack(AudioChunkRef chunkRef, CStdString& capturePort) +{ + // find the right port and give it the audio chunk + CapturePortRef portRef = CapturePortsSingleton::instance()->AddAndReturnPort(capturePort); + portRef->AddAudioChunk(chunkRef); +} + +void __CDECL__ CapturePluginProxy::CaptureEventCallBack(CaptureEventRef eventRef, CStdString& capturePort) +{ + if(CONFIG.m_vad || CONFIG.m_audioSegmentation) + { + if (eventRef->m_type == CaptureEvent::EtStart || eventRef->m_type == CaptureEvent::EtStop) + { + LOG4CXX_ERROR(LOG.portLog, "#" + capturePort + ": received start or stop while in VAD or audio segmentation mode"); + } + } + else + { + // find the right port and give it the event + CapturePortRef portRef = CapturePortsSingleton::instance()->AddAndReturnPort(capturePort); + portRef->AddCaptureEvent(eventRef); + } +} + diff --git a/orkbasecxx/CapturePluginProxy.h b/orkbasecxx/CapturePluginProxy.h new file mode 100644 index 0000000..631976d --- /dev/null +++ b/orkbasecxx/CapturePluginProxy.h @@ -0,0 +1,55 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ + +#ifndef __CAPTUREPLUGINPROXY_H__ +#define __CAPTUREPLUGINPROXY_H__ + +#include "ace/Singleton.h" +#include "ace/Thread_Mutex.h" +#include "ace/DLL.h" +#include "AudioCapture.h" +#include "AudioCapturePlugin.h" + +class DLL_IMPORT_EXPORT_ORKBASE CapturePluginProxy +{ +public: + static bool Initialize(); + static CapturePluginProxy* Singleton(); + + void Run(); + void Shutdown(); + void StartCapture(CStdString& party); + void StopCapture(CStdString& party); + + static void __CDECL__ AudioChunkCallBack(AudioChunkRef chunkRef, CStdString& capturePort); + static void __CDECL__ CaptureEventCallBack(CaptureEventRef eventRef, CStdString& capturePort); +private: + CapturePluginProxy(); + static CapturePluginProxy* m_singleton; + bool Init(); + + ConfigureFunction m_configureFunction; + RegisterCallBacksFunction m_registerCallBacksFunction; + InitializeFunction m_initializeFunction; + RunFunction m_runFunction; + StartCaptureFunction m_startCaptureFunction; + StopCaptureFunction m_stopCaptureFunction; + + ACE_DLL m_dll; + bool m_loaded; +}; + +//typedef ACE_Singleton CapturePluginProxySingleton; + +#endif + diff --git a/orkbasecxx/CapturePort.cpp b/orkbasecxx/CapturePort.cpp new file mode 100644 index 0000000..677d966 --- /dev/null +++ b/orkbasecxx/CapturePort.cpp @@ -0,0 +1,358 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ + +#define _WINSOCKAPI_ // prevents the inclusion of winsock.h + +#include +#include "CapturePort.h" +#include "Utils.h" +#include "ImmediateProcessing.h" +#include "Reporting.h" +#include "ConfigManager.h" + +static LoggerPtr s_log; + + +CapturePort::CapturePort(CStdString& id) +{ + m_id = id; + m_vadBelowThresholdSec = 0.0; + m_vadUp = false; + m_capturing = false; + m_lastUpdated = 0; + + LoadFilters(); +} + +void CapturePort::LoadFilters() +{ + for(std::list::iterator it = CONFIG.m_capturePortFilters.begin(); it != CONFIG.m_capturePortFilters.end(); it++) + { + CStdString filterName = *it; + FilterRef filter = FilterRegistry::instance()->GetNewFilter(filterName); + if(filter.get()) + { + m_filters.push_back(filter); + LOG4CXX_DEBUG(s_log, CStdString("Adding filter:") + filterName); + } + else + { + LOG4CXX_ERROR(s_log, CStdString("Filter:") + filterName + " does not exist, please check in config.xml"); + } + } +} + + +CStdString CapturePort::ToString() +{ + CStdString ret; + return ret; +} + +CStdString CapturePort::GetId() +{ + return m_id; +} + +void CapturePort::FilterAudioChunk(AudioChunkRef& chunkRef) +{ + // Iterate through all filters + std::list::iterator it; + for(it = m_filters.begin(); it != m_filters.end(); it++) + { + FilterRef filter = *it; + filter->AudioChunkIn(chunkRef); + filter->AudioChunkOut(chunkRef); + } +} + +void CapturePort::FilterCaptureEvent(CaptureEventRef& eventRef) +{ + // Iterate through all filters + std::list::iterator it; + for(it = m_filters.begin(); it != m_filters.end(); it++) + { + FilterRef filter = *it; + filter->CaptureEventIn(eventRef); + filter->CaptureEventOut(eventRef); + } +} + +void CapturePort::AddAudioChunk(AudioChunkRef chunkRef) +{ + FilterAudioChunk(chunkRef); + + time_t now = time(NULL); + m_lastUpdated = now; + + if(CONFIG.m_audioSegmentation) + { + if (m_audioTapeRef.get()) + { + if ((now - m_audioTapeRef->m_beginDate) >= CONFIG.m_audioSegmentDuration) + { + // signal current tape stop event + CaptureEventRef eventRef(new CaptureEvent); + eventRef->m_type = CaptureEvent::EtStop; + eventRef->m_timestamp = now; + AddCaptureEvent(eventRef); + + // create new tape + m_audioTapeRef.reset(new AudioTape(m_id)); + + // signal new tape start event + eventRef.reset(new CaptureEvent); + eventRef->m_type = CaptureEvent::EtStart; + eventRef->m_timestamp = now; + AddCaptureEvent(eventRef); + } + } + else + { + // create new tape + m_audioTapeRef.reset(new AudioTape(m_id)); + + // signal new tape start event + CaptureEventRef eventRef(new CaptureEvent); + eventRef->m_type = CaptureEvent::EtStart; + eventRef->m_timestamp = now; + AddCaptureEvent(eventRef); + } + } + else if (CONFIG.m_vad) + { + if(chunkRef->GetEncoding() == PcmAudio) + { + if(m_vadUp) + { + // There is an ongoing capture + if (chunkRef->ComputeRmsDb() < CONFIG.m_vadLowThresholdDb) + { + // Level has gone below low threshold, increase holdon counter + m_vadBelowThresholdSec += chunkRef->GetDurationSec(); + } + else + { + // Level has gone above low threshold, reset holdon counter + m_vadBelowThresholdSec = 0.0; + } + + if (m_vadBelowThresholdSec > CONFIG.m_vadHoldOnSec) + { + // no activity detected for more than hold on time + m_vadUp = false; + + // signal current tape stop event + CaptureEventRef eventRef(new CaptureEvent); + eventRef->m_type = CaptureEvent::EtStop; + eventRef->m_timestamp = now; + AddCaptureEvent(eventRef); + } + } + else + { + // No capture is taking place yet + if (chunkRef->ComputeRmsDb() > CONFIG.m_vadHighThresholdDb) + { + // Voice detected, start a new capture + m_vadBelowThresholdSec = 0.0; + m_vadUp = true; + + // create new tape + m_audioTapeRef.reset(new AudioTape(m_id)); + + // signal new tape start event + CaptureEventRef eventRef(new CaptureEvent); + eventRef->m_type = CaptureEvent::EtStart; + eventRef->m_timestamp = now; + AddCaptureEvent(eventRef); + } + } + } + else + { + LOG4CXX_ERROR(s_log, CStdString("Voice activity detection cannot be used on non PCM audio")); + } + } + + if (m_audioTapeRef.get() && m_capturing) + { + m_audioTapeRef->AddAudioChunk(chunkRef); + + // Signal to immediate processing thread that tape has new stuff + ImmediateProcessing::GetInstance()->AddAudioTape(m_audioTapeRef); + } +} + +void CapturePort::AddCaptureEvent(CaptureEventRef eventRef) +{ + FilterCaptureEvent(eventRef); + + m_lastUpdated = time(NULL); + + AudioTapeRef audioTapeRef = m_audioTapeRef; + + // First of all, handle tape start + if (eventRef->m_type == CaptureEvent::EtStart) + { + m_capturing = true; + if (audioTapeRef.get()) + { + audioTapeRef->SetShouldStop(); // force stop of previous tape + } + audioTapeRef.reset(new AudioTape(m_id)); // Create a new tape + audioTapeRef->AddCaptureEvent(eventRef, true); + + m_audioTapeRef = audioTapeRef; + LOG4CXX_INFO(s_log, "[" + m_audioTapeRef->m_trackingId + "] #" + m_id + " start"); + } + + if (!audioTapeRef.get()) + { + LOG4CXX_WARN(s_log, "#" + m_id + ": received unexpected capture event:" + + CaptureEvent::EventTypeToString(eventRef->m_type)); + } + else + { + // Ok, at this point, we know we have a valid audio tape + switch(eventRef->m_type) + { + case CaptureEvent::EtStart: + break; + case CaptureEvent::EtStop: + + m_capturing = false; + LOG4CXX_INFO(s_log, "[" + audioTapeRef->m_trackingId + "] #" + m_id + " stop"); + audioTapeRef->AddCaptureEvent(eventRef, true); + Reporting::Instance()->AddAudioTape(audioTapeRef); + + if (m_audioTapeRef->GetAudioFileRef().get()) + { + // Notify immediate processing that tape has stopped + ImmediateProcessing::GetInstance()->AddAudioTape(m_audioTapeRef); + } + else + { + // Received a stop but there is no valid audio file associated with the tape + LOG4CXX_WARN(s_log, "[" + audioTapeRef->m_trackingId + "] #" + m_id + " no audio reported between last start and stop"); + } + break; + case CaptureEvent::EtEndMetadata: + // Now that all metadata has been acquired, we can generate the tape start message + Reporting::Instance()->AddAudioTape(audioTapeRef); + break; + case CaptureEvent::EtUpdate: + audioTapeRef->AddCaptureEvent(eventRef, true); + // Generate tape update message + Reporting::Instance()->AddAudioTape(audioTapeRef); + break; + case CaptureEvent::EtDirection: + case CaptureEvent::EtRemoteParty: + case CaptureEvent::EtLocalParty: + case CaptureEvent::EtLocalEntryPoint: + default: + audioTapeRef->AddCaptureEvent(eventRef, false); + } + } +} + +bool CapturePort::IsExpired(time_t now) +{ + if((now - m_lastUpdated) > (10*60)) // 10 minutes + { + return true; + } + return false; +} + + +//======================================= +CapturePorts::CapturePorts() +{ + m_ports.clear(); + m_lastHooveringTime = time(NULL); + s_log = Logger::getLogger("port"); +} + +CapturePortRef CapturePorts::GetPort(CStdString & portId) +{ + Hoover(); + + std::map::iterator pair; + + pair = m_ports.find(portId); + + if (pair == m_ports.end()) + { + CapturePortRef nullPortRef; + return nullPortRef; + } + else + { + return pair->second; + } +} + +CapturePortRef CapturePorts::AddAndReturnPort(CStdString & portId) +{ + //MutexGuard mutexGuard(m_mutex); // To make sure a channel cannot be created twice - not used for now. CapturePorts only ever gets interaction from capture single thread + + CapturePortRef portRef = GetPort(portId); + if (portRef.get() == NULL) + { + // The port does not already exist, create it. + CapturePortRef newPortRef(new CapturePort(portId)); + m_ports.insert(std::make_pair(portId, newPortRef)); + return newPortRef; + } + else + { + return portRef; + } +} + +void CapturePorts::Hoover() +{ + CStdString logMsg; + time_t now = time(NULL); + if( (now - m_lastHooveringTime) > 10) // Hoover every 10 seconds + { + m_lastHooveringTime = now; + int numPorts = m_ports.size(); + + // Go round and detect inactive ports + std::map::iterator pair; + std::list toDismiss; + + for(pair = m_ports.begin(); pair != m_ports.end(); pair++) + { + CapturePortRef port = pair->second; + if(port->IsExpired(now)) + { + toDismiss.push_back(port); + } + } + + // Discard inactive ports + for (std::list::iterator it = toDismiss.begin(); it != toDismiss.end() ; it++) + { + CapturePortRef port = *it; + m_ports.erase(port->GetId()); + LOG4CXX_DEBUG(s_log, port->GetId() + ": Expired"); + } + logMsg.Format("Hoovered %d ports. New number:%d", (numPorts - m_ports.size()), m_ports.size()); + LOG4CXX_DEBUG(s_log, logMsg); + } +} + + diff --git a/orkbasecxx/CapturePort.h b/orkbasecxx/CapturePort.h new file mode 100644 index 0000000..aa80e29 --- /dev/null +++ b/orkbasecxx/CapturePort.h @@ -0,0 +1,78 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ + +#pragma warning( disable: 4786 ) + +#ifndef __PORT_H__ +#define __PORT_H__ + +#include +#include +#include "boost/shared_ptr.hpp" +#include "ace/Thread_Mutex.h" +#include "ace/Singleton.h" + +#include "StdString.h" + +#include "AudioCapture.h" +#include "AudioTape.h" +#include "Filter.h" + + +/** Base class for all types of capture ports. */ +class DLL_IMPORT_EXPORT_ORKBASE CapturePort +{ +public: + CapturePort(CStdString& Id); + CStdString ToString(); + CStdString GetId(); + + void AddAudioChunk(AudioChunkRef chunkRef); + void AddCaptureEvent(CaptureEventRef eventRef); + bool IsExpired(time_t now); +private: + void LoadFilters(); + void FilterAudioChunk(AudioChunkRef& chunkRef); + void FilterCaptureEvent(CaptureEventRef& eventRef); + + CStdString m_id; + AudioTapeRef m_audioTapeRef; + ACE_Thread_Mutex m_mutex; + bool m_capturing; + double m_vadBelowThresholdSec; + bool m_vadUp; + time_t m_lastUpdated; + std::list m_filters; +}; + +typedef boost::shared_ptr CapturePortRef; + +/** This singleton holds all dynamically created capture ports and allows convenient access. */ +class DLL_IMPORT_EXPORT_ORKBASE CapturePorts +{ +public: + CapturePorts(); + CapturePortRef GetPort(CStdString & portId); + /** Tries to find a capture port from its ID. If unsuccessful, creates a new one and returns it */ + CapturePortRef AddAndReturnPort(CStdString & portId); + void Hoover(); +private: + std::map m_ports; + ACE_Thread_Mutex m_mutex; + time_t m_lastHooveringTime; +}; + +typedef ACE_Singleton CapturePortsSingleton; + +#endif + diff --git a/orkbasecxx/Daemon.cpp b/orkbasecxx/Daemon.cpp new file mode 100644 index 0000000..8a07f2d --- /dev/null +++ b/orkbasecxx/Daemon.cpp @@ -0,0 +1,284 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ + +#define _WINSOCKAPI_ // prevents the inclusion of winsock.h + +#include "ace/OS_NS_dirent.h" +#include "Utils.h" +#ifdef WIN32 +#include +#include +SERVICE_STATUS serviceStatus; +SERVICE_STATUS_HANDLE serviceStatusHandle = 0; +HANDLE stopServiceEvent = 0; +#endif +#include "ace/OS_NS_signal.h" +#include "Daemon.h" + +void handle_signal(int sig_num) +{ + signal(SIGUSR1, handle_signal); + Daemon::Singleton()->Stop(); +} + +#ifdef WIN32 +void WINAPI ServiceControlHandler( DWORD controlCode ) +{ + switch ( controlCode ) + { + case SERVICE_CONTROL_INTERROGATE: + break; + + case SERVICE_CONTROL_SHUTDOWN: + case SERVICE_CONTROL_STOP: + serviceStatus.dwCurrentState = SERVICE_STOP_PENDING; + SetServiceStatus( serviceStatusHandle, &serviceStatus ); + Daemon::Singleton()->Stop(); + return; + + case SERVICE_CONTROL_PAUSE: + break; + + case SERVICE_CONTROL_CONTINUE: + break; + + default: + ; + } + + SetServiceStatus( serviceStatusHandle, &serviceStatus ); +} +#endif + +Daemon* Daemon::m_singleton; + +Daemon::Daemon() +{ +} + +Daemon* Daemon::Singleton() +{ + return m_singleton; +} + +void Daemon::Initialize(CStdString serviceName, DaemonHandler runHandler, DaemonHandler stopHandler) +{ + m_singleton = new Daemon(); + + m_singleton->m_runHandler = runHandler; + m_singleton->m_stopHandler = stopHandler; + m_singleton->m_serviceName = serviceName; + + m_singleton->m_stopping = false; + m_singleton->m_shortLived = false; +} + +void Daemon::Start() +{ +#ifdef WIN32 + // change current directory to service location (default for NT services is system32) + CStdString workingDirectory; + + TCHAR path[ _MAX_PATH + 1 ]; + if ( GetModuleFileName( 0, path, sizeof(path)/sizeof(path[0]) ) > 0 ) + { + CStdString pathString = path; + int lastBackSlashPosition = pathString.ReverseFind("\\"); + if (lastBackSlashPosition != -1) + { + workingDirectory = pathString.Left(lastBackSlashPosition); + chdir((PCSTR)workingDirectory); + } + } + + SERVICE_TABLE_ENTRY serviceTable[] = + { + { (char *)(PCSTR)m_serviceName, Daemon::Run }, + { 0, 0 } + }; + + StartServiceCtrlDispatcher( serviceTable ); +#else + signal(SIGUSR1, handle_signal); + Daemon::Run(); +#endif +} + +#ifdef WIN32 +void WINAPI Daemon::Run( DWORD /*argc*/, TCHAR* /*argv*/[] ) +#else +void Daemon::Run() +#endif +{ +#ifdef WIN32 + // initialise service status + serviceStatus.dwServiceType = SERVICE_WIN32; + serviceStatus.dwCurrentState = SERVICE_START_PENDING; + serviceStatus.dwControlsAccepted = 0; + serviceStatus.dwWin32ExitCode = NO_ERROR; + serviceStatus.dwServiceSpecificExitCode = NO_ERROR; + serviceStatus.dwCheckPoint = 0; + serviceStatus.dwWaitHint = 0; + + serviceStatusHandle = RegisterServiceCtrlHandler( (PCSTR)Daemon::Singleton()->m_serviceName, ServiceControlHandler ); + if ( serviceStatusHandle ) + { + // service is starting + serviceStatus.dwCurrentState = SERVICE_START_PENDING; + SetServiceStatus( serviceStatusHandle, &serviceStatus ); + + // running + serviceStatus.dwControlsAccepted |= (SERVICE_ACCEPT_STOP | SERVICE_ACCEPT_SHUTDOWN); + serviceStatus.dwCurrentState = SERVICE_RUNNING; + SetServiceStatus( serviceStatusHandle, &serviceStatus ); + } +#else // non WIN32 + int i,lfp; + char str[10]; + if(getppid()==1) return; /* already a daemon */ + i=fork(); + if (i<0) exit(1); /* fork error */ + if (i>0) exit(0); /* parent exits */ + /* child (daemon) continues */ + setsid(); /* obtain a new process group */ + for (i=getdtablesize();i>=0;--i) close(i); /* close all descriptors */ + i=open("/dev/null",O_RDWR); dup(i); dup(i); /* handle standart I/O */ + umask(027); /* set newly created file permissions */ + //chdir(RUNNING_DIR); /* change running directory */ + + char *loggingPath = NULL; + CStdString lockFile = CStdString(""); + + loggingPath = ACE_OS::getenv("ORKAUDIO_LOGGING_PATH"); + if(loggingPath) { + ACE_DIR* dir = ACE_OS::opendir(loggingPath); + if(dir) { + ACE_OS::closedir(dir); + lockFile.Format("%s/orkaudio.lock", loggingPath); + } + } + + if(!lockFile.size()) { + lfp=open("/var/log/orkaudio/orkaudio.lock",O_RDWR|O_CREAT,0640); + } else { + lfp=open(lockFile.c_str(),O_RDWR|O_CREAT,0640); + } + + if (lfp<0) + { + lfp=open("orkaudio.lock",O_RDWR|O_CREAT,0640); + } + if (lfp<0) + { + exit(1); /* can not open */ + } + if (lockf(lfp,F_TLOCK,0)<0) exit(0); /* can not lock */ + /* first instance continues */ + sprintf(str,"%d\n",getpid()); + write(lfp,str,strlen(str)); /* record pid to lockfile */ + + //signal(SIGCHLD,SIG_IGN); /* ignore child */ + //signal(SIGTSTP,SIG_IGN); /* ignore tty signals */ + //signal(SIGTTOU,SIG_IGN); + //signal(SIGTTIN,SIG_IGN); + //signal(SIGHUP,signal_handler); /* catch hangup signal */ +#endif + + Daemon::Singleton()->m_runHandler(); + +#ifdef WIN32 + // service was stopped + serviceStatus.dwCurrentState = SERVICE_STOP_PENDING; + SetServiceStatus( serviceStatusHandle, &serviceStatus ); + + // do cleanup here + CloseHandle( stopServiceEvent ); + stopServiceEvent = 0; + + // service is now stopped + serviceStatus.dwControlsAccepted &= ~(SERVICE_ACCEPT_STOP | SERVICE_ACCEPT_SHUTDOWN); + serviceStatus.dwCurrentState = SERVICE_STOPPED; + SetServiceStatus( serviceStatusHandle, &serviceStatus ); +#endif +} + +void Daemon::Stop() +{ + m_stopping = true; + m_stopHandler(); +} + +void Daemon::Install() +{ +#ifdef WIN32 + SC_HANDLE serviceControlManager = OpenSCManager( 0, 0, SC_MANAGER_CREATE_SERVICE ); + + if ( serviceControlManager ) + { + TCHAR path[ _MAX_PATH + 1 ]; + if ( GetModuleFileName( 0, path, sizeof(path)/sizeof(path[0]) ) > 0 ) + { + SC_HANDLE service = CreateService( serviceControlManager, + (PCSTR)m_serviceName, (PCSTR)m_serviceName, + SERVICE_ALL_ACCESS, SERVICE_WIN32_OWN_PROCESS, + SERVICE_AUTO_START, SERVICE_ERROR_IGNORE, path, + 0, 0, 0, 0, 0 ); + if ( service ) + CloseServiceHandle( service ); + } + + CloseServiceHandle( serviceControlManager ); + } +#endif +} + +void Daemon::Uninstall() +{ +#ifdef WIN32 + SC_HANDLE serviceControlManager = OpenSCManager( 0, 0, SC_MANAGER_CONNECT ); + + if ( serviceControlManager ) + { + SC_HANDLE service = OpenService( serviceControlManager, + (PCSTR)m_serviceName, SERVICE_QUERY_STATUS | DELETE ); + if ( service ) + { + SERVICE_STATUS serviceStatus; + if ( QueryServiceStatus( service, &serviceStatus ) ) + { + if ( serviceStatus.dwCurrentState == SERVICE_STOPPED ) + DeleteService( service ); + } + + CloseServiceHandle( service ); + } + + CloseServiceHandle( serviceControlManager ); + } +#endif +} + +bool Daemon::IsStopping() +{ + return m_stopping; +} + +void Daemon::SetShortLived() +{ + m_shortLived = true; +} + +bool Daemon::GetShortLived() +{ + return m_shortLived; +} diff --git a/orkbasecxx/Daemon.h b/orkbasecxx/Daemon.h new file mode 100644 index 0000000..ba7c42f --- /dev/null +++ b/orkbasecxx/Daemon.h @@ -0,0 +1,58 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ + +#ifndef __DAEMON_H__ +#define __DAEMON_H__ + +#include "StdString.h" +#include "ace/Singleton.h" +#include "ace/Thread_Mutex.h" + +typedef void (*DaemonHandler)(void); + +class DLL_IMPORT_EXPORT_ORKBASE Daemon +{ +public: + static void Initialize(CStdString serviceName, DaemonHandler runHandler, DaemonHandler stopHandler); + static Daemon* Singleton(); + void Start(); + void Stop(); + void Install(); + void Uninstall(); + bool IsStopping(); + + void SetShortLived(); + bool GetShortLived(); + +private: + Daemon(); + static Daemon* m_singleton; + +#ifdef WIN32 + static void WINAPI Run( DWORD /*argc*/, TCHAR* /*argv*/[] ); +#else + static void Run(); +#endif + + DaemonHandler m_runHandler; + DaemonHandler m_stopHandler; + CStdString m_serviceName; + + bool m_stopping; + bool m_shortLived; +}; + +//typedef ACE_Singleton DaemonSingleton; + +#endif + diff --git a/orkbasecxx/ImmediateProcessing.cpp b/orkbasecxx/ImmediateProcessing.cpp new file mode 100644 index 0000000..1ac414c --- /dev/null +++ b/orkbasecxx/ImmediateProcessing.cpp @@ -0,0 +1,101 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ +#pragma warning( disable: 4786 ) + +#define _WINSOCKAPI_ // prevents the inclusion of winsock.h + +#include "ImmediateProcessing.h" +#include "LogManager.h" +#include "ace/OS_NS_unistd.h" +#include "BatchProcessing.h" +#include "Daemon.h" +#include "ConfigManager.h" +#include "TapeProcessor.h" + + +ImmediateProcessing ImmediateProcessing::m_immediateProcessingSingleton; + +ImmediateProcessing::ImmediateProcessing() +{ + m_lastQueueFullTime = time(NULL); +} + +ImmediateProcessing* ImmediateProcessing::GetInstance() +{ + return &m_immediateProcessingSingleton; +} + +void ImmediateProcessing::AddAudioTape(AudioTapeRef audioTapeRef) +{ + if (!m_audioTapeQueue.push(audioTapeRef)) + { + if( (time(NULL) - m_lastQueueFullTime) > 10 ) + { + m_lastQueueFullTime = time(NULL); + LOG4CXX_ERROR(LOG.immediateProcessingLog, CStdString("ImmediateProcessing: queue full")); + } + } +} + +void ImmediateProcessing::SetQueueSize(int size) +{ + m_audioTapeQueue.setSize(size); +} + + +void ImmediateProcessing::ThreadHandler(void *args) +{ + CStdString logMsg; + + ImmediateProcessing* pImmediateProcessing = ImmediateProcessing::GetInstance(); + pImmediateProcessing->SetQueueSize(CONFIG.m_immediateProcessingQueueSize); + + logMsg.Format("thread starting - queue size:%d", CONFIG.m_immediateProcessingQueueSize); + LOG4CXX_INFO(LOG.immediateProcessingLog, logMsg); + + bool stop = false; + + for(;stop == false;) + { + try + { + AudioTapeRef audioTapeRef = pImmediateProcessing->m_audioTapeQueue.pop(); + + if(audioTapeRef.get() == NULL) + { + if(Daemon::Singleton()->IsStopping()) + { + stop = true; + } + } + else + { + //LOG4CXX_DEBUG(LOG.immediateProcessingLog, CStdString("Got chunk")); + + audioTapeRef->Write(); + + if (audioTapeRef->IsReadyForBatchProcessing()) + { + // Pass the tape to the tape processor chain + TapeProcessorRegistry::instance()->RunProcessingChain(audioTapeRef); + } + } + } + catch (CStdString& e) + { + LOG4CXX_ERROR(LOG.immediateProcessingLog, CStdString("ImmediateProcessing: ") + e); + } + } + LOG4CXX_INFO(LOG.immediateProcessingLog, CStdString("Exiting thread")); +} + diff --git a/orkbasecxx/ImmediateProcessing.h b/orkbasecxx/ImmediateProcessing.h new file mode 100644 index 0000000..a31a319 --- /dev/null +++ b/orkbasecxx/ImmediateProcessing.h @@ -0,0 +1,37 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ + +#ifndef __IMMEDIATEPROCESSING_H__ +#define __IMMEDIATEPROCESSING_H__ + +#include "ThreadSafeQueue.h" +#include "AudioTape.h" + +class DLL_IMPORT_EXPORT_ORKBASE ImmediateProcessing +{ +public: + ImmediateProcessing(); + static ImmediateProcessing* GetInstance(); + static void ThreadHandler(void *args); + + void AddAudioTape(AudioTapeRef audioTapeRef); + void SetQueueSize(int size); +private: + static ImmediateProcessing m_immediateProcessingSingleton; + ThreadSafeQueue m_audioTapeQueue; + + time_t m_lastQueueFullTime; +}; + +#endif + diff --git a/orkbasecxx/OrkBase.dsp b/orkbasecxx/OrkBase.dsp index 596ce99..7b5e008 100644 --- a/orkbasecxx/OrkBase.dsp +++ b/orkbasecxx/OrkBase.dsp @@ -243,6 +243,14 @@ SOURCE=.\messages\PingMsg.h # End Source File # Begin Source File +SOURCE=.\messages\RecordMsg.cpp +# End Source File +# Begin Source File + +SOURCE=.\messages\RecordMsg.h +# End Source File +# Begin Source File + SOURCE=.\Messages\SyncMessage.cpp !IF "$(CFG)" == "OrkBase - Win32 Release" @@ -529,6 +537,30 @@ SOURCE=.\AudioTape.h # End Source File # Begin Source File +SOURCE=.\BatchProcessing.cpp +# End Source File +# Begin Source File + +SOURCE=.\BatchProcessing.h +# End Source File +# Begin Source File + +SOURCE=.\CapturePluginProxy.cpp +# End Source File +# Begin Source File + +SOURCE=.\CapturePluginProxy.h +# End Source File +# Begin Source File + +SOURCE=.\CapturePort.cpp +# End Source File +# Begin Source File + +SOURCE=.\CapturePort.h +# End Source File +# Begin Source File + SOURCE=.\Config.cpp # End Source File # Begin Source File @@ -545,6 +577,14 @@ SOURCE=.\ConfigManager.h # End Source File # Begin Source File +SOURCE=.\Daemon.cpp +# End Source File +# Begin Source File + +SOURCE=.\Daemon.h +# End Source File +# Begin Source File + SOURCE=.\dll.h # End Source File # Begin Source File @@ -565,6 +605,14 @@ SOURCE=.\g711.h # End Source File # Begin Source File +SOURCE=.\ImmediateProcessing.cpp +# End Source File +# Begin Source File + +SOURCE=.\ImmediateProcessing.h +# End Source File +# Begin Source File + SOURCE=.\LogManager.cpp # End Source File # Begin Source File @@ -648,6 +696,14 @@ SOURCE=.\OrkClient.h # End Source File # Begin Source File +SOURCE=.\Reporting.cpp +# End Source File +# Begin Source File + +SOURCE=.\Reporting.h +# End Source File +# Begin Source File + SOURCE=.\StdString.h # End Source File # Begin Source File diff --git a/orkbasecxx/Reporting.cpp b/orkbasecxx/Reporting.cpp new file mode 100644 index 0000000..9aa053d --- /dev/null +++ b/orkbasecxx/Reporting.cpp @@ -0,0 +1,263 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ +#pragma warning( disable: 4786 ) + +#define _WINSOCKAPI_ // prevents the inclusion of winsock.h + +#include "ConfigManager.h" +#include "Reporting.h" +#include "LogManager.h" +#include "messages/Message.h" +#include "messages/TapeMsg.h" +#include "OrkClient.h" +#include "Daemon.h" + + +TapeProcessorRef Reporting::m_singleton; + +void Reporting::Initialize() +{ + if(m_singleton.get() == NULL) + { + m_singleton.reset(new Reporting()); + TapeProcessorRegistry::instance()->RegisterTapeProcessor(m_singleton); + } +} + +Reporting* Reporting::Instance() +{ + return (Reporting*)m_singleton.get(); +} + +Reporting::Reporting() +{ + m_queueFullError = false; + numTapesToSkip = 0; +} + +CStdString __CDECL__ Reporting::GetName() +{ + return "Reporting"; +} + +TapeProcessorRef Reporting::Instanciate() +{ + return m_singleton; +} + +void __CDECL__ Reporting::SkipTapes(int number) +{ + MutexSentinel sentinel(m_mutex); + numTapesToSkip++; +} + +bool Reporting::IsSkip() +{ + MutexSentinel sentinel(m_mutex); + if(numTapesToSkip) + { + numTapesToSkip--; + return true; + } + return false; +} + +void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef) +{ + if (m_audioTapeQueue.push(audioTapeRef)) + { + LOG4CXX_DEBUG(LOG.reportingLog, CStdString("added audiotape to queue:") + audioTapeRef->GetIdentifier()); + m_queueFullError = false; + } + else + { + if(m_queueFullError == false) + { + m_queueFullError = true; + LOG4CXX_ERROR(LOG.reportingLog, CStdString("queue full")); + } + } +} + +void Reporting::ThreadHandler(void *args) +{ + CStdString processorName("Reporting"); + TapeProcessorRef reporting = TapeProcessorRegistry::instance()->GetNewTapeProcessor(processorName); + if(reporting.get() == NULL) + { + LOG4CXX_ERROR(LOG.reportingLog, "Could not instanciate Reporting"); + return; + } + Reporting* pReporting = (Reporting*)(reporting->Instanciate().get()); + + bool stop = false; + bool reportError = true; + time_t reportErrorLastTime = 0; + bool error = false; + + for(;stop == false;) + { + try + { + AudioTapeRef audioTapeRef = pReporting->m_audioTapeQueue.pop(); + + if(audioTapeRef.get() == NULL) + { + if(Daemon::Singleton()->IsStopping()) + { + stop = true; + } + } + else + { + + MessageRef msgRef; + audioTapeRef->GetMessage(msgRef); + TapeMsg* ptapeMsg = (TapeMsg*)msgRef.get(); + //bool startMsg = false; + bool realtimeMessage = false; + + if(msgRef.get() && CONFIG.m_enableReporting) + { + //if(ptapeMsg->m_stage.Equals("START")) + //{ + // startMsg = true; + //} + if(ptapeMsg->m_stage.Equals("start") || ptapeMsg->m_stage.Equals("stop")) + { + realtimeMessage = true; + } + + CStdString msgAsSingleLineString = msgRef->SerializeSingleLine(); + LOG4CXX_INFO(LOG.reportingLog, msgAsSingleLineString); + + OrkHttpSingleLineClient c; + TapeResponseRef tr(new TapeResponse()); + audioTapeRef->m_tapeResponse = tr; + + bool success = false; + + while (!success && !pReporting->IsSkip()) + { + if (c.Execute((SyncMessage&)(*msgRef.get()), (AsyncMessage&)(*tr.get()), CONFIG.m_trackerHostname, CONFIG.m_trackerTcpPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout)) + { + success = true; + reportError = true; // reenable error reporting + if(error) + { + error = false; + LOG4CXX_ERROR(LOG.reportingLog, CStdString("Orktrack successfully contacted")); + } + + if(tr->m_deleteTape) + { + CStdString tapeFilename = audioTapeRef->GetFilename(); + + CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + tapeFilename; + if (ACE_OS::unlink((PCSTR)absoluteFilename) == 0) + { + LOG4CXX_INFO(LOG.reportingLog, "Deleted tape: " + tapeFilename); + } + else + { + LOG4CXX_DEBUG(LOG.reportingLog, "Could not delete tape: " + tapeFilename); + } + + } + //else + //{ + // if(!startMsg) + // { + // // Pass the tape to the next processor + // pReporting->Runsftp NextProcessor(audioTapeRef); + // } + //} + } + else + { + error = true; + + if( reportError || ((time(NULL) - reportErrorLastTime) > 60) ) // at worst, one error is reported every minute + { + reportError = false; + reportErrorLastTime = time(NULL); + LOG4CXX_ERROR(LOG.reportingLog, CStdString("Could not contact orktrack")); + } + if(realtimeMessage) + { + success = true; // No need to resend realtime messages + } + else + { + ACE_OS::sleep(2); // Make sure orktrack is not flooded in case of a problem + } + } + } + } + } + } + catch (CStdString& e) + { + LOG4CXX_ERROR(LOG.reportingLog, CStdString("Exception: ") + e); + } + } + LOG4CXX_INFO(LOG.reportingLog, CStdString("Exiting thread")); +} + +//======================================================= +#define REPORTING_SKIP_TAPE_CLASS "reportingskiptape" + +ReportingSkipTapeMsg::ReportingSkipTapeMsg() +{ + m_number = 1; +} + + +void ReportingSkipTapeMsg::Define(Serializer* s) +{ + CStdString thisClass(REPORTING_SKIP_TAPE_CLASS); + s->StringValue(OBJECT_TYPE_TAG, thisClass, true); + s->IntValue("num", (int&)m_number, false); +} + + +CStdString ReportingSkipTapeMsg::GetClassName() +{ + return CStdString(REPORTING_SKIP_TAPE_CLASS); +} + +ObjectRef ReportingSkipTapeMsg::NewInstance() +{ + return ObjectRef(new ReportingSkipTapeMsg); +} + +ObjectRef ReportingSkipTapeMsg::Process() +{ + bool success = true; + CStdString logMsg; + + Reporting* reporting = Reporting::Instance(); + if(reporting) + { + reporting->SkipTapes(m_number); + } + + SimpleResponseMsg* msg = new SimpleResponseMsg; + ObjectRef ref(msg); + msg->m_success = success; + msg->m_comment = logMsg; + return ref; +} + + + diff --git a/orkbasecxx/Reporting.h b/orkbasecxx/Reporting.h new file mode 100644 index 0000000..7f7778d --- /dev/null +++ b/orkbasecxx/Reporting.h @@ -0,0 +1,64 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ + +#ifndef __REPORTING_H__ +#define __REPORTING_H__ + +#include "ThreadSafeQueue.h" +#include "TapeProcessor.h" +#include "AudioTape.h" + +class DLL_IMPORT_EXPORT_ORKBASE Reporting : public TapeProcessor +{ +public: + static void Initialize(); + static Reporting* Instance(); + + CStdString __CDECL__ GetName(); + TapeProcessorRef __CDECL__ Instanciate(); + void __CDECL__ AddAudioTape(AudioTapeRef& audioTapeRef); + void __CDECL__ SkipTapes(int number); + + //static Reporting* GetInstance(); + static void ThreadHandler(void *args); + +private: + Reporting(); + bool IsSkip(); + + //static Reporting m_reportingSingleton; + static TapeProcessorRef m_singleton; + + ThreadSafeQueue m_audioTapeQueue; + bool m_queueFullError; + int numTapesToSkip; + ACE_Thread_Mutex m_mutex; +}; + +class DLL_IMPORT_EXPORT_ORKBASE ReportingSkipTapeMsg : public SyncMessage +{ +public: + ReportingSkipTapeMsg(); + + void Define(Serializer* s); + inline void Validate() {}; + + CStdString GetClassName(); + ObjectRef NewInstance(); + ObjectRef Process(); + + int m_number; +}; + +#endif + diff --git a/orkbasecxx/messages/Makefile.am b/orkbasecxx/messages/Makefile.am index aba1c8e..8c56f9e 100644 --- a/orkbasecxx/messages/Makefile.am +++ b/orkbasecxx/messages/Makefile.am @@ -2,8 +2,8 @@ METASOURCES = AUTO noinst_LTLIBRARIES = libmessages.la libmessages_la_SOURCES = AsyncMessage.cpp Message.cpp \ SyncMessage.cpp CaptureMsg.cpp DeleteTapeMsg.cpp \ - PingMsg.cpp TapeMsg.cpp + PingMsg.cpp TapeMsg.cpp RecordMsg.cpp #libmessages_la_LIBADD = -L/projects/ext/xmlrpc++/xmlrpc++0.7/ -lXmlRpc -INCLUDES = -I@top_srcdir@ +INCLUDES = -I@top_srcdir@ -I../../orkaudio AM_CXXFLAGS = -D_REENTRANT diff --git a/orkbasecxx/messages/Message.h b/orkbasecxx/messages/Message.h index 1abfcc0..de1275b 100644 --- a/orkbasecxx/messages/Message.h +++ b/orkbasecxx/messages/Message.h @@ -32,6 +32,8 @@ #define TIMESTAMP_PARAM "timestamp" #define CAPTURE_PORT_PARAM "captureport" #define FILENAME_PARAM "filename" +#define ORKUID_PARAM "orkuid" +#define PARTY_PARAM "party" #define SUCCESS_PARAM "sucess" #define SUCCESS_DEFAULT true diff --git a/orkbasecxx/messages/RecordMsg.cpp b/orkbasecxx/messages/RecordMsg.cpp new file mode 100644 index 0000000..46f3778 --- /dev/null +++ b/orkbasecxx/messages/RecordMsg.cpp @@ -0,0 +1,51 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ + +#include "RecordMsg.h" +#include "messages/AsyncMessage.h" +#include "CapturePluginProxy.h" + +#define RECORD_CLASS "record" + +void RecordMsg::Define(Serializer* s) +{ + CStdString recordClass(RECORD_CLASS); + s->StringValue(OBJECT_TYPE_TAG, recordClass, true); + s->StringValue(PARTY_PARAM, m_party, true); +} + + +CStdString RecordMsg::GetClassName() +{ + return CStdString(RECORD_CLASS); +} + +ObjectRef RecordMsg::NewInstance() +{ + return ObjectRef(new RecordMsg); +} + +ObjectRef RecordMsg::Process() +{ + SimpleResponseMsg* msg = new SimpleResponseMsg; + ObjectRef ref(msg); + CStdString logMsg; + + logMsg.Format("Starting capture for %s", m_party); + CapturePluginProxy::Singleton()->StartCapture(m_party); + msg->m_success = true; + msg->m_comment = logMsg; + + return ref; +} + diff --git a/orkbasecxx/messages/RecordMsg.h b/orkbasecxx/messages/RecordMsg.h new file mode 100644 index 0000000..35803be --- /dev/null +++ b/orkbasecxx/messages/RecordMsg.h @@ -0,0 +1,34 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + * This program is free software, distributed under the terms of + * the GNU General Public License. + * Please refer to http://www.gnu.org/copyleft/gpl.html + * + */ + +#ifndef __RECORDMSG_H__ +#define __RECORDMSG_H__ + +#include "messages/SyncMessage.h" +#include "AudioCapture.h" + +class DLL_IMPORT_EXPORT_ORKBASE RecordMsg : public SyncMessage +{ +public: + void Define(Serializer* s); + inline void Validate() {}; + + CStdString GetClassName(); + ObjectRef NewInstance(); + ObjectRef Process(); + + CStdString m_party; +}; + +#endif + -- cgit v1.2.3