diff options
-rw-r--r-- | include/asterisk/stasis.h | 33 | ||||
-rw-r--r-- | include/asterisk/stasis_message_router.h | 18 | ||||
-rw-r--r-- | main/stasis.c | 120 | ||||
-rw-r--r-- | main/stasis_message_router.c | 10 | ||||
-rw-r--r-- | tests/test_stasis.c | 74 |
5 files changed, 234 insertions, 21 deletions
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 6bc5171e0..1a420da2f 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -187,6 +187,12 @@ struct stasis_message_type; struct stasis_message; /*! + * \brief Opaque type for a Stasis subscription. + * \since 12 + */ +struct stasis_subscription; + +/*! * \brief Structure containing callbacks for Stasis message sanitization * * \note If either callback is implemented, both should be implemented since @@ -377,21 +383,36 @@ const char *stasis_topic_name(const struct stasis_topic *topic); * \brief Publish a message to a topic's subscribers. * \param topic Topic. * \param message Message to publish. + * + * This call is asynchronous and will return immediately upon queueing + * the message for delivery to the topic's subscribers. + * * \since 12 */ void stasis_publish(struct stasis_topic *topic, struct stasis_message *message); +/*! + * \brief Publish a message to a topic's subscribers, synchronizing + * on the specified subscriber + * \param sub Subscription to synchronize on. + * \param message Message to publish. + * + * The caller of stasis_publish_sync will block until the specified + * subscriber completes handling of the message. + * + * All other subscribers to the topic the \ref stasis_subpscription + * is subscribed to are also delivered the message; this delivery however + * happens asynchronously. + * + * \since 12.1.0 + */ +void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message); + /*! @} */ /*! @{ */ /*! - * \brief Opaque type for a Stasis subscription. - * \since 12 - */ -struct stasis_subscription; - -/*! * \brief Callback function type for Stasis subscriptions. * \param data Data field provided with subscription. * \param message Published message. diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 3209adb16..613a2bd7f 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -93,6 +93,24 @@ void stasis_message_router_unsubscribe_and_join( int stasis_message_router_is_done(struct stasis_message_router *router); /*! + * \brief Publish a message to a message router's subscription synchronously + * + * \param router Router + * \param message The \ref stasis message + * + * This should be used when a message needs to be published synchronously to + * the underlying subscription created by a message router. This is analagous + * to \ref stasis_publish_sync. + * + * Note that the caller will be blocked until the thread servicing the message + * on the message router's subscription completes handling of the message. + * + * \since 12.1.0 + */ +void stasis_message_router_publish_sync(struct stasis_message_router *router, + struct stasis_message *message); + +/*! * \brief Add a route to a message router. * * A particular \a message_type may have at most one route per \a router. If diff --git a/main/stasis.c b/main/stasis.c index 587a6a85e..0a5db2f16 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -505,11 +505,11 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s } /*! - * \brief Dispatch a message to a subscriber - * \param data \ref dispatch object + * \internal \brief Dispatch a message to a subscriber asynchronously + * \param local \ref ast_taskprocessor_local object * \return 0 */ -static int dispatch_exec(struct ast_taskprocessor_local *local) +static int dispatch_exec_async(struct ast_taskprocessor_local *local) { struct stasis_subscription *sub = local->local_data; struct stasis_message *message = local->data; @@ -520,23 +520,105 @@ static int dispatch_exec(struct ast_taskprocessor_local *local) return 0; } +/*! + * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize + * a published message to a subscriber + */ +struct sync_task_data { + ast_mutex_t lock; + ast_cond_t cond; + int complete; + void *task_data; +}; + +/*! + * \internal \brief Dispatch a message to a subscriber synchronously + * \param local \ref ast_taskprocessor_local object + * \return 0 + */ +static int dispatch_exec_sync(struct ast_taskprocessor_local *local) +{ + struct stasis_subscription *sub = local->local_data; + struct sync_task_data *std = local->data; + struct stasis_message *message = std->task_data; + + subscription_invoke(sub, message); + ao2_cleanup(message); + + ast_mutex_lock(&std->lock); + std->complete = 1; + ast_cond_signal(&std->cond); + ast_mutex_unlock(&std->lock); + + return 0; +} + +/*! + * \internal \brief Dispatch a message to a subscriber + * \param sub The subscriber to dispatch to + * \param message The message to send + * \param synchronous If non-zero, synchronize on the subscriber receiving + * the message + */ static void dispatch_message(struct stasis_subscription *sub, - struct stasis_message *message) + struct stasis_message *message, + int synchronous) { - if (sub->mailbox) { - ao2_bump(message); - if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) { + if (!sub->mailbox) { + /* Dispatch directly */ + subscription_invoke(sub, message); + return; + } + + /* Bump the message for the taskprocessor push. This will get de-ref'd + * by the task processor callback. + */ + ao2_bump(message); + if (!synchronous) { + if (ast_taskprocessor_push_local(sub->mailbox, + dispatch_exec_async, + message) != 0) { /* Push failed; ugh. */ - ast_log(LOG_ERROR, "Dropping dispatch\n"); + ast_log(LOG_ERROR, "Dropping async dispatch\n"); ao2_cleanup(message); } } else { - /* Dispatch directly */ - subscription_invoke(sub, message); + struct sync_task_data std; + + ast_mutex_init(&std.lock); + ast_cond_init(&std.cond, NULL); + std.complete = 0; + std.task_data = message; + + if (ast_taskprocessor_push_local(sub->mailbox, + dispatch_exec_sync, + &std)) { + /* Push failed; ugh. */ + ast_log(LOG_ERROR, "Dropping sync dispatch\n"); + ao2_cleanup(message); + return; + } + + ast_mutex_lock(&std.lock); + while (!std.complete) { + ast_cond_wait(&std.cond, &std.lock); + } + ast_mutex_unlock(&std.lock); + + ast_mutex_destroy(&std.lock); + ast_cond_destroy(&std.cond); } } -void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) +/*! + * \internal \brief Publish a message to a topic's subscribers + * \brief topic The topic to publish to + * \brief message The message to publish + * \brief sync_sub An optional subscriber of the topic to publish synchronously + * to + */ +static void publish_msg(struct stasis_topic *topic, + struct stasis_message *message, struct stasis_subscription *sync_sub) { size_t i; @@ -554,12 +636,24 @@ void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) ast_assert(sub != NULL); - dispatch_message(sub, message); + dispatch_message(sub, message, (sub == sync_sub)); } ao2_unlock(topic); ao2_ref(topic, -1); } +void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) +{ + publish_msg(topic, message, NULL); +} + +void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message) +{ + ast_assert(sub != NULL); + + publish_msg(sub->topic, message, sub); +} + /*! * \brief Forwarding information * @@ -721,7 +815,7 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic, stasis_publish(topic, msg); /* Now we have to dispatch to the subscription itself */ - dispatch_message(sub, msg); + dispatch_message(sub, msg, 0); } struct topic_pool_entry { diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index ec0448bef..41495db11 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -261,6 +261,16 @@ int stasis_message_router_is_done(struct stasis_message_router *router) return stasis_subscription_is_done(router->subscription); } +void stasis_message_router_publish_sync(struct stasis_message_router *router, + struct stasis_message *message) +{ + ast_assert(router != NULL); + + ao2_bump(router); + stasis_publish_sync(router->subscription, message); + ao2_cleanup(router); +} + int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 48d5a6c2a..0cfce2c3f 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -206,6 +206,27 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st ast_cond_signal(&consumer->out); } +static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message) +{ + struct consumer *consumer = data; + RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup); + SCOPED_MUTEX(lock, &consumer->lock); + + if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) { + + ++consumer->messages_rxed_len; + consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len); + ast_assert(consumer->messages_rxed != NULL); + consumer->messages_rxed[consumer->messages_rxed_len - 1] = message; + ao2_ref(message, +1); + } + + if (stasis_subscription_final_message(sub, message)) { + consumer->complete = 1; + consumer_needs_cleanup = consumer; + } +} + static int consumer_wait_for(struct consumer *consumer, size_t expected_len) { struct timeval start = ast_tvnow(); @@ -341,8 +362,8 @@ AST_TEST_DEFINE(publish) case TEST_INIT: info->name = __func__; info->category = test_category; - info->summary = "Test simple subscriptions"; - info->description = "Test simple subscriptions"; + info->summary = "Test publishing"; + info->description = "Test publishing"; return AST_TEST_NOT_RUN; case TEST_EXECUTE: break; @@ -373,6 +394,53 @@ AST_TEST_DEFINE(publish) return AST_TEST_PASS; } +AST_TEST_DEFINE(publish_sync) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + int actual_len; + const char *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test synchronous publishing"; + info->description = "Test synchronous publishing"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe(topic, consumer_exec_sync, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message_type = stasis_message_type_create("TestMessage", NULL); + test_message = stasis_message_create(test_message_type, test_data); + + stasis_publish_sync(uut, test_message); + + actual_len = consumer->messages_rxed_len; + ast_test_validate(test, 1 == actual_len); + actual = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, test_data == actual); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(unsubscribe_stops_messages) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -1324,6 +1392,7 @@ static int unload_module(void) AST_TEST_UNREGISTER(message); AST_TEST_UNREGISTER(subscription_messages); AST_TEST_UNREGISTER(publish); + AST_TEST_UNREGISTER(publish_sync); AST_TEST_UNREGISTER(unsubscribe_stops_messages); AST_TEST_UNREGISTER(forward); AST_TEST_UNREGISTER(cache_filter); @@ -1347,6 +1416,7 @@ static int load_module(void) AST_TEST_REGISTER(message); AST_TEST_REGISTER(subscription_messages); AST_TEST_REGISTER(publish); + AST_TEST_REGISTER(publish_sync); AST_TEST_REGISTER(unsubscribe_stops_messages); AST_TEST_REGISTER(forward); AST_TEST_REGISTER(cache_filter); |