summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-07-23 13:39:50 +0000
committerDavid M. Lee <dlee@digium.com>2013-07-23 13:39:50 +0000
commitfec667646fb63fb0d55d26cf76ade739dc62bcfe (patch)
treeddc5248bd5b7ed97a1939961592c9394621d81ff /main
parent5f56004b1dbfb67dd9cbc58a110ddec8089b10af (diff)
Fix bridge/channel AMI event ordering issues
The stasis_cache_update messages are somewhat cumbersome to handle with the stasis_message_router. Since all updates have the same message type, they are normally handled with the same route. Since caching itself is a first class component of stasis-core, it makes sense for the router to handle the cache update messages itself. This patch adds stasis_message_router_add_cache_update() and stasis_message_router_remove_cache_update() to handle the routing of stasis_cache_update messages. This patch also corrects an issue with manager_{bridging,channels}.c, where events might be reordered. The reordering occurs because the components use different message routers, which they needed because they both needed to route cache update messages. They now both use manager's router, and add cache routes for just the cache updates they are interested in. (closes issue ASTERISK-22038) Review: https://reviewboard.asterisk.org/r/2677/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@395118 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main')
-rw-r--r--main/cdr.c6
-rw-r--r--main/manager_bridging.c29
-rw-r--r--main/manager_channels.c74
-rw-r--r--main/stasis_message_router.c126
4 files changed, 134 insertions, 101 deletions
diff --git a/main/cdr.c b/main/cdr.c
index 2e6209551..633324206 100644
--- a/main/cdr.c
+++ b/main/cdr.c
@@ -1967,9 +1967,7 @@ static void handle_channel_cache_message(void *data, struct stasis_subscription
struct cdr_object *it_cdr;
ast_assert(update != NULL);
- if (ast_channel_snapshot_type() != update->type) {
- return;
- }
+ ast_assert(ast_channel_snapshot_type() == update->type);
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
@@ -4024,7 +4022,7 @@ int ast_cdr_engine_init(void)
if (!stasis_router) {
return -1;
}
- stasis_message_router_add(stasis_router, stasis_cache_update_type(), handle_channel_cache_message, NULL);
+ stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL);
stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
diff --git a/main/manager_bridging.c b/main/manager_bridging.c
index ccad94785..c24567eaa 100644
--- a/main/manager_bridging.c
+++ b/main/manager_bridging.c
@@ -196,9 +196,7 @@ static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
update = stasis_message_data(message);
- if (ast_bridge_snapshot_type() != update->type) {
- return;
- }
+ ast_assert(ast_bridge_snapshot_type() == update->type);
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
@@ -495,35 +493,22 @@ int manager_bridging_init(void)
return -1;
}
- /* BUGBUG - This should really route off of the manager_router, but
- * can't b/c manager_channels is already routing the
- * stasis_cache_update_type() messages. Having a separate router can
- * cause some message ordering issues with bridge and channel messages.
- */
- bridge_state_router = stasis_message_router_create(bridge_topic);
+ bridge_state_router = ast_manager_get_message_router();
if (!bridge_state_router) {
return -1;
}
- ret |= stasis_message_router_add(bridge_state_router,
- stasis_cache_update_type(),
- bridge_snapshot_update,
- NULL);
+ ret |= stasis_message_router_add_cache_update(bridge_state_router,
+ ast_bridge_snapshot_type(), bridge_snapshot_update, NULL);
ret |= stasis_message_router_add(bridge_state_router,
- ast_bridge_merge_message_type(),
- bridge_merge_cb,
- NULL);
+ ast_bridge_merge_message_type(), bridge_merge_cb, NULL);
ret |= stasis_message_router_add(bridge_state_router,
- ast_channel_entered_bridge_type(),
- channel_enter_cb,
- NULL);
+ ast_channel_entered_bridge_type(), channel_enter_cb, NULL);
ret |= stasis_message_router_add(bridge_state_router,
- ast_channel_left_bridge_type(),
- channel_leave_cb,
- NULL);
+ ast_channel_left_bridge_type(), channel_leave_cb, NULL);
ret |= ast_manager_register_xml_core("BridgeList", 0, manager_bridges_list);
ret |= ast_manager_register_xml_core("BridgeInfo", 0, manager_bridge_info);
diff --git a/main/manager_channels.c b/main/manager_channels.c
index 6e8621973..d26f0be06 100644
--- a/main/manager_channels.c
+++ b/main/manager_channels.c
@@ -726,9 +726,7 @@ static void channel_snapshot_update(void *data, struct stasis_subscription *sub,
update = stasis_message_data(message);
- if (ast_channel_snapshot_type() != update->type) {
- return;
- }
+ ast_assert(ast_channel_snapshot_type() == update->type);
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
@@ -1283,85 +1281,57 @@ int manager_channels_init(void)
ast_register_atexit(manager_channels_shutdown);
- ret |= stasis_message_router_add(message_router,
- stasis_cache_update_type(),
- channel_snapshot_update,
- NULL);
+ ret |= stasis_message_router_add_cache_update(message_router,
+ ast_channel_snapshot_type(), channel_snapshot_update, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_user_event_type(),
- channel_user_event_cb,
- NULL);
+ ast_channel_user_event_type(), channel_user_event_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_dtmf_begin_type(),
- channel_dtmf_begin_cb,
- NULL);
+ ast_channel_dtmf_begin_type(), channel_dtmf_begin_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_dtmf_end_type(),
- channel_dtmf_end_cb,
- NULL);
+ ast_channel_dtmf_end_type(), channel_dtmf_end_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_hangup_request_type(),
- channel_hangup_request_cb,
- NULL);
+ ast_channel_hangup_request_type(), channel_hangup_request_cb,
+ NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_dial_type(),
- channel_dial_cb,
- NULL);
+ ast_channel_dial_type(), channel_dial_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_hold_type(),
- channel_hold_cb,
- NULL);
+ ast_channel_hold_type(), channel_hold_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_unhold_type(),
- channel_unhold_cb,
- NULL);
+ ast_channel_unhold_type(), channel_unhold_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_fax_type(),
- channel_fax_cb,
- NULL);
+ ast_channel_fax_type(), channel_fax_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_chanspy_start_type(),
- channel_chanspy_start_cb,
- NULL);
+ ast_channel_chanspy_start_type(), channel_chanspy_start_cb,
+ NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_chanspy_stop_type(),
- channel_chanspy_stop_cb,
- NULL);
+ ast_channel_chanspy_stop_type(), channel_chanspy_stop_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_hangup_handler_type(),
- channel_hangup_handler_cb,
- NULL);
+ ast_channel_hangup_handler_type(), channel_hangup_handler_cb,
+ NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_moh_start_type(),
- channel_moh_start_cb,
- NULL);
+ ast_channel_moh_start_type(), channel_moh_start_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_moh_stop_type(),
- channel_moh_stop_cb,
- NULL);
+ ast_channel_moh_stop_type(), channel_moh_stop_cb, NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_monitor_start_type(),
- channel_monitor_start_cb,
- NULL);
+ ast_channel_monitor_start_type(), channel_monitor_start_cb,
+ NULL);
ret |= stasis_message_router_add(message_router,
- ast_channel_monitor_stop_type(),
- channel_monitor_stop_cb,
- NULL);
+ ast_channel_monitor_stop_type(), channel_monitor_stop_cb, NULL);
/* If somehow we failed to add any routes, just shut down the whole
* thing and fail it.
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index 4409d2226..26d2f2c0c 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -34,6 +34,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/stasis_message_router.h"
+/*! Number of hash buckets for the route table. Keep it prime! */
+#define ROUTE_TABLE_BUCKETS 7
+
/*! \internal */
struct stasis_message_route {
/*! Message type handle by this route. */
@@ -75,6 +78,8 @@ struct stasis_message_router {
struct stasis_subscription *subscription;
/*! Subscribed routes */
struct ao2_container *routes;
+ /*! Subscribed routes for \ref stasi_cache_update messages */
+ struct ao2_container *cache_routes;
/*! Route of last resort */
struct stasis_message_route *default_route;
};
@@ -90,13 +95,46 @@ static void router_dtor(void *obj)
ao2_cleanup(router->routes);
router->routes = NULL;
+ ao2_cleanup(router->cache_routes);
+ router->cache_routes = NULL;
+
ao2_cleanup(router->default_route);
router->default_route = NULL;
}
-static struct stasis_message_route *find_route(struct stasis_message_router *router, struct stasis_message_type *message_type)
+static struct stasis_message_route *find_route(
+ struct stasis_message_router *router,
+ struct stasis_message *message)
{
- return ao2_find(router->routes, message_type, OBJ_KEY);
+ RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+ struct stasis_message_type *type = stasis_message_type(message);
+ SCOPED_AO2LOCK(lock, router);
+
+ if (type == stasis_cache_update_type()) {
+ /* Find a cache route */
+ struct stasis_cache_update *update =
+ stasis_message_data(message);
+ route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
+ }
+
+ if (route == NULL) {
+ /* Find a regular route */
+ route = ao2_find(router->routes, type, OBJ_KEY);
+ }
+
+ if (route == NULL) {
+ /* Maybe the default route, then? */
+ if ((route = router->default_route)) {
+ ao2_ref(route, +1);
+ }
+ }
+
+ if (route == NULL) {
+ return NULL;
+ }
+
+ ao2_ref(route, +1);
+ return route;
}
static void router_dispatch(void *data,
@@ -105,29 +143,18 @@ static void router_dispatch(void *data,
struct stasis_message *message)
{
struct stasis_message_router *router = data;
- RAII_VAR(struct stasis_message_router *, router_needs_cleanup, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
- struct stasis_message_type *type = stasis_message_type(message);
- {
- SCOPED_AO2LOCK(lock, router);
-
- if (!(route = find_route(router, type))) {
- if ((route = router->default_route)) {
- ao2_ref(route, +1);
- }
- }
- }
+ route = find_route(router, message);
if (route) {
route->callback(route->data, sub, topic, message);
}
+
if (stasis_subscription_final_message(sub, message)) {
- router_needs_cleanup = router;
- return;
+ ao2_cleanup(router);
}
-
}
struct stasis_message_router *stasis_message_router_create(
@@ -140,7 +167,15 @@ struct stasis_message_router *stasis_message_router_create(
return NULL;
}
- if (!(router->routes = ao2_container_alloc(7, route_hash, route_cmp))) {
+ router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash,
+ route_cmp);
+ if (!router->routes) {
+ return NULL;
+ }
+
+ router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS,
+ route_hash, route_cmp);
+ if (!router->cache_routes) {
return NULL;
}
@@ -211,7 +246,10 @@ static int add_route(struct stasis_message_router *router,
RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, router);
- if ((existing_route = find_route(router, route->message_type))) {
+ existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY);
+
+ if (existing_route) {
+ ast_log(LOG_ERROR, "Cannot add route; route exists\n");
return -1;
}
@@ -219,10 +257,27 @@ static int add_route(struct stasis_message_router *router,
return 0;
}
+static int add_cache_route(struct stasis_message_router *router,
+ struct stasis_message_route *route)
+{
+ RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, router);
+
+ existing_route = ao2_find(router->cache_routes, route->message_type,
+ OBJ_KEY);
+
+ if (existing_route) {
+ ast_log(LOG_ERROR, "Cannot add route; route exists\n");
+ return -1;
+ }
+
+ ao2_link(router->cache_routes, route);
+ return 0;
+}
+
int stasis_message_router_add(struct stasis_message_router *router,
- struct stasis_message_type *message_type,
- stasis_subscription_cb callback,
- void *data)
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data)
{
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
@@ -234,12 +289,37 @@ int stasis_message_router_add(struct stasis_message_router *router,
return add_route(router, route);
}
+int stasis_message_router_add_cache_update(struct stasis_message_router *router,
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data)
+{
+ RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+
+ route = route_create(message_type, callback, data);
+ if (!route) {
+ return -1;
+ }
+
+ return add_cache_route(router, route);
+}
+
void stasis_message_router_remove(struct stasis_message_router *router,
- struct stasis_message_type *message_type)
+ struct stasis_message_type *message_type)
+{
+ SCOPED_AO2LOCK(lock, router);
+
+ ao2_find(router->routes, message_type,
+ OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+}
+
+void stasis_message_router_remove_cache_update(
+ struct stasis_message_router *router,
+ struct stasis_message_type *message_type)
{
SCOPED_AO2LOCK(lock, router);
- ao2_find(router->routes, message_type, OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+ ao2_find(router->cache_routes, message_type,
+ OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
}
int stasis_message_router_set_default(struct stasis_message_router *router,