diff options
Diffstat (limited to 'main')
-rw-r--r-- | main/stasis_cache.c | 3 | ||||
-rw-r--r-- | main/stasis_cache_pattern.c | 37 |
2 files changed, 24 insertions, 16 deletions
diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 3d5065665..2ca4083df 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -426,8 +426,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, id = caching_topic->cache->id_fn(message); if (id == NULL) { - /* Object isn't cached; forward */ - stasis_forward_message(caching_topic->topic, topic, message); + /* Object isn't cached; discard */ } else { /* Update the cache */ RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup); diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c index 18ae8617e..0ee57ba24 100644 --- a/main/stasis_cache_pattern.c +++ b/main/stasis_cache_pattern.c @@ -38,14 +38,16 @@ struct stasis_cp_all { struct stasis_topic *topic; struct stasis_topic *topic_cached; struct stasis_cache *cache; + + struct stasis_subscription *forward_all_to_cached; }; struct stasis_cp_single { struct stasis_topic *topic; struct stasis_caching_topic *topic_cached; - struct stasis_subscription *forward; - struct stasis_subscription *forward_cached; + struct stasis_subscription *forward_topic_to_all; + struct stasis_subscription *forward_cached_to_all; }; static void all_dtor(void *obj) @@ -53,8 +55,13 @@ static void all_dtor(void *obj) struct stasis_cp_all *all = obj; ao2_cleanup(all->topic); + all->topic = NULL; ao2_cleanup(all->topic_cached); + all->topic_cached = NULL; ao2_cleanup(all->cache); + all->cache = NULL; + stasis_unsubscribe_and_join(all->forward_all_to_cached); + all->forward_all_to_cached = NULL; } struct stasis_cp_all *stasis_cp_all_create(const char *name, @@ -76,8 +83,11 @@ struct stasis_cp_all *stasis_cp_all_create(const char *name, all->topic = stasis_topic_create(name); all->topic_cached = stasis_topic_create(cached_name); all->cache = stasis_cache_create(id_fn); + all->forward_all_to_cached = + stasis_forward_all(all->topic, all->topic_cached); - if (!all->topic || !all->topic_cached || !all->cache) { + if (!all->topic || !all->topic_cached || !all->cache || + !all->forward_all_to_cached) { return NULL; } @@ -116,8 +126,8 @@ static void one_dtor(void *obj) /* Should already be unsubscribed */ ast_assert(one->topic_cached == NULL); - ast_assert(one->forward == NULL); - ast_assert(one->forward_cached == NULL); + ast_assert(one->forward_topic_to_all == NULL); + ast_assert(one->forward_cached_to_all == NULL); ao2_cleanup(one->topic); one->topic = NULL; @@ -142,13 +152,13 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, return NULL; } - one->forward = stasis_forward_all(one->topic, all->topic); - if (!one->forward) { + one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic); + if (!one->forward_topic_to_all) { return NULL; } - one->forward_cached = stasis_forward_all( + one->forward_cached_to_all = stasis_forward_all( stasis_caching_get_topic(one->topic_cached), all->topic_cached); - if (!one->forward_cached) { + if (!one->forward_cached_to_all) { return NULL; } @@ -162,12 +172,11 @@ void stasis_cp_single_unsubscribe(struct stasis_cp_single *one) return; } + stasis_unsubscribe(one->forward_topic_to_all); + one->forward_topic_to_all = NULL; + stasis_unsubscribe(one->forward_cached_to_all); + one->forward_cached_to_all = NULL; stasis_caching_unsubscribe(one->topic_cached); - one->topic_cached = NULL; - stasis_unsubscribe(one->forward); - one->forward = NULL; - stasis_unsubscribe(one->forward_cached); - one->forward_cached = NULL; } struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one) |