diff options
author | David M. Lee <dlee@digium.com> | 2013-08-01 13:49:34 +0000 |
---|---|---|
committer | David M. Lee <dlee@digium.com> | 2013-08-01 13:49:34 +0000 |
commit | e1b959ccbb4e47421b37a0f75a2bf89ccd34dcb1 (patch) | |
tree | 3026c96da713bafcf1126c77bde6994f348280bb /res | |
parent | 5c1396946929ab19e94c117f8ad3db5f78a450bc (diff) |
Split caching out from the stasis_caching_topic.
In working with res_stasis, I discovered a significant limitation to
the current structure of stasis_caching_topics: you cannot subscribe
to cache updates for a single channel/bridge/endpoint/etc.
To address this, this patch splits the cache away from the
stasis_caching_topic, making it a first class object. The stasis_cache
object is shared amongst individual stasis_caching_topics that are
created per channel/endpoint/etc. These are still forwarded to global
whatever_all_cached topics, so their use from most of the code does
not change.
In making these changes, I noticed that we frequently used a similar
pattern for bridges, endpoints and channels:
single_topic ----------------> all_topic
^
|
single_topic_cached ----+----> all_topic_cached
|
+----> cache
This pattern was extracted as the 'Stasis Caching Pattern', defined in
stasis_caching_pattern.h. This avoids a lot of duplicate code between
the different domain objects.
Since the cache is now disassociated from its upstream caching topics,
this also necessitated a change to how the 'guaranteed' flag worked
for retrieving from a cache. The code for handling the caching
guarantee was extracted into a 'stasis_topic_wait' function, which
works for any stasis_topic.
(closes issue ASTERISK-22002)
Review: https://reviewboard.asterisk.org/r/2672/
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@395954 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'res')
-rw-r--r-- | res/ari/resource_bridges.c | 10 | ||||
-rw-r--r-- | res/ari/resource_channels.c | 18 | ||||
-rw-r--r-- | res/ari/resource_endpoints.c | 20 | ||||
-rw-r--r-- | res/res_agi.c | 2 | ||||
-rw-r--r-- | res/res_chan_stats.c | 2 | ||||
-rw-r--r-- | res/res_jabber.c | 2 | ||||
-rw-r--r-- | res/res_stasis.c | 4 | ||||
-rw-r--r-- | res/res_xmpp.c | 2 | ||||
-rw-r--r-- | res/stasis/control.c | 6 |
9 files changed, 31 insertions, 35 deletions
diff --git a/res/ari/resource_bridges.c b/res/ari/resource_bridges.c index 17a8bb132..7730d0cd9 100644 --- a/res/ari/resource_bridges.c +++ b/res/ari/resource_bridges.c @@ -448,22 +448,22 @@ void ast_ari_delete_bridge(struct ast_variable *headers, struct ast_delete_bridg void ast_ari_get_bridges(struct ast_variable *headers, struct ast_get_bridges_args *args, struct ast_ari_response *response) { - RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); struct ao2_iterator i; void *obj; - caching_topic = ast_bridge_topic_all_cached(); - if (!caching_topic) { + cache = ast_bridge_cache(); + if (!cache) { ast_ari_response_error( response, 500, "Internal Server Error", "Message bus not initialized"); return; } - ao2_ref(caching_topic, +1); + ao2_ref(cache, +1); - snapshots = stasis_cache_dump(caching_topic, ast_bridge_snapshot_type()); + snapshots = stasis_cache_dump(cache, ast_bridge_snapshot_type()); if (!snapshots) { ast_ari_response_alloc_failed(response); return; diff --git a/res/ari/resource_channels.c b/res/ari/resource_channels.c index 7f3a91fba..dd323bac5 100644 --- a/res/ari/resource_channels.c +++ b/res/ari/resource_channels.c @@ -466,18 +466,18 @@ void ast_ari_get_channel(struct ast_variable *headers, struct ast_ari_response *response) { RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); - struct stasis_caching_topic *caching_topic; + struct stasis_cache *cache; struct ast_channel_snapshot *snapshot; - caching_topic = ast_channel_topic_all_cached(); - if (!caching_topic) { + cache = ast_channel_cache(); + if (!cache) { ast_ari_response_error( response, 500, "Internal Server Error", "Message bus not initialized"); return; } - msg = stasis_cache_get(caching_topic, ast_channel_snapshot_type(), + msg = stasis_cache_get(cache, ast_channel_snapshot_type(), args->channel_id); if (!msg) { ast_ari_response_error( @@ -516,22 +516,22 @@ void ast_ari_get_channels(struct ast_variable *headers, struct ast_get_channels_args *args, struct ast_ari_response *response) { - RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); struct ao2_iterator i; void *obj; - caching_topic = ast_channel_topic_all_cached(); - if (!caching_topic) { + cache = ast_channel_cache(); + if (!cache) { ast_ari_response_error( response, 500, "Internal Server Error", "Message bus not initialized"); return; } - ao2_ref(caching_topic, +1); + ao2_ref(cache, +1); - snapshots = stasis_cache_dump(caching_topic, ast_channel_snapshot_type()); + snapshots = stasis_cache_dump(cache, ast_channel_snapshot_type()); if (!snapshots) { ast_ari_response_alloc_failed(response); return; diff --git a/res/ari/resource_endpoints.c b/res/ari/resource_endpoints.c index bb28df03c..35d8a45cc 100644 --- a/res/ari/resource_endpoints.c +++ b/res/ari/resource_endpoints.c @@ -37,22 +37,22 @@ void ast_ari_get_endpoints(struct ast_variable *headers, struct ast_get_endpoints_args *args, struct ast_ari_response *response) { - RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); struct ao2_iterator i; void *obj; - caching_topic = ast_endpoint_topic_all_cached(); - if (!caching_topic) { + cache = ast_endpoint_cache(); + if (!cache) { ast_ari_response_error( response, 500, "Internal Server Error", "Message bus not initialized"); return; } - ao2_ref(caching_topic, +1); + ao2_ref(cache, +1); - snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type()); + snapshots = stasis_cache_dump(cache, ast_endpoint_snapshot_type()); if (!snapshots) { ast_ari_response_alloc_failed(response); return; @@ -83,7 +83,7 @@ void ast_ari_get_endpoints_by_tech(struct ast_variable *headers, struct ast_get_endpoints_by_tech_args *args, struct ast_ari_response *response) { - RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); struct ao2_iterator i; @@ -91,16 +91,16 @@ void ast_ari_get_endpoints_by_tech(struct ast_variable *headers, /* TODO - if tech isn't a recognized type of endpoint, it should 404 */ - caching_topic = ast_endpoint_topic_all_cached(); - if (!caching_topic) { + cache = ast_endpoint_cache(); + if (!cache) { ast_ari_response_error( response, 500, "Internal Server Error", "Message bus not initialized"); return; } - ao2_ref(caching_topic, +1); + ao2_ref(cache, +1); - snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type()); + snapshots = stasis_cache_dump(cache, ast_endpoint_snapshot_type()); if (!snapshots) { ast_ari_response_alloc_failed(response); return; diff --git a/res/res_agi.c b/res/res_agi.c index 07735130c..5c79ec27f 100644 --- a/res/res_agi.c +++ b/res/res_agi.c @@ -2771,7 +2771,7 @@ static int handle_channelstatus(struct ast_channel *chan, AGI *agi, int argc, co RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); /* one argument: look for info on the specified channel */ - if ((msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), argv[2]))) { + if ((msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), argv[2]))) { struct ast_channel_snapshot *snapshot = stasis_message_data(msg); ast_agi_send(agi->fd, chan, "200 result=%d\n", snapshot->state); diff --git a/res/res_chan_stats.c b/res/res_chan_stats.c index 0f39f071f..a43c564b1 100644 --- a/res/res_chan_stats.c +++ b/res/res_chan_stats.c @@ -154,7 +154,7 @@ static int load_module(void) { /* You can create a message router to route messages by type */ router = stasis_message_router_create( - stasis_caching_get_topic(ast_channel_topic_all_cached())); + ast_channel_topic_all_cached()); if (!router) { return AST_MODULE_LOAD_FAILURE; } diff --git a/res/res_jabber.c b/res/res_jabber.c index 7ca0bf81e..0e373e5e7 100644 --- a/res/res_jabber.c +++ b/res/res_jabber.c @@ -3310,7 +3310,7 @@ static void aji_init_event_distribution(struct aji_client *client) RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup); device_state_sub = stasis_subscribe(ast_device_state_topic_all(), aji_devstate_cb, client); - cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL); + cached = stasis_cache_dump(ast_device_state_cache(), NULL); ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client); } diff --git a/res/res_stasis.c b/res/res_stasis.c index 624950399..e4ad97eae 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -993,7 +993,7 @@ static int load_module(void) return AST_MODULE_LOAD_FAILURE; } - channel_router = stasis_message_router_create(stasis_caching_get_topic(ast_channel_topic_all_cached())); + channel_router = stasis_message_router_create(ast_channel_topic_all_cached()); if (!channel_router) { return AST_MODULE_LOAD_FAILURE; } @@ -1013,7 +1013,7 @@ static int load_module(void) return AST_MODULE_LOAD_FAILURE; } - bridge_router = stasis_message_router_create(stasis_caching_get_topic(ast_bridge_topic_all_cached())); + bridge_router = stasis_message_router_create(ast_bridge_topic_all_cached()); if (!bridge_router) { return AST_MODULE_LOAD_FAILURE; } diff --git a/res/res_xmpp.c b/res/res_xmpp.c index 89eb45d18..3be8aa458 100644 --- a/res/res_xmpp.c +++ b/res/res_xmpp.c @@ -1605,7 +1605,7 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client) return; } - cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL); + cached = stasis_cache_dump(ast_device_state_cache(), NULL); ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client); xmpp_pubsub_subscribe(client, "device_state"); diff --git a/res/stasis/control.c b/res/stasis/control.c index 1fbae0c7c..94f1d700d 100644 --- a/res/stasis/control.c +++ b/res/stasis/control.c @@ -364,13 +364,9 @@ struct ast_channel_snapshot *stasis_app_control_get_snapshot( const struct stasis_app_control *control) { RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); - struct stasis_caching_topic *caching_topic; struct ast_channel_snapshot *snapshot; - caching_topic = ast_channel_topic_all_cached(); - ast_assert(caching_topic != NULL); - - msg = stasis_cache_get(caching_topic, ast_channel_snapshot_type(), + msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), stasis_app_control_get_channel_id(control)); if (!msg) { return NULL; |