From 6d301a8dadf98464952912db2646e68df15b347f Mon Sep 17 00:00:00 2001 From: Henri Herscher Date: Thu, 2 Mar 2006 16:11:12 +0000 Subject: OrkTrack can now ask for tape deletion git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@189 09dcff7a-b715-0410-9601-b79a96267cd0 --- orkaudio/AudioTape.cpp | 21 ++-- orkaudio/AudioTape.h | 2 + orkaudio/BatchProcessing.cpp | 219 +++++++++++++++++++++++++++++------------- orkaudio/BatchProcessing.h | 10 ++ orkaudio/CapturePort.cpp | 4 +- orkaudio/OrkAudio.cpp | 2 + orkaudio/Reporting.cpp | 12 ++- orkaudio/messages/TapeMsg.cpp | 24 +++++ orkaudio/messages/TapeMsg.h | 17 ++++ 9 files changed, 227 insertions(+), 84 deletions(-) diff --git a/orkaudio/AudioTape.cpp b/orkaudio/AudioTape.cpp index eaae2d2..7efce4d 100644 --- a/orkaudio/AudioTape.cpp +++ b/orkaudio/AudioTape.cpp @@ -240,20 +240,11 @@ void AudioTape::GetMessage(MessageRef& msgRef) msgRef.reset(new TapeMsg); TapeMsg* pTapeMsg = (TapeMsg*)msgRef.get(); - if(captureEventRef->m_type == CaptureEvent::EtStart || captureEventRef->m_type == CaptureEvent::EtStop) + if(captureEventRef->m_type == CaptureEvent::EtStop) { - if (captureEventRef->m_type == CaptureEvent::EtStart) - { - pTapeMsg->m_timestamp = m_beginDate; - } - else - { - pTapeMsg->m_timestamp = m_endDate; - } - pTapeMsg->m_recId = m_fileIdentifier; pTapeMsg->m_fileName = m_filePath + m_fileIdentifier + m_fileExtension; - pTapeMsg-> m_stage = CaptureEvent::EventTypeToString(captureEventRef->m_type); + pTapeMsg->m_stage = CaptureEvent::EventTypeToString(captureEventRef->m_type); pTapeMsg->m_capturePort = m_portId; pTapeMsg->m_localParty = m_localParty; pTapeMsg->m_localEntryPoint = m_localEntryPoint; @@ -264,7 +255,7 @@ void AudioTape::GetMessage(MessageRef& msgRef) } else { - // This should be a key-value pair message + // This should be a key-value pair message or a start event } } @@ -284,6 +275,12 @@ CStdString AudioTape::GetIdentifier() return m_fileIdentifier; } +CStdString AudioTape::GetFilename() +{ + return m_filePath + m_fileIdentifier + m_fileExtension; +} + + CStdString AudioTape::GetPath() { return m_filePath; diff --git a/orkaudio/AudioTape.h b/orkaudio/AudioTape.h index 899fda0..adf9b85 100644 --- a/orkaudio/AudioTape.h +++ b/orkaudio/AudioTape.h @@ -88,6 +88,8 @@ public: void GetMessage(MessageRef& msg); /** Returns an identifier for the tape which corresponds to the filename without extension */ CStdString GetIdentifier(); + /** Returns the full filename (including relative path) to the post-compression audio file */ + CStdString GetFilename(); CStdString GetPath(); AudioFileRef GetAudioFileRef(); bool IsReadyForBatchProcessing(); diff --git a/orkaudio/BatchProcessing.cpp b/orkaudio/BatchProcessing.cpp index 5be8b74..1435bb4 100644 --- a/orkaudio/BatchProcessing.cpp +++ b/orkaudio/BatchProcessing.cpp @@ -24,6 +24,11 @@ BatchProcessing BatchProcessing::m_batchProcessingSingleton; BatchProcessing::BatchProcessing() { m_threadCount = 0; + + struct tm date = {0}; + time_t now = time(NULL); + ACE_OS::localtime_r(&now, &date); + m_currentDay = date.tm_mday; } @@ -41,6 +46,65 @@ void BatchProcessing::AddAudioTape(AudioTapeRef audioTapeRef) } } +void BatchProcessing::TapeDropRegistration(CStdString& filename) +{ + MutexSentinel sentinel(m_tapeDropMutex); + + CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + filename; + if (ACE_OS::unlink((PCSTR)absoluteFilename) != 0) + { + LOG4CXX_DEBUG(LOG.batchProcessingLog, "Could not deleted tape: " + filename); + m_tapesToDrop.insert(std::make_pair(filename, time(NULL))); + } + else + { + LOG4CXX_INFO(LOG.batchProcessingLog, "Deleted tape: " + filename); + } +} + +bool BatchProcessing::DropTapeIfNeeded(CStdString& filename) +{ + bool shouldDrop = false; + + MutexSentinel sentinel(m_tapeDropMutex); + + std::map::iterator pair; + pair = m_tapesToDrop.find(filename); + if(pair != m_tapesToDrop.end()) + { + shouldDrop = true; + CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + filename; + if (ACE_OS::unlink((PCSTR)absoluteFilename) == 0) + { + LOG4CXX_INFO(LOG.batchProcessingLog, "Deleted tape: " + filename); + m_tapesToDrop.erase(filename); + } + else + { + LOG4CXX_DEBUG(LOG.batchProcessingLog, "Could not deleted tape: " + filename); + } + } + + TapeDropHousekeeping(); + + return shouldDrop; +} + +void BatchProcessing::TapeDropHousekeeping() +{ + struct tm date = {0}; + time_t now = time(NULL); + ACE_OS::localtime_r(&now, &date); + if(m_currentDay != date.tm_mday) + { + // another day has passed away ... clear possible leftovers + m_currentDay = date.tm_mday; + m_tapesToDrop.clear(); + } +} + + + void BatchProcessing::ThreadHandler(void *args) { CStdString debug; @@ -73,89 +137,108 @@ void BatchProcessing::ThreadHandler(void *args) } else { - CStdString threadIdString = IntToString(threadId); - LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " processing: " + audioTapeRef->GetIdentifier()); - fileRef = audioTapeRef->GetAudioFileRef(); - fileRef->MoveOrig(); - fileRef->Open(AudioFile::READ); - AudioChunkRef chunkRef; - AudioChunkRef tmpChunkRef; - - switch(CONFIG.m_storageAudioFormat) + if(pBatchProcessing->DropTapeIfNeeded(audioTapeRef->GetFilename()) == true) { - case AudioTape::FfUlaw: - outFileRef.reset(new LibSndFileFile(SF_FORMAT_ULAW | SF_FORMAT_WAV)); - break; - case AudioTape::FfAlaw: - outFileRef.reset(new LibSndFileFile(SF_FORMAT_ALAW | SF_FORMAT_WAV)); - break; - case AudioTape::FfGsm: - outFileRef.reset(new LibSndFileFile(SF_FORMAT_GSM610 | SF_FORMAT_WAV)); - break; - case AudioTape::FfPcmWav: - default: - outFileRef.reset(new LibSndFileFile(SF_FORMAT_PCM_16 | SF_FORMAT_WAV)); + // The tape we have pulled has been dropped in the meantime. just delete the capture file + if(CONFIG.m_deleteNativeFile) + { + fileRef->Delete(); + } } - CStdString file = CONFIG.m_audioOutputPath + "/" + audioTapeRef->GetPath() + audioTapeRef->GetIdentifier(); - outFileRef->Open(file, AudioFile::WRITE, false, fileRef->GetSampleRate()); + else + { + // Let's work on the tape we have pulled + CStdString threadIdString = IntToString(threadId); + LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " processing: " + audioTapeRef->GetIdentifier()); - FilterRef filter; - FilterRef decoder1; - FilterRef decoder2; + fileRef->MoveOrig(); + fileRef->Open(AudioFile::READ); - bool firstChunk = true; - bool voIpSession = false; + AudioChunkRef chunkRef; + AudioChunkRef tmpChunkRef; - while(fileRef->ReadChunkMono(chunkRef)) - { - AudioChunkDetails details = *chunkRef->GetDetails(); - if(firstChunk && details.m_rtpPayloadType != -1) + switch(CONFIG.m_storageAudioFormat) { - firstChunk = false; - CStdString filterName("RtpMixer"); - filter = FilterRegistry::instance()->GetNewFilter(filterName); - if(filter.get() == NULL) - { - debug = "BatchProcessing - Could not instanciate RTP mixer"; - throw(debug); - } - decoder1 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType); - decoder2 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType); - if(decoder1.get() == NULL || decoder2.get() == NULL) - { - debug.Format("BatchProcessing - Could not find decoder for RTP payload type:%u", chunkRef->GetDetails()->m_rtpPayloadType); - throw(debug); - } - voIpSession = true; + case AudioTape::FfUlaw: + outFileRef.reset(new LibSndFileFile(SF_FORMAT_ULAW | SF_FORMAT_WAV)); + break; + case AudioTape::FfAlaw: + outFileRef.reset(new LibSndFileFile(SF_FORMAT_ALAW | SF_FORMAT_WAV)); + break; + case AudioTape::FfGsm: + outFileRef.reset(new LibSndFileFile(SF_FORMAT_GSM610 | SF_FORMAT_WAV)); + break; + case AudioTape::FfPcmWav: + default: + outFileRef.reset(new LibSndFileFile(SF_FORMAT_PCM_16 | SF_FORMAT_WAV)); } - if(voIpSession) - { - if(details.m_channel == 2) + CStdString file = CONFIG.m_audioOutputPath + "/" + audioTapeRef->GetPath() + audioTapeRef->GetIdentifier(); + outFileRef->Open(file, AudioFile::WRITE, false, fileRef->GetSampleRate()); + + FilterRef filter; + FilterRef decoder1; + FilterRef decoder2; + + bool firstChunk = true; + bool voIpSession = false; + + while(fileRef->ReadChunkMono(chunkRef)) + { + AudioChunkDetails details = *chunkRef->GetDetails(); + if(firstChunk && details.m_rtpPayloadType != -1) { - decoder2->AudioChunkIn(chunkRef); - decoder2->AudioChunkOut(tmpChunkRef); + firstChunk = false; + CStdString filterName("RtpMixer"); + filter = FilterRegistry::instance()->GetNewFilter(filterName); + if(filter.get() == NULL) + { + debug = "BatchProcessing - Could not instanciate RTP mixer"; + throw(debug); + } + decoder1 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType); + decoder2 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType); + if(decoder1.get() == NULL || decoder2.get() == NULL) + { + debug.Format("BatchProcessing - Could not find decoder for RTP payload type:%u", chunkRef->GetDetails()->m_rtpPayloadType); + throw(debug); + } + voIpSession = true; } - else - { - decoder1->AudioChunkIn(chunkRef); - decoder1->AudioChunkOut(tmpChunkRef); + if(voIpSession) + { + if(details.m_channel == 2) + { + decoder2->AudioChunkIn(chunkRef); + decoder2->AudioChunkOut(tmpChunkRef); + } + else + { + decoder1->AudioChunkIn(chunkRef); + decoder1->AudioChunkOut(tmpChunkRef); + } + filter->AudioChunkIn(tmpChunkRef); + filter->AudioChunkOut(tmpChunkRef); } - filter->AudioChunkIn(tmpChunkRef); - filter->AudioChunkOut(tmpChunkRef); + outFileRef->WriteChunk(tmpChunkRef); + + // Give up CPU to make sure the actual recording always has priority + ACE_Time_Value yield; + yield.set(0,1); // 1 us + ACE_OS::sleep(yield); } - outFileRef->WriteChunk(tmpChunkRef); - } - fileRef->Close(); - outFileRef->Close(); + fileRef->Close(); + outFileRef->Close(); - if(CONFIG.m_deleteNativeFile) - { - fileRef->Delete(); - CStdString threadIdString = IntToString(threadId); - LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " deleting native: " + audioTapeRef->GetIdentifier()); + if(CONFIG.m_deleteNativeFile) + { + fileRef->Delete(); + CStdString threadIdString = IntToString(threadId); + LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " deleting native: " + audioTapeRef->GetIdentifier()); + } + pBatchProcessing->DropTapeIfNeeded(audioTapeRef->GetFilename()); // maybe the tape was dropped while we were processing it } } } diff --git a/orkaudio/BatchProcessing.h b/orkaudio/BatchProcessing.h index 73a62f0..3c50bbf 100644 --- a/orkaudio/BatchProcessing.h +++ b/orkaudio/BatchProcessing.h @@ -17,6 +17,7 @@ #include "ThreadSafeQueue.h" #include "AudioTape.h" #include "ace/Thread_Mutex.h" +#include class BatchProcessing { @@ -25,14 +26,23 @@ public: static void ThreadHandler(void *args); void AddAudioTape(AudioTapeRef audioTapeRef); + /** Ask for a tape to be deleted from disk */ + void TapeDropRegistration(CStdString& filename); private: BatchProcessing(); + bool DropTapeIfNeeded(CStdString&filename); + void TapeDropHousekeeping(); + static BatchProcessing m_batchProcessingSingleton; ThreadSafeQueue m_audioTapeQueue; size_t m_threadCount; ACE_Thread_Mutex m_mutex; + + ACE_Thread_Mutex m_tapeDropMutex; + std::map m_tapesToDrop; + int m_currentDay; }; #endif diff --git a/orkaudio/CapturePort.cpp b/orkaudio/CapturePort.cpp index 27e3943..044520f 100644 --- a/orkaudio/CapturePort.cpp +++ b/orkaudio/CapturePort.cpp @@ -150,8 +150,8 @@ void CapturePort::AddCaptureEvent(CaptureEventRef eventRef) audioTapeRef->SetShouldStop(); // force stop of previous tape } audioTapeRef.reset(new AudioTape(m_Id)); // Create a new tape - audioTapeRef->AddCaptureEvent(eventRef, true); - Reporting::GetInstance()->AddAudioTape(audioTapeRef); + audioTapeRef->AddCaptureEvent(eventRef, false); + //Reporting::GetInstance()->AddAudioTape(audioTapeRef); m_audioTapeRef = audioTapeRef; LOG4CXX_INFO(LOG.portLog, "#" + m_Id + ": start"); } diff --git a/orkaudio/OrkAudio.cpp b/orkaudio/OrkAudio.cpp index 5e7cad3..14bf722 100644 --- a/orkaudio/OrkAudio.cpp +++ b/orkaudio/OrkAudio.cpp @@ -124,6 +124,8 @@ void MainThread() ObjectFactorySingleton::instance()->RegisterObject(objRef); objRef.reset(new TapeMsg); ObjectFactorySingleton::instance()->RegisterObject(objRef); + objRef.reset(new TapeResponse); + ObjectFactorySingleton::instance()->RegisterObject(objRef); objRef.reset(new SimpleResponseMsg); ObjectFactorySingleton::instance()->RegisterObject(objRef); objRef.reset(new DeleteTapeMsg); diff --git a/orkaudio/Reporting.cpp b/orkaudio/Reporting.cpp index 799a3e0..308b58c 100644 --- a/orkaudio/Reporting.cpp +++ b/orkaudio/Reporting.cpp @@ -15,8 +15,10 @@ #include "Reporting.h" #include "LogManager.h" #include "messages/Message.h" +#include "messages/TapeMsg.h" #include "OrkClient.h" #include "Daemon.h" +#include "BatchProcessing.h" Reporting Reporting::m_reportingSingleton; @@ -67,16 +69,22 @@ void Reporting::ThreadHandler(void *args) LOG4CXX_INFO(LOG.reportingLog, msgAsSingleLineString); OrkHttpSingleLineClient c; - SimpleResponseMsg srm; + TapeResponse tr; bool success = false; bool firstError = true; while (!success) { - if (c.Execute((SyncMessage&)(*msgRef.get()), srm, CONFIG.m_trackerHostname, CONFIG.m_trackerTcpPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout)) + if (c.Execute((SyncMessage&)(*msgRef.get()), tr, CONFIG.m_trackerHostname, CONFIG.m_trackerTcpPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout)) { success = true; + if(tr.m_deleteTape) + { + LOG4CXX_INFO(LOG.reportingLog, "Registered tape for removal: " + audioTapeRef->GetIdentifier()); + CStdString tapeFilename = audioTapeRef->GetFilename(); + BatchProcessing::GetInstance()->TapeDropRegistration(tapeFilename); + } } else { diff --git a/orkaudio/messages/TapeMsg.cpp b/orkaudio/messages/TapeMsg.cpp index c52b022..4f21703 100644 --- a/orkaudio/messages/TapeMsg.cpp +++ b/orkaudio/messages/TapeMsg.cpp @@ -57,3 +57,27 @@ ObjectRef TapeMsg::NewInstance() return ObjectRef(new TapeMsg); } + +//========================================================== +TapeResponse::TapeResponse() +{ + m_deleteTape = false; +} + + +void TapeResponse::Define(Serializer* s) +{ + SimpleResponseMsg::Define(s); + s->BoolValue("deletetape", m_deleteTape); +} + +CStdString TapeResponse::GetClassName() +{ + return CStdString("taperesponse"); +} + +ObjectRef TapeResponse::NewInstance() +{ + return ObjectRef(new TapeResponse); +} + diff --git a/orkaudio/messages/TapeMsg.h b/orkaudio/messages/TapeMsg.h index 9d0ec3f..5365e02 100644 --- a/orkaudio/messages/TapeMsg.h +++ b/orkaudio/messages/TapeMsg.h @@ -15,6 +15,7 @@ #define __TAPEMSG_H__ #include "messages/SyncMessage.h" +#include "messages/ASyncMessage.h" #include "AudioTape.h" #define TAPE_MESSAGE_NAME "tape" @@ -56,5 +57,21 @@ public: typedef boost::shared_ptr TapeMsgRef; +/** A TapeResponse is a response to TapeMsg +*/ +class TapeResponse : public SimpleResponseMsg +{ +public: + TapeResponse(); + void Define(Serializer* s); + inline void Validate() {}; + + CStdString GetClassName(); + ObjectRef NewInstance(); + inline ObjectRef Process() {return ObjectRef();}; + + bool m_deleteTape; +}; + #endif -- cgit v1.2.3