diff options
Diffstat (limited to 'main')
-rw-r--r-- | main/endpoints.c | 2 | ||||
-rw-r--r-- | main/stasis.c | 152 | ||||
-rw-r--r-- | main/stasis_cache.c | 2 | ||||
-rw-r--r-- | main/stasis_message_router.c | 22 |
4 files changed, 150 insertions, 28 deletions
diff --git a/main/endpoints.c b/main/endpoints.c index cc2eccc70..f8cca45b8 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -310,7 +310,7 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha } if (!ast_strlen_zero(resource)) { - endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint)); + endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint)); if (!endpoint->router) { return NULL; } diff --git a/main/stasis.c b/main/stasis.c index b85135a5b..dbb6e4c12 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -35,6 +35,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); #include "asterisk/stasis_internal.h" #include "asterisk/stasis.h" #include "asterisk/taskprocessor.h" +#include "asterisk/threadpool.h" #include "asterisk/utils.h" #include "asterisk/uuid.h" #include "asterisk/vector.h" @@ -63,6 +64,18 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); </managerEvent> <configInfo name="stasis" language="en_US"> <configFile name="stasis.conf"> + <configObject name="threadpool"> + <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis> + <configOption name="initial_size" default="5"> + <synopsis>Initial number of threads in the message bus threadpool.</synopsis> + </configOption> + <configOption name="idle_timeout_sec" default="20"> + <synopsis>Number of seconds before an idle thread is disposed of.</synopsis> + </configOption> + <configOption name="max_size" default="50"> + <synopsis>Maximum number of threads in the threadpool.</synopsis> + </configOption> + </configObject> <configObject name="declined_message_types"> <synopsis>Stasis message types for which to decline creation.</synopsis> <configOption name="decline"> @@ -287,6 +300,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); /*! The number of buckets to use for topic pools */ #define TOPIC_POOL_BUCKETS 57 +/*! Thread pool for topics that don't want a dedicated taskprocessor */ +static struct ast_threadpool *pool; + STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); /*! \internal */ @@ -432,7 +448,8 @@ struct stasis_subscription *internal_stasis_subscribe( struct stasis_topic *topic, stasis_subscription_cb callback, void *data, - int needs_mailbox) + int needs_mailbox, + int use_thread_pool) { RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup); @@ -445,19 +462,19 @@ struct stasis_subscription *internal_stasis_subscribe( if (!sub) { return NULL; } - ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid)); if (needs_mailbox) { /* With a small number of subscribers, a thread-per-sub is - * acceptable. If our usage changes so that we have larger - * numbers of subscribers, we'll probably want to consider - * a threadpool. We had that originally, but with so few - * subscribers it was actually a performance loss instead of - * a gain. + * acceptable. For larger number of subscribers, a thread + * pool should be used. */ - sub->mailbox = ast_taskprocessor_get(sub->uniqueid, - TPS_REF_DEFAULT); + if (use_thread_pool) { + sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool); + } else { + sub->mailbox = ast_taskprocessor_get(sub->uniqueid, + TPS_REF_DEFAULT); + } if (!sub->mailbox) { return NULL; } @@ -486,7 +503,15 @@ struct stasis_subscription *stasis_subscribe( stasis_subscription_cb callback, void *data) { - return internal_stasis_subscribe(topic, callback, data, 1); + return internal_stasis_subscribe(topic, callback, data, 1, 0); +} + +struct stasis_subscription *stasis_subscribe_pool( + struct stasis_topic *topic, + stasis_subscription_cb callback, + void *data) +{ + return internal_stasis_subscribe(topic, callback, data, 1, 1); } static int sub_cleanup(void *data) @@ -1365,11 +1390,33 @@ struct stasis_declined_config { struct ao2_container *declined; }; +/*! \brief Threadpool configuration options */ +struct stasis_threadpool_conf { + /*! Initial size of the thread pool */ + int initial_size; + /*! Time, in seconds, before we expire a thread */ + int idle_timeout_sec; + /*! Maximum number of thread to allow */ + int max_size; +}; struct stasis_config { + /*! Thread pool configuration options */ + struct stasis_threadpool_conf *threadpool_options; + /*! Declined message types */ struct stasis_declined_config *declined_message_types; }; +static struct aco_type threadpool_option = { + .type = ACO_GLOBAL, + .name = "threadpool", + .item_offset = offsetof(struct stasis_config, threadpool_options), + .category = "^threadpool$", + .category_match = ACO_WHITELIST, +}; + +static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option); + /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */ static struct aco_type declined_option = { .type = ACO_GLOBAL, @@ -1383,7 +1430,7 @@ struct aco_type *declined_options[] = ACO_TYPES(&declined_option); struct aco_file stasis_conf = { .filename = "stasis.conf", - .types = ACO_TYPES(&declined_option), + .types = ACO_TYPES(&declined_option, &threadpool_option), }; /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */ @@ -1399,13 +1446,16 @@ CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc, static void stasis_declined_config_destructor(void *obj) { struct stasis_declined_config *declined = obj; + ao2_cleanup(declined->declined); } static void stasis_config_destructor(void *obj) { struct stasis_config *cfg = obj; + ao2_cleanup(cfg->declined_message_types); + ast_free(cfg->threadpool_options); } static void *stasis_config_alloc(void) @@ -1416,21 +1466,26 @@ static void *stasis_config_alloc(void) return NULL; } - /* Allocate/initialize memory */ - cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types), stasis_declined_config_destructor); + cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options)); + if (!cfg->threadpool_options) { + ao2_ref(cfg, -1); + return NULL; + } + + cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types), + stasis_declined_config_destructor); if (!cfg->declined_message_types) { - goto error; + ao2_ref(cfg, -1); + return NULL; } cfg->declined_message_types->declined = ast_str_container_alloc(13); if (!cfg->declined_message_types->declined) { - goto error; + ao2_ref(cfg, -1); + return NULL; } return cfg; -error: - ao2_ref(cfg, -1); - return NULL; } int stasis_message_type_declined(const char *name) @@ -1478,6 +1533,13 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type, /*! @} */ +/*! \brief Shutdown function */ +static void stasis_exit(void) +{ + ast_threadpool_shutdown(pool); + pool = NULL; +} + /*! \brief Cleanup function for graceful shutdowns */ static void stasis_cleanup(void) { @@ -1489,27 +1551,71 @@ static void stasis_cleanup(void) int stasis_init(void) { + RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup); int cache_init; + struct ast_threadpool_options threadpool_opts = { 0, }; /* Be sure the types are cleaned up after the message bus */ ast_register_cleanup(stasis_cleanup); + ast_register_atexit(stasis_exit); if (aco_info_init(&cfg_info)) { return -1; } - aco_option_register_custom(&cfg_info, "decline", ACO_EXACT, declined_options, "", declined_handler, 0); + aco_option_register_custom(&cfg_info, "decline", ACO_EXACT, + declined_options, "", declined_handler, 0); + aco_option_register(&cfg_info, "initial_size", ACO_EXACT, + threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_threadpool_conf, initial_size), 0, + INT_MAX); + aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT, + threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0, + INT_MAX); + aco_option_register(&cfg_info, "max_size", ACO_EXACT, + threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_threadpool_conf, max_size), 0, + INT_MAX); if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) { - RAII_VAR(struct stasis_config *, stasis_cfg, stasis_config_alloc(), ao2_cleanup); + struct stasis_config *default_cfg = stasis_config_alloc(); + + if (!default_cfg) { + return -1; + } - if (aco_set_defaults(&declined_option, "declined_message_types", stasis_cfg->declined_message_types)) { + if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) { + ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n"); + ao2_ref(default_cfg, -1); + return -1; + } + + if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) { ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n"); return -1; } - ast_log(LOG_NOTICE, "Could not load stasis config; using defaults\n"); - ao2_global_obj_replace_unref(globals, stasis_cfg); + ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n"); + ao2_global_obj_replace_unref(globals, default_cfg); + cfg = default_cfg; + } else { + cfg = ao2_global_obj_ref(globals); + if (!cfg) { + ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n"); + return -1; + } + } + + threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION; + threadpool_opts.initial_size = cfg->threadpool_options->initial_size; + threadpool_opts.auto_increment = 1; + threadpool_opts.max_size = cfg->threadpool_options->max_size; + threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec; + pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts); + if (!pool) { + ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n"); + return -1; } cache_init = stasis_cache_init(); diff --git a/main/stasis_cache.c b/main/stasis_cache.c index c492307d6..9129c0064 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -894,7 +894,7 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or ao2_ref(cache, +1); caching_topic->cache = cache; - sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0); + sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0); if (sub == NULL) { return NULL; } diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index da288e864..a9e458456 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -206,8 +206,8 @@ static void router_dispatch(void *data, } } -struct stasis_message_router *stasis_message_router_create( - struct stasis_topic *topic) +static struct stasis_message_router *stasis_message_router_create_internal( + struct stasis_topic *topic, int use_thread_pool) { int res; RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup); @@ -224,7 +224,11 @@ struct stasis_message_router *stasis_message_router_create( return NULL; } - router->subscription = stasis_subscribe(topic, router_dispatch, router); + if (use_thread_pool) { + router->subscription = stasis_subscribe_pool(topic, router_dispatch, router); + } else { + router->subscription = stasis_subscribe(topic, router_dispatch, router); + } if (!router->subscription) { return NULL; } @@ -233,6 +237,18 @@ struct stasis_message_router *stasis_message_router_create( return router; } +struct stasis_message_router *stasis_message_router_create( + struct stasis_topic *topic) +{ + return stasis_message_router_create_internal(topic, 0); +} + +struct stasis_message_router *stasis_message_router_create_pool( + struct stasis_topic *topic) +{ + return stasis_message_router_create_internal(topic, 1); +} + void stasis_message_router_unsubscribe(struct stasis_message_router *router) { if (!router) { |