summaryrefslogtreecommitdiff
path: root/tests/test_stasis.c
diff options
context:
space:
mode:
authorMatthew Jordan <mjordan@digium.com>2014-01-12 22:07:01 +0000
committerMatthew Jordan <mjordan@digium.com>2014-01-12 22:07:01 +0000
commitf8aaf585a39d496479eb8a4e55f2e327d02b37ca (patch)
treecd9926c9fd134466c6ec6adee3ec7392e4aac754 /tests/test_stasis.c
parent60ed8159a10b6eed56095dd52cf17c039cd2275c (diff)
stasis: Add methods to allow for synchronous publishing to subscriber
This patch adds an API call to Stasis that allows a publisher to publish a stasis message that will not return until a specific subscriber handles the message. Since a subscriber can have their own forwarding topic which orders messages from many topics, this allows a publisher who knows of that subscriber to synchronize to that subscriber regardless of the forwarding relationships between topics. This is of particular use for dialplan applications that need to synchronize on a particular subscriber's handling of a message. (issue ASTERISK-22884) Reported by: Matt Jordan Review: https://reviewboard.asterisk.org/r/3099/ ........ Merged revisions 405311 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@405313 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'tests/test_stasis.c')
-rw-r--r--tests/test_stasis.c74
1 files changed, 72 insertions, 2 deletions
diff --git a/tests/test_stasis.c b/tests/test_stasis.c
index 48d5a6c2a..0cfce2c3f 100644
--- a/tests/test_stasis.c
+++ b/tests/test_stasis.c
@@ -206,6 +206,27 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st
ast_cond_signal(&consumer->out);
}
+static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+ struct consumer *consumer = data;
+ RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
+ SCOPED_MUTEX(lock, &consumer->lock);
+
+ if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
+
+ ++consumer->messages_rxed_len;
+ consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
+ ast_assert(consumer->messages_rxed != NULL);
+ consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
+ ao2_ref(message, +1);
+ }
+
+ if (stasis_subscription_final_message(sub, message)) {
+ consumer->complete = 1;
+ consumer_needs_cleanup = consumer;
+ }
+}
+
static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
{
struct timeval start = ast_tvnow();
@@ -341,8 +362,8 @@ AST_TEST_DEFINE(publish)
case TEST_INIT:
info->name = __func__;
info->category = test_category;
- info->summary = "Test simple subscriptions";
- info->description = "Test simple subscriptions";
+ info->summary = "Test publishing";
+ info->description = "Test publishing";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
@@ -373,6 +394,53 @@ AST_TEST_DEFINE(publish)
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(publish_sync)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+ int actual_len;
+ const char *actual;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test synchronous publishing";
+ info->description = "Test synchronous publishing";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ consumer = consumer_create(1);
+ ast_test_validate(test, NULL != consumer);
+
+ uut = stasis_subscribe(topic, consumer_exec_sync, consumer);
+ ast_test_validate(test, NULL != uut);
+ ao2_ref(consumer, +1);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+ test_message_type = stasis_message_type_create("TestMessage", NULL);
+ test_message = stasis_message_create(test_message_type, test_data);
+
+ stasis_publish_sync(uut, test_message);
+
+ actual_len = consumer->messages_rxed_len;
+ ast_test_validate(test, 1 == actual_len);
+ actual = stasis_message_data(consumer->messages_rxed[0]);
+ ast_test_validate(test, test_data == actual);
+
+ return AST_TEST_PASS;
+}
+
AST_TEST_DEFINE(unsubscribe_stops_messages)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -1324,6 +1392,7 @@ static int unload_module(void)
AST_TEST_UNREGISTER(message);
AST_TEST_UNREGISTER(subscription_messages);
AST_TEST_UNREGISTER(publish);
+ AST_TEST_UNREGISTER(publish_sync);
AST_TEST_UNREGISTER(unsubscribe_stops_messages);
AST_TEST_UNREGISTER(forward);
AST_TEST_UNREGISTER(cache_filter);
@@ -1347,6 +1416,7 @@ static int load_module(void)
AST_TEST_REGISTER(message);
AST_TEST_REGISTER(subscription_messages);
AST_TEST_REGISTER(publish);
+ AST_TEST_REGISTER(publish_sync);
AST_TEST_REGISTER(unsubscribe_stops_messages);
AST_TEST_REGISTER(forward);
AST_TEST_REGISTER(cache_filter);