summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHenri Herscher <henri@oreka.org>2006-03-19 05:40:53 +0000
committerHenri Herscher <henri@oreka.org>2006-03-19 05:40:53 +0000
commit5298d5fadf5ba92a302a0aebd946c093dc54ba61 (patch)
tree5f48d576310af09d4209fa90c24c4c213a0bf53d
parent0ecbeb5613451527fa088821a48b9a64b6c7d637 (diff)
Default immediate processing queue size is now 10000. Default batch processing queue size is now 20000. Both values are now configurable.
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@198 09dcff7a-b715-0410-9601-b79a96267cd0
-rw-r--r--orkaudio/BatchProcessing.cpp21
-rw-r--r--orkaudio/BatchProcessing.h2
-rw-r--r--orkaudio/Config.cpp4
-rw-r--r--orkaudio/Config.h7
-rw-r--r--orkaudio/ImmediateProcessing.cpp14
-rw-r--r--orkaudio/ImmediateProcessing.h1
-rw-r--r--orkaudio/ThreadSafeQueue.h7
7 files changed, 48 insertions, 8 deletions
diff --git a/orkaudio/BatchProcessing.cpp b/orkaudio/BatchProcessing.cpp
index 6d65afe..43e0563 100644
--- a/orkaudio/BatchProcessing.cpp
+++ b/orkaudio/BatchProcessing.cpp
@@ -46,6 +46,11 @@ void BatchProcessing::AddAudioTape(AudioTapeRef audioTapeRef)
}
}
+void BatchProcessing::SetQueueSize(int size)
+{
+ m_audioTapeQueue.setSize(size);
+}
+
void BatchProcessing::TapeDropRegistration(CStdString& filename)
{
MutexSentinel sentinel(m_tapeDropMutex);
@@ -110,13 +115,17 @@ void BatchProcessing::ThreadHandler(void *args)
CStdString debug;
BatchProcessing* pBatchProcessing = BatchProcessing::GetInstance();
+
+ pBatchProcessing->SetQueueSize(CONFIG.m_batchProcessingQueueSize);
+
int threadId = 0;
{
MutexSentinel sentinel(pBatchProcessing->m_mutex);
threadId = pBatchProcessing->m_threadCount++;
}
CStdString threadIdString = IntToString(threadId);
- LOG4CXX_DEBUG(LOG.batchProcessingLog, CStdString("Created thread #") + threadIdString);
+ debug.Format("thread #%s starting - queue size:%d", threadIdString, CONFIG.m_batchProcessingQueueSize);
+ LOG4CXX_INFO(LOG.batchProcessingLog, debug);
bool stop = false;
@@ -151,7 +160,7 @@ void BatchProcessing::ThreadHandler(void *args)
else
{
// Let's work on the tape we have pulled
- CStdString threadIdString = IntToString(threadId);
+ //CStdString threadIdString = IntToString(threadId);
LOG4CXX_INFO(LOG.batchProcessingLog, CStdString("Th") + threadIdString + " processing: " + audioTapeRef->GetIdentifier());
fileRef->MoveOrig();
@@ -246,10 +255,10 @@ void BatchProcessing::ThreadHandler(void *args)
}
catch (CStdString& e)
{
- if(CONFIG.m_deleteNativeFile && fileRef.get() != NULL)
- {
- fileRef->Delete();
- }
+ //if(CONFIG.m_deleteNativeFile && fileRef.get() != NULL)
+ //{
+ // fileRef->Delete();
+ //}
LOG4CXX_ERROR(LOG.batchProcessingLog, CStdString("BatchProcessing: ") + e);
}
//catch(...)
diff --git a/orkaudio/BatchProcessing.h b/orkaudio/BatchProcessing.h
index 3c50bbf..ec9c88b 100644
--- a/orkaudio/BatchProcessing.h
+++ b/orkaudio/BatchProcessing.h
@@ -26,6 +26,8 @@ public:
static void ThreadHandler(void *args);
void AddAudioTape(AudioTapeRef audioTapeRef);
+ void SetQueueSize(int size);
+
/** Ask for a tape to be deleted from disk */
void TapeDropRegistration(CStdString& filename);
private:
diff --git a/orkaudio/Config.cpp b/orkaudio/Config.cpp
index 35c7b6f..4a9e295 100644
--- a/orkaudio/Config.cpp
+++ b/orkaudio/Config.cpp
@@ -38,6 +38,8 @@ Config::Config()
m_trackerTcpPort = TRACKER_TCP_PORT_DEFAULT;
m_trackerServicename = TRACKER_SERVICENAME_DEFAULT;
m_audioOutputPath = AUDIO_OUTPUT_PATH_DEFAULT;
+ m_immediateProcessingQueueSize = IMMEDIATE_PROCESSING_QUEUE_SIZE_DEFAULT;
+ m_batchProcessingQueueSize = BATCH_PROCESSING_QUEUE_SIZE_DEFAULT;
char hostname[40];
ACE_OS::hostname(hostname, 40);
@@ -72,6 +74,8 @@ void Config::Define(Serializer* s)
s->IntValue(REPORTING_RETRY_DELAY_PARAM, m_reportingRetryDelay);
s->IntValue(CLIENT_TIMEOUT_PARAM, m_clientTimeout);
s->StringValue(AUDIO_OUTPUT_PATH_PARAM, m_audioOutputPath);
+ s->IntValue(IMMEDIATE_PROCESSING_QUEUE_SIZE_PARAM, m_immediateProcessingQueueSize);
+ s->IntValue(BATCH_PROCESSING_QUEUE_SIZE_PARAM, m_batchProcessingQueueSize);
}
void Config::Validate()
diff --git a/orkaudio/Config.h b/orkaudio/Config.h
index 4d9f150..95a31d9 100644
--- a/orkaudio/Config.h
+++ b/orkaudio/Config.h
@@ -59,7 +59,10 @@
#define CLIENT_TIMEOUT_PARAM "ClientTimeout"
#define AUDIO_OUTPUT_PATH_PARAM "AudioOutputPath"
#define AUDIO_OUTPUT_PATH_DEFAULT "."
-
+#define IMMEDIATE_PROCESSING_QUEUE_SIZE_PARAM "ImmediateProcessingQueueSize"
+#define IMMEDIATE_PROCESSING_QUEUE_SIZE_DEFAULT 10000
+#define BATCH_PROCESSING_QUEUE_SIZE_PARAM "BatchProcessingQueueSize"
+#define BATCH_PROCESSING_QUEUE_SIZE_DEFAULT 20000
class Config : public Object
{
@@ -94,6 +97,8 @@ public:
int m_reportingRetryDelay;
int m_clientTimeout;
CStdString m_audioOutputPath;
+ int m_immediateProcessingQueueSize;
+ int m_batchProcessingQueueSize;
};
diff --git a/orkaudio/ImmediateProcessing.cpp b/orkaudio/ImmediateProcessing.cpp
index 1298117..5aeb1dc 100644
--- a/orkaudio/ImmediateProcessing.cpp
+++ b/orkaudio/ImmediateProcessing.cpp
@@ -17,6 +17,7 @@
#include "ace/OS_NS_unistd.h"
#include "BatchProcessing.h"
#include "Daemon.h"
+#include "ConfigManager.h"
ImmediateProcessing ImmediateProcessing::m_immediateProcessingSingleton;
@@ -34,10 +35,23 @@ void ImmediateProcessing::AddAudioTape(AudioTapeRef audioTapeRef)
}
}
+void ImmediateProcessing::SetQueueSize(int size)
+{
+ m_audioTapeQueue.setSize(size);
+}
+
+
void ImmediateProcessing::ThreadHandler(void *args)
{
+ CStdString logMsg;
+
ImmediateProcessing* pImmediateProcessing = ImmediateProcessing::GetInstance();
+ pImmediateProcessing->SetQueueSize(CONFIG.m_immediateProcessingQueueSize);
+
+ logMsg.Format("thread starting - queue size:%d", CONFIG.m_immediateProcessingQueueSize);
+ LOG4CXX_INFO(LOG.immediateProcessingLog, logMsg);
+
bool stop = false;
for(;stop == false;)
diff --git a/orkaudio/ImmediateProcessing.h b/orkaudio/ImmediateProcessing.h
index 617f105..7912535 100644
--- a/orkaudio/ImmediateProcessing.h
+++ b/orkaudio/ImmediateProcessing.h
@@ -24,6 +24,7 @@ public:
static void ThreadHandler(void *args);
void AddAudioTape(AudioTapeRef audioTapeRef);
+ void SetQueueSize(int size);
private:
static ImmediateProcessing m_immediateProcessingSingleton;
ThreadSafeQueue<AudioTapeRef> m_audioTapeQueue;
diff --git a/orkaudio/ThreadSafeQueue.h b/orkaudio/ThreadSafeQueue.h
index 7d0c3e3..c14b5b9 100644
--- a/orkaudio/ThreadSafeQueue.h
+++ b/orkaudio/ThreadSafeQueue.h
@@ -26,7 +26,7 @@
template <class T> class ThreadSafeQueue
{
public:
- ThreadSafeQueue(int size = 2000)
+ ThreadSafeQueue(int size = 10000)
{
m_size = size;
m_semaphore.acquire(); // reset count to zero
@@ -35,6 +35,7 @@ public:
bool push(T &);
T pop();
int numElements();
+ void setSize(int size);
private:
int m_size;
@@ -82,6 +83,10 @@ template <class T> int ThreadSafeQueue<T>::numElements()
return m_queue.size();
}
+template <class T> void ThreadSafeQueue<T>::setSize(int size)
+{
+ m_size = size;
+}
#endif // __THREADSAFEQUEUE_H__