diff options
-rw-r--r-- | include/asterisk/stasis_cache_pattern.h | 28 | ||||
-rw-r--r-- | main/endpoints.c | 28 | ||||
-rw-r--r-- | main/stasis_cache_pattern.c | 49 |
3 files changed, 85 insertions, 20 deletions
diff --git a/include/asterisk/stasis_cache_pattern.h b/include/asterisk/stasis_cache_pattern.h index 2ea643e19..27761351a 100644 --- a/include/asterisk/stasis_cache_pattern.h +++ b/include/asterisk/stasis_cache_pattern.h @@ -109,6 +109,8 @@ struct stasis_cp_single; /*! * \brief Create the 'one' side of the cache pattern. * + * Create the 'one' and forward to all's topic and topic_cached. + * * Dispose of using stasis_cp_single_unsubscribe(). * * \param all Corresponding all side. @@ -119,6 +121,32 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, const char *name); /*! + * \brief Create the 'one' side of the cache pattern. + * + * Create the 'one' but do not automatically forward. + * + * Dispose of using stasis_cp_single_unsubscribe(). + * + * \param all Corresponding all side. + * \param name Base name for the topics. + * \return One side instance + */ +struct stasis_cp_single *stasis_cp_single_create_only(struct stasis_cp_all *all, + const char *name); + +/*! + * \brief Set up a topic and topic cache forward. + * + * Forward 'from' to 'to'. + * + * \param from Source 'one' side instance. + * \param to Destination 'one' side instance. + * \retval 0 Success + * \retval -1 Failure + */ +int stasis_cp_single_forward(struct stasis_cp_single *from, struct stasis_cp_single *to); + +/*! * \brief Stops caching and forwarding messages. * * \param one One side of the cache pattern. diff --git a/main/endpoints.c b/main/endpoints.c index ce0ab0292..a64001403 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -74,8 +74,6 @@ struct ast_endpoint { struct stasis_message_router *router; /*! ast_str_container of channels associated with this endpoint */ struct ao2_container *channel_ids; - /*! Forwarding subscription from an endpoint to its tech endpoint */ - struct stasis_forward *tech_forward; }; static int endpoint_hash(const void *obj, int flags) @@ -303,13 +301,14 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha return NULL; } - endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(), - endpoint->id); - if (!endpoint->topics) { - return NULL; - } - if (!ast_strlen_zero(resource)) { + + endpoint->topics = stasis_cp_single_create_only(ast_endpoint_cache_all(), + endpoint->id); + if (!endpoint->topics) { + return NULL; + } + endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint)); if (!endpoint->router) { return NULL; @@ -323,11 +322,19 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha return NULL; } - endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics), - stasis_cp_single_topic(tech_endpoint->topics)); + if (stasis_cp_single_forward(endpoint->topics, tech_endpoint->topics)) { + return NULL; + } + endpoint_publish_snapshot(endpoint); ao2_link(endpoints, endpoint); } else { + endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(), + endpoint->id); + if (!endpoint->topics) { + return NULL; + } + ao2_link(tech_endpoints, endpoint); } @@ -375,7 +382,6 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint) } ao2_unlink(endpoints, endpoint); - endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward); clear_msg = create_endpoint_snapshot_message(endpoint); if (clear_msg) { diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c index 9e3de367a..ccc9ebf08 100644 --- a/main/stasis_cache_pattern.c +++ b/main/stasis_cache_pattern.c @@ -138,6 +138,30 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, { RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup); + one = stasis_cp_single_create_only(all, name); + if (!one) { + return NULL; + } + + one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic); + if (!one->forward_topic_to_all) { + return NULL; + } + one->forward_cached_to_all = stasis_forward_all( + stasis_caching_get_topic(one->topic_cached), all->topic_cached); + if (!one->forward_cached_to_all) { + return NULL; + } + + ao2_ref(one, +1); + return one; +} + +struct stasis_cp_single *stasis_cp_single_create_only(struct stasis_cp_all *all, + const char *name) +{ + RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup); + one = ao2_t_alloc(sizeof(*one), one_dtor, name); if (!one) { return NULL; @@ -152,18 +176,25 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, return NULL; } - one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic); - if (!one->forward_topic_to_all) { - return NULL; + ao2_ref(one, +1); + return one; +} + +int stasis_cp_single_forward(struct stasis_cp_single *from, struct stasis_cp_single *to) +{ + from->forward_topic_to_all = stasis_forward_all(from->topic, to->topic); + if (!from->forward_topic_to_all) { + return -1;; } - one->forward_cached_to_all = stasis_forward_all( - stasis_caching_get_topic(one->topic_cached), all->topic_cached); - if (!one->forward_cached_to_all) { - return NULL; + + from->forward_cached_to_all = stasis_forward_all( + stasis_caching_get_topic(from->topic_cached), + stasis_caching_get_topic(to->topic_cached)); + if (!from->forward_cached_to_all) { + return -1; } - ao2_ref(one, +1); - return one; + return 0; } void stasis_cp_single_unsubscribe(struct stasis_cp_single *one) |