diff options
Diffstat (limited to 'main/stasis.c')
-rw-r--r-- | main/stasis.c | 389 |
1 files changed, 215 insertions, 174 deletions
diff --git a/main/stasis.c b/main/stasis.c index 1a03bb3d4..42c901769 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -29,15 +29,15 @@ #include "asterisk.h" -ASTERISK_FILE_VERSION(__FILE__, "$Revision$") +ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); #include "asterisk/astobj2.h" #include "asterisk/stasis_internal.h" #include "asterisk/stasis.h" -#include "asterisk/threadpool.h" #include "asterisk/taskprocessor.h" #include "asterisk/utils.h" #include "asterisk/uuid.h" +#include "asterisk/vector.h" /*! * \page stasis-impl Stasis Implementation Notes @@ -134,24 +134,23 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") /*! The number of buckets to use for topic pools */ #define TOPIC_POOL_BUCKETS 57 -/*! Threadpool for dispatching notifications to subscribers */ -static struct ast_threadpool *pool; - STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); /*! \internal */ struct stasis_topic { char *name; /*! Variable length array of the subscribers */ - struct stasis_subscription **subscribers; - /*! Allocated length of the subscribers array */ - size_t num_subscribers_max; - /*! Current size of the subscribers array */ - size_t num_subscribers_current; + ast_vector(struct stasis_subscription *) subscribers; + + /*! Topics forwarding into this topic */ + ast_vector(struct stasis_topic *) upstream_topics; }; /* Forward declarations for the tightly-coupled subscription object */ -static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub); +static int topic_add_subscription(struct stasis_topic *topic, + struct stasis_subscription *sub); + +static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub); static void topic_dtor(void *obj) { @@ -159,16 +158,18 @@ static void topic_dtor(void *obj) /* Subscribers hold a reference to topics, so they should all be * unsubscribed before we get here. */ - ast_assert(topic->num_subscribers_current == 0); + ast_assert(ast_vector_size(topic->subscribers) == 0); ast_free(topic->name); topic->name = NULL; - ast_free(topic->subscribers); - topic->subscribers = NULL; + + ast_vector_free(topic->subscribers); + ast_vector_free(topic->upstream_topics); } struct stasis_topic *stasis_topic_create(const char *name) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + int res = 0; topic = ao2_alloc(sizeof(*topic), topic_dtor); @@ -181,9 +182,10 @@ struct stasis_topic *stasis_topic_create(const char *name) return NULL; } - topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX; - topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers)); - if (!topic->subscribers) { + res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX); + res |= ast_vector_init(topic->upstream_topics, 0); + + if (res != 0) { return NULL; } @@ -247,7 +249,6 @@ static void subscription_dtor(void *obj) * \param message Message to send. */ static void subscription_invoke(struct stasis_subscription *sub, - struct stasis_topic *topic, struct stasis_message *message) { /* Notify that the final message has been received */ @@ -258,7 +259,7 @@ static void subscription_invoke(struct stasis_subscription *sub, } /* Since sub is mostly immutable, no need to lock sub */ - sub->callback(sub->data, sub, topic, message); + sub->callback(sub->data, sub, message); /* Notify that the final message has been processed */ if (stasis_subscription_final_message(sub, message)) { @@ -268,7 +269,8 @@ static void subscription_invoke(struct stasis_subscription *sub, } } -static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description); +static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub); +static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub); struct stasis_subscription *internal_stasis_subscribe( struct stasis_topic *topic, @@ -286,10 +288,21 @@ struct stasis_subscription *internal_stasis_subscribe( ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid)); if (needs_mailbox) { - sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool); + /* With a small number of subscribers, a thread-per-sub is + * acceptable. If our usage changes so that we have larger + * numbers of subscribers, we'll probably want to consider + * a threadpool. We had that originally, but with so few + * subscribers it was actually a performance loss instead of + * a gain. + */ + sub->mailbox = ast_taskprocessor_get(sub->uniqueid, + TPS_REF_DEFAULT); if (!sub->mailbox) { return NULL; } + ast_taskprocessor_set_local(sub->mailbox, sub); + /* Taskprocessor has a reference */ + ao2_ref(sub, +1); } ao2_ref(topic, +1); @@ -302,7 +315,7 @@ struct stasis_subscription *internal_stasis_subscribe( if (topic_add_subscription(topic, sub) != 0) { return NULL; } - send_subscription_change_message(topic, sub->uniqueid, "Subscribe"); + send_subscription_subscribe(topic, sub); ao2_ref(sub, +1); return sub; @@ -316,29 +329,42 @@ struct stasis_subscription *stasis_subscribe( return internal_stasis_subscribe(topic, callback, data, 1); } +static int sub_cleanup(void *data) +{ + struct stasis_subscription *sub = data; + ao2_cleanup(sub); + return 0; +} + struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) { - if (sub) { - size_t i; - /* The subscription may be the last ref to this topic. Hold - * the topic ref open until after the unlock. */ - RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic), - ao2_cleanup); - SCOPED_AO2LOCK(lock_topic, topic); + /* The subscription may be the last ref to this topic. Hold + * the topic ref open until after the unlock. */ + RAII_VAR(struct stasis_topic *, topic, + ao2_bump(sub ? sub->topic : NULL), ao2_cleanup); - for (i = 0; i < topic->num_subscribers_current; ++i) { - if (topic->subscribers[i] == sub) { - send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe"); - /* swap [i] with last entry; remove last entry */ - topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current]; - /* Unsubscribing unrefs the subscription */ - ao2_cleanup(sub); - return NULL; - } - } + if (!sub) { + return NULL; + } + + /* We have to remove the subscription first, to ensure the unsubscribe + * is the final message */ + if (topic_remove_subscription(sub->topic, sub) != 0) { + ast_log(LOG_ERROR, + "Internal error: subscription has invalid topic\n"); + return NULL; + } + + /* Now let everyone know about the unsubscribe */ + send_subscription_unsubscribe(topic, sub); - ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n"); + /* When all that's done, remove the ref the mailbox has on the sub */ + if (sub->mailbox) { + ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub); } + + /* Unsubscribing unrefs the subscription */ + ao2_cleanup(sub); return NULL; } @@ -388,8 +414,8 @@ int stasis_subscription_is_subscribed(const struct stasis_subscription *sub) struct stasis_topic *topic = sub->topic; SCOPED_AO2LOCK(lock_topic, topic); - for (i = 0; i < topic->num_subscribers_current; ++i) { - if (topic->subscribers[i] == sub) { + for (i = 0; i < ast_vector_size(topic->subscribers); ++i) { + if (ast_vector_get(topic->subscribers, i) == sub) { return 1; } } @@ -431,74 +457,36 @@ int stasis_subscription_final_message(struct stasis_subscription *sub, struct st */ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub) { - struct stasis_subscription **subscribers; + size_t idx; SCOPED_AO2LOCK(lock, topic); - /* Increase list size, if needed */ - if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) { - subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers)); - if (!subscribers) { - return -1; - } - topic->subscribers = subscribers; - topic->num_subscribers_max *= 2; - } - /* The reference from the topic to the subscription is shared with * the owner of the subscription, which will explicitly unsubscribe * to release it. * * If we bumped the refcount here, the owner would have to unsubscribe * and cleanup, which is a bit awkward. */ - topic->subscribers[topic->num_subscribers_current++] = sub; - return 0; -} + ast_vector_append(topic->subscribers, sub); -/*! - * \internal - * \brief Information needed to dispatch a message to a subscription - */ -struct dispatch { - /*! Topic message was published to */ - struct stasis_topic *topic; - /*! The message itself */ - struct stasis_message *message; - /*! Subscription receiving the message */ - struct stasis_subscription *sub; -}; + for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) { + topic_add_subscription( + ast_vector_get(topic->upstream_topics, idx), sub); + } -static void dispatch_dtor(void *data) -{ - struct dispatch *dispatch = data; - ao2_cleanup(dispatch->topic); - ao2_cleanup(dispatch->message); - ao2_cleanup(dispatch->sub); + return 0; } -static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub) +static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub) { - RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); + size_t idx; + SCOPED_AO2LOCK(lock_topic, topic); - ast_assert(topic != NULL); - ast_assert(message != NULL); - ast_assert(sub != NULL); - - dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor); - if (!dispatch) { - return NULL; + for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) { + topic_remove_subscription( + ast_vector_get(topic->upstream_topics, idx), sub); } - dispatch->topic = topic; - ao2_ref(topic, +1); - - dispatch->message = message; - ao2_ref(message, +1); - - dispatch->sub = sub; - ao2_ref(sub, +1); - - ao2_ref(dispatch, +1); - return dispatch; + return ast_vector_remove_elem_unordered(topic->subscribers, sub); } /*! @@ -506,16 +494,34 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi * \param data \ref dispatch object * \return 0 */ -static int dispatch_exec(void *data) +static int dispatch_exec(struct ast_taskprocessor_local *local) { - RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup); + struct stasis_subscription *sub = local->local_data; + struct stasis_message *message = local->data; - subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message); + subscription_invoke(sub, message); + ao2_cleanup(message); return 0; } -void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message) +static void dispatch_message(struct stasis_subscription *sub, + struct stasis_message *message) +{ + if (sub->mailbox) { + ao2_bump(message); + if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) { + /* Push failed; ugh. */ + ast_log(LOG_DEBUG, "Dropping dispatch\n"); + ao2_cleanup(message); + } + } else { + /* Dispatch directly */ + subscription_invoke(sub, message); + } +} + +void stasis_publish(struct stasis_topic *_topic, struct stasis_message *message) { size_t i; /* The topic may be unref'ed by the subscription invocation. @@ -525,70 +531,104 @@ void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *pu SCOPED_AO2LOCK(lock, topic); ast_assert(topic != NULL); - ast_assert(publisher_topic != NULL); ast_assert(message != NULL); - for (i = 0; i < topic->num_subscribers_current; ++i) { - struct stasis_subscription *sub = topic->subscribers[i]; + for (i = 0; i < ast_vector_size(topic->subscribers); ++i) { + struct stasis_subscription *sub = + ast_vector_get(topic->subscribers, i); ast_assert(sub != NULL); - if (sub->mailbox) { - RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); - - dispatch = dispatch_create(publisher_topic, message, sub); - if (!dispatch) { - ast_log(LOG_DEBUG, "Dropping dispatch\n"); - break; - } - - if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) { - /* Ownership transferred to mailbox. - * Don't increment ref, b/c the task processor - * may have already gotten rid of the object. - */ - dispatch = NULL; - } - } else { - /* Dispatch directly */ - subscription_invoke(sub, publisher_topic, message); - } + dispatch_message(sub, message); } } -void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) +/*! + * \brief Forwarding information + * + * Any message posted to \a from_topic is forwarded to \a to_topic. + * + * In cases where both the \a from_topic and \a to_topic need to be locked, + * always lock the \a to_topic first, then the \a from_topic. Lest you deadlock. + */ +struct stasis_forward { + /*! Originating topic */ + struct stasis_topic *from_topic; + /*! Destination topic */ + struct stasis_topic *to_topic; +}; + +static void forward_dtor(void *obj) { - stasis_forward_message(topic, topic, message); + struct stasis_forward *forward = obj; + + ao2_cleanup(forward->from_topic); + forward->from_topic = NULL; + ao2_cleanup(forward->to_topic); + forward->to_topic = NULL; } -/*! \brief Forwarding subscriber */ -static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) +struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward) { - struct stasis_topic *to_topic = data; - stasis_forward_message(to_topic, topic, message); + if (forward) { + int idx; - if (stasis_subscription_final_message(sub, message)) { - ao2_cleanup(to_topic); + struct stasis_topic *from = forward->from_topic; + struct stasis_topic *to = forward->to_topic; + + SCOPED_AO2LOCK(to_lock, to); + + ast_vector_remove_elem_unordered(to->upstream_topics, from); + + ao2_lock(from); + for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) { + topic_remove_subscription( + from, ast_vector_get(to->subscribers, idx)); + } + ao2_unlock(from); } + + ao2_cleanup(forward); + + return NULL; } -struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic) +struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic, + struct stasis_topic *to_topic) { - struct stasis_subscription *sub; + RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup); + if (!from_topic || !to_topic) { return NULL; } - /* Forwarding subscriptions should dispatch directly instead of having a - * mailbox. Otherwise, messages forwarded to the same topic from - * different topics may get reordered. Which is bad. - */ - sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0); - if (sub) { - /* hold a ref to to_topic for this forwarding subscription */ - ao2_ref(to_topic, +1); + forward = ao2_alloc(sizeof(*forward), forward_dtor); + if (!forward) { + return NULL; } - return sub; + + forward->from_topic = ao2_bump(from_topic); + forward->to_topic = ao2_bump(to_topic); + + { + SCOPED_AO2LOCK(lock, to_topic); + int res; + + res = ast_vector_append(to_topic->upstream_topics, from_topic); + if (res != 0) { + return NULL; + } + + { + SCOPED_AO2LOCK(lock, from_topic); + size_t idx; + for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) { + topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx)); + } + } + } + + return ao2_bump(forward); } static void subscription_change_dtor(void *obj) @@ -598,7 +638,7 @@ static void subscription_change_dtor(void *obj) ao2_cleanup(change->topic); } -static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description) +static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description) { RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup); @@ -616,12 +656,15 @@ static struct stasis_subscription_change *subscription_change_alloc(struct stasi return change; } -static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description) +static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub) { RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup); RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); - change = subscription_change_alloc(topic, uniqueid, description); + /* This assumes that we have already unsubscribed */ + ast_assert(stasis_subscription_is_subscribed(sub)); + + change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe"); if (!change) { return; @@ -636,15 +679,42 @@ static void send_subscription_change_message(struct stasis_topic *topic, char *u stasis_publish(topic, msg); } +static void send_subscription_unsubscribe(struct stasis_topic *topic, + struct stasis_subscription *sub) +{ + RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + + /* This assumes that we have already unsubscribed */ + ast_assert(!stasis_subscription_is_subscribed(sub)); + + change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe"); + + if (!change) { + return; + } + + msg = stasis_message_create(stasis_subscription_change_type(), change); + + if (!msg) { + return; + } + + stasis_publish(topic, msg); + + /* Now we have to dispatch to the subscription itself */ + dispatch_message(sub, msg); +} + struct topic_pool_entry { - struct stasis_subscription *forward; + struct stasis_forward *forward; struct stasis_topic *topic; }; static void topic_pool_entry_dtor(void *obj) { struct topic_pool_entry *entry = obj; - entry->forward = stasis_unsubscribe(entry->forward); + entry->forward = stasis_forward_cancel(entry->forward); ao2_cleanup(entry->topic); entry->topic = NULL; } @@ -731,13 +801,6 @@ void stasis_log_bad_type_access(const char *name) ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name); } -/*! \brief Shutdown function */ -static void stasis_exit(void) -{ - ast_threadpool_shutdown(pool); - pool = NULL; -} - /*! \brief Cleanup function for graceful shutdowns */ static void stasis_cleanup(void) { @@ -748,36 +811,14 @@ int stasis_init(void) { int cache_init; - struct ast_threadpool_options opts; - /* Be sure the types are cleaned up after the message bus */ ast_register_cleanup(stasis_cleanup); - ast_register_atexit(stasis_exit); - - if (stasis_config_init() != 0) { - ast_log(LOG_ERROR, "Stasis configuration failed\n"); - return -1; - } if (stasis_wait_init() != 0) { ast_log(LOG_ERROR, "Stasis initialization failed\n"); return -1; } - if (pool) { - ast_log(LOG_ERROR, "Stasis double-initialized\n"); - return -1; - } - - stasis_config_get_threadpool_options(&opts); - ast_debug(3, "Creating Stasis threadpool: initial_size = %d, max_size = %d, idle_timeout_secs = %d\n", - opts.initial_size, opts.max_size, opts.idle_timeout); - pool = ast_threadpool_create("stasis-core", NULL, &opts); - if (!pool) { - ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n"); - return -1; - } - cache_init = stasis_cache_init(); if (cache_init != 0) { return -1; |