summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Jordan <mjordan@digium.com>2015-12-28 15:11:06 -0600
committerGerrit Code Review <gerrit2@gerrit.digium.api>2015-12-28 15:11:07 -0600
commit37f9050f777ae559777ba41a28f143d185f85b46 (patch)
treeec8cc854a5bc2145d7458efcb3a6ae725899e5ec
parent96b32e0321bf37322c85bc57721903dc4a3cc968 (diff)
parent22db16fa8143bc669de1037462f23ed93b8f1043 (diff)
Merge "endpoint/stasis: Eliminate duplicate events on endpoint status change"
-rw-r--r--include/asterisk/stasis_cache_pattern.h19
-rw-r--r--main/endpoints.c20
-rw-r--r--main/stasis_cache_pattern.c34
3 files changed, 57 insertions, 16 deletions
diff --git a/include/asterisk/stasis_cache_pattern.h b/include/asterisk/stasis_cache_pattern.h
index 2ea643e19..e61d3e931 100644
--- a/include/asterisk/stasis_cache_pattern.h
+++ b/include/asterisk/stasis_cache_pattern.h
@@ -109,6 +109,8 @@ struct stasis_cp_single;
/*!
* \brief Create the 'one' side of the cache pattern.
*
+ * Create the 'one' and forward to all's topic and topic_cached.
+ *
* Dispose of using stasis_cp_single_unsubscribe().
*
* \param all Corresponding all side.
@@ -119,6 +121,23 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
const char *name);
/*!
+ * \brief Create a sink in the cache pattern
+ *
+ * Create the 'one' but do not automatically forward to the all's topic.
+ * This is useful when aggregating other topic's messages created with
+ * \c stasis_cp_single_create in another caching topic without replicating
+ * those messages in the all's topics.
+ *
+ * Dispose of using stasis_cp_single_unsubscribe().
+ *
+ * \param all Corresponding all side.
+ * \param name Base name for the topics.
+ * \return One side instance
+ */
+struct stasis_cp_single *stasis_cp_sink_create(struct stasis_cp_all *all,
+ const char *name);
+
+/*!
* \brief Stops caching and forwarding messages.
*
* \param one One side of the cache pattern.
diff --git a/main/endpoints.c b/main/endpoints.c
index 21326561c..b73edd379 100644
--- a/main/endpoints.c
+++ b/main/endpoints.c
@@ -303,13 +303,14 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
return NULL;
}
- endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
- endpoint->id);
- if (!endpoint->topics) {
- return NULL;
- }
-
if (!ast_strlen_zero(resource)) {
+
+ endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
+ endpoint->id);
+ if (!endpoint->topics) {
+ return NULL;
+ }
+
endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
if (!endpoint->router) {
return NULL;
@@ -325,9 +326,16 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
stasis_cp_single_topic(tech_endpoint->topics));
+
endpoint_publish_snapshot(endpoint);
ao2_link(endpoints, endpoint);
} else {
+ endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(),
+ endpoint->id);
+ if (!endpoint->topics) {
+ return NULL;
+ }
+
ao2_link(tech_endpoints, endpoint);
}
diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c
index bbe63ba1d..66563c4c6 100644
--- a/main/stasis_cache_pattern.c
+++ b/main/stasis_cache_pattern.c
@@ -138,20 +138,11 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
{
RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup);
- one = ao2_t_alloc(sizeof(*one), one_dtor, name);
+ one = stasis_cp_sink_create(all, name);
if (!one) {
return NULL;
}
- one->topic = stasis_topic_create(name);
- if (!one->topic) {
- return NULL;
- }
- one->topic_cached = stasis_caching_topic_create(one->topic, all->cache);
- if (!one->topic_cached) {
- return NULL;
- }
-
one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic);
if (!one->forward_topic_to_all) {
return NULL;
@@ -166,6 +157,29 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
return one;
}
+struct stasis_cp_single *stasis_cp_sink_create(struct stasis_cp_all *all,
+ const char *name)
+{
+ RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup);
+
+ one = ao2_t_alloc(sizeof(*one), one_dtor, name);
+ if (!one) {
+ return NULL;
+ }
+
+ one->topic = stasis_topic_create(name);
+ if (!one->topic) {
+ return NULL;
+ }
+ one->topic_cached = stasis_caching_topic_create(one->topic, all->cache);
+ if (!one->topic_cached) {
+ return NULL;
+ }
+
+ ao2_ref(one, +1);
+ return one;
+}
+
void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
{
if (!one) {