diff options
Diffstat (limited to 'tests/test_stasis.c')
-rw-r--r-- | tests/test_stasis.c | 310 |
1 files changed, 310 insertions, 0 deletions
diff --git a/tests/test_stasis.c b/tests/test_stasis.c index ba82e83ad..2e83e3b70 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -361,6 +361,61 @@ AST_TEST_DEFINE(subscription_messages) return AST_TEST_PASS; } +AST_TEST_DEFINE(subscription_pool_messages) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + RAII_VAR(char *, expected_uniqueid, NULL, ast_free); + int complete; + struct stasis_subscription_change *change; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription"; + info->description = "Test subscribe/unsubscribe messages using a threadpool subscription"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(0); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe_pool(topic, consumer_exec, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut)); + + uut = stasis_unsubscribe(uut); + complete = consumer_wait_for_completion(consumer); + ast_test_validate(test, 1 == complete); + + ast_test_validate(test, 2 == consumer->messages_rxed_len); + ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0])); + ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1])); + + change = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, topic == change->topic); + ast_test_validate(test, 0 == strcmp("Subscribe", change->description)); + ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid)); + + change = stasis_message_data(consumer->messages_rxed[1]); + ast_test_validate(test, topic == change->topic); + ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description)); + ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid)); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(publish) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -455,6 +510,55 @@ AST_TEST_DEFINE(publish_sync) return AST_TEST_PASS; } +AST_TEST_DEFINE(publish_pool) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + int actual_len; + const char *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test publishing with a threadpool"; + info->description = "Test publishing to a subscriber whose\n" + "subscription dictates messages are received through a\n" + "threadpool."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe_pool(topic, consumer_exec, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message_type = stasis_message_type_create("TestMessage", NULL); + test_message = stasis_message_create(test_message_type, test_data); + + stasis_publish(topic, test_message); + + actual_len = consumer_wait_for(consumer, 1); + ast_test_validate(test, 1 == actual_len); + actual = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, test_data == actual); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(unsubscribe_stops_messages) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -650,6 +754,106 @@ AST_TEST_DEFINE(interleaving) return AST_TEST_PASS; } +AST_TEST_DEFINE(subscription_interleaving) +{ + RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel); + RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel); + RAII_VAR(struct stasis_subscription *, sub1, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_subscription *, sub2, NULL, stasis_unsubscribe); + + RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); + + int actual_len; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test sending interleaved events to a parent topic with different subscribers"; + info->description = "Test sending events to a parent topic.\n" + "This test creates three topics (one parent, two children)\n" + "and publishes messages alternately between the children.\n" + "It verifies that the messages are received in the expected\n" + "order, for different subscription types: one with a dedicated\n" + "thread, the other on the Stasis threadpool.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + test_message_type = stasis_message_type_create("test", NULL); + ast_test_validate(test, NULL != test_message_type); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + + test_message1 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message1); + test_message2 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message2); + test_message3 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message3); + + parent_topic = stasis_topic_create("ParentTestTopic"); + ast_test_validate(test, NULL != parent_topic); + topic1 = stasis_topic_create("Topic1"); + ast_test_validate(test, NULL != topic1); + topic2 = stasis_topic_create("Topic2"); + ast_test_validate(test, NULL != topic2); + + forward_sub1 = stasis_forward_all(topic1, parent_topic); + ast_test_validate(test, NULL != forward_sub1); + forward_sub2 = stasis_forward_all(topic2, parent_topic); + ast_test_validate(test, NULL != forward_sub2); + + consumer1 = consumer_create(1); + ast_test_validate(test, NULL != consumer1); + + consumer2 = consumer_create(1); + ast_test_validate(test, NULL != consumer2); + + sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1); + ast_test_validate(test, NULL != sub1); + ao2_ref(consumer1, +1); + + sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2); + ast_test_validate(test, NULL != sub2); + ao2_ref(consumer2, +1); + + stasis_publish(topic1, test_message1); + stasis_publish(topic2, test_message2); + stasis_publish(topic1, test_message3); + + actual_len = consumer_wait_for(consumer1, 3); + ast_test_validate(test, 3 == actual_len); + + actual_len = consumer_wait_for(consumer2, 3); + ast_test_validate(test, 3 == actual_len); + + ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]); + ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]); + ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]); + + ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]); + ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]); + ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]); + + return AST_TEST_PASS; +} + struct cache_test_data { char *id; char *value; @@ -1389,6 +1593,104 @@ AST_TEST_DEFINE(router) return AST_TEST_PASS; } +AST_TEST_DEFINE(router_pool) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); + int actual_len, ret; + struct stasis_message *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test message routing via threadpool"; + info->description = "Test simple message routing when\n" + "the subscriptions dictate usage of the Stasis\n" + "threadpool.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer1 = consumer_create(1); + ast_test_validate(test, NULL != consumer1); + consumer2 = consumer_create(1); + ast_test_validate(test, NULL != consumer2); + consumer3 = consumer_create(1); + ast_test_validate(test, NULL != consumer3); + + test_message_type1 = stasis_message_type_create("TestMessage1", NULL); + ast_test_validate(test, NULL != test_message_type1); + test_message_type2 = stasis_message_type_create("TestMessage2", NULL); + ast_test_validate(test, NULL != test_message_type2); + test_message_type3 = stasis_message_type_create("TestMessage3", NULL); + ast_test_validate(test, NULL != test_message_type3); + + uut = stasis_message_router_create_pool(topic); + ast_test_validate(test, NULL != uut); + + ret = stasis_message_router_add( + uut, test_message_type1, consumer_exec, consumer1); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer1, +1); + ret = stasis_message_router_add( + uut, test_message_type2, consumer_exec, consumer2); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer2, +1); + ret = stasis_message_router_set_default(uut, consumer_exec, consumer3); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer3, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message1 = stasis_message_create(test_message_type1, test_data); + ast_test_validate(test, NULL != test_message1); + test_message2 = stasis_message_create(test_message_type2, test_data); + ast_test_validate(test, NULL != test_message2); + test_message3 = stasis_message_create(test_message_type3, test_data); + ast_test_validate(test, NULL != test_message3); + + stasis_publish(topic, test_message1); + stasis_publish(topic, test_message2); + stasis_publish(topic, test_message3); + + actual_len = consumer_wait_for(consumer1, 1); + ast_test_validate(test, 1 == actual_len); + actual_len = consumer_wait_for(consumer2, 1); + ast_test_validate(test, 1 == actual_len); + actual_len = consumer_wait_for(consumer3, 1); + ast_test_validate(test, 1 == actual_len); + + actual = consumer1->messages_rxed[0]; + ast_test_validate(test, test_message1 == actual); + + actual = consumer2->messages_rxed[0]; + ast_test_validate(test, test_message2 == actual); + + actual = consumer3->messages_rxed[0]; + ast_test_validate(test, test_message3 == actual); + + /* consumer1 and consumer2 do not get the final message. */ + ao2_cleanup(consumer1); + ao2_cleanup(consumer2); + + return AST_TEST_PASS; +} + static const char *cache_simple(struct stasis_message *message) { const char *type_name = @@ -1748,8 +2050,10 @@ static int unload_module(void) AST_TEST_UNREGISTER(message_type); AST_TEST_UNREGISTER(message); AST_TEST_UNREGISTER(subscription_messages); + AST_TEST_UNREGISTER(subscription_pool_messages); AST_TEST_UNREGISTER(publish); AST_TEST_UNREGISTER(publish_sync); + AST_TEST_UNREGISTER(publish_pool); AST_TEST_UNREGISTER(unsubscribe_stops_messages); AST_TEST_UNREGISTER(forward); AST_TEST_UNREGISTER(cache_filter); @@ -1757,8 +2061,10 @@ static int unload_module(void) AST_TEST_UNREGISTER(cache_dump); AST_TEST_UNREGISTER(cache_eid_aggregate); AST_TEST_UNREGISTER(router); + AST_TEST_UNREGISTER(router_pool); AST_TEST_UNREGISTER(router_cache_updates); AST_TEST_UNREGISTER(interleaving); + AST_TEST_UNREGISTER(subscription_interleaving); AST_TEST_UNREGISTER(no_to_json); AST_TEST_UNREGISTER(to_json); AST_TEST_UNREGISTER(no_to_ami); @@ -1773,8 +2079,10 @@ static int load_module(void) AST_TEST_REGISTER(message_type); AST_TEST_REGISTER(message); AST_TEST_REGISTER(subscription_messages); + AST_TEST_REGISTER(subscription_pool_messages); AST_TEST_REGISTER(publish); AST_TEST_REGISTER(publish_sync); + AST_TEST_REGISTER(publish_pool); AST_TEST_REGISTER(unsubscribe_stops_messages); AST_TEST_REGISTER(forward); AST_TEST_REGISTER(cache_filter); @@ -1782,8 +2090,10 @@ static int load_module(void) AST_TEST_REGISTER(cache_dump); AST_TEST_REGISTER(cache_eid_aggregate); AST_TEST_REGISTER(router); + AST_TEST_REGISTER(router_pool); AST_TEST_REGISTER(router_cache_updates); AST_TEST_REGISTER(interleaving); + AST_TEST_REGISTER(subscription_interleaving); AST_TEST_REGISTER(no_to_json); AST_TEST_REGISTER(to_json); AST_TEST_REGISTER(no_to_ami); |