summaryrefslogtreecommitdiff
path: root/main/stasis_message_router.c
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-07-23 13:39:50 +0000
committerDavid M. Lee <dlee@digium.com>2013-07-23 13:39:50 +0000
commitfec667646fb63fb0d55d26cf76ade739dc62bcfe (patch)
treeddc5248bd5b7ed97a1939961592c9394621d81ff /main/stasis_message_router.c
parent5f56004b1dbfb67dd9cbc58a110ddec8089b10af (diff)
Fix bridge/channel AMI event ordering issues
The stasis_cache_update messages are somewhat cumbersome to handle with the stasis_message_router. Since all updates have the same message type, they are normally handled with the same route. Since caching itself is a first class component of stasis-core, it makes sense for the router to handle the cache update messages itself. This patch adds stasis_message_router_add_cache_update() and stasis_message_router_remove_cache_update() to handle the routing of stasis_cache_update messages. This patch also corrects an issue with manager_{bridging,channels}.c, where events might be reordered. The reordering occurs because the components use different message routers, which they needed because they both needed to route cache update messages. They now both use manager's router, and add cache routes for just the cache updates they are interested in. (closes issue ASTERISK-22038) Review: https://reviewboard.asterisk.org/r/2677/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@395118 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/stasis_message_router.c')
-rw-r--r--main/stasis_message_router.c126
1 files changed, 103 insertions, 23 deletions
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index 4409d2226..26d2f2c0c 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -34,6 +34,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/stasis_message_router.h"
+/*! Number of hash buckets for the route table. Keep it prime! */
+#define ROUTE_TABLE_BUCKETS 7
+
/*! \internal */
struct stasis_message_route {
/*! Message type handle by this route. */
@@ -75,6 +78,8 @@ struct stasis_message_router {
struct stasis_subscription *subscription;
/*! Subscribed routes */
struct ao2_container *routes;
+ /*! Subscribed routes for \ref stasi_cache_update messages */
+ struct ao2_container *cache_routes;
/*! Route of last resort */
struct stasis_message_route *default_route;
};
@@ -90,13 +95,46 @@ static void router_dtor(void *obj)
ao2_cleanup(router->routes);
router->routes = NULL;
+ ao2_cleanup(router->cache_routes);
+ router->cache_routes = NULL;
+
ao2_cleanup(router->default_route);
router->default_route = NULL;
}
-static struct stasis_message_route *find_route(struct stasis_message_router *router, struct stasis_message_type *message_type)
+static struct stasis_message_route *find_route(
+ struct stasis_message_router *router,
+ struct stasis_message *message)
{
- return ao2_find(router->routes, message_type, OBJ_KEY);
+ RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+ struct stasis_message_type *type = stasis_message_type(message);
+ SCOPED_AO2LOCK(lock, router);
+
+ if (type == stasis_cache_update_type()) {
+ /* Find a cache route */
+ struct stasis_cache_update *update =
+ stasis_message_data(message);
+ route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
+ }
+
+ if (route == NULL) {
+ /* Find a regular route */
+ route = ao2_find(router->routes, type, OBJ_KEY);
+ }
+
+ if (route == NULL) {
+ /* Maybe the default route, then? */
+ if ((route = router->default_route)) {
+ ao2_ref(route, +1);
+ }
+ }
+
+ if (route == NULL) {
+ return NULL;
+ }
+
+ ao2_ref(route, +1);
+ return route;
}
static void router_dispatch(void *data,
@@ -105,29 +143,18 @@ static void router_dispatch(void *data,
struct stasis_message *message)
{
struct stasis_message_router *router = data;
- RAII_VAR(struct stasis_message_router *, router_needs_cleanup, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
- struct stasis_message_type *type = stasis_message_type(message);
- {
- SCOPED_AO2LOCK(lock, router);
-
- if (!(route = find_route(router, type))) {
- if ((route = router->default_route)) {
- ao2_ref(route, +1);
- }
- }
- }
+ route = find_route(router, message);
if (route) {
route->callback(route->data, sub, topic, message);
}
+
if (stasis_subscription_final_message(sub, message)) {
- router_needs_cleanup = router;
- return;
+ ao2_cleanup(router);
}
-
}
struct stasis_message_router *stasis_message_router_create(
@@ -140,7 +167,15 @@ struct stasis_message_router *stasis_message_router_create(
return NULL;
}
- if (!(router->routes = ao2_container_alloc(7, route_hash, route_cmp))) {
+ router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash,
+ route_cmp);
+ if (!router->routes) {
+ return NULL;
+ }
+
+ router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS,
+ route_hash, route_cmp);
+ if (!router->cache_routes) {
return NULL;
}
@@ -211,7 +246,10 @@ static int add_route(struct stasis_message_router *router,
RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, router);
- if ((existing_route = find_route(router, route->message_type))) {
+ existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY);
+
+ if (existing_route) {
+ ast_log(LOG_ERROR, "Cannot add route; route exists\n");
return -1;
}
@@ -219,10 +257,27 @@ static int add_route(struct stasis_message_router *router,
return 0;
}
+static int add_cache_route(struct stasis_message_router *router,
+ struct stasis_message_route *route)
+{
+ RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, router);
+
+ existing_route = ao2_find(router->cache_routes, route->message_type,
+ OBJ_KEY);
+
+ if (existing_route) {
+ ast_log(LOG_ERROR, "Cannot add route; route exists\n");
+ return -1;
+ }
+
+ ao2_link(router->cache_routes, route);
+ return 0;
+}
+
int stasis_message_router_add(struct stasis_message_router *router,
- struct stasis_message_type *message_type,
- stasis_subscription_cb callback,
- void *data)
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data)
{
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
@@ -234,12 +289,37 @@ int stasis_message_router_add(struct stasis_message_router *router,
return add_route(router, route);
}
+int stasis_message_router_add_cache_update(struct stasis_message_router *router,
+ struct stasis_message_type *message_type,
+ stasis_subscription_cb callback, void *data)
+{
+ RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+
+ route = route_create(message_type, callback, data);
+ if (!route) {
+ return -1;
+ }
+
+ return add_cache_route(router, route);
+}
+
void stasis_message_router_remove(struct stasis_message_router *router,
- struct stasis_message_type *message_type)
+ struct stasis_message_type *message_type)
+{
+ SCOPED_AO2LOCK(lock, router);
+
+ ao2_find(router->routes, message_type,
+ OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+}
+
+void stasis_message_router_remove_cache_update(
+ struct stasis_message_router *router,
+ struct stasis_message_type *message_type)
{
SCOPED_AO2LOCK(lock, router);
- ao2_find(router->routes, message_type, OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+ ao2_find(router->cache_routes, message_type,
+ OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
}
int stasis_message_router_set_default(struct stasis_message_router *router,