From d860e8b3295b218f142dd04ef5b58a44db2bca03 Mon Sep 17 00:00:00 2001 From: beg_g Date: Thu, 1 Oct 2009 15:41:51 +0000 Subject: Added an initialization message which is sent to orktrack over HTTP when orkaudio is first started. git-svn-id: https://oreka.svn.sourceforge.net/svnroot/oreka/trunk@643 09dcff7a-b715-0410-9601-b79a96267cd0 --- orkaudio/OrkAudio.cpp | 2 + orkbasecxx/Makefile.am | 3 +- orkbasecxx/OrkTrack.cpp | 88 +++++++++++++++++++++++++++++++++++++++++ orkbasecxx/OrkTrack.h | 27 +++++++++++++ orkbasecxx/messages/InitMsg.cpp | 66 +++++++++++++++++++++++++++++++ orkbasecxx/messages/InitMsg.h | 60 ++++++++++++++++++++++++++++ orkbasecxx/messages/Makefile.am | 2 +- 7 files changed, 246 insertions(+), 2 deletions(-) create mode 100644 orkbasecxx/OrkTrack.cpp create mode 100644 orkbasecxx/OrkTrack.h create mode 100644 orkbasecxx/messages/InitMsg.cpp create mode 100644 orkbasecxx/messages/InitMsg.h diff --git a/orkaudio/OrkAudio.cpp b/orkaudio/OrkAudio.cpp index 0505250..6b497d0 100644 --- a/orkaudio/OrkAudio.cpp +++ b/orkaudio/OrkAudio.cpp @@ -46,6 +46,7 @@ #include "TapeProcessor.h" #include #include "EventStreaming.h" +#include "OrkTrack.h" static volatile bool serviceStop = false; @@ -296,6 +297,7 @@ void MainThread() CapturePluginProxy::Singleton()->Run(); } + OrkTrack::Initialize(); //ACE_Thread_Manager::instance ()->wait (); while(!Daemon::Singleton()->IsStopping()) diff --git a/orkbasecxx/Makefile.am b/orkbasecxx/Makefile.am index 95ad73f..8849d23 100644 --- a/orkbasecxx/Makefile.am +++ b/orkbasecxx/Makefile.am @@ -15,7 +15,8 @@ liborkbase_la_SOURCES = Filter.cpp g711.c \ CapturePluginProxy.cpp CapturePort.cpp \ Daemon.cpp ImmediateProcessing.cpp \ Reporting.cpp TapeFileNaming.cpp \ - PartyFilter.cpp EventStreaming.cpp + PartyFilter.cpp EventStreaming.cpp \ + OrkTrack.cpp #INCLUDES = -I/projects/ext/xmlrpc++/xmlrpc++0.7/src SUBDIRS = messages serializers audiofile filters liborkbase_la_LIBADD = $(top_builddir)/serializers/libserializers.la \ diff --git a/orkbasecxx/OrkTrack.cpp b/orkbasecxx/OrkTrack.cpp new file mode 100644 index 0000000..f0531c1 --- /dev/null +++ b/orkbasecxx/OrkTrack.cpp @@ -0,0 +1,88 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + */ + +#include "OrkTrack.h" +#include "LogManager.h" +#include "ace/Thread_Manager.h" +#include "messages/InitMsg.h" +#include "ConfigManager.h" + +#ifdef WIN32 +#define snprintf _snprintf +#endif + +void OrkTrack::Initialize() +{ + CStdString logMsg; + + for(std::list::iterator it = CONFIG.m_trackerHostname.begin(); it != CONFIG.m_trackerHostname.end(); it++) + { + CStdString trackerHostname = *it; + OrkTrackHost *oth = (OrkTrackHost *)malloc(sizeof(OrkTrackHost)); + + memset(oth, 0, sizeof(OrkTrackHost)); + snprintf(oth->m_serverHostname, sizeof(oth->m_serverHostname), "%s", trackerHostname.c_str()); + oth->m_serverPort = CONFIG.m_trackerTcpPort; + + if (!ACE_Thread_Manager::instance()->spawn(ACE_THR_FUNC(OrkTrack::Run), (void*)oth, THR_DETACHED)) + { + logMsg.Format("OrkTrack::Initialize(): Failed to start thread for %s,%d", oth->m_serverHostname, oth->m_serverPort); + LOG4CXX_WARN(LOG.rootLog, logMsg); + free(oth); + } + } +} + +void OrkTrack::Run(void* args) +{ + OrkTrackHostRef hostRef; + OrkTrackHost *pHost = (OrkTrackHost *)args; + InitMsgRef msgRef(new InitMsg()); + char host[255]; + time_t reportErrorLastTime = 0; + CStdString serverHostname; + CStdString logMsg; + + ACE_OS::hostname(host, sizeof(host)); + + hostRef.reset(pHost); + serverHostname = hostRef->m_serverHostname; + msgRef->m_name.Format("orkaudio-%s", host); + msgRef->m_hostname = host; + msgRef->m_type = "A"; + msgRef->m_tcpPort = 59140; + msgRef->m_contextPath = "/audio"; + msgRef->m_absolutePath = CONFIG.m_audioOutputPath; + + OrkHttpSingleLineClient c; + SimpleResponseMsg response; + CStdString msgAsSingleLineString = msgRef->SerializeSingleLine(); + bool success = false; + + while (!success) + { + if (c.Execute((SyncMessage&)(*msgRef.get()), (AsyncMessage&)response, serverHostname, hostRef->m_serverPort, CONFIG.m_trackerServicename, CONFIG.m_clientTimeout)) + { + success = true; + logMsg.Format("OrkTrack::Run(): [%s,%d] success:%s comment:%s", hostRef->m_serverHostname, hostRef->m_serverPort, (response.m_success == true ? "true" : "false"), response.m_comment); + LOG4CXX_INFO(LOG.rootLog, logMsg); + } + else + { + if(((time(NULL) - reportErrorLastTime) > 60)) + { + reportErrorLastTime = time(NULL); + logMsg.Format("OrkTrack::Run(): [%s,%d] Could not contact orktrack", hostRef->m_serverHostname, hostRef->m_serverPort); + LOG4CXX_WARN(LOG.rootLog, logMsg); + } + + ACE_OS::sleep(CONFIG.m_clientTimeout + 10); + } + } +} diff --git a/orkbasecxx/OrkTrack.h b/orkbasecxx/OrkTrack.h new file mode 100644 index 0000000..33eff4b --- /dev/null +++ b/orkbasecxx/OrkTrack.h @@ -0,0 +1,27 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + */ +#ifndef __ORKTRACK_H__ +#define __ORKTRACK_H__ 1 + +#include "boost/shared_ptr.hpp" +#include "OrkClient.h" + +struct OrkTrackHost { + char m_serverHostname[256]; + int m_serverPort; +}; +typedef boost::shared_ptr OrkTrackHostRef; + +class DLL_IMPORT_EXPORT_ORKBASE OrkTrack { +public: + static void Initialize(); + static void Run(void *args); +}; + +#endif diff --git a/orkbasecxx/messages/InitMsg.cpp b/orkbasecxx/messages/InitMsg.cpp new file mode 100644 index 0000000..35a03be --- /dev/null +++ b/orkbasecxx/messages/InitMsg.cpp @@ -0,0 +1,66 @@ +/* + * Oreka -- A media capture and retrieval platform + * + * Copyright (C) 2005, orecx LLC + * + * http://www.orecx.com + * + */ +#pragma warning( disable: 4786 ) // disables truncated symbols in browse-info warning + +#define _WINSOCKAPI_ // prevents the inclusion of winsock.h + +#include "InitMsg.h" +#include "ConfigManager.h" + +InitMsg::InitMsg() +{ + m_tcpPort = 59140; + m_fileServePort = 8080; + m_streamingPort = 59120; + m_local = false; + m_sshPort = 22; +} + +void InitMsg::Define(Serializer* s) +{ + CStdString initMessageName(INIT_MESSAGE_NAME); + s->StringValue(OBJECT_TYPE_TAG, initMessageName, true); + s->StringValue(NAME_PARAM, m_name, true); + s->StringValue(HOSTNAME_PARAM, m_hostname, true); + s->StringValue(TYPE_PARAM, m_type, true); + + s->IntValue(TCP_PORT_PARAM, m_tcpPort); + s->StringValue(PROTOCOL_PARAM, m_protocol); + s->IntValue(FILE_SERVE_PORT_PARAM, m_fileServePort); + s->StringValue(CONTEXT_PATH_PARAM, m_contextPath); + s->StringValue(SERVE_PATH_PARAM, m_servePath); + s->StringValue(ABSOLUTE_PATH_PARAM, m_absolutePath); + s->IntValue(STREAMING_PORT_PARAM, m_streamingPort); + s->BoolValue(LOCAL_PARAM, m_local); + s->StringValue(USERNAME_PARAM, m_username); + s->StringValue(PASSWORD_PARAM, m_password); + s->IntValue(SSH_PORT_PARAM, m_sshPort); + + DefineMessage(s); +} + +void InitMsg::Validate() +{ +} + +CStdString InitMsg::GetClassName() +{ + return CStdString(INIT_MESSAGE_NAME); +} + +ObjectRef InitMsg::NewInstance() +{ + return ObjectRef(new InitMsg); +} + +ObjectRef InitMsg::Process() +{ + return ObjectRef(); +} + diff --git a/orkbasecxx/messages/InitMsg.h b/orkbasecxx/messages/InitMsg.h new file mode 100644 index 0000000..f0e1584 --- /dev/null +++ b/orkbasecxx/messages/InitMsg.h @@ -0,0 +1,60 @@ +/* + * RfbRecorder -- A Simple RFB recording program + * + * Copyright (C) 2007, orecx LLC + * + * http://www.orecx.com/ + * + */ +#ifndef __INITMSG_H__ +#define __INITMSG_H__ 1 + +#include "messages/SyncMessage.h" +#include "messages/AsyncMessage.h" + +#define INIT_MESSAGE_NAME "init" +#define NAME_PARAM "name" +#define HOSTNAME_PARAM "hostname" +#define TYPE_PARAM "type" +#define TCP_PORT_PARAM "tcpport" +#define PROTOCOL_PARAM "protocol" +#define FILE_SERVE_PORT_PARAM "fileserveport" +#define CONTEXT_PATH_PARAM "contextpath" +#define SERVE_PATH_PARAM "servepath" +#define ABSOLUTE_PATH_PARAM "absolutepath" +#define STREAMING_PORT_PARAM "streamingport" +#define LOCAL_PARAM "local" +#define USERNAME_PARAM "username" +#define PASSWORD_PARAM "password" +#define SSH_PORT_PARAM "sshport" + +class DLL_IMPORT_EXPORT_ORKBASE InitMsg : public SyncMessage +{ +public: + InitMsg(); + + void Define(Serializer* s); + void Validate(); + + CStdString GetClassName(); + ObjectRef NewInstance(); + ObjectRef Process(); + + CStdString m_name; + CStdString m_hostname; + CStdString m_type; + int m_tcpPort; + CStdString m_protocol; + int m_fileServePort; + CStdString m_contextPath; + CStdString m_servePath; + CStdString m_absolutePath; + int m_streamingPort; + bool m_local; + CStdString m_username; + CStdString m_password; + int m_sshPort; +}; +typedef boost::shared_ptr InitMsgRef; + +#endif diff --git a/orkbasecxx/messages/Makefile.am b/orkbasecxx/messages/Makefile.am index 8c56f9e..7504a1c 100644 --- a/orkbasecxx/messages/Makefile.am +++ b/orkbasecxx/messages/Makefile.am @@ -2,7 +2,7 @@ METASOURCES = AUTO noinst_LTLIBRARIES = libmessages.la libmessages_la_SOURCES = AsyncMessage.cpp Message.cpp \ SyncMessage.cpp CaptureMsg.cpp DeleteTapeMsg.cpp \ - PingMsg.cpp TapeMsg.cpp RecordMsg.cpp + PingMsg.cpp TapeMsg.cpp RecordMsg.cpp InitMsg.cpp #libmessages_la_LIBADD = -L/projects/ext/xmlrpc++/xmlrpc++0.7/ -lXmlRpc INCLUDES = -I@top_srcdir@ -I../../orkaudio -- cgit v1.2.3