summaryrefslogtreecommitdiff
path: root/main/stasis_cache_pattern.c
diff options
context:
space:
mode:
Diffstat (limited to 'main/stasis_cache_pattern.c')
-rw-r--r--main/stasis_cache_pattern.c37
1 files changed, 23 insertions, 14 deletions
diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c
index 18ae8617e..0ee57ba24 100644
--- a/main/stasis_cache_pattern.c
+++ b/main/stasis_cache_pattern.c
@@ -38,14 +38,16 @@ struct stasis_cp_all {
struct stasis_topic *topic;
struct stasis_topic *topic_cached;
struct stasis_cache *cache;
+
+ struct stasis_subscription *forward_all_to_cached;
};
struct stasis_cp_single {
struct stasis_topic *topic;
struct stasis_caching_topic *topic_cached;
- struct stasis_subscription *forward;
- struct stasis_subscription *forward_cached;
+ struct stasis_subscription *forward_topic_to_all;
+ struct stasis_subscription *forward_cached_to_all;
};
static void all_dtor(void *obj)
@@ -53,8 +55,13 @@ static void all_dtor(void *obj)
struct stasis_cp_all *all = obj;
ao2_cleanup(all->topic);
+ all->topic = NULL;
ao2_cleanup(all->topic_cached);
+ all->topic_cached = NULL;
ao2_cleanup(all->cache);
+ all->cache = NULL;
+ stasis_unsubscribe_and_join(all->forward_all_to_cached);
+ all->forward_all_to_cached = NULL;
}
struct stasis_cp_all *stasis_cp_all_create(const char *name,
@@ -76,8 +83,11 @@ struct stasis_cp_all *stasis_cp_all_create(const char *name,
all->topic = stasis_topic_create(name);
all->topic_cached = stasis_topic_create(cached_name);
all->cache = stasis_cache_create(id_fn);
+ all->forward_all_to_cached =
+ stasis_forward_all(all->topic, all->topic_cached);
- if (!all->topic || !all->topic_cached || !all->cache) {
+ if (!all->topic || !all->topic_cached || !all->cache ||
+ !all->forward_all_to_cached) {
return NULL;
}
@@ -116,8 +126,8 @@ static void one_dtor(void *obj)
/* Should already be unsubscribed */
ast_assert(one->topic_cached == NULL);
- ast_assert(one->forward == NULL);
- ast_assert(one->forward_cached == NULL);
+ ast_assert(one->forward_topic_to_all == NULL);
+ ast_assert(one->forward_cached_to_all == NULL);
ao2_cleanup(one->topic);
one->topic = NULL;
@@ -142,13 +152,13 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
return NULL;
}
- one->forward = stasis_forward_all(one->topic, all->topic);
- if (!one->forward) {
+ one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic);
+ if (!one->forward_topic_to_all) {
return NULL;
}
- one->forward_cached = stasis_forward_all(
+ one->forward_cached_to_all = stasis_forward_all(
stasis_caching_get_topic(one->topic_cached), all->topic_cached);
- if (!one->forward_cached) {
+ if (!one->forward_cached_to_all) {
return NULL;
}
@@ -162,12 +172,11 @@ void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
return;
}
+ stasis_unsubscribe(one->forward_topic_to_all);
+ one->forward_topic_to_all = NULL;
+ stasis_unsubscribe(one->forward_cached_to_all);
+ one->forward_cached_to_all = NULL;
stasis_caching_unsubscribe(one->topic_cached);
- one->topic_cached = NULL;
- stasis_unsubscribe(one->forward);
- one->forward = NULL;
- stasis_unsubscribe(one->forward_cached);
- one->forward_cached = NULL;
}
struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one)