summaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-05-17 21:10:32 +0000
committerDavid M. Lee <dlee@digium.com>2013-05-17 21:10:32 +0000
commitb97c71bb1190cb41eba9081d14724bcb39d422ba (patch)
tree2ae24b23411b0ab59b2239c4cefc5675c67de48e /include
parent91bab7642281b593495284bd16744a6213cb6ea8 (diff)
Fix shutdown assertions in stasis-core
In r388005, macros were introduced to consistently define message types. This added an assert if a message type was used either before it was initialized or after it had been cleaned up. It turns out that this assertion fires during shutdown. This actually exposed a hidden shutdown ordering problem. Since unsubscribing is asynchronous, it's possible that the message types used by the subscription could be freed before the final message of the subscription was processed. This patch adds stasis_subscription_join(), which blocks until the last message has been processed by the subscription. Since joining was most commonly done right after an unsubscribe, a stasis_unsubscribe_and_join() convenience function was also added. Similar functions were also added to the stasis_caching_topic and stasis_message_router, since they wrap subscriptions and have similar problems. Other code in trunk was refactored to join() where appropriate, or at least verify that the subscription was complete before being destroyed. Review: https://reviewboard.asterisk.org/r/2540 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@389011 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'include')
-rw-r--r--include/asterisk/stasis.h116
-rw-r--r--include/asterisk/stasis_message_router.h24
2 files changed, 127 insertions, 13 deletions
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 7e46dbf41..e6ea6fa13 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -124,6 +124,34 @@
* stasis_subscription. Due to cyclic references, the \ref
* stasis_subscription will not be freed until after it has been unsubscribed,
* and all other ao2_ref()'s have been cleaned up.
+ *
+ * \par Shutdown
+ *
+ * Subscriptions have two options for unsubscribing, depending upon the context
+ * in which you need to unsubscribe.
+ *
+ * If your subscription is owned by a module, and you must unsubscribe from the
+ * module_unload() function, then you'll want to use the
+ * stasis_unsubscribe_and_join() function. This will block until the final
+ * message has been received on the subscription. Otherwise, there's the danger
+ * of invoking the callback function after it has been unloaded.
+ *
+ * If your subscription is owned by an object, then your object should have an
+ * explicit shutdown() function, which calls stasis_unsubscribe(). In your
+ * subscription handler, when the stasis_subscription_final_message() has been
+ * received, decrement the refcount on your object. In your object's destructor,
+ * you may assert that stasis_subscription_is_done() to validate that the
+ * subscription's callback will no longer be invoked.
+ *
+ * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
+ * an object's destructor. While code that does this may work most of the time,
+ * it's got one big downside. There's a general assumption that object
+ * destruction is non-blocking. If you block the destruction waiting for the
+ * subscription to complete, there's the danger that the subscription may
+ * process a message which will bump the refcount up by one. Then it does
+ * whatever it does, decrements the refcount, which then proceeds to re-destroy
+ * the object. Now you've got hard to reproduce bugs that only show up under
+ * certain loads.
*/
#include "asterisk/utils.h"
@@ -292,8 +320,7 @@ typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *s
* \since 12
*/
struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
- stasis_subscription_cb callback,
- void *data);
+ stasis_subscription_cb callback, void *data);
/*!
* \brief Cancel a subscription.
@@ -304,10 +331,52 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
* delivery of the final message.
*
* \param subscription Subscription to cancel.
- * \retval NULL for convenience
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_subscription *stasis_unsubscribe(
+ struct stasis_subscription *subscription);
+
+/*!
+ * \brief Block until the last message is processed on a subscription.
+ *
+ * This function will not return until the \a subscription's callback for the
+ * stasis_subscription_final_message() completes. This allows cleanup routines
+ * to run before unblocking the joining thread.
+ *
+ * \param subscription Subscription to block on.
+ * \since 12
+ */
+void stasis_subscription_join(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Returns whether \a subscription has received its final message.
+ *
+ * Note that a subscription is considered done even while the
+ * stasis_subscription_final_message() is being processed. This allows cleanup
+ * routines to check the status of the subscription.
+ *
+ * \param subscription Subscription.
+ * \return True (non-zero) if stasis_subscription_final_message() has been
+ * received.
+ * \return False (zero) if waiting for the end.
+ */
+int stasis_subscription_is_done(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Cancel a subscription, blocking until the last message is processed.
+ *
+ * While normally it's recommended to stasis_unsubscribe() and wait for
+ * stasis_subscription_final_message(), there are times (like during a module
+ * unload) where you have to wait for the final message (otherwise you'll call
+ * a function in a shared module that no longer exists).
+ *
+ * \param subscription Subscription to cancel.
+ * \return \c NULL for convenience
* \since 12
*/
-struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subscription);
+struct stasis_subscription *stasis_unsubscribe_and_join(
+ struct stasis_subscription *subscription);
/*!
* \brief Create a subscription which forwards all messages from one topic to
@@ -322,7 +391,8 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subsc
* \return \c NULL on error.
* \since 12
*/
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
+ struct stasis_topic *to_topic);
/*!
* \brief Get the unique ID for the subscription.
@@ -389,7 +459,8 @@ struct stasis_topic_pool;
/*!
* \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
* \param pooled_topic Topic to which messages will be routed
- * \retval the new stasis_topic_pool or NULL on failure
+ * \return the new stasis_topic_pool
+ * \return \c NULL on failure
*/
struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
@@ -397,8 +468,8 @@ struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_t
* \brief Find or create a topic in the pool
* \param pool Pool for which to get the topic
* \param topic_name Name of the topic to get
- * \retval The already stored or newly allocated topic
- * \retval NULL if the topic was not found and could not be allocated
+ * \return The already stored or newly allocated topic
+ * \return \c NULL if the topic was not found and could not be allocated
*/
struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
@@ -496,12 +567,31 @@ typedef const char *(*snapshot_get_id)(struct stasis_message *message);
struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
/*!
- * Unsubscribes a caching topic from its upstream topic.
+ * \brief Unsubscribes a caching topic from its upstream topic.
+ *
+ * This function returns immediately, so be sure to cleanup when
+ * stasis_subscription_final_message() is received.
+ *
+ * \param caching_topic Caching topic to unsubscribe
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_caching_unsubscribe(
+ struct stasis_caching_topic *caching_topic);
+
+/*!
+ * \brief Unsubscribes a caching topic from its upstream topic, blocking until
+ * all messages have been forwarded.
+ *
+ * See stasis_unsubscriben_and_join() for more info on when to use this as
+ * opposed to stasis_caching_unsubscribe().
+ *
* \param caching_topic Caching topic to unsubscribe
- * \retval NULL for convenience
+ * \return \c NULL for convenience
* \since 12
*/
-struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic);
+struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
+ struct stasis_caching_topic *caching_topic);
/*!
* \brief Returns the topic of cached events from a caching topics.
@@ -530,9 +620,9 @@ struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_top
/*!
* \brief Dump cached items to a subscription
* \param caching_topic The topic returned from stasis_caching_topic_create().
- * \param type Type of message to dump (any type if NULL).
+ * \param type Type of message to dump (any type if \c NULL).
* \return ao2_container containing all matches (must be unreffed by caller)
- * \return NULL on allocation error
+ * \return \c NULL on allocation error
* \since 12
*/
struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,
diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h
index 42770d293..e7d5a4cc6 100644
--- a/include/asterisk/stasis_message_router.h
+++ b/include/asterisk/stasis_message_router.h
@@ -57,12 +57,36 @@ struct stasis_message_router *stasis_message_router_create(
/*!
* \brief Unsubscribe the router from the upstream topic.
+ *
* \param router Router to unsubscribe.
* \since 12
*/
void stasis_message_router_unsubscribe(struct stasis_message_router *router);
/*!
+ * \brief Unsubscribe the router from the upstream topic, blocking until the
+ * final message has been processed.
+ *
+ * See stasis_unsubscribe_and_join() for info on when to use this
+ * vs. stasis_message_router_unsubscribe().
+ *
+ * \param router Router to unsubscribe.
+ * \since 12
+ */
+void stasis_message_router_unsubscribe_and_join(
+ struct stasis_message_router *router);
+
+/*!
+ * \brief Returns whether \a router has received its final message.
+ *
+ * \param router Router.
+ * \return True (non-zero) if stasis_subscription_final_message() has been
+ * received.
+ * \return False (zero) if waiting for the end.
+ */
+int stasis_message_router_is_done(struct stasis_message_router *router);
+
+/*!
* \brief Add a route to a message router.
* \param router Router to add the route to.
* \param message_type Type of message to route.