diff options
author | David M. Lee <dlee@digium.com> | 2013-08-01 13:49:34 +0000 |
---|---|---|
committer | David M. Lee <dlee@digium.com> | 2013-08-01 13:49:34 +0000 |
commit | e1b959ccbb4e47421b37a0f75a2bf89ccd34dcb1 (patch) | |
tree | 3026c96da713bafcf1126c77bde6994f348280bb /main/endpoints.c | |
parent | 5c1396946929ab19e94c117f8ad3db5f78a450bc (diff) |
Split caching out from the stasis_caching_topic.
In working with res_stasis, I discovered a significant limitation to
the current structure of stasis_caching_topics: you cannot subscribe
to cache updates for a single channel/bridge/endpoint/etc.
To address this, this patch splits the cache away from the
stasis_caching_topic, making it a first class object. The stasis_cache
object is shared amongst individual stasis_caching_topics that are
created per channel/endpoint/etc. These are still forwarded to global
whatever_all_cached topics, so their use from most of the code does
not change.
In making these changes, I noticed that we frequently used a similar
pattern for bridges, endpoints and channels:
single_topic ----------------> all_topic
^
|
single_topic_cached ----+----> all_topic_cached
|
+----> cache
This pattern was extracted as the 'Stasis Caching Pattern', defined in
stasis_caching_pattern.h. This avoids a lot of duplicate code between
the different domain objects.
Since the cache is now disassociated from its upstream caching topics,
this also necessitated a change to how the 'guaranteed' flag worked
for retrieving from a cache. The code for handling the caching
guarantee was extracted into a 'stasis_topic_wait' function, which
works for any stasis_topic.
(closes issue ASTERISK-22002)
Review: https://reviewboard.asterisk.org/r/2672/
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@395954 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/endpoints.c')
-rw-r--r-- | main/endpoints.c | 53 |
1 files changed, 26 insertions, 27 deletions
diff --git a/main/endpoints.c b/main/endpoints.c index d689f2e6e..b33e33f1a 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -58,17 +58,29 @@ struct ast_endpoint { */ int max_channels; /*! Topic for this endpoint's messages */ - struct stasis_topic *topic; - /*! - * Forwarding subscription sending messages to ast_endpoint_topic_all() - */ - struct stasis_subscription *forward; + struct stasis_cp_single *topics; /*! Router for handling this endpoint's messages */ struct stasis_message_router *router; /*! ast_str_container of channels associated with this endpoint */ struct ao2_container *channel_ids; }; +struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint) +{ + if (!endpoint) { + return ast_endpoint_topic_all(); + } + return stasis_cp_single_topic(endpoint->topics); +} + +struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint) +{ + if (!endpoint) { + return ast_endpoint_topic_all_cached(); + } + return stasis_cp_single_topic_cached(endpoint->topics); +} + const char *ast_endpoint_state_to_string(enum ast_endpoint_state state) { switch (state) { @@ -88,7 +100,7 @@ static void endpoint_publish_snapshot(struct ast_endpoint *endpoint) RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); ast_assert(endpoint != NULL); - ast_assert(endpoint->topic != NULL); + ast_assert(endpoint->topics != NULL); snapshot = ast_endpoint_snapshot_create(endpoint); if (!snapshot) { @@ -98,7 +110,7 @@ static void endpoint_publish_snapshot(struct ast_endpoint *endpoint) if (!message) { return; } - stasis_publish(endpoint->topic, message); + stasis_publish(ast_endpoint_topic(endpoint), message); } static void endpoint_dtor(void *obj) @@ -110,11 +122,8 @@ static void endpoint_dtor(void *obj) ao2_cleanup(endpoint->router); endpoint->router = NULL; - stasis_unsubscribe(endpoint->forward); - endpoint->forward = NULL; - - ao2_cleanup(endpoint->topic); - endpoint->topic = NULL; + stasis_cp_single_unsubscribe(endpoint->topics); + endpoint->topics = NULL; ao2_cleanup(endpoint->channel_ids); endpoint->channel_ids = NULL; @@ -214,18 +223,13 @@ struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource) return NULL; } - endpoint->topic = stasis_topic_create(endpoint->id); - if (!endpoint->topic) { - return NULL; - } - - endpoint->forward = - stasis_forward_all(endpoint->topic, ast_endpoint_topic_all()); - if (!endpoint->forward) { + endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(), + endpoint->id); + if (!endpoint->topics) { return NULL; } - endpoint->router = stasis_message_router_create(endpoint->topic); + endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint)); if (!endpoint->router) { return NULL; } @@ -271,7 +275,7 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint) RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); message = stasis_cache_clear_create(clear_msg); if (message) { - stasis_publish(endpoint->topic, message); + stasis_publish(ast_endpoint_topic(endpoint), message); } } @@ -285,11 +289,6 @@ const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint) return endpoint->resource; } -struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint) -{ - return endpoint ? endpoint->topic : ast_endpoint_topic_all(); -} - void ast_endpoint_set_state(struct ast_endpoint *endpoint, enum ast_endpoint_state state) { |