diff options
-rw-r--r-- | include/asterisk/stasis_message_router.h | 27 | ||||
-rw-r--r-- | main/stasis_message_router.c | 99 |
2 files changed, 70 insertions, 56 deletions
diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index e7d5a4cc6..81a636dcd 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -47,9 +47,12 @@ struct stasis_message_router; /*! * \brief Create a new message router object. + * * \param topic Topic to subscribe route to. + * * \return New \ref stasis_message_router. * \return \c NULL on error. + * * \since 12 */ struct stasis_message_router *stasis_message_router_create( @@ -59,6 +62,7 @@ struct stasis_message_router *stasis_message_router_create( * \brief Unsubscribe the router from the upstream topic. * * \param router Router to unsubscribe. + * * \since 12 */ void stasis_message_router_unsubscribe(struct stasis_message_router *router); @@ -71,6 +75,7 @@ void stasis_message_router_unsubscribe(struct stasis_message_router *router); * vs. stasis_message_router_unsubscribe(). * * \param router Router to unsubscribe. + * * \since 12 */ void stasis_message_router_unsubscribe_and_join( @@ -80,6 +85,7 @@ void stasis_message_router_unsubscribe_and_join( * \brief Returns whether \a router has received its final message. * * \param router Router. + * * \return True (non-zero) if stasis_subscription_final_message() has been * received. * \return False (zero) if waiting for the end. @@ -88,10 +94,15 @@ int stasis_message_router_is_done(struct stasis_message_router *router); /*! * \brief Add a route to a message router. + * * \param router Router to add the route to. * \param message_type Type of message to route. * \param callback Callback to forard messages of \a message_type to. * \param data Data pointer to pass to \a callback. + * + * \retval 0 on success + * \retval -1 on failure + * * \since 12 */ int stasis_message_router_add(struct stasis_message_router *router, @@ -100,10 +111,26 @@ int stasis_message_router_add(struct stasis_message_router *router, void *data); /*! + * \brief Remove a route from a message router. + * + * \param router Router to remove the route from. + * \param message_type Type of message to route. + * + * \since 12 + */ +void stasis_message_router_remove(struct stasis_message_router *router, + struct stasis_message_type *message_type); + +/*! * \brief Sets the default route of a router. + * * \param router Router to set the default route of. * \param callback Callback to forard messages which otherwise have no home. * \param data Data pointer to pass to \a callback. + * + * \retval 0 on success + * \retval -1 on failure + * * \since 12 */ int stasis_message_router_set_default(struct stasis_message_router *router, diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index c7acca1ff..4e7bbb5e9 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -34,8 +34,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/stasis_message_router.h" -#define INITIAL_ROUTES_MAX 8 - /*! \internal */ struct stasis_message_route { /*! Message type handle by this route. */ @@ -54,38 +52,53 @@ static void route_dtor(void *obj) route->message_type = NULL; } +static int route_hash(const void *obj, const int flags) +{ + const struct stasis_message_route *route = obj; + const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? obj : route->message_type; + + return ast_str_hash(stasis_message_type_name(message_type)); +} + +static int route_cmp(void *obj, void *arg, int flags) +{ + const struct stasis_message_route *left = obj; + const struct stasis_message_route *right = arg; + const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? arg : right->message_type; + + return (left->message_type == message_type) ? CMP_MATCH | CMP_STOP : 0; +} + /*! \internal */ struct stasis_message_router { /*! Subscription to the upstream topic */ struct stasis_subscription *subscription; - /*! Variable length array of the routes */ - struct stasis_message_route **routes; + /*! Subscribed routes */ + struct ao2_container *routes; /*! Route of last resort */ struct stasis_message_route *default_route; - /*! Allocated length of the routes array */ - size_t num_routes_max; - /*! Current size of the routes array */ - size_t num_routes_current; }; static void router_dtor(void *obj) { struct stasis_message_router *router = obj; - size_t i; ast_assert(!stasis_subscription_is_subscribed(router->subscription)); ast_assert(stasis_subscription_is_done(router->subscription)); router->subscription = NULL; - for (i = 0; i < router->num_routes_current; ++i) { - ao2_cleanup(router->routes[i]); - router->routes[i] = NULL; - } - ast_free(router->routes); + + ao2_cleanup(router->routes); router->routes = NULL; + ao2_cleanup(router->default_route); router->default_route = NULL; } +static struct stasis_message_route *find_route(struct stasis_message_router *router, struct stasis_message_type *message_type) +{ + return ao2_find(router->routes, message_type, OBJ_KEY); +} + static void router_dispatch(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, @@ -95,29 +108,15 @@ static void router_dispatch(void *data, RAII_VAR(struct stasis_message_router *, router_needs_cleanup, NULL, ao2_cleanup); RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); struct stasis_message_type *type = stasis_message_type(message); - size_t i; { SCOPED_AO2LOCK(lock, router); - /* We don't expect many message types, so a simple loop should - * be adequate, even if the complexity is O(n). Sorting the list - * would be an easy way to bring that down to O(log(n)). Using a - * hashtable/ao2_container could be even better. Just be sure to - * profile before you optimize! - */ - route = router->default_route; - for (i = 0; i < router->num_routes_current; ++i) { - if (router->routes[i]->message_type == type) { - route = router->routes[i]; - break; + if (!(route = find_route(router, type))) { + if ((route = router->default_route)) { + ao2_ref(route, +1); } } - - /* Ref the route before leaving the scoped lock */ - if (route) { - ao2_ref(route, +1); - } } if (route) { @@ -141,10 +140,7 @@ struct stasis_message_router *stasis_message_router_create( return NULL; } - router->num_routes_max = INITIAL_ROUTES_MAX; - router->routes = ast_calloc(router->num_routes_max, - sizeof(*router->routes)); - if (!router->routes) { + if (!(router->routes = ao2_container_alloc(7, route_hash, route_cmp))) { return NULL; } @@ -212,31 +208,14 @@ static struct stasis_message_route *route_create( static int add_route(struct stasis_message_router *router, struct stasis_message_route *route) { - struct stasis_message_route **routes; - size_t i; SCOPED_AO2LOCK(lock, router); + RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup); - /* Check for route conflicts */ - for (i = 0; i < router->num_routes_current; ++i) { - if (router->routes[i]->message_type == route->message_type) { - return -1; - } - } - - /* Increase list size, if needed */ - if (router->num_routes_current + 1 > router->num_routes_max) { - routes = realloc(router->routes, - 2 * router->num_routes_max * sizeof(*routes)); - if (!routes) { - return -1; - } - router->routes = routes; - router->num_routes_max *= 2; + if ((existing_route = find_route(router, route->message_type))) { + return -1; } - - ao2_ref(route, +1); - router->routes[router->num_routes_current++] = route; + ao2_link(router->routes, route); return 0; } @@ -255,6 +234,14 @@ int stasis_message_router_add(struct stasis_message_router *router, return add_route(router, route); } +void stasis_message_router_remove(struct stasis_message_router *router, + struct stasis_message_type *message_type) +{ + SCOPED_AO2LOCK(lock, router); + + ao2_find(router->routes, message_type, OBJ_UNLINK | OBJ_NODATA | OBJ_KEY); +} + int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data) |