diff options
author | Henri Herscher <henri@oreka.org> | 2006-03-19 05:40:53 +0000 |
---|---|---|
committer | Henri Herscher <henri@oreka.org> | 2006-03-19 05:40:53 +0000 |
commit | 5298d5fadf5ba92a302a0aebd946c093dc54ba61 (patch) | |
tree | 5f48d576310af09d4209fa90c24c4c213a0bf53d /orkaudio | |
parent | 0ecbeb5613451527fa088821a48b9a64b6c7d637 (diff) |
Default immediate processing queue size is now 10000. Default batch processing queue size is now 20000. Both values are now configurable.
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@198 09dcff7a-b715-0410-9601-b79a96267cd0
Diffstat (limited to 'orkaudio')
-rw-r--r-- | orkaudio/BatchProcessing.cpp | 21 | ||||
-rw-r--r-- | orkaudio/BatchProcessing.h | 2 | ||||
-rw-r--r-- | orkaudio/Config.cpp | 4 | ||||
-rw-r--r-- | orkaudio/Config.h | 7 | ||||
-rw-r--r-- | orkaudio/ImmediateProcessing.cpp | 14 | ||||
-rw-r--r-- | orkaudio/ImmediateProcessing.h | 1 | ||||
-rw-r--r-- | orkaudio/ThreadSafeQueue.h | 7 |
7 files changed, 48 insertions, 8 deletions
diff --git a/orkaudio/BatchProcessing.cpp b/orkaudio/BatchProcessing.cpp index 6d65afe..43e0563 100644 --- a/orkaudio/BatchProcessing.cpp +++ b/orkaudio/BatchProcessing.cpp @@ -46,6 +46,11 @@ void BatchProcessing::AddAudioTape(AudioTapeRef audioTapeRef) } } +void BatchProcessing::SetQueueSize(int size) +{ + m_audioTapeQueue.setSize(size); +} + void BatchProcessing::TapeDropRegistration(CStdString& filename) { MutexSentinel sentinel(m_tapeDropMutex); @@ -110,13 +115,17 @@ void BatchProcessing::ThreadHandler(void *args) CStdString debug; BatchProcessing* pBatchProcessing = BatchProcessing::GetInstance(); + + pBatchProcessing->SetQueueSize(CONFIG.m_batchProcessingQueueSize); + int threadId = 0; { MutexSentinel sentinel(pBatchProcessing->m_mutex); threadId = pBatchProcessing->m_threadCount++; } CStdString threadIdString = IntToString(threadId); - LOG4CXX_DEBUG(LOG.batchProcessingLog, CStdString("Created thread #") + threadIdString); + debug.Format("thread #%s starting - queue size:%d", threadIdString, CONFIG.m_batchProcessingQueueSize); + LOG4CXX_INFO(LOG.batchProcessingLog, debug); bool stop = false; @@ -151,7 +160,7 @@ void BatchProcessing::ThreadHandler(void *args) else { // Let's work on the tape we have pulled - CStdString threadIdString = IntToString(threadId); + //CStdString threadIdString = IntToString(threadId); LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " processing: " + audioTapeRef->GetIdentifier()); fileRef->MoveOrig(); @@ -246,10 +255,10 @@ void BatchProcessing::ThreadHandler(void *args) } catch (CStdString& e) { - if(CONFIG.m_deleteNativeFile && fileRef.get() != NULL) - { - fileRef->Delete(); - } + //if(CONFIG.m_deleteNativeFile && fileRef.get() != NULL) + //{ + // fileRef->Delete(); + //} LOG4CXX_ERROR(LOG.batchProcessingLog, CStdString("BatchProcessing: ") + e); } //catch(...) diff --git a/orkaudio/BatchProcessing.h b/orkaudio/BatchProcessing.h index 3c50bbf..ec9c88b 100644 --- a/orkaudio/BatchProcessing.h +++ b/orkaudio/BatchProcessing.h @@ -26,6 +26,8 @@ public: static void ThreadHandler(void *args); void AddAudioTape(AudioTapeRef audioTapeRef); + void SetQueueSize(int size); + /** Ask for a tape to be deleted from disk */ void TapeDropRegistration(CStdString& filename); private: diff --git a/orkaudio/Config.cpp b/orkaudio/Config.cpp index 35c7b6f..4a9e295 100644 --- a/orkaudio/Config.cpp +++ b/orkaudio/Config.cpp @@ -38,6 +38,8 @@ Config::Config() m_trackerTcpPort = TRACKER_TCP_PORT_DEFAULT; m_trackerServicename = TRACKER_SERVICENAME_DEFAULT; m_audioOutputPath = AUDIO_OUTPUT_PATH_DEFAULT; + m_immediateProcessingQueueSize = IMMEDIATE_PROCESSING_QUEUE_SIZE_DEFAULT; + m_batchProcessingQueueSize = BATCH_PROCESSING_QUEUE_SIZE_DEFAULT; char hostname[40]; ACE_OS::hostname(hostname, 40); @@ -72,6 +74,8 @@ void Config::Define(Serializer* s) s->IntValue(REPORTING_RETRY_DELAY_PARAM, m_reportingRetryDelay); s->IntValue(CLIENT_TIMEOUT_PARAM, m_clientTimeout); s->StringValue(AUDIO_OUTPUT_PATH_PARAM, m_audioOutputPath); + s->IntValue(IMMEDIATE_PROCESSING_QUEUE_SIZE_PARAM, m_immediateProcessingQueueSize); + s->IntValue(BATCH_PROCESSING_QUEUE_SIZE_PARAM, m_batchProcessingQueueSize); } void Config::Validate() diff --git a/orkaudio/Config.h b/orkaudio/Config.h index 4d9f150..95a31d9 100644 --- a/orkaudio/Config.h +++ b/orkaudio/Config.h @@ -59,7 +59,10 @@ #define CLIENT_TIMEOUT_PARAM "ClientTimeout" #define AUDIO_OUTPUT_PATH_PARAM "AudioOutputPath" #define AUDIO_OUTPUT_PATH_DEFAULT "." - +#define IMMEDIATE_PROCESSING_QUEUE_SIZE_PARAM "ImmediateProcessingQueueSize" +#define IMMEDIATE_PROCESSING_QUEUE_SIZE_DEFAULT 10000 +#define BATCH_PROCESSING_QUEUE_SIZE_PARAM "BatchProcessingQueueSize" +#define BATCH_PROCESSING_QUEUE_SIZE_DEFAULT 20000 class Config : public Object { @@ -94,6 +97,8 @@ public: int m_reportingRetryDelay; int m_clientTimeout; CStdString m_audioOutputPath; + int m_immediateProcessingQueueSize; + int m_batchProcessingQueueSize; }; diff --git a/orkaudio/ImmediateProcessing.cpp b/orkaudio/ImmediateProcessing.cpp index 1298117..5aeb1dc 100644 --- a/orkaudio/ImmediateProcessing.cpp +++ b/orkaudio/ImmediateProcessing.cpp @@ -17,6 +17,7 @@ #include "ace/OS_NS_unistd.h" #include "BatchProcessing.h" #include "Daemon.h" +#include "ConfigManager.h" ImmediateProcessing ImmediateProcessing::m_immediateProcessingSingleton; @@ -34,10 +35,23 @@ void ImmediateProcessing::AddAudioTape(AudioTapeRef audioTapeRef) } } +void ImmediateProcessing::SetQueueSize(int size) +{ + m_audioTapeQueue.setSize(size); +} + + void ImmediateProcessing::ThreadHandler(void *args) { + CStdString logMsg; + ImmediateProcessing* pImmediateProcessing = ImmediateProcessing::GetInstance(); + pImmediateProcessing->SetQueueSize(CONFIG.m_immediateProcessingQueueSize); + + logMsg.Format("thread starting - queue size:%d", CONFIG.m_immediateProcessingQueueSize); + LOG4CXX_INFO(LOG.immediateProcessingLog, logMsg); + bool stop = false; for(;stop == false;) diff --git a/orkaudio/ImmediateProcessing.h b/orkaudio/ImmediateProcessing.h index 617f105..7912535 100644 --- a/orkaudio/ImmediateProcessing.h +++ b/orkaudio/ImmediateProcessing.h @@ -24,6 +24,7 @@ public: static void ThreadHandler(void *args); void AddAudioTape(AudioTapeRef audioTapeRef); + void SetQueueSize(int size); private: static ImmediateProcessing m_immediateProcessingSingleton; ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue; diff --git a/orkaudio/ThreadSafeQueue.h b/orkaudio/ThreadSafeQueue.h index 7d0c3e3..c14b5b9 100644 --- a/orkaudio/ThreadSafeQueue.h +++ b/orkaudio/ThreadSafeQueue.h @@ -26,7 +26,7 @@ template <class T> class ThreadSafeQueue { public: - ThreadSafeQueue(int size = 2000) + ThreadSafeQueue(int size = 10000) { m_size = size; m_semaphore.acquire(); // reset count to zero @@ -35,6 +35,7 @@ public: bool push(T &); T pop(); int numElements(); + void setSize(int size); private: int m_size; @@ -82,6 +83,10 @@ template <class T> int ThreadSafeQueue<T>::numElements() return m_queue.size(); } +template <class T> void ThreadSafeQueue<T>::setSize(int size) +{ + m_size = size; +} #endif // __THREADSAFEQUEUE_H__ |