summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Jordan <mjordan@digium.com>2014-01-12 22:13:12 +0000
committerMatthew Jordan <mjordan@digium.com>2014-01-12 22:13:12 +0000
commit373965dbff734b96132f61741838d6797c4a1876 (patch)
treedd6da807b85c32aa86f9bf7ac86b1b62152558c0
parentf8aaf585a39d496479eb8a4e55f2e327d02b37ca (diff)
CDRs: Synchronize dialplan applications that manipulate CDRs with the engine
In https://reviewboard.asterisk.org/r/3057/, applications and functions that manipulate CDRs were made to interact over Stasis. This was done to synchronize manipulations of CDRs from the dialplan with the updates the engine itself receives over the message bus. This change rested on a faulty premise: that messages published to the CDR topic or to a topic that forwards to the CDR topic are synchronized with the messages handled by the CDR topic subscription in the CDR engine. This is not the case. There is no ordering guaranteed for two messages published to the same topic; ordering is only guaranteed if a message is published to the same subscriber. Stasis was modified in r405311 to allow a publisher to synchronize on the subscriber. This patch uses that API to synchronize the CDR publishers with the CDR engine message router, which maintains the overall topic subscription. (closes issue ASTERISK-22884) Reported by: Matt Jordan Review: https://reviewboard.asterisk.org/r/3099/ ........ Merged revisions 405312 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@405314 65c4cc65-6c06-0410-ace0-fbb531ad65f3
-rw-r--r--apps/app_cdr.c32
-rw-r--r--apps/app_forkcdr.c42
-rw-r--r--funcs/func_cdr.c90
-rw-r--r--include/asterisk/cdr.h14
-rw-r--r--main/cdr.c59
5 files changed, 149 insertions, 88 deletions
diff --git a/apps/app_cdr.c b/apps/app_cdr.c
index 2793846f9..bf4961a53 100644
--- a/apps/app_cdr.c
+++ b/apps/app_cdr.c
@@ -37,6 +37,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/module.h"
#include "asterisk/app.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
/*** DOCUMENTATION
<application name="NoCDR" language="en_US">
@@ -167,25 +168,22 @@ static void appcdr_callback(void *data, struct stasis_subscription *sub, struct
static int publish_app_cdr_message(struct ast_channel *chan, struct app_cdr_message_payload *payload)
{
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
- message = stasis_message_create(appcdr_message_type(), payload);
- if (!message) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create message\n",
- payload->channel_name);
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+ ast_channel_name(chan));
return -1;
}
- subscription = stasis_subscribe(ast_channel_topic(chan), appcdr_callback, NULL);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
+ message = stasis_message_create(appcdr_message_type(), payload);
+ if (!message) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create message\n",
payload->channel_name);
return -1;
}
+ stasis_message_router_publish_sync(router, message);
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
return 0;
}
@@ -236,6 +234,11 @@ static int nocdr_exec(struct ast_channel *chan, const char *data)
static int unload_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
+
+ if (router) {
+ stasis_message_router_remove(router, appcdr_message_type());
+ }
STASIS_MESSAGE_TYPE_CLEANUP(appcdr_message_type);
ast_unregister_application(nocdr_app);
ast_unregister_application(resetcdr_app);
@@ -244,11 +247,18 @@ static int unload_module(void)
static int load_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
+ if (!router) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
res |= STASIS_MESSAGE_TYPE_INIT(appcdr_message_type);
res |= ast_register_application_xml(nocdr_app, nocdr_exec);
res |= ast_register_application_xml(resetcdr_app, resetcdr_exec);
+ res |= stasis_message_router_add(router, appcdr_message_type(),
+ appcdr_callback, NULL);
if (res) {
return AST_MODULE_LOAD_FAILURE;
diff --git a/apps/app_forkcdr.c b/apps/app_forkcdr.c
index af5ae6a1c..932e862e1 100644
--- a/apps/app_forkcdr.c
+++ b/apps/app_forkcdr.c
@@ -41,6 +41,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/app.h"
#include "asterisk/module.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
/*** DOCUMENTATION
<application name="ForkCDR" language="en_US">
@@ -136,7 +137,7 @@ static int forkcdr_exec(struct ast_channel *chan, const char *data)
{
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
RAII_VAR(struct fork_cdr_message_payload *, payload, ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
char *parse;
struct ast_flags flags = { 0, };
@@ -156,6 +157,12 @@ static int forkcdr_exec(struct ast_channel *chan, const char *data)
return -1;
}
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+ ast_channel_name(chan));
+ return -1;
+ }
+
payload->channel_name = ast_channel_name(chan);
payload->flags = &flags;
message = stasis_message_create(forkcdr_message_type(), payload);
@@ -164,36 +171,41 @@ static int forkcdr_exec(struct ast_channel *chan, const char *data)
ast_channel_name(chan));
return -1;
}
-
- subscription = stasis_subscribe(ast_channel_topic(chan), forkcdr_callback, NULL);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to fork CDR for channel %s: unable to create subscription\n",
- payload->channel_name);
- return -1;
- }
-
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
+ stasis_message_router_publish_sync(router, message);
return 0;
}
static int unload_module(void)
{
- STASIS_MESSAGE_TYPE_CLEANUP(forkcdr_message_type);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
- return ast_unregister_application(app);
+ if (router) {
+ stasis_message_router_remove(router, forkcdr_message_type());
+ }
+ STASIS_MESSAGE_TYPE_CLEANUP(forkcdr_message_type);
+ ast_unregister_application(app);
+ return 0;
}
static int load_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
+ if (!router) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
res |= STASIS_MESSAGE_TYPE_INIT(forkcdr_message_type);
res |= ast_register_application_xml(app, forkcdr_exec);
+ res |= stasis_message_router_add(router, forkcdr_message_type(),
+ forkcdr_callback, NULL);
- return res;
+ if (res) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+ return AST_MODULE_LOAD_SUCCESS;
}
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Fork The CDR into 2 separate entities");
diff --git a/funcs/func_cdr.c b/funcs/func_cdr.c
index 3f248168c..61932bf76 100644
--- a/funcs/func_cdr.c
+++ b/funcs/func_cdr.c
@@ -40,6 +40,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/app.h"
#include "asterisk/cdr.h"
#include "asterisk/stasis.h"
+#include "asterisk/stasis_message_router.h"
/*** DOCUMENTATION
<function name="CDR" language="en_US">
@@ -207,6 +208,7 @@ struct cdr_func_payload {
const char *cmd;
const char *arguments;
const char *value;
+ void *data;
};
struct cdr_func_data {
@@ -220,8 +222,8 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(cdr_prop_write_message_type);
static void cdr_read_callback(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
- struct cdr_func_data *output = data;
struct cdr_func_payload *payload = stasis_message_data(message);
+ struct cdr_func_data *output;
char *info;
char *value = NULL;
struct ast_flags flags = { 0 };
@@ -235,9 +237,9 @@ static void cdr_read_callback(void *data, struct stasis_subscription *sub, struc
return;
}
- if (!payload || !output) {
- return;
- }
+ ast_assert(payload != NULL);
+ output = payload->data;
+ ast_assert(output != NULL);
if (ast_strlen_zero(payload->arguments)) {
ast_log(AST_LOG_WARNING, "%s requires a variable (%s(variable[,option]))\n)",
@@ -441,6 +443,7 @@ static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse,
payload->chan = chan;
payload->cmd = cmd;
payload->arguments = parse;
+ payload->data = &output;
buf[0] = '\0';/* Ensure the buffer is initialized. */
output.buf = buf;
@@ -460,18 +463,14 @@ static int cdr_read(struct ast_channel *chan, const char *cmd, char *parse,
if (ast_strlen_zero(ast_channel_name(chan))) {
cdr_read_callback(NULL, NULL, message);
} else {
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
- subscription = stasis_subscribe(ast_channel_topic(chan), cdr_read_callback, &output);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
ast_channel_name(chan));
return -1;
}
-
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
+ stasis_message_router_publish_sync(router, message);
}
return 0;
@@ -482,8 +481,15 @@ static int cdr_write(struct ast_channel *chan, const char *cmd, char *parse,
{
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
RAII_VAR(struct cdr_func_payload *, payload,
- ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router,
+ ast_cdr_message_router(), ao2_cleanup);
+
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+ ast_channel_name(chan));
+ return -1;
+ }
if (!payload) {
return -1;
@@ -499,17 +505,7 @@ static int cdr_write(struct ast_channel *chan, const char *cmd, char *parse,
ast_channel_name(chan));
return -1;
}
-
- subscription = stasis_subscribe(ast_channel_topic(chan), cdr_write_callback, NULL);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
- ast_channel_name(chan));
- return -1;
- }
-
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
+ stasis_message_router_publish_sync(router, message);
return 0;
}
@@ -520,7 +516,13 @@ static int cdr_prop_write(struct ast_channel *chan, const char *cmd, char *parse
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
RAII_VAR(struct cdr_func_payload *, payload,
ao2_alloc(sizeof(*payload), NULL), ao2_cleanup);
- RAII_VAR(struct stasis_subscription *, subscription, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
+
+ if (!router) {
+ ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: no message router\n",
+ ast_channel_name(chan));
+ return -1;
+ }
if (!payload) {
return -1;
@@ -536,17 +538,7 @@ static int cdr_prop_write(struct ast_channel *chan, const char *cmd, char *parse
ast_channel_name(chan));
return -1;
}
-
- subscription = stasis_subscribe(ast_channel_topic(chan), cdr_prop_write_callback, NULL);
- if (!subscription) {
- ast_log(AST_LOG_WARNING, "Failed to manipulate CDR for channel %s: unable to create subscription\n",
- ast_channel_name(chan));
- return -1;
- }
-
- stasis_publish(ast_channel_topic(chan), message);
-
- subscription = stasis_unsubscribe_and_join(subscription);
+ stasis_message_router_publish_sync(router, message);
return 0;
}
@@ -565,8 +557,14 @@ static struct ast_custom_function cdr_prop_function = {
static int unload_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
+ if (router) {
+ stasis_message_router_remove(router, cdr_prop_write_message_type());
+ stasis_message_router_remove(router, cdr_write_message_type());
+ stasis_message_router_remove(router, cdr_read_message_type());
+ }
STASIS_MESSAGE_TYPE_CLEANUP(cdr_read_message_type);
STASIS_MESSAGE_TYPE_CLEANUP(cdr_write_message_type);
STASIS_MESSAGE_TYPE_CLEANUP(cdr_prop_write_message_type);
@@ -578,15 +576,29 @@ static int unload_module(void)
static int load_module(void)
{
+ RAII_VAR(struct stasis_message_router *, router, ast_cdr_message_router(), ao2_cleanup);
int res = 0;
+ if (!router) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
res |= STASIS_MESSAGE_TYPE_INIT(cdr_read_message_type);
res |= STASIS_MESSAGE_TYPE_INIT(cdr_write_message_type);
res |= STASIS_MESSAGE_TYPE_INIT(cdr_prop_write_message_type);
res |= ast_custom_function_register(&cdr_function);
res |= ast_custom_function_register(&cdr_prop_function);
-
- return res;
+ res |= stasis_message_router_add(router, cdr_prop_write_message_type(),
+ cdr_prop_write_callback, NULL);
+ res |= stasis_message_router_add(router, cdr_write_message_type(),
+ cdr_write_callback, NULL);
+ res |= stasis_message_router_add(router, cdr_read_message_type(),
+ cdr_read_callback, NULL);
+
+ if (res) {
+ return AST_MODULE_LOAD_FAILURE;
+ }
+ return AST_MODULE_LOAD_SUCCESS;
}
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Call Detail Record (CDR) dialplan functions");
diff --git a/include/asterisk/cdr.h b/include/asterisk/cdr.h
index ad7e5cc69..5654c3847 100644
--- a/include/asterisk/cdr.h
+++ b/include/asterisk/cdr.h
@@ -467,6 +467,20 @@ int ast_cdr_is_enabled(void);
*/
struct ast_cdr *ast_cdr_alloc(void);
+struct stasis_message_router;
+
+/*!
+ * \brief Return the message router for the CDR engine
+ *
+ * This returns the \ref stasis_message_router that the CDR engine
+ * uses for dispatching \ref stasis messages. The reference on the
+ * message router is bumped and must be released by the caller of
+ * this function.
+ *
+ * \retval NULL if the CDR engine is disabled or unavailable
+ * \retval the \ref stasis_message_router otherwise
+ */
+struct stasis_message_router *ast_cdr_message_router(void);
/*!
* \brief Duplicate a public CDR
diff --git a/main/cdr.c b/main/cdr.c
index 593f47152..d83d50692 100644
--- a/main/cdr.c
+++ b/main/cdr.c
@@ -3915,17 +3915,21 @@ static void finalize_batch_mode(void)
ast_cdr_engine_term();
}
+struct stasis_message_router *ast_cdr_message_router(void)
+{
+ if (!stasis_router) {
+ return NULL;
+ }
+
+ ao2_bump(stasis_router);
+ return stasis_router;
+}
+
/*!
- * \brief Destroy the active Stasis subscriptions/router/topics
+ * \brief Destroy the active Stasis subscriptions
*/
static void destroy_subscriptions(void)
{
- stasis_message_router_unsubscribe_and_join(stasis_router);
- stasis_router = NULL;
-
- ao2_cleanup(cdr_topic);
- cdr_topic = NULL;
-
channel_subscription = stasis_forward_cancel(channel_subscription);
bridge_subscription = stasis_forward_cancel(bridge_subscription);
parking_subscription = stasis_forward_cancel(parking_subscription);
@@ -3936,16 +3940,14 @@ static void destroy_subscriptions(void)
*/
static int create_subscriptions(void)
{
- /* Use the CDR topic to determine if we've already created this */
- if (cdr_topic) {
- return 0;
- }
-
- cdr_topic = stasis_topic_create("cdr_engine");
if (!cdr_topic) {
return -1;
}
+ if (channel_subscription || bridge_subscription || parking_subscription) {
+ return 0;
+ }
+
channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic);
if (!channel_subscription) {
return -1;
@@ -3959,16 +3961,6 @@ static int create_subscriptions(void)
return -1;
}
- stasis_router = stasis_message_router_create(cdr_topic);
- if (!stasis_router) {
- return -1;
- }
- stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
- stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
- stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
- stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
- stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
-
return 0;
}
@@ -4019,6 +4011,12 @@ static void cdr_engine_cleanup(void)
static void cdr_engine_shutdown(void)
{
+ stasis_message_router_unsubscribe_and_join(stasis_router);
+ stasis_router = NULL;
+
+ ao2_cleanup(cdr_topic);
+ cdr_topic = NULL;
+
ao2_callback(active_cdrs_by_channel, OBJ_NODATA, cdr_object_dispatch_all_cb,
NULL);
finalize_batch_mode();
@@ -4113,6 +4111,21 @@ int ast_cdr_engine_init(void)
return -1;
}
+ cdr_topic = stasis_topic_create("cdr_engine");
+ if (!cdr_topic) {
+ return -1;
+ }
+
+ stasis_router = stasis_message_router_create(cdr_topic);
+ if (!stasis_router) {
+ return -1;
+ }
+ stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
+ stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
+ stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
+ stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
+ stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
+
active_cdrs_by_channel = ao2_container_alloc(NUM_CDR_BUCKETS,
cdr_object_channel_hash_fn, cdr_object_channel_cmp_fn);
if (!active_cdrs_by_channel) {