From bdd8da406bdce8dcb92104f8bcbbb8c42f57646d Mon Sep 17 00:00:00 2001 From: Mark Michelson Date: Mon, 7 Jan 2013 22:16:06 +0000 Subject: Address review board feedback from Matt and Richard * Remove extraneous whitespace * Bump up debug levels of messages and add identifying info to messages. * Account for potential failures of ao2_link() * Add additional test and some more test data * Add some comments in places where they could be useful * Make threadpool listeners and their callbacks optional git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@378652 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- include/asterisk/taskprocessor.h | 11 +++- include/asterisk/threadpool.h | 6 +- main/taskprocessor.c | 4 +- main/threadpool.c | 125 ++++++++++++++++++++++++--------------- tests/test_threadpool.c | 107 +++++++++++++++++++++++++++++---- 5 files changed, 190 insertions(+), 63 deletions(-) diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index dc110e2e5..7720547d6 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -133,6 +133,8 @@ struct ast_taskprocessor_listener_callbacks { /*! * \brief A listener for taskprocessors * + * \since 12.0.0 + * * When a taskprocessor's state changes, the listener * is notified of the change. This allows for tasks * to be addressed in whatever way is appropriate for @@ -148,7 +150,9 @@ struct ast_taskprocessor_listener { }; /*! - * Allocate a taskprocessor listener + * \brief Allocate a taskprocessor listener + * + * \since 12.0.0 * * This will result in the listener being allocated with the specified * callbacks. @@ -176,6 +180,8 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o /*! * \brief Create a taskprocessor with a custom listener * + * \since 12.0.0 + * * The listener's alloc() and start() callbacks will be called during this function. * * \param name The name of the taskprocessor to create @@ -209,6 +215,9 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void * /*! * \brief Pop a task off the taskprocessor and execute it. + * + * \since 12.0.0 + * * \param tps The taskprocessor from which to execute. * \retval 0 There is no further work to be done. * \retval 1 Tasks still remain in the taskprocessor queue. diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index e492eae9c..f3995ffe7 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -55,8 +55,8 @@ struct ast_threadpool_listener_callbacks { struct ast_threadpool_listener *listener, int was_empty); /*! - * \brief Indicates the threadpoo's taskprocessor has become empty - * + * \brief Indicates the threadpool's taskprocessor has become empty + * * \param listener The threadpool's listener */ void (*emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener); @@ -139,7 +139,7 @@ struct ast_threadpool *ast_threadpool_create(const char *name, * * This number may be more or less than the current number of * threads in the threadpool. - * + * * \param threadpool The threadpool to adjust * \param size The new desired size of the threadpool */ diff --git a/main/taskprocessor.c b/main/taskprocessor.c index 3ba544292..9b26263bb 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -38,7 +38,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/cli.h" #include "asterisk/taskprocessor.h" - /*! * \brief tps_task structure is queued to a taskprocessor * @@ -560,6 +559,9 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam ao2_ref(p, +1); listener->tps = p; + /* Allocation of private data must come after setting taskprocessor parameters + * so that listeners who rely on taskprocessor data will have access to it. + */ listener->private_data = listener->callbacks->alloc(listener); if (!listener->private_data) { return NULL; diff --git a/main/threadpool.c b/main/threadpool.c index 62664428d..05a5f8dd1 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -35,24 +35,24 @@ struct ast_threadpool { /*! Threadpool listener */ struct ast_threadpool_listener *listener; - /*! + /*! * \brief The container of active threads. * Active threads are those that are currently running tasks */ struct ao2_container *active_threads; - /*! + /*! * \brief The container of idle threads. * Idle threads are those that are currenly waiting to run tasks */ struct ao2_container *idle_threads; - /*! + /*! * \brief The container of zombie threads. * Zombie threads may be running tasks, but they are scheduled to die soon */ struct ao2_container *zombie_threads; - /*! + /*! * \brief The main taskprocessor - * + * * Tasks that are queued in this taskprocessor are * doled out to the worker threads. Worker threads that * execute tasks from the threadpool are executing tasks @@ -122,14 +122,36 @@ enum worker_state { DEAD, }; +/*! + * A thread that executes threadpool tasks + */ +struct worker_thread { + /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */ + int id; + /*! Condition used in conjunction with state changes */ + ast_cond_t cond; + /*! Lock used alongside the condition for state changes */ + ast_mutex_t lock; + /*! The actual thread that is executing tasks */ + pthread_t thread; + /*! A pointer to the threadpool. Needed to be able to execute tasks */ + struct ast_threadpool *pool; + /*! The current state of the worker thread */ + enum worker_state state; + /*! A boolean used to determine if an idle thread should become active */ + int wake_up; + /*! Options for this threadpool */ + struct ast_threadpool_options options; +}; + /* Worker thread forward declarations. See definitions for documentation */ -struct worker_thread; static int worker_thread_hash(const void *obj, int flags); static int worker_thread_cmp(void *obj, void *arg, int flags); static void worker_thread_destroy(void *obj); static void worker_active(struct worker_thread *worker); static void *worker_start(void *arg); static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool); +static int worker_thread_start(struct worker_thread *worker); static int worker_idle(struct worker_thread *worker); static void worker_set_state(struct worker_thread *worker, enum worker_state state); static void worker_shutdown(struct worker_thread *worker); @@ -145,7 +167,9 @@ static void threadpool_send_state_changed(struct ast_threadpool *pool) int active_size = ao2_container_count(pool->active_threads); int idle_size = ao2_container_count(pool->idle_threads); - pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size); + if (pool->listener && pool->listener->callbacks->state_changed) { + pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size); + } } /*! @@ -296,7 +320,7 @@ static void threadpool_idle_thread_dead(struct ast_threadpool *pool, /*! * \brief Execute a task in the threadpool - * + * * This is the function that worker threads call in order to execute tasks * in the threadpool * @@ -430,7 +454,14 @@ static int activate_thread(void *obj, void *arg, int flags) struct worker_thread *worker = obj; struct ast_threadpool *pool = arg; - ao2_link(pool->active_threads, worker); + if (!ao2_link(pool->active_threads, worker)) { + /* If we can't link the idle thread into the active container, then + * we'll just leave the thread idle and not wake it up. + */ + ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n", + worker->id); + return 0; + } worker_set_state(worker, ALIVE); return CMP_MATCH; } @@ -446,14 +477,22 @@ static void grow(struct ast_threadpool *pool, int delta) { int i; - ast_debug(1, "Going to increase threadpool size by %d\n", delta); + ast_debug(3, "Increasing threadpool %s's size by %d\n", + ast_taskprocessor_name(pool->tps), delta); for (i = 0; i < delta; ++i) { struct worker_thread *worker = worker_thread_alloc(pool); if (!worker) { return; } - ao2_link(pool->active_threads, worker); + if (ao2_link(pool->active_threads, worker)) { + if (worker_thread_start(worker)) { + ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id); + ao2_unlink(pool->active_threads, worker); + } + } else { + ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id); + } ao2_ref(worker, -1); } } @@ -474,7 +513,9 @@ static int queued_task_pushed(void *data) int was_empty = tpd->was_empty; int state_changed; - pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty); + if (pool->listener && pool->listener->callbacks->task_pushed) { + pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty); + } if (ao2_container_count(pool->idle_threads) == 0) { if (pool->options.auto_increment > 0) { grow(pool, pool->options.auto_increment); @@ -530,6 +571,7 @@ static int queued_emptied(void *data) { struct ast_threadpool *pool = data; + /* We already checked for existence of this callback when this was queued */ pool->listener->callbacks->emptied(pool, pool->listener); return 0; } @@ -550,7 +592,9 @@ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) return; } - ast_taskprocessor_push(pool->control_tps, queued_emptied, pool); + if (pool->listener && pool->listener->callbacks->emptied) { + ast_taskprocessor_push(pool->control_tps, queued_emptied, pool); + } } /*! @@ -649,7 +693,10 @@ static int zombify_threads(void *obj, void *arg, void *data, int flags) int *num_to_zombify = data; if ((*num_to_zombify)-- > 0) { - ao2_link(pool->zombie_threads, worker); + if (!ao2_link(pool->zombie_threads, worker)) { + ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id); + return 0; + } worker_set_state(worker, ZOMBIE); return CMP_MATCH; } else { @@ -671,7 +718,7 @@ static int zombify_threads(void *obj, void *arg, void *data, int flags) */ static void shrink(struct ast_threadpool *pool, int delta) { - /* + /* * Preference is to kill idle threads, but * we'll move on to deactivating active threads * if we have to @@ -680,12 +727,14 @@ static void shrink(struct ast_threadpool *pool, int delta) int idle_threads_to_kill = MIN(delta, idle_threads); int active_threads_to_zombify = delta - idle_threads_to_kill; - ast_debug(1, "Going to kill off %d idle threads\n", idle_threads_to_kill); + ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill, + ast_taskprocessor_name(pool->tps)); ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, kill_threads, &idle_threads_to_kill); - ast_debug(1, "Going to kill off %d active threads\n", active_threads_to_zombify); + ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify, + ast_taskprocessor_name(pool->tps)); ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, zombify_threads, pool, &active_threads_to_zombify); @@ -828,8 +877,10 @@ struct ast_threadpool *ast_threadpool_create(const char *name, pool = tps_listener->private_data; pool->tps = tps; - ao2_ref(listener, +1); - pool->listener = listener; + if (listener) { + ao2_ref(listener, +1); + pool->listener = listener; + } pool->options = *options; ast_threadpool_set_size(pool, initial_size); return pool; @@ -856,28 +907,6 @@ void ast_threadpool_shutdown(struct ast_threadpool *pool) ast_taskprocessor_unreference(pool->tps); } -/*! - * A thread that executes threadpool tasks - */ -struct worker_thread { - /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */ - int id; - /*! Condition used in conjunction with state changes */ - ast_cond_t cond; - /*! Lock used alongside the condition for state changes */ - ast_mutex_t lock; - /*! The actual thread that is executing tasks */ - pthread_t thread; - /*! A pointer to the threadpool. Needed to be able to execute tasks */ - struct ast_threadpool *pool; - /*! The current state of the worker thread */ - enum worker_state state; - /*! A boolean used to determine if an idle thread should become active */ - int wake_up; - /*! Options for this threadpool */ - struct ast_threadpool_options options; -}; - /*! * A monotonically increasing integer used for worker * thread identification. @@ -926,7 +955,7 @@ static void worker_shutdown(struct worker_thread *worker) static void worker_thread_destroy(void *obj) { struct worker_thread *worker = obj; - ast_debug(1, "Destroying worker thread\n"); + ast_debug(3, "Destroying worker thread %d\n", worker->id); worker_shutdown(worker); ast_mutex_destroy(&worker->lock); ast_cond_destroy(&worker->cond); @@ -972,14 +1001,14 @@ static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool) worker->thread = AST_PTHREADT_NULL; worker->state = ALIVE; worker->options = pool->options; - if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) { - ast_log(LOG_ERROR, "Unable to start worker thread!\n"); - ao2_ref(worker, -1); - return NULL; - } return worker; } +static int worker_thread_start(struct worker_thread *worker) +{ + return ast_pthread_create(&worker->thread, NULL, worker_start, worker); +} + /*! * \brief Active loop for worker threads * @@ -994,7 +1023,7 @@ static void worker_active(struct worker_thread *worker) { int alive = 1; while (alive) { - if (threadpool_execute(worker->pool) == 0) { + if (!threadpool_execute(worker->pool)) { alive = worker_idle(worker); } } diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index 38b79b58c..fca87b4b4 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -328,6 +328,53 @@ end: return res; } +AST_TEST_DEFINE(threadpool_initial_threads) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "initial_threads"; + info->category = "/main/threadpool/"; + info->summary = "Test threadpool initialization state"; + info->description = + "Ensure that a threadpool created with a specific size contains the\n" + "proper number of idle threads."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks); + if (!listener) { + return AST_TEST_FAIL; + } + tld = listener->private_data; + + pool = ast_threadpool_create(info->name, listener, 3, &options); + if (!pool) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 3); + +end: + if (pool) { + ast_threadpool_shutdown(pool); + } + ao2_cleanup(listener); + return res; +} + + AST_TEST_DEFINE(threadpool_thread_creation) { struct ast_threadpool *pool = NULL; @@ -557,7 +604,7 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread) if (res == AST_TEST_FAIL) { goto end; } - + /* After completing the task, the thread should go idle */ res = wait_until_thread_state(test, tld, 0, 1); if (res == AST_TEST_FAIL) { @@ -749,7 +796,10 @@ AST_TEST_DEFINE(threadpool_auto_increment) { struct ast_threadpool *pool = NULL; struct ast_threadpool_listener *listener = NULL; - struct simple_task_data *std = NULL; + struct simple_task_data *std1 = NULL; + struct simple_task_data *std2 = NULL; + struct simple_task_data *std3 = NULL; + struct simple_task_data *std4 = NULL; enum ast_test_result_state res = AST_TEST_FAIL; struct test_listener_data *tld; struct ast_threadpool_options options = { @@ -783,17 +833,20 @@ AST_TEST_DEFINE(threadpool_auto_increment) goto end; } - std = simple_task_data_alloc(); - if (!std) { + std1 = simple_task_data_alloc(); + std2 = simple_task_data_alloc(); + std3 = simple_task_data_alloc(); + std4 = simple_task_data_alloc(); + if (!std1 || !std2 || !std3 || !std4) { goto end; } - ast_threadpool_push(pool, simple_task, std); + ast_threadpool_push(pool, simple_task, std1); /* Pushing the task should result in the threadpool growing * by three threads. This will allow the task to actually execute */ - res = wait_for_completion(test, std); + res = wait_for_completion(test, std1); if (res == AST_TEST_FAIL) { goto end; } @@ -808,14 +861,46 @@ AST_TEST_DEFINE(threadpool_auto_increment) goto end; } - res = listener_check(test, listener, 1, 1, 1, 0, 3, 1); + /* Now push three tasks into the pool and ensure the pool does not + * grow. + */ + ast_threadpool_push(pool, simple_task, std2); + ast_threadpool_push(pool, simple_task, std3); + ast_threadpool_push(pool, simple_task, std4); + + res = wait_for_completion(test, std2); + if (res == AST_TEST_FAIL) { + goto end; + } + res = wait_for_completion(test, std3); + if (res == AST_TEST_FAIL) { + goto end; + } + res = wait_for_completion(test, std4); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_for_empty_notice(test, tld); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 3); + if (res == AST_TEST_FAIL) { + goto end; + } + res = listener_check(test, listener, 1, 0, 4, 0, 3, 1); end: if (pool) { ast_threadpool_shutdown(pool); } ao2_cleanup(listener); - ast_free(std); + ast_free(std1); + ast_free(std2); + ast_free(std3); + ast_free(std4); return res; } @@ -877,7 +962,7 @@ AST_TEST_DEFINE(threadpool_reactivation) if (res == AST_TEST_FAIL) { goto end; } - + res = wait_until_thread_state(test, tld, 0, 1); if (res == AST_TEST_FAIL) { goto end; @@ -900,7 +985,7 @@ AST_TEST_DEFINE(threadpool_reactivation) if (res == AST_TEST_FAIL) { goto end; } - + res = wait_until_thread_state(test, tld, 0, 1); if (res == AST_TEST_FAIL) { goto end; @@ -1180,6 +1265,7 @@ end: static int unload_module(void) { ast_test_unregister(threadpool_push); + ast_test_unregister(threadpool_initial_threads); ast_test_unregister(threadpool_thread_creation); ast_test_unregister(threadpool_thread_destruction); ast_test_unregister(threadpool_thread_timeout); @@ -1196,6 +1282,7 @@ static int unload_module(void) static int load_module(void) { ast_test_register(threadpool_push); + ast_test_register(threadpool_initial_threads); ast_test_register(threadpool_thread_creation); ast_test_register(threadpool_thread_destruction); ast_test_register(threadpool_thread_timeout); -- cgit v1.2.3