From 06be8463b683333c79845402d55168ef1b582fa9 Mon Sep 17 00:00:00 2001 From: Matthew Jordan Date: Fri, 24 May 2013 20:44:07 +0000 Subject: Migrate a large number of AMI events over to Stasis-Core This patch moves a number of AMI events over to the Stasis-Core message bus. This includes: * ChanSpyStart/Stop * MonitorStart/Stop * MusicOnHoldStart/Stop * FullyBooted/Reload * All Voicemail/MWI related events In addition, it adds some Stasis-Core and AMI support for generic AMI messages, refactors the message router in AMI to use a single router with topic forwarding for the topics that AMI cares about, and refactors MWI message types and topics to be more name compliant. Review: https://reviewboard.asterisk.org/r/2532 (closes issue ASTERISK-21462) git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@389733 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- main/manager.c | 171 ++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 157 insertions(+), 14 deletions(-) (limited to 'main/manager.c') diff --git a/main/manager.c b/main/manager.c index c28e6169b..96fbdae61 100644 --- a/main/manager.c +++ b/main/manager.c @@ -92,6 +92,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/stringfields.h" #include "asterisk/presencestate.h" #include "asterisk/stasis.h" +#include "asterisk/stasis_message_router.h" #include "asterisk/test.h" #include "asterisk/json.h" #include "asterisk/bridging.h" @@ -1062,6 +1063,12 @@ static int block_sockets; static int unauth_sessions = 0; static struct stasis_subscription *acl_change_sub; +/*! \brief A \ref stasis_topic that all topics AMI cares about will be forwarded to */ +static struct stasis_topic *manager_topic; + +/*! \brief The \ref stasis_message_router for all \ref stasis messages */ +static struct stasis_message_router *stasis_router; + #define MGR_SHOW_TERMINAL_WIDTH 80 #define MAX_VARS 128 @@ -1225,6 +1232,12 @@ AO2_GLOBAL_OBJ_STATIC(event_docs); static enum add_filter_result manager_add_filter(const char *filter_pattern, struct ao2_container *whitefilters, struct ao2_container *blackfilters); +/*! + * @{ \brief Define AMI message types. + */ +STASIS_MESSAGE_TYPE_DEFN(ast_manager_get_generic_type); +/*! @} */ + /*! * \internal * \brief Find a registered action object. @@ -1249,6 +1262,89 @@ static struct manager_action *action_find(const char *name) return act; } +struct stasis_topic *ast_manager_get_topic(void) +{ + return manager_topic; +} + +struct stasis_message_router *ast_manager_get_message_router(void) +{ + return stasis_router; +} + +struct ast_str *ast_manager_str_from_json_object(struct ast_json *blob, key_exclusion_cb exclusion_cb) +{ + struct ast_str *output_str = ast_str_create(32); + struct ast_json *value; + const char *key; + if (!output_str) { + return NULL; + } + + ast_json_object_foreach(blob, key, value) { + if (exclusion_cb && exclusion_cb(key)) { + continue; + } + ast_str_append(&output_str, 0, "%s: %s\r\n", key, ast_json_string_get(value)); + if (!output_str) { + return NULL; + } + } + + return output_str; +} + +static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub, + struct stasis_topic *topic, + struct stasis_message *message) +{ + struct ast_json_payload *payload = stasis_message_data(message); + int class_type = ast_json_integer_get(ast_json_object_get(payload->json, "class_type")); + const char *type = ast_json_string_get(ast_json_object_get(payload->json, "type")); + struct ast_json *event = ast_json_object_get(payload->json, "event"); + RAII_VAR(struct ast_str *, event_buffer, NULL, ast_free); + + event_buffer = ast_manager_str_from_json_object(event, NULL); + if (!event_buffer) { + ast_log(AST_LOG_WARNING, "Error while creating payload for event %s\n", type); + return; + } + manager_event(class_type, type, "%s", ast_str_buffer(event_buffer)); +} + +int ast_manager_publish_message(struct ast_json *obj) +{ + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + RAII_VAR(struct ast_json_payload *, payload, NULL, ao2_cleanup); + struct ast_json *type = ast_json_object_get(obj, "type"); + struct ast_json *class_type = ast_json_object_get(obj, "class_type"); + struct ast_json *event = ast_json_object_get(obj, "event"); + + if (!type) { + ast_log(AST_LOG_ERROR, "Attempt to send generic manager event without type field\n"); + return -1; + } + if (!class_type) { + ast_log(AST_LOG_ERROR, "Attempt to send generic manager event without class type field\n"); + return -1; + } + if (!event) { + ast_log(AST_LOG_ERROR, "Attempt to send generic manager event without event payload\n"); + return -1; + } + + payload = ast_json_payload_create(obj); + if (!payload) { + return -1; + } + message = stasis_message_create(ast_manager_get_generic_type(), payload); + if (!message) { + return -1; + } + stasis_publish(ast_manager_get_topic(), message); + return 0; +} + /*! \brief Add a custom hook to be called when an event is fired */ void ast_manager_register_hook(struct manager_custom_hook *hook) { @@ -5034,24 +5130,29 @@ static int action_corestatus(struct mansession *s, const struct message *m) static int action_reload(struct mansession *s, const struct message *m) { const char *module = astman_get_header(m, "Module"); - int res = ast_module_reload(S_OR(module, NULL)); + enum ast_module_reload_result res = ast_module_reload(S_OR(module, NULL)); switch (res) { - case -1: - astman_send_error(s, m, "A reload is in progress"); - break; - case 0: + case AST_MODULE_RELOAD_NOT_FOUND: astman_send_error(s, m, "No such module"); break; - case 1: + case AST_MODULE_RELOAD_NOT_IMPLEMENTED: astman_send_error(s, m, "Module does not support reload"); break; - case 2: - astman_send_ack(s, m, "Module Reloaded"); - break; - default: + case AST_MODULE_RELOAD_ERROR: astman_send_error(s, m, "An unknown error occurred"); break; + case AST_MODULE_RELOAD_IN_PROGRESS: + astman_send_error(s, m, "A reload is in progress"); + break; + case AST_MODULE_RELOAD_UNINITIALIZED: + astman_send_error(s, m, "Module not initialized"); + break; + case AST_MODULE_RELOAD_QUEUED: + case AST_MODULE_RELOAD_SUCCESS: + /* Treat a queued request as success */ + astman_send_ack(s, m, "Module Reloaded"); + break; } return 0; } @@ -7526,6 +7627,14 @@ static void manager_shutdown(void) ao2_t_global_obj_release(event_docs, "Dispose of event_docs"); #endif + if (stasis_router) { + stasis_message_router_unsubscribe_and_join(stasis_router); + stasis_router = NULL; + } + ao2_cleanup(manager_topic); + manager_topic = NULL; + STASIS_MESSAGE_TYPE_CLEANUP(ast_manager_get_generic_type); + ast_tcptls_server_stop(&ami_desc); ast_tcptls_server_stop(&amis_desc); @@ -7552,6 +7661,31 @@ static void manager_shutdown(void) } } + +/*! \brief Initialize all \ref stasis topics and routers used by the various + * sub-components of AMI + */ +static int manager_subscriptions_init(void) +{ + STASIS_MESSAGE_TYPE_INIT(ast_manager_get_generic_type); + manager_topic = stasis_topic_create("manager_topic"); + if (!manager_topic) { + return -1; + } + stasis_router = stasis_message_router_create(manager_topic); + if (!stasis_router) { + return -1; + } + + if (stasis_message_router_add(stasis_router, + ast_manager_get_generic_type(), + manager_generic_msg_cb, + NULL)) { + return -1; + } + return 0; +} + static int __init_manager(int reload, int by_external_config) { struct ast_config *ucfg = NULL, *cfg = NULL; @@ -7573,8 +7707,19 @@ static int __init_manager(int reload, int by_external_config) manager_enabled = 0; - if (manager_channels_init()) { - return -1; + if (!reload) { + if (manager_subscriptions_init()) { + ast_log(AST_LOG_ERROR, "Failed to initialize manager subscriptions\n"); + return -1; + } + if (manager_channels_init()) { + ast_log(AST_LOG_ERROR, "Failed to initialize manager channel handling\n"); + return -1; + } + if (manager_mwi_init()) { + ast_log(AST_LOG_ERROR, "Failed to initialize manager MWI handling\n"); + return -1; + } } if (manager_bridging_init()) { @@ -8025,8 +8170,6 @@ static int __init_manager(int reload, int by_external_config) httptimeout = newhttptimeout; } - manager_event(EVENT_FLAG_SYSTEM, "Reload", "Module: Manager\r\nStatus: %s\r\nMessage: Manager reload Requested\r\n", manager_enabled ? "Enabled" : "Disabled"); - ast_tcptls_server_start(&ami_desc); if (tls_was_enabled && !ami_tls_cfg.enabled) { ast_tcptls_server_stop(&amis_desc); -- cgit v1.2.3