summaryrefslogtreecommitdiff
path: root/main/stasis.c
diff options
context:
space:
mode:
Diffstat (limited to 'main/stasis.c')
-rw-r--r--main/stasis.c152
1 files changed, 129 insertions, 23 deletions
diff --git a/main/stasis.c b/main/stasis.c
index b85135a5b..dbb6e4c12 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -35,6 +35,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
@@ -63,6 +64,18 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
</managerEvent>
<configInfo name="stasis" language="en_US">
<configFile name="stasis.conf">
+ <configObject name="threadpool">
+ <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
+ <configOption name="initial_size" default="5">
+ <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
+ </configOption>
+ <configOption name="idle_timeout_sec" default="20">
+ <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
+ </configOption>
+ <configOption name="max_size" default="50">
+ <synopsis>Maximum number of threads in the threadpool.</synopsis>
+ </configOption>
+ </configObject>
<configObject name="declined_message_types">
<synopsis>Stasis message types for which to decline creation.</synopsis>
<configOption name="decline">
@@ -287,6 +300,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
/*! The number of buckets to use for topic pools */
#define TOPIC_POOL_BUCKETS 57
+/*! Thread pool for topics that don't want a dedicated taskprocessor */
+static struct ast_threadpool *pool;
+
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
/*! \internal */
@@ -432,7 +448,8 @@ struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
- int needs_mailbox)
+ int needs_mailbox,
+ int use_thread_pool)
{
RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
@@ -445,19 +462,19 @@ struct stasis_subscription *internal_stasis_subscribe(
if (!sub) {
return NULL;
}
-
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
if (needs_mailbox) {
/* With a small number of subscribers, a thread-per-sub is
- * acceptable. If our usage changes so that we have larger
- * numbers of subscribers, we'll probably want to consider
- * a threadpool. We had that originally, but with so few
- * subscribers it was actually a performance loss instead of
- * a gain.
+ * acceptable. For larger number of subscribers, a thread
+ * pool should be used.
*/
- sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
- TPS_REF_DEFAULT);
+ if (use_thread_pool) {
+ sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+ } else {
+ sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
+ TPS_REF_DEFAULT);
+ }
if (!sub->mailbox) {
return NULL;
}
@@ -486,7 +503,15 @@ struct stasis_subscription *stasis_subscribe(
stasis_subscription_cb callback,
void *data)
{
- return internal_stasis_subscribe(topic, callback, data, 1);
+ return internal_stasis_subscribe(topic, callback, data, 1, 0);
+}
+
+struct stasis_subscription *stasis_subscribe_pool(
+ struct stasis_topic *topic,
+ stasis_subscription_cb callback,
+ void *data)
+{
+ return internal_stasis_subscribe(topic, callback, data, 1, 1);
}
static int sub_cleanup(void *data)
@@ -1365,11 +1390,33 @@ struct stasis_declined_config {
struct ao2_container *declined;
};
+/*! \brief Threadpool configuration options */
+struct stasis_threadpool_conf {
+ /*! Initial size of the thread pool */
+ int initial_size;
+ /*! Time, in seconds, before we expire a thread */
+ int idle_timeout_sec;
+ /*! Maximum number of thread to allow */
+ int max_size;
+};
struct stasis_config {
+ /*! Thread pool configuration options */
+ struct stasis_threadpool_conf *threadpool_options;
+ /*! Declined message types */
struct stasis_declined_config *declined_message_types;
};
+static struct aco_type threadpool_option = {
+ .type = ACO_GLOBAL,
+ .name = "threadpool",
+ .item_offset = offsetof(struct stasis_config, threadpool_options),
+ .category = "^threadpool$",
+ .category_match = ACO_WHITELIST,
+};
+
+static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
+
/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
static struct aco_type declined_option = {
.type = ACO_GLOBAL,
@@ -1383,7 +1430,7 @@ struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
struct aco_file stasis_conf = {
.filename = "stasis.conf",
- .types = ACO_TYPES(&declined_option),
+ .types = ACO_TYPES(&declined_option, &threadpool_option),
};
/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
@@ -1399,13 +1446,16 @@ CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
static void stasis_declined_config_destructor(void *obj)
{
struct stasis_declined_config *declined = obj;
+
ao2_cleanup(declined->declined);
}
static void stasis_config_destructor(void *obj)
{
struct stasis_config *cfg = obj;
+
ao2_cleanup(cfg->declined_message_types);
+ ast_free(cfg->threadpool_options);
}
static void *stasis_config_alloc(void)
@@ -1416,21 +1466,26 @@ static void *stasis_config_alloc(void)
return NULL;
}
- /* Allocate/initialize memory */
- cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types), stasis_declined_config_destructor);
+ cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
+ if (!cfg->threadpool_options) {
+ ao2_ref(cfg, -1);
+ return NULL;
+ }
+
+ cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
+ stasis_declined_config_destructor);
if (!cfg->declined_message_types) {
- goto error;
+ ao2_ref(cfg, -1);
+ return NULL;
}
cfg->declined_message_types->declined = ast_str_container_alloc(13);
if (!cfg->declined_message_types->declined) {
- goto error;
+ ao2_ref(cfg, -1);
+ return NULL;
}
return cfg;
-error:
- ao2_ref(cfg, -1);
- return NULL;
}
int stasis_message_type_declined(const char *name)
@@ -1478,6 +1533,13 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
/*! @} */
+/*! \brief Shutdown function */
+static void stasis_exit(void)
+{
+ ast_threadpool_shutdown(pool);
+ pool = NULL;
+}
+
/*! \brief Cleanup function for graceful shutdowns */
static void stasis_cleanup(void)
{
@@ -1489,27 +1551,71 @@ static void stasis_cleanup(void)
int stasis_init(void)
{
+ RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup);
int cache_init;
+ struct ast_threadpool_options threadpool_opts = { 0, };
/* Be sure the types are cleaned up after the message bus */
ast_register_cleanup(stasis_cleanup);
+ ast_register_atexit(stasis_exit);
if (aco_info_init(&cfg_info)) {
return -1;
}
- aco_option_register_custom(&cfg_info, "decline", ACO_EXACT, declined_options, "", declined_handler, 0);
+ aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
+ declined_options, "", declined_handler, 0);
+ aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
+ threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_threadpool_conf, initial_size), 0,
+ INT_MAX);
+ aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
+ threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
+ INT_MAX);
+ aco_option_register(&cfg_info, "max_size", ACO_EXACT,
+ threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_threadpool_conf, max_size), 0,
+ INT_MAX);
if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
- RAII_VAR(struct stasis_config *, stasis_cfg, stasis_config_alloc(), ao2_cleanup);
+ struct stasis_config *default_cfg = stasis_config_alloc();
+
+ if (!default_cfg) {
+ return -1;
+ }
- if (aco_set_defaults(&declined_option, "declined_message_types", stasis_cfg->declined_message_types)) {
+ if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
+ ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
+ ao2_ref(default_cfg, -1);
+ return -1;
+ }
+
+ if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
return -1;
}
- ast_log(LOG_NOTICE, "Could not load stasis config; using defaults\n");
- ao2_global_obj_replace_unref(globals, stasis_cfg);
+ ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
+ ao2_global_obj_replace_unref(globals, default_cfg);
+ cfg = default_cfg;
+ } else {
+ cfg = ao2_global_obj_ref(globals);
+ if (!cfg) {
+ ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
+ return -1;
+ }
+ }
+
+ threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
+ threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
+ threadpool_opts.auto_increment = 1;
+ threadpool_opts.max_size = cfg->threadpool_options->max_size;
+ threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
+ pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
+ if (!pool) {
+ ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
+ return -1;
}
cache_init = stasis_cache_init();