summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-04-01 13:37:51 +0000
committerDavid M. Lee <dlee@digium.com>2013-04-01 13:37:51 +0000
commitb23e8e19507af448986554d4b360ca3ceefc4c18 (patch)
tree365f657de1e64e80c2921f69d70c24414e3948e9
parent2d45dbc79bf3f65a1b74edfa27f4602a4ef68dd7 (diff)
stasis: Fixed message ordering issues when forwarding
This patch fixes an issue of message ordering that occurs when multiple topics are forwarded to an aggregator topic (such as ast_channel_topic_all()). It is (very reasonably) expected that the rules governing message dispatch order still apply, so long as the messages start from the same thread, and are received by the same subscription. Because the existing code had an additional layer of dispatching via the Stasis thread pool for forwards, those promises couldn't be kept. Forwarding subscriptions no longer have their own mailbox, and now dispatch directly from the forwarding topic's stasis_publish() call. This means that the topic's lock is held for the duration of not only a message's dispatch, but the dispatch of all the forwards. This shouldn't be a problem right now, but if an aggregator topic had many subscribers, it could become a problem. But I figure we can write more clever code when the time comes, if necessary. Review: https://reviewboard.asterisk.org/r/2419/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@384413 65c4cc65-6c06-0410-ace0-fbb531ad65f3
-rw-r--r--main/stasis.c83
-rw-r--r--tests/test_stasis.c86
2 files changed, 144 insertions, 25 deletions
diff --git a/main/stasis.c b/main/stasis.c
index 2bd432a0c..3388a8042 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -61,7 +61,6 @@ struct stasis_topic {
};
/* Forward declarations for the tightly-coupled subscription object */
-struct stasis_subscription;
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
static void topic_dtor(void *obj)
@@ -127,9 +126,30 @@ static void subscription_dtor(void *obj)
sub->mailbox = NULL;
}
+/*!
+ * \brief Invoke the subscription's callback.
+ * \param sub Subscription to invoke.
+ * \param topic Topic message was published to.
+ * \param message Message to send.
+ */
+static void subscription_invoke(struct stasis_subscription *sub,
+ struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ /* Since sub->topic doesn't change, no need to lock sub */
+ sub->callback(sub->data,
+ sub,
+ topic,
+ message);
+}
+
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
-struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
+static struct stasis_subscription *__stasis_subscribe(
+ struct stasis_topic *topic,
+ stasis_subscription_cb callback,
+ void *data,
+ int needs_mailbox)
{
RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
@@ -140,9 +160,11 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
- sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
- if (!sub->mailbox) {
- return NULL;
+ if (needs_mailbox) {
+ sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+ if (!sub->mailbox) {
+ return NULL;
+ }
}
ao2_ref(topic, +1);
@@ -159,6 +181,14 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_
return sub;
}
+struct stasis_subscription *stasis_subscribe(
+ struct stasis_topic *topic,
+ stasis_subscription_cb callback,
+ void *data)
+{
+ return __stasis_subscribe(topic, callback, data, 1);
+}
+
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
if (sub) {
@@ -305,17 +335,8 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi
static int dispatch_exec(void *data)
{
RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
- RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
-
- /* Since sub->topic doesn't change, no need to lock sub */
- ast_assert(dispatch->sub->topic != NULL);
- ao2_ref(dispatch->sub->topic, +1);
- sub_topic = dispatch->sub->topic;
- dispatch->sub->callback(dispatch->sub->data,
- dispatch->sub,
- sub_topic,
- dispatch->message);
+ subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
return 0;
}
@@ -331,18 +352,28 @@ void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *pub
for (i = 0; i < topic->num_subscribers_current; ++i) {
struct stasis_subscription *sub = topic->subscribers[i];
- RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
ast_assert(sub != NULL);
- dispatch = dispatch_create(publisher_topic, message, sub);
- if (!dispatch) {
- ast_log(LOG_DEBUG, "Dropping dispatch\n");
- break;
- }
+ if (sub->mailbox) {
+ RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+
+ dispatch = dispatch_create(publisher_topic, message, sub);
+ if (!dispatch) {
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ break;
+ }
- if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
- dispatch = NULL; /* Ownership transferred to mailbox */
+ if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
+ /* Ownership transferred to mailbox.
+ * Don't increment ref, b/c the task processor
+ * may have already gotten rid of the object.
+ */
+ dispatch = NULL;
+ }
+ } else {
+ /* Dispatch directly */
+ subscription_invoke(sub, publisher_topic, message);
}
}
}
@@ -370,7 +401,11 @@ struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
return NULL;
}
- sub = stasis_subscribe(from_topic, stasis_forward_cb, to_topic);
+ /* Forwarding subscriptions should dispatch directly instead of having a
+ * mailbox. Otherwise, messages forwarded to the same topic from
+ * different topics may get reordered. Which is bad.
+ */
+ sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
if (sub) {
/* hold a ref to to_topic for this forwarding subscription */
ao2_ref(to_topic, +1);
diff --git a/tests/test_stasis.c b/tests/test_stasis.c
index 52d510f14..3a7d52c07 100644
--- a/tests/test_stasis.c
+++ b/tests/test_stasis.c
@@ -391,7 +391,6 @@ AST_TEST_DEFINE(unsubscribe_stops_messages)
return AST_TEST_PASS;
}
-
AST_TEST_DEFINE(forward)
{
RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
@@ -458,6 +457,89 @@ AST_TEST_DEFINE(forward)
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(interleaving)
+{
+ RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
+
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+
+ RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+
+ RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
+
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+
+ int actual_len;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test sending interleaved events to a parent topic";
+ info->description = "Test sending events to a parent topic.\n"
+ "This test creates three topics (one parent, two children)\n"
+ "and publishes messages alternately between the children.\n"
+ "It verifies that the messages are received in the expected\n"
+ "order.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ test_message_type = stasis_message_type_create("test");
+ ast_test_validate(test, NULL != test_message_type);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+
+ test_message1 = stasis_message_create(test_message_type, test_data);
+ ast_test_validate(test, NULL != test_message1);
+ test_message2 = stasis_message_create(test_message_type, test_data);
+ ast_test_validate(test, NULL != test_message2);
+ test_message3 = stasis_message_create(test_message_type, test_data);
+ ast_test_validate(test, NULL != test_message3);
+
+ parent_topic = stasis_topic_create("ParentTestTopic");
+ ast_test_validate(test, NULL != parent_topic);
+ topic1 = stasis_topic_create("Topic1");
+ ast_test_validate(test, NULL != topic1);
+ topic2 = stasis_topic_create("Topic2");
+ ast_test_validate(test, NULL != topic2);
+
+ forward_sub1 = stasis_forward_all(topic1, parent_topic);
+ ast_test_validate(test, NULL != forward_sub1);
+ forward_sub2 = stasis_forward_all(topic2, parent_topic);
+ ast_test_validate(test, NULL != forward_sub2);
+
+ consumer = consumer_create(1);
+ ast_test_validate(test, NULL != consumer);
+
+ sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
+ ast_test_validate(test, NULL != sub);
+ ao2_ref(consumer, +1);
+
+ stasis_publish(topic1, test_message1);
+ stasis_publish(topic2, test_message2);
+ stasis_publish(topic1, test_message3);
+
+ actual_len = consumer_wait_for(consumer, 3);
+ ast_test_validate(test, 3 == actual_len);
+
+ ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
+ ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
+ ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
+
+ return AST_TEST_PASS;
+}
+
struct cache_test_data {
char *id;
char *value;
@@ -829,6 +911,7 @@ static int unload_module(void)
AST_TEST_UNREGISTER(cache);
AST_TEST_UNREGISTER(route_conflicts);
AST_TEST_UNREGISTER(router);
+ AST_TEST_UNREGISTER(interleaving);
return 0;
}
@@ -844,6 +927,7 @@ static int load_module(void)
AST_TEST_REGISTER(cache);
AST_TEST_REGISTER(route_conflicts);
AST_TEST_REGISTER(router);
+ AST_TEST_REGISTER(interleaving);
return AST_MODULE_LOAD_SUCCESS;
}