diff options
Diffstat (limited to 'main')
-rw-r--r-- | main/app.c | 2 | ||||
-rw-r--r-- | main/devicestate.c | 2 | ||||
-rw-r--r-- | main/endpoints.c | 7 | ||||
-rw-r--r-- | main/manager.c | 4 | ||||
-rw-r--r-- | main/manager_channels.c | 2 | ||||
-rw-r--r-- | main/pbx.c | 8 | ||||
-rw-r--r-- | main/stasis.c | 81 | ||||
-rw-r--r-- | main/stasis_cache.c | 19 | ||||
-rw-r--r-- | main/stasis_channels.c | 6 | ||||
-rw-r--r-- | main/stasis_endpoints.c | 18 | ||||
-rw-r--r-- | main/stasis_message_router.c | 21 |
11 files changed, 139 insertions, 31 deletions
diff --git a/main/app.c b/main/app.c index 0cdd9d31d..0e8a68f7b 100644 --- a/main/app.c +++ b/main/app.c @@ -2727,7 +2727,7 @@ static void app_exit(void) { ao2_cleanup(mwi_topic_all); mwi_topic_all = NULL; - mwi_topic_cached = stasis_caching_unsubscribe(mwi_topic_cached); + mwi_topic_cached = stasis_caching_unsubscribe_and_join(mwi_topic_cached); STASIS_MESSAGE_TYPE_CLEANUP(stasis_mwi_state_type); ao2_cleanup(mwi_topic_pool); mwi_topic_pool = NULL; diff --git a/main/devicestate.c b/main/devicestate.c index aa31dbfd6..afa9621d3 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -776,7 +776,7 @@ static void devstate_exit(void) { ao2_cleanup(device_state_topic_all); device_state_topic_all = NULL; - device_state_topic_cached = stasis_caching_unsubscribe(device_state_topic_cached); + device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached); STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type); ao2_cleanup(device_state_topic_pool); device_state_topic_pool = NULL; diff --git a/main/endpoints.c b/main/endpoints.c index 95397f960..c2d0577f9 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -106,7 +106,9 @@ static void endpoint_dtor(void *obj) struct ast_endpoint *endpoint = obj; /* The router should be shut down already */ - ast_assert(endpoint->router == NULL); + ast_assert(stasis_message_router_is_done(endpoint->router)); + ao2_cleanup(endpoint->router); + endpoint->router = NULL; stasis_unsubscribe(endpoint->forward); endpoint->forward = NULL; @@ -258,8 +260,9 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint) stasis_publish(endpoint->topic, message); } + /* Bump refcount to hold on to the router */ + ao2_ref(endpoint->router, +1); stasis_message_router_unsubscribe(endpoint->router); - endpoint->router = NULL; } const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint) diff --git a/main/manager.c b/main/manager.c index 4d2923eb5..c9b2fbe1e 100644 --- a/main/manager.c +++ b/main/manager.c @@ -1077,9 +1077,7 @@ static void acl_change_stasis_subscribe(void) static void acl_change_stasis_unsubscribe(void) { - if (acl_change_sub) { - acl_change_sub = stasis_unsubscribe(acl_change_sub); - } + acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub); } /* In order to understand what the heck is going on with the diff --git a/main/manager_channels.c b/main/manager_channels.c index 63380a762..e1f918868 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -805,7 +805,7 @@ static void channel_dial_cb(void *data, struct stasis_subscription *sub, static void manager_channels_shutdown(void) { - stasis_message_router_unsubscribe(channel_state_router); + stasis_message_router_unsubscribe_and_join(channel_state_router); channel_state_router = NULL; } diff --git a/main/pbx.c b/main/pbx.c index 74aeee928..9492d9559 100644 --- a/main/pbx.c +++ b/main/pbx.c @@ -11700,12 +11700,8 @@ static void unload_pbx(void) { int x; - if (presence_state_sub) { - presence_state_sub = stasis_unsubscribe(presence_state_sub); - } - if (device_state_sub) { - device_state_sub = stasis_unsubscribe(device_state_sub); - } + presence_state_sub = stasis_unsubscribe_and_join(presence_state_sub); + device_state_sub = stasis_unsubscribe_and_join(device_state_sub); /* Unregister builtin applications */ for (x = 0; x < ARRAY_LEN(builtins); x++) { diff --git a/main/stasis.c b/main/stasis.c index 98dff95a6..d0ded401c 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -114,16 +114,30 @@ struct stasis_subscription { stasis_subscription_cb callback; /*! Data pointer to be handed to the callback. */ void *data; + + /*! Lock for joining with subscription. */ + ast_mutex_t join_lock; + /*! Condition for joining with subscription. */ + ast_cond_t join_cond; + /*! Flag set when final message for sub has been received. + * Be sure join_lock is held before reading/setting. */ + int final_message_rxed; + /*! Flag set when final message for sub has been processed. + * Be sure join_lock is held before reading/setting. */ + int final_message_processed; }; static void subscription_dtor(void *obj) { struct stasis_subscription *sub = obj; ast_assert(!stasis_subscription_is_subscribed(sub)); + ast_assert(stasis_subscription_is_done(sub)); ao2_cleanup(sub->topic); sub->topic = NULL; ast_taskprocessor_unreference(sub->mailbox); sub->mailbox = NULL; + ast_mutex_destroy(&sub->join_lock); + ast_cond_destroy(&sub->join_cond); } /*! @@ -136,11 +150,22 @@ static void subscription_invoke(struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) { - /* Since sub->topic doesn't change, no need to lock sub */ - sub->callback(sub->data, - sub, - topic, - message); + /* Notify that the final message has been received */ + if (stasis_subscription_final_message(sub, message)) { + SCOPED_MUTEX(lock, &sub->join_lock); + sub->final_message_rxed = 1; + ast_cond_signal(&sub->join_cond); + } + + /* Since sub is mostly immutable, no need to lock sub */ + sub->callback(sub->data, sub, topic, message); + + /* Notify that the final message has been processed */ + if (stasis_subscription_final_message(sub, message)) { + SCOPED_MUTEX(lock, &sub->join_lock); + sub->final_message_processed = 1; + ast_cond_signal(&sub->join_cond); + } } static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description); @@ -171,6 +196,8 @@ static struct stasis_subscription *__stasis_subscribe( sub->topic = topic; sub->callback = callback; sub->data = data; + ast_mutex_init(&sub->join_lock); + ast_cond_init(&sub->join_cond, NULL); if (topic_add_subscription(topic, sub) != 0) { return NULL; @@ -212,6 +239,50 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) return NULL; } +/*! + * \brief Block until the final message has been received on a subscription. + * + * \param subscription Subscription to wait on. + */ +void stasis_subscription_join(struct stasis_subscription *subscription) +{ + if (subscription) { + SCOPED_MUTEX(lock, &subscription->join_lock); + /* Wait until the processed flag has been set */ + while (!subscription->final_message_processed) { + ast_cond_wait(&subscription->join_cond, + &subscription->join_lock); + } + } +} + +int stasis_subscription_is_done(struct stasis_subscription *subscription) +{ + if (subscription) { + SCOPED_MUTEX(lock, &subscription->join_lock); + return subscription->final_message_rxed; + } + + /* Null subscription is about as done as you can get */ + return 1; +} + +struct stasis_subscription *stasis_unsubscribe_and_join( + struct stasis_subscription *subscription) +{ + if (!subscription) { + return NULL; + } + + /* Bump refcount to hold it past the unsubscribe */ + ao2_ref(subscription, +1); + stasis_unsubscribe(subscription); + stasis_subscription_join(subscription); + /* Now decrement the refcount back */ + ao2_cleanup(subscription); + return NULL; +} + int stasis_subscription_is_subscribed(const struct stasis_subscription *sub) { if (sub) { diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 75e6b5f95..154b4f020 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -53,6 +53,8 @@ struct stasis_caching_topic { static void stasis_caching_topic_dtor(void *obj) { struct stasis_caching_topic *caching_topic = obj; ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub)); + ast_assert(stasis_subscription_is_done(caching_topic->sub)); + ao2_cleanup(caching_topic->sub); caching_topic->sub = NULL; ao2_cleanup(caching_topic->cache); caching_topic->cache = NULL; @@ -69,6 +71,9 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to { if (caching_topic) { if (stasis_subscription_is_subscribed(caching_topic->sub)) { + /* Increment the reference to hold on to it past the + * unsubscribe */ + ao2_ref(caching_topic->sub, +1); stasis_unsubscribe(caching_topic->sub); } else { ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n"); @@ -77,6 +82,20 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to return NULL; } +struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic) +{ + if (!caching_topic) { + return NULL; + } + + /* Hold a ref past the unsubscribe */ + ao2_ref(caching_topic, +1); + stasis_caching_unsubscribe(caching_topic); + stasis_subscription_join(caching_topic->sub); + ao2_cleanup(caching_topic); + return NULL; +} + struct cache_entry { struct stasis_message_type *type; char *id; diff --git a/main/stasis_channels.c b/main/stasis_channels.c index 3df52d0b3..95f5f9d0e 100644 --- a/main/stasis_channels.c +++ b/main/stasis_channels.c @@ -505,6 +505,9 @@ int ast_channel_snapshot_caller_id_equal( void ast_stasis_channels_shutdown(void) { + channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached); + ao2_cleanup(channel_topic_all); + channel_topic_all = NULL; STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type); @@ -512,9 +515,6 @@ void ast_stasis_channels_shutdown(void) STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type); - ao2_cleanup(channel_topic_all); - channel_topic_all = NULL; - channel_topic_all_cached = stasis_caching_unsubscribe(channel_topic_all_cached); } void ast_stasis_channels_init(void) diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c index 7428e2cf1..252614f62 100644 --- a/main/stasis_endpoints.c +++ b/main/stasis_endpoints.c @@ -101,15 +101,6 @@ static const char *endpoint_snapshot_get_id(struct stasis_message *message) } -static void endpoints_stasis_shutdown(void) -{ - ao2_cleanup(endpoint_topic_all); - endpoint_topic_all = NULL; - - stasis_caching_unsubscribe(endpoint_topic_all_cached); - endpoint_topic_all_cached = NULL; -} - struct ast_json *ast_endpoint_snapshot_to_json( const struct ast_endpoint_snapshot *snapshot) { @@ -149,6 +140,15 @@ struct ast_json *ast_endpoint_snapshot_to_json( return ast_json_ref(json); } +static void endpoints_stasis_shutdown(void) +{ + stasis_caching_unsubscribe_and_join(endpoint_topic_all_cached); + endpoint_topic_all_cached = NULL; + + ao2_cleanup(endpoint_topic_all); + endpoint_topic_all = NULL; +} + int ast_endpoint_stasis_init(void) { ast_register_atexit(endpoints_stasis_shutdown); diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 97ed7ad92..c7acca1ff 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -74,6 +74,7 @@ static void router_dtor(void *obj) size_t i; ast_assert(!stasis_subscription_is_subscribed(router->subscription)); + ast_assert(stasis_subscription_is_done(router->subscription)); router->subscription = NULL; for (i = 0; i < router->num_routes_current; ++i) { ao2_cleanup(router->routes[i]); @@ -165,6 +166,26 @@ void stasis_message_router_unsubscribe(struct stasis_message_router *router) stasis_unsubscribe(router->subscription); } +void stasis_message_router_unsubscribe_and_join( + struct stasis_message_router *router) +{ + if (!router) { + return; + } + stasis_unsubscribe_and_join(router->subscription); +} + +int stasis_message_router_is_done(struct stasis_message_router *router) +{ + if (!router) { + /* Null router is about as done as you can get */ + return 1; + } + + return stasis_subscription_is_done(router->subscription); +} + + static struct stasis_message_route *route_create( struct stasis_message_type *message_type, stasis_subscription_cb callback, |