summaryrefslogtreecommitdiff
path: root/main/cdr.c
diff options
context:
space:
mode:
authorMatthew Jordan <mjordan@digium.com>2014-01-12 22:13:12 +0000
committerMatthew Jordan <mjordan@digium.com>2014-01-12 22:13:12 +0000
commit373965dbff734b96132f61741838d6797c4a1876 (patch)
treedd6da807b85c32aa86f9bf7ac86b1b62152558c0 /main/cdr.c
parentf8aaf585a39d496479eb8a4e55f2e327d02b37ca (diff)
CDRs: Synchronize dialplan applications that manipulate CDRs with the engine
In https://reviewboard.asterisk.org/r/3057/, applications and functions that manipulate CDRs were made to interact over Stasis. This was done to synchronize manipulations of CDRs from the dialplan with the updates the engine itself receives over the message bus. This change rested on a faulty premise: that messages published to the CDR topic or to a topic that forwards to the CDR topic are synchronized with the messages handled by the CDR topic subscription in the CDR engine. This is not the case. There is no ordering guaranteed for two messages published to the same topic; ordering is only guaranteed if a message is published to the same subscriber. Stasis was modified in r405311 to allow a publisher to synchronize on the subscriber. This patch uses that API to synchronize the CDR publishers with the CDR engine message router, which maintains the overall topic subscription. (closes issue ASTERISK-22884) Reported by: Matt Jordan Review: https://reviewboard.asterisk.org/r/3099/ ........ Merged revisions 405312 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@405314 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/cdr.c')
-rw-r--r--main/cdr.c59
1 files changed, 36 insertions, 23 deletions
diff --git a/main/cdr.c b/main/cdr.c
index 593f47152..d83d50692 100644
--- a/main/cdr.c
+++ b/main/cdr.c
@@ -3915,17 +3915,21 @@ static void finalize_batch_mode(void)
ast_cdr_engine_term();
}
+struct stasis_message_router *ast_cdr_message_router(void)
+{
+ if (!stasis_router) {
+ return NULL;
+ }
+
+ ao2_bump(stasis_router);
+ return stasis_router;
+}
+
/*!
- * \brief Destroy the active Stasis subscriptions/router/topics
+ * \brief Destroy the active Stasis subscriptions
*/
static void destroy_subscriptions(void)
{
- stasis_message_router_unsubscribe_and_join(stasis_router);
- stasis_router = NULL;
-
- ao2_cleanup(cdr_topic);
- cdr_topic = NULL;
-
channel_subscription = stasis_forward_cancel(channel_subscription);
bridge_subscription = stasis_forward_cancel(bridge_subscription);
parking_subscription = stasis_forward_cancel(parking_subscription);
@@ -3936,16 +3940,14 @@ static void destroy_subscriptions(void)
*/
static int create_subscriptions(void)
{
- /* Use the CDR topic to determine if we've already created this */
- if (cdr_topic) {
- return 0;
- }
-
- cdr_topic = stasis_topic_create("cdr_engine");
if (!cdr_topic) {
return -1;
}
+ if (channel_subscription || bridge_subscription || parking_subscription) {
+ return 0;
+ }
+
channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic);
if (!channel_subscription) {
return -1;
@@ -3959,16 +3961,6 @@ static int create_subscriptions(void)
return -1;
}
- stasis_router = stasis_message_router_create(cdr_topic);
- if (!stasis_router) {
- return -1;
- }
- stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
- stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
- stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
- stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
- stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
-
return 0;
}
@@ -4019,6 +4011,12 @@ static void cdr_engine_cleanup(void)
static void cdr_engine_shutdown(void)
{
+ stasis_message_router_unsubscribe_and_join(stasis_router);
+ stasis_router = NULL;
+
+ ao2_cleanup(cdr_topic);
+ cdr_topic = NULL;
+
ao2_callback(active_cdrs_by_channel, OBJ_NODATA, cdr_object_dispatch_all_cb,
NULL);
finalize_batch_mode();
@@ -4113,6 +4111,21 @@ int ast_cdr_engine_init(void)
return -1;
}
+ cdr_topic = stasis_topic_create("cdr_engine");
+ if (!cdr_topic) {
+ return -1;
+ }
+
+ stasis_router = stasis_message_router_create(cdr_topic);
+ if (!stasis_router) {
+ return -1;
+ }
+ stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
+ stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
+ stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
+ stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
+ stasis_message_router_add(stasis_router, ast_parked_call_type(), handle_parked_call_message, NULL);
+
active_cdrs_by_channel = ao2_container_alloc(NUM_CDR_BUCKETS,
cdr_object_channel_hash_fn, cdr_object_channel_cmp_fn);
if (!active_cdrs_by_channel) {