diff options
Diffstat (limited to 'main/stasis_message_router.c')
-rw-r--r-- | main/stasis_message_router.c | 186 |
1 files changed, 115 insertions, 71 deletions
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 8c82decfe..7ed9bcb83 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/stasis_message_router.h" +#include "asterisk/vector.h" /*! \internal */ struct stasis_message_route { @@ -44,19 +45,13 @@ struct stasis_message_route { void *data; }; -struct route_table { - /*! Current number of entries in the route table */ - size_t current_size; - /*! Allocated number of entires in the route table */ - size_t max_size; - /*! The route table itself */ - struct stasis_message_route routes[]; -}; +ast_vector(route_table, struct stasis_message_route); -static struct stasis_message_route *table_find_route(struct route_table *table, +static struct stasis_message_route *route_table_find(struct route_table *table, struct stasis_message_type *message_type) { size_t idx; + struct stasis_message_route *route; /* While a linear search for routes may seem very inefficient, most * route tables have six routes or less. For such small data, it's @@ -64,59 +59,74 @@ static struct stasis_message_route *table_find_route(struct route_table *table, * tables, then we can look into containers with more efficient * lookups. */ - for (idx = 0; idx < table->current_size; ++idx) { - if (table->routes[idx].message_type == message_type) { - return &table->routes[idx]; + for (idx = 0; idx < ast_vector_size(table); ++idx) { + route = ast_vector_get_addr(table, idx); + if (route->message_type == message_type) { + return route; } } return NULL; } -static int table_add_route(struct route_table **table_ptr, +/*! + * \brief route_table comparator for ast_vector_remove_cmp_unordered() + * + * \param elem Element to compare against + * \param value Value to compare with the vector element. + * + * \return 0 if element does not match. + * \return Non-zero if element matches. + */ +#define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value)) + +/*! + * \brief route_table vector element cleanup. + * + * \param elem Element to cleanup + * + * \return Nothing + */ +#define ROUTE_TABLE_ELEM_CLEANUP(elem) ao2_cleanup((elem).message_type) + +static int route_table_remove(struct route_table *table, + struct stasis_message_type *message_type) +{ + return ast_vector_remove_cmp_unordered(table, message_type, ROUTE_TABLE_ELEM_CMP, + ROUTE_TABLE_ELEM_CLEANUP); +} + +static int route_table_add(struct route_table *table, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - struct route_table *table = *table_ptr; - struct stasis_message_route *route; - - ast_assert(table_find_route(table, message_type) == NULL); - - if (table->current_size + 1 > table->max_size) { - size_t new_max_size = table->max_size ? table->max_size * 2 : 1; - struct route_table *new_table = ast_realloc(table, - sizeof(*new_table) + - sizeof(new_table->routes[0]) * new_max_size); - if (!new_table) { - return -1; - } - *table_ptr = table = new_table; - table->max_size = new_max_size; - } + struct stasis_message_route route; + int res; - route = &table->routes[table->current_size++]; + ast_assert(callback != NULL); + ast_assert(route_table_find(table, message_type) == NULL); - route->message_type = ao2_bump(message_type); - route->callback = callback; - route->data = data; + route.message_type = ao2_bump(message_type); + route.callback = callback; + route.data = data; - return 0; + res = ast_vector_append(table, route); + if (res) { + ROUTE_TABLE_ELEM_CLEANUP(route); + } + return res; } -static int table_remove_route(struct route_table *table, - struct stasis_message_type *message_type) +static void route_table_dtor(struct route_table *table) { size_t idx; + struct stasis_message_route *route; - for (idx = 0; idx < table->current_size; ++idx) { - if (table->routes[idx].message_type == message_type) { - ao2_cleanup(message_type); - table->routes[idx] = - table->routes[--table->current_size]; - return 0; - } + for (idx = 0; idx < ast_vector_size(table); ++idx) { + route = ast_vector_get_addr(table, idx); + ROUTE_TABLE_ELEM_CLEANUP(*route); } - return -1; + ast_vector_free(table); } /*! \internal */ @@ -124,9 +134,9 @@ struct stasis_message_router { /*! Subscription to the upstream topic */ struct stasis_subscription *subscription; /*! Subscribed routes */ - struct route_table *routes; + struct route_table routes; /*! Subscribed routes for \ref stasis_cache_update messages */ - struct route_table *cache_routes; + struct route_table cache_routes; /*! Route of last resort */ struct stasis_message_route default_route; }; @@ -137,13 +147,11 @@ static void router_dtor(void *obj) ast_assert(!stasis_subscription_is_subscribed(router->subscription)); ast_assert(stasis_subscription_is_done(router->subscription)); - router->subscription = NULL; - ast_free(router->routes); - router->routes = NULL; + router->subscription = NULL; - ast_free(router->cache_routes); - router->cache_routes = NULL; + route_table_dtor(&router->routes); + route_table_dtor(&router->cache_routes); } static int find_route( @@ -161,12 +169,12 @@ static int find_route( /* Find a cache route */ struct stasis_cache_update *update = stasis_message_data(message); - route = table_find_route(router->cache_routes, update->type); + route = route_table_find(&router->cache_routes, update->type); } if (route == NULL) { /* Find a regular route */ - route = table_find_route(router->routes, type); + route = route_table_find(&router->routes, type); } if (route == NULL && router->default_route.callback) { @@ -201,6 +209,7 @@ static void router_dispatch(void *data, struct stasis_message_router *stasis_message_router_create( struct stasis_topic *topic) { + int res; RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup); router = ao2_alloc(sizeof(*router), router_dtor); @@ -208,13 +217,10 @@ struct stasis_message_router *stasis_message_router_create( return NULL; } - router->routes = ast_calloc(1, sizeof(*router->routes)); - if (!router->routes) { - return NULL; - } - - router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes)); - if (!router->cache_routes) { + res = 0; + res |= ast_vector_init(&router->routes, 0); + res |= ast_vector_init(&router->cache_routes, 0); + if (res) { return NULL; } @@ -259,40 +265,78 @@ int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - SCOPED_AO2LOCK(lock, router); - return table_add_route(&router->routes, message_type, callback, data); + int res; + + ast_assert(router != NULL); + + if (!message_type) { + /* Cannot route to NULL type. */ + return -1; + } + ao2_lock(router); + res = route_table_add(&router->routes, message_type, callback, data); + ao2_unlock(router); + return res; } int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - SCOPED_AO2LOCK(lock, router); - return table_add_route(&router->cache_routes, message_type, callback, data); + int res; + + ast_assert(router != NULL); + + if (!message_type) { + /* Cannot cache a route to NULL type. */ + return -1; + } + ao2_lock(router); + res = route_table_add(&router->cache_routes, message_type, callback, data); + ao2_unlock(router); + return res; } void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type) { - SCOPED_AO2LOCK(lock, router); - table_remove_route(router->routes, message_type); + ast_assert(router != NULL); + + if (!message_type) { + /* Cannot remove a NULL type. */ + return; + } + ao2_lock(router); + route_table_remove(&router->routes, message_type); + ao2_unlock(router); } void stasis_message_router_remove_cache_update( struct stasis_message_router *router, struct stasis_message_type *message_type) { - SCOPED_AO2LOCK(lock, router); - table_remove_route(router->cache_routes, message_type); + ast_assert(router != NULL); + + if (!message_type) { + /* Cannot remove a NULL type. */ + return; + } + ao2_lock(router); + route_table_remove(&router->cache_routes, message_type); + ao2_unlock(router); } int stasis_message_router_set_default(struct stasis_message_router *router, - stasis_subscription_cb callback, - void *data) + stasis_subscription_cb callback, + void *data) { - SCOPED_AO2LOCK(lock, router); + ast_assert(router != NULL); + ast_assert(callback != NULL); + + ao2_lock(router); router->default_route.callback = callback; router->default_route.data = data; + ao2_unlock(router); /* While this implementation can never fail, it used to be able to */ return 0; } |