From 6e2b1a54ab3b3a088fa6414f4d9e4a9206aab734 Mon Sep 17 00:00:00 2001 From: Matthew Jordan Date: Wed, 2 Oct 2013 21:26:34 +0000 Subject: Only create Stasis subscriptions when enabled Subscribing to Stasis isn't free. As such, this patch makes AMI, CDR, and CEL - the "big 3" - only subscribe when enabled. Toggling their availability via a .conf file will unsubscribe/subscribe as appropriate. Review: https://reviewboard.asterisk.org/r/2888/ ........ Merged revisions 400312 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@400313 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- main/cdr.c | 151 ++++++++++++++++++++++++++++++++++----------------------- main/cel.c | 136 ++++++++++++++++++++++++++++++++------------------- main/manager.c | 71 ++++++++++++++++----------- 3 files changed, 221 insertions(+), 137 deletions(-) diff --git a/main/cdr.c b/main/cdr.c index cbd441bbd..64520f774 100644 --- a/main/cdr.c +++ b/main/cdr.c @@ -221,6 +221,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") static void cdr_detach(struct ast_cdr *cdr); static void cdr_submit_batch(int shutdown); +static int cdr_toggle_runtime_options(void); /*! \brief The configuration settings for this module */ struct module_config { @@ -2566,9 +2567,12 @@ struct ast_cdr_config *ast_cdr_get_config(void) void ast_cdr_set_config(struct ast_cdr_config *config) { RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup); + ao2_cleanup(mod_cfg->general); mod_cfg->general = config; ao2_ref(mod_cfg->general, +1); + + cdr_toggle_runtime_options(); } int ast_cdr_is_enabled(void) @@ -3847,6 +3851,63 @@ static void finalize_batch_mode(void) ast_cdr_engine_term(); } +/*! + * \brief Destroy the active Stasis subscriptions/router/topics + */ +static void destroy_subscriptions(void) +{ + stasis_message_router_unsubscribe_and_join(stasis_router); + stasis_router = NULL; + + ao2_cleanup(cdr_topic); + cdr_topic = NULL; + + channel_subscription = stasis_forward_cancel(channel_subscription); + bridge_subscription = stasis_forward_cancel(bridge_subscription); + parking_subscription = stasis_forward_cancel(parking_subscription); +} + +/*! + * \brief Create the Stasis subcriptions for CDRs + */ +static int create_subscriptions(void) +{ + /* Use the CDR topic to determine if we've already created this */ + if (cdr_topic) { + return 0; + } + + cdr_topic = stasis_topic_create("cdr_engine"); + if (!cdr_topic) { + return -1; + } + + channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic); + if (!channel_subscription) { + return -1; + } + bridge_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic); + if (!bridge_subscription) { + return -1; + } + parking_subscription = stasis_forward_all(ast_parking_topic(), cdr_topic); + if (!parking_subscription) { + return -1; + } + + stasis_router = stasis_message_router_create(cdr_topic); + if (!stasis_router) { + return -1; + } + stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL); + stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL); + stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL); + stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL); + stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL); + + return 0; +} + static int process_config(int reload) { RAII_VAR(struct module_config *, mod_cfg, module_config_alloc(), ao2_cleanup); @@ -3889,12 +3950,7 @@ static int process_config(int reload) static void cdr_engine_cleanup(void) { - channel_subscription = stasis_forward_cancel(channel_subscription); - bridge_subscription = stasis_forward_cancel(bridge_subscription); - parking_subscription = stasis_forward_cancel(parking_subscription); - stasis_message_router_unsubscribe_and_join(stasis_router); - ao2_cleanup(cdr_topic); - cdr_topic = NULL; + destroy_subscriptions(); } static void cdr_engine_shutdown(void) @@ -3960,10 +4016,35 @@ static void cdr_container_print_fn(void *v_obj, void *where, ao2_prnt_fn *prnt) } } -int ast_cdr_engine_init(void) +/*! + * \brief Checks if CDRs are enabled and enables/disables the necessary options + */ +static int cdr_toggle_runtime_options(void) { - RAII_VAR(struct module_config *, mod_cfg, NULL, ao2_cleanup); + RAII_VAR(struct module_config *, mod_cfg, + ao2_global_obj_ref(module_configs), ao2_cleanup); + if (ast_test_flag(&mod_cfg->general->settings, CDR_ENABLED)) { + if (create_subscriptions()) { + destroy_subscriptions(); + ast_log(AST_LOG_ERROR, "Failed to create Stasis subscriptions\n"); + return -1; + } + if (ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) { + cdr_enable_batch_mode(mod_cfg->general); + } else { + ast_log(LOG_NOTICE, "CDR simple logging enabled.\n"); + } + } else { + destroy_subscriptions(); + ast_log(LOG_NOTICE, "CDR logging disabled.\n"); + } + + return 0; +} + +int ast_cdr_engine_init(void) +{ if (process_config(0)) { return -1; } @@ -3975,34 +4056,6 @@ int ast_cdr_engine_init(void) } ao2_container_register("cdrs_by_channel", active_cdrs_by_channel, cdr_container_print_fn); - cdr_topic = stasis_topic_create("cdr_engine"); - if (!cdr_topic) { - return -1; - } - - channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic); - if (!channel_subscription) { - return -1; - } - bridge_subscription = stasis_forward_all(ast_bridge_topic_all_cached(), cdr_topic); - if (!bridge_subscription) { - return -1; - } - parking_subscription = stasis_forward_all(ast_parking_topic(), cdr_topic); - if (!parking_subscription) { - return -1; - } - - stasis_router = stasis_message_router_create(cdr_topic); - if (!stasis_router) { - return -1; - } - stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL); - stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL); - stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL); - stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL); - stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL); - sched = ast_sched_context_create(); if (!sched) { ast_log(LOG_ERROR, "Unable to create schedule context.\n"); @@ -4013,19 +4066,7 @@ int ast_cdr_engine_init(void) ast_register_cleanup(cdr_engine_cleanup); ast_register_atexit(cdr_engine_shutdown); - mod_cfg = ao2_global_obj_ref(module_configs); - - if (ast_test_flag(&mod_cfg->general->settings, CDR_ENABLED)) { - if (ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) { - cdr_enable_batch_mode(mod_cfg->general); - } else { - ast_log(LOG_NOTICE, "CDR simple logging enabled.\n"); - } - } else { - ast_log(LOG_NOTICE, "CDR logging disabled.\n"); - } - - return 0; + return cdr_toggle_runtime_options(); } void ast_cdr_engine_term(void) @@ -4063,17 +4104,7 @@ int ast_cdr_engine_reload(void) } } - if (ast_test_flag(&mod_cfg->general->settings, CDR_ENABLED)) { - if (!ast_test_flag(&mod_cfg->general->settings, CDR_BATCHMODE)) { - ast_log(LOG_NOTICE, "CDR simple logging enabled.\n"); - } else { - cdr_enable_batch_mode(mod_cfg->general); - } - } else { - ast_log(LOG_NOTICE, "CDR logging disabled, data will be lost.\n"); - } - - return 0; + return cdr_toggle_runtime_options(); } diff --git a/main/cel.c b/main/cel.c index 0d78b5cce..0b5e816d1 100644 --- a/main/cel.c +++ b/main/cel.c @@ -547,17 +547,6 @@ static int apps_handler(const struct aco_option *opt, struct ast_variable *var, return 0; } -static int do_reload(void) -{ - if (aco_process_config(&cel_cfg_info, 1) == ACO_PROCESS_ERROR) { - return -1; - } - - ast_verb(3, "CEL logging %sabled.\n", ast_cel_check_enabled() ? "en" : "dis"); - - return 0; -} - const char *ast_cel_get_type_name(enum ast_cel_event_type type) { return S_OR(cel_event_types[type], "Unknown"); @@ -1374,23 +1363,31 @@ static void cel_local_cb( cel_report_event(localone, AST_CEL_LOCAL_OPTIMIZE, NULL, extra, NULL); } -static void ast_cel_engine_term(void) +static void destroy_subscriptions(void) { - aco_info_destroy(&cel_cfg_info); - ao2_global_obj_release(cel_configs); - stasis_message_router_unsubscribe_and_join(cel_state_router); - cel_state_router = NULL; - ao2_cleanup(cel_aggregation_topic); - cel_aggregation_topic = NULL; - ao2_cleanup(cel_topic); - cel_topic = NULL; + stasis_message_router_unsubscribe_and_join(cel_state_router); + cel_state_router = NULL; + + ao2_cleanup(cel_aggregation_topic); + cel_aggregation_topic = NULL; + ao2_cleanup(cel_topic); + cel_topic = NULL; + cel_channel_forwarder = stasis_forward_cancel(cel_channel_forwarder); cel_bridge_forwarder = stasis_forward_cancel(cel_bridge_forwarder); cel_parking_forwarder = stasis_forward_cancel(cel_parking_forwarder); cel_cel_forwarder = stasis_forward_cancel(cel_cel_forwarder); - ast_cli_unregister(&cli_status); - ao2_cleanup(cel_dialstatus_store); - cel_dialstatus_store = NULL; +} + +static void ast_cel_engine_term(void) +{ + destroy_subscriptions(); + + aco_info_destroy(&cel_cfg_info); + ao2_global_obj_release(cel_configs); + ast_cli_unregister(&cli_status); + ao2_cleanup(cel_dialstatus_store); + cel_dialstatus_store = NULL; ao2_cleanup(linkedids); linkedids = NULL; ao2_cleanup(cel_backends); @@ -1398,29 +1395,12 @@ static void ast_cel_engine_term(void) STASIS_MESSAGE_TYPE_CLEANUP(cel_generic_type); } -int ast_cel_engine_init(void) +/*! + * \brief Create the Stasis subscriptions for CEL + */ +static int create_subscriptions(void) { int ret = 0; - if (!(linkedids = ast_str_container_alloc(NUM_APP_BUCKETS))) { - return -1; - } - - if (!(cel_dialstatus_store = ao2_container_alloc(NUM_DIALSTATUS_BUCKETS, dialstatus_hash, dialstatus_cmp))) { - return -1; - } - - if (STASIS_MESSAGE_TYPE_INIT(cel_generic_type)) { - return -1; - } - - if (ast_cli_register(&cli_status)) { - return -1; - } - - cel_backends = ao2_container_alloc(BACKEND_BUCKETS, cel_backend_hash, cel_backend_cmp); - if (!cel_backends) { - return -1; - } cel_aggregation_topic = stasis_topic_create("cel_aggregation_topic"); if (!cel_aggregation_topic) { @@ -1515,11 +1495,33 @@ int ast_cel_engine_init(void) cel_local_cb, NULL); - /* If somehow we failed to add any routes, just shut down the whole - * thing and fail it. - */ if (ret) { - ast_cel_engine_term(); + ast_log(AST_LOG_ERROR, "Failed to register for Stasis messages\n"); + } + + return ret; +} + +int ast_cel_engine_init(void) +{ + if (!(linkedids = ast_str_container_alloc(NUM_APP_BUCKETS))) { + return -1; + } + + if (!(cel_dialstatus_store = ao2_container_alloc(NUM_DIALSTATUS_BUCKETS, dialstatus_hash, dialstatus_cmp))) { + return -1; + } + + if (STASIS_MESSAGE_TYPE_INIT(cel_generic_type)) { + return -1; + } + + if (ast_cli_register(&cli_status)) { + return -1; + } + + cel_backends = ao2_container_alloc(BACKEND_BUCKETS, cel_backend_hash, cel_backend_cmp); + if (!cel_backends) { return -1; } @@ -1548,7 +1550,34 @@ int ast_cel_engine_init(void) } } - ast_register_cleanup(ast_cel_engine_term); + if (ast_cel_check_enabled() && create_subscriptions()) { + return -1; + } + + ast_register_atexit(&ast_cel_engine_term); + return 0; +} + +static int do_reload(void) +{ + unsigned int was_enabled = ast_cel_check_enabled(); + unsigned int is_enabled; + + if (aco_process_config(&cel_cfg_info, 1) == ACO_PROCESS_ERROR) { + return -1; + } + + is_enabled = ast_cel_check_enabled(); + + if (!was_enabled && is_enabled) { + if (create_subscriptions()) { + return -1; + } + } else if (was_enabled && !is_enabled) { + destroy_subscriptions(); + } + + ast_verb(3, "CEL logging %sabled.\n", is_enabled ? "en" : "dis"); return 0; } @@ -1596,12 +1625,21 @@ void ast_cel_set_config(struct ast_cel_general_config *config) { RAII_VAR(struct cel_config *, mod_cfg, ao2_global_obj_ref(cel_configs), ao2_cleanup); RAII_VAR(struct ast_cel_general_config *, cleanup_config, mod_cfg->general, ao2_cleanup); + int was_enabled = ast_cel_check_enabled(); + int is_enabled; if (mod_cfg) { mod_cfg->general = config; if (mod_cfg->general) { ao2_ref(mod_cfg->general, +1); } + + is_enabled = ast_cel_check_enabled(); + if (!was_enabled && is_enabled) { + create_subscriptions(); + } else if (was_enabled && !is_enabled) { + destroy_subscriptions(); + } } } diff --git a/main/manager.c b/main/manager.c index 69def4b1f..f06b7dbca 100644 --- a/main/manager.c +++ b/main/manager.c @@ -1106,6 +1106,7 @@ static int timestampevents; static int httptimeout = 60; static int broken_events_action = 0; static int manager_enabled = 0; +static int subscribed = 0; static int webmanager_enabled = 0; static int manager_debug = 0; /*!< enable some debugging code in the manager */ static int authtimeout; @@ -7794,16 +7795,8 @@ static void manager_shutdown(void) */ static int manager_subscriptions_init(void) { - int res; + int res = 0; - res = STASIS_MESSAGE_TYPE_INIT(ast_manager_get_generic_type); - if (res != 0) { - return -1; - } - manager_topic = stasis_topic_create("manager_topic"); - if (!manager_topic) { - return -1; - } rtp_topic_forwarder = stasis_forward_all(ast_rtp_topic(), manager_topic); if (!rtp_topic_forwarder) { return -1; @@ -7826,6 +7819,36 @@ static int manager_subscriptions_init(void) return 0; } +static int subscribe_all(void) +{ + if (manager_subscriptions_init()) { + ast_log(AST_LOG_ERROR, "Failed to initialize manager subscriptions\n"); + return -1; + } + if (manager_system_init()) { + ast_log(AST_LOG_ERROR, "Failed to initialize manager system handling\n"); + return -1; + } + if (manager_channels_init()) { + ast_log(AST_LOG_ERROR, "Failed to initialize manager channel handling\n"); + return -1; + } + if (manager_mwi_init()) { + ast_log(AST_LOG_ERROR, "Failed to initialize manager MWI handling\n"); + return -1; + } + if (manager_bridging_init()) { + return -1; + } + if (manager_endpoints_init()) { + ast_log(AST_LOG_ERROR, "Failed to initialize manager endpoints handling\n"); + return -1; + } + + subscribed = 1; + return 0; +} + static int __init_manager(int reload, int by_external_config) { struct ast_config *ucfg = NULL, *cfg = NULL; @@ -7848,27 +7871,12 @@ static int __init_manager(int reload, int by_external_config) manager_enabled = 0; if (!reload) { - if (manager_subscriptions_init()) { - ast_log(AST_LOG_ERROR, "Failed to initialize manager subscriptions\n"); - return -1; - } - if (manager_system_init()) { - ast_log(AST_LOG_ERROR, "Failed to initialize manager system handling\n"); - return -1; - } - if (manager_channels_init()) { - ast_log(AST_LOG_ERROR, "Failed to initialize manager channel handling\n"); - return -1; - } - if (manager_mwi_init()) { - ast_log(AST_LOG_ERROR, "Failed to initialize manager MWI handling\n"); - return -1; - } - if (manager_bridging_init()) { + int res = STASIS_MESSAGE_TYPE_INIT(ast_manager_get_generic_type); + if (res != 0) { return -1; } - if (manager_endpoints_init()) { - ast_log(AST_LOG_ERROR, "Failed to initialize manager endpoints handling\n"); + manager_topic = stasis_topic_create("manager_topic"); + if (!manager_topic) { return -1; } } @@ -8056,6 +8064,13 @@ static int __init_manager(int reload, int by_external_config) } } + if (manager_enabled && !subscribed) { + if (subscribe_all() != 0) { + ast_log(LOG_ERROR, "Manager subscription error\n"); + return -1; + } + } + ast_sockaddr_copy(&amis_desc_local_address_tmp, &amis_desc.local_address); /* if the amis address has not been set, default is the same as non secure ami */ -- cgit v1.2.3