summaryrefslogtreecommitdiff
path: root/tests/test_stasis.c
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_stasis.c')
-rw-r--r--tests/test_stasis.c74
1 files changed, 72 insertions, 2 deletions
diff --git a/tests/test_stasis.c b/tests/test_stasis.c
index 48d5a6c2a..0cfce2c3f 100644
--- a/tests/test_stasis.c
+++ b/tests/test_stasis.c
@@ -206,6 +206,27 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st
ast_cond_signal(&consumer->out);
}
+static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+ struct consumer *consumer = data;
+ RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
+ SCOPED_MUTEX(lock, &consumer->lock);
+
+ if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
+
+ ++consumer->messages_rxed_len;
+ consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
+ ast_assert(consumer->messages_rxed != NULL);
+ consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
+ ao2_ref(message, +1);
+ }
+
+ if (stasis_subscription_final_message(sub, message)) {
+ consumer->complete = 1;
+ consumer_needs_cleanup = consumer;
+ }
+}
+
static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
{
struct timeval start = ast_tvnow();
@@ -341,8 +362,8 @@ AST_TEST_DEFINE(publish)
case TEST_INIT:
info->name = __func__;
info->category = test_category;
- info->summary = "Test simple subscriptions";
- info->description = "Test simple subscriptions";
+ info->summary = "Test publishing";
+ info->description = "Test publishing";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
@@ -373,6 +394,53 @@ AST_TEST_DEFINE(publish)
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(publish_sync)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+ int actual_len;
+ const char *actual;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test synchronous publishing";
+ info->description = "Test synchronous publishing";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ consumer = consumer_create(1);
+ ast_test_validate(test, NULL != consumer);
+
+ uut = stasis_subscribe(topic, consumer_exec_sync, consumer);
+ ast_test_validate(test, NULL != uut);
+ ao2_ref(consumer, +1);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+ test_message_type = stasis_message_type_create("TestMessage", NULL);
+ test_message = stasis_message_create(test_message_type, test_data);
+
+ stasis_publish_sync(uut, test_message);
+
+ actual_len = consumer->messages_rxed_len;
+ ast_test_validate(test, 1 == actual_len);
+ actual = stasis_message_data(consumer->messages_rxed[0]);
+ ast_test_validate(test, test_data == actual);
+
+ return AST_TEST_PASS;
+}
+
AST_TEST_DEFINE(unsubscribe_stops_messages)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -1324,6 +1392,7 @@ static int unload_module(void)
AST_TEST_UNREGISTER(message);
AST_TEST_UNREGISTER(subscription_messages);
AST_TEST_UNREGISTER(publish);
+ AST_TEST_UNREGISTER(publish_sync);
AST_TEST_UNREGISTER(unsubscribe_stops_messages);
AST_TEST_UNREGISTER(forward);
AST_TEST_UNREGISTER(cache_filter);
@@ -1347,6 +1416,7 @@ static int load_module(void)
AST_TEST_REGISTER(message);
AST_TEST_REGISTER(subscription_messages);
AST_TEST_REGISTER(publish);
+ AST_TEST_REGISTER(publish_sync);
AST_TEST_REGISTER(unsubscribe_stops_messages);
AST_TEST_REGISTER(forward);
AST_TEST_REGISTER(cache_filter);