summaryrefslogtreecommitdiff
path: root/main/stasis.c
diff options
context:
space:
mode:
authorMatthew Jordan <mjordan@digium.com>2014-01-12 22:07:01 +0000
committerMatthew Jordan <mjordan@digium.com>2014-01-12 22:07:01 +0000
commitf8aaf585a39d496479eb8a4e55f2e327d02b37ca (patch)
treecd9926c9fd134466c6ec6adee3ec7392e4aac754 /main/stasis.c
parent60ed8159a10b6eed56095dd52cf17c039cd2275c (diff)
stasis: Add methods to allow for synchronous publishing to subscriber
This patch adds an API call to Stasis that allows a publisher to publish a stasis message that will not return until a specific subscriber handles the message. Since a subscriber can have their own forwarding topic which orders messages from many topics, this allows a publisher who knows of that subscriber to synchronize to that subscriber regardless of the forwarding relationships between topics. This is of particular use for dialplan applications that need to synchronize on a particular subscriber's handling of a message. (issue ASTERISK-22884) Reported by: Matt Jordan Review: https://reviewboard.asterisk.org/r/3099/ ........ Merged revisions 405311 from http://svn.asterisk.org/svn/asterisk/branches/12 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@405313 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/stasis.c')
-rw-r--r--main/stasis.c120
1 files changed, 107 insertions, 13 deletions
diff --git a/main/stasis.c b/main/stasis.c
index 587a6a85e..0a5db2f16 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -505,11 +505,11 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s
}
/*!
- * \brief Dispatch a message to a subscriber
- * \param data \ref dispatch object
+ * \internal \brief Dispatch a message to a subscriber asynchronously
+ * \param local \ref ast_taskprocessor_local object
* \return 0
*/
-static int dispatch_exec(struct ast_taskprocessor_local *local)
+static int dispatch_exec_async(struct ast_taskprocessor_local *local)
{
struct stasis_subscription *sub = local->local_data;
struct stasis_message *message = local->data;
@@ -520,23 +520,105 @@ static int dispatch_exec(struct ast_taskprocessor_local *local)
return 0;
}
+/*!
+ * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
+ * a published message to a subscriber
+ */
+struct sync_task_data {
+ ast_mutex_t lock;
+ ast_cond_t cond;
+ int complete;
+ void *task_data;
+};
+
+/*!
+ * \internal \brief Dispatch a message to a subscriber synchronously
+ * \param local \ref ast_taskprocessor_local object
+ * \return 0
+ */
+static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
+{
+ struct stasis_subscription *sub = local->local_data;
+ struct sync_task_data *std = local->data;
+ struct stasis_message *message = std->task_data;
+
+ subscription_invoke(sub, message);
+ ao2_cleanup(message);
+
+ ast_mutex_lock(&std->lock);
+ std->complete = 1;
+ ast_cond_signal(&std->cond);
+ ast_mutex_unlock(&std->lock);
+
+ return 0;
+}
+
+/*!
+ * \internal \brief Dispatch a message to a subscriber
+ * \param sub The subscriber to dispatch to
+ * \param message The message to send
+ * \param synchronous If non-zero, synchronize on the subscriber receiving
+ * the message
+ */
static void dispatch_message(struct stasis_subscription *sub,
- struct stasis_message *message)
+ struct stasis_message *message,
+ int synchronous)
{
- if (sub->mailbox) {
- ao2_bump(message);
- if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) {
+ if (!sub->mailbox) {
+ /* Dispatch directly */
+ subscription_invoke(sub, message);
+ return;
+ }
+
+ /* Bump the message for the taskprocessor push. This will get de-ref'd
+ * by the task processor callback.
+ */
+ ao2_bump(message);
+ if (!synchronous) {
+ if (ast_taskprocessor_push_local(sub->mailbox,
+ dispatch_exec_async,
+ message) != 0) {
/* Push failed; ugh. */
- ast_log(LOG_ERROR, "Dropping dispatch\n");
+ ast_log(LOG_ERROR, "Dropping async dispatch\n");
ao2_cleanup(message);
}
} else {
- /* Dispatch directly */
- subscription_invoke(sub, message);
+ struct sync_task_data std;
+
+ ast_mutex_init(&std.lock);
+ ast_cond_init(&std.cond, NULL);
+ std.complete = 0;
+ std.task_data = message;
+
+ if (ast_taskprocessor_push_local(sub->mailbox,
+ dispatch_exec_sync,
+ &std)) {
+ /* Push failed; ugh. */
+ ast_log(LOG_ERROR, "Dropping sync dispatch\n");
+ ao2_cleanup(message);
+ return;
+ }
+
+ ast_mutex_lock(&std.lock);
+ while (!std.complete) {
+ ast_cond_wait(&std.cond, &std.lock);
+ }
+ ast_mutex_unlock(&std.lock);
+
+ ast_mutex_destroy(&std.lock);
+ ast_cond_destroy(&std.cond);
}
}
-void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+/*!
+ * \internal \brief Publish a message to a topic's subscribers
+ * \brief topic The topic to publish to
+ * \brief message The message to publish
+ * \brief sync_sub An optional subscriber of the topic to publish synchronously
+ * to
+ */
+static void publish_msg(struct stasis_topic *topic,
+ struct stasis_message *message, struct stasis_subscription *sync_sub)
{
size_t i;
@@ -554,12 +636,24 @@ void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
ast_assert(sub != NULL);
- dispatch_message(sub, message);
+ dispatch_message(sub, message, (sub == sync_sub));
}
ao2_unlock(topic);
ao2_ref(topic, -1);
}
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+{
+ publish_msg(topic, message, NULL);
+}
+
+void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
+{
+ ast_assert(sub != NULL);
+
+ publish_msg(sub->topic, message, sub);
+}
+
/*!
* \brief Forwarding information
*
@@ -721,7 +815,7 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic,
stasis_publish(topic, msg);
/* Now we have to dispatch to the subscription itself */
- dispatch_message(sub, msg);
+ dispatch_message(sub, msg, 0);
}
struct topic_pool_entry {