summaryrefslogtreecommitdiff
path: root/main/stasis_message_router.c
diff options
context:
space:
mode:
authorRichard Mudgett <rmudgett@digium.com>2013-11-02 04:12:36 +0000
committerRichard Mudgett <rmudgett@digium.com>2013-11-02 04:12:36 +0000
commit629a5fc39b1ad8bc638106c1f23537e797b5bedc (patch)
treefc9d6f9fee325d65f5ac96235b6589850f41d3bc /main/stasis_message_router.c
parenta84cff117d1f862a6b345f87d0ba1f158e5feecd (diff)
vector: Update API to be more flexible.
Made the vector macro API be more like linked lists. 1) Added a name parameter to ast_vector() to name the vector struct. 2) Made the API take a pointer to the vector struct instead of the struct itself. 3) Added an element cleanup macro/function parameter when removing an element from the vector for ast_vector_remove_cmp_unordered() and ast_vector_remove_elem_unordered(). 4) Added ast_vector_get_addr() in case the vector element is not a simple pointer. * Converted an inline vector usage in stasis_message_router to use the vector API. It needed the API improvements so it could be converted. * Fixed topic reference leak in router_dtor() when the stasis_message_router is destroyed. * Fixed deadlock potential in stasis_forward_all() and stasis_forward_cancel(). Locking two topics at the same time requires deadlock avoidance. * Made internal_stasis_subscribe() tolerant of a NULL topic. * Made stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_remove(), and stasis_message_router_remove_cache_update() tolerant of a NULL message_type. * Promoted a LOG_DEBUG message to LOG_ERROR as intended in dispatch_message(). Review: https://reviewboard.asterisk.org/r/2903/ ........ Merged revisions 402429 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@402430 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/stasis_message_router.c')
-rw-r--r--main/stasis_message_router.c186
1 files changed, 115 insertions, 71 deletions
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index 8c82decfe..7ed9bcb83 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/stasis_message_router.h"
+#include "asterisk/vector.h"
/*! \internal */
struct stasis_message_route {
@@ -44,19 +45,13 @@ struct stasis_message_route {
void *data;
};
-struct route_table {
- /*! Current number of entries in the route table */
- size_t current_size;
- /*! Allocated number of entires in the route table */
- size_t max_size;
- /*! The route table itself */
- struct stasis_message_route routes[];
-};
+ast_vector(route_table, struct stasis_message_route);
-static struct stasis_message_route *table_find_route(struct route_table *table,
+static struct stasis_message_route *route_table_find(struct route_table *table,
struct stasis_message_type *message_type)
{
size_t idx;
+ struct stasis_message_route *route;
/* While a linear search for routes may seem very inefficient, most
* route tables have six routes or less. For such small data, it's
@@ -64,59 +59,74 @@ static struct stasis_message_route *table_find_route(struct route_table *table,
* tables, then we can look into containers with more efficient
* lookups.
*/
- for (idx = 0; idx < table->current_size; ++idx) {
- if (table->routes[idx].message_type == message_type) {
- return &table->routes[idx];
+ for (idx = 0; idx < ast_vector_size(table); ++idx) {
+ route = ast_vector_get_addr(table, idx);
+ if (route->message_type == message_type) {
+ return route;
}
}
return NULL;
}
-static int table_add_route(struct route_table **table_ptr,
+/*!
+ * \brief route_table comparator for ast_vector_remove_cmp_unordered()
+ *
+ * \param elem Element to compare against
+ * \param value Value to compare with the vector element.
+ *
+ * \return 0 if element does not match.
+ * \return Non-zero if element matches.
+ */
+#define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value))
+
+/*!
+ * \brief route_table vector element cleanup.
+ *
+ * \param elem Element to cleanup
+ *
+ * \return Nothing
+ */
+#define ROUTE_TABLE_ELEM_CLEANUP(elem) ao2_cleanup((elem).message_type)
+
+static int route_table_remove(struct route_table *table,
+ struct stasis_message_type *message_type)
+{
+ return ast_vector_remove_cmp_unordered(table, message_type, ROUTE_TABLE_ELEM_CMP,
+ ROUTE_TABLE_ELEM_CLEANUP);
+}
+
+static int route_table_add(struct route_table *table,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
- struct route_table *table = *table_ptr;
- struct stasis_message_route *route;
-
- ast_assert(table_find_route(table, message_type) == NULL);
-
- if (table->current_size + 1 > table->max_size) {
- size_t new_max_size = table->max_size ? table->max_size * 2 : 1;
- struct route_table *new_table = ast_realloc(table,
- sizeof(*new_table) +
- sizeof(new_table->routes[0]) * new_max_size);
- if (!new_table) {
- return -1;
- }
- *table_ptr = table = new_table;
- table->max_size = new_max_size;
- }
+ struct stasis_message_route route;
+ int res;
- route = &table->routes[table->current_size++];
+ ast_assert(callback != NULL);
+ ast_assert(route_table_find(table, message_type) == NULL);
- route->message_type = ao2_bump(message_type);
- route->callback = callback;
- route->data = data;
+ route.message_type = ao2_bump(message_type);
+ route.callback = callback;
+ route.data = data;
- return 0;
+ res = ast_vector_append(table, route);
+ if (res) {
+ ROUTE_TABLE_ELEM_CLEANUP(route);
+ }
+ return res;
}
-static int table_remove_route(struct route_table *table,
- struct stasis_message_type *message_type)
+static void route_table_dtor(struct route_table *table)
{
size_t idx;
+ struct stasis_message_route *route;
- for (idx = 0; idx < table->current_size; ++idx) {
- if (table->routes[idx].message_type == message_type) {
- ao2_cleanup(message_type);
- table->routes[idx] =
- table->routes[--table->current_size];
- return 0;
- }
+ for (idx = 0; idx < ast_vector_size(table); ++idx) {
+ route = ast_vector_get_addr(table, idx);
+ ROUTE_TABLE_ELEM_CLEANUP(*route);
}
- return -1;
+ ast_vector_free(table);
}
/*! \internal */
@@ -124,9 +134,9 @@ struct stasis_message_router {
/*! Subscription to the upstream topic */
struct stasis_subscription *subscription;
/*! Subscribed routes */
- struct route_table *routes;
+ struct route_table routes;
/*! Subscribed routes for \ref stasis_cache_update messages */
- struct route_table *cache_routes;
+ struct route_table cache_routes;
/*! Route of last resort */
struct stasis_message_route default_route;
};
@@ -137,13 +147,11 @@ static void router_dtor(void *obj)
ast_assert(!stasis_subscription_is_subscribed(router->subscription));
ast_assert(stasis_subscription_is_done(router->subscription));
- router->subscription = NULL;
- ast_free(router->routes);
- router->routes = NULL;
+ router->subscription = NULL;
- ast_free(router->cache_routes);
- router->cache_routes = NULL;
+ route_table_dtor(&router->routes);
+ route_table_dtor(&router->cache_routes);
}
static int find_route(
@@ -161,12 +169,12 @@ static int find_route(
/* Find a cache route */
struct stasis_cache_update *update =
stasis_message_data(message);
- route = table_find_route(router->cache_routes, update->type);
+ route = route_table_find(&router->cache_routes, update->type);
}
if (route == NULL) {
/* Find a regular route */
- route = table_find_route(router->routes, type);
+ route = route_table_find(&router->routes, type);
}
if (route == NULL && router->default_route.callback) {
@@ -201,6 +209,7 @@ static void router_dispatch(void *data,
struct stasis_message_router *stasis_message_router_create(
struct stasis_topic *topic)
{
+ int res;
RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup);
router = ao2_alloc(sizeof(*router), router_dtor);
@@ -208,13 +217,10 @@ struct stasis_message_router *stasis_message_router_create(
return NULL;
}
- router->routes = ast_calloc(1, sizeof(*router->routes));
- if (!router->routes) {
- return NULL;
- }
-
- router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes));
- if (!router->cache_routes) {
+ res = 0;
+ res |= ast_vector_init(&router->routes, 0);
+ res |= ast_vector_init(&router->cache_routes, 0);
+ if (res) {
return NULL;
}
@@ -259,40 +265,78 @@ int stasis_message_router_add(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
- SCOPED_AO2LOCK(lock, router);
- return table_add_route(&router->routes, message_type, callback, data);
+ int res;
+
+ ast_assert(router != NULL);
+
+ if (!message_type) {
+ /* Cannot route to NULL type. */
+ return -1;
+ }
+ ao2_lock(router);
+ res = route_table_add(&router->routes, message_type, callback, data);
+ ao2_unlock(router);
+ return res;
}
int stasis_message_router_add_cache_update(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
- SCOPED_AO2LOCK(lock, router);
- return table_add_route(&router->cache_routes, message_type, callback, data);
+ int res;
+
+ ast_assert(router != NULL);
+
+ if (!message_type) {
+ /* Cannot cache a route to NULL type. */
+ return -1;
+ }
+ ao2_lock(router);
+ res = route_table_add(&router->cache_routes, message_type, callback, data);
+ ao2_unlock(router);
+ return res;
}
void stasis_message_router_remove(struct stasis_message_router *router,
struct stasis_message_type *message_type)
{
- SCOPED_AO2LOCK(lock, router);
- table_remove_route(router->routes, message_type);
+ ast_assert(router != NULL);
+
+ if (!message_type) {
+ /* Cannot remove a NULL type. */
+ return;
+ }
+ ao2_lock(router);
+ route_table_remove(&router->routes, message_type);
+ ao2_unlock(router);
}
void stasis_message_router_remove_cache_update(
struct stasis_message_router *router,
struct stasis_message_type *message_type)
{
- SCOPED_AO2LOCK(lock, router);
- table_remove_route(router->cache_routes, message_type);
+ ast_assert(router != NULL);
+
+ if (!message_type) {
+ /* Cannot remove a NULL type. */
+ return;
+ }
+ ao2_lock(router);
+ route_table_remove(&router->cache_routes, message_type);
+ ao2_unlock(router);
}
int stasis_message_router_set_default(struct stasis_message_router *router,
- stasis_subscription_cb callback,
- void *data)
+ stasis_subscription_cb callback,
+ void *data)
{
- SCOPED_AO2LOCK(lock, router);
+ ast_assert(router != NULL);
+ ast_assert(callback != NULL);
+
+ ao2_lock(router);
router->default_route.callback = callback;
router->default_route.data = data;
+ ao2_unlock(router);
/* While this implementation can never fail, it used to be able to */
return 0;
}