diff options
Diffstat (limited to 'main/stasis.c')
-rw-r--r-- | main/stasis.c | 122 |
1 files changed, 70 insertions, 52 deletions
diff --git a/main/stasis.c b/main/stasis.c index eabdfdc1c..db95986ed 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -140,10 +140,10 @@ STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); struct stasis_topic { char *name; /*! Variable length array of the subscribers */ - ast_vector(struct stasis_subscription *) subscribers; + ast_vector(, struct stasis_subscription *) subscribers; /*! Topics forwarding into this topic */ - ast_vector(struct stasis_topic *) upstream_topics; + ast_vector(, struct stasis_topic *) upstream_topics; }; /* Forward declarations for the tightly-coupled subscription object */ @@ -152,18 +152,28 @@ static int topic_add_subscription(struct stasis_topic *topic, static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub); +/*! \brief Lock two topics. */ +#define topic_lock_both(topic1, topic2) \ + do { \ + ao2_lock(topic1); \ + while (ao2_trylock(topic2)) { \ + AO2_DEADLOCK_AVOIDANCE(topic1); \ + } \ + } while (0) + static void topic_dtor(void *obj) { struct stasis_topic *topic = obj; /* Subscribers hold a reference to topics, so they should all be * unsubscribed before we get here. */ - ast_assert(ast_vector_size(topic->subscribers) == 0); + ast_assert(ast_vector_size(&topic->subscribers) == 0); + ast_free(topic->name); topic->name = NULL; - ast_vector_free(topic->subscribers); - ast_vector_free(topic->upstream_topics); + ast_vector_free(&topic->subscribers); + ast_vector_free(&topic->upstream_topics); } struct stasis_topic *stasis_topic_create(const char *name) @@ -182,8 +192,8 @@ struct stasis_topic *stasis_topic_create(const char *name) return NULL; } - res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX); - res |= ast_vector_init(topic->upstream_topics, 0); + res |= ast_vector_init(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX); + res |= ast_vector_init(&topic->upstream_topics, 0); if (res != 0) { return NULL; @@ -280,6 +290,10 @@ struct stasis_subscription *internal_stasis_subscribe( { RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup); + if (!topic) { + return NULL; + } + sub = ao2_alloc(sizeof(*sub), subscription_dtor); if (!sub) { return NULL; @@ -414,8 +428,8 @@ int stasis_subscription_is_subscribed(const struct stasis_subscription *sub) struct stasis_topic *topic = sub->topic; SCOPED_AO2LOCK(lock_topic, topic); - for (i = 0; i < ast_vector_size(topic->subscribers); ++i) { - if (ast_vector_get(topic->subscribers, i) == sub) { + for (i = 0; i < ast_vector_size(&topic->subscribers); ++i) { + if (ast_vector_get(&topic->subscribers, i) == sub) { return 1; } } @@ -466,11 +480,11 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs * * If we bumped the refcount here, the owner would have to unsubscribe * and cleanup, which is a bit awkward. */ - ast_vector_append(topic->subscribers, sub); + ast_vector_append(&topic->subscribers, sub); - for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) { + for (idx = 0; idx < ast_vector_size(&topic->upstream_topics); ++idx) { topic_add_subscription( - ast_vector_get(topic->upstream_topics, idx), sub); + ast_vector_get(&topic->upstream_topics, idx), sub); } return 0; @@ -481,12 +495,13 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s size_t idx; SCOPED_AO2LOCK(lock_topic, topic); - for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) { + for (idx = 0; idx < ast_vector_size(&topic->upstream_topics); ++idx) { topic_remove_subscription( - ast_vector_get(topic->upstream_topics, idx), sub); + ast_vector_get(&topic->upstream_topics, idx), sub); } - return ast_vector_remove_elem_unordered(topic->subscribers, sub); + return ast_vector_remove_elem_unordered(&topic->subscribers, sub, + AST_VECTOR_ELEM_CLEANUP_NOOP); } /*! @@ -512,7 +527,7 @@ static void dispatch_message(struct stasis_subscription *sub, ao2_bump(message); if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) { /* Push failed; ugh. */ - ast_log(LOG_DEBUG, "Dropping dispatch\n"); + ast_log(LOG_ERROR, "Dropping dispatch\n"); ao2_cleanup(message); } } else { @@ -521,26 +536,28 @@ static void dispatch_message(struct stasis_subscription *sub, } } -void stasis_publish(struct stasis_topic *_topic, struct stasis_message *message) +void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) { size_t i; - /* The topic may be unref'ed by the subscription invocation. - * Make sure we hold onto a reference while dispatching. */ - RAII_VAR(struct stasis_topic *, topic, ao2_bump(_topic), - ao2_cleanup); - SCOPED_AO2LOCK(lock, topic); ast_assert(topic != NULL); ast_assert(message != NULL); - for (i = 0; i < ast_vector_size(topic->subscribers); ++i) { - struct stasis_subscription *sub = - ast_vector_get(topic->subscribers, i); + /* + * The topic may be unref'ed by the subscription invocation. + * Make sure we hold onto a reference while dispatching. + */ + ao2_ref(topic, +1); + ao2_lock(topic); + for (i = 0; i < ast_vector_size(&topic->subscribers); ++i) { + struct stasis_subscription *sub = ast_vector_get(&topic->subscribers, i); ast_assert(sub != NULL); dispatch_message(sub, message); } + ao2_unlock(topic); + ao2_ref(topic, -1); } /*! @@ -570,23 +587,26 @@ static void forward_dtor(void *obj) struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward) { - if (forward) { - int idx; + int idx; + struct stasis_topic *from; + struct stasis_topic *to; - struct stasis_topic *from = forward->from_topic; - struct stasis_topic *to = forward->to_topic; + if (!forward) { + return NULL; + } - SCOPED_AO2LOCK(to_lock, to); + from = forward->from_topic; + to = forward->to_topic; - ast_vector_remove_elem_unordered(to->upstream_topics, from); + topic_lock_both(to, from); + ast_vector_remove_elem_unordered(&to->upstream_topics, from, + AST_VECTOR_ELEM_CLEANUP_NOOP); - ao2_lock(from); - for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) { - topic_remove_subscription( - from, ast_vector_get(to->subscribers, idx)); - } - ao2_unlock(from); + for (idx = 0; idx < ast_vector_size(&to->subscribers); ++idx) { + topic_remove_subscription(from, ast_vector_get(&to->subscribers, idx)); } + ao2_unlock(from); + ao2_unlock(to); ao2_cleanup(forward); @@ -596,6 +616,8 @@ struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward) struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic) { + int res; + size_t idx; RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup); if (!from_topic || !to_topic) { @@ -610,23 +632,19 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic, forward->from_topic = ao2_bump(from_topic); forward->to_topic = ao2_bump(to_topic); - { - SCOPED_AO2LOCK(lock, to_topic); - int res; - - res = ast_vector_append(to_topic->upstream_topics, from_topic); - if (res != 0) { - return NULL; - } + topic_lock_both(to_topic, from_topic); + res = ast_vector_append(&to_topic->upstream_topics, from_topic); + if (res != 0) { + ao2_unlock(from_topic); + ao2_unlock(to_topic); + return NULL; + } - { - SCOPED_AO2LOCK(lock, from_topic); - size_t idx; - for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) { - topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx)); - } - } + for (idx = 0; idx < ast_vector_size(&to_topic->subscribers); ++idx) { + topic_add_subscription(from_topic, ast_vector_get(&to_topic->subscribers, idx)); } + ao2_unlock(from_topic); + ao2_unlock(to_topic); return ao2_bump(forward); } |