summaryrefslogtreecommitdiff
path: root/tests/test_stasis.c
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_stasis.c')
-rw-r--r--tests/test_stasis.c128
1 files changed, 125 insertions, 3 deletions
diff --git a/tests/test_stasis.c b/tests/test_stasis.c
index 0dc9182a3..663663355 100644
--- a/tests/test_stasis.c
+++ b/tests/test_stasis.c
@@ -230,7 +230,7 @@ static int consumer_wait_for_completion(struct consumer *consumer)
{
struct timeval start = ast_tvnow();
struct timespec end = {
- .tv_sec = start.tv_sec + 30,
+ .tv_sec = start.tv_sec + 3,
.tv_nsec = start.tv_usec * 1000
};
@@ -867,7 +867,7 @@ AST_TEST_DEFINE(cache_dump)
AST_TEST_DEFINE(route_conflicts)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe);
+ RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
@@ -913,7 +913,7 @@ AST_TEST_DEFINE(route_conflicts)
AST_TEST_DEFINE(router)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe);
+ RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
RAII_VAR(char *, test_data, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
@@ -1006,6 +1006,126 @@ AST_TEST_DEFINE(router)
return AST_TEST_PASS;
}
+static const char *cache_simple(struct stasis_message *message) {
+ const char *type_name =
+ stasis_message_type_name(stasis_message_type(message));
+ if (!ast_begins_with(type_name, "Cache")) {
+ return NULL;
+ }
+
+ return "cached";
+}
+
+AST_TEST_DEFINE(router_cache_updates)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
+ RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
+ struct stasis_cache_update *update;
+ int actual_len, ret;
+ struct stasis_message *actual;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test special handling cache_update messages";
+ info->description = "Test special handling cache_update messages";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ caching_topic = stasis_caching_topic_create(topic, cache_simple);
+ ast_test_validate(test, NULL != caching_topic);
+
+ consumer1 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer1);
+ consumer2 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer2);
+ consumer3 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer3);
+
+ test_message_type1 = stasis_message_type_create("Cache1", NULL);
+ ast_test_validate(test, NULL != test_message_type1);
+ test_message_type2 = stasis_message_type_create("Cache2", NULL);
+ ast_test_validate(test, NULL != test_message_type2);
+ test_message_type3 = stasis_message_type_create("NonCache", NULL);
+ ast_test_validate(test, NULL != test_message_type3);
+
+ uut = stasis_message_router_create(
+ stasis_caching_get_topic(caching_topic));
+ ast_test_validate(test, NULL != uut);
+
+ ret = stasis_message_router_add_cache_update(
+ uut, test_message_type1, consumer_exec, consumer1);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer1, +1);
+ ret = stasis_message_router_add(
+ uut, stasis_cache_update_type(), consumer_exec, consumer2);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer2, +1);
+ ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer3, +1);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+ test_message1 = stasis_message_create(test_message_type1, test_data);
+ ast_test_validate(test, NULL != test_message1);
+ test_message2 = stasis_message_create(test_message_type2, test_data);
+ ast_test_validate(test, NULL != test_message2);
+ test_message3 = stasis_message_create(test_message_type3, test_data);
+ ast_test_validate(test, NULL != test_message3);
+
+ stasis_publish(topic, test_message1);
+ stasis_publish(topic, test_message2);
+ stasis_publish(topic, test_message3);
+
+ actual_len = consumer_wait_for(consumer1, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual_len = consumer_wait_for(consumer2, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual_len = consumer_wait_for(consumer3, 1);
+ ast_test_validate(test, 1 == actual_len);
+
+ actual = consumer1->messages_rxed[0];
+ ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
+ update = stasis_message_data(actual);
+ ast_test_validate(test, test_message_type1 == update->type);
+ ast_test_validate(test, test_message1 == update->new_snapshot);
+
+ actual = consumer2->messages_rxed[0];
+ ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
+ update = stasis_message_data(actual);
+ ast_test_validate(test, test_message_type2 == update->type);
+ ast_test_validate(test, test_message2 == update->new_snapshot);
+
+ actual = consumer3->messages_rxed[0];
+ ast_test_validate(test, test_message3 == actual);
+
+ /* consumer1 and consumer2 do not get the final message. */
+ ao2_cleanup(consumer1);
+ ao2_cleanup(consumer2);
+
+ return AST_TEST_PASS;
+}
+
AST_TEST_DEFINE(no_to_json)
{
RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
@@ -1160,6 +1280,7 @@ static int unload_module(void)
AST_TEST_UNREGISTER(cache_dump);
AST_TEST_UNREGISTER(route_conflicts);
AST_TEST_UNREGISTER(router);
+ AST_TEST_UNREGISTER(router_cache_updates);
AST_TEST_UNREGISTER(interleaving);
AST_TEST_UNREGISTER(no_to_json);
AST_TEST_UNREGISTER(to_json);
@@ -1181,6 +1302,7 @@ static int load_module(void)
AST_TEST_REGISTER(cache_dump);
AST_TEST_REGISTER(route_conflicts);
AST_TEST_REGISTER(router);
+ AST_TEST_REGISTER(router_cache_updates);
AST_TEST_REGISTER(interleaving);
AST_TEST_REGISTER(no_to_json);
AST_TEST_REGISTER(to_json);