From e7ce12839daeb903eeaff999171025e6652b7c26 Mon Sep 17 00:00:00 2001 From: Mark Michelson Date: Mon, 3 Dec 2012 16:59:26 +0000 Subject: This now compiles. That's a milestone, of sorts. Things really need arranging/documenting, and there's no function to be able to push tasks to a threadpool. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377036 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- main/threadpool.c | 282 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 218 insertions(+), 64 deletions(-) (limited to 'main') diff --git a/main/threadpool.c b/main/threadpool.c index 15450bc2f..7b7df4e68 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -21,17 +21,21 @@ #include "asterisk/threadpool.h" #include "asterisk/taskprocessor.h" +#include "asterisk/astobj2.h" +#include "asterisk/utils.h" #define THREAD_BUCKETS 89 static int id_counter; struct ast_threadpool { - struct ast_threadpool_listener *threadpool_listener; + struct ast_threadpool_listener *listener; struct ao2_container *active_threads; struct ao2_container *idle_threads; struct ao2_container *zombie_threads; -} + struct ast_taskprocessor *tps; + struct ast_taskprocessor *control_tps; +}; enum worker_state { ALIVE, @@ -49,9 +53,9 @@ struct worker_thread { int wake_up; }; -static int worker_thread_hash(const void *obj) +static int worker_thread_hash(const void *obj, int flags) { - struct worker_thread *worker= obj; + const struct worker_thread *worker = obj; return worker->id; } @@ -64,9 +68,26 @@ static int worker_thread_cmp(void *obj, void *arg, int flags) return worker1->id == worker2->id ? CMP_MATCH : 0; } -static worker_thread *worker_thread_alloc(struct ast_threadpool *pool) +static void worker_thread_destroy(void *obj) +{ + struct worker_thread *worker = obj; + ast_mutex_destroy(&worker->lock); + ast_cond_destroy(&worker->cond); +} + +static int worker_active(struct worker_thread *worker); + +static void *worker_start(void *arg) { - struct worker_thread *worker = ao2_alloc(1, sizeof(*worker)); + struct worker_thread *worker = arg; + + worker_active(worker); + return NULL; +} + +static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool) +{ + struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy); if (!worker) { /* XXX Dangit! */ return NULL; @@ -77,7 +98,7 @@ static worker_thread *worker_thread_alloc(struct ast_threadpool *pool) worker->pool = pool; worker->thread = AST_PTHREADT_NULL; worker->state = ALIVE; - if (ast_pthread_create(&worker->thread, NULL, worker_active, worker) < 0) { + if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) { /* XXX Poop! */ ao2_ref(worker, -1); return NULL; @@ -106,7 +127,7 @@ static void thread_worker_pair_destructor(void *obj) ao2_ref(pair->worker, -1); } -struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool, +static struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool, struct worker_thread *worker) { struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor); @@ -114,8 +135,10 @@ struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool, /*XXX Crap */ return NULL; } - pair->pool = ao2_ref(pool); - pair->worker = ao2_ref(worker); + ao2_ref(pool, +1); + pair->pool = pool; + ao2_ref(worker, +1); + pair->worker = worker; return pair; } @@ -140,7 +163,7 @@ static void threadpool_active_thread_idle(struct ast_threadpool *pool, /*XXX Crap */ return; } - ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle(pair)); + ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair); } static int queued_zombie_thread_dead(void *data) @@ -162,23 +185,28 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, /* XXX Crap */ return; } - ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead(pair)); + ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair); } static int worker_idle(struct worker_thread *worker) { SCOPED_MUTEX(lock, &worker->lock); if (worker->state != ALIVE) { - return false; + return 0; } threadpool_active_thread_idle(worker->pool, worker); while (!worker->wake_up) { ast_cond_wait(&worker->cond, lock); } - worker->wake_up = false; + worker->wake_up = 0; return worker->state == ALIVE; } +static int threadpool_execute(struct ast_threadpool *pool) +{ + return ast_taskprocessor_execute(pool->tps); +} + static int worker_active(struct worker_thread *worker) { int alive = 1; @@ -203,13 +231,47 @@ static int worker_active(struct worker_thread *worker) return 0; } +static void worker_set_state(struct worker_thread *worker, enum worker_state state) +{ + SCOPED_MUTEX(lock, &worker->lock); + worker->state = state; + worker->wake_up = 1; + ast_cond_signal(&worker->cond); +} + +static int worker_shutdown(void *obj, void *arg, int flags) +{ + struct worker_thread *worker = obj; + + worker_set_state(worker, DEAD); + if (worker->thread != AST_PTHREADT_NULL) { + pthread_join(worker->thread, NULL); + worker->thread = AST_PTHREADT_NULL; + } + return 0; +} + +static void threadpool_tps_listener_destroy(void *private_data) +{ + struct ast_threadpool *pool = private_data; + /* XXX Probably should let the listener know we're being destroyed? */ + + /* Threads should all be shut down by now, so this should be a painless + * operation + */ + ao2_ref(pool->active_threads, -1); + ao2_ref(pool->idle_threads, -1); + ao2_ref(pool->zombie_threads, -1); + ao2_ref(pool->listener, -1); + ao2_ref(pool, -1); +} static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener) { - RAII_VAR(ast_threadpool *, pool, - ao2_alloc(sizeof(*pool), threadpool_destroy), ao2_cleanup); + RAII_VAR(struct ast_threadpool *, pool, + ao2_alloc(sizeof(*pool), threadpool_tps_listener_destroy), ao2_cleanup); - pool->control_tps = ast_taskprocessor_get(/* XXX ??? */, TPS_REF_DEFAULT); + pool->control_tps = ast_taskprocessor_get("CHANGE THIS", TPS_REF_DEFAULT); if (!pool->control_tps) { return NULL; } @@ -222,32 +284,115 @@ static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *li return NULL; } pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp); - if (!pool->zombie_thread) { + if (!pool->zombie_threads) { return NULL; } + pool->tps = listener->tps; + ao2_ref(pool, +1); return pool; } -static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener) +struct task_pushed_data { + struct ast_threadpool *pool; + int was_empty; +}; + +static void task_pushed_data_destroy(void *obj) { - /* XXX stub */ + struct task_pushed_data *tpd = obj; + ao2_ref(tpd->pool, -1); } -static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) +static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool, + int was_empty) { - /* XXX stub */ + struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), + task_pushed_data_destroy); + + if (!tpd) { + return NULL; + } + ao2_ref(pool, +1); + tpd->pool = pool; + tpd->was_empty = was_empty; + return tpd; } -static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener) +static int activate_threads(void *obj, void *arg, int flags) +{ + struct worker_thread *worker = obj; + struct ast_threadpool *pool = arg; + + ao2_link(pool->active_threads, worker); + worker_set_state(worker, ALIVE); + return 0; +} + +static int handle_task_pushed(void *data) { - /* XXX stub */ + struct task_pushed_data *tpd = data; + struct ast_threadpool *pool = tpd->pool; + int was_empty = tpd->was_empty; + + pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty); + ao2_callback(pool->idle_threads, OBJ_UNLINK, activate_threads, pool); + ao2_ref(tpd, -1); + return 0; } -static void threadpool_tps_listener_destroy(struct ast_taskprocessor_listener *listener) +static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, + int was_empty) { - /* XXX stub */ + struct ast_threadpool *pool = listener->private_data; + struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty); + + if (!tpd) { + /* XXX Drat! */ + return; + } + + ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd); +} + +static int handle_emptied(void *data) +{ + struct ast_threadpool *pool = data; + + pool->listener->callbacks->emptied(pool->listener); + ao2_ref(pool, -1); + return 0; +} + +static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) +{ + struct ast_threadpool *pool = listener->private_data; + + ao2_ref(pool, +1); + ast_taskprocessor_push(pool->control_tps, handle_emptied, pool); +} + +static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener) +{ + /* + * The threadpool triggers the taskprocessor to shut down. As a result, + * we have the freedom of shutting things down in three stages: + * + * 1) Before the tasprocessor is shut down + * 2) During taskprocessor shutdown (here) + * 3) After taskprocessor shutdown + * + * In the spirit of the taskprocessor shutdown, this would be + * where we make sure that all the worker threads are no longer + * executing. We could just do this before we even shut down + * the taskprocessor, but this feels more "right". + */ + + struct ast_threadpool *pool = listener->private_data; + ao2_callback(pool->active_threads, 0, worker_shutdown, NULL); + ao2_callback(pool->idle_threads, 0, worker_shutdown, NULL); + ao2_callback(pool->zombie_threads, 0, worker_shutdown, NULL); } static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = { @@ -258,25 +403,6 @@ static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callb .destroy = threadpool_tps_listener_destroy, }; -/*! - * \brief Allocate the taskprocessor to be used for the threadpool - * - * We use a custom taskprocessor listener. We allocate our custom - * listener and then create a taskprocessor. - */ -static struct ast_taskprocessor_listener *threadpool_tps_alloc(void) -{ - RAII_VAR(struct threadpool_tps_listener *, tps_listener, - ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks), - ao2_cleanup); - - if (!tps_listener) { - return NULL; - } - - return ast_taskprocessor_create_with_listener(tps_listener); -} - static void grow(struct ast_threadpool *pool, int delta) { int i; @@ -292,9 +418,11 @@ static void grow(struct ast_threadpool *pool, int delta) static int kill_threads(void *obj, void *arg, int flags) { + struct worker_thread *worker = obj; int *num_to_kill = arg; if ((*num_to_kill)-- > 0) { + worker_shutdown(worker, arg, flags); return CMP_MATCH; } else { return CMP_STOP; @@ -309,6 +437,7 @@ static int zombify_threads(void *obj, void *arg, void *data, int flags) if ((*num_to_zombify)-- > 0) { ao2_link(pool->zombie_threads, worker); + worker_set_state(worker, ZOMBIE); return CMP_MATCH; } else { return CMP_STOP; @@ -325,7 +454,6 @@ static void shrink(struct ast_threadpool *pool, int delta) int idle_threads = ao2_container_count(pool->idle_threads); int idle_threads_to_kill = MIN(delta, idle_threads); int active_threads_to_zombify = delta - idle_threads_to_kill; - int i = 0; ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK, kill_threads, &idle_threads_to_kill); @@ -335,16 +463,31 @@ static void shrink(struct ast_threadpool *pool, int delta) } struct set_size_data { - struct threadpool *pool; + struct ast_threadpool *pool; int size; }; -void set_size_data_destroy(void *obj) +static void set_size_data_destroy(void *obj) { struct set_size_data *ssd = obj; ao2_ref(ssd->pool, -1); } +static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool, + int size) +{ + struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy); + if (!ssd) { + /* XXX Crap */ + return NULL; + } + + ao2_ref(pool, +1); + ssd->pool = pool; + ssd->size = size; + return ssd; +} + static int queued_set_size(void *data) { struct set_size_data *ssd = data; @@ -355,7 +498,7 @@ static int queued_set_size(void *data) int current_size = ao2_container_count(pool->active_threads) + ao2_container_count(pool->idle_threads); - if (current_size = num_threads) { + if (current_size == num_threads) { return 0; } @@ -366,41 +509,52 @@ static int queued_set_size(void *data) } threadpool_send_state_changed(pool); - ao2_ref(set_size_data, -1); + ao2_ref(ssd, -1); + return 0; } -void ast_threadpool_set_size(struct ast_threadpool *pool, int size) +void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size) { struct set_size_data *ssd; - if (size < 0) { - ast_log(LOG_WARNING, "Invalid threadpool size used for resizing: %d\n", size); - return; - } - ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy); + ssd = set_size_data_alloc(pool, size); if (!ssd) { - /* XXX Crap */ + /* XXX *groan* */ return; } - ssd->pool = ao2_ref(pool); - ssd->size = size; - ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd); } struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size) { struct ast_threadpool *pool; - RAII_VAR(ast_taskprocessor *, tps, threadpool_tps_alloc(), ast_taskprocessor_unreference); + struct ast_taskprocessor *tps; + RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, + ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks), + ao2_cleanup); + + if (!tps_listener) { + return NULL; + } + + tps = ast_taskprocessor_create_with_listener("XXX CHANGE THIS XXX", tps_listener); if (!tps) { return NULL; } - pool = tps->listener->private_data; - pool->tps = tps; + pool = tps_listener->private_data; ast_threadpool_set_size(pool, initial_size); - return pool; } + +void ast_threadpool_shutdown(struct ast_threadpool *pool) +{ + /* Pretty simple really. We just shut down the + * taskprocessors and everything else just + * takes care of itself via the taskprocessor callbacks + */ + ast_taskprocessor_unreference(pool->control_tps); + ast_taskprocessor_unreference(pool->tps); +} -- cgit v1.2.3