diff options
Diffstat (limited to 'main/stasis_message_router.c')
-rw-r--r-- | main/stasis_message_router.c | 233 |
1 files changed, 99 insertions, 134 deletions
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 26d2f2c0c..8c82decfe 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -34,9 +34,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/stasis_message_router.h" -/*! Number of hash buckets for the route table. Keep it prime! */ -#define ROUTE_TABLE_BUCKETS 7 - /*! \internal */ struct stasis_message_route { /*! Message type handle by this route. */ @@ -47,29 +44,79 @@ struct stasis_message_route { void *data; }; -static void route_dtor(void *obj) +struct route_table { + /*! Current number of entries in the route table */ + size_t current_size; + /*! Allocated number of entires in the route table */ + size_t max_size; + /*! The route table itself */ + struct stasis_message_route routes[]; +}; + +static struct stasis_message_route *table_find_route(struct route_table *table, + struct stasis_message_type *message_type) { - struct stasis_message_route *route = obj; + size_t idx; + + /* While a linear search for routes may seem very inefficient, most + * route tables have six routes or less. For such small data, it's + * hard to beat a linear search. If we start having larger route + * tables, then we can look into containers with more efficient + * lookups. + */ + for (idx = 0; idx < table->current_size; ++idx) { + if (table->routes[idx].message_type == message_type) { + return &table->routes[idx]; + } + } - ao2_cleanup(route->message_type); - route->message_type = NULL; + return NULL; } -static int route_hash(const void *obj, const int flags) +static int table_add_route(struct route_table **table_ptr, + struct stasis_message_type *message_type, + stasis_subscription_cb callback, void *data) { - const struct stasis_message_route *route = obj; - const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? obj : route->message_type; + struct route_table *table = *table_ptr; + struct stasis_message_route *route; + + ast_assert(table_find_route(table, message_type) == NULL); + + if (table->current_size + 1 > table->max_size) { + size_t new_max_size = table->max_size ? table->max_size * 2 : 1; + struct route_table *new_table = ast_realloc(table, + sizeof(*new_table) + + sizeof(new_table->routes[0]) * new_max_size); + if (!new_table) { + return -1; + } + *table_ptr = table = new_table; + table->max_size = new_max_size; + } - return ast_str_hash(stasis_message_type_name(message_type)); + route = &table->routes[table->current_size++]; + + route->message_type = ao2_bump(message_type); + route->callback = callback; + route->data = data; + + return 0; } -static int route_cmp(void *obj, void *arg, int flags) +static int table_remove_route(struct route_table *table, + struct stasis_message_type *message_type) { - const struct stasis_message_route *left = obj; - const struct stasis_message_route *right = arg; - const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? arg : right->message_type; - - return (left->message_type == message_type) ? CMP_MATCH | CMP_STOP : 0; + size_t idx; + + for (idx = 0; idx < table->current_size; ++idx) { + if (table->routes[idx].message_type == message_type) { + ao2_cleanup(message_type); + table->routes[idx] = + table->routes[--table->current_size]; + return 0; + } + } + return -1; } /*! \internal */ @@ -77,11 +124,11 @@ struct stasis_message_router { /*! Subscription to the upstream topic */ struct stasis_subscription *subscription; /*! Subscribed routes */ - struct ao2_container *routes; - /*! Subscribed routes for \ref stasi_cache_update messages */ - struct ao2_container *cache_routes; + struct route_table *routes; + /*! Subscribed routes for \ref stasis_cache_update messages */ + struct route_table *cache_routes; /*! Route of last resort */ - struct stasis_message_route *default_route; + struct stasis_message_route default_route; }; static void router_dtor(void *obj) @@ -92,66 +139,60 @@ static void router_dtor(void *obj) ast_assert(stasis_subscription_is_done(router->subscription)); router->subscription = NULL; - ao2_cleanup(router->routes); + ast_free(router->routes); router->routes = NULL; - ao2_cleanup(router->cache_routes); + ast_free(router->cache_routes); router->cache_routes = NULL; - - ao2_cleanup(router->default_route); - router->default_route = NULL; } -static struct stasis_message_route *find_route( +static int find_route( struct stasis_message_router *router, - struct stasis_message *message) + struct stasis_message *message, + struct stasis_message_route *route_out) { - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); + struct stasis_message_route *route = NULL; struct stasis_message_type *type = stasis_message_type(message); SCOPED_AO2LOCK(lock, router); + ast_assert(route_out != NULL); + if (type == stasis_cache_update_type()) { /* Find a cache route */ struct stasis_cache_update *update = stasis_message_data(message); - route = ao2_find(router->cache_routes, update->type, OBJ_KEY); + route = table_find_route(router->cache_routes, update->type); } if (route == NULL) { /* Find a regular route */ - route = ao2_find(router->routes, type, OBJ_KEY); + route = table_find_route(router->routes, type); } - if (route == NULL) { + if (route == NULL && router->default_route.callback) { /* Maybe the default route, then? */ - if ((route = router->default_route)) { - ao2_ref(route, +1); - } + route = &router->default_route; } - if (route == NULL) { - return NULL; + if (!route) { + return -1; } - ao2_ref(route, +1); - return route; + *route_out = *route; + return 0; } static void router_dispatch(void *data, struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { struct stasis_message_router *router = data; - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); + struct stasis_message_route route; - route = find_route(router, message); - - if (route) { - route->callback(route->data, sub, topic, message); + if (find_route(router, message, &route) == 0) { + route.callback(route.data, sub, message); } - if (stasis_subscription_final_message(sub, message)) { ao2_cleanup(router); } @@ -167,14 +208,12 @@ struct stasis_message_router *stasis_message_router_create( return NULL; } - router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash, - route_cmp); + router->routes = ast_calloc(1, sizeof(*router->routes)); if (!router->routes) { return NULL; } - router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, - route_hash, route_cmp); + router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes)); if (!router->cache_routes) { return NULL; } @@ -216,100 +255,27 @@ int stasis_message_router_is_done(struct stasis_message_router *router) return stasis_subscription_is_done(router->subscription); } - -static struct stasis_message_route *route_create( - struct stasis_message_type *message_type, - stasis_subscription_cb callback, - void *data) -{ - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); - - route = ao2_alloc(sizeof(*route), route_dtor); - if (!route) { - return NULL; - } - - if (message_type) { - ao2_ref(message_type, +1); - } - route->message_type = message_type; - route->callback = callback; - route->data = data; - - ao2_ref(route, +1); - return route; -} - -static int add_route(struct stasis_message_router *router, - struct stasis_message_route *route) -{ - RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, router); - - existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY); - - if (existing_route) { - ast_log(LOG_ERROR, "Cannot add route; route exists\n"); - return -1; - } - - ao2_link(router->routes, route); - return 0; -} - -static int add_cache_route(struct stasis_message_router *router, - struct stasis_message_route *route) -{ - RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, router); - - existing_route = ao2_find(router->cache_routes, route->message_type, - OBJ_KEY); - - if (existing_route) { - ast_log(LOG_ERROR, "Cannot add route; route exists\n"); - return -1; - } - - ao2_link(router->cache_routes, route); - return 0; -} - int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); - - route = route_create(message_type, callback, data); - if (!route) { - return -1; - } - - return add_route(router, route); + SCOPED_AO2LOCK(lock, router); + return table_add_route(&router->routes, message_type, callback, data); } int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); - - route = route_create(message_type, callback, data); - if (!route) { - return -1; - } - - return add_cache_route(router, route); + SCOPED_AO2LOCK(lock, router); + return table_add_route(&router->cache_routes, message_type, callback, data); } void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type) { SCOPED_AO2LOCK(lock, router); - - ao2_find(router->routes, message_type, - OBJ_UNLINK | OBJ_NODATA | OBJ_KEY); + table_remove_route(router->routes, message_type); } void stasis_message_router_remove_cache_update( @@ -317,9 +283,7 @@ void stasis_message_router_remove_cache_update( struct stasis_message_type *message_type) { SCOPED_AO2LOCK(lock, router); - - ao2_find(router->cache_routes, message_type, - OBJ_UNLINK | OBJ_NODATA | OBJ_KEY); + table_remove_route(router->cache_routes, message_type); } int stasis_message_router_set_default(struct stasis_message_router *router, @@ -327,7 +291,8 @@ int stasis_message_router_set_default(struct stasis_message_router *router, void *data) { SCOPED_AO2LOCK(lock, router); - ao2_cleanup(router->default_route); - router->default_route = route_create(NULL, callback, data); - return router->default_route ? 0 : -1; + router->default_route.callback = callback; + router->default_route.data = data; + /* While this implementation can never fail, it used to be able to */ + return 0; } |