summaryrefslogtreecommitdiff
path: root/main/threadpool.c
diff options
context:
space:
mode:
authorMark Michelson <mmichelson@digium.com>2013-06-20 16:29:35 +0000
committerMark Michelson <mmichelson@digium.com>2013-06-20 16:29:35 +0000
commit33eb15a242694e47707b70065c86fad19035d5c8 (patch)
treea5c0a5c953d6534e6049f9febfb5eca6a24abdba /main/threadpool.c
parent6e6652518d43f26ac70ef6378b282464dcd62c50 (diff)
Fix threadpool rapid growth problem.
When a threadpool is set to autoincrement its threadcount, an issue may arise when multiple tasks are queued at once into the threadpool. Since threads start active, each new task would result in autoincrementing the thread count. So if all threads were active, and a thread's autoincrement value were 5, then 3 new tasks would result in 15 threads being created even though the initial autoincrement was sufficient to handle the number of tasks. This change introduces three behavior changes: 1) New threads in the threadpool start idle instead of active. 2) When a threadpool autoincrements, one thread is activated after the growth. 3) When a threadpool's size is incremented manually, all added threads are activated. For a more detailed explanation about the changes, please see the Review Board link at the bottom of this commit. Review: https://reviewboard.asterisk.org/r/2629 git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@392318 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/threadpool.c')
-rw-r--r--main/threadpool.c87
1 files changed, 51 insertions, 36 deletions
diff --git a/main/threadpool.c b/main/threadpool.c
index 1ff76014a..6cb241bcf 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -510,7 +510,7 @@ static void grow(struct ast_threadpool *pool, int delta)
if (!worker) {
return;
}
- if (ao2_link(pool->active_threads, worker)) {
+ if (ao2_link(pool->idle_threads, worker)) {
if (worker_thread_start(worker)) {
ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
ao2_unlink(pool->active_threads, worker);
@@ -536,24 +536,21 @@ static int queued_task_pushed(void *data)
struct task_pushed_data *tpd = data;
struct ast_threadpool *pool = tpd->pool;
int was_empty = tpd->was_empty;
- int state_changed;
if (pool->listener && pool->listener->callbacks->task_pushed) {
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
}
if (ao2_container_count(pool->idle_threads) == 0) {
- if (pool->options.auto_increment > 0) {
- grow(pool, pool->options.auto_increment);
- state_changed = 1;
+ if (!pool->options.auto_increment) {
+ return 0;
}
- } else {
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
- activate_thread, pool);
- state_changed = 1;
- }
- if (state_changed) {
- threadpool_send_state_changed(pool);
+ grow(pool, pool->options.auto_increment);
}
+
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
+
+ threadpool_send_state_changed(pool);
ao2_ref(tpd, -1);
return 0;
}
@@ -808,6 +805,8 @@ static int queued_set_size(void *data)
if (current_size < num_threads) {
grow(pool, num_threads - current_size);
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+ activate_thread, pool);
} else {
shrink(pool, current_size - num_threads);
}
@@ -986,7 +985,31 @@ static void *worker_start(void *arg)
if (worker->options.thread_start) {
worker->options.thread_start();
}
- worker_active(worker);
+
+ ast_mutex_lock(&worker->lock);
+ while (worker_idle(worker)) {
+ ast_mutex_unlock(&worker->lock);
+ worker_active(worker);
+ ast_mutex_lock(&worker->lock);
+ if (worker->state != ALIVE) {
+ break;
+ }
+ threadpool_active_thread_idle(worker->pool, worker);
+ }
+ ast_mutex_unlock(&worker->lock);
+
+ /* Reaching this portion means the thread is
+ * on death's door. It may have been killed while
+ * it was idle, in which case it can just die
+ * peacefully. If it's a zombie, though, then
+ * it needs to let the pool know so
+ * that the thread can be removed from the
+ * list of zombie threads.
+ */
+ if (worker->state == ZOMBIE) {
+ threadpool_zombie_thread_dead(worker->pool, worker);
+ }
+
if (worker->options.thread_end) {
worker->options.thread_end();
}
@@ -1035,24 +1058,19 @@ static int worker_thread_start(struct worker_thread *worker)
*/
static void worker_active(struct worker_thread *worker)
{
- int alive = 1;
- while (alive) {
- if (!threadpool_execute(worker->pool)) {
- alive = worker_idle(worker);
- }
- }
+ int alive;
- /* Reaching this portion means the thread is
- * on death's door. It may have been killed while
- * it was idle, in which case it can just die
- * peacefully. If it's a zombie, though, then
- * it needs to let the pool know so
- * that the thread can be removed from the
- * list of zombie threads.
+ /* The following is equivalent to
+ *
+ * while (threadpool_execute(worker->pool));
+ *
+ * However, reviewers have suggested in the past
+ * doing that can cause optimizers to (wrongly)
+ * optimize the code away.
*/
- if (worker->state == ZOMBIE) {
- threadpool_zombie_thread_dead(worker->pool, worker);
- }
+ do {
+ alive = threadpool_execute(worker->pool);
+ } while (alive);
}
/*!
@@ -1061,6 +1079,8 @@ static void worker_active(struct worker_thread *worker)
* The worker waits here until it gets told by the threadpool
* to wake up.
*
+ * worker is locked before entering this function.
+ *
* \param worker The idle worker
* \retval 0 The thread is being woken up so that it can conclude.
* \retval non-zero The thread is being woken up to do more work.
@@ -1072,15 +1092,10 @@ static int worker_idle(struct worker_thread *worker)
.tv_sec = start.tv_sec + worker->options.idle_timeout,
.tv_nsec = start.tv_usec * 1000,
};
- SCOPED_MUTEX(lock, &worker->lock);
- if (worker->state != ALIVE) {
- return 0;
- }
- threadpool_active_thread_idle(worker->pool, worker);
while (!worker->wake_up) {
if (worker->options.idle_timeout <= 0) {
- ast_cond_wait(&worker->cond, lock);
- } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
+ ast_cond_wait(&worker->cond, &worker->lock);
+ } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
break;
}
}