From ddde765c590bb817f01c65cba500db1720fc657f Mon Sep 17 00:00:00 2001 From: Mark Michelson Date: Thu, 29 Nov 2012 18:54:51 +0000 Subject: Commit some progress towards threadpools. Does this compile? Not even close. But I figure I don't want to lose this all in the case of some catastrophe. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@376833 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- main/threadpool.c | 161 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100644 main/threadpool.c (limited to 'main') diff --git a/main/threadpool.c b/main/threadpool.c new file mode 100644 index 000000000..362c765cf --- /dev/null +++ b/main/threadpool.c @@ -0,0 +1,161 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2012, Digium, Inc. + * + * Mark Michelson + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + + +#include "asterisk.h" + +#include "asterisk/threadpool.h" +#include "asterisk/taskprocessor.h" + +struct ast_threadpool; + +enum worker_state { + ALIVE, + ZOMBIE, + DEAD, +}; + +struct worker_thread { + ast_cond_t cond; + ast_mutex_t lock; + pthread_t thread; + struct ast_threadpool *pool; + AST_LIST_ENTRY(struct worker_thread) next; + int wake_up; + enum worker_state state; +}; + +static int worker_idle(struct worker_thread *worker) +{ + SCOPED_MUTEX(lock, &worker->lock); + if (worker->state != ALIVE) { + return false; + } + threadpool_active_thread_idle(worker->pool, worker); + while (!worker->wake_up) { + ast_cond_wait(&worker->cond, lock); + } + worker->wake_up = false; + return worker->state == ALIVE; +} + +static int worker_active(struct worker_thread *worker) +{ + int alive = 1; + while (alive) { + if (threadpool_execute(worker->pool)) { + alive = worker_idle(worker); + } + } + + /* Reaching this portion means the thread is + * on death's door. It may have been killed while + * it was idle, in which case it can just die + * peacefully. If it's a zombie, though, then + * it needs to let the pool know so + * that the thread can be removed from the + * list of zombie threads. + */ + if (worker->state == ZOMBIE) { + threadpool_zombie_thread_dead(worker->pool, worker); + } + + return 0; +} + +struct ast_threadpool { + struct ast_threadpool_listener *threadpool_listener; + int active_threads; + int idle_threads; + int zombie_threads; +} + +static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener) +{ + RAII_VAR(ast_threadpool *, threadpool, + ao2_alloc(sizeof(*threadpool), threadpool_destroy), ao2_cleanup); + + return threadpool; +} + +static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener) +{ + /* XXX stub */ +} + +static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) +{ + /* XXX stub */ +} + +static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener) +{ + /* XXX stub */ +} + +static void threadpool_tps_listener_destroy(struct ast_taskprocessor_listener *listener) +{ + /* XXX stub */ +} + +static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = { + .alloc = threadpool_tps_listener_alloc, + .task_pushed = threadpool_tps_task_pushed, + .emptied = threadpool_tps_emptied, + .shutdown = threadpool_tps_shutdown, + .destroy = threadpool_tps_listener_destroy, +}; + +/*! + * \brief Allocate the taskprocessor to be used for the threadpool + * + * We use a custom taskprocessor listener. We allocate our custom + * listener and then create a taskprocessor. + */ +static struct ast_taskprocessor_listener *threadpool_tps_alloc(void) +{ + RAII_VAR(struct threadpool_tps_listener *, tps_listener, + ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks), + ao2_cleanup); + + if (!tps_listener) { + return NULL; + } + + return ast_taskprocessor_create_with_listener(tps_listener); +} + +void ast_threadpool_set_size(struct ast_threadpool *pool, int size) +{ +} + +struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size) +{ + struct ast_threadpool *pool; + RAII_VAR(ast_taskprocessor *, tps, threadpool_tps_alloc(), ast_taskprocessor_unreference); + + if (!tps) { + return NULL; + } + + pool = tps->listener->private_data; + pool->tps = tps; + ast_threadpool_set_size(pool, initial_size); + + return pool; +} -- cgit v1.2.3