summaryrefslogtreecommitdiff
path: root/tests/test_stasis.c
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_stasis.c')
-rw-r--r--tests/test_stasis.c310
1 files changed, 310 insertions, 0 deletions
diff --git a/tests/test_stasis.c b/tests/test_stasis.c
index ba82e83ad..2e83e3b70 100644
--- a/tests/test_stasis.c
+++ b/tests/test_stasis.c
@@ -361,6 +361,61 @@ AST_TEST_DEFINE(subscription_messages)
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(subscription_pool_messages)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+ RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
+ int complete;
+ struct stasis_subscription_change *change;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription";
+ info->description = "Test subscribe/unsubscribe messages using a threadpool subscription";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ consumer = consumer_create(0);
+ ast_test_validate(test, NULL != consumer);
+
+ uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
+ ast_test_validate(test, NULL != uut);
+ ao2_ref(consumer, +1);
+ expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
+
+ uut = stasis_unsubscribe(uut);
+ complete = consumer_wait_for_completion(consumer);
+ ast_test_validate(test, 1 == complete);
+
+ ast_test_validate(test, 2 == consumer->messages_rxed_len);
+ ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
+ ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
+
+ change = stasis_message_data(consumer->messages_rxed[0]);
+ ast_test_validate(test, topic == change->topic);
+ ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
+ ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+ change = stasis_message_data(consumer->messages_rxed[1]);
+ ast_test_validate(test, topic == change->topic);
+ ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
+ ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+ return AST_TEST_PASS;
+}
+
AST_TEST_DEFINE(publish)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -455,6 +510,55 @@ AST_TEST_DEFINE(publish_sync)
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(publish_pool)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+ int actual_len;
+ const char *actual;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test publishing with a threadpool";
+ info->description = "Test publishing to a subscriber whose\n"
+ "subscription dictates messages are received through a\n"
+ "threadpool.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ consumer = consumer_create(1);
+ ast_test_validate(test, NULL != consumer);
+
+ uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
+ ast_test_validate(test, NULL != uut);
+ ao2_ref(consumer, +1);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+ test_message_type = stasis_message_type_create("TestMessage", NULL);
+ test_message = stasis_message_create(test_message_type, test_data);
+
+ stasis_publish(topic, test_message);
+
+ actual_len = consumer_wait_for(consumer, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual = stasis_message_data(consumer->messages_rxed[0]);
+ ast_test_validate(test, test_data == actual);
+
+ return AST_TEST_PASS;
+}
+
AST_TEST_DEFINE(unsubscribe_stops_messages)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -650,6 +754,106 @@ AST_TEST_DEFINE(interleaving)
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(subscription_interleaving)
+{
+ RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
+
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+
+ RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+
+ RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
+ RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
+ RAII_VAR(struct stasis_subscription *, sub1, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, sub2, NULL, stasis_unsubscribe);
+
+ RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+
+ int actual_len;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test sending interleaved events to a parent topic with different subscribers";
+ info->description = "Test sending events to a parent topic.\n"
+ "This test creates three topics (one parent, two children)\n"
+ "and publishes messages alternately between the children.\n"
+ "It verifies that the messages are received in the expected\n"
+ "order, for different subscription types: one with a dedicated\n"
+ "thread, the other on the Stasis threadpool.\n";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ test_message_type = stasis_message_type_create("test", NULL);
+ ast_test_validate(test, NULL != test_message_type);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+
+ test_message1 = stasis_message_create(test_message_type, test_data);
+ ast_test_validate(test, NULL != test_message1);
+ test_message2 = stasis_message_create(test_message_type, test_data);
+ ast_test_validate(test, NULL != test_message2);
+ test_message3 = stasis_message_create(test_message_type, test_data);
+ ast_test_validate(test, NULL != test_message3);
+
+ parent_topic = stasis_topic_create("ParentTestTopic");
+ ast_test_validate(test, NULL != parent_topic);
+ topic1 = stasis_topic_create("Topic1");
+ ast_test_validate(test, NULL != topic1);
+ topic2 = stasis_topic_create("Topic2");
+ ast_test_validate(test, NULL != topic2);
+
+ forward_sub1 = stasis_forward_all(topic1, parent_topic);
+ ast_test_validate(test, NULL != forward_sub1);
+ forward_sub2 = stasis_forward_all(topic2, parent_topic);
+ ast_test_validate(test, NULL != forward_sub2);
+
+ consumer1 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer1);
+
+ consumer2 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer2);
+
+ sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1);
+ ast_test_validate(test, NULL != sub1);
+ ao2_ref(consumer1, +1);
+
+ sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2);
+ ast_test_validate(test, NULL != sub2);
+ ao2_ref(consumer2, +1);
+
+ stasis_publish(topic1, test_message1);
+ stasis_publish(topic2, test_message2);
+ stasis_publish(topic1, test_message3);
+
+ actual_len = consumer_wait_for(consumer1, 3);
+ ast_test_validate(test, 3 == actual_len);
+
+ actual_len = consumer_wait_for(consumer2, 3);
+ ast_test_validate(test, 3 == actual_len);
+
+ ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]);
+ ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]);
+ ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]);
+
+ ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]);
+ ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]);
+ ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]);
+
+ return AST_TEST_PASS;
+}
+
struct cache_test_data {
char *id;
char *value;
@@ -1389,6 +1593,104 @@ AST_TEST_DEFINE(router)
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(router_pool)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+ int actual_len, ret;
+ struct stasis_message *actual;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test message routing via threadpool";
+ info->description = "Test simple message routing when\n"
+ "the subscriptions dictate usage of the Stasis\n"
+ "threadpool.\n";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ consumer1 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer1);
+ consumer2 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer2);
+ consumer3 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer3);
+
+ test_message_type1 = stasis_message_type_create("TestMessage1", NULL);
+ ast_test_validate(test, NULL != test_message_type1);
+ test_message_type2 = stasis_message_type_create("TestMessage2", NULL);
+ ast_test_validate(test, NULL != test_message_type2);
+ test_message_type3 = stasis_message_type_create("TestMessage3", NULL);
+ ast_test_validate(test, NULL != test_message_type3);
+
+ uut = stasis_message_router_create_pool(topic);
+ ast_test_validate(test, NULL != uut);
+
+ ret = stasis_message_router_add(
+ uut, test_message_type1, consumer_exec, consumer1);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer1, +1);
+ ret = stasis_message_router_add(
+ uut, test_message_type2, consumer_exec, consumer2);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer2, +1);
+ ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer3, +1);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+ test_message1 = stasis_message_create(test_message_type1, test_data);
+ ast_test_validate(test, NULL != test_message1);
+ test_message2 = stasis_message_create(test_message_type2, test_data);
+ ast_test_validate(test, NULL != test_message2);
+ test_message3 = stasis_message_create(test_message_type3, test_data);
+ ast_test_validate(test, NULL != test_message3);
+
+ stasis_publish(topic, test_message1);
+ stasis_publish(topic, test_message2);
+ stasis_publish(topic, test_message3);
+
+ actual_len = consumer_wait_for(consumer1, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual_len = consumer_wait_for(consumer2, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual_len = consumer_wait_for(consumer3, 1);
+ ast_test_validate(test, 1 == actual_len);
+
+ actual = consumer1->messages_rxed[0];
+ ast_test_validate(test, test_message1 == actual);
+
+ actual = consumer2->messages_rxed[0];
+ ast_test_validate(test, test_message2 == actual);
+
+ actual = consumer3->messages_rxed[0];
+ ast_test_validate(test, test_message3 == actual);
+
+ /* consumer1 and consumer2 do not get the final message. */
+ ao2_cleanup(consumer1);
+ ao2_cleanup(consumer2);
+
+ return AST_TEST_PASS;
+}
+
static const char *cache_simple(struct stasis_message *message)
{
const char *type_name =
@@ -1748,8 +2050,10 @@ static int unload_module(void)
AST_TEST_UNREGISTER(message_type);
AST_TEST_UNREGISTER(message);
AST_TEST_UNREGISTER(subscription_messages);
+ AST_TEST_UNREGISTER(subscription_pool_messages);
AST_TEST_UNREGISTER(publish);
AST_TEST_UNREGISTER(publish_sync);
+ AST_TEST_UNREGISTER(publish_pool);
AST_TEST_UNREGISTER(unsubscribe_stops_messages);
AST_TEST_UNREGISTER(forward);
AST_TEST_UNREGISTER(cache_filter);
@@ -1757,8 +2061,10 @@ static int unload_module(void)
AST_TEST_UNREGISTER(cache_dump);
AST_TEST_UNREGISTER(cache_eid_aggregate);
AST_TEST_UNREGISTER(router);
+ AST_TEST_UNREGISTER(router_pool);
AST_TEST_UNREGISTER(router_cache_updates);
AST_TEST_UNREGISTER(interleaving);
+ AST_TEST_UNREGISTER(subscription_interleaving);
AST_TEST_UNREGISTER(no_to_json);
AST_TEST_UNREGISTER(to_json);
AST_TEST_UNREGISTER(no_to_ami);
@@ -1773,8 +2079,10 @@ static int load_module(void)
AST_TEST_REGISTER(message_type);
AST_TEST_REGISTER(message);
AST_TEST_REGISTER(subscription_messages);
+ AST_TEST_REGISTER(subscription_pool_messages);
AST_TEST_REGISTER(publish);
AST_TEST_REGISTER(publish_sync);
+ AST_TEST_REGISTER(publish_pool);
AST_TEST_REGISTER(unsubscribe_stops_messages);
AST_TEST_REGISTER(forward);
AST_TEST_REGISTER(cache_filter);
@@ -1782,8 +2090,10 @@ static int load_module(void)
AST_TEST_REGISTER(cache_dump);
AST_TEST_REGISTER(cache_eid_aggregate);
AST_TEST_REGISTER(router);
+ AST_TEST_REGISTER(router_pool);
AST_TEST_REGISTER(router_cache_updates);
AST_TEST_REGISTER(interleaving);
+ AST_TEST_REGISTER(subscription_interleaving);
AST_TEST_REGISTER(no_to_json);
AST_TEST_REGISTER(to_json);
AST_TEST_REGISTER(no_to_ami);