summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
Diffstat (limited to 'main')
-rw-r--r--main/app.c2
-rw-r--r--main/devicestate.c2
-rw-r--r--main/endpoints.c7
-rw-r--r--main/manager.c4
-rw-r--r--main/manager_channels.c2
-rw-r--r--main/pbx.c8
-rw-r--r--main/stasis.c81
-rw-r--r--main/stasis_cache.c19
-rw-r--r--main/stasis_channels.c6
-rw-r--r--main/stasis_endpoints.c18
-rw-r--r--main/stasis_message_router.c21
11 files changed, 139 insertions, 31 deletions
diff --git a/main/app.c b/main/app.c
index 0cdd9d31d..0e8a68f7b 100644
--- a/main/app.c
+++ b/main/app.c
@@ -2727,7 +2727,7 @@ static void app_exit(void)
{
ao2_cleanup(mwi_topic_all);
mwi_topic_all = NULL;
- mwi_topic_cached = stasis_caching_unsubscribe(mwi_topic_cached);
+ mwi_topic_cached = stasis_caching_unsubscribe_and_join(mwi_topic_cached);
STASIS_MESSAGE_TYPE_CLEANUP(stasis_mwi_state_type);
ao2_cleanup(mwi_topic_pool);
mwi_topic_pool = NULL;
diff --git a/main/devicestate.c b/main/devicestate.c
index aa31dbfd6..afa9621d3 100644
--- a/main/devicestate.c
+++ b/main/devicestate.c
@@ -776,7 +776,7 @@ static void devstate_exit(void)
{
ao2_cleanup(device_state_topic_all);
device_state_topic_all = NULL;
- device_state_topic_cached = stasis_caching_unsubscribe(device_state_topic_cached);
+ device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
ao2_cleanup(device_state_topic_pool);
device_state_topic_pool = NULL;
diff --git a/main/endpoints.c b/main/endpoints.c
index 95397f960..c2d0577f9 100644
--- a/main/endpoints.c
+++ b/main/endpoints.c
@@ -106,7 +106,9 @@ static void endpoint_dtor(void *obj)
struct ast_endpoint *endpoint = obj;
/* The router should be shut down already */
- ast_assert(endpoint->router == NULL);
+ ast_assert(stasis_message_router_is_done(endpoint->router));
+ ao2_cleanup(endpoint->router);
+ endpoint->router = NULL;
stasis_unsubscribe(endpoint->forward);
endpoint->forward = NULL;
@@ -258,8 +260,9 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
stasis_publish(endpoint->topic, message);
}
+ /* Bump refcount to hold on to the router */
+ ao2_ref(endpoint->router, +1);
stasis_message_router_unsubscribe(endpoint->router);
- endpoint->router = NULL;
}
const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
diff --git a/main/manager.c b/main/manager.c
index 4d2923eb5..c9b2fbe1e 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -1077,9 +1077,7 @@ static void acl_change_stasis_subscribe(void)
static void acl_change_stasis_unsubscribe(void)
{
- if (acl_change_sub) {
- acl_change_sub = stasis_unsubscribe(acl_change_sub);
- }
+ acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
}
/* In order to understand what the heck is going on with the
diff --git a/main/manager_channels.c b/main/manager_channels.c
index 63380a762..e1f918868 100644
--- a/main/manager_channels.c
+++ b/main/manager_channels.c
@@ -805,7 +805,7 @@ static void channel_dial_cb(void *data, struct stasis_subscription *sub,
static void manager_channels_shutdown(void)
{
- stasis_message_router_unsubscribe(channel_state_router);
+ stasis_message_router_unsubscribe_and_join(channel_state_router);
channel_state_router = NULL;
}
diff --git a/main/pbx.c b/main/pbx.c
index 74aeee928..9492d9559 100644
--- a/main/pbx.c
+++ b/main/pbx.c
@@ -11700,12 +11700,8 @@ static void unload_pbx(void)
{
int x;
- if (presence_state_sub) {
- presence_state_sub = stasis_unsubscribe(presence_state_sub);
- }
- if (device_state_sub) {
- device_state_sub = stasis_unsubscribe(device_state_sub);
- }
+ presence_state_sub = stasis_unsubscribe_and_join(presence_state_sub);
+ device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
/* Unregister builtin applications */
for (x = 0; x < ARRAY_LEN(builtins); x++) {
diff --git a/main/stasis.c b/main/stasis.c
index 98dff95a6..d0ded401c 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -114,16 +114,30 @@ struct stasis_subscription {
stasis_subscription_cb callback;
/*! Data pointer to be handed to the callback. */
void *data;
+
+ /*! Lock for joining with subscription. */
+ ast_mutex_t join_lock;
+ /*! Condition for joining with subscription. */
+ ast_cond_t join_cond;
+ /*! Flag set when final message for sub has been received.
+ * Be sure join_lock is held before reading/setting. */
+ int final_message_rxed;
+ /*! Flag set when final message for sub has been processed.
+ * Be sure join_lock is held before reading/setting. */
+ int final_message_processed;
};
static void subscription_dtor(void *obj)
{
struct stasis_subscription *sub = obj;
ast_assert(!stasis_subscription_is_subscribed(sub));
+ ast_assert(stasis_subscription_is_done(sub));
ao2_cleanup(sub->topic);
sub->topic = NULL;
ast_taskprocessor_unreference(sub->mailbox);
sub->mailbox = NULL;
+ ast_mutex_destroy(&sub->join_lock);
+ ast_cond_destroy(&sub->join_cond);
}
/*!
@@ -136,11 +150,22 @@ static void subscription_invoke(struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
- /* Since sub->topic doesn't change, no need to lock sub */
- sub->callback(sub->data,
- sub,
- topic,
- message);
+ /* Notify that the final message has been received */
+ if (stasis_subscription_final_message(sub, message)) {
+ SCOPED_MUTEX(lock, &sub->join_lock);
+ sub->final_message_rxed = 1;
+ ast_cond_signal(&sub->join_cond);
+ }
+
+ /* Since sub is mostly immutable, no need to lock sub */
+ sub->callback(sub->data, sub, topic, message);
+
+ /* Notify that the final message has been processed */
+ if (stasis_subscription_final_message(sub, message)) {
+ SCOPED_MUTEX(lock, &sub->join_lock);
+ sub->final_message_processed = 1;
+ ast_cond_signal(&sub->join_cond);
+ }
}
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
@@ -171,6 +196,8 @@ static struct stasis_subscription *__stasis_subscribe(
sub->topic = topic;
sub->callback = callback;
sub->data = data;
+ ast_mutex_init(&sub->join_lock);
+ ast_cond_init(&sub->join_cond, NULL);
if (topic_add_subscription(topic, sub) != 0) {
return NULL;
@@ -212,6 +239,50 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
return NULL;
}
+/*!
+ * \brief Block until the final message has been received on a subscription.
+ *
+ * \param subscription Subscription to wait on.
+ */
+void stasis_subscription_join(struct stasis_subscription *subscription)
+{
+ if (subscription) {
+ SCOPED_MUTEX(lock, &subscription->join_lock);
+ /* Wait until the processed flag has been set */
+ while (!subscription->final_message_processed) {
+ ast_cond_wait(&subscription->join_cond,
+ &subscription->join_lock);
+ }
+ }
+}
+
+int stasis_subscription_is_done(struct stasis_subscription *subscription)
+{
+ if (subscription) {
+ SCOPED_MUTEX(lock, &subscription->join_lock);
+ return subscription->final_message_rxed;
+ }
+
+ /* Null subscription is about as done as you can get */
+ return 1;
+}
+
+struct stasis_subscription *stasis_unsubscribe_and_join(
+ struct stasis_subscription *subscription)
+{
+ if (!subscription) {
+ return NULL;
+ }
+
+ /* Bump refcount to hold it past the unsubscribe */
+ ao2_ref(subscription, +1);
+ stasis_unsubscribe(subscription);
+ stasis_subscription_join(subscription);
+ /* Now decrement the refcount back */
+ ao2_cleanup(subscription);
+ return NULL;
+}
+
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
{
if (sub) {
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
index 75e6b5f95..154b4f020 100644
--- a/main/stasis_cache.c
+++ b/main/stasis_cache.c
@@ -53,6 +53,8 @@ struct stasis_caching_topic {
static void stasis_caching_topic_dtor(void *obj) {
struct stasis_caching_topic *caching_topic = obj;
ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
+ ast_assert(stasis_subscription_is_done(caching_topic->sub));
+ ao2_cleanup(caching_topic->sub);
caching_topic->sub = NULL;
ao2_cleanup(caching_topic->cache);
caching_topic->cache = NULL;
@@ -69,6 +71,9 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to
{
if (caching_topic) {
if (stasis_subscription_is_subscribed(caching_topic->sub)) {
+ /* Increment the reference to hold on to it past the
+ * unsubscribe */
+ ao2_ref(caching_topic->sub, +1);
stasis_unsubscribe(caching_topic->sub);
} else {
ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
@@ -77,6 +82,20 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to
return NULL;
}
+struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
+{
+ if (!caching_topic) {
+ return NULL;
+ }
+
+ /* Hold a ref past the unsubscribe */
+ ao2_ref(caching_topic, +1);
+ stasis_caching_unsubscribe(caching_topic);
+ stasis_subscription_join(caching_topic->sub);
+ ao2_cleanup(caching_topic);
+ return NULL;
+}
+
struct cache_entry {
struct stasis_message_type *type;
char *id;
diff --git a/main/stasis_channels.c b/main/stasis_channels.c
index 3df52d0b3..95f5f9d0e 100644
--- a/main/stasis_channels.c
+++ b/main/stasis_channels.c
@@ -505,6 +505,9 @@ int ast_channel_snapshot_caller_id_equal(
void ast_stasis_channels_shutdown(void)
{
+ channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached);
+ ao2_cleanup(channel_topic_all);
+ channel_topic_all = NULL;
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
@@ -512,9 +515,6 @@ void ast_stasis_channels_shutdown(void)
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type);
- ao2_cleanup(channel_topic_all);
- channel_topic_all = NULL;
- channel_topic_all_cached = stasis_caching_unsubscribe(channel_topic_all_cached);
}
void ast_stasis_channels_init(void)
diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c
index 7428e2cf1..252614f62 100644
--- a/main/stasis_endpoints.c
+++ b/main/stasis_endpoints.c
@@ -101,15 +101,6 @@ static const char *endpoint_snapshot_get_id(struct stasis_message *message)
}
-static void endpoints_stasis_shutdown(void)
-{
- ao2_cleanup(endpoint_topic_all);
- endpoint_topic_all = NULL;
-
- stasis_caching_unsubscribe(endpoint_topic_all_cached);
- endpoint_topic_all_cached = NULL;
-}
-
struct ast_json *ast_endpoint_snapshot_to_json(
const struct ast_endpoint_snapshot *snapshot)
{
@@ -149,6 +140,15 @@ struct ast_json *ast_endpoint_snapshot_to_json(
return ast_json_ref(json);
}
+static void endpoints_stasis_shutdown(void)
+{
+ stasis_caching_unsubscribe_and_join(endpoint_topic_all_cached);
+ endpoint_topic_all_cached = NULL;
+
+ ao2_cleanup(endpoint_topic_all);
+ endpoint_topic_all = NULL;
+}
+
int ast_endpoint_stasis_init(void)
{
ast_register_atexit(endpoints_stasis_shutdown);
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index 97ed7ad92..c7acca1ff 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -74,6 +74,7 @@ static void router_dtor(void *obj)
size_t i;
ast_assert(!stasis_subscription_is_subscribed(router->subscription));
+ ast_assert(stasis_subscription_is_done(router->subscription));
router->subscription = NULL;
for (i = 0; i < router->num_routes_current; ++i) {
ao2_cleanup(router->routes[i]);
@@ -165,6 +166,26 @@ void stasis_message_router_unsubscribe(struct stasis_message_router *router)
stasis_unsubscribe(router->subscription);
}
+void stasis_message_router_unsubscribe_and_join(
+ struct stasis_message_router *router)
+{
+ if (!router) {
+ return;
+ }
+ stasis_unsubscribe_and_join(router->subscription);
+}
+
+int stasis_message_router_is_done(struct stasis_message_router *router)
+{
+ if (!router) {
+ /* Null router is about as done as you can get */
+ return 1;
+ }
+
+ return stasis_subscription_is_done(router->subscription);
+}
+
+
static struct stasis_message_route *route_create(
struct stasis_message_type *message_type,
stasis_subscription_cb callback,