summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/asterisk/stasis_cache_pattern.h28
-rw-r--r--main/endpoints.c28
-rw-r--r--main/stasis_cache_pattern.c49
3 files changed, 85 insertions, 20 deletions
diff --git a/include/asterisk/stasis_cache_pattern.h b/include/asterisk/stasis_cache_pattern.h
index 2ea643e19..27761351a 100644
--- a/include/asterisk/stasis_cache_pattern.h
+++ b/include/asterisk/stasis_cache_pattern.h
@@ -109,6 +109,8 @@ struct stasis_cp_single;
/*!
* \brief Create the 'one' side of the cache pattern.
*
+ * Create the 'one' and forward to all's topic and topic_cached.
+ *
* Dispose of using stasis_cp_single_unsubscribe().
*
* \param all Corresponding all side.
@@ -119,6 +121,32 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
const char *name);
/*!
+ * \brief Create the 'one' side of the cache pattern.
+ *
+ * Create the 'one' but do not automatically forward.
+ *
+ * Dispose of using stasis_cp_single_unsubscribe().
+ *
+ * \param all Corresponding all side.
+ * \param name Base name for the topics.
+ * \return One side instance
+ */
+struct stasis_cp_single *stasis_cp_single_create_only(struct stasis_cp_all *all,
+ const char *name);
+
+/*!
+ * \brief Set up a topic and topic cache forward.
+ *
+ * Forward 'from' to 'to'.
+ *
+ * \param from Source 'one' side instance.
+ * \param to Destination 'one' side instance.
+ * \retval 0 Success
+ * \retval -1 Failure
+ */
+int stasis_cp_single_forward(struct stasis_cp_single *from, struct stasis_cp_single *to);
+
+/*!
* \brief Stops caching and forwarding messages.
*
* \param one One side of the cache pattern.
diff --git a/main/endpoints.c b/main/endpoints.c
index 0155adfeb..8f3ae366e 100644
--- a/main/endpoints.c
+++ b/main/endpoints.c
@@ -74,8 +74,6 @@ struct ast_endpoint {
struct stasis_message_router *router;
/*! ast_str_container of channels associated with this endpoint */
struct ao2_container *channel_ids;
- /*! Forwarding subscription from an endpoint to its tech endpoint */
- struct stasis_forward *tech_forward;
};
static int endpoint_hash(const void *obj, int flags)
@@ -303,13 +301,14 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
return NULL;
}
- endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
- endpoint->id);
- if (!endpoint->topics) {
- return NULL;
- }
-
if (!ast_strlen_zero(resource)) {
+
+ endpoint->topics = stasis_cp_single_create_only(ast_endpoint_cache_all(),
+ endpoint->id);
+ if (!endpoint->topics) {
+ return NULL;
+ }
+
endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
if (!endpoint->router) {
return NULL;
@@ -323,11 +322,19 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
return NULL;
}
- endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
- stasis_cp_single_topic(tech_endpoint->topics));
+ if (stasis_cp_single_forward(endpoint->topics, tech_endpoint->topics)) {
+ return NULL;
+ }
+
endpoint_publish_snapshot(endpoint);
ao2_link(endpoints, endpoint);
} else {
+ endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
+ endpoint->id);
+ if (!endpoint->topics) {
+ return NULL;
+ }
+
ao2_link(tech_endpoints, endpoint);
}
@@ -375,7 +382,6 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
}
ao2_unlink(endpoints, endpoint);
- endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward);
clear_msg = create_endpoint_snapshot_message(endpoint);
if (clear_msg) {
diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c
index 9e3de367a..ccc9ebf08 100644
--- a/main/stasis_cache_pattern.c
+++ b/main/stasis_cache_pattern.c
@@ -138,6 +138,30 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
{
RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup);
+ one = stasis_cp_single_create_only(all, name);
+ if (!one) {
+ return NULL;
+ }
+
+ one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic);
+ if (!one->forward_topic_to_all) {
+ return NULL;
+ }
+ one->forward_cached_to_all = stasis_forward_all(
+ stasis_caching_get_topic(one->topic_cached), all->topic_cached);
+ if (!one->forward_cached_to_all) {
+ return NULL;
+ }
+
+ ao2_ref(one, +1);
+ return one;
+}
+
+struct stasis_cp_single *stasis_cp_single_create_only(struct stasis_cp_all *all,
+ const char *name)
+{
+ RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup);
+
one = ao2_t_alloc(sizeof(*one), one_dtor, name);
if (!one) {
return NULL;
@@ -152,18 +176,25 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
return NULL;
}
- one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic);
- if (!one->forward_topic_to_all) {
- return NULL;
+ ao2_ref(one, +1);
+ return one;
+}
+
+int stasis_cp_single_forward(struct stasis_cp_single *from, struct stasis_cp_single *to)
+{
+ from->forward_topic_to_all = stasis_forward_all(from->topic, to->topic);
+ if (!from->forward_topic_to_all) {
+ return -1;;
}
- one->forward_cached_to_all = stasis_forward_all(
- stasis_caching_get_topic(one->topic_cached), all->topic_cached);
- if (!one->forward_cached_to_all) {
- return NULL;
+
+ from->forward_cached_to_all = stasis_forward_all(
+ stasis_caching_get_topic(from->topic_cached),
+ stasis_caching_get_topic(to->topic_cached));
+ if (!from->forward_cached_to_all) {
+ return -1;
}
- ao2_ref(one, +1);
- return one;
+ return 0;
}
void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)