diff options
-rw-r--r-- | include/asterisk/lock.h | 31 | ||||
-rw-r--r-- | include/asterisk/vector.h | 134 | ||||
-rw-r--r-- | main/stasis.c | 122 | ||||
-rw-r--r-- | main/stasis_message_router.c | 186 |
4 files changed, 300 insertions, 173 deletions
diff --git a/include/asterisk/lock.h b/include/asterisk/lock.h index d76a8d185..7741654a0 100644 --- a/include/asterisk/lock.h +++ b/include/asterisk/lock.h @@ -339,6 +339,28 @@ int ast_find_lock_info(void *lock_addr, char *filename, size_t filename_size, in * used during deadlock avoidance, to preserve the original location where * a lock was originally acquired. */ +#define AO2_DEADLOCK_AVOIDANCE(obj) \ + do { \ + char __filename[80], __func[80], __mutex_name[80]; \ + int __lineno; \ + int __res = ast_find_lock_info(ao2_object_get_lockaddr(obj), __filename, sizeof(__filename), &__lineno, __func, sizeof(__func), __mutex_name, sizeof(__mutex_name)); \ + int __res2 = ao2_unlock(obj); \ + usleep(1); \ + if (__res < 0) { /* Could happen if the ao2 object does not have a mutex. */ \ + if (__res2) { \ + ast_log(LOG_WARNING, "Could not unlock ao2 object '%s': %s and no lock info found! I will NOT try to relock.\n", #obj, strerror(__res2)); \ + } else { \ + ao2_lock(obj); \ + } \ + } else { \ + if (__res2) { \ + ast_log(LOG_WARNING, "Could not unlock ao2 object '%s': %s. {{{Originally locked at %s line %d: (%s) '%s'}}} I will NOT try to relock.\n", #obj, strerror(__res2), __filename, __lineno, __func, __mutex_name); \ + } else { \ + __ao2_lock(obj, AO2_LOCK_REQ_MUTEX, __filename, __func, __lineno, __mutex_name); \ + } \ + } \ + } while (0) + #define CHANNEL_DEADLOCK_AVOIDANCE(chan) \ do { \ char __filename[80], __func[80], __mutex_name[80]; \ @@ -493,12 +515,17 @@ static inline void delete_reentrancy_cs(struct ast_lock_track **plt) #else /* !DEBUG_THREADS */ -#define CHANNEL_DEADLOCK_AVOIDANCE(chan) \ +#define AO2_DEADLOCK_AVOIDANCE(obj) \ + ao2_unlock(obj); \ + usleep(1); \ + ao2_lock(obj); + +#define CHANNEL_DEADLOCK_AVOIDANCE(chan) \ ast_channel_unlock(chan); \ usleep(1); \ ast_channel_lock(chan); -#define DEADLOCK_AVOIDANCE(lock) \ +#define DEADLOCK_AVOIDANCE(lock) \ do { \ int __res; \ if (!(__res = ast_mutex_unlock(lock))) { \ diff --git a/include/asterisk/vector.h b/include/asterisk/vector.h index f5d3e9a14..4efc5ce4f 100644 --- a/include/asterisk/vector.h +++ b/include/asterisk/vector.h @@ -33,9 +33,14 @@ * \since 12 */ -/*! \brief Define a vector structure */ -#define ast_vector(type) \ - struct { \ +/*! + * \brief Define a vector structure + * + * \param name Optional vector struct name. + * \param type Vector element type. + */ +#define ast_vector(name, type) \ + struct name { \ type *elems; \ size_t max; \ size_t current; \ @@ -55,15 +60,15 @@ */ #define ast_vector_init(vec, size) ({ \ size_t __size = (size); \ - size_t alloc_size = __size * sizeof(*(vec).elems); \ - (vec).elems = alloc_size ? ast_malloc(alloc_size) : NULL; \ - (vec).current = 0; \ - if ((vec).elems) { \ - (vec).max = __size; \ + size_t alloc_size = __size * sizeof(*((vec)->elems)); \ + (vec)->elems = alloc_size ? ast_malloc(alloc_size) : NULL; \ + (vec)->current = 0; \ + if ((vec)->elems) { \ + (vec)->max = __size; \ } else { \ - (vec).max = 0; \ + (vec)->max = 0; \ } \ - alloc_size == 0 || (vec).elems != NULL ? 0 : -1; \ + (alloc_size == 0 || (vec)->elems != NULL) ? 0 : -1; \ }) /*! @@ -75,10 +80,10 @@ * \param vec Vector to deallocate. */ #define ast_vector_free(vec) do { \ - ast_free((vec).elems); \ - (vec).elems = NULL; \ - (vec).max = 0; \ - (vec).current = 0; \ + ast_free((vec)->elems); \ + (vec)->elems = NULL; \ + (vec)->max = 0; \ + (vec)->current = 0; \ } while (0) /*! @@ -90,25 +95,24 @@ * \return 0 on success. * \return Non-zero on failure. */ -#define ast_vector_append(vec, elem) ({ \ - int res = 0; \ - \ - if ((vec).current + 1 > (vec).max) { \ - size_t new_max = (vec).max ? 2 * (vec).max : 1; \ - typeof((vec).elems) new_elems = ast_realloc( \ - (vec).elems, new_max * sizeof(*new_elems)); \ - if (new_elems) { \ - (vec).elems = new_elems; \ - (vec).max = new_max; \ - } else { \ - res = -1; \ - } \ - } \ - \ - if (res == 0) { \ - (vec).elems[(vec).current++] = (elem); \ - } \ - res; \ +#define ast_vector_append(vec, elem) ({ \ + int res = 0; \ + do { \ + if ((vec)->current + 1 > (vec)->max) { \ + size_t new_max = (vec)->max ? 2 * (vec)->max : 1; \ + typeof((vec)->elems) new_elems = ast_realloc( \ + (vec)->elems, new_max * sizeof(*new_elems)); \ + if (new_elems) { \ + (vec)->elems = new_elems; \ + (vec)->max = new_max; \ + } else { \ + res = -1; \ + break; \ + } \ + } \ + (vec)->elems[(vec)->current++] = (elem); \ + } while (0); \ + res; \ }) /*! @@ -122,11 +126,11 @@ * \return The element that was removed. */ #define ast_vector_remove_unordered(vec, idx) ({ \ - typeof((vec).elems[0]) res; \ + typeof((vec)->elems[0]) res; \ size_t __idx = (idx); \ - ast_assert(__idx < (vec).current); \ - res = (vec).elems[__idx]; \ - (vec).elems[__idx] = (vec).elems[--(vec).current]; \ + ast_assert(__idx < (vec)->current); \ + res = (vec)->elems[__idx]; \ + (vec)->elems[__idx] = (vec)->elems[--(vec)->current]; \ res; \ }) @@ -137,15 +141,18 @@ * \param vec Vector to remove from. * \param value Value to pass into comparator. * \param cmp Comparator function/macros (called as \c cmp(elem, value)) + * \param cleanup How to cleanup a removed element macro/function. + * * \return 0 if element was removed. * \return Non-zero if element was not in the vector. */ -#define ast_vector_remove_cmp_unordered(vec, value, cmp) ({ \ +#define ast_vector_remove_cmp_unordered(vec, value, cmp, cleanup) ({ \ int res = -1; \ size_t idx; \ typeof(value) __value = (value); \ - for (idx = 0; idx < (vec).current; ++idx) { \ - if (cmp((vec).elems[idx], __value)) { \ + for (idx = 0; idx < (vec)->current; ++idx) { \ + if (cmp((vec)->elems[idx], __value)) { \ + cleanup((vec)->elems[idx]); \ ast_vector_remove_unordered((vec), idx); \ res = 0; \ break; \ @@ -154,20 +161,39 @@ res; \ }) -/*! \brief Default comparator for ast_vector_remove_elem_unordered() */ -#define AST_VECTOR_DEFAULT_CMP(a, b) ((a) == (b)) +/*! + * \brief Default comparator for ast_vector_remove_elem_unordered() + * + * \param elem Element to compare against + * \param value Value to compare with the vector element. + * + * \return 0 if element does not match. + * \return Non-zero if element matches. + */ +#define AST_VECTOR_ELEM_DEFAULT_CMP(elem, value) ((elem) == (value)) + +/*! + * \brief Vector element cleanup that does nothing. + * + * \param elem Element to cleanup + * + * \return Nothing + */ +#define AST_VECTOR_ELEM_CLEANUP_NOOP(elem) /*! * \brief Remove an element from a vector. * * \param vec Vector to remove from. * \param elem Element to remove + * \param cleanup How to cleanup a removed element macro/function. + * * \return 0 if element was removed. * \return Non-zero if element was not in the vector. */ -#define ast_vector_remove_elem_unordered(vec, elem) ({ \ - ast_vector_remove_cmp_unordered((vec), (elem), \ - AST_VECTOR_DEFAULT_CMP); \ +#define ast_vector_remove_elem_unordered(vec, elem, cleanup) ({ \ + ast_vector_remove_cmp_unordered((vec), (elem), \ + AST_VECTOR_ELEM_DEFAULT_CMP, cleanup); \ }) /*! @@ -176,7 +202,19 @@ * \param vec Vector to query. * \return Number of elements in the vector. */ -#define ast_vector_size(vec) (vec).current +#define ast_vector_size(vec) (vec)->current + +/*! + * \brief Get an address of element in a vector. + * + * \param vec Vector to query. + * \param idx Index of the element to get address of. + */ +#define ast_vector_get_addr(vec, idx) ({ \ + size_t __idx = (idx); \ + ast_assert(__idx < (vec)->current); \ + &(vec)->elems[__idx]; \ +}) /*! * \brief Get an element from a vector. @@ -186,8 +224,8 @@ */ #define ast_vector_get(vec, idx) ({ \ size_t __idx = (idx); \ - ast_assert(__idx < (vec).current); \ - (vec).elems[__idx]; \ + ast_assert(__idx < (vec)->current); \ + (vec)->elems[__idx]; \ }) #endif /* _ASTERISK_VECTOR_H */ diff --git a/main/stasis.c b/main/stasis.c index eabdfdc1c..db95986ed 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -140,10 +140,10 @@ STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); struct stasis_topic { char *name; /*! Variable length array of the subscribers */ - ast_vector(struct stasis_subscription *) subscribers; + ast_vector(, struct stasis_subscription *) subscribers; /*! Topics forwarding into this topic */ - ast_vector(struct stasis_topic *) upstream_topics; + ast_vector(, struct stasis_topic *) upstream_topics; }; /* Forward declarations for the tightly-coupled subscription object */ @@ -152,18 +152,28 @@ static int topic_add_subscription(struct stasis_topic *topic, static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub); +/*! \brief Lock two topics. */ +#define topic_lock_both(topic1, topic2) \ + do { \ + ao2_lock(topic1); \ + while (ao2_trylock(topic2)) { \ + AO2_DEADLOCK_AVOIDANCE(topic1); \ + } \ + } while (0) + static void topic_dtor(void *obj) { struct stasis_topic *topic = obj; /* Subscribers hold a reference to topics, so they should all be * unsubscribed before we get here. */ - ast_assert(ast_vector_size(topic->subscribers) == 0); + ast_assert(ast_vector_size(&topic->subscribers) == 0); + ast_free(topic->name); topic->name = NULL; - ast_vector_free(topic->subscribers); - ast_vector_free(topic->upstream_topics); + ast_vector_free(&topic->subscribers); + ast_vector_free(&topic->upstream_topics); } struct stasis_topic *stasis_topic_create(const char *name) @@ -182,8 +192,8 @@ struct stasis_topic *stasis_topic_create(const char *name) return NULL; } - res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX); - res |= ast_vector_init(topic->upstream_topics, 0); + res |= ast_vector_init(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX); + res |= ast_vector_init(&topic->upstream_topics, 0); if (res != 0) { return NULL; @@ -280,6 +290,10 @@ struct stasis_subscription *internal_stasis_subscribe( { RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup); + if (!topic) { + return NULL; + } + sub = ao2_alloc(sizeof(*sub), subscription_dtor); if (!sub) { return NULL; @@ -414,8 +428,8 @@ int stasis_subscription_is_subscribed(const struct stasis_subscription *sub) struct stasis_topic *topic = sub->topic; SCOPED_AO2LOCK(lock_topic, topic); - for (i = 0; i < ast_vector_size(topic->subscribers); ++i) { - if (ast_vector_get(topic->subscribers, i) == sub) { + for (i = 0; i < ast_vector_size(&topic->subscribers); ++i) { + if (ast_vector_get(&topic->subscribers, i) == sub) { return 1; } } @@ -466,11 +480,11 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs * * If we bumped the refcount here, the owner would have to unsubscribe * and cleanup, which is a bit awkward. */ - ast_vector_append(topic->subscribers, sub); + ast_vector_append(&topic->subscribers, sub); - for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) { + for (idx = 0; idx < ast_vector_size(&topic->upstream_topics); ++idx) { topic_add_subscription( - ast_vector_get(topic->upstream_topics, idx), sub); + ast_vector_get(&topic->upstream_topics, idx), sub); } return 0; @@ -481,12 +495,13 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s size_t idx; SCOPED_AO2LOCK(lock_topic, topic); - for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) { + for (idx = 0; idx < ast_vector_size(&topic->upstream_topics); ++idx) { topic_remove_subscription( - ast_vector_get(topic->upstream_topics, idx), sub); + ast_vector_get(&topic->upstream_topics, idx), sub); } - return ast_vector_remove_elem_unordered(topic->subscribers, sub); + return ast_vector_remove_elem_unordered(&topic->subscribers, sub, + AST_VECTOR_ELEM_CLEANUP_NOOP); } /*! @@ -512,7 +527,7 @@ static void dispatch_message(struct stasis_subscription *sub, ao2_bump(message); if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) { /* Push failed; ugh. */ - ast_log(LOG_DEBUG, "Dropping dispatch\n"); + ast_log(LOG_ERROR, "Dropping dispatch\n"); ao2_cleanup(message); } } else { @@ -521,26 +536,28 @@ static void dispatch_message(struct stasis_subscription *sub, } } -void stasis_publish(struct stasis_topic *_topic, struct stasis_message *message) +void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) { size_t i; - /* The topic may be unref'ed by the subscription invocation. - * Make sure we hold onto a reference while dispatching. */ - RAII_VAR(struct stasis_topic *, topic, ao2_bump(_topic), - ao2_cleanup); - SCOPED_AO2LOCK(lock, topic); ast_assert(topic != NULL); ast_assert(message != NULL); - for (i = 0; i < ast_vector_size(topic->subscribers); ++i) { - struct stasis_subscription *sub = - ast_vector_get(topic->subscribers, i); + /* + * The topic may be unref'ed by the subscription invocation. + * Make sure we hold onto a reference while dispatching. + */ + ao2_ref(topic, +1); + ao2_lock(topic); + for (i = 0; i < ast_vector_size(&topic->subscribers); ++i) { + struct stasis_subscription *sub = ast_vector_get(&topic->subscribers, i); ast_assert(sub != NULL); dispatch_message(sub, message); } + ao2_unlock(topic); + ao2_ref(topic, -1); } /*! @@ -570,23 +587,26 @@ static void forward_dtor(void *obj) struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward) { - if (forward) { - int idx; + int idx; + struct stasis_topic *from; + struct stasis_topic *to; - struct stasis_topic *from = forward->from_topic; - struct stasis_topic *to = forward->to_topic; + if (!forward) { + return NULL; + } - SCOPED_AO2LOCK(to_lock, to); + from = forward->from_topic; + to = forward->to_topic; - ast_vector_remove_elem_unordered(to->upstream_topics, from); + topic_lock_both(to, from); + ast_vector_remove_elem_unordered(&to->upstream_topics, from, + AST_VECTOR_ELEM_CLEANUP_NOOP); - ao2_lock(from); - for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) { - topic_remove_subscription( - from, ast_vector_get(to->subscribers, idx)); - } - ao2_unlock(from); + for (idx = 0; idx < ast_vector_size(&to->subscribers); ++idx) { + topic_remove_subscription(from, ast_vector_get(&to->subscribers, idx)); } + ao2_unlock(from); + ao2_unlock(to); ao2_cleanup(forward); @@ -596,6 +616,8 @@ struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward) struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic) { + int res; + size_t idx; RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup); if (!from_topic || !to_topic) { @@ -610,23 +632,19 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic, forward->from_topic = ao2_bump(from_topic); forward->to_topic = ao2_bump(to_topic); - { - SCOPED_AO2LOCK(lock, to_topic); - int res; - - res = ast_vector_append(to_topic->upstream_topics, from_topic); - if (res != 0) { - return NULL; - } + topic_lock_both(to_topic, from_topic); + res = ast_vector_append(&to_topic->upstream_topics, from_topic); + if (res != 0) { + ao2_unlock(from_topic); + ao2_unlock(to_topic); + return NULL; + } - { - SCOPED_AO2LOCK(lock, from_topic); - size_t idx; - for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) { - topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx)); - } - } + for (idx = 0; idx < ast_vector_size(&to_topic->subscribers); ++idx) { + topic_add_subscription(from_topic, ast_vector_get(&to_topic->subscribers, idx)); } + ao2_unlock(from_topic); + ao2_unlock(to_topic); return ao2_bump(forward); } diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 8c82decfe..7ed9bcb83 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/stasis_message_router.h" +#include "asterisk/vector.h" /*! \internal */ struct stasis_message_route { @@ -44,19 +45,13 @@ struct stasis_message_route { void *data; }; -struct route_table { - /*! Current number of entries in the route table */ - size_t current_size; - /*! Allocated number of entires in the route table */ - size_t max_size; - /*! The route table itself */ - struct stasis_message_route routes[]; -}; +ast_vector(route_table, struct stasis_message_route); -static struct stasis_message_route *table_find_route(struct route_table *table, +static struct stasis_message_route *route_table_find(struct route_table *table, struct stasis_message_type *message_type) { size_t idx; + struct stasis_message_route *route; /* While a linear search for routes may seem very inefficient, most * route tables have six routes or less. For such small data, it's @@ -64,59 +59,74 @@ static struct stasis_message_route *table_find_route(struct route_table *table, * tables, then we can look into containers with more efficient * lookups. */ - for (idx = 0; idx < table->current_size; ++idx) { - if (table->routes[idx].message_type == message_type) { - return &table->routes[idx]; + for (idx = 0; idx < ast_vector_size(table); ++idx) { + route = ast_vector_get_addr(table, idx); + if (route->message_type == message_type) { + return route; } } return NULL; } -static int table_add_route(struct route_table **table_ptr, +/*! + * \brief route_table comparator for ast_vector_remove_cmp_unordered() + * + * \param elem Element to compare against + * \param value Value to compare with the vector element. + * + * \return 0 if element does not match. + * \return Non-zero if element matches. + */ +#define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value)) + +/*! + * \brief route_table vector element cleanup. + * + * \param elem Element to cleanup + * + * \return Nothing + */ +#define ROUTE_TABLE_ELEM_CLEANUP(elem) ao2_cleanup((elem).message_type) + +static int route_table_remove(struct route_table *table, + struct stasis_message_type *message_type) +{ + return ast_vector_remove_cmp_unordered(table, message_type, ROUTE_TABLE_ELEM_CMP, + ROUTE_TABLE_ELEM_CLEANUP); +} + +static int route_table_add(struct route_table *table, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - struct route_table *table = *table_ptr; - struct stasis_message_route *route; - - ast_assert(table_find_route(table, message_type) == NULL); - - if (table->current_size + 1 > table->max_size) { - size_t new_max_size = table->max_size ? table->max_size * 2 : 1; - struct route_table *new_table = ast_realloc(table, - sizeof(*new_table) + - sizeof(new_table->routes[0]) * new_max_size); - if (!new_table) { - return -1; - } - *table_ptr = table = new_table; - table->max_size = new_max_size; - } + struct stasis_message_route route; + int res; - route = &table->routes[table->current_size++]; + ast_assert(callback != NULL); + ast_assert(route_table_find(table, message_type) == NULL); - route->message_type = ao2_bump(message_type); - route->callback = callback; - route->data = data; + route.message_type = ao2_bump(message_type); + route.callback = callback; + route.data = data; - return 0; + res = ast_vector_append(table, route); + if (res) { + ROUTE_TABLE_ELEM_CLEANUP(route); + } + return res; } -static int table_remove_route(struct route_table *table, - struct stasis_message_type *message_type) +static void route_table_dtor(struct route_table *table) { size_t idx; + struct stasis_message_route *route; - for (idx = 0; idx < table->current_size; ++idx) { - if (table->routes[idx].message_type == message_type) { - ao2_cleanup(message_type); - table->routes[idx] = - table->routes[--table->current_size]; - return 0; - } + for (idx = 0; idx < ast_vector_size(table); ++idx) { + route = ast_vector_get_addr(table, idx); + ROUTE_TABLE_ELEM_CLEANUP(*route); } - return -1; + ast_vector_free(table); } /*! \internal */ @@ -124,9 +134,9 @@ struct stasis_message_router { /*! Subscription to the upstream topic */ struct stasis_subscription *subscription; /*! Subscribed routes */ - struct route_table *routes; + struct route_table routes; /*! Subscribed routes for \ref stasis_cache_update messages */ - struct route_table *cache_routes; + struct route_table cache_routes; /*! Route of last resort */ struct stasis_message_route default_route; }; @@ -137,13 +147,11 @@ static void router_dtor(void *obj) ast_assert(!stasis_subscription_is_subscribed(router->subscription)); ast_assert(stasis_subscription_is_done(router->subscription)); - router->subscription = NULL; - ast_free(router->routes); - router->routes = NULL; + router->subscription = NULL; - ast_free(router->cache_routes); - router->cache_routes = NULL; + route_table_dtor(&router->routes); + route_table_dtor(&router->cache_routes); } static int find_route( @@ -161,12 +169,12 @@ static int find_route( /* Find a cache route */ struct stasis_cache_update *update = stasis_message_data(message); - route = table_find_route(router->cache_routes, update->type); + route = route_table_find(&router->cache_routes, update->type); } if (route == NULL) { /* Find a regular route */ - route = table_find_route(router->routes, type); + route = route_table_find(&router->routes, type); } if (route == NULL && router->default_route.callback) { @@ -201,6 +209,7 @@ static void router_dispatch(void *data, struct stasis_message_router *stasis_message_router_create( struct stasis_topic *topic) { + int res; RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup); router = ao2_alloc(sizeof(*router), router_dtor); @@ -208,13 +217,10 @@ struct stasis_message_router *stasis_message_router_create( return NULL; } - router->routes = ast_calloc(1, sizeof(*router->routes)); - if (!router->routes) { - return NULL; - } - - router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes)); - if (!router->cache_routes) { + res = 0; + res |= ast_vector_init(&router->routes, 0); + res |= ast_vector_init(&router->cache_routes, 0); + if (res) { return NULL; } @@ -259,40 +265,78 @@ int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - SCOPED_AO2LOCK(lock, router); - return table_add_route(&router->routes, message_type, callback, data); + int res; + + ast_assert(router != NULL); + + if (!message_type) { + /* Cannot route to NULL type. */ + return -1; + } + ao2_lock(router); + res = route_table_add(&router->routes, message_type, callback, data); + ao2_unlock(router); + return res; } int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - SCOPED_AO2LOCK(lock, router); - return table_add_route(&router->cache_routes, message_type, callback, data); + int res; + + ast_assert(router != NULL); + + if (!message_type) { + /* Cannot cache a route to NULL type. */ + return -1; + } + ao2_lock(router); + res = route_table_add(&router->cache_routes, message_type, callback, data); + ao2_unlock(router); + return res; } void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type) { - SCOPED_AO2LOCK(lock, router); - table_remove_route(router->routes, message_type); + ast_assert(router != NULL); + + if (!message_type) { + /* Cannot remove a NULL type. */ + return; + } + ao2_lock(router); + route_table_remove(&router->routes, message_type); + ao2_unlock(router); } void stasis_message_router_remove_cache_update( struct stasis_message_router *router, struct stasis_message_type *message_type) { - SCOPED_AO2LOCK(lock, router); - table_remove_route(router->cache_routes, message_type); + ast_assert(router != NULL); + + if (!message_type) { + /* Cannot remove a NULL type. */ + return; + } + ao2_lock(router); + route_table_remove(&router->cache_routes, message_type); + ao2_unlock(router); } int stasis_message_router_set_default(struct stasis_message_router *router, - stasis_subscription_cb callback, - void *data) + stasis_subscription_cb callback, + void *data) { - SCOPED_AO2LOCK(lock, router); + ast_assert(router != NULL); + ast_assert(callback != NULL); + + ao2_lock(router); router->default_route.callback = callback; router->default_route.data = data; + ao2_unlock(router); /* While this implementation can never fail, it used to be able to */ return 0; } |