summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGerald Begumisa <ben_g@users.sourceforge.net>2009-01-07 13:07:31 +0000
committerGerald Begumisa <ben_g@users.sourceforge.net>2009-01-07 13:07:31 +0000
commit0e2a2e49077b79bd52d6c83c5202e452e7eea091 (patch)
tree58ae466e2c74be585548f10ffff96560099c9370
parentded03e95a1eb78cb16ef3a4a0dc9bb140bd748ba (diff)
Modified the orkaudio API, adding the record and pause HTTP commands. The record command commences or un-pauses recording while the pause command pauses recording - discarding RTP packets from when the pause command is issued. Both commands require the orkuid and party to be specified as HTTP parameters. Note that this represents a change in the arguments required for the StartCapture function in DLLs. Also added an event streaming feature which streams out all tape messages as they are reported in real-time to a client connected. Clients should connect via HTTP, on port 59150. The port is configurable by setting the parameter EventStreamingServerPort in config.xml
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@590 09dcff7a-b715-0410-9601-b79a96267cd0
-rw-r--r--orkaudio/OrkAudio.cpp10
-rw-r--r--orkaudio/audiocaptureplugins/common/AudioCapturePluginCommon.h3
-rw-r--r--orkaudio/audiocaptureplugins/voip/RtpSession.cpp94
-rw-r--r--orkaudio/audiocaptureplugins/voip/RtpSession.h3
-rw-r--r--orkaudio/audiocaptureplugins/voip/VoIp.cpp45
-rw-r--r--orkbasecxx/AudioCapturePlugin.h3
-rw-r--r--orkbasecxx/CapturePluginProxy.cpp26
-rw-r--r--orkbasecxx/CapturePluginProxy.h4
-rw-r--r--orkbasecxx/Config.cpp2
-rw-r--r--orkbasecxx/Config.h3
-rw-r--r--orkbasecxx/EventStreaming.cpp109
-rw-r--r--orkbasecxx/EventStreaming.h68
-rw-r--r--orkbasecxx/Makefile.am2
-rw-r--r--orkbasecxx/MultiThreadedServer.cpp136
-rw-r--r--orkbasecxx/MultiThreadedServer.h20
-rw-r--r--orkbasecxx/Reporting.cpp28
-rw-r--r--orkbasecxx/messages/RecordMsg.cpp43
-rw-r--r--orkbasecxx/messages/RecordMsg.h15
18 files changed, 584 insertions, 30 deletions
diff --git a/orkaudio/OrkAudio.cpp b/orkaudio/OrkAudio.cpp
index 0083f8b..edda45a 100644
--- a/orkaudio/OrkAudio.cpp
+++ b/orkaudio/OrkAudio.cpp
@@ -44,7 +44,7 @@
#include "filters/audiogain/AudioGain.h"
#include "TapeProcessor.h"
#include <list>
-
+#include "EventStreaming.h"
static volatile bool serviceStop = false;
@@ -213,6 +213,8 @@ void MainThread()
ObjectFactory::GetSingleton()->RegisterObject(objRef);
objRef.reset(new RecordMsg);
ObjectFactory::GetSingleton()->RegisterObject(objRef);
+ objRef.reset(new PauseMsg);
+ ObjectFactory::GetSingleton()->RegisterObject(objRef);
//objRef.reset(new TestMsg);
//ObjectFactory::GetSingleton()->RegisterObject(objRef);
@@ -276,6 +278,12 @@ void MainThread()
LOG4CXX_INFO(LOG.rootLog, CStdString("Failed to create Http server"));
}
+ // Create streaming server on port 59150 (default)
+ if(!ACE_Thread_Manager::instance()->spawn(ACE_THR_FUNC(EventStreamingServer::run), (void *)CONFIG.m_eventStreamingServerPort))
+ {
+ LOG4CXX_INFO(LOG.rootLog, CStdString("Failed to create event streaming server"));
+ }
+
if(capturePluginOk)
{
CapturePluginProxy::Singleton()->Run();
diff --git a/orkaudio/audiocaptureplugins/common/AudioCapturePluginCommon.h b/orkaudio/audiocaptureplugins/common/AudioCapturePluginCommon.h
index 4697317..a6defae 100644
--- a/orkaudio/audiocaptureplugins/common/AudioCapturePluginCommon.h
+++ b/orkaudio/audiocaptureplugins/common/AudioCapturePluginCommon.h
@@ -30,8 +30,9 @@ DLL_EXPORT void __CDECL__ Run();
DLL_EXPORT void __CDECL__ Initialize();
DLL_EXPORT void __CDECL__ Shutdown();
DLL_EXPORT void __CDECL__ Configure(DOMNode*);
-DLL_EXPORT void __CDECL__ StartCapture(CStdString& party);
+DLL_EXPORT void __CDECL__ StartCapture(CStdString& party, CStdString& orkuid);
DLL_EXPORT void __CDECL__ StopCapture(CStdString& party);
+DLL_EXPORT void __CDECL__ PauseCapture(CStdString& party, CStdString& orkuid);
}
#endif
diff --git a/orkaudio/audiocaptureplugins/voip/RtpSession.cpp b/orkaudio/audiocaptureplugins/voip/RtpSession.cpp
index cf1ba08..a8b2d40 100644
--- a/orkaudio/audiocaptureplugins/voip/RtpSession.cpp
+++ b/orkaudio/audiocaptureplugins/voip/RtpSession.cpp
@@ -2383,6 +2383,36 @@ void RtpSessions::Hoover(time_t now)
}
}
+void RtpSessions::StartCaptureOrkuid(CStdString& orkuid)
+{
+ std::map<CStdString, RtpSessionRef>::iterator pair;
+ bool found = false;
+ CStdString logMsg;
+ RtpSessionRef session;
+
+ for(pair = m_byIpAndPort.begin(); pair != m_byIpAndPort.end() && found == false; pair++)
+ {
+ session = pair->second;
+
+ if(session->OrkUidMatches(orkuid))
+ {
+ session->m_keep = true;
+ found = true;
+ }
+ }
+
+ if(found)
+ {
+ logMsg.Format("[%s] StartCaptureOrkuid: Started capture, orkuid:%s", session->m_trackingId, orkuid);
+ }
+ else
+ {
+ logMsg.Format("StartCaptureOrkuid: No session has orkuid:%s", orkuid);
+ }
+
+ LOG4CXX_INFO(m_log, logMsg);
+}
+
void RtpSessions::StartCapture(CStdString& party)
{
std::map<CStdString, RtpSessionRef>::iterator pair;
@@ -2403,16 +2433,76 @@ void RtpSessions::StartCapture(CStdString& party)
if(found)
{
- logMsg.Format("[%s] Started capture, party:%s", session->m_trackingId, party);
+ logMsg.Format("[%s] StartCapture: Started capture, party:%s", session->m_trackingId, party);
}
else
{
- logMsg.Format("No session has party %s", party);
+ logMsg.Format("StartCapture: No session has party %s", party);
}
LOG4CXX_INFO(m_log, logMsg);
}
+void RtpSessions::PauseCapture(CStdString& party)
+{
+ std::map<CStdString, RtpSessionRef>::iterator pair;
+ bool found = false;
+ CStdString logMsg;
+ RtpSessionRef session;
+
+ for(pair = m_byIpAndPort.begin(); pair != m_byIpAndPort.end() && found == false; pair++)
+ {
+ session = pair->second;
+
+ if (session->PartyMatches(party))
+ {
+ session->m_keep = false;
+ found = true;
+ }
+ }
+
+ if(found)
+ {
+ logMsg.Format("[%s] PauseCapture: Paused capture, party:%s", session->m_trackingId, party);
+ }
+ else
+ {
+ logMsg.Format("PauseCapture: No session has party %s", party);
+ }
+
+ LOG4CXX_INFO(m_log, logMsg);
+}
+
+void RtpSessions::PauseCaptureOrkuid(CStdString& orkuid)
+{
+ std::map<CStdString, RtpSessionRef>::iterator pair;
+ bool found = false;
+ CStdString logMsg;
+ RtpSessionRef session;
+
+ for(pair = m_byIpAndPort.begin(); pair != m_byIpAndPort.end() && found == false; pair++)
+ {
+ session = pair->second;
+
+ if(session->OrkUidMatches(orkuid))
+ {
+ session->m_keep = false;
+ found = true;
+ }
+ }
+
+ if(found)
+ {
+ logMsg.Format("[%s] PauseCaptureOrkuid: Paused capture, orkuid:%s", session->m_trackingId, orkuid);
+ }
+ else
+ {
+ logMsg.Format("PauseCaptureOrkuid: No session has orkuid:%s", orkuid);
+ }
+
+ LOG4CXX_INFO(m_log, logMsg);
+}
+
//==========================================================
SipInviteInfo::SipInviteInfo()
{
diff --git a/orkaudio/audiocaptureplugins/voip/RtpSession.h b/orkaudio/audiocaptureplugins/voip/RtpSession.h
index a24c882..6695985 100644
--- a/orkaudio/audiocaptureplugins/voip/RtpSession.h
+++ b/orkaudio/audiocaptureplugins/voip/RtpSession.h
@@ -257,6 +257,9 @@ public:
void Hoover(time_t now);
EndpointInfoRef GetEndpointInfo(struct in_addr endpointIp);
void StartCapture(CStdString& party);
+ void StartCaptureOrkuid(CStdString& orkuid);
+ void PauseCapture(CStdString& party);
+ void PauseCaptureOrkuid(CStdString& orkuid);
private:
RtpSessionRef findByEndpointIp(struct in_addr endpointIpAddr, int passThruPartyId = 0);
diff --git a/orkaudio/audiocaptureplugins/voip/VoIp.cpp b/orkaudio/audiocaptureplugins/voip/VoIp.cpp
index 25cda38..63fa355 100644
--- a/orkaudio/audiocaptureplugins/voip/VoIp.cpp
+++ b/orkaudio/audiocaptureplugins/voip/VoIp.cpp
@@ -83,8 +83,9 @@ public:
void Initialize();
void Run();
void Shutdown();
- void StartCapture(CStdString& port);
+ void StartCapture(CStdString& port, CStdString& orkuid);
void StopCapture(CStdString& port);
+ void PauseCapture(CStdString& port, CStdString& orkuid);
void ReportPcapStats();
pcap_t* OpenDevice(CStdString& name);
void AddPcapDeviceToMap(CStdString& deviceName, pcap_t* pcapHandle);
@@ -3199,7 +3200,12 @@ void VoIp::Shutdown()
#endif
}
-void VoIp::StartCapture(CStdString& port)
+void VoIp::StartCapture(CStdString& port, CStdString& orkuid)
+{
+ ;
+}
+
+void VoIp::PauseCapture(CStdString& port, CStdString& orkuid)
{
;
}
@@ -3225,15 +3231,42 @@ void __CDECL__ Shutdown()
VoIpSingleton::instance()->Shutdown();
}
-void __CDECL__ StartCapture(CStdString& party)
+void __CDECL__ StartCapture(CStdString& party, CStdString& orkuid)
{
CStdString logMsg;
- //logMsg.Format("StartCapture:%s", party);
- //LOG4CXX_INFO(s_voipPluginLog, logMsg);
+ logMsg.Format("StartCapture: party:%s orkuid:%s", party, orkuid);
+ LOG4CXX_INFO(s_voipPluginLog, logMsg);
MutexSentinel mutexSentinel(s_mutex);
- RtpSessionsSingleton::instance()->StartCapture(party);
+
+ if(orkuid.size())
+ {
+ RtpSessionsSingleton::instance()->StartCaptureOrkuid(orkuid);
+ }
+ else
+ {
+ RtpSessionsSingleton::instance()->StartCapture(party);
+ }
+}
+
+void __CDECL__ PauseCapture(CStdString& party, CStdString& orkuid)
+{
+ CStdString logMsg;
+
+ logMsg.Format("PauseCapture: party:%s orkuid:%s", party, orkuid);
+ LOG4CXX_INFO(s_voipPluginLog, logMsg);
+
+ MutexSentinel mutexSentinel(s_mutex);
+
+ if(orkuid.size())
+ {
+ RtpSessionsSingleton::instance()->PauseCaptureOrkuid(orkuid);
+ }
+ else
+ {
+ RtpSessionsSingleton::instance()->PauseCapture(party);
+ }
}
void __CDECL__ StopCapture(CStdString& party)
diff --git a/orkbasecxx/AudioCapturePlugin.h b/orkbasecxx/AudioCapturePlugin.h
index 3875dd2..c7fc38c 100644
--- a/orkbasecxx/AudioCapturePlugin.h
+++ b/orkbasecxx/AudioCapturePlugin.h
@@ -41,8 +41,9 @@ typedef void (__CDECL__* InitializeFunction)();
typedef void (__CDECL__* RunFunction)();
typedef void (__CDECL__* ShutdownFunction)();
typedef void (__CDECL__* ConfigureFunction)(DOMNode*);
-typedef void (__CDECL__* StartCaptureFunction)(CStdString& port);
+typedef void (__CDECL__* StartCaptureFunction)(CStdString& port, CStdString& orkUid);
typedef void (__CDECL__* StopCaptureFunction)(CStdString& port);
+typedef void (__CDECL__* PauseCaptureFunction)(CStdString& port, CStdString& orkUid);
diff --git a/orkbasecxx/CapturePluginProxy.cpp b/orkbasecxx/CapturePluginProxy.cpp
index 925c14a..f5db16c 100644
--- a/orkbasecxx/CapturePluginProxy.cpp
+++ b/orkbasecxx/CapturePluginProxy.cpp
@@ -122,7 +122,15 @@ bool CapturePluginProxy::Init()
m_stopCaptureFunction = (StopCaptureFunction)m_dll.symbol("StopCapture");
if (m_stopCaptureFunction)
{
- m_loaded = true;
+ m_pauseCaptureFunction = (PauseCaptureFunction)m_dll.symbol("PauseCapture");
+ if(m_stopCaptureFunction)
+ {
+ m_loaded = true;
+ }
+ else
+ {
+ LOG4CXX_ERROR(LOG.rootLog, CStdString("Could not find PauseCapture function in ") + pluginPath);
+ }
}
else
{
@@ -180,11 +188,11 @@ void CapturePluginProxy::Shutdown()
}
}
-void CapturePluginProxy::StartCapture(CStdString& party)
+void CapturePluginProxy::StartCapture(CStdString& party, CStdString& orkuid)
{
if(m_loaded)
{
- m_startCaptureFunction(party);
+ m_startCaptureFunction(party, orkuid);
}
else
{
@@ -204,6 +212,18 @@ void CapturePluginProxy::StopCapture(CStdString& party)
}
}
+void CapturePluginProxy::PauseCapture(CStdString& party, CStdString& orkuid)
+{
+ if(m_loaded)
+ {
+ m_pauseCaptureFunction(party, orkuid);
+ }
+ else
+ {
+ throw(CStdString("PauseCapture: Capture plugin not yet loaded"));
+ }
+}
+
void __CDECL__ CapturePluginProxy::AudioChunkCallBack(AudioChunkRef chunkRef, CStdString& capturePort)
{
// find the right port and give it the audio chunk
diff --git a/orkbasecxx/CapturePluginProxy.h b/orkbasecxx/CapturePluginProxy.h
index 631976d..a3d4257 100644
--- a/orkbasecxx/CapturePluginProxy.h
+++ b/orkbasecxx/CapturePluginProxy.h
@@ -28,7 +28,8 @@ public:
void Run();
void Shutdown();
- void StartCapture(CStdString& party);
+ void StartCapture(CStdString& party, CStdString& orkuid);
+ void PauseCapture(CStdString& party, CStdString& orkuid);
void StopCapture(CStdString& party);
static void __CDECL__ AudioChunkCallBack(AudioChunkRef chunkRef, CStdString& capturePort);
@@ -44,6 +45,7 @@ private:
RunFunction m_runFunction;
StartCaptureFunction m_startCaptureFunction;
StopCaptureFunction m_stopCaptureFunction;
+ PauseCaptureFunction m_pauseCaptureFunction;
ACE_DLL m_dll;
bool m_loaded;
diff --git a/orkbasecxx/Config.cpp b/orkbasecxx/Config.cpp
index f3f5cb8..db8622f 100644
--- a/orkbasecxx/Config.cpp
+++ b/orkbasecxx/Config.cpp
@@ -76,6 +76,7 @@ Config::Config()
m_audioGain = AUDIO_GAIN_DEFAULT;
m_audioGainChannel1 = AUDIO_GAIN_CHANNEL_1_DEFAULT;
m_audioGainChannel2 = AUDIO_GAIN_CHANNEL_2_DEFAULT;
+ m_eventStreamingServerPort = STREAMING_SERVER_PORT_DEFAULT;
}
void Config::Define(Serializer* s)
@@ -156,6 +157,7 @@ void Config::Define(Serializer* s)
s->DoubleValue(AUDIO_GAIN_PARAM, m_audioGain);
s->DoubleValue(AUDIO_GAIN_CHANNEL_1_PARAM, m_audioGainChannel1);
s->DoubleValue(AUDIO_GAIN_CHANNEL_2_PARAM, m_audioGainChannel2);
+ s->IntValue(STREAMING_SERVER_PORT_PARAM, m_eventStreamingServerPort);
}
void Config::Validate()
diff --git a/orkbasecxx/Config.h b/orkbasecxx/Config.h
index 8e7634c..0d8d565 100644
--- a/orkbasecxx/Config.h
+++ b/orkbasecxx/Config.h
@@ -96,6 +96,8 @@
#define COMMAND_LINE_SERVER_PORT_DEFAULT 59130
#define HTTP_SERVER_PORT_PARAM "HttpServerPort"
#define HTTP_SERVER_PORT_DEFAULT 59140
+#define STREAMING_SERVER_PORT_PARAM "EventStreamingServerPort"
+#define STREAMING_SERVER_PORT_DEFAULT 59150
#define LOOKBACK_RECORDING_PARAM "LookBackRecording"
#define LOOKBACK_RECORDING_DEFAULT true
#define ALLOW_AUTOMATIC_RECORDING_PARAM "AllowAutomaticRecording"
@@ -175,6 +177,7 @@ public:
CStdString m_remoteProcessingServiceName;
int m_commandLineServerPort;
int m_httpServerPort;
+ int m_eventStreamingServerPort;
bool m_lookBackRecording;
bool m_allowAutomaticRecording;
int m_captureFileSizeLimitKb;
diff --git a/orkbasecxx/EventStreaming.cpp b/orkbasecxx/EventStreaming.cpp
new file mode 100644
index 0000000..4074e14
--- /dev/null
+++ b/orkbasecxx/EventStreaming.cpp
@@ -0,0 +1,109 @@
+/*
+ * Oreka -- A media capture and retrieval platform
+ *
+ * Copyright (C) 2005, orecx LLC
+ *
+ * http://www.orecx.com
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License.
+ * Please refer to http://www.gnu.org/copyleft/gpl.html
+ *
+ */
+#pragma warning( disable: 4786 ) // disables truncated symbols in browse-info warning
+
+#define _WINSOCKAPI_ // prevents the inclusion of winsock.h
+
+#include "EventStreaming.h"
+#include "ConfigManager.h"
+
+//==========================================================
+
+EventStreamingSession::EventStreamingSession()
+{
+}
+
+void EventStreamingSession::AddTapeMessage(MessageRef& message)
+{
+ if(m_messages.size() > 10000)
+ {
+ m_messages.pop_front();
+ }
+ m_messages.push_back(message);
+ m_semaphore.release();
+}
+
+void EventStreamingSession::GetTapeMessage(MessageRef& message)
+{
+ MutexSentinel mutexSentinel(m_mutex);
+
+ if(m_messages.size() > 0)
+ {
+ message = m_messages.front();
+ m_messages.pop_front();
+ }
+}
+
+int EventStreamingSession::GetNumMessages()
+{
+ MutexSentinel mutexSentinel(m_mutex);
+
+ return m_messages.size();
+}
+
+void EventStreamingSession::WaitForMessages()
+{
+ m_semaphore.acquire();
+}
+
+//==============================================
+
+EventStreaming::EventStreaming()
+{
+}
+
+void EventStreaming::GetLiveSessions(std::list<EventStreamingSessionRef>& sessions)
+{
+ MutexSentinel sentinel(m_mutex);
+
+ for(std::list<EventStreamingSessionRef>::iterator it = m_sessions.begin(); it != m_sessions.end(); it++)
+ {
+ EventStreamingSessionRef session = *it;
+
+ sessions.push_back(session);
+ }
+}
+
+void EventStreaming::AddSession(EventStreamingSessionRef& session)
+{
+ MutexSentinel sentinel(m_mutex);
+ m_sessions.push_back(session);
+}
+
+void EventStreaming::RemoveSession(EventStreamingSessionRef& session)
+{
+ MutexSentinel sentinel(m_mutex);
+ m_sessions.remove(session);
+}
+
+int EventStreaming::GetNumSessions()
+{
+ MutexSentinel sentinel(m_mutex);
+ return m_sessions.size();
+}
+
+CStdString EventStreaming::GetNewSessionId()
+{
+ return m_alphaCounter.GetNext();
+}
+
+void EventStreaming::AddTapeMessage(MessageRef& message)
+{
+ MutexSentinel sentinel(m_mutex);
+
+ for(std::list<EventStreamingSessionRef>::iterator it = m_sessions.begin(); it != m_sessions.end(); it++)
+ {
+ (*it)->AddTapeMessage(message);
+ }
+}
+
diff --git a/orkbasecxx/EventStreaming.h b/orkbasecxx/EventStreaming.h
new file mode 100644
index 0000000..da0daa3
--- /dev/null
+++ b/orkbasecxx/EventStreaming.h
@@ -0,0 +1,68 @@
+/*
+ * Oreka -- A media capture and retrieval platform
+ *
+ * Copyright (C) 2005, orecx LLC
+ *
+ * http://www.orecx.com
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License.
+ * Please refer to http://www.gnu.org/copyleft/gpl.html
+ *
+ */
+#include "LogManager.h"
+#include "Filter.h"
+#include <math.h>
+#include "Utils.h"
+#include <queue>
+#include <list>
+#include "boost/shared_ptr.hpp"
+#include "ace/Singleton.h"
+#include "ace/Thread_Mutex.h"
+#include "ace/Thread_Semaphore.h"
+#include "AudioCapture.h"
+#include "ConfigManager.h"
+#include "CapturePluginProxy.h"
+#include "AudioTape.h"
+
+//==========================================================
+
+class EventStreamingSession
+{
+public:
+ EventStreamingSession();
+
+ void AddTapeMessage(MessageRef& message);
+ void GetTapeMessage(MessageRef& message);
+ int GetNumMessages();
+ void WaitForMessages();
+private:
+ std::list<MessageRef> m_messages;
+ ACE_Thread_Mutex m_mutex;
+ ACE_Thread_Semaphore m_semaphore;
+};
+typedef boost::shared_ptr<EventStreamingSession> EventStreamingSessionRef;
+
+//==========================================================
+
+class EventStreaming
+{
+public:
+ EventStreaming();
+
+ void GetLiveSessions(std::list<EventStreamingSessionRef>& sessions);
+ void AddSession(EventStreamingSessionRef& session);
+ void RemoveSession(EventStreamingSessionRef& session);
+ int GetNumSessions();
+ CStdString GetNewSessionId();
+ void AddTapeMessage(MessageRef& message);
+
+private:
+ AlphaCounter m_alphaCounter;
+ ACE_Thread_Mutex m_mutex;
+ std::list<EventStreamingSessionRef> m_sessions;
+};
+typedef ACE_Singleton<EventStreaming, ACE_Thread_Mutex> EventStreamingSingleton;
+
+//==========================================================
+
diff --git a/orkbasecxx/Makefile.am b/orkbasecxx/Makefile.am
index 3f30d49..2810683 100644
--- a/orkbasecxx/Makefile.am
+++ b/orkbasecxx/Makefile.am
@@ -15,7 +15,7 @@ liborkbase_la_SOURCES = Filter.cpp g711.c \
CapturePluginProxy.cpp CapturePort.cpp \
Daemon.cpp ImmediateProcessing.cpp \
Reporting.cpp TapeFileNaming.cpp \
- PartyFilter.cpp
+ PartyFilter.cpp EventStreaming.cpp
#INCLUDES = -I/projects/ext/xmlrpc++/xmlrpc++0.7/src
SUBDIRS = messages serializers audiofile filters
liborkbase_la_LIBADD = $(top_builddir)/serializers/libserializers.la \
diff --git a/orkbasecxx/MultiThreadedServer.cpp b/orkbasecxx/MultiThreadedServer.cpp
index cec42db..60154b6 100644
--- a/orkbasecxx/MultiThreadedServer.cpp
+++ b/orkbasecxx/MultiThreadedServer.cpp
@@ -1,6 +1,6 @@
/*
* Oreka -- A media capture and retrieval platform
- *
+ *
* Copyright (C) 2005, orecx LLC
*
* http://www.orecx.com
@@ -26,6 +26,8 @@
#include "MultiThreadedServer.h"
+#include "EventStreaming.h"
+
log4cxx::LoggerPtr CommandLineServer::s_log;
// This is run at the start of each connection
@@ -64,7 +66,7 @@ void CommandLineServer::run(void* args)
int CommandLineServer::svc(void)
{
- for (bool active = true;active == true;)
+ for (bool active = true;active == true;)
{
char buf[2048];
ACE_Time_Value timeout;
@@ -181,16 +183,16 @@ int HttpServer::svc(void)
{
throw (CStdString("Malformed http request")); ;
}
- *stopUrl = '\0'; // Remove post-URL trailing stuff
+ *stopUrl = '\0'; // Remove post-URL trailing stuff
CStdString url(buf+startUrlOffset);
int queryOffset = url.Find("?");
- if (queryOffset > 0)
+ if (queryOffset > 0)
{
// Strip beginning of URL in case the command is received as an URL "query" of the form:
// http://hostname/service/command?type=ping
url = url.Right(url.size() - queryOffset - 1);
}
-
+
CStdString className = UrlSerializer::FindClass(url);
ObjectRef objRef = ObjectFactory::GetSingleton()->NewInstance(className);
@@ -208,9 +210,9 @@ int HttpServer::svc(void)
DOMImplementation* impl = DOMImplementationRegistry::getDOMImplementation(XStr("Core").unicodeForm());
XERCES_CPP_NAMESPACE::DOMDocument* myDoc;
myDoc = impl->createDocument(
- 0, // root element namespace URI.
- XStr("response").unicodeForm(), // root element name
- 0); // document type object (DTD).
+ 0, // root element namespace URI.
+ XStr("response").unicodeForm(), // root element name
+ 0); // document type object (DTD).
response->SerializeDom(myDoc);
CStdString pingResponse = DomSerializer::DomNodeToString(myDoc);
@@ -245,3 +247,121 @@ int HttpServer::svc(void)
return 0;
}
+//==============================================
+
+log4cxx::LoggerPtr EventStreamingServer::s_log;
+
+int EventStreamingServer::open(void *void_acceptor)
+{
+ return this->activate (THR_DETACHED);
+}
+
+void EventStreamingServer::run(void* args)
+{
+ unsigned short tcpPort = (unsigned short)(ACE_UINT64)args;
+ CStdString tcpPortString = IntToString(tcpPort);
+ EventStreamingAcceptor peer_acceptor;
+ ACE_INET_Addr addr (tcpPort);
+ ACE_Reactor reactor;
+
+ s_log = log4cxx::Logger::getLogger("interface.eventstreamingserver");
+
+ if (peer_acceptor.open (addr, &reactor) == -1)
+ {
+ LOG4CXX_ERROR(s_log, CStdString("Failed to start event streaming server on port:") + tcpPortString);
+ }
+ else
+ {
+ LOG4CXX_INFO(s_log, CStdString("Started event streaming server on port:")+tcpPortString);
+ for(;;)
+ {
+ reactor.handle_events();
+ }
+ }
+}
+
+int EventStreamingServer::svc(void)
+{
+ ACE_Time_Value timeout;
+ char buf[2048];
+ CStdString logMsg;
+ CStdString sessionId;
+ int messagesSent = 0;
+
+ ssize_t size = peer().recv(buf, 2040);
+
+ if(size <= 5)
+ {
+ CStdString notFound("HTTP/1.0 404 not found\r\nContent-type: text/html\r\n\r\nNot found\r\n");
+ peer().send(notFound, notFound.GetLength());
+ return 0;
+ }
+
+ try
+ {
+ int startUrlOffset = 5;
+ char* stopUrl = ACE_OS::strstr(buf+startUrlOffset, " HTTP");
+
+ if(!stopUrl)
+ {
+ throw (CStdString("Malformed http request"));
+ }
+
+ CStdString header;
+ struct tm date = {0};
+ time_t now = time(NULL);
+ CStdString rfc822Date;
+
+ ACE_OS::gmtime_r(&now, &date);
+ rfc822Date.Format("Tue, %.2d Nov %.4d %.2d:%.2d:%.2d GMT", date.tm_mday, (date.tm_year+1900), date.tm_hour, date.tm_min, date.tm_sec);
+ header.Format("HTTP/1.1 200 OK\r\nLast-Modified:%s\r\nContent-Type:text/plain\r\n\r\n", rfc822Date);
+ peer().send(header, header.GetLength());
+
+ time_t startTime = time(NULL);
+ time_t lastSentTime = time(NULL);
+
+ sessionId = EventStreamingSingleton::instance()->GetNewSessionId() + " -";
+ logMsg.Format("%s Event streaming start", sessionId);
+ LOG4CXX_INFO(s_log, logMsg);
+
+ EventStreamingSessionRef session(new EventStreamingSession());
+ EventStreamingSingleton::instance()->AddSession(session);
+
+ int sendRes = 0;
+ while(sendRes >= 0)
+ {
+ session->WaitForMessages();
+
+ while(session->GetNumMessages() && sendRes >= 0)
+ {
+ MessageRef message;
+
+ session->GetTapeMessage(message);
+ if(message.get())
+ {
+ CStdString msgAsSingleLineString;
+
+ msgAsSingleLineString.Format("%s\r\n", message->SerializeSingleLine());
+ sendRes = peer().send(msgAsSingleLineString, msgAsSingleLineString.GetLength());
+ if(sendRes >= 0)
+ {
+ messagesSent += 1;
+ }
+ }
+ }
+ }
+
+ EventStreamingSingleton::instance()->RemoveSession(session);
+ logMsg.Format("%s Stream client stop - sent %d messages in %d sec", sessionId, messagesSent, (time(NULL) - startTime));
+ LOG4CXX_INFO(s_log, logMsg);
+ }
+ catch (CStdString& e)
+ {
+ CStdString error("HTTP/1.0 404 not found\r\nContent-type: text/html\r\n\r\nError\r\n");
+ error = error + e + "\r\n";
+ LOG4CXX_ERROR(s_log, e);
+ peer().send(error, error.GetLength());
+ }
+
+ return 0;
+}
diff --git a/orkbasecxx/MultiThreadedServer.h b/orkbasecxx/MultiThreadedServer.h
index f6ea6dc..58c0622 100644
--- a/orkbasecxx/MultiThreadedServer.h
+++ b/orkbasecxx/MultiThreadedServer.h
@@ -61,5 +61,25 @@ private:
};
typedef ACE_Acceptor<HttpServer, ACE_SOCK_ACCEPTOR> HttpAcceptor;
+//==========================================================
+
+/** This server is a lightweight http server that prints out the single line
+ format of all events from a given port, one thread per connection e.g
+ http://localhost:23000/message=streamevents
+*/
+class EventStreamingServer : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
+{
+public:
+ virtual int open (void *);
+ /** daemon thread */
+ static void run(void *args);
+ /** service routine */
+ virtual int svc (void);
+
+private:
+ static log4cxx::LoggerPtr s_log;
+};
+typedef ACE_Acceptor<EventStreamingServer, ACE_SOCK_ACCEPTOR> EventStreamingAcceptor;
+
#endif
diff --git a/orkbasecxx/Reporting.cpp b/orkbasecxx/Reporting.cpp
index 3af76d7..d0ffa40 100644
--- a/orkbasecxx/Reporting.cpp
+++ b/orkbasecxx/Reporting.cpp
@@ -23,6 +23,7 @@
#include "Daemon.h"
#include "CapturePluginProxy.h"
#include "ace/Thread_Manager.h"
+#include "EventStreaming.h"
#ifdef WIN32
# ifndef snprintf
@@ -186,6 +187,28 @@ void Reporting::AddTapeMessage(MessageRef& messageRef)
}
}
}
+
+ // Send this message to the event streaming system
+ reportingMsgRef.reset(new TapeMsg);
+ pRptTapeMsg = (TapeMsg*)reportingMsgRef.get();
+
+ pRptTapeMsg->m_recId = pTapeMsg->m_recId;
+ pRptTapeMsg->m_fileName = pTapeMsg->m_fileName;
+ pRptTapeMsg->m_stage = pTapeMsg->m_stage;
+ pRptTapeMsg->m_capturePort = pTapeMsg->m_capturePort;
+ pRptTapeMsg->m_localParty = pTapeMsg->m_localParty;
+ pRptTapeMsg->m_localEntryPoint = pTapeMsg->m_localEntryPoint;
+ pRptTapeMsg->m_remoteParty = pTapeMsg->m_remoteParty;
+ pRptTapeMsg->m_direction = pTapeMsg->m_direction;
+ pRptTapeMsg->m_duration = pTapeMsg->m_duration;
+ pRptTapeMsg->m_timestamp = pTapeMsg->m_timestamp;
+ pRptTapeMsg->m_localIp = pTapeMsg->m_localIp;
+ pRptTapeMsg->m_remoteIp = pTapeMsg->m_remoteIp;
+ pRptTapeMsg->m_nativeCallId = pTapeMsg->m_nativeCallId;
+ // Copy the tags!
+ std::copy(pTapeMsg->m_tags.begin(), pTapeMsg->m_tags.end(), std::inserter(pRptTapeMsg->m_tags, pRptTapeMsg->m_tags.begin()));
+
+ EventStreamingSingleton::instance()->AddTapeMessage(reportingMsgRef);
}
void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef)
@@ -348,8 +371,9 @@ void ReportingThread::Run()
// Tape is wanted
if(CONFIG.m_lookBackRecording == false && CONFIG.m_allowAutomaticRecording && ptapeMsg->m_stage.Equals("start"))
{
- CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_localParty);
- CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_remoteParty);
+ CStdString orkuid = "";
+ CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_localParty, orkuid);
+ CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_remoteParty, orkuid);
}
}
//else
diff --git a/orkbasecxx/messages/RecordMsg.cpp b/orkbasecxx/messages/RecordMsg.cpp
index 46f3778..b72300f 100644
--- a/orkbasecxx/messages/RecordMsg.cpp
+++ b/orkbasecxx/messages/RecordMsg.cpp
@@ -16,15 +16,50 @@
#include "CapturePluginProxy.h"
#define RECORD_CLASS "record"
+#define PAUSE_CLASS "pause"
+
+void PauseMsg::Define(Serializer* s)
+{
+ CStdString pauseClass(PAUSE_CLASS);
+ s->StringValue(OBJECT_TYPE_TAG, pauseClass, true);
+ s->StringValue(PARTY_PARAM, m_party, false);
+ s->StringValue(ORKUID_PARAM, m_orkuid, false);
+}
+
+CStdString PauseMsg::GetClassName()
+{
+ return CStdString(PAUSE_CLASS);
+}
+
+ObjectRef PauseMsg::NewInstance()
+{
+ return ObjectRef(new PauseMsg);
+}
+
+ObjectRef PauseMsg::Process()
+{
+ SimpleResponseMsg* msg = new SimpleResponseMsg;
+ ObjectRef ref(msg);
+ CStdString logMsg;
+
+ logMsg.Format("Pausing capture for party:%s orkuid:%s", m_party, m_orkuid);
+ CapturePluginProxy::Singleton()->PauseCapture(m_party, m_orkuid);
+ msg->m_success = true;
+ msg->m_comment = logMsg;
+
+ return ref;
+}
+
+//===================================================
void RecordMsg::Define(Serializer* s)
{
CStdString recordClass(RECORD_CLASS);
s->StringValue(OBJECT_TYPE_TAG, recordClass, true);
- s->StringValue(PARTY_PARAM, m_party, true);
+ s->StringValue(PARTY_PARAM, m_party, false);
+ s->StringValue(ORKUID_PARAM, m_orkuid, false);
}
-
CStdString RecordMsg::GetClassName()
{
return CStdString(RECORD_CLASS);
@@ -41,8 +76,8 @@ ObjectRef RecordMsg::Process()
ObjectRef ref(msg);
CStdString logMsg;
- logMsg.Format("Starting capture for %s", m_party);
- CapturePluginProxy::Singleton()->StartCapture(m_party);
+ logMsg.Format("Starting capture for party:%s orkuid:%s", m_party, m_orkuid);
+ CapturePluginProxy::Singleton()->StartCapture(m_party, m_orkuid);
msg->m_success = true;
msg->m_comment = logMsg;
diff --git a/orkbasecxx/messages/RecordMsg.h b/orkbasecxx/messages/RecordMsg.h
index 35803be..c374476 100644
--- a/orkbasecxx/messages/RecordMsg.h
+++ b/orkbasecxx/messages/RecordMsg.h
@@ -28,6 +28,21 @@ public:
ObjectRef Process();
CStdString m_party;
+ CStdString m_orkuid;
+};
+
+class DLL_IMPORT_EXPORT_ORKBASE PauseMsg : public SyncMessage
+{
+public:
+ void Define(Serializer* s);
+ inline void Validate() {};
+
+ CStdString GetClassName();
+ ObjectRef NewInstance();
+ ObjectRef Process();
+
+ CStdString m_party;
+ CStdString m_orkuid;
};
#endif