diff options
Diffstat (limited to 'tests/test_stasis.c')
-rw-r--r-- | tests/test_stasis.c | 128 |
1 files changed, 125 insertions, 3 deletions
diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 0dc9182a3..663663355 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -230,7 +230,7 @@ static int consumer_wait_for_completion(struct consumer *consumer) { struct timeval start = ast_tvnow(); struct timespec end = { - .tv_sec = start.tv_sec + 30, + .tv_sec = start.tv_sec + 3, .tv_nsec = start.tv_usec * 1000 }; @@ -867,7 +867,7 @@ AST_TEST_DEFINE(cache_dump) AST_TEST_DEFINE(route_conflicts) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); - RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe); + RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join); RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); @@ -913,7 +913,7 @@ AST_TEST_DEFINE(route_conflicts) AST_TEST_DEFINE(router) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); - RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe); + RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join); RAII_VAR(char *, test_data, NULL, ao2_cleanup); RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup); RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup); @@ -1006,6 +1006,126 @@ AST_TEST_DEFINE(router) return AST_TEST_PASS; } +static const char *cache_simple(struct stasis_message *message) { + const char *type_name = + stasis_message_type_name(stasis_message_type(message)); + if (!ast_begins_with(type_name, "Cache")) { + return NULL; + } + + return "cached"; +} + +AST_TEST_DEFINE(router_cache_updates) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe); + RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup); + struct stasis_cache_update *update; + int actual_len, ret; + struct stasis_message *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test special handling cache_update messages"; + info->description = "Test special handling cache_update messages"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + caching_topic = stasis_caching_topic_create(topic, cache_simple); + ast_test_validate(test, NULL != caching_topic); + + consumer1 = consumer_create(1); + ast_test_validate(test, NULL != consumer1); + consumer2 = consumer_create(1); + ast_test_validate(test, NULL != consumer2); + consumer3 = consumer_create(1); + ast_test_validate(test, NULL != consumer3); + + test_message_type1 = stasis_message_type_create("Cache1", NULL); + ast_test_validate(test, NULL != test_message_type1); + test_message_type2 = stasis_message_type_create("Cache2", NULL); + ast_test_validate(test, NULL != test_message_type2); + test_message_type3 = stasis_message_type_create("NonCache", NULL); + ast_test_validate(test, NULL != test_message_type3); + + uut = stasis_message_router_create( + stasis_caching_get_topic(caching_topic)); + ast_test_validate(test, NULL != uut); + + ret = stasis_message_router_add_cache_update( + uut, test_message_type1, consumer_exec, consumer1); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer1, +1); + ret = stasis_message_router_add( + uut, stasis_cache_update_type(), consumer_exec, consumer2); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer2, +1); + ret = stasis_message_router_set_default(uut, consumer_exec, consumer3); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer3, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message1 = stasis_message_create(test_message_type1, test_data); + ast_test_validate(test, NULL != test_message1); + test_message2 = stasis_message_create(test_message_type2, test_data); + ast_test_validate(test, NULL != test_message2); + test_message3 = stasis_message_create(test_message_type3, test_data); + ast_test_validate(test, NULL != test_message3); + + stasis_publish(topic, test_message1); + stasis_publish(topic, test_message2); + stasis_publish(topic, test_message3); + + actual_len = consumer_wait_for(consumer1, 1); + ast_test_validate(test, 1 == actual_len); + actual_len = consumer_wait_for(consumer2, 1); + ast_test_validate(test, 1 == actual_len); + actual_len = consumer_wait_for(consumer3, 1); + ast_test_validate(test, 1 == actual_len); + + actual = consumer1->messages_rxed[0]; + ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual)); + update = stasis_message_data(actual); + ast_test_validate(test, test_message_type1 == update->type); + ast_test_validate(test, test_message1 == update->new_snapshot); + + actual = consumer2->messages_rxed[0]; + ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual)); + update = stasis_message_data(actual); + ast_test_validate(test, test_message_type2 == update->type); + ast_test_validate(test, test_message2 == update->new_snapshot); + + actual = consumer3->messages_rxed[0]; + ast_test_validate(test, test_message3 == actual); + + /* consumer1 and consumer2 do not get the final message. */ + ao2_cleanup(consumer1); + ao2_cleanup(consumer2); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(no_to_json) { RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup); @@ -1160,6 +1280,7 @@ static int unload_module(void) AST_TEST_UNREGISTER(cache_dump); AST_TEST_UNREGISTER(route_conflicts); AST_TEST_UNREGISTER(router); + AST_TEST_UNREGISTER(router_cache_updates); AST_TEST_UNREGISTER(interleaving); AST_TEST_UNREGISTER(no_to_json); AST_TEST_UNREGISTER(to_json); @@ -1181,6 +1302,7 @@ static int load_module(void) AST_TEST_REGISTER(cache_dump); AST_TEST_REGISTER(route_conflicts); AST_TEST_REGISTER(router); + AST_TEST_REGISTER(router_cache_updates); AST_TEST_REGISTER(interleaving); AST_TEST_REGISTER(no_to_json); AST_TEST_REGISTER(to_json); |