diff options
author | David M. Lee <dlee@digium.com> | 2013-04-01 13:37:51 +0000 |
---|---|---|
committer | David M. Lee <dlee@digium.com> | 2013-04-01 13:37:51 +0000 |
commit | b23e8e19507af448986554d4b360ca3ceefc4c18 (patch) | |
tree | 365f657de1e64e80c2921f69d70c24414e3948e9 /tests/test_stasis.c | |
parent | 2d45dbc79bf3f65a1b74edfa27f4602a4ef68dd7 (diff) |
stasis: Fixed message ordering issues when forwarding
This patch fixes an issue of message ordering that occurs when
multiple topics are forwarded to an aggregator topic (such as
ast_channel_topic_all()).
It is (very reasonably) expected that the rules governing message
dispatch order still apply, so long as the messages start from the
same thread, and are received by the same subscription. Because the
existing code had an additional layer of dispatching via the Stasis
thread pool for forwards, those promises couldn't be kept.
Forwarding subscriptions no longer have their own mailbox, and now
dispatch directly from the forwarding topic's stasis_publish()
call. This means that the topic's lock is held for the duration of not
only a message's dispatch, but the dispatch of all the forwards. This
shouldn't be a problem right now, but if an aggregator topic had many
subscribers, it could become a problem. But I figure we can write more
clever code when the time comes, if necessary.
Review: https://reviewboard.asterisk.org/r/2419/
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@384413 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'tests/test_stasis.c')
-rw-r--r-- | tests/test_stasis.c | 86 |
1 files changed, 85 insertions, 1 deletions
diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 52d510f14..3a7d52c07 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -391,7 +391,6 @@ AST_TEST_DEFINE(unsubscribe_stops_messages) return AST_TEST_PASS; } - AST_TEST_DEFINE(forward) { RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup); @@ -458,6 +457,89 @@ AST_TEST_DEFINE(forward) return AST_TEST_PASS; } +AST_TEST_DEFINE(interleaving) +{ + RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); + + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + + int actual_len; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test sending interleaved events to a parent topic"; + info->description = "Test sending events to a parent topic.\n" + "This test creates three topics (one parent, two children)\n" + "and publishes messages alternately between the children.\n" + "It verifies that the messages are received in the expected\n" + "order."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + test_message_type = stasis_message_type_create("test"); + ast_test_validate(test, NULL != test_message_type); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + + test_message1 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message1); + test_message2 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message2); + test_message3 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message3); + + parent_topic = stasis_topic_create("ParentTestTopic"); + ast_test_validate(test, NULL != parent_topic); + topic1 = stasis_topic_create("Topic1"); + ast_test_validate(test, NULL != topic1); + topic2 = stasis_topic_create("Topic2"); + ast_test_validate(test, NULL != topic2); + + forward_sub1 = stasis_forward_all(topic1, parent_topic); + ast_test_validate(test, NULL != forward_sub1); + forward_sub2 = stasis_forward_all(topic2, parent_topic); + ast_test_validate(test, NULL != forward_sub2); + + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + sub = stasis_subscribe(parent_topic, consumer_exec, consumer); + ast_test_validate(test, NULL != sub); + ao2_ref(consumer, +1); + + stasis_publish(topic1, test_message1); + stasis_publish(topic2, test_message2); + stasis_publish(topic1, test_message3); + + actual_len = consumer_wait_for(consumer, 3); + ast_test_validate(test, 3 == actual_len); + + ast_test_validate(test, test_message1 == consumer->messages_rxed[0]); + ast_test_validate(test, test_message2 == consumer->messages_rxed[1]); + ast_test_validate(test, test_message3 == consumer->messages_rxed[2]); + + return AST_TEST_PASS; +} + struct cache_test_data { char *id; char *value; @@ -829,6 +911,7 @@ static int unload_module(void) AST_TEST_UNREGISTER(cache); AST_TEST_UNREGISTER(route_conflicts); AST_TEST_UNREGISTER(router); + AST_TEST_UNREGISTER(interleaving); return 0; } @@ -844,6 +927,7 @@ static int load_module(void) AST_TEST_REGISTER(cache); AST_TEST_REGISTER(route_conflicts); AST_TEST_REGISTER(router); + AST_TEST_REGISTER(interleaving); return AST_MODULE_LOAD_SUCCESS; } |