diff options
author | Richard Mudgett <rmudgett@digium.com> | 2013-11-02 04:12:36 +0000 |
---|---|---|
committer | Richard Mudgett <rmudgett@digium.com> | 2013-11-02 04:12:36 +0000 |
commit | 629a5fc39b1ad8bc638106c1f23537e797b5bedc (patch) | |
tree | fc9d6f9fee325d65f5ac96235b6589850f41d3bc /main/stasis_message_router.c | |
parent | a84cff117d1f862a6b345f87d0ba1f158e5feecd (diff) |
vector: Update API to be more flexible.
Made the vector macro API be more like linked lists.
1) Added a name parameter to ast_vector() to name the vector struct.
2) Made the API take a pointer to the vector struct instead of the struct
itself.
3) Added an element cleanup macro/function parameter when removing an
element from the vector for ast_vector_remove_cmp_unordered() and
ast_vector_remove_elem_unordered().
4) Added ast_vector_get_addr() in case the vector element is not a simple
pointer.
* Converted an inline vector usage in stasis_message_router to use the
vector API. It needed the API improvements so it could be converted.
* Fixed topic reference leak in router_dtor() when the
stasis_message_router is destroyed.
* Fixed deadlock potential in stasis_forward_all() and
stasis_forward_cancel(). Locking two topics at the same time requires
deadlock avoidance.
* Made internal_stasis_subscribe() tolerant of a NULL topic.
* Made stasis_message_router_add(),
stasis_message_router_add_cache_update(), stasis_message_router_remove(),
and stasis_message_router_remove_cache_update() tolerant of a NULL
message_type.
* Promoted a LOG_DEBUG message to LOG_ERROR as intended in
dispatch_message().
Review: https://reviewboard.asterisk.org/r/2903/
........
Merged revisions 402429 from http://svn.asterisk.org/svn/asterisk/branches/12
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@402430 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/stasis_message_router.c')
-rw-r--r-- | main/stasis_message_router.c | 186 |
1 files changed, 115 insertions, 71 deletions
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 8c82decfe..7ed9bcb83 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/stasis_message_router.h" +#include "asterisk/vector.h" /*! \internal */ struct stasis_message_route { @@ -44,19 +45,13 @@ struct stasis_message_route { void *data; }; -struct route_table { - /*! Current number of entries in the route table */ - size_t current_size; - /*! Allocated number of entires in the route table */ - size_t max_size; - /*! The route table itself */ - struct stasis_message_route routes[]; -}; +ast_vector(route_table, struct stasis_message_route); -static struct stasis_message_route *table_find_route(struct route_table *table, +static struct stasis_message_route *route_table_find(struct route_table *table, struct stasis_message_type *message_type) { size_t idx; + struct stasis_message_route *route; /* While a linear search for routes may seem very inefficient, most * route tables have six routes or less. For such small data, it's @@ -64,59 +59,74 @@ static struct stasis_message_route *table_find_route(struct route_table *table, * tables, then we can look into containers with more efficient * lookups. */ - for (idx = 0; idx < table->current_size; ++idx) { - if (table->routes[idx].message_type == message_type) { - return &table->routes[idx]; + for (idx = 0; idx < ast_vector_size(table); ++idx) { + route = ast_vector_get_addr(table, idx); + if (route->message_type == message_type) { + return route; } } return NULL; } -static int table_add_route(struct route_table **table_ptr, +/*! + * \brief route_table comparator for ast_vector_remove_cmp_unordered() + * + * \param elem Element to compare against + * \param value Value to compare with the vector element. + * + * \return 0 if element does not match. + * \return Non-zero if element matches. + */ +#define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value)) + +/*! + * \brief route_table vector element cleanup. + * + * \param elem Element to cleanup + * + * \return Nothing + */ +#define ROUTE_TABLE_ELEM_CLEANUP(elem) ao2_cleanup((elem).message_type) + +static int route_table_remove(struct route_table *table, + struct stasis_message_type *message_type) +{ + return ast_vector_remove_cmp_unordered(table, message_type, ROUTE_TABLE_ELEM_CMP, + ROUTE_TABLE_ELEM_CLEANUP); +} + +static int route_table_add(struct route_table *table, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - struct route_table *table = *table_ptr; - struct stasis_message_route *route; - - ast_assert(table_find_route(table, message_type) == NULL); - - if (table->current_size + 1 > table->max_size) { - size_t new_max_size = table->max_size ? table->max_size * 2 : 1; - struct route_table *new_table = ast_realloc(table, - sizeof(*new_table) + - sizeof(new_table->routes[0]) * new_max_size); - if (!new_table) { - return -1; - } - *table_ptr = table = new_table; - table->max_size = new_max_size; - } + struct stasis_message_route route; + int res; - route = &table->routes[table->current_size++]; + ast_assert(callback != NULL); + ast_assert(route_table_find(table, message_type) == NULL); - route->message_type = ao2_bump(message_type); - route->callback = callback; - route->data = data; + route.message_type = ao2_bump(message_type); + route.callback = callback; + route.data = data; - return 0; + res = ast_vector_append(table, route); + if (res) { + ROUTE_TABLE_ELEM_CLEANUP(route); + } + return res; } -static int table_remove_route(struct route_table *table, - struct stasis_message_type *message_type) +static void route_table_dtor(struct route_table *table) { size_t idx; + struct stasis_message_route *route; - for (idx = 0; idx < table->current_size; ++idx) { - if (table->routes[idx].message_type == message_type) { - ao2_cleanup(message_type); - table->routes[idx] = - table->routes[--table->current_size]; - return 0; - } + for (idx = 0; idx < ast_vector_size(table); ++idx) { + route = ast_vector_get_addr(table, idx); + ROUTE_TABLE_ELEM_CLEANUP(*route); } - return -1; + ast_vector_free(table); } /*! \internal */ @@ -124,9 +134,9 @@ struct stasis_message_router { /*! Subscription to the upstream topic */ struct stasis_subscription *subscription; /*! Subscribed routes */ - struct route_table *routes; + struct route_table routes; /*! Subscribed routes for \ref stasis_cache_update messages */ - struct route_table *cache_routes; + struct route_table cache_routes; /*! Route of last resort */ struct stasis_message_route default_route; }; @@ -137,13 +147,11 @@ static void router_dtor(void *obj) ast_assert(!stasis_subscription_is_subscribed(router->subscription)); ast_assert(stasis_subscription_is_done(router->subscription)); - router->subscription = NULL; - ast_free(router->routes); - router->routes = NULL; + router->subscription = NULL; - ast_free(router->cache_routes); - router->cache_routes = NULL; + route_table_dtor(&router->routes); + route_table_dtor(&router->cache_routes); } static int find_route( @@ -161,12 +169,12 @@ static int find_route( /* Find a cache route */ struct stasis_cache_update *update = stasis_message_data(message); - route = table_find_route(router->cache_routes, update->type); + route = route_table_find(&router->cache_routes, update->type); } if (route == NULL) { /* Find a regular route */ - route = table_find_route(router->routes, type); + route = route_table_find(&router->routes, type); } if (route == NULL && router->default_route.callback) { @@ -201,6 +209,7 @@ static void router_dispatch(void *data, struct stasis_message_router *stasis_message_router_create( struct stasis_topic *topic) { + int res; RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup); router = ao2_alloc(sizeof(*router), router_dtor); @@ -208,13 +217,10 @@ struct stasis_message_router *stasis_message_router_create( return NULL; } - router->routes = ast_calloc(1, sizeof(*router->routes)); - if (!router->routes) { - return NULL; - } - - router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes)); - if (!router->cache_routes) { + res = 0; + res |= ast_vector_init(&router->routes, 0); + res |= ast_vector_init(&router->cache_routes, 0); + if (res) { return NULL; } @@ -259,40 +265,78 @@ int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - SCOPED_AO2LOCK(lock, router); - return table_add_route(&router->routes, message_type, callback, data); + int res; + + ast_assert(router != NULL); + + if (!message_type) { + /* Cannot route to NULL type. */ + return -1; + } + ao2_lock(router); + res = route_table_add(&router->routes, message_type, callback, data); + ao2_unlock(router); + return res; } int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - SCOPED_AO2LOCK(lock, router); - return table_add_route(&router->cache_routes, message_type, callback, data); + int res; + + ast_assert(router != NULL); + + if (!message_type) { + /* Cannot cache a route to NULL type. */ + return -1; + } + ao2_lock(router); + res = route_table_add(&router->cache_routes, message_type, callback, data); + ao2_unlock(router); + return res; } void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type) { - SCOPED_AO2LOCK(lock, router); - table_remove_route(router->routes, message_type); + ast_assert(router != NULL); + + if (!message_type) { + /* Cannot remove a NULL type. */ + return; + } + ao2_lock(router); + route_table_remove(&router->routes, message_type); + ao2_unlock(router); } void stasis_message_router_remove_cache_update( struct stasis_message_router *router, struct stasis_message_type *message_type) { - SCOPED_AO2LOCK(lock, router); - table_remove_route(router->cache_routes, message_type); + ast_assert(router != NULL); + + if (!message_type) { + /* Cannot remove a NULL type. */ + return; + } + ao2_lock(router); + route_table_remove(&router->cache_routes, message_type); + ao2_unlock(router); } int stasis_message_router_set_default(struct stasis_message_router *router, - stasis_subscription_cb callback, - void *data) + stasis_subscription_cb callback, + void *data) { - SCOPED_AO2LOCK(lock, router); + ast_assert(router != NULL); + ast_assert(callback != NULL); + + ao2_lock(router); router->default_route.callback = callback; router->default_route.data = data; + ao2_unlock(router); /* While this implementation can never fail, it used to be able to */ return 0; } |