summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--main/threadpool.c87
1 files changed, 51 insertions, 36 deletions
diff --git a/main/threadpool.c b/main/threadpool.c
index 1ff76014a..6cb241bcf 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -510,7 +510,7 @@ static void grow(struct ast_threadpool *pool, int delta)
if (!worker) {
return;
}
- if (ao2_link(pool->active_threads, worker)) {
+ if (ao2_link(pool->idle_threads, worker)) {
if (worker_thread_start(worker)) {
ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
ao2_unlink(pool->active_threads, worker);
@@ -536,24 +536,21 @@ static int queued_task_pushed(void *data)
struct task_pushed_data *tpd = data;
struct ast_threadpool *pool = tpd->pool;
int was_empty = tpd->was_empty;
- int state_changed;
if (pool->listener && pool->listener->callbacks->task_pushed) {
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
}
if (ao2_container_count(pool->idle_threads) == 0) {
- if (pool->options.auto_increment > 0) {
- grow(pool, pool->options.auto_increment);
- state_changed = 1;
+ if (!pool->options.auto_increment) {
+ return 0;
}
- } else {
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
- activate_thread, pool);
- state_changed = 1;
- }
- if (state_changed) {
- threadpool_send_state_changed(pool);
+ grow(pool, pool->options.auto_increment);
}
+
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
+
+ threadpool_send_state_changed(pool);
ao2_ref(tpd, -1);
return 0;
}
@@ -808,6 +805,8 @@ static int queued_set_size(void *data)
if (current_size < num_threads) {
grow(pool, num_threads - current_size);
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+ activate_thread, pool);
} else {
shrink(pool, current_size - num_threads);
}
@@ -986,7 +985,31 @@ static void *worker_start(void *arg)
if (worker->options.thread_start) {
worker->options.thread_start();
}
- worker_active(worker);
+
+ ast_mutex_lock(&worker->lock);
+ while (worker_idle(worker)) {
+ ast_mutex_unlock(&worker->lock);
+ worker_active(worker);
+ ast_mutex_lock(&worker->lock);
+ if (worker->state != ALIVE) {
+ break;
+ }
+ threadpool_active_thread_idle(worker->pool, worker);
+ }
+ ast_mutex_unlock(&worker->lock);
+
+ /* Reaching this portion means the thread is
+ * on death's door. It may have been killed while
+ * it was idle, in which case it can just die
+ * peacefully. If it's a zombie, though, then
+ * it needs to let the pool know so
+ * that the thread can be removed from the
+ * list of zombie threads.
+ */
+ if (worker->state == ZOMBIE) {
+ threadpool_zombie_thread_dead(worker->pool, worker);
+ }
+
if (worker->options.thread_end) {
worker->options.thread_end();
}
@@ -1035,24 +1058,19 @@ static int worker_thread_start(struct worker_thread *worker)
*/
static void worker_active(struct worker_thread *worker)
{
- int alive = 1;
- while (alive) {
- if (!threadpool_execute(worker->pool)) {
- alive = worker_idle(worker);
- }
- }
+ int alive;
- /* Reaching this portion means the thread is
- * on death's door. It may have been killed while
- * it was idle, in which case it can just die
- * peacefully. If it's a zombie, though, then
- * it needs to let the pool know so
- * that the thread can be removed from the
- * list of zombie threads.
+ /* The following is equivalent to
+ *
+ * while (threadpool_execute(worker->pool));
+ *
+ * However, reviewers have suggested in the past
+ * doing that can cause optimizers to (wrongly)
+ * optimize the code away.
*/
- if (worker->state == ZOMBIE) {
- threadpool_zombie_thread_dead(worker->pool, worker);
- }
+ do {
+ alive = threadpool_execute(worker->pool);
+ } while (alive);
}
/*!
@@ -1061,6 +1079,8 @@ static void worker_active(struct worker_thread *worker)
* The worker waits here until it gets told by the threadpool
* to wake up.
*
+ * worker is locked before entering this function.
+ *
* \param worker The idle worker
* \retval 0 The thread is being woken up so that it can conclude.
* \retval non-zero The thread is being woken up to do more work.
@@ -1072,15 +1092,10 @@ static int worker_idle(struct worker_thread *worker)
.tv_sec = start.tv_sec + worker->options.idle_timeout,
.tv_nsec = start.tv_usec * 1000,
};
- SCOPED_MUTEX(lock, &worker->lock);
- if (worker->state != ALIVE) {
- return 0;
- }
- threadpool_active_thread_idle(worker->pool, worker);
while (!worker->wake_up) {
if (worker->options.idle_timeout <= 0) {
- ast_cond_wait(&worker->cond, lock);
- } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
+ ast_cond_wait(&worker->cond, &worker->lock);
+ } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
break;
}
}