summaryrefslogtreecommitdiff
path: root/main/endpoints.c
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-08-01 13:49:34 +0000
committerDavid M. Lee <dlee@digium.com>2013-08-01 13:49:34 +0000
commite1b959ccbb4e47421b37a0f75a2bf89ccd34dcb1 (patch)
tree3026c96da713bafcf1126c77bde6994f348280bb /main/endpoints.c
parent5c1396946929ab19e94c117f8ad3db5f78a450bc (diff)
Split caching out from the stasis_caching_topic.
In working with res_stasis, I discovered a significant limitation to the current structure of stasis_caching_topics: you cannot subscribe to cache updates for a single channel/bridge/endpoint/etc. To address this, this patch splits the cache away from the stasis_caching_topic, making it a first class object. The stasis_cache object is shared amongst individual stasis_caching_topics that are created per channel/endpoint/etc. These are still forwarded to global whatever_all_cached topics, so their use from most of the code does not change. In making these changes, I noticed that we frequently used a similar pattern for bridges, endpoints and channels: single_topic ----------------> all_topic ^ | single_topic_cached ----+----> all_topic_cached | +----> cache This pattern was extracted as the 'Stasis Caching Pattern', defined in stasis_caching_pattern.h. This avoids a lot of duplicate code between the different domain objects. Since the cache is now disassociated from its upstream caching topics, this also necessitated a change to how the 'guaranteed' flag worked for retrieving from a cache. The code for handling the caching guarantee was extracted into a 'stasis_topic_wait' function, which works for any stasis_topic. (closes issue ASTERISK-22002) Review: https://reviewboard.asterisk.org/r/2672/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@395954 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/endpoints.c')
-rw-r--r--main/endpoints.c53
1 files changed, 26 insertions, 27 deletions
diff --git a/main/endpoints.c b/main/endpoints.c
index d689f2e6e..b33e33f1a 100644
--- a/main/endpoints.c
+++ b/main/endpoints.c
@@ -58,17 +58,29 @@ struct ast_endpoint {
*/
int max_channels;
/*! Topic for this endpoint's messages */
- struct stasis_topic *topic;
- /*!
- * Forwarding subscription sending messages to ast_endpoint_topic_all()
- */
- struct stasis_subscription *forward;
+ struct stasis_cp_single *topics;
/*! Router for handling this endpoint's messages */
struct stasis_message_router *router;
/*! ast_str_container of channels associated with this endpoint */
struct ao2_container *channel_ids;
};
+struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
+{
+ if (!endpoint) {
+ return ast_endpoint_topic_all();
+ }
+ return stasis_cp_single_topic(endpoint->topics);
+}
+
+struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
+{
+ if (!endpoint) {
+ return ast_endpoint_topic_all_cached();
+ }
+ return stasis_cp_single_topic_cached(endpoint->topics);
+}
+
const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
{
switch (state) {
@@ -88,7 +100,7 @@ static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
ast_assert(endpoint != NULL);
- ast_assert(endpoint->topic != NULL);
+ ast_assert(endpoint->topics != NULL);
snapshot = ast_endpoint_snapshot_create(endpoint);
if (!snapshot) {
@@ -98,7 +110,7 @@ static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
if (!message) {
return;
}
- stasis_publish(endpoint->topic, message);
+ stasis_publish(ast_endpoint_topic(endpoint), message);
}
static void endpoint_dtor(void *obj)
@@ -110,11 +122,8 @@ static void endpoint_dtor(void *obj)
ao2_cleanup(endpoint->router);
endpoint->router = NULL;
- stasis_unsubscribe(endpoint->forward);
- endpoint->forward = NULL;
-
- ao2_cleanup(endpoint->topic);
- endpoint->topic = NULL;
+ stasis_cp_single_unsubscribe(endpoint->topics);
+ endpoint->topics = NULL;
ao2_cleanup(endpoint->channel_ids);
endpoint->channel_ids = NULL;
@@ -214,18 +223,13 @@ struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
return NULL;
}
- endpoint->topic = stasis_topic_create(endpoint->id);
- if (!endpoint->topic) {
- return NULL;
- }
-
- endpoint->forward =
- stasis_forward_all(endpoint->topic, ast_endpoint_topic_all());
- if (!endpoint->forward) {
+ endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
+ endpoint->id);
+ if (!endpoint->topics) {
return NULL;
}
- endpoint->router = stasis_message_router_create(endpoint->topic);
+ endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint));
if (!endpoint->router) {
return NULL;
}
@@ -271,7 +275,7 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
message = stasis_cache_clear_create(clear_msg);
if (message) {
- stasis_publish(endpoint->topic, message);
+ stasis_publish(ast_endpoint_topic(endpoint), message);
}
}
@@ -285,11 +289,6 @@ const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
return endpoint->resource;
}
-struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
-{
- return endpoint ? endpoint->topic : ast_endpoint_topic_all();
-}
-
void ast_endpoint_set_state(struct ast_endpoint *endpoint,
enum ast_endpoint_state state)
{