diff options
-rw-r--r-- | apps/app_queue.c | 4 | ||||
-rw-r--r-- | apps/app_voicemail.c | 4 | ||||
-rw-r--r-- | channels/chan_iax2.c | 12 | ||||
-rw-r--r-- | channels/chan_sip.c | 10 | ||||
-rw-r--r-- | funcs/func_presencestate.c | 2 | ||||
-rw-r--r-- | include/asterisk/stasis.h | 116 | ||||
-rw-r--r-- | include/asterisk/stasis_message_router.h | 24 | ||||
-rw-r--r-- | main/app.c | 2 | ||||
-rw-r--r-- | main/devicestate.c | 2 | ||||
-rw-r--r-- | main/endpoints.c | 7 | ||||
-rw-r--r-- | main/manager.c | 4 | ||||
-rw-r--r-- | main/manager_channels.c | 2 | ||||
-rw-r--r-- | main/pbx.c | 8 | ||||
-rw-r--r-- | main/stasis.c | 81 | ||||
-rw-r--r-- | main/stasis_cache.c | 19 | ||||
-rw-r--r-- | main/stasis_channels.c | 6 | ||||
-rw-r--r-- | main/stasis_endpoints.c | 18 | ||||
-rw-r--r-- | main/stasis_message_router.c | 21 | ||||
-rw-r--r-- | res/res_chan_stats.c | 4 | ||||
-rw-r--r-- | res/res_jabber.c | 8 | ||||
-rw-r--r-- | res/res_stasis.c | 2 |
21 files changed, 280 insertions, 76 deletions
diff --git a/apps/app_queue.c b/apps/app_queue.c index 3455cc748..c63cd071e 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -9866,9 +9866,7 @@ static int unload_module(void) res |= ast_data_unregister(NULL); - if (device_state_sub) { - device_state_sub = stasis_unsubscribe(device_state_sub); - } + device_state_sub = stasis_unsubscribe_and_join(device_state_sub); ast_extension_state_del(0, extension_state_cb); diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c index 13bcd3ea5..b3ceeebc9 100644 --- a/apps/app_voicemail.c +++ b/apps/app_voicemail.c @@ -12689,9 +12689,7 @@ static void stop_poll_thread(void) { poll_thread_run = 0; - if (mwi_sub_sub) { - mwi_sub_sub = stasis_unsubscribe(mwi_sub_sub); - } + mwi_sub_sub = stasis_unsubscribe_and_join(mwi_sub_sub); ast_mutex_lock(&poll_lock); ast_cond_signal(&poll_cond); diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index eeffb6696..112a99375 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -1334,9 +1334,7 @@ static void network_change_stasis_subscribe(void) static void network_change_stasis_unsubscribe(void) { - if (network_change_sub) { - network_change_sub = stasis_unsubscribe(network_change_sub); - } + network_change_sub = stasis_unsubscribe_and_join(network_change_sub); } static void acl_change_stasis_subscribe(void) @@ -1349,9 +1347,7 @@ static void acl_change_stasis_subscribe(void) static void acl_change_stasis_unsubscribe(void) { - if (acl_change_sub) { - acl_change_sub = stasis_unsubscribe(acl_change_sub); - } + acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub); } static int network_change_sched_cb(const void *data) @@ -12424,9 +12420,7 @@ static void peer_destructor(void *obj) if (peer->dnsmgr) ast_dnsmgr_release(peer->dnsmgr); - if (peer->mwi_event_sub) { - peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub); - } + peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub); ast_string_field_free_memory(peer); } diff --git a/channels/chan_sip.c b/channels/chan_sip.c index 937acb94e..88965fc73 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -16742,25 +16742,21 @@ static void network_change_stasis_subscribe(void) static void network_change_stasis_unsubscribe(void) { - if (network_change_sub) { - network_change_sub = stasis_unsubscribe(network_change_sub); - } + network_change_sub = stasis_unsubscribe_and_join(network_change_sub); } static void acl_change_stasis_subscribe(void) { if (!acl_change_sub) { acl_change_sub = stasis_subscribe(ast_security_topic(), - acl_change_stasis_cb, NULL); + acl_change_stasis_cb, NULL); } } static void acl_change_event_stasis_unsubscribe(void) { - if (acl_change_sub) { - acl_change_sub = stasis_unsubscribe(acl_change_sub); - } + acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub); } static int network_change_sched_cb(const void *data) diff --git a/funcs/func_presencestate.c b/funcs/func_presencestate.c index 01e6d09c2..3bf4a81b3 100644 --- a/funcs/func_presencestate.c +++ b/funcs/func_presencestate.c @@ -706,7 +706,7 @@ AST_TEST_DEFINE(test_presence_state_change) return AST_TEST_FAIL; } - test_sub = stasis_unsubscribe(test_sub); + test_sub = stasis_unsubscribe_and_join(test_sub); ao2_cleanup(cb_data->presence_state); ast_free((char *)cb_data); diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 7e46dbf41..e6ea6fa13 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -124,6 +124,34 @@ * stasis_subscription. Due to cyclic references, the \ref * stasis_subscription will not be freed until after it has been unsubscribed, * and all other ao2_ref()'s have been cleaned up. + * + * \par Shutdown + * + * Subscriptions have two options for unsubscribing, depending upon the context + * in which you need to unsubscribe. + * + * If your subscription is owned by a module, and you must unsubscribe from the + * module_unload() function, then you'll want to use the + * stasis_unsubscribe_and_join() function. This will block until the final + * message has been received on the subscription. Otherwise, there's the danger + * of invoking the callback function after it has been unloaded. + * + * If your subscription is owned by an object, then your object should have an + * explicit shutdown() function, which calls stasis_unsubscribe(). In your + * subscription handler, when the stasis_subscription_final_message() has been + * received, decrement the refcount on your object. In your object's destructor, + * you may assert that stasis_subscription_is_done() to validate that the + * subscription's callback will no longer be invoked. + * + * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from + * an object's destructor. While code that does this may work most of the time, + * it's got one big downside. There's a general assumption that object + * destruction is non-blocking. If you block the destruction waiting for the + * subscription to complete, there's the danger that the subscription may + * process a message which will bump the refcount up by one. Then it does + * whatever it does, decrements the refcount, which then proceeds to re-destroy + * the object. Now you've got hard to reproduce bugs that only show up under + * certain loads. */ #include "asterisk/utils.h" @@ -292,8 +320,7 @@ typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *s * \since 12 */ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, - stasis_subscription_cb callback, - void *data); + stasis_subscription_cb callback, void *data); /*! * \brief Cancel a subscription. @@ -304,10 +331,52 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, * delivery of the final message. * * \param subscription Subscription to cancel. - * \retval NULL for convenience + * \return \c NULL for convenience + * \since 12 + */ +struct stasis_subscription *stasis_unsubscribe( + struct stasis_subscription *subscription); + +/*! + * \brief Block until the last message is processed on a subscription. + * + * This function will not return until the \a subscription's callback for the + * stasis_subscription_final_message() completes. This allows cleanup routines + * to run before unblocking the joining thread. + * + * \param subscription Subscription to block on. + * \since 12 + */ +void stasis_subscription_join(struct stasis_subscription *subscription); + +/*! + * \brief Returns whether \a subscription has received its final message. + * + * Note that a subscription is considered done even while the + * stasis_subscription_final_message() is being processed. This allows cleanup + * routines to check the status of the subscription. + * + * \param subscription Subscription. + * \return True (non-zero) if stasis_subscription_final_message() has been + * received. + * \return False (zero) if waiting for the end. + */ +int stasis_subscription_is_done(struct stasis_subscription *subscription); + +/*! + * \brief Cancel a subscription, blocking until the last message is processed. + * + * While normally it's recommended to stasis_unsubscribe() and wait for + * stasis_subscription_final_message(), there are times (like during a module + * unload) where you have to wait for the final message (otherwise you'll call + * a function in a shared module that no longer exists). + * + * \param subscription Subscription to cancel. + * \return \c NULL for convenience * \since 12 */ -struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subscription); +struct stasis_subscription *stasis_unsubscribe_and_join( + struct stasis_subscription *subscription); /*! * \brief Create a subscription which forwards all messages from one topic to @@ -322,7 +391,8 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subsc * \return \c NULL on error. * \since 12 */ -struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic); +struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, + struct stasis_topic *to_topic); /*! * \brief Get the unique ID for the subscription. @@ -389,7 +459,8 @@ struct stasis_topic_pool; /*! * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic * \param pooled_topic Topic to which messages will be routed - * \retval the new stasis_topic_pool or NULL on failure + * \return the new stasis_topic_pool + * \return \c NULL on failure */ struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic); @@ -397,8 +468,8 @@ struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_t * \brief Find or create a topic in the pool * \param pool Pool for which to get the topic * \param topic_name Name of the topic to get - * \retval The already stored or newly allocated topic - * \retval NULL if the topic was not found and could not be allocated + * \return The already stored or newly allocated topic + * \return \c NULL if the topic was not found and could not be allocated */ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name); @@ -496,12 +567,31 @@ typedef const char *(*snapshot_get_id)(struct stasis_message *message); struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn); /*! - * Unsubscribes a caching topic from its upstream topic. + * \brief Unsubscribes a caching topic from its upstream topic. + * + * This function returns immediately, so be sure to cleanup when + * stasis_subscription_final_message() is received. + * + * \param caching_topic Caching topic to unsubscribe + * \return \c NULL for convenience + * \since 12 + */ +struct stasis_caching_topic *stasis_caching_unsubscribe( + struct stasis_caching_topic *caching_topic); + +/*! + * \brief Unsubscribes a caching topic from its upstream topic, blocking until + * all messages have been forwarded. + * + * See stasis_unsubscriben_and_join() for more info on when to use this as + * opposed to stasis_caching_unsubscribe(). + * * \param caching_topic Caching topic to unsubscribe - * \retval NULL for convenience + * \return \c NULL for convenience * \since 12 */ -struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic); +struct stasis_caching_topic *stasis_caching_unsubscribe_and_join( + struct stasis_caching_topic *caching_topic); /*! * \brief Returns the topic of cached events from a caching topics. @@ -530,9 +620,9 @@ struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_top /*! * \brief Dump cached items to a subscription * \param caching_topic The topic returned from stasis_caching_topic_create(). - * \param type Type of message to dump (any type if NULL). + * \param type Type of message to dump (any type if \c NULL). * \return ao2_container containing all matches (must be unreffed by caller) - * \return NULL on allocation error + * \return \c NULL on allocation error * \since 12 */ struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 42770d293..e7d5a4cc6 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -57,12 +57,36 @@ struct stasis_message_router *stasis_message_router_create( /*! * \brief Unsubscribe the router from the upstream topic. + * * \param router Router to unsubscribe. * \since 12 */ void stasis_message_router_unsubscribe(struct stasis_message_router *router); /*! + * \brief Unsubscribe the router from the upstream topic, blocking until the + * final message has been processed. + * + * See stasis_unsubscribe_and_join() for info on when to use this + * vs. stasis_message_router_unsubscribe(). + * + * \param router Router to unsubscribe. + * \since 12 + */ +void stasis_message_router_unsubscribe_and_join( + struct stasis_message_router *router); + +/*! + * \brief Returns whether \a router has received its final message. + * + * \param router Router. + * \return True (non-zero) if stasis_subscription_final_message() has been + * received. + * \return False (zero) if waiting for the end. + */ +int stasis_message_router_is_done(struct stasis_message_router *router); + +/*! * \brief Add a route to a message router. * \param router Router to add the route to. * \param message_type Type of message to route. diff --git a/main/app.c b/main/app.c index 0cdd9d31d..0e8a68f7b 100644 --- a/main/app.c +++ b/main/app.c @@ -2727,7 +2727,7 @@ static void app_exit(void) { ao2_cleanup(mwi_topic_all); mwi_topic_all = NULL; - mwi_topic_cached = stasis_caching_unsubscribe(mwi_topic_cached); + mwi_topic_cached = stasis_caching_unsubscribe_and_join(mwi_topic_cached); STASIS_MESSAGE_TYPE_CLEANUP(stasis_mwi_state_type); ao2_cleanup(mwi_topic_pool); mwi_topic_pool = NULL; diff --git a/main/devicestate.c b/main/devicestate.c index aa31dbfd6..afa9621d3 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -776,7 +776,7 @@ static void devstate_exit(void) { ao2_cleanup(device_state_topic_all); device_state_topic_all = NULL; - device_state_topic_cached = stasis_caching_unsubscribe(device_state_topic_cached); + device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached); STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type); ao2_cleanup(device_state_topic_pool); device_state_topic_pool = NULL; diff --git a/main/endpoints.c b/main/endpoints.c index 95397f960..c2d0577f9 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -106,7 +106,9 @@ static void endpoint_dtor(void *obj) struct ast_endpoint *endpoint = obj; /* The router should be shut down already */ - ast_assert(endpoint->router == NULL); + ast_assert(stasis_message_router_is_done(endpoint->router)); + ao2_cleanup(endpoint->router); + endpoint->router = NULL; stasis_unsubscribe(endpoint->forward); endpoint->forward = NULL; @@ -258,8 +260,9 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint) stasis_publish(endpoint->topic, message); } + /* Bump refcount to hold on to the router */ + ao2_ref(endpoint->router, +1); stasis_message_router_unsubscribe(endpoint->router); - endpoint->router = NULL; } const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint) diff --git a/main/manager.c b/main/manager.c index 4d2923eb5..c9b2fbe1e 100644 --- a/main/manager.c +++ b/main/manager.c @@ -1077,9 +1077,7 @@ static void acl_change_stasis_subscribe(void) static void acl_change_stasis_unsubscribe(void) { - if (acl_change_sub) { - acl_change_sub = stasis_unsubscribe(acl_change_sub); - } + acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub); } /* In order to understand what the heck is going on with the diff --git a/main/manager_channels.c b/main/manager_channels.c index 63380a762..e1f918868 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -805,7 +805,7 @@ static void channel_dial_cb(void *data, struct stasis_subscription *sub, static void manager_channels_shutdown(void) { - stasis_message_router_unsubscribe(channel_state_router); + stasis_message_router_unsubscribe_and_join(channel_state_router); channel_state_router = NULL; } diff --git a/main/pbx.c b/main/pbx.c index 74aeee928..9492d9559 100644 --- a/main/pbx.c +++ b/main/pbx.c @@ -11700,12 +11700,8 @@ static void unload_pbx(void) { int x; - if (presence_state_sub) { - presence_state_sub = stasis_unsubscribe(presence_state_sub); - } - if (device_state_sub) { - device_state_sub = stasis_unsubscribe(device_state_sub); - } + presence_state_sub = stasis_unsubscribe_and_join(presence_state_sub); + device_state_sub = stasis_unsubscribe_and_join(device_state_sub); /* Unregister builtin applications */ for (x = 0; x < ARRAY_LEN(builtins); x++) { diff --git a/main/stasis.c b/main/stasis.c index 98dff95a6..d0ded401c 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -114,16 +114,30 @@ struct stasis_subscription { stasis_subscription_cb callback; /*! Data pointer to be handed to the callback. */ void *data; + + /*! Lock for joining with subscription. */ + ast_mutex_t join_lock; + /*! Condition for joining with subscription. */ + ast_cond_t join_cond; + /*! Flag set when final message for sub has been received. + * Be sure join_lock is held before reading/setting. */ + int final_message_rxed; + /*! Flag set when final message for sub has been processed. + * Be sure join_lock is held before reading/setting. */ + int final_message_processed; }; static void subscription_dtor(void *obj) { struct stasis_subscription *sub = obj; ast_assert(!stasis_subscription_is_subscribed(sub)); + ast_assert(stasis_subscription_is_done(sub)); ao2_cleanup(sub->topic); sub->topic = NULL; ast_taskprocessor_unreference(sub->mailbox); sub->mailbox = NULL; + ast_mutex_destroy(&sub->join_lock); + ast_cond_destroy(&sub->join_cond); } /*! @@ -136,11 +150,22 @@ static void subscription_invoke(struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) { - /* Since sub->topic doesn't change, no need to lock sub */ - sub->callback(sub->data, - sub, - topic, - message); + /* Notify that the final message has been received */ + if (stasis_subscription_final_message(sub, message)) { + SCOPED_MUTEX(lock, &sub->join_lock); + sub->final_message_rxed = 1; + ast_cond_signal(&sub->join_cond); + } + + /* Since sub is mostly immutable, no need to lock sub */ + sub->callback(sub->data, sub, topic, message); + + /* Notify that the final message has been processed */ + if (stasis_subscription_final_message(sub, message)) { + SCOPED_MUTEX(lock, &sub->join_lock); + sub->final_message_processed = 1; + ast_cond_signal(&sub->join_cond); + } } static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description); @@ -171,6 +196,8 @@ static struct stasis_subscription *__stasis_subscribe( sub->topic = topic; sub->callback = callback; sub->data = data; + ast_mutex_init(&sub->join_lock); + ast_cond_init(&sub->join_cond, NULL); if (topic_add_subscription(topic, sub) != 0) { return NULL; @@ -212,6 +239,50 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) return NULL; } +/*! + * \brief Block until the final message has been received on a subscription. + * + * \param subscription Subscription to wait on. + */ +void stasis_subscription_join(struct stasis_subscription *subscription) +{ + if (subscription) { + SCOPED_MUTEX(lock, &subscription->join_lock); + /* Wait until the processed flag has been set */ + while (!subscription->final_message_processed) { + ast_cond_wait(&subscription->join_cond, + &subscription->join_lock); + } + } +} + +int stasis_subscription_is_done(struct stasis_subscription *subscription) +{ + if (subscription) { + SCOPED_MUTEX(lock, &subscription->join_lock); + return subscription->final_message_rxed; + } + + /* Null subscription is about as done as you can get */ + return 1; +} + +struct stasis_subscription *stasis_unsubscribe_and_join( + struct stasis_subscription *subscription) +{ + if (!subscription) { + return NULL; + } + + /* Bump refcount to hold it past the unsubscribe */ + ao2_ref(subscription, +1); + stasis_unsubscribe(subscription); + stasis_subscription_join(subscription); + /* Now decrement the refcount back */ + ao2_cleanup(subscription); + return NULL; +} + int stasis_subscription_is_subscribed(const struct stasis_subscription *sub) { if (sub) { diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 75e6b5f95..154b4f020 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -53,6 +53,8 @@ struct stasis_caching_topic { static void stasis_caching_topic_dtor(void *obj) { struct stasis_caching_topic *caching_topic = obj; ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub)); + ast_assert(stasis_subscription_is_done(caching_topic->sub)); + ao2_cleanup(caching_topic->sub); caching_topic->sub = NULL; ao2_cleanup(caching_topic->cache); caching_topic->cache = NULL; @@ -69,6 +71,9 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to { if (caching_topic) { if (stasis_subscription_is_subscribed(caching_topic->sub)) { + /* Increment the reference to hold on to it past the + * unsubscribe */ + ao2_ref(caching_topic->sub, +1); stasis_unsubscribe(caching_topic->sub); } else { ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n"); @@ -77,6 +82,20 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to return NULL; } +struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic) +{ + if (!caching_topic) { + return NULL; + } + + /* Hold a ref past the unsubscribe */ + ao2_ref(caching_topic, +1); + stasis_caching_unsubscribe(caching_topic); + stasis_subscription_join(caching_topic->sub); + ao2_cleanup(caching_topic); + return NULL; +} + struct cache_entry { struct stasis_message_type *type; char *id; diff --git a/main/stasis_channels.c b/main/stasis_channels.c index 3df52d0b3..95f5f9d0e 100644 --- a/main/stasis_channels.c +++ b/main/stasis_channels.c @@ -505,6 +505,9 @@ int ast_channel_snapshot_caller_id_equal( void ast_stasis_channels_shutdown(void) { + channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached); + ao2_cleanup(channel_topic_all); + channel_topic_all = NULL; STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type); @@ -512,9 +515,6 @@ void ast_stasis_channels_shutdown(void) STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type); - ao2_cleanup(channel_topic_all); - channel_topic_all = NULL; - channel_topic_all_cached = stasis_caching_unsubscribe(channel_topic_all_cached); } void ast_stasis_channels_init(void) diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c index 7428e2cf1..252614f62 100644 --- a/main/stasis_endpoints.c +++ b/main/stasis_endpoints.c @@ -101,15 +101,6 @@ static const char *endpoint_snapshot_get_id(struct stasis_message *message) } -static void endpoints_stasis_shutdown(void) -{ - ao2_cleanup(endpoint_topic_all); - endpoint_topic_all = NULL; - - stasis_caching_unsubscribe(endpoint_topic_all_cached); - endpoint_topic_all_cached = NULL; -} - struct ast_json *ast_endpoint_snapshot_to_json( const struct ast_endpoint_snapshot *snapshot) { @@ -149,6 +140,15 @@ struct ast_json *ast_endpoint_snapshot_to_json( return ast_json_ref(json); } +static void endpoints_stasis_shutdown(void) +{ + stasis_caching_unsubscribe_and_join(endpoint_topic_all_cached); + endpoint_topic_all_cached = NULL; + + ao2_cleanup(endpoint_topic_all); + endpoint_topic_all = NULL; +} + int ast_endpoint_stasis_init(void) { ast_register_atexit(endpoints_stasis_shutdown); diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 97ed7ad92..c7acca1ff 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -74,6 +74,7 @@ static void router_dtor(void *obj) size_t i; ast_assert(!stasis_subscription_is_subscribed(router->subscription)); + ast_assert(stasis_subscription_is_done(router->subscription)); router->subscription = NULL; for (i = 0; i < router->num_routes_current; ++i) { ao2_cleanup(router->routes[i]); @@ -165,6 +166,26 @@ void stasis_message_router_unsubscribe(struct stasis_message_router *router) stasis_unsubscribe(router->subscription); } +void stasis_message_router_unsubscribe_and_join( + struct stasis_message_router *router) +{ + if (!router) { + return; + } + stasis_unsubscribe_and_join(router->subscription); +} + +int stasis_message_router_is_done(struct stasis_message_router *router) +{ + if (!router) { + /* Null router is about as done as you can get */ + return 1; + } + + return stasis_subscription_is_done(router->subscription); +} + + static struct stasis_message_route *route_create( struct stasis_message_type *message_type, stasis_subscription_cb callback, diff --git a/res/res_chan_stats.c b/res/res_chan_stats.c index f5f2267aa..d54ba2f0b 100644 --- a/res/res_chan_stats.c +++ b/res/res_chan_stats.c @@ -172,9 +172,9 @@ static int load_module(void) static int unload_module(void) { - stasis_unsubscribe(sub); + stasis_unsubscribe_and_join(sub); sub = NULL; - stasis_message_router_unsubscribe(router); + stasis_message_router_unsubscribe_and_join(router); router = NULL; return 0; } diff --git a/res/res_jabber.c b/res/res_jabber.c index 69f366522..2070c8024 100644 --- a/res/res_jabber.c +++ b/res/res_jabber.c @@ -4770,12 +4770,8 @@ static int unload_module(void) ast_unregister_application(app_ajileave); ast_manager_unregister("JabberSend"); ast_custom_function_unregister(&jabberstatus_function); - if (mwi_sub) { - mwi_sub = stasis_unsubscribe(mwi_sub); - } - if (device_state_sub) { - device_state_sub = stasis_unsubscribe(device_state_sub); - } + mwi_sub = stasis_unsubscribe_and_join(mwi_sub); + device_state_sub = stasis_unsubscribe_and_join(device_state_sub); ast_custom_function_unregister(&jabberreceive_function); ASTOBJ_CONTAINER_TRAVERSE(&clients, 1, { diff --git a/res/res_stasis.c b/res/res_stasis.c index 5327ca4ac..e9931a09a 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -722,7 +722,7 @@ static int unload_module(void) { int r = 0; - stasis_message_router_unsubscribe(channel_router); + stasis_message_router_unsubscribe_and_join(channel_router); channel_router = NULL; ao2_cleanup(apps_registry); |