diff options
author | David M. Lee <dlee@digium.com> | 2013-08-01 13:49:34 +0000 |
---|---|---|
committer | David M. Lee <dlee@digium.com> | 2013-08-01 13:49:34 +0000 |
commit | e1b959ccbb4e47421b37a0f75a2bf89ccd34dcb1 (patch) | |
tree | 3026c96da713bafcf1126c77bde6994f348280bb /tests | |
parent | 5c1396946929ab19e94c117f8ad3db5f78a450bc (diff) |
Split caching out from the stasis_caching_topic.
In working with res_stasis, I discovered a significant limitation to
the current structure of stasis_caching_topics: you cannot subscribe
to cache updates for a single channel/bridge/endpoint/etc.
To address this, this patch splits the cache away from the
stasis_caching_topic, making it a first class object. The stasis_cache
object is shared amongst individual stasis_caching_topics that are
created per channel/endpoint/etc. These are still forwarded to global
whatever_all_cached topics, so their use from most of the code does
not change.
In making these changes, I noticed that we frequently used a similar
pattern for bridges, endpoints and channels:
single_topic ----------------> all_topic
^
|
single_topic_cached ----+----> all_topic_cached
|
+----> cache
This pattern was extracted as the 'Stasis Caching Pattern', defined in
stasis_caching_pattern.h. This avoids a lot of duplicate code between
the different domain objects.
Since the cache is now disassociated from its upstream caching topics,
this also necessitated a change to how the 'guaranteed' flag worked
for retrieving from a cache. The code for handling the caching
guarantee was extracted into a 'stasis_topic_wait' function, which
works for any stasis_topic.
(closes issue ASTERISK-22002)
Review: https://reviewboard.asterisk.org/r/2672/
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@395954 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_cel.c | 5 | ||||
-rw-r--r-- | tests/test_devicestate.c | 12 | ||||
-rw-r--r-- | tests/test_stasis.c | 38 | ||||
-rw-r--r-- | tests/test_stasis_endpoints.c | 2 |
4 files changed, 35 insertions, 22 deletions
diff --git a/tests/test_cel.c b/tests/test_cel.c index 31f73468c..395ec0ccc 100644 --- a/tests/test_cel.c +++ b/tests/test_cel.c @@ -224,8 +224,9 @@ static void do_sleep(void) ast_hangup((channel)); \ HANGUP_EVENT(channel, cause, dialstatus); \ APPEND_EVENT(channel, AST_CEL_CHANNEL_END, NULL, NULL, NULL); \ - ao2_cleanup(stasis_cache_get_extended(ast_channel_topic_all_cached(), \ - ast_channel_snapshot_type(), ast_channel_uniqueid(channel), 1)); \ + stasis_topic_wait(ast_channel_topic_all_cached()); \ + ao2_cleanup(stasis_cache_get(ast_channel_cache(), \ + ast_channel_snapshot_type(), ast_channel_uniqueid(channel))); \ ao2_cleanup(channel); \ channel = NULL; \ } while (0) diff --git a/tests/test_devicestate.c b/tests/test_devicestate.c index 4aff9394b..ff5d681f4 100644 --- a/tests/test_devicestate.c +++ b/tests/test_devicestate.c @@ -394,7 +394,7 @@ static void cache_cleanup(int unused) { RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup); /* remove all device states created during this test */ - cache_dump = stasis_cache_dump(ast_device_state_topic_cached(), NULL); + cache_dump = stasis_cache_dump(ast_device_state_cache(), NULL); if (!cache_dump) { return; } @@ -434,7 +434,7 @@ AST_TEST_DEFINE(device_state_aggregation_test) consumer = consumer_create(); ast_test_validate(test, NULL != consumer); - device_msg_router = stasis_message_router_create(stasis_caching_get_topic(ast_device_state_topic_cached())); + device_msg_router = stasis_message_router_create(ast_device_state_topic_cached()); ast_test_validate(test, NULL != device_msg_router); ao2_ref(consumer, +1); @@ -451,7 +451,7 @@ AST_TEST_DEFINE(device_state_aggregation_test) ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->state); ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->aggregate_state); - msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); + msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); device_state = stasis_message_data(msg); ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state); ao2_cleanup(msg); @@ -466,7 +466,7 @@ AST_TEST_DEFINE(device_state_aggregation_test) ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->state); ast_test_validate(test, AST_DEVICE_NOT_INUSE == consumer->aggregate_state); - msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); + msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); device_state = stasis_message_data(msg); ast_test_validate(test, AST_DEVICE_NOT_INUSE == device_state->state); ao2_cleanup(msg); @@ -479,7 +479,7 @@ AST_TEST_DEFINE(device_state_aggregation_test) ast_test_validate(test, AST_DEVICE_INUSE == consumer->state); ast_test_validate(test, AST_DEVICE_INUSE == consumer->aggregate_state); - msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); + msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); device_state = stasis_message_data(msg); ast_test_validate(test, AST_DEVICE_INUSE == device_state->state); ao2_cleanup(msg); @@ -492,7 +492,7 @@ AST_TEST_DEFINE(device_state_aggregation_test) ast_test_validate(test, AST_DEVICE_RINGING == consumer->state); ast_test_validate(test, AST_DEVICE_RINGINUSE == consumer->aggregate_state); - msg = stasis_cache_get(ast_device_state_topic_cached(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); + msg = stasis_cache_get(ast_device_state_cache(), ast_device_state_message_type(), UNIT_TEST_DEVICE_IDENTIFIER); device_state = stasis_message_data(msg); ast_test_validate(test, AST_DEVICE_RINGINUSE == device_state->state); ao2_cleanup(msg); diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 663663355..0b63da42e 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -614,6 +614,7 @@ AST_TEST_DEFINE(cache_passthrough) { RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup); RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe); RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); @@ -636,7 +637,9 @@ AST_TEST_DEFINE(cache_passthrough) ast_test_validate(test, NULL != non_cache_type); topic = stasis_topic_create("SomeTopic"); ast_test_validate(test, NULL != topic); - caching_topic = stasis_caching_topic_create(topic, cache_test_data_id); + cache = stasis_cache_create(cache_test_data_id); + ast_test_validate(test, NULL != cache); + caching_topic = stasis_caching_topic_create(topic, cache); ast_test_validate(test, NULL != caching_topic); consumer = consumer_create(1); ast_test_validate(test, NULL != consumer); @@ -664,6 +667,7 @@ AST_TEST_DEFINE(cache) { RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup); RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe); RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); @@ -689,7 +693,9 @@ AST_TEST_DEFINE(cache) ast_test_validate(test, NULL != cache_type); topic = stasis_topic_create("SomeTopic"); ast_test_validate(test, NULL != topic); - caching_topic = stasis_caching_topic_create(topic, cache_test_data_id); + cache = stasis_cache_create(cache_test_data_id); + ast_test_validate(test, NULL != cache); + caching_topic = stasis_caching_topic_create(topic, cache); ast_test_validate(test, NULL != caching_topic); consumer = consumer_create(1); ast_test_validate(test, NULL != consumer); @@ -714,7 +720,7 @@ AST_TEST_DEFINE(cache) ast_test_validate(test, topic == actual_update->topic); ast_test_validate(test, NULL == actual_update->old_snapshot); ast_test_validate(test, test_message1_1 == actual_update->new_snapshot); - ast_test_validate(test, test_message1_1 == stasis_cache_get(caching_topic, cache_type, "1")); + ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1")); /* stasis_cache_get returned a ref, so unref test_message1_1 */ ao2_ref(test_message1_1, -1); @@ -723,7 +729,7 @@ AST_TEST_DEFINE(cache) ast_test_validate(test, topic == actual_update->topic); ast_test_validate(test, NULL == actual_update->old_snapshot); ast_test_validate(test, test_message2_1 == actual_update->new_snapshot); - ast_test_validate(test, test_message2_1 == stasis_cache_get(caching_topic, cache_type, "2")); + ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2")); /* stasis_cache_get returned a ref, so unref test_message2_1 */ ao2_ref(test_message2_1, -1); @@ -739,7 +745,7 @@ AST_TEST_DEFINE(cache) ast_test_validate(test, topic == actual_update->topic); ast_test_validate(test, test_message2_1 == actual_update->old_snapshot); ast_test_validate(test, test_message2_2 == actual_update->new_snapshot); - ast_test_validate(test, test_message2_2 == stasis_cache_get(caching_topic, cache_type, "2")); + ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2")); /* stasis_cache_get returned a ref, so unref test_message2_2 */ ao2_ref(test_message2_2, -1); @@ -755,7 +761,7 @@ AST_TEST_DEFINE(cache) ast_test_validate(test, topic == actual_update->topic); ast_test_validate(test, test_message1_1 == actual_update->old_snapshot); ast_test_validate(test, NULL == actual_update->new_snapshot); - ast_test_validate(test, NULL == stasis_cache_get(caching_topic, cache_type, "1")); + ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1")); return AST_TEST_PASS; } @@ -764,6 +770,7 @@ AST_TEST_DEFINE(cache_dump) { RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup); RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe); RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); @@ -791,7 +798,9 @@ AST_TEST_DEFINE(cache_dump) ast_test_validate(test, NULL != cache_type); topic = stasis_topic_create("SomeTopic"); ast_test_validate(test, NULL != topic); - caching_topic = stasis_caching_topic_create(topic, cache_test_data_id); + cache = stasis_cache_create(cache_test_data_id); + ast_test_validate(test, NULL != cache); + caching_topic = stasis_caching_topic_create(topic, cache); ast_test_validate(test, NULL != caching_topic); consumer = consumer_create(1); ast_test_validate(test, NULL != consumer); @@ -811,7 +820,7 @@ AST_TEST_DEFINE(cache_dump) ast_test_validate(test, 2 == actual_len); /* Check the cache */ - cache_dump = stasis_cache_dump(caching_topic, NULL); + cache_dump = stasis_cache_dump(cache, NULL); ast_test_validate(test, NULL != cache_dump); ast_test_validate(test, 2 == ao2_container_count(cache_dump)); i = ao2_iterator_init(cache_dump, 0); @@ -829,7 +838,7 @@ AST_TEST_DEFINE(cache_dump) ast_test_validate(test, 3 == actual_len); /* Check the cache */ - cache_dump = stasis_cache_dump(caching_topic, NULL); + cache_dump = stasis_cache_dump(cache, NULL); ast_test_validate(test, NULL != cache_dump); ast_test_validate(test, 2 == ao2_container_count(cache_dump)); i = ao2_iterator_init(cache_dump, 0); @@ -847,7 +856,7 @@ AST_TEST_DEFINE(cache_dump) ast_test_validate(test, 4 == actual_len); /* Check the cache */ - cache_dump = stasis_cache_dump(caching_topic, NULL); + cache_dump = stasis_cache_dump(cache, NULL); ast_test_validate(test, NULL != cache_dump); ast_test_validate(test, 1 == ao2_container_count(cache_dump)); i = ao2_iterator_init(cache_dump, 0); @@ -858,7 +867,7 @@ AST_TEST_DEFINE(cache_dump) /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */ ao2_cleanup(cache_dump); - cache_dump = stasis_cache_dump(caching_topic, stasis_subscription_change_type()); + cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type()); ast_test_validate(test, 0 == ao2_container_count(cache_dump)); return AST_TEST_PASS; @@ -1019,7 +1028,8 @@ static const char *cache_simple(struct stasis_message *message) { AST_TEST_DEFINE(router_cache_updates) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); - RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe); + RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); + RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe_and_join); RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup); RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup); RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup); @@ -1051,7 +1061,9 @@ AST_TEST_DEFINE(router_cache_updates) topic = stasis_topic_create("TestTopic"); ast_test_validate(test, NULL != topic); - caching_topic = stasis_caching_topic_create(topic, cache_simple); + cache = stasis_cache_create(cache_simple); + ast_test_validate(test, NULL != cache); + caching_topic = stasis_caching_topic_create(topic, cache); ast_test_validate(test, NULL != caching_topic); consumer1 = consumer_create(1); diff --git a/tests/test_stasis_endpoints.c b/tests/test_stasis_endpoints.c index 9fe3ecfad..c0be07ca8 100644 --- a/tests/test_stasis_endpoints.c +++ b/tests/test_stasis_endpoints.c @@ -152,7 +152,7 @@ AST_TEST_DEFINE(cache_clear) ast_test_validate(test, NULL != sink); sub = stasis_subscribe( - stasis_caching_get_topic(ast_endpoint_topic_all_cached()), + ast_endpoint_topic_all_cached(), stasis_message_sink_cb(), sink); ast_test_validate(test, NULL != sub); |