summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHenri Herscher <henri@oreka.org>2006-03-02 16:11:12 +0000
committerHenri Herscher <henri@oreka.org>2006-03-02 16:11:12 +0000
commit6d301a8dadf98464952912db2646e68df15b347f (patch)
tree1f0f3bb33c567bfecbf1e302c657324bda2975e9
parent8dcf8fc31fca9820efe23b6aa82633624bbc15ff (diff)
OrkTrack can now ask for tape deletion
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@189 09dcff7a-b715-0410-9601-b79a96267cd0
-rw-r--r--orkaudio/AudioTape.cpp21
-rw-r--r--orkaudio/AudioTape.h2
-rw-r--r--orkaudio/BatchProcessing.cpp219
-rw-r--r--orkaudio/BatchProcessing.h10
-rw-r--r--orkaudio/CapturePort.cpp4
-rw-r--r--orkaudio/OrkAudio.cpp2
-rw-r--r--orkaudio/Reporting.cpp12
-rw-r--r--orkaudio/messages/TapeMsg.cpp24
-rw-r--r--orkaudio/messages/TapeMsg.h17
9 files changed, 227 insertions, 84 deletions
diff --git a/orkaudio/AudioTape.cpp b/orkaudio/AudioTape.cpp
index eaae2d2..7efce4d 100644
--- a/orkaudio/AudioTape.cpp
+++ b/orkaudio/AudioTape.cpp
@@ -240,20 +240,11 @@ void AudioTape::GetMessage(MessageRef& msgRef)
msgRef.reset(new TapeMsg);
TapeMsg* pTapeMsg = (TapeMsg*)msgRef.get();
- if(captureEventRef->m_type == CaptureEvent::EtStart || captureEventRef->m_type == CaptureEvent::EtStop)
+ if(captureEventRef->m_type == CaptureEvent::EtStop)
{
- if (captureEventRef->m_type == CaptureEvent::EtStart)
- {
- pTapeMsg->m_timestamp = m_beginDate;
- }
- else
- {
- pTapeMsg->m_timestamp = m_endDate;
- }
-
pTapeMsg->m_recId = m_fileIdentifier;
pTapeMsg->m_fileName = m_filePath + m_fileIdentifier + m_fileExtension;
- pTapeMsg-> m_stage = CaptureEvent::EventTypeToString(captureEventRef->m_type);
+ pTapeMsg->m_stage = CaptureEvent::EventTypeToString(captureEventRef->m_type);
pTapeMsg->m_capturePort = m_portId;
pTapeMsg->m_localParty = m_localParty;
pTapeMsg->m_localEntryPoint = m_localEntryPoint;
@@ -264,7 +255,7 @@ void AudioTape::GetMessage(MessageRef& msgRef)
}
else
{
- // This should be a key-value pair message
+ // This should be a key-value pair message or a start event
}
}
@@ -284,6 +275,12 @@ CStdString AudioTape::GetIdentifier()
return m_fileIdentifier;
}
+CStdString AudioTape::GetFilename()
+{
+ return m_filePath + m_fileIdentifier + m_fileExtension;
+}
+
+
CStdString AudioTape::GetPath()
{
return m_filePath;
diff --git a/orkaudio/AudioTape.h b/orkaudio/AudioTape.h
index 899fda0..adf9b85 100644
--- a/orkaudio/AudioTape.h
+++ b/orkaudio/AudioTape.h
@@ -88,6 +88,8 @@ public:
void GetMessage(MessageRef& msg);
/** Returns an identifier for the tape which corresponds to the filename without extension */
CStdString GetIdentifier();
+ /** Returns the full filename (including relative path) to the post-compression audio file */
+ CStdString GetFilename();
CStdString GetPath();
AudioFileRef GetAudioFileRef();
bool IsReadyForBatchProcessing();
diff --git a/orkaudio/BatchProcessing.cpp b/orkaudio/BatchProcessing.cpp
index 5be8b74..1435bb4 100644
--- a/orkaudio/BatchProcessing.cpp
+++ b/orkaudio/BatchProcessing.cpp
@@ -24,6 +24,11 @@ BatchProcessing BatchProcessing::m_batchProcessingSingleton;
BatchProcessing::BatchProcessing()
{
m_threadCount = 0;
+
+ struct tm date = {0};
+ time_t now = time(NULL);
+ ACE_OS::localtime_r(&now, &date);
+ m_currentDay = date.tm_mday;
}
@@ -41,6 +46,65 @@ void BatchProcessing::AddAudioTape(AudioTapeRef audioTapeRef)
}
}
+void BatchProcessing::TapeDropRegistration(CStdString& filename)
+{
+ MutexSentinel sentinel(m_tapeDropMutex);
+
+ CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + filename;
+ if (ACE_OS::unlink((PCSTR)absoluteFilename) != 0)
+ {
+ LOG4CXX_DEBUG(LOG.batchProcessingLog, "Could not deleted tape: " + filename);
+ m_tapesToDrop.insert(std::make_pair(filename, time(NULL)));
+ }
+ else
+ {
+ LOG4CXX_INFO(LOG.batchProcessingLog, "Deleted tape: " + filename);
+ }
+}
+
+bool BatchProcessing::DropTapeIfNeeded(CStdString& filename)
+{
+ bool shouldDrop = false;
+
+ MutexSentinel sentinel(m_tapeDropMutex);
+
+ std::map<CStdString, time_t>::iterator pair;
+ pair = m_tapesToDrop.find(filename);
+ if(pair != m_tapesToDrop.end())
+ {
+ shouldDrop = true;
+ CStdString absoluteFilename = CONFIG.m_audioOutputPath + "/" + filename;
+ if (ACE_OS::unlink((PCSTR)absoluteFilename) == 0)
+ {
+ LOG4CXX_INFO(LOG.batchProcessingLog, "Deleted tape: " + filename);
+ m_tapesToDrop.erase(filename);
+ }
+ else
+ {
+ LOG4CXX_DEBUG(LOG.batchProcessingLog, "Could not deleted tape: " + filename);
+ }
+ }
+
+ TapeDropHousekeeping();
+
+ return shouldDrop;
+}
+
+void BatchProcessing::TapeDropHousekeeping()
+{
+ struct tm date = {0};
+ time_t now = time(NULL);
+ ACE_OS::localtime_r(&now, &date);
+ if(m_currentDay != date.tm_mday)
+ {
+ // another day has passed away ... clear possible leftovers
+ m_currentDay = date.tm_mday;
+ m_tapesToDrop.clear();
+ }
+}
+
+
+
void BatchProcessing::ThreadHandler(void *args)
{
CStdString debug;
@@ -73,89 +137,108 @@ void BatchProcessing::ThreadHandler(void *args)
}
else
{
- CStdString threadIdString = IntToString(threadId);
- LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " processing: " + audioTapeRef->GetIdentifier());
-
fileRef = audioTapeRef->GetAudioFileRef();
- fileRef->MoveOrig();
- fileRef->Open(AudioFile::READ);
- AudioChunkRef chunkRef;
- AudioChunkRef tmpChunkRef;
-
- switch(CONFIG.m_storageAudioFormat)
+ if(pBatchProcessing->DropTapeIfNeeded(audioTapeRef->GetFilename()) == true)
{
- case AudioTape::FfUlaw:
- outFileRef.reset(new LibSndFileFile(SF_FORMAT_ULAW | SF_FORMAT_WAV));
- break;
- case AudioTape::FfAlaw:
- outFileRef.reset(new LibSndFileFile(SF_FORMAT_ALAW | SF_FORMAT_WAV));
- break;
- case AudioTape::FfGsm:
- outFileRef.reset(new LibSndFileFile(SF_FORMAT_GSM610 | SF_FORMAT_WAV));
- break;
- case AudioTape::FfPcmWav:
- default:
- outFileRef.reset(new LibSndFileFile(SF_FORMAT_PCM_16 | SF_FORMAT_WAV));
+ // The tape we have pulled has been dropped in the meantime. just delete the capture file
+ if(CONFIG.m_deleteNativeFile)
+ {
+ fileRef->Delete();
+ }
}
- CStdString file = CONFIG.m_audioOutputPath + "/" + audioTapeRef->GetPath() + audioTapeRef->GetIdentifier();
- outFileRef->Open(file, AudioFile::WRITE, false, fileRef->GetSampleRate());
+ else
+ {
+ // Let's work on the tape we have pulled
+ CStdString threadIdString = IntToString(threadId);
+ LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " processing: " + audioTapeRef->GetIdentifier());
- FilterRef filter;
- FilterRef decoder1;
- FilterRef decoder2;
+ fileRef->MoveOrig();
+ fileRef->Open(AudioFile::READ);
- bool firstChunk = true;
- bool voIpSession = false;
+ AudioChunkRef chunkRef;
+ AudioChunkRef tmpChunkRef;
- while(fileRef->ReadChunkMono(chunkRef))
- {
- AudioChunkDetails details = *chunkRef->GetDetails();
- if(firstChunk && details.m_rtpPayloadType != -1)
+ switch(CONFIG.m_storageAudioFormat)
{
- firstChunk = false;
- CStdString filterName("RtpMixer");
- filter = FilterRegistry::instance()->GetNewFilter(filterName);
- if(filter.get() == NULL)
- {
- debug = "BatchProcessing - Could not instanciate RTP mixer";
- throw(debug);
- }
- decoder1 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType);
- decoder2 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType);
- if(decoder1.get() == NULL || decoder2.get() == NULL)
- {
- debug.Format("BatchProcessing - Could not find decoder for RTP payload type:%u", chunkRef->GetDetails()->m_rtpPayloadType);
- throw(debug);
- }
- voIpSession = true;
+ case AudioTape::FfUlaw:
+ outFileRef.reset(new LibSndFileFile(SF_FORMAT_ULAW | SF_FORMAT_WAV));
+ break;
+ case AudioTape::FfAlaw:
+ outFileRef.reset(new LibSndFileFile(SF_FORMAT_ALAW | SF_FORMAT_WAV));
+ break;
+ case AudioTape::FfGsm:
+ outFileRef.reset(new LibSndFileFile(SF_FORMAT_GSM610 | SF_FORMAT_WAV));
+ break;
+ case AudioTape::FfPcmWav:
+ default:
+ outFileRef.reset(new LibSndFileFile(SF_FORMAT_PCM_16 | SF_FORMAT_WAV));
}
- if(voIpSession)
- {
- if(details.m_channel == 2)
+ CStdString file = CONFIG.m_audioOutputPath + "/" + audioTapeRef->GetPath() + audioTapeRef->GetIdentifier();
+ outFileRef->Open(file, AudioFile::WRITE, false, fileRef->GetSampleRate());
+
+ FilterRef filter;
+ FilterRef decoder1;
+ FilterRef decoder2;
+
+ bool firstChunk = true;
+ bool voIpSession = false;
+
+ while(fileRef->ReadChunkMono(chunkRef))
+ {
+ AudioChunkDetails details = *chunkRef->GetDetails();
+ if(firstChunk && details.m_rtpPayloadType != -1)
{
- decoder2->AudioChunkIn(chunkRef);
- decoder2->AudioChunkOut(tmpChunkRef);
+ firstChunk = false;
+ CStdString filterName("RtpMixer");
+ filter = FilterRegistry::instance()->GetNewFilter(filterName);
+ if(filter.get() == NULL)
+ {
+ debug = "BatchProcessing - Could not instanciate RTP mixer";
+ throw(debug);
+ }
+ decoder1 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType);
+ decoder2 = FilterRegistry::instance()->GetNewFilter(details.m_rtpPayloadType);
+ if(decoder1.get() == NULL || decoder2.get() == NULL)
+ {
+ debug.Format("BatchProcessing - Could not find decoder for RTP payload type:%u", chunkRef->GetDetails()->m_rtpPayloadType);
+ throw(debug);
+ }
+ voIpSession = true;
}
- else
- {
- decoder1->AudioChunkIn(chunkRef);
- decoder1->AudioChunkOut(tmpChunkRef);
+ if(voIpSession)
+ {
+ if(details.m_channel == 2)
+ {
+ decoder2->AudioChunkIn(chunkRef);
+ decoder2->AudioChunkOut(tmpChunkRef);
+ }
+ else
+ {
+ decoder1->AudioChunkIn(chunkRef);
+ decoder1->AudioChunkOut(tmpChunkRef);
+ }
+ filter->AudioChunkIn(tmpChunkRef);
+ filter->AudioChunkOut(tmpChunkRef);
}
- filter->AudioChunkIn(tmpChunkRef);
- filter->AudioChunkOut(tmpChunkRef);
+ outFileRef->WriteChunk(tmpChunkRef);
+
+ // Give up CPU to make sure the actual recording always has priority
+ ACE_Time_Value yield;
+ yield.set(0,1); // 1 us
+ ACE_OS::sleep(yield);
}
- outFileRef->WriteChunk(tmpChunkRef);
- }
- fileRef->Close();
- outFileRef->Close();
+ fileRef->Close();
+ outFileRef->Close();
- if(CONFIG.m_deleteNativeFile)
- {
- fileRef->Delete();
- CStdString threadIdString = IntToString(threadId);
- LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " deleting native: " + audioTapeRef->GetIdentifier());
+ if(CONFIG.m_deleteNativeFile)
+ {
+ fileRef->Delete();
+ CStdString threadIdString = IntToString(threadId);
+ LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " deleting native: " + audioTapeRef->GetIdentifier());
+ }
+ pBatchProcessing->DropTapeIfNeeded(audioTapeRef->GetFilename()); // maybe the tape was dropped while we were processing it
}
}
}
diff --git a/orkaudio/BatchProcessing.h b/orkaudio/BatchProcessing.h
index 73a62f0..3c50bbf 100644
--- a/orkaudio/BatchProcessing.h
+++ b/orkaudio/BatchProcessing.h
@@ -17,6 +17,7 @@
#include "ThreadSafeQueue.h"
#include "AudioTape.h"
#include "ace/Thread_Mutex.h"
+#include <map>
class BatchProcessing
{
@@ -25,14 +26,23 @@ public:
static void ThreadHandler(void *args);
void AddAudioTape(AudioTapeRef audioTapeRef);
+ /** Ask for a tape to be deleted from disk */
+ void TapeDropRegistration(CStdString& filename);
private:
BatchProcessing();
+ bool DropTapeIfNeeded(CStdString&filename);
+ void TapeDropHousekeeping();
+
static BatchProcessing m_batchProcessingSingleton;
ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue;
size_t m_threadCount;
ACE_Thread_Mutex m_mutex;
+
+ ACE_Thread_Mutex m_tapeDropMutex;
+ std::map<CStdString, time_t> m_tapesToDrop;
+ int m_currentDay;
};
#endif
diff --git a/orkaudio/CapturePort.cpp b/orkaudio/CapturePort.cpp
index 27e3943..044520f 100644
--- a/orkaudio/CapturePort.cpp
+++ b/orkaudio/CapturePort.cpp
@@ -150,8 +150,8 @@ void CapturePort::AddCaptureEvent(CaptureEventRef eventRef)
audioTapeRef->SetShouldStop(); // force stop of previous tape
}
audioTapeRef.reset(new AudioTape(m_Id)); // Create a new tape
- audioTapeRef->AddCaptureEvent(eventRef, true);
- Reporting::GetInstance()->AddAudioTape(audioTapeRef);
+ audioTapeRef->AddCaptureEvent(eventRef, false);
+ //Reporting::GetInstance()->AddAudioTape(audioTapeRef);
m_audioTapeRef = audioTapeRef;
LOG4CXX_INFO(LOG.portLog, "#" + m_Id + ": start");
}
diff --git a/orkaudio/OrkAudio.cpp b/orkaudio/OrkAudio.cpp
index 5e7cad3..14bf722 100644
--- a/orkaudio/OrkAudio.cpp
+++ b/orkaudio/OrkAudio.cpp
@@ -124,6 +124,8 @@ void MainThread()
ObjectFactorySingleton::instance()->RegisterObject(objRef);
objRef.reset(new TapeMsg);
ObjectFactorySingleton::instance()->RegisterObject(objRef);
+ objRef.reset(new TapeResponse);
+ ObjectFactorySingleton::instance()->RegisterObject(objRef);
objRef.reset(new SimpleResponseMsg);
ObjectFactorySingleton::instance()->RegisterObject(objRef);
objRef.reset(new DeleteTapeMsg);
diff --git a/orkaudio/Reporting.cpp b/orkaudio/Reporting.cpp
index 799a3e0..308b58c 100644
--- a/orkaudio/Reporting.cpp
+++ b/orkaudio/Reporting.cpp
@@ -15,8 +15,10 @@
#include "Reporting.h"
#include "LogManager.h"
#include "messages/Message.h"
+#include "messages/TapeMsg.h"
#include "OrkClient.h"
#include "Daemon.h"
+#include "BatchProcessing.h"
Reporting Reporting::m_reportingSingleton;
@@ -67,16 +69,22 @@ void Reporting::ThreadHandler(void *args)
LOG4CXX_INFO(LOG.reportingLog, msgAsSingleLineString);
OrkHttpSingleLineClient c;
- SimpleResponseMsg srm;
+ TapeResponse tr;
bool success = false;
bool firstError = true;
while (!success)
{
- if (c.Execute((SyncMessage&)(*msgRef.get()), srm, CONFIG.m_trackerHostname, CONFIG.m_trackerTcpPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout))
+ if (c.Execute((SyncMessage&)(*msgRef.get()), tr, CONFIG.m_trackerHostname, CONFIG.m_trackerTcpPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout))
{
success = true;
+ if(tr.m_deleteTape)
+ {
+ LOG4CXX_INFO(LOG.reportingLog, "Registered tape for removal: " + audioTapeRef->GetIdentifier());
+ CStdString tapeFilename = audioTapeRef->GetFilename();
+ BatchProcessing::GetInstance()->TapeDropRegistration(tapeFilename);
+ }
}
else
{
diff --git a/orkaudio/messages/TapeMsg.cpp b/orkaudio/messages/TapeMsg.cpp
index c52b022..4f21703 100644
--- a/orkaudio/messages/TapeMsg.cpp
+++ b/orkaudio/messages/TapeMsg.cpp
@@ -57,3 +57,27 @@ ObjectRef TapeMsg::NewInstance()
return ObjectRef(new TapeMsg);
}
+
+//==========================================================
+TapeResponse::TapeResponse()
+{
+ m_deleteTape = false;
+}
+
+
+void TapeResponse::Define(Serializer* s)
+{
+ SimpleResponseMsg::Define(s);
+ s->BoolValue("deletetape", m_deleteTape);
+}
+
+CStdString TapeResponse::GetClassName()
+{
+ return CStdString("taperesponse");
+}
+
+ObjectRef TapeResponse::NewInstance()
+{
+ return ObjectRef(new TapeResponse);
+}
+
diff --git a/orkaudio/messages/TapeMsg.h b/orkaudio/messages/TapeMsg.h
index 9d0ec3f..5365e02 100644
--- a/orkaudio/messages/TapeMsg.h
+++ b/orkaudio/messages/TapeMsg.h
@@ -15,6 +15,7 @@
#define __TAPEMSG_H__
#include "messages/SyncMessage.h"
+#include "messages/ASyncMessage.h"
#include "AudioTape.h"
#define TAPE_MESSAGE_NAME "tape"
@@ -56,5 +57,21 @@ public:
typedef boost::shared_ptr<TapeMsg> TapeMsgRef;
+/** A TapeResponse is a response to TapeMsg
+*/
+class TapeResponse : public SimpleResponseMsg
+{
+public:
+ TapeResponse();
+ void Define(Serializer* s);
+ inline void Validate() {};
+
+ CStdString GetClassName();
+ ObjectRef NewInstance();
+ inline ObjectRef Process() {return ObjectRef();};
+
+ bool m_deleteTape;
+};
+
#endif