diff options
author | Matthew Jordan <mjordan@digium.com> | 2014-01-12 22:07:01 +0000 |
---|---|---|
committer | Matthew Jordan <mjordan@digium.com> | 2014-01-12 22:07:01 +0000 |
commit | f8aaf585a39d496479eb8a4e55f2e327d02b37ca (patch) | |
tree | cd9926c9fd134466c6ec6adee3ec7392e4aac754 /tests | |
parent | 60ed8159a10b6eed56095dd52cf17c039cd2275c (diff) |
stasis: Add methods to allow for synchronous publishing to subscriber
This patch adds an API call to Stasis that allows a publisher to publish a
stasis message that will not return until a specific subscriber handles the
message. Since a subscriber can have their own forwarding topic which orders
messages from many topics, this allows a publisher who knows of that subscriber
to synchronize to that subscriber regardless of the forwarding relationships
between topics.
This is of particular use for dialplan applications that need to synchronize
on a particular subscriber's handling of a message.
(issue ASTERISK-22884)
Reported by: Matt Jordan
Review: https://reviewboard.asterisk.org/r/3099/
........
Merged revisions 405311 from http://svn.asterisk.org/svn/asterisk/branches/12
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@405313 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_stasis.c | 74 |
1 files changed, 72 insertions, 2 deletions
diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 48d5a6c2a..0cfce2c3f 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -206,6 +206,27 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st ast_cond_signal(&consumer->out); } +static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message) +{ + struct consumer *consumer = data; + RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup); + SCOPED_MUTEX(lock, &consumer->lock); + + if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) { + + ++consumer->messages_rxed_len; + consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len); + ast_assert(consumer->messages_rxed != NULL); + consumer->messages_rxed[consumer->messages_rxed_len - 1] = message; + ao2_ref(message, +1); + } + + if (stasis_subscription_final_message(sub, message)) { + consumer->complete = 1; + consumer_needs_cleanup = consumer; + } +} + static int consumer_wait_for(struct consumer *consumer, size_t expected_len) { struct timeval start = ast_tvnow(); @@ -341,8 +362,8 @@ AST_TEST_DEFINE(publish) case TEST_INIT: info->name = __func__; info->category = test_category; - info->summary = "Test simple subscriptions"; - info->description = "Test simple subscriptions"; + info->summary = "Test publishing"; + info->description = "Test publishing"; return AST_TEST_NOT_RUN; case TEST_EXECUTE: break; @@ -373,6 +394,53 @@ AST_TEST_DEFINE(publish) return AST_TEST_PASS; } +AST_TEST_DEFINE(publish_sync) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + int actual_len; + const char *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test synchronous publishing"; + info->description = "Test synchronous publishing"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe(topic, consumer_exec_sync, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message_type = stasis_message_type_create("TestMessage", NULL); + test_message = stasis_message_create(test_message_type, test_data); + + stasis_publish_sync(uut, test_message); + + actual_len = consumer->messages_rxed_len; + ast_test_validate(test, 1 == actual_len); + actual = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, test_data == actual); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(unsubscribe_stops_messages) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -1324,6 +1392,7 @@ static int unload_module(void) AST_TEST_UNREGISTER(message); AST_TEST_UNREGISTER(subscription_messages); AST_TEST_UNREGISTER(publish); + AST_TEST_UNREGISTER(publish_sync); AST_TEST_UNREGISTER(unsubscribe_stops_messages); AST_TEST_UNREGISTER(forward); AST_TEST_UNREGISTER(cache_filter); @@ -1347,6 +1416,7 @@ static int load_module(void) AST_TEST_REGISTER(message); AST_TEST_REGISTER(subscription_messages); AST_TEST_REGISTER(publish); + AST_TEST_REGISTER(publish_sync); AST_TEST_REGISTER(unsubscribe_stops_messages); AST_TEST_REGISTER(forward); AST_TEST_REGISTER(cache_filter); |