diff options
-rw-r--r-- | apps/app_cdr.c | 32 | ||||
-rw-r--r-- | apps/app_forkcdr.c | 42 | ||||
-rw-r--r-- | funcs/func_cdr.c | 90 | ||||
-rw-r--r-- | include/asterisk/cdr.h | 14 | ||||
-rw-r--r-- | main/cdr.c | 59 |
5 files changed, 149 insertions, 88 deletions
diff --git a/apps/app_cdr.c b/apps/app_cdr.c index 2793846f9..bf4961a53 100644 --- a/apps/app_cdr.c +++ b/apps/app_cdr.c @@ -37,6 +37,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/module.h" #include "asterisk/app.h" #include "asterisk/stasis.h" +#include "asterisk/stasis_message_router.h" /*** DOCUMENTATION <application name="NoCDR" language="en_US"> @@ -167,25 +168,22 @@ static void appcdr_callback(void *data, struct stasis_subscription *sub, struct static int publish_app_cdr_message(struct ast_channel *chan, struct app_cdr_message_payload *payload) { RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); - RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup); - message = stasis_message_create(appcdr_message_type(), payload); - if (!message) { - ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create message\n", - payload->channel_name); + if (!router) { + ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n", + ast_channel_name(chan)); return -1; } - subscription = stasis_subscribe(ast_channel_topic(chan), appcdr_callback, NULL); - if (!subscription) { - ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n", + message = stasis_message_create(appcdr_message_type(), payload); + if (!message) { + ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create message\n", payload->channel_name); return -1; } + stasis_message_router_publish_sync(router, message); - stasis_publish(ast_channel_topic(chan), message); - - subscription = stasis_unsubscribe_and_join(subscription); return 0; } @@ -236,6 +234,11 @@ static int nocdr_exec(struct ast_channel *chan, const char *data) static int unload_module(void) { + RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup); + + if (router) { + stasis_message_router_remove(router, appcdr_message_type()); + } STASIS_MESSAGE_TYPE_CLEANUP(appcdr_message_type); ast_unregister_application(nocdr_app); ast_unregister_application(resetcdr_app); @@ -244,11 +247,18 @@ static int unload_module(void) static int load_module(void) { + RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup); int res = 0; + if (!router) { + return AST_MODULE_LOAD_FAILURE; + } + res |= STASIS_MESSAGE_TYPE_INIT(appcdr_message_type); res |= ast_register_application_xml(nocdr_app, nocdr_exec); res |= ast_register_application_xml(resetcdr_app, resetcdr_exec); + res |= stasis_message_router_add(router, appcdr_message_type(), + appcdr_callback, NULL); if (res) { return AST_MODULE_LOAD_FAILURE; diff --git a/apps/app_forkcdr.c b/apps/app_forkcdr.c index af5ae6a1c..932e862e1 100644 --- a/apps/app_forkcdr.c +++ b/apps/app_forkcdr.c @@ -41,6 +41,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/app.h" #include "asterisk/module.h" #include "asterisk/stasis.h" +#include "asterisk/stasis_message_router.h" /*** DOCUMENTATION <application name="ForkCDR" language="en_US"> @@ -136,7 +137,7 @@ static int forkcdr_exec(struct ast_channel *chan, const char *data) { RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); RAII_VAR(struct fork_cdr_message_payload *, payload, ao2_alloc(sizeof(*payload), NULL), ao2_cleanup); - RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup); char *parse; struct ast_flags flags = { 0, }; @@ -156,6 +157,12 @@ static int forkcdr_exec(struct ast_channel *chan, const char *data) return -1; } + if (!router) { + ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n", + ast_channel_name(chan)); + return -1; + } + payload->channel_name = ast_channel_name(chan); payload->flags = &flags; message = stasis_message_create(forkcdr_message_type(), payload); @@ -164,36 +171,41 @@ static int forkcdr_exec(struct ast_channel *chan, const char *data) ast_channel_name(chan)); return -1; } - - subscription = stasis_subscribe(ast_channel_topic(chan), forkcdr_callback, NULL); - if (!subscription) { - ast_log(AST_LOG_WARNING, "Failed to fork CDR for channel %s: unable to create subscription\n", - payload->channel_name); - return -1; - } - - stasis_publish(ast_channel_topic(chan), message); - - subscription = stasis_unsubscribe_and_join(subscription); + stasis_message_router_publish_sync(router, message); return 0; } static int unload_module(void) { - STASIS_MESSAGE_TYPE_CLEANUP(forkcdr_message_type); + RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup); - return ast_unregister_application(app); + if (router) { + stasis_message_router_remove(router, forkcdr_message_type()); + } + STASIS_MESSAGE_TYPE_CLEANUP(forkcdr_message_type); + ast_unregister_application(app); + return 0; } static int load_module(void) { + RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup); int res = 0; + if (!router) { + return AST_MODULE_LOAD_FAILURE; + } + res |= STASIS_MESSAGE_TYPE_INIT(forkcdr_message_type); res |= ast_register_application_xml(app, forkcdr_exec); + res |= stasis_message_router_add(router, forkcdr_message_type(), + forkcdr_callback, NULL); - return res; + if (res) { + return AST_MODULE_LOAD_FAILURE; + } + return AST_MODULE_LOAD_SUCCESS; } AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Fork The CDR into 2 separate entities"); diff --git a/funcs/func_cdr.c b/funcs/func_cdr.c index 3f248168c..61932bf76 100644 --- a/funcs/func_cdr.c +++ b/funcs/func_cdr.c @@ -40,6 +40,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/app.h" #include "asterisk/cdr.h" #include "asterisk/stasis.h" +#include "asterisk/stasis_message_router.h" /*** DOCUMENTATION <function name="CDR" language="en_US"> @@ -207,6 +208,7 @@ struct cdr_func_payload { const char *cmd; const char *arguments; const char *value; + void *data; }; struct cdr_func_data { @@ -220,8 +222,8 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(cdr_prop_write_message_type); static void cdr_read_callback(void *data, struct stasis_subscription *sub, struct stasis_message *message) { - struct cdr_func_data *output = data; struct cdr_func_payload *payload = stasis_message_data(message); + struct cdr_func_data *output; char *info; char *value = NULL; struct ast_flags flags = { 0 }; @@ -235,9 +237,9 @@ static void cdr_read_callback(void *data, struct stasis_subscription *sub, struc return; } - if (!payload || !output) { - return; - } + ast_assert(payload != NULL); + output = payload->data; + ast_assert(output != NULL); if (ast_strlen_zero(payload->arguments)) { ast_log(AST_LOG_WARNING, "%s requires a variable (%s(variable[,option]))\n)", @@ -441,6 +443,7 @@ static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse, payload->chan = chan; payload->cmd = cmd; payload->arguments = parse; + payload->data = &output; buf[0] = '\0';/* Ensure the buffer is initialized. */ output.buf = buf; @@ -460,18 +463,14 @@ static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse, if (ast_strlen_zero(ast_channel_name(chan))) { cdr_read_callback(NULL, NULL, message); } else { - RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup); - subscription = stasis_subscribe(ast_channel_topic(chan), cdr_read_callback, &output); - if (!subscription) { - ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n", + if (!router) { + ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n", ast_channel_name(chan)); return -1; } - - stasis_publish(ast_channel_topic(chan), message); - - subscription = stasis_unsubscribe_and_join(subscription); + stasis_message_router_publish_sync(router, message); } return 0; @@ -482,8 +481,15 @@ static int cdr_write(struct ast_channel *chan, const char *cmd, char *parse, { RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); RAII_VAR(struct cdr_func_payload *, payload, - ao2_alloc(sizeof(*payload), NULL), ao2_cleanup); - RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup); + ao2_alloc(sizeof(*payload), NULL), ao2_cleanup); + RAII_VAR(struct stasis_message_router *, router, + ast_cdr_message_router(), ao2_cleanup); + + if (!router) { + ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n", + ast_channel_name(chan)); + return -1; + } if (!payload) { return -1; @@ -499,17 +505,7 @@ static int cdr_write(struct ast_channel *chan, const char *cmd, char *parse, ast_channel_name(chan)); return -1; } - - subscription = stasis_subscribe(ast_channel_topic(chan), cdr_write_callback, NULL); - if (!subscription) { - ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n", - ast_channel_name(chan)); - return -1; - } - - stasis_publish(ast_channel_topic(chan), message); - - subscription = stasis_unsubscribe_and_join(subscription); + stasis_message_router_publish_sync(router, message); return 0; } @@ -520,7 +516,13 @@ static int cdr_prop_write(struct ast_channel *chan, const char *cmd, char *parse RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); RAII_VAR(struct cdr_func_payload *, payload, ao2_alloc(sizeof(*payload), NULL), ao2_cleanup); - RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup); + + if (!router) { + ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n", + ast_channel_name(chan)); + return -1; + } if (!payload) { return -1; @@ -536,17 +538,7 @@ static int cdr_prop_write(struct ast_channel *chan, const char *cmd, char *parse ast_channel_name(chan)); return -1; } - - subscription = stasis_subscribe(ast_channel_topic(chan), cdr_prop_write_callback, NULL); - if (!subscription) { - ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n", - ast_channel_name(chan)); - return -1; - } - - stasis_publish(ast_channel_topic(chan), message); - - subscription = stasis_unsubscribe_and_join(subscription); + stasis_message_router_publish_sync(router, message); return 0; } @@ -565,8 +557,14 @@ static struct ast_custom_function cdr_prop_function = { static int unload_module(void) { + RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup); int res = 0; + if (router) { + stasis_message_router_remove(router, cdr_prop_write_message_type()); + stasis_message_router_remove(router, cdr_write_message_type()); + stasis_message_router_remove(router, cdr_read_message_type()); + } STASIS_MESSAGE_TYPE_CLEANUP(cdr_read_message_type); STASIS_MESSAGE_TYPE_CLEANUP(cdr_write_message_type); STASIS_MESSAGE_TYPE_CLEANUP(cdr_prop_write_message_type); @@ -578,15 +576,29 @@ static int unload_module(void) static int load_module(void) { + RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup); int res = 0; + if (!router) { + return AST_MODULE_LOAD_FAILURE; + } + res |= STASIS_MESSAGE_TYPE_INIT(cdr_read_message_type); res |= STASIS_MESSAGE_TYPE_INIT(cdr_write_message_type); res |= STASIS_MESSAGE_TYPE_INIT(cdr_prop_write_message_type); res |= ast_custom_function_register(&cdr_function); res |= ast_custom_function_register(&cdr_prop_function); - - return res; + res |= stasis_message_router_add(router, cdr_prop_write_message_type(), + cdr_prop_write_callback, NULL); + res |= stasis_message_router_add(router, cdr_write_message_type(), + cdr_write_callback, NULL); + res |= stasis_message_router_add(router, cdr_read_message_type(), + cdr_read_callback, NULL); + + if (res) { + return AST_MODULE_LOAD_FAILURE; + } + return AST_MODULE_LOAD_SUCCESS; } AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Call Detail Record (CDR) dialplan functions"); diff --git a/include/asterisk/cdr.h b/include/asterisk/cdr.h index ad7e5cc69..5654c3847 100644 --- a/include/asterisk/cdr.h +++ b/include/asterisk/cdr.h @@ -467,6 +467,20 @@ int ast_cdr_is_enabled(void); */ struct ast_cdr *ast_cdr_alloc(void); +struct stasis_message_router; + +/*! + * \brief Return the message router for the CDR engine + * + * This returns the \ref stasis_message_router that the CDR engine + * uses for dispatching \ref stasis messages. The reference on the + * message router is bumped and must be released by the caller of + * this function. + * + * \retval NULL if the CDR engine is disabled or unavailable + * \retval the \ref stasis_message_router otherwise + */ +struct stasis_message_router *ast_cdr_message_router(void); /*! * \brief Duplicate a public CDR diff --git a/main/cdr.c b/main/cdr.c index 593f47152..d83d50692 100644 --- a/main/cdr.c +++ b/main/cdr.c @@ -3915,17 +3915,21 @@ static void finalize_batch_mode(void) ast_cdr_engine_term(); } +struct stasis_message_router *ast_cdr_message_router(void) +{ + if (!stasis_router) { + return NULL; + } + + ao2_bump(stasis_router); + return stasis_router; +} + /*! - * \brief Destroy the active Stasis subscriptions/router/topics + * \brief Destroy the active Stasis subscriptions */ static void destroy_subscriptions(void) { - stasis_message_router_unsubscribe_and_join(stasis_router); - stasis_router = NULL; - - ao2_cleanup(cdr_topic); - cdr_topic = NULL; - channel_subscription = stasis_forward_cancel(channel_subscription); bridge_subscription = stasis_forward_cancel(bridge_subscription); parking_subscription = stasis_forward_cancel(parking_subscription); @@ -3936,16 +3940,14 @@ static void destroy_subscriptions(void) */ static int create_subscriptions(void) { - /* Use the CDR topic to determine if we've already created this */ - if (cdr_topic) { - return 0; - } - - cdr_topic = stasis_topic_create("cdr_engine"); if (!cdr_topic) { return -1; } + if (channel_subscription || bridge_subscription || parking_subscription) { + return 0; + } + channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic); if (!channel_subscription) { return -1; @@ -3959,16 +3961,6 @@ static int create_subscriptions(void) return -1; } - stasis_router = stasis_message_router_create(cdr_topic); - if (!stasis_router) { - return -1; - } - stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL); - stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL); - stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL); - stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL); - stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL); - return 0; } @@ -4019,6 +4011,12 @@ static void cdr_engine_cleanup(void) static void cdr_engine_shutdown(void) { + stasis_message_router_unsubscribe_and_join(stasis_router); + stasis_router = NULL; + + ao2_cleanup(cdr_topic); + cdr_topic = NULL; + ao2_callback(active_cdrs_by_channel, OBJ_NODATA, cdr_object_dispatch_all_cb, NULL); finalize_batch_mode(); @@ -4113,6 +4111,21 @@ int ast_cdr_engine_init(void) return -1; } + cdr_topic = stasis_topic_create("cdr_engine"); + if (!cdr_topic) { + return -1; + } + + stasis_router = stasis_message_router_create(cdr_topic); + if (!stasis_router) { + return -1; + } + stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL); + stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL); + stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL); + stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL); + stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL); + active_cdrs_by_channel = ao2_container_alloc(NUM_CDR_BUCKETS, cdr_object_channel_hash_fn, cdr_object_channel_cmp_fn); if (!active_cdrs_by_channel) { |