summaryrefslogtreecommitdiff
path: root/tests/test_stasis.c
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-04-01 13:37:51 +0000
committerDavid M. Lee <dlee@digium.com>2013-04-01 13:37:51 +0000
commitb23e8e19507af448986554d4b360ca3ceefc4c18 (patch)
tree365f657de1e64e80c2921f69d70c24414e3948e9 /tests/test_stasis.c
parent2d45dbc79bf3f65a1b74edfa27f4602a4ef68dd7 (diff)
stasis: Fixed message ordering issues when forwarding
This patch fixes an issue of message ordering that occurs when multiple topics are forwarded to an aggregator topic (such as ast_channel_topic_all()). It is (very reasonably) expected that the rules governing message dispatch order still apply, so long as the messages start from the same thread, and are received by the same subscription. Because the existing code had an additional layer of dispatching via the Stasis thread pool for forwards, those promises couldn't be kept. Forwarding subscriptions no longer have their own mailbox, and now dispatch directly from the forwarding topic's stasis_publish() call. This means that the topic's lock is held for the duration of not only a message's dispatch, but the dispatch of all the forwards. This shouldn't be a problem right now, but if an aggregator topic had many subscribers, it could become a problem. But I figure we can write more clever code when the time comes, if necessary. Review: https://reviewboard.asterisk.org/r/2419/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@384413 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'tests/test_stasis.c')
-rw-r--r--tests/test_stasis.c86
1 files changed, 85 insertions, 1 deletions
diff --git a/tests/test_stasis.c b/tests/test_stasis.c
index 52d510f14..3a7d52c07 100644
--- a/tests/test_stasis.c
+++ b/tests/test_stasis.c
@@ -391,7 +391,6 @@ AST_TEST_DEFINE(unsubscribe_stops_messages)
return AST_TEST_PASS;
}
-
AST_TEST_DEFINE(forward)
{
RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
@@ -458,6 +457,89 @@ AST_TEST_DEFINE(forward)
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(interleaving)
+{
+ RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
+
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+
+ RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+
+ RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+
+ int actual_len;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test sending interleaved events to a parent topic";
+ info->description = "Test sending events to a parent topic.\n"
+ "This test creates three topics (one parent, two children)\n"
+ "and publishes messages alternately between the children.\n"
+ "It verifies that the messages are received in the expected\n"
+ "order.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ test_message_type = stasis_message_type_create("test");
+ ast_test_validate(test, NULL != test_message_type);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+
+ test_message1 = stasis_message_create(test_message_type, test_data);
+ ast_test_validate(test, NULL != test_message1);
+ test_message2 = stasis_message_create(test_message_type, test_data);
+ ast_test_validate(test, NULL != test_message2);
+ test_message3 = stasis_message_create(test_message_type, test_data);
+ ast_test_validate(test, NULL != test_message3);
+
+ parent_topic = stasis_topic_create("ParentTestTopic");
+ ast_test_validate(test, NULL != parent_topic);
+ topic1 = stasis_topic_create("Topic1");
+ ast_test_validate(test, NULL != topic1);
+ topic2 = stasis_topic_create("Topic2");
+ ast_test_validate(test, NULL != topic2);
+
+ forward_sub1 = stasis_forward_all(topic1, parent_topic);
+ ast_test_validate(test, NULL != forward_sub1);
+ forward_sub2 = stasis_forward_all(topic2, parent_topic);
+ ast_test_validate(test, NULL != forward_sub2);
+
+ consumer = consumer_create(1);
+ ast_test_validate(test, NULL != consumer);
+
+ sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
+ ast_test_validate(test, NULL != sub);
+ ao2_ref(consumer, +1);
+
+ stasis_publish(topic1, test_message1);
+ stasis_publish(topic2, test_message2);
+ stasis_publish(topic1, test_message3);
+
+ actual_len = consumer_wait_for(consumer, 3);
+ ast_test_validate(test, 3 == actual_len);
+
+ ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
+ ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
+ ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
+
+ return AST_TEST_PASS;
+}
+
struct cache_test_data {
char *id;
char *value;
@@ -829,6 +911,7 @@ static int unload_module(void)
AST_TEST_UNREGISTER(cache);
AST_TEST_UNREGISTER(route_conflicts);
AST_TEST_UNREGISTER(router);
+ AST_TEST_UNREGISTER(interleaving);
return 0;
}
@@ -844,6 +927,7 @@ static int load_module(void)
AST_TEST_REGISTER(cache);
AST_TEST_REGISTER(route_conflicts);
AST_TEST_REGISTER(router);
+ AST_TEST_REGISTER(interleaving);
return AST_MODULE_LOAD_SUCCESS;
}