summaryrefslogtreecommitdiff
path: root/main/threadpool.c
diff options
context:
space:
mode:
Diffstat (limited to 'main/threadpool.c')
-rw-r--r--main/threadpool.c67
1 files changed, 59 insertions, 8 deletions
diff --git a/main/threadpool.c b/main/threadpool.c
index d97a7adb8..46de9b7f8 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -168,7 +168,7 @@ static void *worker_start(void *arg);
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static int worker_thread_start(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);
-static void worker_set_state(struct worker_thread *worker, enum worker_state state);
+static int worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);
/*!
@@ -482,7 +482,16 @@ static int activate_thread(void *obj, void *arg, int flags)
worker->id);
return 0;
}
- worker_set_state(worker, ALIVE);
+
+ if (worker_set_state(worker, ALIVE)) {
+ ast_debug(1, "Failed to activate thread %d. It is dead\n",
+ worker->id);
+ /* The worker thread will no longer exist in the active threads or
+ * idle threads container after this.
+ */
+ ao2_unlink(pool->active_threads, worker);
+ }
+
return CMP_MATCH;
}
@@ -538,20 +547,33 @@ static int queued_task_pushed(void *data)
struct task_pushed_data *tpd = data;
struct ast_threadpool *pool = tpd->pool;
int was_empty = tpd->was_empty;
+ unsigned int existing_active;
if (pool->listener && pool->listener->callbacks->task_pushed) {
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
}
- if (ao2_container_count(pool->idle_threads) == 0) {
+
+ existing_active = ao2_container_count(pool->active_threads);
+
+ /* The first pass transitions any existing idle threads to be active, and
+ * will also remove any worker threads that have recently entered the dead
+ * state.
+ */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
+
+ /* If no idle threads could be transitioned to active grow the pool as permitted. */
+ if (ao2_container_count(pool->active_threads) == existing_active) {
if (!pool->options.auto_increment) {
+ ao2_ref(tpd, -1);
return 0;
}
grow(pool, pool->options.auto_increment);
+ /* An optional second pass transitions any newly added threads. */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
}
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
- activate_thread, pool);
-
threadpool_send_state_changed(pool);
ao2_ref(tpd, -1);
return 0;
@@ -797,7 +819,7 @@ static int queued_set_size(void *data)
/* We don't count zombie threads as being "live" when potentially resizing */
unsigned int current_size = ao2_container_count(pool->active_threads) +
- ao2_container_count(pool->idle_threads);
+ ao2_container_count(pool->idle_threads);
if (current_size == num_threads) {
ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
@@ -806,6 +828,12 @@ static int queued_set_size(void *data)
}
if (current_size < num_threads) {
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+ activate_thread, pool);
+
+ /* As the above may have altered the number of current threads update it */
+ current_size = ao2_container_count(pool->active_threads) +
+ ao2_container_count(pool->idle_threads);
grow(pool, num_threads - current_size);
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
activate_thread, pool);
@@ -1117,13 +1145,36 @@ static int worker_idle(struct worker_thread *worker)
*
* The threadpool calls into this function in order to let a worker know
* how it should proceed.
+ *
+ * \retval -1 failure (state transition not permitted)
+ * \retval 0 success
*/
-static void worker_set_state(struct worker_thread *worker, enum worker_state state)
+static int worker_set_state(struct worker_thread *worker, enum worker_state state)
{
SCOPED_MUTEX(lock, &worker->lock);
+
+ switch (state) {
+ case ALIVE:
+ /* This can occur due to a race condition between being told to go active
+ * and an idle timeout happening.
+ */
+ if (worker->state == DEAD) {
+ return -1;
+ }
+ ast_assert(worker->state != ZOMBIE);
+ break;
+ case DEAD:
+ break;
+ case ZOMBIE:
+ ast_assert(worker->state != DEAD);
+ break;
+ }
+
worker->state = state;
worker->wake_up = 1;
ast_cond_signal(&worker->cond);
+
+ return 0;
}
/*! Serializer group shutdown control object. */