diff options
Diffstat (limited to 'orkbasecxx/MultiThreadedServer.cpp')
-rw-r--r-- | orkbasecxx/MultiThreadedServer.cpp | 136 |
1 files changed, 128 insertions, 8 deletions
diff --git a/orkbasecxx/MultiThreadedServer.cpp b/orkbasecxx/MultiThreadedServer.cpp index cec42db..60154b6 100644 --- a/orkbasecxx/MultiThreadedServer.cpp +++ b/orkbasecxx/MultiThreadedServer.cpp @@ -1,6 +1,6 @@ /* * Oreka -- A media capture and retrieval platform - * + * * Copyright (C) 2005, orecx LLC * * http://www.orecx.com @@ -26,6 +26,8 @@ #include "MultiThreadedServer.h" +#include "EventStreaming.h" + log4cxx::LoggerPtr CommandLineServer::s_log; // This is run at the start of each connection @@ -64,7 +66,7 @@ void CommandLineServer::run(void* args) int CommandLineServer::svc(void) { - for (bool active = true;active == true;) + for (bool active = true;active == true;) { char buf[2048]; ACE_Time_Value timeout; @@ -181,16 +183,16 @@ int HttpServer::svc(void) { throw (CStdString("Malformed http request")); ; } - *stopUrl = '\0'; // Remove post-URL trailing stuff + *stopUrl = '\0'; // Remove post-URL trailing stuff CStdString url(buf+startUrlOffset); int queryOffset = url.Find("?"); - if (queryOffset > 0) + if (queryOffset > 0) { // Strip beginning of URL in case the command is received as an URL "query" of the form: // http://hostname/service/command?type=ping url = url.Right(url.size() - queryOffset - 1); } - + CStdString className = UrlSerializer::FindClass(url); ObjectRef objRef = ObjectFactory::GetSingleton()->NewInstance(className); @@ -208,9 +210,9 @@ int HttpServer::svc(void) DOMImplementation* impl = DOMImplementationRegistry::getDOMImplementation(XStr("Core").unicodeForm()); XERCES_CPP_NAMESPACE::DOMDocument* myDoc; myDoc = impl->createDocument( - 0, // root element namespace URI. - XStr("response").unicodeForm(), // root element name - 0); // document type object (DTD). + 0, // root element namespace URI. + XStr("response").unicodeForm(), // root element name + 0); // document type object (DTD). response->SerializeDom(myDoc); CStdString pingResponse = DomSerializer::DomNodeToString(myDoc); @@ -245,3 +247,121 @@ int HttpServer::svc(void) return 0; } +//============================================== + +log4cxx::LoggerPtr EventStreamingServer::s_log; + +int EventStreamingServer::open(void *void_acceptor) +{ + return this->activate (THR_DETACHED); +} + +void EventStreamingServer::run(void* args) +{ + unsigned short tcpPort = (unsigned short)(ACE_UINT64)args; + CStdString tcpPortString = IntToString(tcpPort); + EventStreamingAcceptor peer_acceptor; + ACE_INET_Addr addr (tcpPort); + ACE_Reactor reactor; + + s_log = log4cxx::Logger::getLogger("interface.eventstreamingserver"); + + if (peer_acceptor.open (addr, &reactor) == -1) + { + LOG4CXX_ERROR(s_log, CStdString("Failed to start event streaming server on port:") + tcpPortString); + } + else + { + LOG4CXX_INFO(s_log, CStdString("Started event streaming server on port:")+tcpPortString); + for(;;) + { + reactor.handle_events(); + } + } +} + +int EventStreamingServer::svc(void) +{ + ACE_Time_Value timeout; + char buf[2048]; + CStdString logMsg; + CStdString sessionId; + int messagesSent = 0; + + ssize_t size = peer().recv(buf, 2040); + + if(size <= 5) + { + CStdString notFound("HTTP/1.0 404 not found\r\nContent-type: text/html\r\n\r\nNot found\r\n"); + peer().send(notFound, notFound.GetLength()); + return 0; + } + + try + { + int startUrlOffset = 5; + char* stopUrl = ACE_OS::strstr(buf+startUrlOffset, " HTTP"); + + if(!stopUrl) + { + throw (CStdString("Malformed http request")); + } + + CStdString header; + struct tm date = {0}; + time_t now = time(NULL); + CStdString rfc822Date; + + ACE_OS::gmtime_r(&now, &date); + rfc822Date.Format("Tue, %.2d Nov %.4d %.2d:%.2d:%.2d GMT", date.tm_mday, (date.tm_year+1900), date.tm_hour, date.tm_min, date.tm_sec); + header.Format("HTTP/1.1 200 OK\r\nLast-Modified:%s\r\nContent-Type:text/plain\r\n\r\n", rfc822Date); + peer().send(header, header.GetLength()); + + time_t startTime = time(NULL); + time_t lastSentTime = time(NULL); + + sessionId = EventStreamingSingleton::instance()->GetNewSessionId() + " -"; + logMsg.Format("%s Event streaming start", sessionId); + LOG4CXX_INFO(s_log, logMsg); + + EventStreamingSessionRef session(new EventStreamingSession()); + EventStreamingSingleton::instance()->AddSession(session); + + int sendRes = 0; + while(sendRes >= 0) + { + session->WaitForMessages(); + + while(session->GetNumMessages() && sendRes >= 0) + { + MessageRef message; + + session->GetTapeMessage(message); + if(message.get()) + { + CStdString msgAsSingleLineString; + + msgAsSingleLineString.Format("%s\r\n", message->SerializeSingleLine()); + sendRes = peer().send(msgAsSingleLineString, msgAsSingleLineString.GetLength()); + if(sendRes >= 0) + { + messagesSent += 1; + } + } + } + } + + EventStreamingSingleton::instance()->RemoveSession(session); + logMsg.Format("%s Stream client stop - sent %d messages in %d sec", sessionId, messagesSent, (time(NULL) - startTime)); + LOG4CXX_INFO(s_log, logMsg); + } + catch (CStdString& e) + { + CStdString error("HTTP/1.0 404 not found\r\nContent-type: text/html\r\n\r\nError\r\n"); + error = error + e + "\r\n"; + LOG4CXX_ERROR(s_log, e); + peer().send(error, error.GetLength()); + } + + return 0; +} |