summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-05-17 21:10:32 +0000
committerDavid M. Lee <dlee@digium.com>2013-05-17 21:10:32 +0000
commitb97c71bb1190cb41eba9081d14724bcb39d422ba (patch)
tree2ae24b23411b0ab59b2239c4cefc5675c67de48e
parent91bab7642281b593495284bd16744a6213cb6ea8 (diff)
Fix shutdown assertions in stasis-core
In r388005, macros were introduced to consistently define message types. This added an assert if a message type was used either before it was initialized or after it had been cleaned up. It turns out that this assertion fires during shutdown. This actually exposed a hidden shutdown ordering problem. Since unsubscribing is asynchronous, it's possible that the message types used by the subscription could be freed before the final message of the subscription was processed. This patch adds stasis_subscription_join(), which blocks until the last message has been processed by the subscription. Since joining was most commonly done right after an unsubscribe, a stasis_unsubscribe_and_join() convenience function was also added. Similar functions were also added to the stasis_caching_topic and stasis_message_router, since they wrap subscriptions and have similar problems. Other code in trunk was refactored to join() where appropriate, or at least verify that the subscription was complete before being destroyed. Review: https://reviewboard.asterisk.org/r/2540 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@389011 65c4cc65-6c06-0410-ace0-fbb531ad65f3
-rw-r--r--apps/app_queue.c4
-rw-r--r--apps/app_voicemail.c4
-rw-r--r--channels/chan_iax2.c12
-rw-r--r--channels/chan_sip.c10
-rw-r--r--funcs/func_presencestate.c2
-rw-r--r--include/asterisk/stasis.h116
-rw-r--r--include/asterisk/stasis_message_router.h24
-rw-r--r--main/app.c2
-rw-r--r--main/devicestate.c2
-rw-r--r--main/endpoints.c7
-rw-r--r--main/manager.c4
-rw-r--r--main/manager_channels.c2
-rw-r--r--main/pbx.c8
-rw-r--r--main/stasis.c81
-rw-r--r--main/stasis_cache.c19
-rw-r--r--main/stasis_channels.c6
-rw-r--r--main/stasis_endpoints.c18
-rw-r--r--main/stasis_message_router.c21
-rw-r--r--res/res_chan_stats.c4
-rw-r--r--res/res_jabber.c8
-rw-r--r--res/res_stasis.c2
21 files changed, 280 insertions, 76 deletions
diff --git a/apps/app_queue.c b/apps/app_queue.c
index 3455cc748..c63cd071e 100644
--- a/apps/app_queue.c
+++ b/apps/app_queue.c
@@ -9866,9 +9866,7 @@ static int unload_module(void)
res |= ast_data_unregister(NULL);
- if (device_state_sub) {
- device_state_sub = stasis_unsubscribe(device_state_sub);
- }
+ device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
ast_extension_state_del(0, extension_state_cb);
diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c
index 13bcd3ea5..b3ceeebc9 100644
--- a/apps/app_voicemail.c
+++ b/apps/app_voicemail.c
@@ -12689,9 +12689,7 @@ static void stop_poll_thread(void)
{
poll_thread_run = 0;
- if (mwi_sub_sub) {
- mwi_sub_sub = stasis_unsubscribe(mwi_sub_sub);
- }
+ mwi_sub_sub = stasis_unsubscribe_and_join(mwi_sub_sub);
ast_mutex_lock(&poll_lock);
ast_cond_signal(&poll_cond);
diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c
index eeffb6696..112a99375 100644
--- a/channels/chan_iax2.c
+++ b/channels/chan_iax2.c
@@ -1334,9 +1334,7 @@ static void network_change_stasis_subscribe(void)
static void network_change_stasis_unsubscribe(void)
{
- if (network_change_sub) {
- network_change_sub = stasis_unsubscribe(network_change_sub);
- }
+ network_change_sub = stasis_unsubscribe_and_join(network_change_sub);
}
static void acl_change_stasis_subscribe(void)
@@ -1349,9 +1347,7 @@ static void acl_change_stasis_subscribe(void)
static void acl_change_stasis_unsubscribe(void)
{
- if (acl_change_sub) {
- acl_change_sub = stasis_unsubscribe(acl_change_sub);
- }
+ acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
}
static int network_change_sched_cb(const void *data)
@@ -12424,9 +12420,7 @@ static void peer_destructor(void *obj)
if (peer->dnsmgr)
ast_dnsmgr_release(peer->dnsmgr);
- if (peer->mwi_event_sub) {
- peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub);
- }
+ peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub);
ast_string_field_free_memory(peer);
}
diff --git a/channels/chan_sip.c b/channels/chan_sip.c
index 937acb94e..88965fc73 100644
--- a/channels/chan_sip.c
+++ b/channels/chan_sip.c
@@ -16742,25 +16742,21 @@ static void network_change_stasis_subscribe(void)
static void network_change_stasis_unsubscribe(void)
{
- if (network_change_sub) {
- network_change_sub = stasis_unsubscribe(network_change_sub);
- }
+ network_change_sub = stasis_unsubscribe_and_join(network_change_sub);
}
static void acl_change_stasis_subscribe(void)
{
if (!acl_change_sub) {
acl_change_sub = stasis_subscribe(ast_security_topic(),
- acl_change_stasis_cb, NULL);
+ acl_change_stasis_cb, NULL);
}
}
static void acl_change_event_stasis_unsubscribe(void)
{
- if (acl_change_sub) {
- acl_change_sub = stasis_unsubscribe(acl_change_sub);
- }
+ acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
}
static int network_change_sched_cb(const void *data)
diff --git a/funcs/func_presencestate.c b/funcs/func_presencestate.c
index 01e6d09c2..3bf4a81b3 100644
--- a/funcs/func_presencestate.c
+++ b/funcs/func_presencestate.c
@@ -706,7 +706,7 @@ AST_TEST_DEFINE(test_presence_state_change)
return AST_TEST_FAIL;
}
- test_sub = stasis_unsubscribe(test_sub);
+ test_sub = stasis_unsubscribe_and_join(test_sub);
ao2_cleanup(cb_data->presence_state);
ast_free((char *)cb_data);
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 7e46dbf41..e6ea6fa13 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -124,6 +124,34 @@
* stasis_subscription. Due to cyclic references, the \ref
* stasis_subscription will not be freed until after it has been unsubscribed,
* and all other ao2_ref()'s have been cleaned up.
+ *
+ * \par Shutdown
+ *
+ * Subscriptions have two options for unsubscribing, depending upon the context
+ * in which you need to unsubscribe.
+ *
+ * If your subscription is owned by a module, and you must unsubscribe from the
+ * module_unload() function, then you'll want to use the
+ * stasis_unsubscribe_and_join() function. This will block until the final
+ * message has been received on the subscription. Otherwise, there's the danger
+ * of invoking the callback function after it has been unloaded.
+ *
+ * If your subscription is owned by an object, then your object should have an
+ * explicit shutdown() function, which calls stasis_unsubscribe(). In your
+ * subscription handler, when the stasis_subscription_final_message() has been
+ * received, decrement the refcount on your object. In your object's destructor,
+ * you may assert that stasis_subscription_is_done() to validate that the
+ * subscription's callback will no longer be invoked.
+ *
+ * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
+ * an object's destructor. While code that does this may work most of the time,
+ * it's got one big downside. There's a general assumption that object
+ * destruction is non-blocking. If you block the destruction waiting for the
+ * subscription to complete, there's the danger that the subscription may
+ * process a message which will bump the refcount up by one. Then it does
+ * whatever it does, decrements the refcount, which then proceeds to re-destroy
+ * the object. Now you've got hard to reproduce bugs that only show up under
+ * certain loads.
*/
#include "asterisk/utils.h"
@@ -292,8 +320,7 @@ typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *s
* \since 12
*/
struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
- stasis_subscription_cb callback,
- void *data);
+ stasis_subscription_cb callback, void *data);
/*!
* \brief Cancel a subscription.
@@ -304,10 +331,52 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
* delivery of the final message.
*
* \param subscription Subscription to cancel.
- * \retval NULL for convenience
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_subscription *stasis_unsubscribe(
+ struct stasis_subscription *subscription);
+
+/*!
+ * \brief Block until the last message is processed on a subscription.
+ *
+ * This function will not return until the \a subscription's callback for the
+ * stasis_subscription_final_message() completes. This allows cleanup routines
+ * to run before unblocking the joining thread.
+ *
+ * \param subscription Subscription to block on.
+ * \since 12
+ */
+void stasis_subscription_join(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Returns whether \a subscription has received its final message.
+ *
+ * Note that a subscription is considered done even while the
+ * stasis_subscription_final_message() is being processed. This allows cleanup
+ * routines to check the status of the subscription.
+ *
+ * \param subscription Subscription.
+ * \return True (non-zero) if stasis_subscription_final_message() has been
+ * received.
+ * \return False (zero) if waiting for the end.
+ */
+int stasis_subscription_is_done(struct stasis_subscription *subscription);
+
+/*!
+ * \brief Cancel a subscription, blocking until the last message is processed.
+ *
+ * While normally it's recommended to stasis_unsubscribe() and wait for
+ * stasis_subscription_final_message(), there are times (like during a module
+ * unload) where you have to wait for the final message (otherwise you'll call
+ * a function in a shared module that no longer exists).
+ *
+ * \param subscription Subscription to cancel.
+ * \return \c NULL for convenience
* \since 12
*/
-struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subscription);
+struct stasis_subscription *stasis_unsubscribe_and_join(
+ struct stasis_subscription *subscription);
/*!
* \brief Create a subscription which forwards all messages from one topic to
@@ -322,7 +391,8 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subsc
* \return \c NULL on error.
* \since 12
*/
-struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
+struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
+ struct stasis_topic *to_topic);
/*!
* \brief Get the unique ID for the subscription.
@@ -389,7 +459,8 @@ struct stasis_topic_pool;
/*!
* \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
* \param pooled_topic Topic to which messages will be routed
- * \retval the new stasis_topic_pool or NULL on failure
+ * \return the new stasis_topic_pool
+ * \return \c NULL on failure
*/
struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
@@ -397,8 +468,8 @@ struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_t
* \brief Find or create a topic in the pool
* \param pool Pool for which to get the topic
* \param topic_name Name of the topic to get
- * \retval The already stored or newly allocated topic
- * \retval NULL if the topic was not found and could not be allocated
+ * \return The already stored or newly allocated topic
+ * \return \c NULL if the topic was not found and could not be allocated
*/
struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
@@ -496,12 +567,31 @@ typedef const char *(*snapshot_get_id)(struct stasis_message *message);
struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
/*!
- * Unsubscribes a caching topic from its upstream topic.
+ * \brief Unsubscribes a caching topic from its upstream topic.
+ *
+ * This function returns immediately, so be sure to cleanup when
+ * stasis_subscription_final_message() is received.
+ *
+ * \param caching_topic Caching topic to unsubscribe
+ * \return \c NULL for convenience
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_caching_unsubscribe(
+ struct stasis_caching_topic *caching_topic);
+
+/*!
+ * \brief Unsubscribes a caching topic from its upstream topic, blocking until
+ * all messages have been forwarded.
+ *
+ * See stasis_unsubscriben_and_join() for more info on when to use this as
+ * opposed to stasis_caching_unsubscribe().
+ *
* \param caching_topic Caching topic to unsubscribe
- * \retval NULL for convenience
+ * \return \c NULL for convenience
* \since 12
*/
-struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic);
+struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
+ struct stasis_caching_topic *caching_topic);
/*!
* \brief Returns the topic of cached events from a caching topics.
@@ -530,9 +620,9 @@ struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_top
/*!
* \brief Dump cached items to a subscription
* \param caching_topic The topic returned from stasis_caching_topic_create().
- * \param type Type of message to dump (any type if NULL).
+ * \param type Type of message to dump (any type if \c NULL).
* \return ao2_container containing all matches (must be unreffed by caller)
- * \return NULL on allocation error
+ * \return \c NULL on allocation error
* \since 12
*/
struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,
diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h
index 42770d293..e7d5a4cc6 100644
--- a/include/asterisk/stasis_message_router.h
+++ b/include/asterisk/stasis_message_router.h
@@ -57,12 +57,36 @@ struct stasis_message_router *stasis_message_router_create(
/*!
* \brief Unsubscribe the router from the upstream topic.
+ *
* \param router Router to unsubscribe.
* \since 12
*/
void stasis_message_router_unsubscribe(struct stasis_message_router *router);
/*!
+ * \brief Unsubscribe the router from the upstream topic, blocking until the
+ * final message has been processed.
+ *
+ * See stasis_unsubscribe_and_join() for info on when to use this
+ * vs. stasis_message_router_unsubscribe().
+ *
+ * \param router Router to unsubscribe.
+ * \since 12
+ */
+void stasis_message_router_unsubscribe_and_join(
+ struct stasis_message_router *router);
+
+/*!
+ * \brief Returns whether \a router has received its final message.
+ *
+ * \param router Router.
+ * \return True (non-zero) if stasis_subscription_final_message() has been
+ * received.
+ * \return False (zero) if waiting for the end.
+ */
+int stasis_message_router_is_done(struct stasis_message_router *router);
+
+/*!
* \brief Add a route to a message router.
* \param router Router to add the route to.
* \param message_type Type of message to route.
diff --git a/main/app.c b/main/app.c
index 0cdd9d31d..0e8a68f7b 100644
--- a/main/app.c
+++ b/main/app.c
@@ -2727,7 +2727,7 @@ static void app_exit(void)
{
ao2_cleanup(mwi_topic_all);
mwi_topic_all = NULL;
- mwi_topic_cached = stasis_caching_unsubscribe(mwi_topic_cached);
+ mwi_topic_cached = stasis_caching_unsubscribe_and_join(mwi_topic_cached);
STASIS_MESSAGE_TYPE_CLEANUP(stasis_mwi_state_type);
ao2_cleanup(mwi_topic_pool);
mwi_topic_pool = NULL;
diff --git a/main/devicestate.c b/main/devicestate.c
index aa31dbfd6..afa9621d3 100644
--- a/main/devicestate.c
+++ b/main/devicestate.c
@@ -776,7 +776,7 @@ static void devstate_exit(void)
{
ao2_cleanup(device_state_topic_all);
device_state_topic_all = NULL;
- device_state_topic_cached = stasis_caching_unsubscribe(device_state_topic_cached);
+ device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
ao2_cleanup(device_state_topic_pool);
device_state_topic_pool = NULL;
diff --git a/main/endpoints.c b/main/endpoints.c
index 95397f960..c2d0577f9 100644
--- a/main/endpoints.c
+++ b/main/endpoints.c
@@ -106,7 +106,9 @@ static void endpoint_dtor(void *obj)
struct ast_endpoint *endpoint = obj;
/* The router should be shut down already */
- ast_assert(endpoint->router == NULL);
+ ast_assert(stasis_message_router_is_done(endpoint->router));
+ ao2_cleanup(endpoint->router);
+ endpoint->router = NULL;
stasis_unsubscribe(endpoint->forward);
endpoint->forward = NULL;
@@ -258,8 +260,9 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
stasis_publish(endpoint->topic, message);
}
+ /* Bump refcount to hold on to the router */
+ ao2_ref(endpoint->router, +1);
stasis_message_router_unsubscribe(endpoint->router);
- endpoint->router = NULL;
}
const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
diff --git a/main/manager.c b/main/manager.c
index 4d2923eb5..c9b2fbe1e 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -1077,9 +1077,7 @@ static void acl_change_stasis_subscribe(void)
static void acl_change_stasis_unsubscribe(void)
{
- if (acl_change_sub) {
- acl_change_sub = stasis_unsubscribe(acl_change_sub);
- }
+ acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
}
/* In order to understand what the heck is going on with the
diff --git a/main/manager_channels.c b/main/manager_channels.c
index 63380a762..e1f918868 100644
--- a/main/manager_channels.c
+++ b/main/manager_channels.c
@@ -805,7 +805,7 @@ static void channel_dial_cb(void *data, struct stasis_subscription *sub,
static void manager_channels_shutdown(void)
{
- stasis_message_router_unsubscribe(channel_state_router);
+ stasis_message_router_unsubscribe_and_join(channel_state_router);
channel_state_router = NULL;
}
diff --git a/main/pbx.c b/main/pbx.c
index 74aeee928..9492d9559 100644
--- a/main/pbx.c
+++ b/main/pbx.c
@@ -11700,12 +11700,8 @@ static void unload_pbx(void)
{
int x;
- if (presence_state_sub) {
- presence_state_sub = stasis_unsubscribe(presence_state_sub);
- }
- if (device_state_sub) {
- device_state_sub = stasis_unsubscribe(device_state_sub);
- }
+ presence_state_sub = stasis_unsubscribe_and_join(presence_state_sub);
+ device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
/* Unregister builtin applications */
for (x = 0; x < ARRAY_LEN(builtins); x++) {
diff --git a/main/stasis.c b/main/stasis.c
index 98dff95a6..d0ded401c 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -114,16 +114,30 @@ struct stasis_subscription {
stasis_subscription_cb callback;
/*! Data pointer to be handed to the callback. */
void *data;
+
+ /*! Lock for joining with subscription. */
+ ast_mutex_t join_lock;
+ /*! Condition for joining with subscription. */
+ ast_cond_t join_cond;
+ /*! Flag set when final message for sub has been received.
+ * Be sure join_lock is held before reading/setting. */
+ int final_message_rxed;
+ /*! Flag set when final message for sub has been processed.
+ * Be sure join_lock is held before reading/setting. */
+ int final_message_processed;
};
static void subscription_dtor(void *obj)
{
struct stasis_subscription *sub = obj;
ast_assert(!stasis_subscription_is_subscribed(sub));
+ ast_assert(stasis_subscription_is_done(sub));
ao2_cleanup(sub->topic);
sub->topic = NULL;
ast_taskprocessor_unreference(sub->mailbox);
sub->mailbox = NULL;
+ ast_mutex_destroy(&sub->join_lock);
+ ast_cond_destroy(&sub->join_cond);
}
/*!
@@ -136,11 +150,22 @@ static void subscription_invoke(struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
- /* Since sub->topic doesn't change, no need to lock sub */
- sub->callback(sub->data,
- sub,
- topic,
- message);
+ /* Notify that the final message has been received */
+ if (stasis_subscription_final_message(sub, message)) {
+ SCOPED_MUTEX(lock, &sub->join_lock);
+ sub->final_message_rxed = 1;
+ ast_cond_signal(&sub->join_cond);
+ }
+
+ /* Since sub is mostly immutable, no need to lock sub */
+ sub->callback(sub->data, sub, topic, message);
+
+ /* Notify that the final message has been processed */
+ if (stasis_subscription_final_message(sub, message)) {
+ SCOPED_MUTEX(lock, &sub->join_lock);
+ sub->final_message_processed = 1;
+ ast_cond_signal(&sub->join_cond);
+ }
}
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
@@ -171,6 +196,8 @@ static struct stasis_subscription *__stasis_subscribe(
sub->topic = topic;
sub->callback = callback;
sub->data = data;
+ ast_mutex_init(&sub->join_lock);
+ ast_cond_init(&sub->join_cond, NULL);
if (topic_add_subscription(topic, sub) != 0) {
return NULL;
@@ -212,6 +239,50 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
return NULL;
}
+/*!
+ * \brief Block until the final message has been received on a subscription.
+ *
+ * \param subscription Subscription to wait on.
+ */
+void stasis_subscription_join(struct stasis_subscription *subscription)
+{
+ if (subscription) {
+ SCOPED_MUTEX(lock, &subscription->join_lock);
+ /* Wait until the processed flag has been set */
+ while (!subscription->final_message_processed) {
+ ast_cond_wait(&subscription->join_cond,
+ &subscription->join_lock);
+ }
+ }
+}
+
+int stasis_subscription_is_done(struct stasis_subscription *subscription)
+{
+ if (subscription) {
+ SCOPED_MUTEX(lock, &subscription->join_lock);
+ return subscription->final_message_rxed;
+ }
+
+ /* Null subscription is about as done as you can get */
+ return 1;
+}
+
+struct stasis_subscription *stasis_unsubscribe_and_join(
+ struct stasis_subscription *subscription)
+{
+ if (!subscription) {
+ return NULL;
+ }
+
+ /* Bump refcount to hold it past the unsubscribe */
+ ao2_ref(subscription, +1);
+ stasis_unsubscribe(subscription);
+ stasis_subscription_join(subscription);
+ /* Now decrement the refcount back */
+ ao2_cleanup(subscription);
+ return NULL;
+}
+
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
{
if (sub) {
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
index 75e6b5f95..154b4f020 100644
--- a/main/stasis_cache.c
+++ b/main/stasis_cache.c
@@ -53,6 +53,8 @@ struct stasis_caching_topic {
static void stasis_caching_topic_dtor(void *obj) {
struct stasis_caching_topic *caching_topic = obj;
ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
+ ast_assert(stasis_subscription_is_done(caching_topic->sub));
+ ao2_cleanup(caching_topic->sub);
caching_topic->sub = NULL;
ao2_cleanup(caching_topic->cache);
caching_topic->cache = NULL;
@@ -69,6 +71,9 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to
{
if (caching_topic) {
if (stasis_subscription_is_subscribed(caching_topic->sub)) {
+ /* Increment the reference to hold on to it past the
+ * unsubscribe */
+ ao2_ref(caching_topic->sub, +1);
stasis_unsubscribe(caching_topic->sub);
} else {
ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
@@ -77,6 +82,20 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to
return NULL;
}
+struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
+{
+ if (!caching_topic) {
+ return NULL;
+ }
+
+ /* Hold a ref past the unsubscribe */
+ ao2_ref(caching_topic, +1);
+ stasis_caching_unsubscribe(caching_topic);
+ stasis_subscription_join(caching_topic->sub);
+ ao2_cleanup(caching_topic);
+ return NULL;
+}
+
struct cache_entry {
struct stasis_message_type *type;
char *id;
diff --git a/main/stasis_channels.c b/main/stasis_channels.c
index 3df52d0b3..95f5f9d0e 100644
--- a/main/stasis_channels.c
+++ b/main/stasis_channels.c
@@ -505,6 +505,9 @@ int ast_channel_snapshot_caller_id_equal(
void ast_stasis_channels_shutdown(void)
{
+ channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached);
+ ao2_cleanup(channel_topic_all);
+ channel_topic_all = NULL;
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
@@ -512,9 +515,6 @@ void ast_stasis_channels_shutdown(void)
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type);
- ao2_cleanup(channel_topic_all);
- channel_topic_all = NULL;
- channel_topic_all_cached = stasis_caching_unsubscribe(channel_topic_all_cached);
}
void ast_stasis_channels_init(void)
diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c
index 7428e2cf1..252614f62 100644
--- a/main/stasis_endpoints.c
+++ b/main/stasis_endpoints.c
@@ -101,15 +101,6 @@ static const char *endpoint_snapshot_get_id(struct stasis_message *message)
}
-static void endpoints_stasis_shutdown(void)
-{
- ao2_cleanup(endpoint_topic_all);
- endpoint_topic_all = NULL;
-
- stasis_caching_unsubscribe(endpoint_topic_all_cached);
- endpoint_topic_all_cached = NULL;
-}
-
struct ast_json *ast_endpoint_snapshot_to_json(
const struct ast_endpoint_snapshot *snapshot)
{
@@ -149,6 +140,15 @@ struct ast_json *ast_endpoint_snapshot_to_json(
return ast_json_ref(json);
}
+static void endpoints_stasis_shutdown(void)
+{
+ stasis_caching_unsubscribe_and_join(endpoint_topic_all_cached);
+ endpoint_topic_all_cached = NULL;
+
+ ao2_cleanup(endpoint_topic_all);
+ endpoint_topic_all = NULL;
+}
+
int ast_endpoint_stasis_init(void)
{
ast_register_atexit(endpoints_stasis_shutdown);
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index 97ed7ad92..c7acca1ff 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -74,6 +74,7 @@ static void router_dtor(void *obj)
size_t i;
ast_assert(!stasis_subscription_is_subscribed(router->subscription));
+ ast_assert(stasis_subscription_is_done(router->subscription));
router->subscription = NULL;
for (i = 0; i < router->num_routes_current; ++i) {
ao2_cleanup(router->routes[i]);
@@ -165,6 +166,26 @@ void stasis_message_router_unsubscribe(struct stasis_message_router *router)
stasis_unsubscribe(router->subscription);
}
+void stasis_message_router_unsubscribe_and_join(
+ struct stasis_message_router *router)
+{
+ if (!router) {
+ return;
+ }
+ stasis_unsubscribe_and_join(router->subscription);
+}
+
+int stasis_message_router_is_done(struct stasis_message_router *router)
+{
+ if (!router) {
+ /* Null router is about as done as you can get */
+ return 1;
+ }
+
+ return stasis_subscription_is_done(router->subscription);
+}
+
+
static struct stasis_message_route *route_create(
struct stasis_message_type *message_type,
stasis_subscription_cb callback,
diff --git a/res/res_chan_stats.c b/res/res_chan_stats.c
index f5f2267aa..d54ba2f0b 100644
--- a/res/res_chan_stats.c
+++ b/res/res_chan_stats.c
@@ -172,9 +172,9 @@ static int load_module(void)
static int unload_module(void)
{
- stasis_unsubscribe(sub);
+ stasis_unsubscribe_and_join(sub);
sub = NULL;
- stasis_message_router_unsubscribe(router);
+ stasis_message_router_unsubscribe_and_join(router);
router = NULL;
return 0;
}
diff --git a/res/res_jabber.c b/res/res_jabber.c
index 69f366522..2070c8024 100644
--- a/res/res_jabber.c
+++ b/res/res_jabber.c
@@ -4770,12 +4770,8 @@ static int unload_module(void)
ast_unregister_application(app_ajileave);
ast_manager_unregister("JabberSend");
ast_custom_function_unregister(&jabberstatus_function);
- if (mwi_sub) {
- mwi_sub = stasis_unsubscribe(mwi_sub);
- }
- if (device_state_sub) {
- device_state_sub = stasis_unsubscribe(device_state_sub);
- }
+ mwi_sub = stasis_unsubscribe_and_join(mwi_sub);
+ device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
ast_custom_function_unregister(&jabberreceive_function);
ASTOBJ_CONTAINER_TRAVERSE(&clients, 1, {
diff --git a/res/res_stasis.c b/res/res_stasis.c
index 5327ca4ac..e9931a09a 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -722,7 +722,7 @@ static int unload_module(void)
{
int r = 0;
- stasis_message_router_unsubscribe(channel_router);
+ stasis_message_router_unsubscribe_and_join(channel_router);
channel_router = NULL;
ao2_cleanup(apps_registry);