summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--apps/app_cdr.c32
-rw-r--r--apps/app_forkcdr.c42
-rw-r--r--funcs/func_cdr.c90
-rw-r--r--include/asterisk/cdr.h14
-rw-r--r--main/cdr.c59
5 files changed, 149 insertions, 88 deletions
diff --git a/apps/app_cdr.c b/apps/app_cdr.c
index 2793846f9..bf4961a53 100644
--- a/apps/app_cdr.c
+++ b/apps/app_cdr.c
@@ -37,6 +37,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/module.h"
#include "asterisk/app.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
/*** DOCUMENTATION
<application name="NoCDR" language="en_US">
@@ -167,25 +168,22 @@ static void appcdr_callback(void *data, struct stasis_subscription *sub, struct
static int publish_app_cdr_message(struct ast_channel *chan, struct app_cdr_message_payload *payload)
{
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
- message = stasis_message_create(appcdr_message_type(), payload);
- if (!message) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create message\n",
- payload->channel_name);
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+ ast_channel_name(chan));
return -1;
}
- subscription = stasis_subscribe(ast_channel_topic(chan), appcdr_callback, NULL);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
+ message = stasis_message_create(appcdr_message_type(), payload);
+ if (!message) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create message\n",
payload->channel_name);
return -1;
}
+ stasis_message_router_publish_sync(router, message);
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
return 0;
}
@@ -236,6 +234,11 @@ static int nocdr_exec(struct ast_channel *chan, const char *data)
static int unload_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
+
+ if (router) {
+ stasis_message_router_remove(router, appcdr_message_type());
+ }
STASIS_MESSAGE_TYPE_CLEANUP(appcdr_message_type);
ast_unregister_application(nocdr_app);
ast_unregister_application(resetcdr_app);
@@ -244,11 +247,18 @@ static int unload_module(void)
static int load_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
+ if (!router) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
res |= STASIS_MESSAGE_TYPE_INIT(appcdr_message_type);
res |= ast_register_application_xml(nocdr_app, nocdr_exec);
res |= ast_register_application_xml(resetcdr_app, resetcdr_exec);
+ res |= stasis_message_router_add(router, appcdr_message_type(),
+ appcdr_callback, NULL);
if (res) {
return AST_MODULE_LOAD_FAILURE;
diff --git a/apps/app_forkcdr.c b/apps/app_forkcdr.c
index af5ae6a1c..932e862e1 100644
--- a/apps/app_forkcdr.c
+++ b/apps/app_forkcdr.c
@@ -41,6 +41,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/app.h"
#include "asterisk/module.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
/*** DOCUMENTATION
<application name="ForkCDR" language="en_US">
@@ -136,7 +137,7 @@ static int forkcdr_exec(struct ast_channel *chan, const char *data)
{
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
RAII_VAR(struct fork_cdr_message_payload *, payload, ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
char *parse;
struct ast_flags flags = { 0, };
@@ -156,6 +157,12 @@ static int forkcdr_exec(struct ast_channel *chan, const char *data)
return -1;
}
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+ ast_channel_name(chan));
+ return -1;
+ }
+
payload->channel_name = ast_channel_name(chan);
payload->flags = &flags;
message = stasis_message_create(forkcdr_message_type(), payload);
@@ -164,36 +171,41 @@ static int forkcdr_exec(struct ast_channel *chan, const char *data)
ast_channel_name(chan));
return -1;
}
-
- subscription = stasis_subscribe(ast_channel_topic(chan), forkcdr_callback, NULL);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to fork CDR for channel %s: unable to create subscription\n",
- payload->channel_name);
- return -1;
- }
-
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
+ stasis_message_router_publish_sync(router, message);
return 0;
}
static int unload_module(void)
{
- STASIS_MESSAGE_TYPE_CLEANUP(forkcdr_message_type);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
- return ast_unregister_application(app);
+ if (router) {
+ stasis_message_router_remove(router, forkcdr_message_type());
+ }
+ STASIS_MESSAGE_TYPE_CLEANUP(forkcdr_message_type);
+ ast_unregister_application(app);
+ return 0;
}
static int load_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
+ if (!router) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
res |= STASIS_MESSAGE_TYPE_INIT(forkcdr_message_type);
res |= ast_register_application_xml(app, forkcdr_exec);
+ res |= stasis_message_router_add(router, forkcdr_message_type(),
+ forkcdr_callback, NULL);
- return res;
+ if (res) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+ return AST_MODULE_LOAD_SUCCESS;
}
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Fork The CDR into 2 separate entities");
diff --git a/funcs/func_cdr.c b/funcs/func_cdr.c
index 3f248168c..61932bf76 100644
--- a/funcs/func_cdr.c
+++ b/funcs/func_cdr.c
@@ -40,6 +40,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/app.h"
#include "asterisk/cdr.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
/*** DOCUMENTATION
<function name="CDR" language="en_US">
@@ -207,6 +208,7 @@ struct cdr_func_payload {
const char *cmd;
const char *arguments;
const char *value;
+ void *data;
};
struct cdr_func_data {
@@ -220,8 +222,8 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(cdr_prop_write_message_type);
static void cdr_read_callback(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
- struct cdr_func_data *output = data;
struct cdr_func_payload *payload = stasis_message_data(message);
+ struct cdr_func_data *output;
char *info;
char *value = NULL;
struct ast_flags flags = { 0 };
@@ -235,9 +237,9 @@ static void cdr_read_callback(void *data, struct stasis_subscription *sub, struc
return;
}
- if (!payload || !output) {
- return;
- }
+ ast_assert(payload != NULL);
+ output = payload->data;
+ ast_assert(output != NULL);
if (ast_strlen_zero(payload->arguments)) {
ast_log(AST_LOG_WARNING, "%s requires a variable (%s(variable[,option]))\n)",
@@ -441,6 +443,7 @@ static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse,
payload->chan = chan;
payload->cmd = cmd;
payload->arguments = parse;
+ payload->data = &output;
buf[0] = '\0';/* Ensure the buffer is initialized. */
output.buf = buf;
@@ -460,18 +463,14 @@ static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse,
if (ast_strlen_zero(ast_channel_name(chan))) {
cdr_read_callback(NULL, NULL, message);
} else {
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
- subscription = stasis_subscribe(ast_channel_topic(chan), cdr_read_callback, &output);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
ast_channel_name(chan));
return -1;
}
-
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
+ stasis_message_router_publish_sync(router, message);
}
return 0;
@@ -482,8 +481,15 @@ static int cdr_write(struct ast_channel *chan, const char *cmd, char *parse,
{
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
RAII_VAR(struct cdr_func_payload *, payload,
- ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router,
+ ast_cdr_message_router(), ao2_cleanup);
+
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+ ast_channel_name(chan));
+ return -1;
+ }
if (!payload) {
return -1;
@@ -499,17 +505,7 @@ static int cdr_write(struct ast_channel *chan, const char *cmd, char *parse,
ast_channel_name(chan));
return -1;
}
-
- subscription = stasis_subscribe(ast_channel_topic(chan), cdr_write_callback, NULL);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
- ast_channel_name(chan));
- return -1;
- }
-
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
+ stasis_message_router_publish_sync(router, message);
return 0;
}
@@ -520,7 +516,13 @@ static int cdr_prop_write(struct ast_channel *chan, const char *cmd, char *parse
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
RAII_VAR(struct cdr_func_payload *, payload,
ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
+
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+ ast_channel_name(chan));
+ return -1;
+ }
if (!payload) {
return -1;
@@ -536,17 +538,7 @@ static int cdr_prop_write(struct ast_channel *chan, const char *cmd, char *parse
ast_channel_name(chan));
return -1;
}
-
- subscription = stasis_subscribe(ast_channel_topic(chan), cdr_prop_write_callback, NULL);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
- ast_channel_name(chan));
- return -1;
- }
-
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
+ stasis_message_router_publish_sync(router, message);
return 0;
}
@@ -565,8 +557,14 @@ static struct ast_custom_function cdr_prop_function = {
static int unload_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
+ if (router) {
+ stasis_message_router_remove(router, cdr_prop_write_message_type());
+ stasis_message_router_remove(router, cdr_write_message_type());
+ stasis_message_router_remove(router, cdr_read_message_type());
+ }
STASIS_MESSAGE_TYPE_CLEANUP(cdr_read_message_type);
STASIS_MESSAGE_TYPE_CLEANUP(cdr_write_message_type);
STASIS_MESSAGE_TYPE_CLEANUP(cdr_prop_write_message_type);
@@ -578,15 +576,29 @@ static int unload_module(void)
static int load_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
+ if (!router) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
res |= STASIS_MESSAGE_TYPE_INIT(cdr_read_message_type);
res |= STASIS_MESSAGE_TYPE_INIT(cdr_write_message_type);
res |= STASIS_MESSAGE_TYPE_INIT(cdr_prop_write_message_type);
res |= ast_custom_function_register(&cdr_function);
res |= ast_custom_function_register(&cdr_prop_function);
-
- return res;
+ res |= stasis_message_router_add(router, cdr_prop_write_message_type(),
+ cdr_prop_write_callback, NULL);
+ res |= stasis_message_router_add(router, cdr_write_message_type(),
+ cdr_write_callback, NULL);
+ res |= stasis_message_router_add(router, cdr_read_message_type(),
+ cdr_read_callback, NULL);
+
+ if (res) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+ return AST_MODULE_LOAD_SUCCESS;
}
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Call Detail Record (CDR) dialplan functions");
diff --git a/include/asterisk/cdr.h b/include/asterisk/cdr.h
index ad7e5cc69..5654c3847 100644
--- a/include/asterisk/cdr.h
+++ b/include/asterisk/cdr.h
@@ -467,6 +467,20 @@ int ast_cdr_is_enabled(void);
*/
struct ast_cdr *ast_cdr_alloc(void);
+struct stasis_message_router;
+
+/*!
+ * \brief Return the message router for the CDR engine
+ *
+ * This returns the \ref stasis_message_router that the CDR engine
+ * uses for dispatching \ref stasis messages. The reference on the
+ * message router is bumped and must be released by the caller of
+ * this function.
+ *
+ * \retval NULL if the CDR engine is disabled or unavailable
+ * \retval the \ref stasis_message_router otherwise
+ */
+struct stasis_message_router *ast_cdr_message_router(void);
/*!
* \brief Duplicate a public CDR
diff --git a/main/cdr.c b/main/cdr.c
index 593f47152..d83d50692 100644
--- a/main/cdr.c
+++ b/main/cdr.c
@@ -3915,17 +3915,21 @@ static void finalize_batch_mode(void)
ast_cdr_engine_term();
}
+struct stasis_message_router *ast_cdr_message_router(void)
+{
+ if (!stasis_router) {
+ return NULL;
+ }
+
+ ao2_bump(stasis_router);
+ return stasis_router;
+}
+
/*!
- * \brief Destroy the active Stasis subscriptions/router/topics
+ * \brief Destroy the active Stasis subscriptions
*/
static void destroy_subscriptions(void)
{
- stasis_message_router_unsubscribe_and_join(stasis_router);
- stasis_router = NULL;
-
- ao2_cleanup(cdr_topic);
- cdr_topic = NULL;
-
channel_subscription = stasis_forward_cancel(channel_subscription);
bridge_subscription = stasis_forward_cancel(bridge_subscription);
parking_subscription = stasis_forward_cancel(parking_subscription);
@@ -3936,16 +3940,14 @@ static void destroy_subscriptions(void)
*/
static int create_subscriptions(void)
{
- /* Use the CDR topic to determine if we've already created this */
- if (cdr_topic) {
- return 0;
- }
-
- cdr_topic = stasis_topic_create("cdr_engine");
if (!cdr_topic) {
return -1;
}
+ if (channel_subscription || bridge_subscription || parking_subscription) {
+ return 0;
+ }
+
channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic);
if (!channel_subscription) {
return -1;
@@ -3959,16 +3961,6 @@ static int create_subscriptions(void)
return -1;
}
- stasis_router = stasis_message_router_create(cdr_topic);
- if (!stasis_router) {
- return -1;
- }
- stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
- stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
- stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
- stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
- stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
-
return 0;
}
@@ -4019,6 +4011,12 @@ static void cdr_engine_cleanup(void)
static void cdr_engine_shutdown(void)
{
+ stasis_message_router_unsubscribe_and_join(stasis_router);
+ stasis_router = NULL;
+
+ ao2_cleanup(cdr_topic);
+ cdr_topic = NULL;
+
ao2_callback(active_cdrs_by_channel, OBJ_NODATA, cdr_object_dispatch_all_cb,
NULL);
finalize_batch_mode();
@@ -4113,6 +4111,21 @@ int ast_cdr_engine_init(void)
return -1;
}
+ cdr_topic = stasis_topic_create("cdr_engine");
+ if (!cdr_topic) {
+ return -1;
+ }
+
+ stasis_router = stasis_message_router_create(cdr_topic);
+ if (!stasis_router) {
+ return -1;
+ }
+ stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
+ stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
+ stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
+ stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
+ stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
+
active_cdrs_by_channel = ao2_container_alloc(NUM_CDR_BUCKETS,
cdr_object_channel_hash_fn, cdr_object_channel_cmp_fn);
if (!active_cdrs_by_channel) {