summaryrefslogtreecommitdiff
path: root/res
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-08-01 13:49:34 +0000
committerDavid M. Lee <dlee@digium.com>2013-08-01 13:49:34 +0000
commite1b959ccbb4e47421b37a0f75a2bf89ccd34dcb1 (patch)
tree3026c96da713bafcf1126c77bde6994f348280bb /res
parent5c1396946929ab19e94c117f8ad3db5f78a450bc (diff)
Split caching out from the stasis_caching_topic.
In working with res_stasis, I discovered a significant limitation to the current structure of stasis_caching_topics: you cannot subscribe to cache updates for a single channel/bridge/endpoint/etc. To address this, this patch splits the cache away from the stasis_caching_topic, making it a first class object. The stasis_cache object is shared amongst individual stasis_caching_topics that are created per channel/endpoint/etc. These are still forwarded to global whatever_all_cached topics, so their use from most of the code does not change. In making these changes, I noticed that we frequently used a similar pattern for bridges, endpoints and channels: single_topic ----------------> all_topic ^ | single_topic_cached ----+----> all_topic_cached | +----> cache This pattern was extracted as the 'Stasis Caching Pattern', defined in stasis_caching_pattern.h. This avoids a lot of duplicate code between the different domain objects. Since the cache is now disassociated from its upstream caching topics, this also necessitated a change to how the 'guaranteed' flag worked for retrieving from a cache. The code for handling the caching guarantee was extracted into a 'stasis_topic_wait' function, which works for any stasis_topic. (closes issue ASTERISK-22002) Review: https://reviewboard.asterisk.org/r/2672/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@395954 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'res')
-rw-r--r--res/ari/resource_bridges.c10
-rw-r--r--res/ari/resource_channels.c18
-rw-r--r--res/ari/resource_endpoints.c20
-rw-r--r--res/res_agi.c2
-rw-r--r--res/res_chan_stats.c2
-rw-r--r--res/res_jabber.c2
-rw-r--r--res/res_stasis.c4
-rw-r--r--res/res_xmpp.c2
-rw-r--r--res/stasis/control.c6
9 files changed, 31 insertions, 35 deletions
diff --git a/res/ari/resource_bridges.c b/res/ari/resource_bridges.c
index 17a8bb132..7730d0cd9 100644
--- a/res/ari/resource_bridges.c
+++ b/res/ari/resource_bridges.c
@@ -448,22 +448,22 @@ void ast_ari_delete_bridge(struct ast_variable *headers, struct ast_delete_bridg
void ast_ari_get_bridges(struct ast_variable *headers, struct ast_get_bridges_args *args, struct ast_ari_response *response)
{
- RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup);
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct ao2_iterator i;
void *obj;
- caching_topic = ast_bridge_topic_all_cached();
- if (!caching_topic) {
+ cache = ast_bridge_cache();
+ if (!cache) {
ast_ari_response_error(
response, 500, "Internal Server Error",
"Message bus not initialized");
return;
}
- ao2_ref(caching_topic, +1);
+ ao2_ref(cache, +1);
- snapshots = stasis_cache_dump(caching_topic, ast_bridge_snapshot_type());
+ snapshots = stasis_cache_dump(cache, ast_bridge_snapshot_type());
if (!snapshots) {
ast_ari_response_alloc_failed(response);
return;
diff --git a/res/ari/resource_channels.c b/res/ari/resource_channels.c
index 7f3a91fba..dd323bac5 100644
--- a/res/ari/resource_channels.c
+++ b/res/ari/resource_channels.c
@@ -466,18 +466,18 @@ void ast_ari_get_channel(struct ast_variable *headers,
struct ast_ari_response *response)
{
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- struct stasis_caching_topic *caching_topic;
+ struct stasis_cache *cache;
struct ast_channel_snapshot *snapshot;
- caching_topic = ast_channel_topic_all_cached();
- if (!caching_topic) {
+ cache = ast_channel_cache();
+ if (!cache) {
ast_ari_response_error(
response, 500, "Internal Server Error",
"Message bus not initialized");
return;
}
- msg = stasis_cache_get(caching_topic, ast_channel_snapshot_type(),
+ msg = stasis_cache_get(cache, ast_channel_snapshot_type(),
args->channel_id);
if (!msg) {
ast_ari_response_error(
@@ -516,22 +516,22 @@ void ast_ari_get_channels(struct ast_variable *headers,
struct ast_get_channels_args *args,
struct ast_ari_response *response)
{
- RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup);
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct ao2_iterator i;
void *obj;
- caching_topic = ast_channel_topic_all_cached();
- if (!caching_topic) {
+ cache = ast_channel_cache();
+ if (!cache) {
ast_ari_response_error(
response, 500, "Internal Server Error",
"Message bus not initialized");
return;
}
- ao2_ref(caching_topic, +1);
+ ao2_ref(cache, +1);
- snapshots = stasis_cache_dump(caching_topic, ast_channel_snapshot_type());
+ snapshots = stasis_cache_dump(cache, ast_channel_snapshot_type());
if (!snapshots) {
ast_ari_response_alloc_failed(response);
return;
diff --git a/res/ari/resource_endpoints.c b/res/ari/resource_endpoints.c
index bb28df03c..35d8a45cc 100644
--- a/res/ari/resource_endpoints.c
+++ b/res/ari/resource_endpoints.c
@@ -37,22 +37,22 @@ void ast_ari_get_endpoints(struct ast_variable *headers,
struct ast_get_endpoints_args *args,
struct ast_ari_response *response)
{
- RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup);
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct ao2_iterator i;
void *obj;
- caching_topic = ast_endpoint_topic_all_cached();
- if (!caching_topic) {
+ cache = ast_endpoint_cache();
+ if (!cache) {
ast_ari_response_error(
response, 500, "Internal Server Error",
"Message bus not initialized");
return;
}
- ao2_ref(caching_topic, +1);
+ ao2_ref(cache, +1);
- snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type());
+ snapshots = stasis_cache_dump(cache, ast_endpoint_snapshot_type());
if (!snapshots) {
ast_ari_response_alloc_failed(response);
return;
@@ -83,7 +83,7 @@ void ast_ari_get_endpoints_by_tech(struct ast_variable *headers,
struct ast_get_endpoints_by_tech_args *args,
struct ast_ari_response *response)
{
- RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup);
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct ao2_iterator i;
@@ -91,16 +91,16 @@ void ast_ari_get_endpoints_by_tech(struct ast_variable *headers,
/* TODO - if tech isn't a recognized type of endpoint, it should 404 */
- caching_topic = ast_endpoint_topic_all_cached();
- if (!caching_topic) {
+ cache = ast_endpoint_cache();
+ if (!cache) {
ast_ari_response_error(
response, 500, "Internal Server Error",
"Message bus not initialized");
return;
}
- ao2_ref(caching_topic, +1);
+ ao2_ref(cache, +1);
- snapshots = stasis_cache_dump(caching_topic, ast_endpoint_snapshot_type());
+ snapshots = stasis_cache_dump(cache, ast_endpoint_snapshot_type());
if (!snapshots) {
ast_ari_response_alloc_failed(response);
return;
diff --git a/res/res_agi.c b/res/res_agi.c
index 07735130c..5c79ec27f 100644
--- a/res/res_agi.c
+++ b/res/res_agi.c
@@ -2771,7 +2771,7 @@ static int handle_channelstatus(struct ast_channel *chan, AGI *agi, int argc, co
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
/* one argument: look for info on the specified channel */
- if ((msg = stasis_cache_get(ast_channel_topic_all_cached_by_name(), ast_channel_snapshot_type(), argv[2]))) {
+ if ((msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), argv[2]))) {
struct ast_channel_snapshot *snapshot = stasis_message_data(msg);
ast_agi_send(agi->fd, chan, "200 result=%d\n", snapshot->state);
diff --git a/res/res_chan_stats.c b/res/res_chan_stats.c
index 0f39f071f..a43c564b1 100644
--- a/res/res_chan_stats.c
+++ b/res/res_chan_stats.c
@@ -154,7 +154,7 @@ static int load_module(void)
{
/* You can create a message router to route messages by type */
router = stasis_message_router_create(
- stasis_caching_get_topic(ast_channel_topic_all_cached()));
+ ast_channel_topic_all_cached());
if (!router) {
return AST_MODULE_LOAD_FAILURE;
}
diff --git a/res/res_jabber.c b/res/res_jabber.c
index 7ca0bf81e..0e373e5e7 100644
--- a/res/res_jabber.c
+++ b/res/res_jabber.c
@@ -3310,7 +3310,7 @@ static void aji_init_event_distribution(struct aji_client *client)
RAII_VAR(struct ao2_container *, cached, NULL, ao2_cleanup);
device_state_sub = stasis_subscribe(ast_device_state_topic_all(),
aji_devstate_cb, client);
- cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL);
+ cached = stasis_cache_dump(ast_device_state_cache(), NULL);
ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);
}
diff --git a/res/res_stasis.c b/res/res_stasis.c
index 624950399..e4ad97eae 100644
--- a/res/res_stasis.c
+++ b/res/res_stasis.c
@@ -993,7 +993,7 @@ static int load_module(void)
return AST_MODULE_LOAD_FAILURE;
}
- channel_router = stasis_message_router_create(stasis_caching_get_topic(ast_channel_topic_all_cached()));
+ channel_router = stasis_message_router_create(ast_channel_topic_all_cached());
if (!channel_router) {
return AST_MODULE_LOAD_FAILURE;
}
@@ -1013,7 +1013,7 @@ static int load_module(void)
return AST_MODULE_LOAD_FAILURE;
}
- bridge_router = stasis_message_router_create(stasis_caching_get_topic(ast_bridge_topic_all_cached()));
+ bridge_router = stasis_message_router_create(ast_bridge_topic_all_cached());
if (!bridge_router) {
return AST_MODULE_LOAD_FAILURE;
}
diff --git a/res/res_xmpp.c b/res/res_xmpp.c
index 89eb45d18..3be8aa458 100644
--- a/res/res_xmpp.c
+++ b/res/res_xmpp.c
@@ -1605,7 +1605,7 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client)
return;
}
- cached = stasis_cache_dump(ast_device_state_topic_cached(), NULL);
+ cached = stasis_cache_dump(ast_device_state_cache(), NULL);
ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);
xmpp_pubsub_subscribe(client, "device_state");
diff --git a/res/stasis/control.c b/res/stasis/control.c
index 1fbae0c7c..94f1d700d 100644
--- a/res/stasis/control.c
+++ b/res/stasis/control.c
@@ -364,13 +364,9 @@ struct ast_channel_snapshot *stasis_app_control_get_snapshot(
const struct stasis_app_control *control)
{
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- struct stasis_caching_topic *caching_topic;
struct ast_channel_snapshot *snapshot;
- caching_topic = ast_channel_topic_all_cached();
- ast_assert(caching_topic != NULL);
-
- msg = stasis_cache_get(caching_topic, ast_channel_snapshot_type(),
+ msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(),
stasis_app_control_get_channel_id(control));
if (!msg) {
return NULL;