diff options
-rw-r--r-- | include/asterisk/threadpool.h | 13 | ||||
-rw-r--r-- | main/threadpool.c | 15 | ||||
-rw-r--r-- | tests/test_threadpool.c | 88 |
3 files changed, 109 insertions, 7 deletions
diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index 9834d9180..39792b1bb 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -86,12 +86,23 @@ struct ast_threadpool_options { #define AST_THREADPOOL_OPTIONS_VERSION 1 /*! Version of thradpool options in use */ int version; - /* ! + /*! * \brief Time limit in seconds for idle threads * * A time of 0 or less will mean an infinite timeout. */ int idle_timeout; + /*! + * \brief Number of threads to increment pool by + * + * If a task is added into a pool and no idle thread is + * available to activate, then the pool can automatically + * grow by the given amount. + * + * Zero is a perfectly valid value to give here if you want + * to control threadpool growth yourself via your listener. + */ + int auto_increment; }; /*! diff --git a/main/threadpool.c b/main/threadpool.c index 57520cf11..b3195fc4c 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -416,7 +416,7 @@ static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *po /*! * \brief Activate idle threads * - * This function always returns CMP_MATCH because all threads that this + * This function always returns CMP_MATCH because all workers that this * function acts on need to be seen as matches so they are unlinked from the * list of idle threads. * @@ -425,7 +425,7 @@ static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *po * \param arg The pool where the worker belongs * \retval CMP_MATCH */ -static int activate_threads(void *obj, void *arg, int flags) +static int activate_thread(void *obj, void *arg, int flags) { struct worker_thread *worker = obj; struct ast_threadpool *pool = arg; @@ -435,6 +435,8 @@ static int activate_threads(void *obj, void *arg, int flags) return CMP_MATCH; } +static void grow(struct ast_threadpool *pool, int delta); + /*! * \brief Queued task called when tasks are pushed into the threadpool * @@ -451,8 +453,13 @@ static int queued_task_pushed(void *data) int was_empty = tpd->was_empty; pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty); - ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, - activate_threads, pool); + if (ao2_container_count(pool->idle_threads) == 0 && pool->options.auto_increment > 0) { + grow(pool, pool->options.auto_increment); + } else { + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, + activate_thread, pool); + } + threadpool_send_state_changed(pool); ao2_ref(tpd, -1); return 0; } diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index 4fe63e222..38b79b58c 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -220,7 +220,7 @@ static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, s } if (!tld->empty_notice) { - ast_test_status_update(test, "Test listener never told that threadpool is empty\n"); + ast_test_status_update(test, "Test listener not notified that threadpool is empty\n"); res = AST_TEST_FAIL; } @@ -283,6 +283,7 @@ AST_TEST_DEFINE(threadpool_push) struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, + .auto_increment = 0, }; switch (cmd) { @@ -336,6 +337,7 @@ AST_TEST_DEFINE(threadpool_thread_creation) struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, + .auto_increment = 0, }; switch (cmd) { @@ -385,6 +387,7 @@ AST_TEST_DEFINE(threadpool_thread_destruction) struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, + .auto_increment = 0, }; switch (cmd) { @@ -443,6 +446,7 @@ AST_TEST_DEFINE(threadpool_thread_timeout) struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 2, + .auto_increment = 0, }; switch (cmd) { @@ -505,6 +509,7 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread) struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, + .auto_increment = 0, }; switch (cmd) { @@ -581,6 +586,7 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task) struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, + .auto_increment = 0, }; switch (cmd) { @@ -645,7 +651,6 @@ end: ao2_cleanup(listener); ast_free(std); return res; - } AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks) @@ -660,6 +665,7 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks) struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, + .auto_increment = 0, }; switch (cmd) { @@ -737,7 +743,80 @@ end: ast_free(std2); ast_free(std3); return res; +} + +AST_TEST_DEFINE(threadpool_auto_increment) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct simple_task_data *std = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 3, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "auto_increment"; + info->category = "/main/threadpool/"; + info->summary = "Test that the threadpool grows as tasks are added"; + info->description = + "Create an empty threadpool and push a task to it. Once the task is\n" + "pushed, the threadpool should add three threads and be able to\n" + "handle the task. The threads should then go idle\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks); + if (!listener) { + return AST_TEST_FAIL; + } + tld = listener->private_data; + + pool = ast_threadpool_create(info->name, listener, 0, &options); + if (!pool) { + goto end; + } + + std = simple_task_data_alloc(); + if (!std) { + goto end; + } + + ast_threadpool_push(pool, simple_task, std); + + /* Pushing the task should result in the threadpool growing + * by three threads. This will allow the task to actually execute + */ + res = wait_for_completion(test, std); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_for_empty_notice(test, tld); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 3); + if (res == AST_TEST_FAIL) { + goto end; + } + res = listener_check(test, listener, 1, 1, 1, 0, 3, 1); + +end: + if (pool) { + ast_threadpool_shutdown(pool); + } + ao2_cleanup(listener); + ast_free(std); + return res; } AST_TEST_DEFINE(threadpool_reactivation) @@ -751,6 +830,7 @@ AST_TEST_DEFINE(threadpool_reactivation) struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, + .auto_increment = 0, }; switch (cmd) { @@ -913,6 +993,7 @@ AST_TEST_DEFINE(threadpool_task_distribution) struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, + .auto_increment = 0, }; switch (cmd) { @@ -1001,6 +1082,7 @@ AST_TEST_DEFINE(threadpool_more_destruction) struct ast_threadpool_options options = { .version = AST_THREADPOOL_OPTIONS_VERSION, .idle_timeout = 0, + .auto_increment = 0, }; switch (cmd) { @@ -1104,6 +1186,7 @@ static int unload_module(void) ast_test_unregister(threadpool_one_task_one_thread); ast_test_unregister(threadpool_one_thread_one_task); ast_test_unregister(threadpool_one_thread_multiple_tasks); + ast_test_unregister(threadpool_auto_increment); ast_test_unregister(threadpool_reactivation); ast_test_unregister(threadpool_task_distribution); ast_test_unregister(threadpool_more_destruction); @@ -1119,6 +1202,7 @@ static int load_module(void) ast_test_register(threadpool_one_task_one_thread); ast_test_register(threadpool_one_thread_one_task); ast_test_register(threadpool_one_thread_multiple_tasks); + ast_test_register(threadpool_auto_increment); ast_test_register(threadpool_reactivation); ast_test_register(threadpool_task_distribution); ast_test_register(threadpool_more_destruction); |