diff options
author | George Joseph <george.joseph@fairview5.com> | 2016-01-05 10:06:32 -0700 |
---|---|---|
committer | George Joseph <george.joseph@fairview5.com> | 2016-01-05 12:29:31 -0600 |
commit | d228b62fd437e02c0638684c1f44c92e5f1e3948 (patch) | |
tree | d1a4408b6242f0614e2c5fe6c7257e9648381cdf | |
parent | e13719bff1c4a723edf08252da17fef04b6f88cf (diff) |
stasis_cache_pattern: Backport to 13
Somehow stasis_cache_pattern got out of sync between 13 and master
and it was causing duplicate channel message issues in 13 when
related to a specific endpoint. I.E. from statsd,
'endpoints.PJSIP.1174.channels 0|g' was being emitted twice.
Backporting stasis_cache_pattern from master to 13 solved
the issue and running the unit and testsuite tests confirmed
that no new ones were created.
ASTERISK-25317 #close
Change-Id: Ia8707462f62d15eed14541c37f332a7bbbceb548
-rw-r--r-- | include/asterisk/stasis_cache_pattern.h | 21 | ||||
-rw-r--r-- | main/endpoints.c | 12 | ||||
-rw-r--r-- | main/stasis_cache_pattern.c | 21 |
3 files changed, 15 insertions, 39 deletions
diff --git a/include/asterisk/stasis_cache_pattern.h b/include/asterisk/stasis_cache_pattern.h index 27761351a..e61d3e931 100644 --- a/include/asterisk/stasis_cache_pattern.h +++ b/include/asterisk/stasis_cache_pattern.h @@ -121,9 +121,12 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, const char *name); /*! - * \brief Create the 'one' side of the cache pattern. + * \brief Create a sink in the cache pattern * - * Create the 'one' but do not automatically forward. + * Create the 'one' but do not automatically forward to the all's topic. + * This is useful when aggregating other topic's messages created with + * \c stasis_cp_single_create in another caching topic without replicating + * those messages in the all's topics. * * Dispose of using stasis_cp_single_unsubscribe(). * @@ -131,22 +134,10 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, * \param name Base name for the topics. * \return One side instance */ -struct stasis_cp_single *stasis_cp_single_create_only(struct stasis_cp_all *all, +struct stasis_cp_single *stasis_cp_sink_create(struct stasis_cp_all *all, const char *name); /*! - * \brief Set up a topic and topic cache forward. - * - * Forward 'from' to 'to'. - * - * \param from Source 'one' side instance. - * \param to Destination 'one' side instance. - * \retval 0 Success - * \retval -1 Failure - */ -int stasis_cp_single_forward(struct stasis_cp_single *from, struct stasis_cp_single *to); - -/*! * \brief Stops caching and forwarding messages. * * \param one One side of the cache pattern. diff --git a/main/endpoints.c b/main/endpoints.c index 8f3ae366e..80e7f87fd 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -74,6 +74,8 @@ struct ast_endpoint { struct stasis_message_router *router; /*! ast_str_container of channels associated with this endpoint */ struct ao2_container *channel_ids; + /*! Forwarding subscription from an endpoint to its tech endpoint */ + struct stasis_forward *tech_forward; }; static int endpoint_hash(const void *obj, int flags) @@ -303,7 +305,7 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha if (!ast_strlen_zero(resource)) { - endpoint->topics = stasis_cp_single_create_only(ast_endpoint_cache_all(), + endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(), endpoint->id); if (!endpoint->topics) { return NULL; @@ -322,14 +324,13 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha return NULL; } - if (stasis_cp_single_forward(endpoint->topics, tech_endpoint->topics)) { - return NULL; - } + endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics), + stasis_cp_single_topic(tech_endpoint->topics)); endpoint_publish_snapshot(endpoint); ao2_link(endpoints, endpoint); } else { - endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(), + endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(), endpoint->id); if (!endpoint->topics) { return NULL; @@ -382,6 +383,7 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint) } ao2_unlink(endpoints, endpoint); + endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward); clear_msg = create_endpoint_snapshot_message(endpoint); if (clear_msg) { diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c index ccc9ebf08..7ccf1c181 100644 --- a/main/stasis_cache_pattern.c +++ b/main/stasis_cache_pattern.c @@ -138,7 +138,7 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, { RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup); - one = stasis_cp_single_create_only(all, name); + one = stasis_cp_sink_create(all, name); if (!one) { return NULL; } @@ -157,7 +157,7 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, return one; } -struct stasis_cp_single *stasis_cp_single_create_only(struct stasis_cp_all *all, +struct stasis_cp_single *stasis_cp_sink_create(struct stasis_cp_all *all, const char *name) { RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup); @@ -180,23 +180,6 @@ struct stasis_cp_single *stasis_cp_single_create_only(struct stasis_cp_all *all, return one; } -int stasis_cp_single_forward(struct stasis_cp_single *from, struct stasis_cp_single *to) -{ - from->forward_topic_to_all = stasis_forward_all(from->topic, to->topic); - if (!from->forward_topic_to_all) { - return -1;; - } - - from->forward_cached_to_all = stasis_forward_all( - stasis_caching_get_topic(from->topic_cached), - stasis_caching_get_topic(to->topic_cached)); - if (!from->forward_cached_to_all) { - return -1; - } - - return 0; -} - void stasis_cp_single_unsubscribe(struct stasis_cp_single *one) { if (!one) { |