summaryrefslogtreecommitdiff
path: root/main/stasis.c
diff options
context:
space:
mode:
authorMatthew Jordan <mjordan@digium.com>2014-12-01 17:59:21 +0000
committerMatthew Jordan <mjordan@digium.com>2014-12-01 17:59:21 +0000
commit1106e8fd0f831c79eb2a7cf079739561da11d6ef (patch)
tree3e841771528c621760a6b0471935bc432e704054 /main/stasis.c
parentef9ca8bc32c7cce781c35bca81d9ae544fd22dae (diff)
main/stasis: Allow subscriptions to use a threadpool for message delivery
Prior to this patch, all Stasis subscriptions would receive a dedicated thread for servicing published messages. In contrast, prior to r400178 (see review https://reviewboard.asterisk.org/r/2881/), the subscriptions shared a thread pool. It was discovered during some initial work on Stasis that, for a low subscription count with high message throughput, the threadpool was not as performant as simply having a dedicated thread per subscriber. For situations where a subscriber receives a substantial number of messages and is always present, the model of having a dedicated thread per subscriber makes sense. While we still have plenty of subscriptions that would follow this model, e.g., AMI, CDRs, CEL, etc., there are plenty that also fall into the following two categories: * Large number of subscriptions, specifically those tied to endpoints/peers. * Low number of messages. Some subscriptions exist specifically to coordinate a single message - the subscription is created, a message is published, the delivery is synchronized, and the subscription is destroyed. In both of the latter two cases, creating a dedicated thread is wasteful (and in the case of a large number of peers/endpoints, harmful). In those cases, having shared delivery threads is far more performant. This patch adds the ability of a subscriber to Stasis to choose whether or not their messages are dispatched on a dedicated thread or on a threadpool. The threadpool is configurable through stasis.conf. Review: https://reviewboard.asterisk.org/r/4193 ASTERISK-24533 #close Reported by: xrobau Tested by: xrobau ........ Merged revisions 428681 from http://svn.asterisk.org/svn/asterisk/branches/12 ........ Merged revisions 428687 from http://svn.asterisk.org/svn/asterisk/branches/13 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@428688 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/stasis.c')
-rw-r--r--main/stasis.c152
1 files changed, 129 insertions, 23 deletions
diff --git a/main/stasis.c b/main/stasis.c
index b85135a5b..dbb6e4c12 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -35,6 +35,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/taskprocessor.h"
+#include "asterisk/threadpool.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
@@ -63,6 +64,18 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
</managerEvent>
<configInfo name="stasis" language="en_US">
<configFile name="stasis.conf">
+ <configObject name="threadpool">
+ <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis>
+ <configOption name="initial_size" default="5">
+ <synopsis>Initial number of threads in the message bus threadpool.</synopsis>
+ </configOption>
+ <configOption name="idle_timeout_sec" default="20">
+ <synopsis>Number of seconds before an idle thread is disposed of.</synopsis>
+ </configOption>
+ <configOption name="max_size" default="50">
+ <synopsis>Maximum number of threads in the threadpool.</synopsis>
+ </configOption>
+ </configObject>
<configObject name="declined_message_types">
<synopsis>Stasis message types for which to decline creation.</synopsis>
<configOption name="decline">
@@ -287,6 +300,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
/*! The number of buckets to use for topic pools */
#define TOPIC_POOL_BUCKETS 57
+/*! Thread pool for topics that don't want a dedicated taskprocessor */
+static struct ast_threadpool *pool;
+
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
/*! \internal */
@@ -432,7 +448,8 @@ struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data,
- int needs_mailbox)
+ int needs_mailbox,
+ int use_thread_pool)
{
RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
@@ -445,19 +462,19 @@ struct stasis_subscription *internal_stasis_subscribe(
if (!sub) {
return NULL;
}
-
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
if (needs_mailbox) {
/* With a small number of subscribers, a thread-per-sub is
- * acceptable. If our usage changes so that we have larger
- * numbers of subscribers, we'll probably want to consider
- * a threadpool. We had that originally, but with so few
- * subscribers it was actually a performance loss instead of
- * a gain.
+ * acceptable. For larger number of subscribers, a thread
+ * pool should be used.
*/
- sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
- TPS_REF_DEFAULT);
+ if (use_thread_pool) {
+ sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
+ } else {
+ sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
+ TPS_REF_DEFAULT);
+ }
if (!sub->mailbox) {
return NULL;
}
@@ -486,7 +503,15 @@ struct stasis_subscription *stasis_subscribe(
stasis_subscription_cb callback,
void *data)
{
- return internal_stasis_subscribe(topic, callback, data, 1);
+ return internal_stasis_subscribe(topic, callback, data, 1, 0);
+}
+
+struct stasis_subscription *stasis_subscribe_pool(
+ struct stasis_topic *topic,
+ stasis_subscription_cb callback,
+ void *data)
+{
+ return internal_stasis_subscribe(topic, callback, data, 1, 1);
}
static int sub_cleanup(void *data)
@@ -1365,11 +1390,33 @@ struct stasis_declined_config {
struct ao2_container *declined;
};
+/*! \brief Threadpool configuration options */
+struct stasis_threadpool_conf {
+ /*! Initial size of the thread pool */
+ int initial_size;
+ /*! Time, in seconds, before we expire a thread */
+ int idle_timeout_sec;
+ /*! Maximum number of thread to allow */
+ int max_size;
+};
struct stasis_config {
+ /*! Thread pool configuration options */
+ struct stasis_threadpool_conf *threadpool_options;
+ /*! Declined message types */
struct stasis_declined_config *declined_message_types;
};
+static struct aco_type threadpool_option = {
+ .type = ACO_GLOBAL,
+ .name = "threadpool",
+ .item_offset = offsetof(struct stasis_config, threadpool_options),
+ .category = "^threadpool$",
+ .category_match = ACO_WHITELIST,
+};
+
+static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
+
/*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */
static struct aco_type declined_option = {
.type = ACO_GLOBAL,
@@ -1383,7 +1430,7 @@ struct aco_type *declined_options[] = ACO_TYPES(&declined_option);
struct aco_file stasis_conf = {
.filename = "stasis.conf",
- .types = ACO_TYPES(&declined_option),
+ .types = ACO_TYPES(&declined_option, &threadpool_option),
};
/*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */
@@ -1399,13 +1446,16 @@ CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc,
static void stasis_declined_config_destructor(void *obj)
{
struct stasis_declined_config *declined = obj;
+
ao2_cleanup(declined->declined);
}
static void stasis_config_destructor(void *obj)
{
struct stasis_config *cfg = obj;
+
ao2_cleanup(cfg->declined_message_types);
+ ast_free(cfg->threadpool_options);
}
static void *stasis_config_alloc(void)
@@ -1416,21 +1466,26 @@ static void *stasis_config_alloc(void)
return NULL;
}
- /* Allocate/initialize memory */
- cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types), stasis_declined_config_destructor);
+ cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options));
+ if (!cfg->threadpool_options) {
+ ao2_ref(cfg, -1);
+ return NULL;
+ }
+
+ cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types),
+ stasis_declined_config_destructor);
if (!cfg->declined_message_types) {
- goto error;
+ ao2_ref(cfg, -1);
+ return NULL;
}
cfg->declined_message_types->declined = ast_str_container_alloc(13);
if (!cfg->declined_message_types->declined) {
- goto error;
+ ao2_ref(cfg, -1);
+ return NULL;
}
return cfg;
-error:
- ao2_ref(cfg, -1);
- return NULL;
}
int stasis_message_type_declined(const char *name)
@@ -1478,6 +1533,13 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
/*! @} */
+/*! \brief Shutdown function */
+static void stasis_exit(void)
+{
+ ast_threadpool_shutdown(pool);
+ pool = NULL;
+}
+
/*! \brief Cleanup function for graceful shutdowns */
static void stasis_cleanup(void)
{
@@ -1489,27 +1551,71 @@ static void stasis_cleanup(void)
int stasis_init(void)
{
+ RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup);
int cache_init;
+ struct ast_threadpool_options threadpool_opts = { 0, };
/* Be sure the types are cleaned up after the message bus */
ast_register_cleanup(stasis_cleanup);
+ ast_register_atexit(stasis_exit);
if (aco_info_init(&cfg_info)) {
return -1;
}
- aco_option_register_custom(&cfg_info, "decline", ACO_EXACT, declined_options, "", declined_handler, 0);
+ aco_option_register_custom(&cfg_info, "decline", ACO_EXACT,
+ declined_options, "", declined_handler, 0);
+ aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
+ threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_threadpool_conf, initial_size), 0,
+ INT_MAX);
+ aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
+ threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
+ INT_MAX);
+ aco_option_register(&cfg_info, "max_size", ACO_EXACT,
+ threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE,
+ FLDSET(struct stasis_threadpool_conf, max_size), 0,
+ INT_MAX);
if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) {
- RAII_VAR(struct stasis_config *, stasis_cfg, stasis_config_alloc(), ao2_cleanup);
+ struct stasis_config *default_cfg = stasis_config_alloc();
+
+ if (!default_cfg) {
+ return -1;
+ }
- if (aco_set_defaults(&declined_option, "declined_message_types", stasis_cfg->declined_message_types)) {
+ if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
+ ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
+ ao2_ref(default_cfg, -1);
+ return -1;
+ }
+
+ if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
return -1;
}
- ast_log(LOG_NOTICE, "Could not load stasis config; using defaults\n");
- ao2_global_obj_replace_unref(globals, stasis_cfg);
+ ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n");
+ ao2_global_obj_replace_unref(globals, default_cfg);
+ cfg = default_cfg;
+ } else {
+ cfg = ao2_global_obj_ref(globals);
+ if (!cfg) {
+ ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
+ return -1;
+ }
+ }
+
+ threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION;
+ threadpool_opts.initial_size = cfg->threadpool_options->initial_size;
+ threadpool_opts.auto_increment = 1;
+ threadpool_opts.max_size = cfg->threadpool_options->max_size;
+ threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
+ pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
+ if (!pool) {
+ ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
+ return -1;
}
cache_init = stasis_cache_init();