summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES12
-rw-r--r--apps/app_queue.c4
-rw-r--r--channels/chan_dahdi.c2
-rw-r--r--channels/chan_iax2.c2
-rw-r--r--channels/chan_mgcp.c2
-rw-r--r--channels/chan_sip.c2
-rw-r--r--channels/chan_skinny.c2
-rw-r--r--channels/sig_pri.c2
-rw-r--r--configs/samples/stasis.conf.sample10
-rw-r--r--include/asterisk/stasis.h25
-rw-r--r--include/asterisk/stasis_internal.h7
-rw-r--r--include/asterisk/stasis_message_router.h16
-rw-r--r--main/endpoints.c2
-rw-r--r--main/stasis.c152
-rw-r--r--main/stasis_cache.c2
-rw-r--r--main/stasis_message_router.c22
-rw-r--r--res/parking/parking_applications.c2
-rw-r--r--res/parking/parking_bridge_features.c2
-rw-r--r--res/res_pjsip_mwi.c2
-rw-r--r--res/res_pjsip_pubsub.c2
-rw-r--r--res/res_pjsip_refer.c2
-rw-r--r--res/res_stasis_device_state.c2
-rw-r--r--res/res_xmpp.c2
-rw-r--r--tests/test_stasis.c310
24 files changed, 542 insertions, 46 deletions
diff --git a/CHANGES b/CHANGES
index 0c46e0d6d..f2c1314dd 100644
--- a/CHANGES
+++ b/CHANGES
@@ -39,6 +39,16 @@ chan_pjsip
the message will automatically be associated with the configured endpoint on the
outbound registration.
+
+Core
+------------------
+ * The core of Asterisk uses a message bus called "Stasis" to distribute
+ information to internal components. For performance reasons, the message
+ distribution was modified to make use of a thread pool instead of a
+ dedicated thread per consumer in certain cases. The initial settings for
+ the thread pool can now be configured in 'stasis.conf'.
+
+
Functions
------------------
@@ -355,7 +365,7 @@ AMI
* AMI action PJSIPNotify may now send to a URI instead of only to a PJSIP
endpoint as long as a default outbound endpoint is set. This also applies
to the equivalent CLI command (pjsip send notify)
-
+
* The AMI action PJSIPShowEndpoint now includes ContactStatusDetail sections
that give information on Asterisk's attempts to qualify the endpoint.
diff --git a/apps/app_queue.c b/apps/app_queue.c
index 4820c662d..1cf1f6ad3 100644
--- a/apps/app_queue.c
+++ b/apps/app_queue.c
@@ -6097,7 +6097,7 @@ static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, str
return -1;
}
- queue_data->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
+ queue_data->bridge_router = stasis_message_router_create_pool(ast_bridge_topic_all());
if (!queue_data->bridge_router) {
ao2_ref(queue_data, -1);
return -1;
@@ -6112,7 +6112,7 @@ static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, str
stasis_message_router_set_default(queue_data->bridge_router,
queue_bridge_cb, queue_data);
- queue_data->channel_router = stasis_message_router_create(ast_channel_topic_all());
+ queue_data->channel_router = stasis_message_router_create_pool(ast_channel_topic_all());
if (!queue_data->channel_router) {
/* Unsubscribing from the bridge router will remove the only ref of queue_data,
* thus beginning the destruction process
diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c
index 9ccec641f..8ef0cec30 100644
--- a/channels/chan_dahdi.c
+++ b/channels/chan_dahdi.c
@@ -12581,7 +12581,7 @@ static struct dahdi_pvt *mkintf(int channel, const struct dahdi_chan_conf *conf,
mailbox_specific_topic = ast_mwi_topic(tmp->mailbox);
if (mailbox_specific_topic) {
- tmp->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+ tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL);
}
}
#ifdef HAVE_DAHDI_LINEREVERSE_VMWI
diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c
index d093438c8..39861db0e 100644
--- a/channels/chan_iax2.c
+++ b/channels/chan_iax2.c
@@ -13096,7 +13096,7 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st
mailbox_specific_topic = ast_mwi_topic(peer->mailbox);
if (mailbox_specific_topic) {
- peer->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+ peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL);
}
}
diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c
index 72898b8ff..95fa2dc7c 100644
--- a/channels/chan_mgcp.c
+++ b/channels/chan_mgcp.c
@@ -4237,7 +4237,7 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v)
mailbox_specific_topic = ast_mwi_topic(e->mailbox);
if (mailbox_specific_topic) {
- e->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+ e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL);
}
}
snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random());
diff --git a/channels/chan_sip.c b/channels/chan_sip.c
index 192956362..2cbef5243 100644
--- a/channels/chan_sip.c
+++ b/channels/chan_sip.c
@@ -27258,7 +27258,7 @@ static void add_peer_mwi_subs(struct sip_peer *peer)
if (!peer_name) {
return;
}
- mailbox->event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, peer_name);
+ mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer_name);
}
}
}
diff --git a/channels/chan_skinny.c b/channels/chan_skinny.c
index 4b7f353a0..b8202d33b 100644
--- a/channels/chan_skinny.c
+++ b/channels/chan_skinny.c
@@ -8295,7 +8295,7 @@ static struct skinny_line *config_line(const char *lname, struct ast_variable *v
mailbox_specific_topic = ast_mwi_topic(l->mailbox);
if (mailbox_specific_topic) {
- l->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, l);
+ l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l);
}
}
diff --git a/channels/sig_pri.c b/channels/sig_pri.c
index e9e17322f..a26b56611 100644
--- a/channels/sig_pri.c
+++ b/channels/sig_pri.c
@@ -9174,7 +9174,7 @@ int sig_pri_start_pri(struct sig_pri_span *pri)
mailbox_specific_topic = ast_mwi_topic(mbox_id);
if (mailbox_specific_topic) {
- pri->mbox[i].sub = stasis_subscribe(mailbox_specific_topic, sig_pri_mwi_event_cb, pri);
+ pri->mbox[i].sub = stasis_subscribe_pool(mailbox_specific_topic, sig_pri_mwi_event_cb, pri);
}
if (!pri->mbox[i].sub) {
ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n",
diff --git a/configs/samples/stasis.conf.sample b/configs/samples/stasis.conf.sample
index 3aac230cb..e591e7637 100644
--- a/configs/samples/stasis.conf.sample
+++ b/configs/samples/stasis.conf.sample
@@ -1,3 +1,13 @@
+[threadpool]
+;initial_size = 5 ; Initial size of the threadpool.
+; ; 0 means the threadpool has no threads initially
+; ; until a task needs a thread.
+;idle_timeout_sec = 20 ; Number of seconds a thread should be idle before
+; ; dying. 0 means threads never time out.
+;max_size = 50 ; Maximum number of threads in the Stasis threadpool.
+; ; 0 means no limit to the number of threads in the
+; ; threadpool.
+
[declined_message_types]
; This config section contains the names of message types that should be prevented
; from being created. By default, all message types are allowed to be created.
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 4189513ac..0b1b1e83f 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -542,6 +542,31 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
stasis_subscription_cb callback, void *data);
/*!
+ * \brief Create a subscription whose callbacks occur on a thread pool
+ *
+ * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
+ * up this reference), the subscription must be explicitly unsubscribed from its
+ * topic using stasis_unsubscribe().
+ *
+ * The invocations of the callback are serialized, but will almost certainly not
+ * always happen on the same thread. The invocation order of different subscriptions
+ * is unspecified.
+ *
+ * Unlike \ref stasis_subscribe, this function will explicitly use a threadpool to
+ * dispatch items to its \c callback. This form of subscription should be used
+ * when many subscriptions may be made to the specified \c topic.
+ *
+ * \param topic Topic to subscribe to.
+ * \param callback Callback function for subscription messages.
+ * \param data Data to be passed to the callback, in addition to the message.
+ * \return New \ref stasis_subscription object.
+ * \return \c NULL on error.
+ * \since 12.8.0
+ */
+struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
+ stasis_subscription_cb callback, void *data);
+
+/*!
* \brief Cancel a subscription.
*
* Note that in an asynchronous system, there may still be messages queued or
diff --git a/include/asterisk/stasis_internal.h b/include/asterisk/stasis_internal.h
index bb7b6cc0a..bc6122c2b 100644
--- a/include/asterisk/stasis_internal.h
+++ b/include/asterisk/stasis_internal.h
@@ -52,8 +52,10 @@
* \param callback Callback function for subscription messages.
* \param data Data to be passed to the callback, in addition to the message.
* \param needs_mailbox Determines whether or not the subscription requires a mailbox.
- * Subscriptions with mailboxes will be delivered on a thread in the Stasis threadpool;
+ * Subscriptions with mailboxes will be delivered on some non-publisher thread;
* subscriptions without mailboxes will be delivered on the publisher thread.
+ * \param use_thread_pool Use the thread pool for the subscription. This is only
+ * relevant if \c needs_mailbox is non-zero.
* \return New \ref stasis_subscription object.
* \return \c NULL on error.
* \since 12
@@ -62,6 +64,7 @@ struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
- int needs_mailbox);
+ int needs_mailbox,
+ int use_thread_pool);
#endif /* STASIS_INTERNAL_H_ */
diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h
index 613a2bd7f..89657a5ee 100644
--- a/include/asterisk/stasis_message_router.h
+++ b/include/asterisk/stasis_message_router.h
@@ -59,6 +59,22 @@ struct stasis_message_router *stasis_message_router_create(
struct stasis_topic *topic);
/*!
+ * \brief Create a new message router object.
+ *
+ * The subscription created for this message router will dispatch
+ * callbacks on a thread pool.
+ *
+ * \param topic Topic to subscribe route to.
+ *
+ * \return New \ref stasis_message_router.
+ * \return \c NULL on error.
+ *
+ * \since 12.8.0
+ */
+struct stasis_message_router *stasis_message_router_create_pool(
+ struct stasis_topic *topic);
+
+/*!
* \brief Unsubscribe the router from the upstream topic.
*
* \param router Router to unsubscribe.
diff --git a/main/endpoints.c b/main/endpoints.c
index cc2eccc70..f8cca45b8 100644
--- a/main/endpoints.c
+++ b/main/endpoints.c
@@ -310,7 +310,7 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
}
if (!ast_strlen_zero(resource)) {
- endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint));
+ endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
if (!endpoint->router) {
return NULL;
}
diff --git a/main/stasis.c b/main/stasis.c
index b85135a5b..dbb6e4c12 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -35,6 +35,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
@@ -63,6 +64,18 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
</managerEvent>
<configInfo name="stasis" language="en_US">
<configFile name="stasis.conf">
+ <configObject name="threadpool">
+ <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
+ <configOption name="initial_size" default="5">
+ <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
+ </configOption>
+ <configOption name="idle_timeout_sec" default="20">
+ <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
+ </configOption>
+ <configOption name="max_size" default="50">
+ <synopsis>Maximum number of threads in the threadpool.</synopsis>
+ </configOption>
+ </configObject>
<configObject name="declined_message_types">
<synopsis>Stasis message types for which to decline creation.</synopsis>
<configOption name="decline">
@@ -287,6 +300,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
/*! The number of buckets to use for topic pools */
#define TOPIC_POOL_BUCKETS 57
+/*! Thread pool for topics that don't want a dedicated taskprocessor */
+static struct ast_threadpool *pool;
+
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
/*! \internal */
@@ -432,7 +448,8 @@ struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
- int needs_mailbox)
+ int needs_mailbox,
+ int use_thread_pool)
{
RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
@@ -445,19 +462,19 @@ struct stasis_subscription *internal_stasis_subscribe(
if (!sub) {
return NULL;
}
-
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
if (needs_mailbox) {
/* With a small number of subscribers, a thread-per-sub is
- * acceptable. If our usage changes so that we have larger
- * numbers of subscribers, we'll probably want to consider
- * a threadpool. We had that originally, but with so few
- * subscribers it was actually a performance loss instead of
- * a gain.
+ * acceptable. For larger number of subscribers, a thread
+ * pool should be used.
*/
- sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
- TPS_REF_DEFAULT);
+ if (use_thread_pool) {
+ sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+ } else {
+ sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
+ TPS_REF_DEFAULT);
+ }
if (!sub->mailbox) {
return NULL;
}
@@ -486,7 +503,15 @@ struct stasis_subscription *stasis_subscribe(
stasis_subscription_cb callback,
void *data)
{
- return internal_stasis_subscribe(topic, callback, data, 1);
+ return internal_stasis_subscribe(topic, callback, data, 1, 0);
+}
+
+struct stasis_subscription *stasis_subscribe_pool(
+ struct stasis_topic *topic,
+ stasis_subscription_cb callback,
+ void *data)
+{
+ return internal_stasis_subscribe(topic, callback, data, 1, 1);
}
static int sub_cleanup(void *data)
@@ -1365,11 +1390,33 @@ struct stasis_declined_config {
struct ao2_container *declined;
};
+/*! \brief Threadpool configuration options */
+struct stasis_threadpool_conf {
+ /*! Initial size of the thread pool */
+ int initial_size;
+ /*! Time, in seconds, before we expire a thread */
+ int idle_timeout_sec;
+ /*! Maximum number of thread to allow */
+ int max_size;
+};
struct stasis_config {
+ /*! Thread pool configuration options */
+ struct stasis_threadpool_conf *threadpool_options;
+ /*! Declined message types */
struct stasis_declined_config *declined_message_types;
};
+static struct aco_type threadpool_option = {
+ .type = ACO_GLOBAL,
+ .name = "threadpool",
+ .item_offset = offsetof(struct stasis_config, threadpool_options),
+ .category = "^threadpool$",
+ .category_match = ACO_WHITELIST,
+};
+
+static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
+
/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
static struct aco_type declined_option = {
.type = ACO_GLOBAL,
@@ -1383,7 +1430,7 @@ struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
struct aco_file stasis_conf = {
.filename = "stasis.conf",
- .types = ACO_TYPES(&declined_option),
+ .types = ACO_TYPES(&declined_option, &threadpool_option),
};
/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
@@ -1399,13 +1446,16 @@ CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
static void stasis_declined_config_destructor(void *obj)
{
struct stasis_declined_config *declined = obj;
+
ao2_cleanup(declined->declined);
}
static void stasis_config_destructor(void *obj)
{
struct stasis_config *cfg = obj;
+
ao2_cleanup(cfg->declined_message_types);
+ ast_free(cfg->threadpool_options);
}
static void *stasis_config_alloc(void)
@@ -1416,21 +1466,26 @@ static void *stasis_config_alloc(void)
return NULL;
}
- /* Allocate/initialize memory */
- cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types), stasis_declined_config_destructor);
+ cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
+ if (!cfg->threadpool_options) {
+ ao2_ref(cfg, -1);
+ return NULL;
+ }
+
+ cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
+ stasis_declined_config_destructor);
if (!cfg->declined_message_types) {
- goto error;
+ ao2_ref(cfg, -1);
+ return NULL;
}
cfg->declined_message_types->declined = ast_str_container_alloc(13);
if (!cfg->declined_message_types->declined) {
- goto error;
+ ao2_ref(cfg, -1);
+ return NULL;
}
return cfg;
-error:
- ao2_ref(cfg, -1);
- return NULL;
}
int stasis_message_type_declined(const char *name)
@@ -1478,6 +1533,13 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
/*! @} */
+/*! \brief Shutdown function */
+static void stasis_exit(void)
+{
+ ast_threadpool_shutdown(pool);
+ pool = NULL;
+}
+
/*! \brief Cleanup function for graceful shutdowns */
static void stasis_cleanup(void)
{
@@ -1489,27 +1551,71 @@ static void stasis_cleanup(void)
int stasis_init(void)
{
+ RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup);
int cache_init;
+ struct ast_threadpool_options threadpool_opts = { 0, };
/* Be sure the types are cleaned up after the message bus */
ast_register_cleanup(stasis_cleanup);
+ ast_register_atexit(stasis_exit);
if (aco_info_init(&cfg_info)) {
return -1;
}
- aco_option_register_custom(&cfg_info, "decline", ACO_EXACT, declined_options, "", declined_handler, 0);
+ aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
+ declined_options, "", declined_handler, 0);
+ aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
+ threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_threadpool_conf, initial_size), 0,
+ INT_MAX);
+ aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
+ threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
+ INT_MAX);
+ aco_option_register(&cfg_info, "max_size", ACO_EXACT,
+ threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_threadpool_conf, max_size), 0,
+ INT_MAX);
if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
- RAII_VAR(struct stasis_config *, stasis_cfg, stasis_config_alloc(), ao2_cleanup);
+ struct stasis_config *default_cfg = stasis_config_alloc();
+
+ if (!default_cfg) {
+ return -1;
+ }
- if (aco_set_defaults(&declined_option, "declined_message_types", stasis_cfg->declined_message_types)) {
+ if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
+ ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
+ ao2_ref(default_cfg, -1);
+ return -1;
+ }
+
+ if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
return -1;
}
- ast_log(LOG_NOTICE, "Could not load stasis config; using defaults\n");
- ao2_global_obj_replace_unref(globals, stasis_cfg);
+ ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
+ ao2_global_obj_replace_unref(globals, default_cfg);
+ cfg = default_cfg;
+ } else {
+ cfg = ao2_global_obj_ref(globals);
+ if (!cfg) {
+ ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
+ return -1;
+ }
+ }
+
+ threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
+ threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
+ threadpool_opts.auto_increment = 1;
+ threadpool_opts.max_size = cfg->threadpool_options->max_size;
+ threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
+ pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
+ if (!pool) {
+ ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
+ return -1;
}
cache_init = stasis_cache_init();
diff --git a/main/stasis_cache.c b/main/stasis_cache.c
index c492307d6..9129c0064 100644
--- a/main/stasis_cache.c
+++ b/main/stasis_cache.c
@@ -894,7 +894,7 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or
ao2_ref(cache, +1);
caching_topic->cache = cache;
- sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0);
+ sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
if (sub == NULL) {
return NULL;
}
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index da288e864..a9e458456 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -206,8 +206,8 @@ static void router_dispatch(void *data,
}
}
-struct stasis_message_router *stasis_message_router_create(
- struct stasis_topic *topic)
+static struct stasis_message_router *stasis_message_router_create_internal(
+ struct stasis_topic *topic, int use_thread_pool)
{
int res;
RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup);
@@ -224,7 +224,11 @@ struct stasis_message_router *stasis_message_router_create(
return NULL;
}
- router->subscription = stasis_subscribe(topic, router_dispatch, router);
+ if (use_thread_pool) {
+ router->subscription = stasis_subscribe_pool(topic, router_dispatch, router);
+ } else {
+ router->subscription = stasis_subscribe(topic, router_dispatch, router);
+ }
if (!router->subscription) {
return NULL;
}
@@ -233,6 +237,18 @@ struct stasis_message_router *stasis_message_router_create(
return router;
}
+struct stasis_message_router *stasis_message_router_create(
+ struct stasis_topic *topic)
+{
+ return stasis_message_router_create_internal(topic, 0);
+}
+
+struct stasis_message_router *stasis_message_router_create_pool(
+ struct stasis_topic *topic)
+{
+ return stasis_message_router_create_internal(topic, 1);
+}
+
void stasis_message_router_unsubscribe(struct stasis_message_router *router)
{
if (!router) {
diff --git a/res/parking/parking_applications.c b/res/parking/parking_applications.c
index 8bb57b62b..c5214b36a 100644
--- a/res/parking/parking_applications.c
+++ b/res/parking/parking_applications.c
@@ -832,7 +832,7 @@ static int park_and_announce_app_exec(struct ast_channel *chan, const char *data
return -1;
}
- if (!(parking_subscription = stasis_subscribe(ast_parking_topic(), park_announce_update_cb, pa_data))) {
+ if (!(parking_subscription = stasis_subscribe_pool(ast_parking_topic(), park_announce_update_cb, pa_data))) {
/* Failed to create subscription */
park_announce_subscription_data_destroy(pa_data);
return -1;
diff --git a/res/parking/parking_bridge_features.c b/res/parking/parking_bridge_features.c
index 61cb85f00..a21be9068 100644
--- a/res/parking/parking_bridge_features.c
+++ b/res/parking/parking_bridge_features.c
@@ -192,7 +192,7 @@ static int create_parked_subscription_full(struct ast_channel *chan, const char
strcpy(subscription_data->parkee_uuid, parkee_uuid);
strcpy(subscription_data->parker_uuid, parker_uuid);
- if (!(parked_datastore->parked_subscription = stasis_subscribe(ast_parking_topic(), parker_update_cb, subscription_data))) {
+ if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) {
return -1;
}
diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c
index eaf0f32af..bf0925dd4 100644
--- a/res/res_pjsip_mwi.c
+++ b/res/res_pjsip_mwi.c
@@ -138,7 +138,7 @@ static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char
strcpy(mwi_stasis_sub->mailbox, mailbox);
ao2_ref(mwi_sub, +1);
ast_debug(3, "Creating stasis MWI subscription to mailbox %s for endpoint %s\n", mailbox, mwi_sub->id);
- mwi_stasis_sub->stasis_sub = stasis_subscribe(topic, mwi_stasis_cb, mwi_sub);
+ mwi_stasis_sub->stasis_sub = stasis_subscribe_pool(topic, mwi_stasis_cb, mwi_sub);
return mwi_stasis_sub;
}
diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c
index 344bda3cd..02deeb668 100644
--- a/res/res_pjsip_pubsub.c
+++ b/res/res_pjsip_pubsub.c
@@ -4257,7 +4257,7 @@ static int load_module(void)
if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
ast_sip_push_task(NULL, subscription_persistence_load, NULL);
} else {
- stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
+ stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
}
ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c
index 99d43fd2f..7b8c53761 100644
--- a/res/res_pjsip_refer.c
+++ b/res/res_pjsip_refer.c
@@ -550,7 +550,7 @@ static void refer_blind_callback(struct ast_channel *chan, struct transfer_chann
/* We also will need to detect if the transferee enters a bridge. This is currently the only reliable way to
* detect if the transfer target has answered the call
*/
- refer->progress->bridge_sub = stasis_subscribe(ast_bridge_topic_all(), refer_progress_bridge, refer->progress);
+ refer->progress->bridge_sub = stasis_subscribe_pool(ast_bridge_topic_all(), refer_progress_bridge, refer->progress);
if (!refer->progress->bridge_sub) {
struct refer_progress_notification *notification = refer_progress_notification_alloc(refer->progress, 200,
PJSIP_EVSUB_STATE_TERMINATED);
diff --git a/res/res_stasis_device_state.c b/res/res_stasis_device_state.c
index 40219c007..8a1c23049 100644
--- a/res/res_stasis_device_state.c
+++ b/res/res_stasis_device_state.c
@@ -330,7 +330,7 @@ static int subscribe_device_state(struct stasis_app *app, void *obj)
return 0;
}
- if (!(sub->sub = stasis_subscribe(
+ if (!(sub->sub = stasis_subscribe_pool(
ast_device_state_topic(sub->device_name),
device_state_cb, sub))) {
ast_log(LOG_ERROR, "Unable to subscribe to device %s\n",
diff --git a/res/res_xmpp.c b/res/res_xmpp.c
index 3cb6fc572..e3eff9390 100644
--- a/res/res_xmpp.c
+++ b/res/res_xmpp.c
@@ -1606,7 +1606,7 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client)
xmpp_pubsub_unsubscribe(client, "device_state");
xmpp_pubsub_unsubscribe(client, "message_waiting");
- if (!(client->mwi_sub = stasis_subscribe(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
+ if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
return;
}
diff --git a/tests/test_stasis.c b/tests/test_stasis.c
index ba82e83ad..2e83e3b70 100644
--- a/tests/test_stasis.c
+++ b/tests/test_stasis.c
@@ -361,6 +361,61 @@ AST_TEST_DEFINE(subscription_messages)
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(subscription_pool_messages)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+ RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
+ int complete;
+ struct stasis_subscription_change *change;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription";
+ info->description = "Test subscribe/unsubscribe messages using a threadpool subscription";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ consumer = consumer_create(0);
+ ast_test_validate(test, NULL != consumer);
+
+ uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
+ ast_test_validate(test, NULL != uut);
+ ao2_ref(consumer, +1);
+ expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
+
+ uut = stasis_unsubscribe(uut);
+ complete = consumer_wait_for_completion(consumer);
+ ast_test_validate(test, 1 == complete);
+
+ ast_test_validate(test, 2 == consumer->messages_rxed_len);
+ ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
+ ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
+
+ change = stasis_message_data(consumer->messages_rxed[0]);
+ ast_test_validate(test, topic == change->topic);
+ ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
+ ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+ change = stasis_message_data(consumer->messages_rxed[1]);
+ ast_test_validate(test, topic == change->topic);
+ ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
+ ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
+
+ return AST_TEST_PASS;
+}
+
AST_TEST_DEFINE(publish)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -455,6 +510,55 @@ AST_TEST_DEFINE(publish_sync)
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(publish_pool)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
+ int actual_len;
+ const char *actual;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test publishing with a threadpool";
+ info->description = "Test publishing to a subscriber whose\n"
+ "subscription dictates messages are received through a\n"
+ "threadpool.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ consumer = consumer_create(1);
+ ast_test_validate(test, NULL != consumer);
+
+ uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
+ ast_test_validate(test, NULL != uut);
+ ao2_ref(consumer, +1);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+ test_message_type = stasis_message_type_create("TestMessage", NULL);
+ test_message = stasis_message_create(test_message_type, test_data);
+
+ stasis_publish(topic, test_message);
+
+ actual_len = consumer_wait_for(consumer, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual = stasis_message_data(consumer->messages_rxed[0]);
+ ast_test_validate(test, test_data == actual);
+
+ return AST_TEST_PASS;
+}
+
AST_TEST_DEFINE(unsubscribe_stops_messages)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@@ -650,6 +754,106 @@ AST_TEST_DEFINE(interleaving)
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(subscription_interleaving)
+{
+ RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
+
+ RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
+
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+
+ RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+
+ RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
+ RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
+ RAII_VAR(struct stasis_subscription *, sub1, NULL, stasis_unsubscribe);
+ RAII_VAR(struct stasis_subscription *, sub2, NULL, stasis_unsubscribe);
+
+ RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+
+ int actual_len;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test sending interleaved events to a parent topic with different subscribers";
+ info->description = "Test sending events to a parent topic.\n"
+ "This test creates three topics (one parent, two children)\n"
+ "and publishes messages alternately between the children.\n"
+ "It verifies that the messages are received in the expected\n"
+ "order, for different subscription types: one with a dedicated\n"
+ "thread, the other on the Stasis threadpool.\n";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ test_message_type = stasis_message_type_create("test", NULL);
+ ast_test_validate(test, NULL != test_message_type);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+
+ test_message1 = stasis_message_create(test_message_type, test_data);
+ ast_test_validate(test, NULL != test_message1);
+ test_message2 = stasis_message_create(test_message_type, test_data);
+ ast_test_validate(test, NULL != test_message2);
+ test_message3 = stasis_message_create(test_message_type, test_data);
+ ast_test_validate(test, NULL != test_message3);
+
+ parent_topic = stasis_topic_create("ParentTestTopic");
+ ast_test_validate(test, NULL != parent_topic);
+ topic1 = stasis_topic_create("Topic1");
+ ast_test_validate(test, NULL != topic1);
+ topic2 = stasis_topic_create("Topic2");
+ ast_test_validate(test, NULL != topic2);
+
+ forward_sub1 = stasis_forward_all(topic1, parent_topic);
+ ast_test_validate(test, NULL != forward_sub1);
+ forward_sub2 = stasis_forward_all(topic2, parent_topic);
+ ast_test_validate(test, NULL != forward_sub2);
+
+ consumer1 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer1);
+
+ consumer2 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer2);
+
+ sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1);
+ ast_test_validate(test, NULL != sub1);
+ ao2_ref(consumer1, +1);
+
+ sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2);
+ ast_test_validate(test, NULL != sub2);
+ ao2_ref(consumer2, +1);
+
+ stasis_publish(topic1, test_message1);
+ stasis_publish(topic2, test_message2);
+ stasis_publish(topic1, test_message3);
+
+ actual_len = consumer_wait_for(consumer1, 3);
+ ast_test_validate(test, 3 == actual_len);
+
+ actual_len = consumer_wait_for(consumer2, 3);
+ ast_test_validate(test, 3 == actual_len);
+
+ ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]);
+ ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]);
+ ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]);
+
+ ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]);
+ ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]);
+ ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]);
+
+ return AST_TEST_PASS;
+}
+
struct cache_test_data {
char *id;
char *value;
@@ -1389,6 +1593,104 @@ AST_TEST_DEFINE(router)
return AST_TEST_PASS;
}
+AST_TEST_DEFINE(router_pool)
+{
+ RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
+ RAII_VAR(char *, test_data, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
+ RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
+ RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
+ int actual_len, ret;
+ struct stasis_message *actual;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = test_category;
+ info->summary = "Test message routing via threadpool";
+ info->description = "Test simple message routing when\n"
+ "the subscriptions dictate usage of the Stasis\n"
+ "threadpool.\n";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ topic = stasis_topic_create("TestTopic");
+ ast_test_validate(test, NULL != topic);
+
+ consumer1 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer1);
+ consumer2 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer2);
+ consumer3 = consumer_create(1);
+ ast_test_validate(test, NULL != consumer3);
+
+ test_message_type1 = stasis_message_type_create("TestMessage1", NULL);
+ ast_test_validate(test, NULL != test_message_type1);
+ test_message_type2 = stasis_message_type_create("TestMessage2", NULL);
+ ast_test_validate(test, NULL != test_message_type2);
+ test_message_type3 = stasis_message_type_create("TestMessage3", NULL);
+ ast_test_validate(test, NULL != test_message_type3);
+
+ uut = stasis_message_router_create_pool(topic);
+ ast_test_validate(test, NULL != uut);
+
+ ret = stasis_message_router_add(
+ uut, test_message_type1, consumer_exec, consumer1);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer1, +1);
+ ret = stasis_message_router_add(
+ uut, test_message_type2, consumer_exec, consumer2);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer2, +1);
+ ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
+ ast_test_validate(test, 0 == ret);
+ ao2_ref(consumer3, +1);
+
+ test_data = ao2_alloc(1, NULL);
+ ast_test_validate(test, NULL != test_data);
+ test_message1 = stasis_message_create(test_message_type1, test_data);
+ ast_test_validate(test, NULL != test_message1);
+ test_message2 = stasis_message_create(test_message_type2, test_data);
+ ast_test_validate(test, NULL != test_message2);
+ test_message3 = stasis_message_create(test_message_type3, test_data);
+ ast_test_validate(test, NULL != test_message3);
+
+ stasis_publish(topic, test_message1);
+ stasis_publish(topic, test_message2);
+ stasis_publish(topic, test_message3);
+
+ actual_len = consumer_wait_for(consumer1, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual_len = consumer_wait_for(consumer2, 1);
+ ast_test_validate(test, 1 == actual_len);
+ actual_len = consumer_wait_for(consumer3, 1);
+ ast_test_validate(test, 1 == actual_len);
+
+ actual = consumer1->messages_rxed[0];
+ ast_test_validate(test, test_message1 == actual);
+
+ actual = consumer2->messages_rxed[0];
+ ast_test_validate(test, test_message2 == actual);
+
+ actual = consumer3->messages_rxed[0];
+ ast_test_validate(test, test_message3 == actual);
+
+ /* consumer1 and consumer2 do not get the final message. */
+ ao2_cleanup(consumer1);
+ ao2_cleanup(consumer2);
+
+ return AST_TEST_PASS;
+}
+
static const char *cache_simple(struct stasis_message *message)
{
const char *type_name =
@@ -1748,8 +2050,10 @@ static int unload_module(void)
AST_TEST_UNREGISTER(message_type);
AST_TEST_UNREGISTER(message);
AST_TEST_UNREGISTER(subscription_messages);
+ AST_TEST_UNREGISTER(subscription_pool_messages);
AST_TEST_UNREGISTER(publish);
AST_TEST_UNREGISTER(publish_sync);
+ AST_TEST_UNREGISTER(publish_pool);
AST_TEST_UNREGISTER(unsubscribe_stops_messages);
AST_TEST_UNREGISTER(forward);
AST_TEST_UNREGISTER(cache_filter);
@@ -1757,8 +2061,10 @@ static int unload_module(void)
AST_TEST_UNREGISTER(cache_dump);
AST_TEST_UNREGISTER(cache_eid_aggregate);
AST_TEST_UNREGISTER(router);
+ AST_TEST_UNREGISTER(router_pool);
AST_TEST_UNREGISTER(router_cache_updates);
AST_TEST_UNREGISTER(interleaving);
+ AST_TEST_UNREGISTER(subscription_interleaving);
AST_TEST_UNREGISTER(no_to_json);
AST_TEST_UNREGISTER(to_json);
AST_TEST_UNREGISTER(no_to_ami);
@@ -1773,8 +2079,10 @@ static int load_module(void)
AST_TEST_REGISTER(message_type);
AST_TEST_REGISTER(message);
AST_TEST_REGISTER(subscription_messages);
+ AST_TEST_REGISTER(subscription_pool_messages);
AST_TEST_REGISTER(publish);
AST_TEST_REGISTER(publish_sync);
+ AST_TEST_REGISTER(publish_pool);
AST_TEST_REGISTER(unsubscribe_stops_messages);
AST_TEST_REGISTER(forward);
AST_TEST_REGISTER(cache_filter);
@@ -1782,8 +2090,10 @@ static int load_module(void)
AST_TEST_REGISTER(cache_dump);
AST_TEST_REGISTER(cache_eid_aggregate);
AST_TEST_REGISTER(router);
+ AST_TEST_REGISTER(router_pool);
AST_TEST_REGISTER(router_cache_updates);
AST_TEST_REGISTER(interleaving);
+ AST_TEST_REGISTER(subscription_interleaving);
AST_TEST_REGISTER(no_to_json);
AST_TEST_REGISTER(to_json);
AST_TEST_REGISTER(no_to_ami);