summaryrefslogtreecommitdiff
path: root/main/threadpool.c
diff options
context:
space:
mode:
authorMark Michelson <mmichelson@digium.com>2012-12-10 06:13:09 +0000
committerMark Michelson <mmichelson@digium.com>2012-12-10 06:13:09 +0000
commit64deed062a6c79124dbc3b5d09c770a7a3f20f1c (patch)
tree8e182a35753bcb81b53d973dbbd8d89e150b813d /main/threadpool.c
parent5dd22df050264299b42160daeaa7701c81488ceb (diff)
Add threadpool options and accompanying test.
The only test added so far is an idle thread timeout option. This will greatly aid threadpool users who wish to maintain a threadpool by allowing for idle threads to die out as necessary. Test passes. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377580 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/threadpool.c')
-rw-r--r--main/threadpool.c59
1 files changed, 56 insertions, 3 deletions
diff --git a/main/threadpool.c b/main/threadpool.c
index 45e863805..18c1349ae 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -95,6 +95,8 @@ struct ast_threadpool {
struct ast_taskprocessor *control_tps;
/*! True if the threadpool is in the processof shutting down */
int shutting_down;
+ /*! Threadpool-specific options */
+ struct ast_threadpool_options options;
};
/*!
@@ -266,6 +268,32 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
}
+static int queued_idle_thread_dead(void *data)
+{
+ struct thread_worker_pair *pair = data;
+
+ ao2_unlink(pair->pool->idle_threads, pair->worker);
+ threadpool_send_state_changed(pair->pool);
+
+ ao2_ref(pair, -1);
+ return 0;
+}
+
+static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
+ struct worker_thread *worker)
+{
+ struct thread_worker_pair *pair;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
+ pair = thread_worker_pair_alloc(pool, worker);
+ if (!pair) {
+ return;
+ }
+ ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
+}
+
/*!
* \brief Execute a task in the threadpool
*
@@ -749,7 +777,13 @@ struct ast_threadpool_listener *ast_threadpool_listener_alloc(
return listener;
}
-struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
+struct pool_options_pair {
+ struct ast_threadpool *pool;
+ struct ast_threadpool_options options;
+};
+
+struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener,
+ int initial_size, const struct ast_threadpool_options *options)
{
struct ast_threadpool *pool;
struct ast_taskprocessor *tps;
@@ -771,6 +805,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis
pool->tps = tps;
ao2_ref(listener, +1);
pool->listener = listener;
+ pool->options = *options;
ast_threadpool_set_size(pool, initial_size);
return pool;
}
@@ -814,6 +849,8 @@ struct worker_thread {
enum worker_state state;
/*! A boolean used to determine if an idle thread should become active */
int wake_up;
+ /*! Options for this threadpool */
+ struct ast_threadpool_options options;
};
/*!
@@ -864,7 +901,7 @@ static void worker_shutdown(struct worker_thread *worker)
static void worker_thread_destroy(void *obj)
{
struct worker_thread *worker = obj;
- ast_log(LOG_NOTICE, "Worker dying\n");
+ ast_debug(1, "Destroying worker thread\n");
worker_shutdown(worker);
ast_mutex_destroy(&worker->lock);
ast_cond_destroy(&worker->cond);
@@ -909,6 +946,7 @@ static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
worker->pool = pool;
worker->thread = AST_PTHREADT_NULL;
worker->state = ALIVE;
+ worker->options = pool->options;
if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
ast_log(LOG_ERROR, "Unable to start worker thread!\n");
ao2_ref(worker, -1);
@@ -961,13 +999,28 @@ static void worker_active(struct worker_thread *worker)
*/
static int worker_idle(struct worker_thread *worker)
{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + worker->options.idle_timeout,
+ .tv_nsec = start.tv_usec * 1000,
+ };
SCOPED_MUTEX(lock, &worker->lock);
if (worker->state != ALIVE) {
return 0;
}
threadpool_active_thread_idle(worker->pool, worker);
while (!worker->wake_up) {
- ast_cond_wait(&worker->cond, lock);
+ if (worker->options.idle_timeout <= 0) {
+ ast_cond_wait(&worker->cond, lock);
+ } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ if (!worker->wake_up) {
+ ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
+ threadpool_idle_thread_dead(worker->pool, worker);
+ worker->state = DEAD;
}
worker->wake_up = 0;
return worker->state == ALIVE;