From f8aaf585a39d496479eb8a4e55f2e327d02b37ca Mon Sep 17 00:00:00 2001 From: Matthew Jordan Date: Sun, 12 Jan 2014 22:07:01 +0000 Subject: stasis: Add methods to allow for synchronous publishing to subscriber This patch adds an API call to Stasis that allows a publisher to publish a stasis message that will not return until a specific subscriber handles the message. Since a subscriber can have their own forwarding topic which orders messages from many topics, this allows a publisher who knows of that subscriber to synchronize to that subscriber regardless of the forwarding relationships between topics. This is of particular use for dialplan applications that need to synchronize on a particular subscriber's handling of a message. (issue ASTERISK-22884) Reported by: Matt Jordan Review: https://reviewboard.asterisk.org/r/3099/ ........ Merged revisions 405311 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@405313 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- tests/test_stasis.c | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 72 insertions(+), 2 deletions(-) (limited to 'tests') diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 48d5a6c2a..0cfce2c3f 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -206,6 +206,27 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st ast_cond_signal(&consumer->out); } +static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message) +{ + struct consumer *consumer = data; + RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup); + SCOPED_MUTEX(lock, &consumer->lock); + + if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) { + + ++consumer->messages_rxed_len; + consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len); + ast_assert(consumer->messages_rxed != NULL); + consumer->messages_rxed[consumer->messages_rxed_len - 1] = message; + ao2_ref(message, +1); + } + + if (stasis_subscription_final_message(sub, message)) { + consumer->complete = 1; + consumer_needs_cleanup = consumer; + } +} + static int consumer_wait_for(struct consumer *consumer, size_t expected_len) { struct timeval start = ast_tvnow(); @@ -341,8 +362,8 @@ AST_TEST_DEFINE(publish) case TEST_INIT: info->name = __func__; info->category = test_category; - info->summary = "Test simple subscriptions"; - info->description = "Test simple subscriptions"; + info->summary = "Test publishing"; + info->description = "Test publishing"; return AST_TEST_NOT_RUN; case TEST_EXECUTE: break; @@ -373,6 +394,53 @@ AST_TEST_DEFINE(publish) return AST_TEST_PASS; } +AST_TEST_DEFINE(publish_sync) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + int actual_len; + const char *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test synchronous publishing"; + info->description = "Test synchronous publishing"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe(topic, consumer_exec_sync, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message_type = stasis_message_type_create("TestMessage", NULL); + test_message = stasis_message_create(test_message_type, test_data); + + stasis_publish_sync(uut, test_message); + + actual_len = consumer->messages_rxed_len; + ast_test_validate(test, 1 == actual_len); + actual = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, test_data == actual); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(unsubscribe_stops_messages) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -1324,6 +1392,7 @@ static int unload_module(void) AST_TEST_UNREGISTER(message); AST_TEST_UNREGISTER(subscription_messages); AST_TEST_UNREGISTER(publish); + AST_TEST_UNREGISTER(publish_sync); AST_TEST_UNREGISTER(unsubscribe_stops_messages); AST_TEST_UNREGISTER(forward); AST_TEST_UNREGISTER(cache_filter); @@ -1347,6 +1416,7 @@ static int load_module(void) AST_TEST_REGISTER(message); AST_TEST_REGISTER(subscription_messages); AST_TEST_REGISTER(publish); + AST_TEST_REGISTER(publish_sync); AST_TEST_REGISTER(unsubscribe_stops_messages); AST_TEST_REGISTER(forward); AST_TEST_REGISTER(cache_filter); -- cgit v1.2.3