diff options
author | Gerald Begumisa <ben_g@users.sourceforge.net> | 2009-01-07 13:07:31 +0000 |
---|---|---|
committer | Gerald Begumisa <ben_g@users.sourceforge.net> | 2009-01-07 13:07:31 +0000 |
commit | 0e2a2e49077b79bd52d6c83c5202e452e7eea091 (patch) | |
tree | 58ae466e2c74be585548f10ffff96560099c9370 | |
parent | ded03e95a1eb78cb16ef3a4a0dc9bb140bd748ba (diff) |
Modified the orkaudio API, adding the record and pause HTTP commands. The record command commences or un-pauses recording while the pause command pauses recording - discarding RTP packets from when the pause command is issued. Both commands require the orkuid and party to be specified as HTTP parameters. Note that this represents a change in the arguments required for the StartCapture function in DLLs. Also added an event streaming feature which streams out all tape messages as they are reported in real-time to a client connected. Clients should connect via HTTP, on port 59150. The port is configurable by setting the parameter EventStreamingServerPort in config.xml
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@590 09dcff7a-b715-0410-9601-b79a96267cd0
-rw-r--r-- | orkaudio/OrkAudio.cpp | 10 | ||||
-rw-r--r-- | orkaudio/audiocaptureplugins/common/AudioCapturePluginCommon.h | 3 | ||||
-rw-r--r-- | orkaudio/audiocaptureplugins/voip/RtpSession.cpp | 94 | ||||
-rw-r--r-- | orkaudio/audiocaptureplugins/voip/RtpSession.h | 3 | ||||
-rw-r--r-- | orkaudio/audiocaptureplugins/voip/VoIp.cpp | 45 | ||||
-rw-r--r-- | orkbasecxx/AudioCapturePlugin.h | 3 | ||||
-rw-r--r-- | orkbasecxx/CapturePluginProxy.cpp | 26 | ||||
-rw-r--r-- | orkbasecxx/CapturePluginProxy.h | 4 | ||||
-rw-r--r-- | orkbasecxx/Config.cpp | 2 | ||||
-rw-r--r-- | orkbasecxx/Config.h | 3 | ||||
-rw-r--r-- | orkbasecxx/EventStreaming.cpp | 109 | ||||
-rw-r--r-- | orkbasecxx/EventStreaming.h | 68 | ||||
-rw-r--r-- | orkbasecxx/Makefile.am | 2 | ||||
-rw-r--r-- | orkbasecxx/MultiThreadedServer.cpp | 136 | ||||
-rw-r--r-- | orkbasecxx/MultiThreadedServer.h | 20 | ||||
-rw-r--r-- | orkbasecxx/Reporting.cpp | 28 | ||||
-rw-r--r-- | orkbasecxx/messages/RecordMsg.cpp | 43 | ||||
-rw-r--r-- | orkbasecxx/messages/RecordMsg.h | 15 |
18 files changed, 584 insertions, 30 deletions
diff --git a/orkaudio/OrkAudio.cpp b/orkaudio/OrkAudio.cpp index 0083f8b..edda45a 100644 --- a/orkaudio/OrkAudio.cpp +++ b/orkaudio/OrkAudio.cpp @@ -44,7 +44,7 @@ #include "filters/audiogain/AudioGain.h" #include "TapeProcessor.h" #include <list> - +#include "EventStreaming.h" static volatile bool serviceStop = false; @@ -213,6 +213,8 @@ void MainThread() ObjectFactory::GetSingleton()->RegisterObject(objRef); objRef.reset(new RecordMsg); ObjectFactory::GetSingleton()->RegisterObject(objRef); + objRef.reset(new PauseMsg); + ObjectFactory::GetSingleton()->RegisterObject(objRef); //objRef.reset(new TestMsg); //ObjectFactory::GetSingleton()->RegisterObject(objRef); @@ -276,6 +278,12 @@ void MainThread() LOG4CXX_INFO(LOG.rootLog, CStdString("Failed to create Http server")); } + // Create streaming server on port 59150 (default) + if(!ACE_Thread_Manager::instance()->spawn(ACE_THR_FUNC(EventStreamingServer::run), (void *)CONFIG.m_eventStreamingServerPort)) + { + LOG4CXX_INFO(LOG.rootLog, CStdString("Failed to create event streaming server")); + } + if(capturePluginOk) { CapturePluginProxy::Singleton()->Run(); diff --git a/orkaudio/audiocaptureplugins/common/AudioCapturePluginCommon.h b/orkaudio/audiocaptureplugins/common/AudioCapturePluginCommon.h index 4697317..a6defae 100644 --- a/orkaudio/audiocaptureplugins/common/AudioCapturePluginCommon.h +++ b/orkaudio/audiocaptureplugins/common/AudioCapturePluginCommon.h @@ -30,8 +30,9 @@ DLL_EXPORT void __CDECL__ Run(); DLL_EXPORT void __CDECL__ Initialize(); DLL_EXPORT void __CDECL__ Shutdown(); DLL_EXPORT void __CDECL__ Configure(DOMNode*); -DLL_EXPORT void __CDECL__ StartCapture(CStdString& party); +DLL_EXPORT void __CDECL__ StartCapture(CStdString& party, CStdString& orkuid); DLL_EXPORT void __CDECL__ StopCapture(CStdString& party); +DLL_EXPORT void __CDECL__ PauseCapture(CStdString& party, CStdString& orkuid); } #endif diff --git a/orkaudio/audiocaptureplugins/voip/RtpSession.cpp b/orkaudio/audiocaptureplugins/voip/RtpSession.cpp index cf1ba08..a8b2d40 100644 --- a/orkaudio/audiocaptureplugins/voip/RtpSession.cpp +++ b/orkaudio/audiocaptureplugins/voip/RtpSession.cpp @@ -2383,6 +2383,36 @@ void RtpSessions::Hoover(time_t now) } } +void RtpSessions::StartCaptureOrkuid(CStdString& orkuid) +{ + std::map<CStdString, RtpSessionRef>::iterator pair; + bool found = false; + CStdString logMsg; + RtpSessionRef session; + + for(pair = m_byIpAndPort.begin(); pair != m_byIpAndPort.end() && found == false; pair++) + { + session = pair->second; + + if(session->OrkUidMatches(orkuid)) + { + session->m_keep = true; + found = true; + } + } + + if(found) + { + logMsg.Format("[%s] StartCaptureOrkuid: Started capture, orkuid:%s", session->m_trackingId, orkuid); + } + else + { + logMsg.Format("StartCaptureOrkuid: No session has orkuid:%s", orkuid); + } + + LOG4CXX_INFO(m_log, logMsg); +} + void RtpSessions::StartCapture(CStdString& party) { std::map<CStdString, RtpSessionRef>::iterator pair; @@ -2403,16 +2433,76 @@ void RtpSessions::StartCapture(CStdString& party) if(found) { - logMsg.Format("[%s] Started capture, party:%s", session->m_trackingId, party); + logMsg.Format("[%s] StartCapture: Started capture, party:%s", session->m_trackingId, party); } else { - logMsg.Format("No session has party %s", party); + logMsg.Format("StartCapture: No session has party %s", party); } LOG4CXX_INFO(m_log, logMsg); } +void RtpSessions::PauseCapture(CStdString& party) +{ + std::map<CStdString, RtpSessionRef>::iterator pair; + bool found = false; + CStdString logMsg; + RtpSessionRef session; + + for(pair = m_byIpAndPort.begin(); pair != m_byIpAndPort.end() && found == false; pair++) + { + session = pair->second; + + if (session->PartyMatches(party)) + { + session->m_keep = false; + found = true; + } + } + + if(found) + { + logMsg.Format("[%s] PauseCapture: Paused capture, party:%s", session->m_trackingId, party); + } + else + { + logMsg.Format("PauseCapture: No session has party %s", party); + } + + LOG4CXX_INFO(m_log, logMsg); +} + +void RtpSessions::PauseCaptureOrkuid(CStdString& orkuid) +{ + std::map<CStdString, RtpSessionRef>::iterator pair; + bool found = false; + CStdString logMsg; + RtpSessionRef session; + + for(pair = m_byIpAndPort.begin(); pair != m_byIpAndPort.end() && found == false; pair++) + { + session = pair->second; + + if(session->OrkUidMatches(orkuid)) + { + session->m_keep = false; + found = true; + } + } + + if(found) + { + logMsg.Format("[%s] PauseCaptureOrkuid: Paused capture, orkuid:%s", session->m_trackingId, orkuid); + } + else + { + logMsg.Format("PauseCaptureOrkuid: No session has orkuid:%s", orkuid); + } + + LOG4CXX_INFO(m_log, logMsg); +} + //========================================================== SipInviteInfo::SipInviteInfo() { diff --git a/orkaudio/audiocaptureplugins/voip/RtpSession.h b/orkaudio/audiocaptureplugins/voip/RtpSession.h index a24c882..6695985 100644 --- a/orkaudio/audiocaptureplugins/voip/RtpSession.h +++ b/orkaudio/audiocaptureplugins/voip/RtpSession.h @@ -257,6 +257,9 @@ public: void Hoover(time_t now); EndpointInfoRef GetEndpointInfo(struct in_addr endpointIp); void StartCapture(CStdString& party); + void StartCaptureOrkuid(CStdString& orkuid); + void PauseCapture(CStdString& party); + void PauseCaptureOrkuid(CStdString& orkuid); private: RtpSessionRef findByEndpointIp(struct in_addr endpointIpAddr, int passThruPartyId = 0); diff --git a/orkaudio/audiocaptureplugins/voip/VoIp.cpp b/orkaudio/audiocaptureplugins/voip/VoIp.cpp index 25cda38..63fa355 100644 --- a/orkaudio/audiocaptureplugins/voip/VoIp.cpp +++ b/orkaudio/audiocaptureplugins/voip/VoIp.cpp @@ -83,8 +83,9 @@ public: void Initialize(); void Run(); void Shutdown(); - void StartCapture(CStdString& port); + void StartCapture(CStdString& port, CStdString& orkuid); void StopCapture(CStdString& port); + void PauseCapture(CStdString& port, CStdString& orkuid); void ReportPcapStats(); pcap_t* OpenDevice(CStdString& name); void AddPcapDeviceToMap(CStdString& deviceName, pcap_t* pcapHandle); @@ -3199,7 +3200,12 @@ void VoIp::Shutdown() #endif } -void VoIp::StartCapture(CStdString& port) +void VoIp::StartCapture(CStdString& port, CStdString& orkuid) +{ + ; +} + +void VoIp::PauseCapture(CStdString& port, CStdString& orkuid) { ; } @@ -3225,15 +3231,42 @@ void __CDECL__ Shutdown() VoIpSingleton::instance()->Shutdown(); } -void __CDECL__ StartCapture(CStdString& party) +void __CDECL__ StartCapture(CStdString& party, CStdString& orkuid) { CStdString logMsg; - //logMsg.Format("StartCapture:%s", party); - //LOG4CXX_INFO(s_voipPluginLog, logMsg); + logMsg.Format("StartCapture: party:%s orkuid:%s", party, orkuid); + LOG4CXX_INFO(s_voipPluginLog, logMsg); MutexSentinel mutexSentinel(s_mutex); - RtpSessionsSingleton::instance()->StartCapture(party); + + if(orkuid.size()) + { + RtpSessionsSingleton::instance()->StartCaptureOrkuid(orkuid); + } + else + { + RtpSessionsSingleton::instance()->StartCapture(party); + } +} + +void __CDECL__ PauseCapture(CStdString& party, CStdString& orkuid) +{ + CStdString logMsg; + + logMsg.Format("PauseCapture: party:%s orkuid:%s", party, orkuid); + LOG4CXX_INFO(s_voipPluginLog, logMsg); + + MutexSentinel mutexSentinel(s_mutex); + + if(orkuid.size()) + { + RtpSessionsSingleton::instance()->PauseCaptureOrkuid(orkuid); + } + else + { + RtpSessionsSingleton::instance()->PauseCapture(party); + } } void __CDECL__ StopCapture(CStdString& party) diff --git a/orkbasecxx/AudioCapturePlugin.h b/orkbasecxx/AudioCapturePlugin.h index 3875dd2..c7fc38c 100644 --- a/orkbasecxx/AudioCapturePlugin.h +++ b/orkbasecxx/AudioCapturePlugin.h @@ -41,8 +41,9 @@ typedef void (__CDECL__* InitializeFunction)(); typedef void (__CDECL__* RunFunction)(); typedef void (__CDECL__* ShutdownFunction)(); typedef void (__CDECL__* ConfigureFunction)(DOMNode*); -typedef void (__CDECL__* StartCaptureFunction)(CStdString& port); +typedef void (__CDECL__* StartCaptureFunction)(CStdString& port, CStdString& orkUid); typedef void (__CDECL__* StopCaptureFunction)(CStdString& port); +typedef void (__CDECL__* PauseCaptureFunction)(CStdString& port, CStdString& orkUid); diff --git a/orkbasecxx/CapturePluginProxy.cpp b/orkbasecxx/CapturePluginProxy.cpp index 925c14a..f5db16c 100644 --- a/orkbasecxx/CapturePluginProxy.cpp +++ b/orkbasecxx/CapturePluginProxy.cpp @@ -122,7 +122,15 @@ bool CapturePluginProxy::Init() m_stopCaptureFunction = (StopCaptureFunction)m_dll.symbol("StopCapture"); if (m_stopCaptureFunction) { - m_loaded = true; + m_pauseCaptureFunction = (PauseCaptureFunction)m_dll.symbol("PauseCapture"); + if(m_stopCaptureFunction) + { + m_loaded = true; + } + else + { + LOG4CXX_ERROR(LOG.rootLog, CStdString("Could not find PauseCapture function in ") + pluginPath); + } } else { @@ -180,11 +188,11 @@ void CapturePluginProxy::Shutdown() } } -void CapturePluginProxy::StartCapture(CStdString& party) +void CapturePluginProxy::StartCapture(CStdString& party, CStdString& orkuid) { if(m_loaded) { - m_startCaptureFunction(party); + m_startCaptureFunction(party, orkuid); } else { @@ -204,6 +212,18 @@ void CapturePluginProxy::StopCapture(CStdString& party) } } +void CapturePluginProxy::PauseCapture(CStdString& party, CStdString& orkuid) +{ + if(m_loaded) + { + m_pauseCaptureFunction(party, orkuid); + } + else + { + throw(CStdString("PauseCapture: Capture plugin not yet loaded")); + } +} + void __CDECL__ CapturePluginProxy::AudioChunkCallBack(AudioChunkRef chunkRef, CStdString& capturePort) { // find the right port and give it the audio chunk diff --git a/orkbasecxx/CapturePluginProxy.h b/orkbasecxx/CapturePluginProxy.h index 631976d..a3d4257 100644 --- a/orkbasecxx/CapturePluginProxy.h +++ b/orkbasecxx/CapturePluginProxy.h @@ -28,7 +28,8 @@ public: void Run(); void Shutdown(); - void StartCapture(CStdString& party); + void StartCapture(CStdString& party, CStdString& orkuid); + void PauseCapture(CStdString& party, CStdString& orkuid); void StopCapture(CStdString& party); static void __CDECL__ AudioChunkCallBack(AudioChunkRef chunkRef, CStdString& capturePort); @@ -44,6 +45,7 @@ private: RunFunction m_runFunction; StartCaptureFunction m_startCaptureFunction; StopCaptureFunction m_stopCaptureFunction; + PauseCaptureFunction m_pauseCaptureFunction; ACE_DLL m_dll; bool m_loaded; diff --git a/orkbasecxx/Config.cpp b/orkbasecxx/Config.cpp index f3f5cb8..db8622f 100644 --- a/orkbasecxx/Config.cpp +++ b/orkbasecxx/Config.cpp @@ -76,6 +76,7 @@ Config::Config() m_audioGain = AUDIO_GAIN_DEFAULT; m_audioGainChannel1 = AUDIO_GAIN_CHANNEL_1_DEFAULT; m_audioGainChannel2 = AUDIO_GAIN_CHANNEL_2_DEFAULT; + m_eventStreamingServerPort = STREAMING_SERVER_PORT_DEFAULT; } void Config::Define(Serializer* s) @@ -156,6 +157,7 @@ void Config::Define(Serializer* s) s->DoubleValue(AUDIO_GAIN_PARAM, m_audioGain); s->DoubleValue(AUDIO_GAIN_CHANNEL_1_PARAM, m_audioGainChannel1); s->DoubleValue(AUDIO_GAIN_CHANNEL_2_PARAM, m_audioGainChannel2); + s->IntValue(STREAMING_SERVER_PORT_PARAM, m_eventStreamingServerPort); } void Config::Validate() diff --git a/orkbasecxx/Config.h b/orkbasecxx/Config.h index 8e7634c..0d8d565 100644 --- a/orkbasecxx/Config.h +++ b/orkbasecxx/Config.h @@ -96,6 +96,8 @@ #define COMMAND_LINE_SERVER_PORT_DEFAULT 59130 #define HTTP_SERVER_PORT_PARAM "HttpServerPort" #define HTTP_SERVER_PORT_DEFAULT 59140 +#define STREAMING_SERVER_PORT_PARAM "EventStreamingServerPort" +#define STREAMING_SERVER_PORT_DEFAULT 59150 #define LOOKBACK_RECORDING_PARAM "LookBackRecording" #define LOOKBACK_RECORDING_DEFAULT true #define ALLOW_AUTOMATIC_RECORDING_PARAM "AllowAutomaticRecording" @@ -175,6 +177,7 @@ public: CStdString m_remoteProcessingServiceName; int m_commandLineServerPort; int m_httpServerPort; + int m_eventStreamingServerPort; bool m_lookBackRecording; bool m_allowAutomaticRecording; int m_captureFileSizeLimitKb; diff --git a/orkbasecxx/EventStreaming.cpp b/orkbasecxx/EventStreaming.cpp new file mode 100644 index 0000000..4074e14 --- /dev/null +++ b/orkbasecxx/EventStreaming.cpp @@ -0,0 +1,109 @@ +/*
+ * Oreka -- A media capture and retrieval platform
+ *
+ * Copyright (C) 2005, orecx LLC
+ *
+ * http://www.orecx.com
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License.
+ * Please refer to http://www.gnu.org/copyleft/gpl.html
+ *
+ */
+#pragma warning( disable: 4786 ) // disables truncated symbols in browse-info warning
+
+#define _WINSOCKAPI_ // prevents the inclusion of winsock.h
+
+#include "EventStreaming.h"
+#include "ConfigManager.h"
+
+//==========================================================
+
+EventStreamingSession::EventStreamingSession()
+{
+}
+
+void EventStreamingSession::AddTapeMessage(MessageRef& message)
+{
+ if(m_messages.size() > 10000)
+ {
+ m_messages.pop_front();
+ }
+ m_messages.push_back(message);
+ m_semaphore.release();
+}
+
+void EventStreamingSession::GetTapeMessage(MessageRef& message)
+{
+ MutexSentinel mutexSentinel(m_mutex);
+
+ if(m_messages.size() > 0)
+ {
+ message = m_messages.front();
+ m_messages.pop_front();
+ }
+}
+
+int EventStreamingSession::GetNumMessages()
+{
+ MutexSentinel mutexSentinel(m_mutex);
+
+ return m_messages.size();
+}
+
+void EventStreamingSession::WaitForMessages()
+{
+ m_semaphore.acquire();
+}
+
+//==============================================
+
+EventStreaming::EventStreaming()
+{
+}
+
+void EventStreaming::GetLiveSessions(std::list<EventStreamingSessionRef>& sessions)
+{
+ MutexSentinel sentinel(m_mutex);
+
+ for(std::list<EventStreamingSessionRef>::iterator it = m_sessions.begin(); it != m_sessions.end(); it++)
+ {
+ EventStreamingSessionRef session = *it;
+
+ sessions.push_back(session);
+ }
+}
+
+void EventStreaming::AddSession(EventStreamingSessionRef& session)
+{
+ MutexSentinel sentinel(m_mutex);
+ m_sessions.push_back(session);
+}
+
+void EventStreaming::RemoveSession(EventStreamingSessionRef& session)
+{
+ MutexSentinel sentinel(m_mutex);
+ m_sessions.remove(session);
+}
+
+int EventStreaming::GetNumSessions()
+{
+ MutexSentinel sentinel(m_mutex);
+ return m_sessions.size();
+}
+
+CStdString EventStreaming::GetNewSessionId()
+{
+ return m_alphaCounter.GetNext();
+}
+
+void EventStreaming::AddTapeMessage(MessageRef& message)
+{
+ MutexSentinel sentinel(m_mutex);
+
+ for(std::list<EventStreamingSessionRef>::iterator it = m_sessions.begin(); it != m_sessions.end(); it++)
+ {
+ (*it)->AddTapeMessage(message);
+ }
+}
+
diff --git a/orkbasecxx/EventStreaming.h b/orkbasecxx/EventStreaming.h new file mode 100644 index 0000000..da0daa3 --- /dev/null +++ b/orkbasecxx/EventStreaming.h @@ -0,0 +1,68 @@ +/*
+ * Oreka -- A media capture and retrieval platform
+ *
+ * Copyright (C) 2005, orecx LLC
+ *
+ * http://www.orecx.com
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License.
+ * Please refer to http://www.gnu.org/copyleft/gpl.html
+ *
+ */
+#include "LogManager.h"
+#include "Filter.h"
+#include <math.h>
+#include "Utils.h"
+#include <queue>
+#include <list>
+#include "boost/shared_ptr.hpp"
+#include "ace/Singleton.h"
+#include "ace/Thread_Mutex.h"
+#include "ace/Thread_Semaphore.h"
+#include "AudioCapture.h"
+#include "ConfigManager.h"
+#include "CapturePluginProxy.h"
+#include "AudioTape.h"
+
+//==========================================================
+
+class EventStreamingSession
+{
+public:
+ EventStreamingSession();
+
+ void AddTapeMessage(MessageRef& message);
+ void GetTapeMessage(MessageRef& message);
+ int GetNumMessages();
+ void WaitForMessages();
+private:
+ std::list<MessageRef> m_messages;
+ ACE_Thread_Mutex m_mutex;
+ ACE_Thread_Semaphore m_semaphore;
+};
+typedef boost::shared_ptr<EventStreamingSession> EventStreamingSessionRef;
+
+//==========================================================
+
+class EventStreaming
+{
+public:
+ EventStreaming();
+
+ void GetLiveSessions(std::list<EventStreamingSessionRef>& sessions);
+ void AddSession(EventStreamingSessionRef& session);
+ void RemoveSession(EventStreamingSessionRef& session);
+ int GetNumSessions();
+ CStdString GetNewSessionId();
+ void AddTapeMessage(MessageRef& message);
+
+private:
+ AlphaCounter m_alphaCounter;
+ ACE_Thread_Mutex m_mutex;
+ std::list<EventStreamingSessionRef> m_sessions;
+};
+typedef ACE_Singleton<EventStreaming, ACE_Thread_Mutex> EventStreamingSingleton;
+
+//==========================================================
+
diff --git a/orkbasecxx/Makefile.am b/orkbasecxx/Makefile.am index 3f30d49..2810683 100644 --- a/orkbasecxx/Makefile.am +++ b/orkbasecxx/Makefile.am @@ -15,7 +15,7 @@ liborkbase_la_SOURCES = Filter.cpp g711.c \ CapturePluginProxy.cpp CapturePort.cpp \ Daemon.cpp ImmediateProcessing.cpp \ Reporting.cpp TapeFileNaming.cpp \ - PartyFilter.cpp + PartyFilter.cpp EventStreaming.cpp #INCLUDES = -I/projects/ext/xmlrpc++/xmlrpc++0.7/src SUBDIRS = messages serializers audiofile filters liborkbase_la_LIBADD = $(top_builddir)/serializers/libserializers.la \ diff --git a/orkbasecxx/MultiThreadedServer.cpp b/orkbasecxx/MultiThreadedServer.cpp index cec42db..60154b6 100644 --- a/orkbasecxx/MultiThreadedServer.cpp +++ b/orkbasecxx/MultiThreadedServer.cpp @@ -1,6 +1,6 @@ /* * Oreka -- A media capture and retrieval platform - * + * * Copyright (C) 2005, orecx LLC * * http://www.orecx.com @@ -26,6 +26,8 @@ #include "MultiThreadedServer.h" +#include "EventStreaming.h" + log4cxx::LoggerPtr CommandLineServer::s_log; // This is run at the start of each connection @@ -64,7 +66,7 @@ void CommandLineServer::run(void* args) int CommandLineServer::svc(void) { - for (bool active = true;active == true;) + for (bool active = true;active == true;) { char buf[2048]; ACE_Time_Value timeout; @@ -181,16 +183,16 @@ int HttpServer::svc(void) { throw (CStdString("Malformed http request")); ; } - *stopUrl = '\0'; // Remove post-URL trailing stuff + *stopUrl = '\0'; // Remove post-URL trailing stuff CStdString url(buf+startUrlOffset); int queryOffset = url.Find("?"); - if (queryOffset > 0) + if (queryOffset > 0) { // Strip beginning of URL in case the command is received as an URL "query" of the form: // http://hostname/service/command?type=ping url = url.Right(url.size() - queryOffset - 1); } - + CStdString className = UrlSerializer::FindClass(url); ObjectRef objRef = ObjectFactory::GetSingleton()->NewInstance(className); @@ -208,9 +210,9 @@ int HttpServer::svc(void) DOMImplementation* impl = DOMImplementationRegistry::getDOMImplementation(XStr("Core").unicodeForm()); XERCES_CPP_NAMESPACE::DOMDocument* myDoc; myDoc = impl->createDocument( - 0, // root element namespace URI. - XStr("response").unicodeForm(), // root element name - 0); // document type object (DTD). + 0, // root element namespace URI. + XStr("response").unicodeForm(), // root element name + 0); // document type object (DTD). response->SerializeDom(myDoc); CStdString pingResponse = DomSerializer::DomNodeToString(myDoc); @@ -245,3 +247,121 @@ int HttpServer::svc(void) return 0; } +//============================================== + +log4cxx::LoggerPtr EventStreamingServer::s_log; + +int EventStreamingServer::open(void *void_acceptor) +{ + return this->activate (THR_DETACHED); +} + +void EventStreamingServer::run(void* args) +{ + unsigned short tcpPort = (unsigned short)(ACE_UINT64)args; + CStdString tcpPortString = IntToString(tcpPort); + EventStreamingAcceptor peer_acceptor; + ACE_INET_Addr addr (tcpPort); + ACE_Reactor reactor; + + s_log = log4cxx::Logger::getLogger("interface.eventstreamingserver"); + + if (peer_acceptor.open (addr, &reactor) == -1) + { + LOG4CXX_ERROR(s_log, CStdString("Failed to start event streaming server on port:") + tcpPortString); + } + else + { + LOG4CXX_INFO(s_log, CStdString("Started event streaming server on port:")+tcpPortString); + for(;;) + { + reactor.handle_events(); + } + } +} + +int EventStreamingServer::svc(void) +{ + ACE_Time_Value timeout; + char buf[2048]; + CStdString logMsg; + CStdString sessionId; + int messagesSent = 0; + + ssize_t size = peer().recv(buf, 2040); + + if(size <= 5) + { + CStdString notFound("HTTP/1.0 404 not found\r\nContent-type: text/html\r\n\r\nNot found\r\n"); + peer().send(notFound, notFound.GetLength()); + return 0; + } + + try + { + int startUrlOffset = 5; + char* stopUrl = ACE_OS::strstr(buf+startUrlOffset, " HTTP"); + + if(!stopUrl) + { + throw (CStdString("Malformed http request")); + } + + CStdString header; + struct tm date = {0}; + time_t now = time(NULL); + CStdString rfc822Date; + + ACE_OS::gmtime_r(&now, &date); + rfc822Date.Format("Tue, %.2d Nov %.4d %.2d:%.2d:%.2d GMT", date.tm_mday, (date.tm_year+1900), date.tm_hour, date.tm_min, date.tm_sec); + header.Format("HTTP/1.1 200 OK\r\nLast-Modified:%s\r\nContent-Type:text/plain\r\n\r\n", rfc822Date); + peer().send(header, header.GetLength()); + + time_t startTime = time(NULL); + time_t lastSentTime = time(NULL); + + sessionId = EventStreamingSingleton::instance()->GetNewSessionId() + " -"; + logMsg.Format("%s Event streaming start", sessionId); + LOG4CXX_INFO(s_log, logMsg); + + EventStreamingSessionRef session(new EventStreamingSession()); + EventStreamingSingleton::instance()->AddSession(session); + + int sendRes = 0; + while(sendRes >= 0) + { + session->WaitForMessages(); + + while(session->GetNumMessages() && sendRes >= 0) + { + MessageRef message; + + session->GetTapeMessage(message); + if(message.get()) + { + CStdString msgAsSingleLineString; + + msgAsSingleLineString.Format("%s\r\n", message->SerializeSingleLine()); + sendRes = peer().send(msgAsSingleLineString, msgAsSingleLineString.GetLength()); + if(sendRes >= 0) + { + messagesSent += 1; + } + } + } + } + + EventStreamingSingleton::instance()->RemoveSession(session); + logMsg.Format("%s Stream client stop - sent %d messages in %d sec", sessionId, messagesSent, (time(NULL) - startTime)); + LOG4CXX_INFO(s_log, logMsg); + } + catch (CStdString& e) + { + CStdString error("HTTP/1.0 404 not found\r\nContent-type: text/html\r\n\r\nError\r\n"); + error = error + e + "\r\n"; + LOG4CXX_ERROR(s_log, e); + peer().send(error, error.GetLength()); + } + + return 0; +} diff --git a/orkbasecxx/MultiThreadedServer.h b/orkbasecxx/MultiThreadedServer.h index f6ea6dc..58c0622 100644 --- a/orkbasecxx/MultiThreadedServer.h +++ b/orkbasecxx/MultiThreadedServer.h @@ -61,5 +61,25 @@ private: }; typedef ACE_Acceptor<HttpServer, ACE_SOCK_ACCEPTOR> HttpAcceptor; +//========================================================== + +/** This server is a lightweight http server that prints out the single line + format of all events from a given port, one thread per connection e.g + http://localhost:23000/message=streamevents +*/ +class EventStreamingServer : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> +{ +public: + virtual int open (void *); + /** daemon thread */ + static void run(void *args); + /** service routine */ + virtual int svc (void); + +private: + static log4cxx::LoggerPtr s_log; +}; +typedef ACE_Acceptor<EventStreamingServer, ACE_SOCK_ACCEPTOR> EventStreamingAcceptor; + #endif diff --git a/orkbasecxx/Reporting.cpp b/orkbasecxx/Reporting.cpp index 3af76d7..d0ffa40 100644 --- a/orkbasecxx/Reporting.cpp +++ b/orkbasecxx/Reporting.cpp @@ -23,6 +23,7 @@ #include "Daemon.h" #include "CapturePluginProxy.h" #include "ace/Thread_Manager.h" +#include "EventStreaming.h" #ifdef WIN32 # ifndef snprintf @@ -186,6 +187,28 @@ void Reporting::AddTapeMessage(MessageRef& messageRef) } } } + + // Send this message to the event streaming system + reportingMsgRef.reset(new TapeMsg); + pRptTapeMsg = (TapeMsg*)reportingMsgRef.get(); + + pRptTapeMsg->m_recId = pTapeMsg->m_recId; + pRptTapeMsg->m_fileName = pTapeMsg->m_fileName; + pRptTapeMsg->m_stage = pTapeMsg->m_stage; + pRptTapeMsg->m_capturePort = pTapeMsg->m_capturePort; + pRptTapeMsg->m_localParty = pTapeMsg->m_localParty; + pRptTapeMsg->m_localEntryPoint = pTapeMsg->m_localEntryPoint; + pRptTapeMsg->m_remoteParty = pTapeMsg->m_remoteParty; + pRptTapeMsg->m_direction = pTapeMsg->m_direction; + pRptTapeMsg->m_duration = pTapeMsg->m_duration; + pRptTapeMsg->m_timestamp = pTapeMsg->m_timestamp; + pRptTapeMsg->m_localIp = pTapeMsg->m_localIp; + pRptTapeMsg->m_remoteIp = pTapeMsg->m_remoteIp; + pRptTapeMsg->m_nativeCallId = pTapeMsg->m_nativeCallId; + // Copy the tags! + std::copy(pTapeMsg->m_tags.begin(), pTapeMsg->m_tags.end(), std::inserter(pRptTapeMsg->m_tags, pRptTapeMsg->m_tags.begin())); + + EventStreamingSingleton::instance()->AddTapeMessage(reportingMsgRef); } void Reporting::AddAudioTape(AudioTapeRef& audioTapeRef) @@ -348,8 +371,9 @@ void ReportingThread::Run() // Tape is wanted if(CONFIG.m_lookBackRecording == false && CONFIG.m_allowAutomaticRecording && ptapeMsg->m_stage.Equals("start")) { - CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_localParty); - CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_remoteParty); + CStdString orkuid = ""; + CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_localParty, orkuid); + CapturePluginProxy::Singleton()->StartCapture(ptapeMsg->m_remoteParty, orkuid); } } //else diff --git a/orkbasecxx/messages/RecordMsg.cpp b/orkbasecxx/messages/RecordMsg.cpp index 46f3778..b72300f 100644 --- a/orkbasecxx/messages/RecordMsg.cpp +++ b/orkbasecxx/messages/RecordMsg.cpp @@ -16,15 +16,50 @@ #include "CapturePluginProxy.h" #define RECORD_CLASS "record" +#define PAUSE_CLASS "pause" + +void PauseMsg::Define(Serializer* s) +{ + CStdString pauseClass(PAUSE_CLASS); + s->StringValue(OBJECT_TYPE_TAG, pauseClass, true); + s->StringValue(PARTY_PARAM, m_party, false); + s->StringValue(ORKUID_PARAM, m_orkuid, false); +} + +CStdString PauseMsg::GetClassName() +{ + return CStdString(PAUSE_CLASS); +} + +ObjectRef PauseMsg::NewInstance() +{ + return ObjectRef(new PauseMsg); +} + +ObjectRef PauseMsg::Process() +{ + SimpleResponseMsg* msg = new SimpleResponseMsg; + ObjectRef ref(msg); + CStdString logMsg; + + logMsg.Format("Pausing capture for party:%s orkuid:%s", m_party, m_orkuid); + CapturePluginProxy::Singleton()->PauseCapture(m_party, m_orkuid); + msg->m_success = true; + msg->m_comment = logMsg; + + return ref; +} + +//=================================================== void RecordMsg::Define(Serializer* s) { CStdString recordClass(RECORD_CLASS); s->StringValue(OBJECT_TYPE_TAG, recordClass, true); - s->StringValue(PARTY_PARAM, m_party, true); + s->StringValue(PARTY_PARAM, m_party, false); + s->StringValue(ORKUID_PARAM, m_orkuid, false); } - CStdString RecordMsg::GetClassName() { return CStdString(RECORD_CLASS); @@ -41,8 +76,8 @@ ObjectRef RecordMsg::Process() ObjectRef ref(msg); CStdString logMsg; - logMsg.Format("Starting capture for %s", m_party); - CapturePluginProxy::Singleton()->StartCapture(m_party); + logMsg.Format("Starting capture for party:%s orkuid:%s", m_party, m_orkuid); + CapturePluginProxy::Singleton()->StartCapture(m_party, m_orkuid); msg->m_success = true; msg->m_comment = logMsg; diff --git a/orkbasecxx/messages/RecordMsg.h b/orkbasecxx/messages/RecordMsg.h index 35803be..c374476 100644 --- a/orkbasecxx/messages/RecordMsg.h +++ b/orkbasecxx/messages/RecordMsg.h @@ -28,6 +28,21 @@ public: ObjectRef Process(); CStdString m_party; + CStdString m_orkuid; +}; + +class DLL_IMPORT_EXPORT_ORKBASE PauseMsg : public SyncMessage +{ +public: + void Define(Serializer* s); + inline void Validate() {}; + + CStdString GetClassName(); + ObjectRef NewInstance(); + ObjectRef Process(); + + CStdString m_party; + CStdString m_orkuid; }; #endif |