summaryrefslogtreecommitdiff
path: root/main/stasis.c
diff options
context:
space:
mode:
Diffstat (limited to 'main/stasis.c')
-rw-r--r--main/stasis.c128
1 files changed, 117 insertions, 11 deletions
diff --git a/main/stasis.c b/main/stasis.c
index b1af7b7f6..1a03bb3d4 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -39,6 +39,95 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
+/*!
+ * \page stasis-impl Stasis Implementation Notes
+ *
+ * \par Reference counting
+ *
+ * Stasis introduces a number of objects, which are tightly related to one
+ * another. Because we rely on ref-counting for memory management, understanding
+ * these relationships is important to understanding this code.
+ *
+ * \code{.txt}
+ *
+ * stasis_topic <----> stasis_subscription
+ * ^ ^
+ * \ /
+ * \ /
+ * dispatch
+ * |
+ * |
+ * v
+ * stasis_message
+ * |
+ * |
+ * v
+ * stasis_message_type
+ *
+ * \endcode
+ *
+ * The most troubling thing in this chart is the cyclic reference between
+ * stasis_topic and stasis_subscription. This is both unfortunate, and
+ * necessary. Topics need the subscription in order to dispatch messages;
+ * subscriptions need the topics to unsubscribe and check subscription status.
+ *
+ * The cycle is broken by stasis_unsubscribe(). The unsubscribe will remove the
+ * topic's reference to a subscription. When the subcription is destroyed, it
+ * will remove its reference to the topic.
+ *
+ * This means that until a subscription has be explicitly unsubscribed, it will
+ * not be destroyed. Neither will a topic be destroyed while it has subscribers.
+ * The destructors of both have assertions regarding this to catch ref-counting
+ * problems where a subscription or topic has had an extra ao2_cleanup().
+ *
+ * The \ref dispatch object is a transient object, which is posted to a
+ * subscription's taskprocessor to send a message to the subscriber. They have
+ * short life cycles, allocated on one thread, destroyed on another.
+ *
+ * During shutdown, or the deletion of a domain object, there are a flurry of
+ * ao2_cleanup()s on subscriptions and topics, as the final in-flight messages
+ * are processed. Any one of these cleanups could be the one to actually destroy
+ * a given object, so care must be taken to ensure that an object isn't
+ * referenced after an ao2_cleanup(). This includes the implicit ao2_unlock()
+ * that might happen when a RAII_VAR() goes out of scope.
+ *
+ * \par Typical life cycles
+ *
+ * \li stasis_topic - There are several topics which live for the duration of
+ * the Asterisk process (ast_channel_topic_all(), etc.) but most of these
+ * are actually fed by shorter-lived topics whose lifetime is associated
+ * with some domain object (like ast_channel_topic() for a given
+ * ast_channel).
+ *
+ * \li stasis_subscription - Subscriptions have a similar mix of lifetimes as
+ * topics, for similar reasons.
+ *
+ * \li dispatch - Very short lived; just long enough to post a message to a
+ * subscriber.
+ *
+ * \li stasis_message - Short to intermediate lifetimes, but that is mostly
+ * irrelevant. Messages are strictly data and have no behavior associated
+ * with them, so it doesn't really matter if/when they are destroyed. By
+ * design, a component could hold a ref to a message forever without any
+ * ill consequences (aside from consuming more memory).
+ *
+ * \li stasis_message_type - Long life cycles, typically only destroyed on
+ * module unloading or _clean_ process exit.
+ *
+ * \par Subscriber shutdown sequencing
+ *
+ * Subscribers are sensitive to shutdown sequencing, specifically in how the
+ * reference message types. This is fully detailed on the wiki at
+ * https://wiki.asterisk.org/wiki/x/K4BqAQ.
+ *
+ * In short, the lifetime of the \a data (and \a callback, if in a module) must
+ * be held until the stasis_subscription_final_message() has been received.
+ * Depending on the structure of the subscriber code, this can be handled by
+ * using stasis_subscription_final_message() to free resources on the final
+ * message, or using stasis_subscription_join()/stasis_unsubscribe_and_join() to
+ * block until the unsubscribe has completed.
+ */
+
/*! Initial size of the subscribers list. */
#define INITIAL_SUBSCRIBERS_MAX 4
@@ -53,7 +142,7 @@ STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
/*! \internal */
struct stasis_topic {
char *name;
- /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
+ /*! Variable length array of the subscribers */
struct stasis_subscription **subscribers;
/*! Allocated length of the subscribers array */
size_t num_subscribers_max;
@@ -67,6 +156,10 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
static void topic_dtor(void *obj)
{
struct stasis_topic *topic = obj;
+
+ /* Subscribers hold a reference to topics, so they should all be
+ * unsubscribed before we get here. */
+ ast_assert(topic->num_subscribers_current == 0);
ast_free(topic->name);
topic->name = NULL;
ast_free(topic->subscribers);
@@ -116,7 +209,7 @@ struct stasis_subscription {
/*! Data pointer to be handed to the callback. */
void *data;
- /*! Lock for joining with subscription. */
+ /*! Lock for completion flags \c final_message_{rxed,processed}. */
ast_mutex_t join_lock;
/*! Condition for joining with subscription. */
ast_cond_t join_cond;
@@ -131,8 +224,14 @@ struct stasis_subscription {
static void subscription_dtor(void *obj)
{
struct stasis_subscription *sub = obj;
+
+ /* Subscriptions need to be manually unsubscribed before destruction
+ * b/c there's a cyclic reference between topics and subscriptions */
ast_assert(!stasis_subscription_is_subscribed(sub));
+ /* If there are any messages in flight to this subscription; that would
+ * be bad. */
ast_assert(stasis_subscription_is_done(sub));
+
ao2_cleanup(sub->topic);
sub->topic = NULL;
ast_taskprocessor_unreference(sub->mailbox);
@@ -221,7 +320,10 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
if (sub) {
size_t i;
- struct stasis_topic *topic = sub->topic;
+ /* The subscription may be the last ref to this topic. Hold
+ * the topic ref open until after the unlock. */
+ RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic),
+ ao2_cleanup);
SCOPED_AO2LOCK(lock_topic, topic);
for (i = 0; i < topic->num_subscribers_current; ++i) {
@@ -240,11 +342,6 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
return NULL;
}
-/*!
- * \brief Block until the final message has been received on a subscription.
- *
- * \param subscription Subscription to wait on.
- */
void stasis_subscription_join(struct stasis_subscription *subscription)
{
if (subscription) {
@@ -347,7 +444,12 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
topic->num_subscribers_max *= 2;
}
- /* Don't ref sub here or we'll cause a reference cycle. */
+ /* The reference from the topic to the subscription is shared with
+ * the owner of the subscription, which will explicitly unsubscribe
+ * to release it.
+ *
+ * If we bumped the refcount here, the owner would have to unsubscribe
+ * and cleanup, which is a bit awkward. */
topic->subscribers[topic->num_subscribers_current++] = sub;
return 0;
}
@@ -413,9 +515,13 @@ static int dispatch_exec(void *data)
return 0;
}
-void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
+void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
{
size_t i;
+ /* The topic may be unref'ed by the subscription invocation.
+ * Make sure we hold onto a reference while dispatching. */
+ RAII_VAR(struct stasis_topic *, topic, ao2_bump(_topic),
+ ao2_cleanup);
SCOPED_AO2LOCK(lock, topic);
ast_assert(topic != NULL);
@@ -625,7 +731,7 @@ void stasis_log_bad_type_access(const char *name)
ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
}
-/*! \brief Cleanup function */
+/*! \brief Shutdown function */
static void stasis_exit(void)
{
ast_threadpool_shutdown(pool);