summaryrefslogtreecommitdiff
path: root/orkbasecxx/MultiThreadedServer.cpp
diff options
context:
space:
mode:
authorGerald Begumisa <ben_g@users.sourceforge.net>2009-01-07 13:07:31 +0000
committerGerald Begumisa <ben_g@users.sourceforge.net>2009-01-07 13:07:31 +0000
commit0e2a2e49077b79bd52d6c83c5202e452e7eea091 (patch)
tree58ae466e2c74be585548f10ffff96560099c9370 /orkbasecxx/MultiThreadedServer.cpp
parentded03e95a1eb78cb16ef3a4a0dc9bb140bd748ba (diff)
Modified the orkaudio API, adding the record and pause HTTP commands. The record command commences or un-pauses recording while the pause command pauses recording - discarding RTP packets from when the pause command is issued. Both commands require the orkuid and party to be specified as HTTP parameters. Note that this represents a change in the arguments required for the StartCapture function in DLLs. Also added an event streaming feature which streams out all tape messages as they are reported in real-time to a client connected. Clients should connect via HTTP, on port 59150. The port is configurable by setting the parameter EventStreamingServerPort in config.xml
git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@590 09dcff7a-b715-0410-9601-b79a96267cd0
Diffstat (limited to 'orkbasecxx/MultiThreadedServer.cpp')
-rw-r--r--orkbasecxx/MultiThreadedServer.cpp136
1 files changed, 128 insertions, 8 deletions
diff --git a/orkbasecxx/MultiThreadedServer.cpp b/orkbasecxx/MultiThreadedServer.cpp
index cec42db..60154b6 100644
--- a/orkbasecxx/MultiThreadedServer.cpp
+++ b/orkbasecxx/MultiThreadedServer.cpp
@@ -1,6 +1,6 @@
/*
* Oreka -- A media capture and retrieval platform
- *
+ *
* Copyright (C) 2005, orecx LLC
*
* http://www.orecx.com
@@ -26,6 +26,8 @@
#include "MultiThreadedServer.h"
+#include "EventStreaming.h"
+
log4cxx::LoggerPtr CommandLineServer::s_log;
// This is run at the start of each connection
@@ -64,7 +66,7 @@ void CommandLineServer::run(void* args)
int CommandLineServer::svc(void)
{
- for (bool active = true;active == true;)
+ for (bool active = true;active == true;)
{
char buf[2048];
ACE_Time_Value timeout;
@@ -181,16 +183,16 @@ int HttpServer::svc(void)
{
throw (CStdString("Malformed http request")); ;
}
- *stopUrl = '\0'; // Remove post-URL trailing stuff
+ *stopUrl = '\0'; // Remove post-URL trailing stuff
CStdString url(buf+startUrlOffset);
int queryOffset = url.Find("?");
- if (queryOffset > 0)
+ if (queryOffset > 0)
{
// Strip beginning of URL in case the command is received as an URL "query" of the form:
// http://hostname/service/command?type=ping
url = url.Right(url.size() - queryOffset - 1);
}
-
+
CStdString className = UrlSerializer::FindClass(url);
ObjectRef objRef = ObjectFactory::GetSingleton()->NewInstance(className);
@@ -208,9 +210,9 @@ int HttpServer::svc(void)
DOMImplementation* impl = DOMImplementationRegistry::getDOMImplementation(XStr("Core").unicodeForm());
XERCES_CPP_NAMESPACE::DOMDocument* myDoc;
myDoc = impl->createDocument(
- 0, // root element namespace URI.
- XStr("response").unicodeForm(), // root element name
- 0); // document type object (DTD).
+ 0, // root element namespace URI.
+ XStr("response").unicodeForm(), // root element name
+ 0); // document type object (DTD).
response->SerializeDom(myDoc);
CStdString pingResponse = DomSerializer::DomNodeToString(myDoc);
@@ -245,3 +247,121 @@ int HttpServer::svc(void)
return 0;
}
+//==============================================
+
+log4cxx::LoggerPtr EventStreamingServer::s_log;
+
+int EventStreamingServer::open(void *void_acceptor)
+{
+ return this->activate (THR_DETACHED);
+}
+
+void EventStreamingServer::run(void* args)
+{
+ unsigned short tcpPort = (unsigned short)(ACE_UINT64)args;
+ CStdString tcpPortString = IntToString(tcpPort);
+ EventStreamingAcceptor peer_acceptor;
+ ACE_INET_Addr addr (tcpPort);
+ ACE_Reactor reactor;
+
+ s_log = log4cxx::Logger::getLogger("interface.eventstreamingserver");
+
+ if (peer_acceptor.open (addr, &reactor) == -1)
+ {
+ LOG4CXX_ERROR(s_log, CStdString("Failed to start event streaming server on port:") + tcpPortString);
+ }
+ else
+ {
+ LOG4CXX_INFO(s_log, CStdString("Started event streaming server on port:")+tcpPortString);
+ for(;;)
+ {
+ reactor.handle_events();
+ }
+ }
+}
+
+int EventStreamingServer::svc(void)
+{
+ ACE_Time_Value timeout;
+ char buf[2048];
+ CStdString logMsg;
+ CStdString sessionId;
+ int messagesSent = 0;
+
+ ssize_t size = peer().recv(buf, 2040);
+
+ if(size <= 5)
+ {
+ CStdString notFound("HTTP/1.0 404 not found\r\nContent-type: text/html\r\n\r\nNot found\r\n");
+ peer().send(notFound, notFound.GetLength());
+ return 0;
+ }
+
+ try
+ {
+ int startUrlOffset = 5;
+ char* stopUrl = ACE_OS::strstr(buf+startUrlOffset, " HTTP");
+
+ if(!stopUrl)
+ {
+ throw (CStdString("Malformed http request"));
+ }
+
+ CStdString header;
+ struct tm date = {0};
+ time_t now = time(NULL);
+ CStdString rfc822Date;
+
+ ACE_OS::gmtime_r(&now, &date);
+ rfc822Date.Format("Tue, %.2d Nov %.4d %.2d:%.2d:%.2d GMT", date.tm_mday, (date.tm_year+1900), date.tm_hour, date.tm_min, date.tm_sec);
+ header.Format("HTTP/1.1 200 OK\r\nLast-Modified:%s\r\nContent-Type:text/plain\r\n\r\n", rfc822Date);
+ peer().send(header, header.GetLength());
+
+ time_t startTime = time(NULL);
+ time_t lastSentTime = time(NULL);
+
+ sessionId = EventStreamingSingleton::instance()->GetNewSessionId() + " -";
+ logMsg.Format("%s Event streaming start", sessionId);
+ LOG4CXX_INFO(s_log, logMsg);
+
+ EventStreamingSessionRef session(new EventStreamingSession());
+ EventStreamingSingleton::instance()->AddSession(session);
+
+ int sendRes = 0;
+ while(sendRes >= 0)
+ {
+ session->WaitForMessages();
+
+ while(session->GetNumMessages() && sendRes >= 0)
+ {
+ MessageRef message;
+
+ session->GetTapeMessage(message);
+ if(message.get())
+ {
+ CStdString msgAsSingleLineString;
+
+ msgAsSingleLineString.Format("%s\r\n", message->SerializeSingleLine());
+ sendRes = peer().send(msgAsSingleLineString, msgAsSingleLineString.GetLength());
+ if(sendRes >= 0)
+ {
+ messagesSent += 1;
+ }
+ }
+ }
+ }
+
+ EventStreamingSingleton::instance()->RemoveSession(session);
+ logMsg.Format("%s Stream client stop - sent %d messages in %d sec", sessionId, messagesSent, (time(NULL) - startTime));
+ LOG4CXX_INFO(s_log, logMsg);
+ }
+ catch (CStdString& e)
+ {
+ CStdString error("HTTP/1.0 404 not found\r\nContent-type: text/html\r\n\r\nError\r\n");
+ error = error + e + "\r\n";
+ LOG4CXX_ERROR(s_log, e);
+ peer().send(error, error.GetLength());
+ }
+
+ return 0;
+}