summaryrefslogtreecommitdiff
path: root/main/stasis.c
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-04-01 13:37:51 +0000
committerDavid M. Lee <dlee@digium.com>2013-04-01 13:37:51 +0000
commitb23e8e19507af448986554d4b360ca3ceefc4c18 (patch)
tree365f657de1e64e80c2921f69d70c24414e3948e9 /main/stasis.c
parent2d45dbc79bf3f65a1b74edfa27f4602a4ef68dd7 (diff)
stasis: Fixed message ordering issues when forwarding
This patch fixes an issue of message ordering that occurs when multiple topics are forwarded to an aggregator topic (such as ast_channel_topic_all()). It is (very reasonably) expected that the rules governing message dispatch order still apply, so long as the messages start from the same thread, and are received by the same subscription. Because the existing code had an additional layer of dispatching via the Stasis thread pool for forwards, those promises couldn't be kept. Forwarding subscriptions no longer have their own mailbox, and now dispatch directly from the forwarding topic's stasis_publish() call. This means that the topic's lock is held for the duration of not only a message's dispatch, but the dispatch of all the forwards. This shouldn't be a problem right now, but if an aggregator topic had many subscribers, it could become a problem. But I figure we can write more clever code when the time comes, if necessary. Review: https://reviewboard.asterisk.org/r/2419/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@384413 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/stasis.c')
-rw-r--r--main/stasis.c83
1 files changed, 59 insertions, 24 deletions
diff --git a/main/stasis.c b/main/stasis.c
index 2bd432a0c..3388a8042 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -61,7 +61,6 @@ struct stasis_topic {
};
/* Forward declarations for the tightly-coupled subscription object */
-struct stasis_subscription;
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
static void topic_dtor(void *obj)
@@ -127,9 +126,30 @@ static void subscription_dtor(void *obj)
sub->mailbox = NULL;
}
+/*!
+ * \brief Invoke the subscription's callback.
+ * \param sub Subscription to invoke.
+ * \param topic Topic message was published to.
+ * \param message Message to send.
+ */
+static void subscription_invoke(struct stasis_subscription *sub,
+ struct stasis_topic *topic,
+ struct stasis_message *message)
+{
+ /* Since sub->topic doesn't change, no need to lock sub */
+ sub->callback(sub->data,
+ sub,
+ topic,
+ message);
+}
+
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
-struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
+static struct stasis_subscription *__stasis_subscribe(
+ struct stasis_topic *topic,
+ stasis_subscription_cb callback,
+ void *data,
+ int needs_mailbox)
{
RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
@@ -140,9 +160,11 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
- sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
- if (!sub->mailbox) {
- return NULL;
+ if (needs_mailbox) {
+ sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+ if (!sub->mailbox) {
+ return NULL;
+ }
}
ao2_ref(topic, +1);
@@ -159,6 +181,14 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_
return sub;
}
+struct stasis_subscription *stasis_subscribe(
+ struct stasis_topic *topic,
+ stasis_subscription_cb callback,
+ void *data)
+{
+ return __stasis_subscribe(topic, callback, data, 1);
+}
+
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
if (sub) {
@@ -305,17 +335,8 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi
static int dispatch_exec(void *data)
{
RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
- RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
-
- /* Since sub->topic doesn't change, no need to lock sub */
- ast_assert(dispatch->sub->topic != NULL);
- ao2_ref(dispatch->sub->topic, +1);
- sub_topic = dispatch->sub->topic;
- dispatch->sub->callback(dispatch->sub->data,
- dispatch->sub,
- sub_topic,
- dispatch->message);
+ subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
return 0;
}
@@ -331,18 +352,28 @@ void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *pub
for (i = 0; i < topic->num_subscribers_current; ++i) {
struct stasis_subscription *sub = topic->subscribers[i];
- RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
ast_assert(sub != NULL);
- dispatch = dispatch_create(publisher_topic, message, sub);
- if (!dispatch) {
- ast_log(LOG_DEBUG, "Dropping dispatch\n");
- break;
- }
+ if (sub->mailbox) {
+ RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
+
+ dispatch = dispatch_create(publisher_topic, message, sub);
+ if (!dispatch) {
+ ast_log(LOG_DEBUG, "Dropping dispatch\n");
+ break;
+ }
- if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
- dispatch = NULL; /* Ownership transferred to mailbox */
+ if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
+ /* Ownership transferred to mailbox.
+ * Don't increment ref, b/c the task processor
+ * may have already gotten rid of the object.
+ */
+ dispatch = NULL;
+ }
+ } else {
+ /* Dispatch directly */
+ subscription_invoke(sub, publisher_topic, message);
}
}
}
@@ -370,7 +401,11 @@ struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
return NULL;
}
- sub = stasis_subscribe(from_topic, stasis_forward_cb, to_topic);
+ /* Forwarding subscriptions should dispatch directly instead of having a
+ * mailbox. Otherwise, messages forwarded to the same topic from
+ * different topics may get reordered. Which is bad.
+ */
+ sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
if (sub) {
/* hold a ref to to_topic for this forwarding subscription */
ao2_ref(to_topic, +1);