diff options
Diffstat (limited to 'main/threadpool.c')
-rw-r--r-- | main/threadpool.c | 67 |
1 files changed, 59 insertions, 8 deletions
diff --git a/main/threadpool.c b/main/threadpool.c index d97a7adb8..46de9b7f8 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -168,7 +168,7 @@ static void *worker_start(void *arg); static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool); static int worker_thread_start(struct worker_thread *worker); static int worker_idle(struct worker_thread *worker); -static void worker_set_state(struct worker_thread *worker, enum worker_state state); +static int worker_set_state(struct worker_thread *worker, enum worker_state state); static void worker_shutdown(struct worker_thread *worker); /*! @@ -482,7 +482,16 @@ static int activate_thread(void *obj, void *arg, int flags) worker->id); return 0; } - worker_set_state(worker, ALIVE); + + if (worker_set_state(worker, ALIVE)) { + ast_debug(1, "Failed to activate thread %d. It is dead\n", + worker->id); + /* The worker thread will no longer exist in the active threads or + * idle threads container after this. + */ + ao2_unlink(pool->active_threads, worker); + } + return CMP_MATCH; } @@ -538,20 +547,33 @@ static int queued_task_pushed(void *data) struct task_pushed_data *tpd = data; struct ast_threadpool *pool = tpd->pool; int was_empty = tpd->was_empty; + unsigned int existing_active; if (pool->listener && pool->listener->callbacks->task_pushed) { pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty); } - if (ao2_container_count(pool->idle_threads) == 0) { + + existing_active = ao2_container_count(pool->active_threads); + + /* The first pass transitions any existing idle threads to be active, and + * will also remove any worker threads that have recently entered the dead + * state. + */ + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, + activate_thread, pool); + + /* If no idle threads could be transitioned to active grow the pool as permitted. */ + if (ao2_container_count(pool->active_threads) == existing_active) { if (!pool->options.auto_increment) { + ao2_ref(tpd, -1); return 0; } grow(pool, pool->options.auto_increment); + /* An optional second pass transitions any newly added threads. */ + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, + activate_thread, pool); } - ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, - activate_thread, pool); - threadpool_send_state_changed(pool); ao2_ref(tpd, -1); return 0; @@ -797,7 +819,7 @@ static int queued_set_size(void *data) /* We don't count zombie threads as being "live" when potentially resizing */ unsigned int current_size = ao2_container_count(pool->active_threads) + - ao2_container_count(pool->idle_threads); + ao2_container_count(pool->idle_threads); if (current_size == num_threads) { ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n", @@ -806,6 +828,12 @@ static int queued_set_size(void *data) } if (current_size < num_threads) { + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, + activate_thread, pool); + + /* As the above may have altered the number of current threads update it */ + current_size = ao2_container_count(pool->active_threads) + + ao2_container_count(pool->idle_threads); grow(pool, num_threads - current_size); ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, activate_thread, pool); @@ -1117,13 +1145,36 @@ static int worker_idle(struct worker_thread *worker) * * The threadpool calls into this function in order to let a worker know * how it should proceed. + * + * \retval -1 failure (state transition not permitted) + * \retval 0 success */ -static void worker_set_state(struct worker_thread *worker, enum worker_state state) +static int worker_set_state(struct worker_thread *worker, enum worker_state state) { SCOPED_MUTEX(lock, &worker->lock); + + switch (state) { + case ALIVE: + /* This can occur due to a race condition between being told to go active + * and an idle timeout happening. + */ + if (worker->state == DEAD) { + return -1; + } + ast_assert(worker->state != ZOMBIE); + break; + case DEAD: + break; + case ZOMBIE: + ast_assert(worker->state != DEAD); + break; + } + worker->state = state; worker->wake_up = 1; ast_cond_signal(&worker->cond); + + return 0; } /*! Serializer group shutdown control object. */ |