summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/asterisk/stasis.h33
-rw-r--r--include/asterisk/stasis_message_router.h18
-rw-r--r--main/stasis.c120
-rw-r--r--main/stasis_message_router.c10
-rw-r--r--tests/test_stasis.c74
5 files changed, 234 insertions, 21 deletions
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 6bc5171e0..1a420da2f 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -187,6 +187,12 @@ struct stasis_message_type;
struct stasis_message;
/*!
+ * \brief Opaque type for a Stasis subscription.
+ * \since 12
+ */
+struct stasis_subscription;
+
+/*!
* \brief Structure containing callbacks for Stasis message sanitization
*
* \note If either callback is implemented, both should be implemented since
@@ -377,21 +383,36 @@ const char *stasis_topic_name(const struct stasis_topic *topic);
* \brief Publish a message to a topic's subscribers.
* \param topic Topic.
* \param message Message to publish.
+ *
+ * This call is asynchronous and will return immediately upon queueing
+ * the message for delivery to the topic's subscribers.
+ *
* \since 12
*/
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
+/*!
+ * \brief Publish a message to a topic's subscribers, synchronizing
+ * on the specified subscriber
+ * \param sub Subscription to synchronize on.
+ * \param message Message to publish.
+ *
+ * The caller of stasis_publish_sync will block until the specified
+ * subscriber completes handling of the message.
+ *
+ * All other subscribers to the topic the \ref stasis_subpscription
+ * is subscribed to are also delivered the message; this delivery however
+ * happens asynchronously.
+ *
+ * \since 12.1.0
+ */
+void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message);
+
/*! @} */
/*! @{ */
/*!
- * \brief Opaque type for a Stasis subscription.
- * \since 12
- */
-struct stasis_subscription;
-
-/*!
* \brief Callback function type for Stasis subscriptions.
* \param data Data field provided with subscription.
* \param message Published message.
diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h
index 3209adb16..613a2bd7f 100644
--- a/include/asterisk/stasis_message_router.h
+++ b/include/asterisk/stasis_message_router.h
@@ -93,6 +93,24 @@ void stasis_message_router_unsubscribe_and_join(
int stasis_message_router_is_done(struct stasis_message_router *router);
/*!
+ * \brief Publish a message to a message router's subscription synchronously
+ *
+ * \param router Router
+ * \param message The \ref stasis message
+ *
+ * This should be used when a message needs to be published synchronously to
+ * the underlying subscription created by a message router. This is analagous
+ * to \ref stasis_publish_sync.
+ *
+ * Note that the caller will be blocked until the thread servicing the message
+ * on the message router's subscription completes handling of the message.
+ *
+ * \since 12.1.0
+ */
+void stasis_message_router_publish_sync(struct stasis_message_router *router,
+ struct stasis_message *message);
+
+/*!
* \brief Add a route to a message router.
*
* A particular \a message_type may have at most one route per \a router. If
diff --git a/main/stasis.c b/main/stasis.c
index 587a6a85e..0a5db2f16 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -505,11 +505,11 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s
}
/*!
- * \brief Dispatch a message to a subscriber
- * \param data \ref dispatch object
+ * \internal \brief Dispatch a message to a subscriber asynchronously
+ * \param local \ref ast_taskprocessor_local object
* \return 0
*/
-static int dispatch_exec(struct ast_taskprocessor_local *local)
+static int dispatch_exec_async(struct ast_taskprocessor_local *local)
{
struct stasis_subscription *sub = local->local_data;
struct stasis_message *message = local->data;
@@ -520,23 +520,105 @@ static int dispatch_exec(struct ast_taskprocessor_local *local)
return 0;
}
+/*!
+ * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize
+ * a published message to a subscriber
+ */
+struct sync_task_data {
+ ast_mutex_t lock;
+ ast_cond_t cond;
+ int complete;
+ void *task_data;
+};
+
+/*!
+ * \internal \brief Dispatch a message to a subscriber synchronously
+ * \param local \ref ast_taskprocessor_local object
+ * \return 0
+ */
+static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
+{
+ struct stasis_subscription *sub = local->local_data;
+ struct sync_task_data *std = local->data;
+ struct stasis_message *message = std->task_data;
+
+ subscription_invoke(sub, message);
+ ao2_cleanup(message);
+
+ ast_mutex_lock(&std->lock);
+ std->complete = 1;
+ ast_cond_signal(&std->cond);
+ ast_mutex_unlock(&std->lock);
+
+ return 0;
+}
+
+/*!
+ * \internal \brief Dispatch a message to a subscriber
+ * \param sub The subscriber to dispatch to
+ * \param message The message to send
+ * \param synchronous If non-zero, synchronize on the subscriber receiving
+ * the message
+ */
static void dispatch_message(struct stasis_subscription *sub,
- struct stasis_message *message)
+ struct stasis_message *message,
+ int synchronous)
{
- if (sub->mailbox) {
- ao2_bump(message);
- if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) {
+ if (!sub->mailbox) {
+ /* Dispatch directly */
+ subscription_invoke(sub, message);
+ return;
+ }
+
+ /* Bump the message for the taskprocessor push. This will get de-ref'd
+ * by the task processor callback.
+ */
+ ao2_bump(message);
+ if (!synchronous) {
+ if (ast_taskprocessor_push_local(sub->mailbox,
+ dispatch_exec_async,
+ message) != 0) {
/* Push failed; ugh. */
- ast_log(LOG_ERROR, "Dropping dispatch\n");
+ ast_log(LOG_ERROR, "Dropping async dispatch\n");
ao2_cleanup(message);
}
} else {
- /* Dispatch directly */
- subscription_invoke(sub, message);
+ struct sync_task_data std;
+
+ ast_mutex_init(&std.lock);
+ ast_cond_init(&std.cond, NULL);
+ std.complete = 0;
+ std.task_data = message;
+
+ if (ast_taskprocessor_push_local(sub->mailbox,
+ dispatch_exec_sync,
+ &std)) {
+ /* Push failed; ugh. */
+ ast_log(LOG_ERROR, "Dropping sync dispatch\n");
+ ao2_cleanup(message);
+ return;
+ }
+
+ ast_mutex_lock(&std.lock);
+ while (!std.complete) {
+ ast_cond_wait(&std.cond, &std.lock);
+ }
+ ast_mutex_unlock(&std.lock);
+
+ ast_mutex_destroy(&std.lock);
+ ast_cond_destroy(&std.cond);
}
}
-void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+/*!
+ * \internal \brief Publish a message to a topic's subscribers
+ * \brief topic The topic to publish to
+ * \brief message The message to publish
+ * \brief sync_sub An optional subscriber of the topic to publish synchronously
+ * to
+ */
+static void publish_msg(struct stasis_topic *topic,
+ struct stasis_message *message, struct stasis_subscription *sync_sub)
{
size_t i;
@@ -554,12 +636,24 @@ void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
ast_assert(sub != NULL);
- dispatch_message(sub, message);
+ dispatch_message(sub, message, (sub == sync_sub));
}
ao2_unlock(topic);
ao2_ref(topic, -1);
}
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
+{
+ publish_msg(topic, message, NULL);
+}
+
+void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message)
+{
+ ast_assert(sub != NULL);
+
+ publish_msg(sub->topic, message, sub);
+}
+
/*!
* \brief Forwarding information
*
@@ -721,7 +815,7 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic,
stasis_publish(topic, msg);
/* Now we have to dispatch to the subscription itself */
- dispatch_message(sub, msg);
+ dispatch_message(sub, msg, 0);
}
struct topic_pool_entry {
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index ec0448bef..41495db11 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -261,6 +261,16 @@ int stasis_message_router_is_done(struct stasis_message_router *router)
return stasis_subscription_is_done(router->subscription);
}
+void stasis_message_router_publish_sync(struct stasis_message_router *router,
+ struct stasis_message *message)
+{
+ ast_assert(router != NULL);
+
+ ao2_bump(router);
+ stasis_publish_sync(router->subscription, message);
+ ao2_cleanup(router);
+}
+
int stasis_message_router_add(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
diff --git a/tests/test_stasis.c b/tests/test_stasis.c
index 48d5a6c2a..0cfce2c3f 100644
--- a/tests/test_stasis.c
+++ b/tests/test_stasis.c
@@ -206,6 +206,27 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st
ast_cond_signal(&consumer->out);
}
+static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+ struct consumer *consumer = data;
+ RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
+ SCOPED_MUTEX(lock, &consumer->lock);
+
+ if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
+
+ ++consumer->messages_rxed_len;
+ consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
+ ast_assert(consumer->messages_rxed != NULL);
+ consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
+ ao2_ref(message, +1);
+ }
+
+ if (stasis_subscription_final_message(sub, message)) {
+ consumer->complete = 1;
+ consumer_needs_cleanup = consumer;
+ }
+}
+
static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
{
struct timeval start = ast_tvnow();
@@ -341,8 +362,8 @@ AST_TEST_DEFINE(publish)
case TEST_INIT:
info->name = __func__;
info->category = test_category;
- info->summary = "Test simple subscriptions";
- info->description = "Test simple subscriptions";
+ info->summary = "Test publishing";
+ info->description = "Test publishing";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
@@ -373,6 +394,53 @@ AST_TEST_DEFINE(publish)
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(publish_sync)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+ int actual_len;
+ const char *actual;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test synchronous publishing";
+ info->description = "Test synchronous publishing";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ consumer = consumer_create(1);
+ ast_test_validate(test, NULL != consumer);
+
+ uut = stasis_subscribe(topic, consumer_exec_sync, consumer);
+ ast_test_validate(test, NULL != uut);
+ ao2_ref(consumer, +1);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+ test_message_type = stasis_message_type_create("TestMessage", NULL);
+ test_message = stasis_message_create(test_message_type, test_data);
+
+ stasis_publish_sync(uut, test_message);
+
+ actual_len = consumer->messages_rxed_len;
+ ast_test_validate(test, 1 == actual_len);
+ actual = stasis_message_data(consumer->messages_rxed[0]);
+ ast_test_validate(test, test_data == actual);
+
+ return AST_TEST_PASS;
+}
+
AST_TEST_DEFINE(unsubscribe_stops_messages)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -1324,6 +1392,7 @@ static int unload_module(void)
AST_TEST_UNREGISTER(message);
AST_TEST_UNREGISTER(subscription_messages);
AST_TEST_UNREGISTER(publish);
+ AST_TEST_UNREGISTER(publish_sync);
AST_TEST_UNREGISTER(unsubscribe_stops_messages);
AST_TEST_UNREGISTER(forward);
AST_TEST_UNREGISTER(cache_filter);
@@ -1347,6 +1416,7 @@ static int load_module(void)
AST_TEST_REGISTER(message);
AST_TEST_REGISTER(subscription_messages);
AST_TEST_REGISTER(publish);
+ AST_TEST_REGISTER(publish_sync);
AST_TEST_REGISTER(unsubscribe_stops_messages);
AST_TEST_REGISTER(forward);
AST_TEST_REGISTER(cache_filter);