diff options
author | Mark Michelson <mmichelson@digium.com> | 2012-12-10 06:13:09 +0000 |
---|---|---|
committer | Mark Michelson <mmichelson@digium.com> | 2012-12-10 06:13:09 +0000 |
commit | 64deed062a6c79124dbc3b5d09c770a7a3f20f1c (patch) | |
tree | 8e182a35753bcb81b53d973dbbd8d89e150b813d /main | |
parent | 5dd22df050264299b42160daeaa7701c81488ceb (diff) |
Add threadpool options and accompanying test.
The only test added so far is an idle thread timeout
option. This will greatly aid threadpool users who wish
to maintain a threadpool by allowing for idle threads to
die out as necessary.
Test passes.
git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377580 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main')
-rw-r--r-- | main/threadpool.c | 59 |
1 files changed, 56 insertions, 3 deletions
diff --git a/main/threadpool.c b/main/threadpool.c index 45e863805..18c1349ae 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -95,6 +95,8 @@ struct ast_threadpool { struct ast_taskprocessor *control_tps; /*! True if the threadpool is in the processof shutting down */ int shutting_down; + /*! Threadpool-specific options */ + struct ast_threadpool_options options; }; /*! @@ -266,6 +268,32 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair); } +static int queued_idle_thread_dead(void *data) +{ + struct thread_worker_pair *pair = data; + + ao2_unlink(pair->pool->idle_threads, pair->worker); + threadpool_send_state_changed(pair->pool); + + ao2_ref(pair, -1); + return 0; +} + +static void threadpool_idle_thread_dead(struct ast_threadpool *pool, + struct worker_thread *worker) +{ + struct thread_worker_pair *pair; + SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { + return; + } + pair = thread_worker_pair_alloc(pool, worker); + if (!pair) { + return; + } + ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair); +} + /*! * \brief Execute a task in the threadpool * @@ -749,7 +777,13 @@ struct ast_threadpool_listener *ast_threadpool_listener_alloc( return listener; } -struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size) +struct pool_options_pair { + struct ast_threadpool *pool; + struct ast_threadpool_options options; +}; + +struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, + int initial_size, const struct ast_threadpool_options *options) { struct ast_threadpool *pool; struct ast_taskprocessor *tps; @@ -771,6 +805,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis pool->tps = tps; ao2_ref(listener, +1); pool->listener = listener; + pool->options = *options; ast_threadpool_set_size(pool, initial_size); return pool; } @@ -814,6 +849,8 @@ struct worker_thread { enum worker_state state; /*! A boolean used to determine if an idle thread should become active */ int wake_up; + /*! Options for this threadpool */ + struct ast_threadpool_options options; }; /*! @@ -864,7 +901,7 @@ static void worker_shutdown(struct worker_thread *worker) static void worker_thread_destroy(void *obj) { struct worker_thread *worker = obj; - ast_log(LOG_NOTICE, "Worker dying\n"); + ast_debug(1, "Destroying worker thread\n"); worker_shutdown(worker); ast_mutex_destroy(&worker->lock); ast_cond_destroy(&worker->cond); @@ -909,6 +946,7 @@ static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool) worker->pool = pool; worker->thread = AST_PTHREADT_NULL; worker->state = ALIVE; + worker->options = pool->options; if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) { ast_log(LOG_ERROR, "Unable to start worker thread!\n"); ao2_ref(worker, -1); @@ -961,13 +999,28 @@ static void worker_active(struct worker_thread *worker) */ static int worker_idle(struct worker_thread *worker) { + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + worker->options.idle_timeout, + .tv_nsec = start.tv_usec * 1000, + }; SCOPED_MUTEX(lock, &worker->lock); if (worker->state != ALIVE) { return 0; } threadpool_active_thread_idle(worker->pool, worker); while (!worker->wake_up) { - ast_cond_wait(&worker->cond, lock); + if (worker->options.idle_timeout <= 0) { + ast_cond_wait(&worker->cond, lock); + } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) { + break; + } + } + + if (!worker->wake_up) { + ast_debug(1, "Worker thread idle timeout reached. Dying.\n"); + threadpool_idle_thread_dead(worker->pool, worker); + worker->state = DEAD; } worker->wake_up = 0; return worker->state == ALIVE; |