diff options
Diffstat (limited to 'main/stasis.c')
-rw-r--r-- | main/stasis.c | 128 |
1 files changed, 117 insertions, 11 deletions
diff --git a/main/stasis.c b/main/stasis.c index b1af7b7f6..1a03bb3d4 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -39,6 +39,95 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/utils.h" #include "asterisk/uuid.h" +/*! + * \page stasis-impl Stasis Implementation Notes + * + * \par Reference counting + * + * Stasis introduces a number of objects, which are tightly related to one + * another. Because we rely on ref-counting for memory management, understanding + * these relationships is important to understanding this code. + * + * \code{.txt} + * + * stasis_topic <----> stasis_subscription + * ^ ^ + * \ / + * \ / + * dispatch + * | + * | + * v + * stasis_message + * | + * | + * v + * stasis_message_type + * + * \endcode + * + * The most troubling thing in this chart is the cyclic reference between + * stasis_topic and stasis_subscription. This is both unfortunate, and + * necessary. Topics need the subscription in order to dispatch messages; + * subscriptions need the topics to unsubscribe and check subscription status. + * + * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the + * topic's reference to a subscription. When the subcription is destroyed, it + * will remove its reference to the topic. + * + * This means that until a subscription has be explicitly unsubscribed, it will + * not be destroyed. Neither will a topic be destroyed while it has subscribers. + * The destructors of both have assertions regarding this to catch ref-counting + * problems where a subscription or topic has had an extra ao2_cleanup(). + * + * The \ref dispatch object is a transient object, which is posted to a + * subscription's taskprocessor to send a message to the subscriber. They have + * short life cycles, allocated on one thread, destroyed on another. + * + * During shutdown, or the deletion of a domain object, there are a flurry of + * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages + * are processed. Any one of these cleanups could be the one to actually destroy + * a given object, so care must be taken to ensure that an object isn't + * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock() + * that might happen when a RAII_VAR() goes out of scope. + * + * \par Typical life cycles + * + * \li stasis_topic - There are several topics which live for the duration of + * the Asterisk process (ast_channel_topic_all(), etc.) but most of these + * are actually fed by shorter-lived topics whose lifetime is associated + * with some domain object (like ast_channel_topic() for a given + * ast_channel). + * + * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as + * topics, for similar reasons. + * + * \li dispatch - Very short lived; just long enough to post a message to a + * subscriber. + * + * \li stasis_message - Short to intermediate lifetimes, but that is mostly + * irrelevant. Messages are strictly data and have no behavior associated + * with them, so it doesn't really matter if/when they are destroyed. By + * design, a component could hold a ref to a message forever without any + * ill consequences (aside from consuming more memory). + * + * \li stasis_message_type - Long life cycles, typically only destroyed on + * module unloading or _clean_ process exit. + * + * \par Subscriber shutdown sequencing + * + * Subscribers are sensitive to shutdown sequencing, specifically in how the + * reference message types. This is fully detailed on the wiki at + * https://wiki.asterisk.org/wiki/x/K4BqAQ. + * + * In short, the lifetime of the \a data (and \a callback, if in a module) must + * be held until the stasis_subscription_final_message() has been received. + * Depending on the structure of the subscriber code, this can be handled by + * using stasis_subscription_final_message() to free resources on the final + * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to + * block until the unsubscribe has completed. + */ + /*! Initial size of the subscribers list. */ #define INITIAL_SUBSCRIBERS_MAX 4 @@ -53,7 +142,7 @@ STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); /*! \internal */ struct stasis_topic { char *name; - /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */ + /*! Variable length array of the subscribers */ struct stasis_subscription **subscribers; /*! Allocated length of the subscribers array */ size_t num_subscribers_max; @@ -67,6 +156,10 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs static void topic_dtor(void *obj) { struct stasis_topic *topic = obj; + + /* Subscribers hold a reference to topics, so they should all be + * unsubscribed before we get here. */ + ast_assert(topic->num_subscribers_current == 0); ast_free(topic->name); topic->name = NULL; ast_free(topic->subscribers); @@ -116,7 +209,7 @@ struct stasis_subscription { /*! Data pointer to be handed to the callback. */ void *data; - /*! Lock for joining with subscription. */ + /*! Lock for completion flags \c final_message_{rxed,processed}. */ ast_mutex_t join_lock; /*! Condition for joining with subscription. */ ast_cond_t join_cond; @@ -131,8 +224,14 @@ struct stasis_subscription { static void subscription_dtor(void *obj) { struct stasis_subscription *sub = obj; + + /* Subscriptions need to be manually unsubscribed before destruction + * b/c there's a cyclic reference between topics and subscriptions */ ast_assert(!stasis_subscription_is_subscribed(sub)); + /* If there are any messages in flight to this subscription; that would + * be bad. */ ast_assert(stasis_subscription_is_done(sub)); + ao2_cleanup(sub->topic); sub->topic = NULL; ast_taskprocessor_unreference(sub->mailbox); @@ -221,7 +320,10 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) { if (sub) { size_t i; - struct stasis_topic *topic = sub->topic; + /* The subscription may be the last ref to this topic. Hold + * the topic ref open until after the unlock. */ + RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic), + ao2_cleanup); SCOPED_AO2LOCK(lock_topic, topic); for (i = 0; i < topic->num_subscribers_current; ++i) { @@ -240,11 +342,6 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) return NULL; } -/*! - * \brief Block until the final message has been received on a subscription. - * - * \param subscription Subscription to wait on. - */ void stasis_subscription_join(struct stasis_subscription *subscription) { if (subscription) { @@ -347,7 +444,12 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs topic->num_subscribers_max *= 2; } - /* Don't ref sub here or we'll cause a reference cycle. */ + /* The reference from the topic to the subscription is shared with + * the owner of the subscription, which will explicitly unsubscribe + * to release it. + * + * If we bumped the refcount here, the owner would have to unsubscribe + * and cleanup, which is a bit awkward. */ topic->subscribers[topic->num_subscribers_current++] = sub; return 0; } @@ -413,9 +515,13 @@ static int dispatch_exec(void *data) return 0; } -void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message) +void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message) { size_t i; + /* The topic may be unref'ed by the subscription invocation. + * Make sure we hold onto a reference while dispatching. */ + RAII_VAR(struct stasis_topic *, topic, ao2_bump(_topic), + ao2_cleanup); SCOPED_AO2LOCK(lock, topic); ast_assert(topic != NULL); @@ -625,7 +731,7 @@ void stasis_log_bad_type_access(const char *name) ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name); } -/*! \brief Cleanup function */ +/*! \brief Shutdown function */ static void stasis_exit(void) { ast_threadpool_shutdown(pool); |