summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
authorMark Michelson <mmichelson@digium.com>2012-12-10 06:13:09 +0000
committerMark Michelson <mmichelson@digium.com>2012-12-10 06:13:09 +0000
commit64deed062a6c79124dbc3b5d09c770a7a3f20f1c (patch)
tree8e182a35753bcb81b53d973dbbd8d89e150b813d /main
parent5dd22df050264299b42160daeaa7701c81488ceb (diff)
Add threadpool options and accompanying test.
The only test added so far is an idle thread timeout option. This will greatly aid threadpool users who wish to maintain a threadpool by allowing for idle threads to die out as necessary. Test passes. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377580 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main')
-rw-r--r--main/threadpool.c59
1 files changed, 56 insertions, 3 deletions
diff --git a/main/threadpool.c b/main/threadpool.c
index 45e863805..18c1349ae 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -95,6 +95,8 @@ struct ast_threadpool {
struct ast_taskprocessor *control_tps;
/*! True if the threadpool is in the processof shutting down */
int shutting_down;
+ /*! Threadpool-specific options */
+ struct ast_threadpool_options options;
};
/*!
@@ -266,6 +268,32 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
}
+static int queued_idle_thread_dead(void *data)
+{
+ struct thread_worker_pair *pair = data;
+
+ ao2_unlink(pair->pool->idle_threads, pair->worker);
+ threadpool_send_state_changed(pair->pool);
+
+ ao2_ref(pair, -1);
+ return 0;
+}
+
+static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
+ struct worker_thread *worker)
+{
+ struct thread_worker_pair *pair;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
+ pair = thread_worker_pair_alloc(pool, worker);
+ if (!pair) {
+ return;
+ }
+ ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
+}
+
/*!
* \brief Execute a task in the threadpool
*
@@ -749,7 +777,13 @@ struct ast_threadpool_listener *ast_threadpool_listener_alloc(
return listener;
}
-struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
+struct pool_options_pair {
+ struct ast_threadpool *pool;
+ struct ast_threadpool_options options;
+};
+
+struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener,
+ int initial_size, const struct ast_threadpool_options *options)
{
struct ast_threadpool *pool;
struct ast_taskprocessor *tps;
@@ -771,6 +805,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis
pool->tps = tps;
ao2_ref(listener, +1);
pool->listener = listener;
+ pool->options = *options;
ast_threadpool_set_size(pool, initial_size);
return pool;
}
@@ -814,6 +849,8 @@ struct worker_thread {
enum worker_state state;
/*! A boolean used to determine if an idle thread should become active */
int wake_up;
+ /*! Options for this threadpool */
+ struct ast_threadpool_options options;
};
/*!
@@ -864,7 +901,7 @@ static void worker_shutdown(struct worker_thread *worker)
static void worker_thread_destroy(void *obj)
{
struct worker_thread *worker = obj;
- ast_log(LOG_NOTICE, "Worker dying\n");
+ ast_debug(1, "Destroying worker thread\n");
worker_shutdown(worker);
ast_mutex_destroy(&worker->lock);
ast_cond_destroy(&worker->cond);
@@ -909,6 +946,7 @@ static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
worker->pool = pool;
worker->thread = AST_PTHREADT_NULL;
worker->state = ALIVE;
+ worker->options = pool->options;
if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
ast_log(LOG_ERROR, "Unable to start worker thread!\n");
ao2_ref(worker, -1);
@@ -961,13 +999,28 @@ static void worker_active(struct worker_thread *worker)
*/
static int worker_idle(struct worker_thread *worker)
{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + worker->options.idle_timeout,
+ .tv_nsec = start.tv_usec * 1000,
+ };
SCOPED_MUTEX(lock, &worker->lock);
if (worker->state != ALIVE) {
return 0;
}
threadpool_active_thread_idle(worker->pool, worker);
while (!worker->wake_up) {
- ast_cond_wait(&worker->cond, lock);
+ if (worker->options.idle_timeout <= 0) {
+ ast_cond_wait(&worker->cond, lock);
+ } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ if (!worker->wake_up) {
+ ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
+ threadpool_idle_thread_dead(worker->pool, worker);
+ worker->state = DEAD;
}
worker->wake_up = 0;
return worker->state == ALIVE;