author     Joshua Colp <jcolp@digium.com>    2015-11-11 13:04:08 -0400
committer  Joshua Colp <jcolp@digium.com>    2015-11-11 14:18:39 -0500
commit     2954354404e915aa4d20f1ee400c8a3a835fac7b (patch)
tree       009b7d00c9cbb96d00fdaa1f3580c94ea13534d0 /main/threadpool.c
parent     c1abe8906dd4696bc85697cfbbff2c9fca6af4d6 (diff)
threadpool: Handle worker thread transitioning to dead when going active.
This change adds handling of dead worker threads when moving them to be
active. When this happens, the worker thread is removed from both the
active and idle threads containers. If no threads can be moved to active,
the pool grows as configured.

A unit test has also been added which thrashes the idle timeout and
thread activation to expose any race conditions between the two.

ASTERISK-25546 #close

Change-Id: I6c455f9a40de60d9e86458d447b548fb52ba1143
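The fix hinges on making the state setter report whether the requested transition was accepted, so a caller that loses the race against the idle timeout can drop the dead worker instead of treating it as activated. The following is a minimal standalone sketch of that idea, not the Asterisk code itself: the names struct worker and worker_request_state are simplified stand-ins for the worker_thread/worker_set_state pair shown in the diff below.

	#include <pthread.h>
	#include <stdio.h>

	/* Simplified worker states mirroring the pool's ALIVE/ZOMBIE/DEAD model. */
	enum worker_state { ALIVE, ZOMBIE, DEAD };

	struct worker {
		pthread_mutex_t lock;
		pthread_cond_t cond;
		enum worker_state state;
		int wake_up;
	};

	/*
	 * Request a state change for a worker.
	 *
	 * Returns 0 on success, or -1 if the worker has already died (for
	 * example because its idle timeout fired between the pool deciding
	 * to activate it and this call acquiring the lock).
	 */
	static int worker_request_state(struct worker *w, enum worker_state state)
	{
		int res = 0;

		pthread_mutex_lock(&w->lock);
		if (state == ALIVE && w->state == DEAD) {
			/* Lost the race with the idle timeout: refuse the transition
			 * so the caller can unlink the worker and grow the pool instead. */
			res = -1;
		} else {
			w->state = state;
			w->wake_up = 1;
			pthread_cond_signal(&w->cond);
		}
		pthread_mutex_unlock(&w->lock);

		return res;
	}

	int main(void)
	{
		struct worker w = {
			.lock = PTHREAD_MUTEX_INITIALIZER,
			.cond = PTHREAD_COND_INITIALIZER,
			.state = DEAD, /* Pretend the idle timeout already fired. */
		};

		if (worker_request_state(&w, ALIVE)) {
			printf("activation refused: worker is dead, caller should grow the pool\n");
		}

		return 0;
	}

The actual patch applies the same pattern inside activate_thread(): a failed ALIVE transition causes the worker to be unlinked from the active threads container (it has already been unlinked from the idle container by the callback), and queued_task_pushed() then grows the pool when no idle thread could be activated.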
Diffstat (limited to 'main/threadpool.c')
-rw-r--r--	main/threadpool.c	67
1 file changed, 59 insertions(+), 8 deletions(-)
diff --git a/main/threadpool.c b/main/threadpool.c
index d97a7adb8..46de9b7f8 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -168,7 +168,7 @@ static void *worker_start(void *arg);
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static int worker_thread_start(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);
-static void worker_set_state(struct worker_thread *worker, enum worker_state state);
+static int worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);
/*!
@@ -482,7 +482,16 @@ static int activate_thread(void *obj, void *arg, int flags)
worker->id);
return 0;
}
- worker_set_state(worker, ALIVE);
+
+ if (worker_set_state(worker, ALIVE)) {
+ ast_debug(1, "Failed to activate thread %d. It is dead\n",
+ worker->id);
+ /* The worker thread will no longer exist in the active threads or
+ * idle threads container after this.
+ */
+ ao2_unlink(pool->active_threads, worker);
+ }
+
return CMP_MATCH;
}
@@ -538,20 +547,33 @@ static int queued_task_pushed(void *data)
struct task_pushed_data *tpd = data;
struct ast_threadpool *pool = tpd->pool;
int was_empty = tpd->was_empty;
+ unsigned int existing_active;
if (pool->listener && pool->listener->callbacks->task_pushed) {
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
}
- if (ao2_container_count(pool->idle_threads) == 0) {
+
+ existing_active = ao2_container_count(pool->active_threads);
+
+ /* The first pass transitions any existing idle threads to be active, and
+ * will also remove any worker threads that have recently entered the dead
+ * state.
+ */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
+
+ /* If no idle threads could be transitioned to active grow the pool as permitted. */
+ if (ao2_container_count(pool->active_threads) == existing_active) {
if (!pool->options.auto_increment) {
+ ao2_ref(tpd, -1);
return 0;
}
grow(pool, pool->options.auto_increment);
+ /* An optional second pass transitions any newly added threads. */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
}
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
- activate_thread, pool);
-
threadpool_send_state_changed(pool);
ao2_ref(tpd, -1);
return 0;
@@ -797,7 +819,7 @@ static int queued_set_size(void *data)
/* We don't count zombie threads as being "live" when potentially resizing */
unsigned int current_size = ao2_container_count(pool->active_threads) +
- ao2_container_count(pool->idle_threads);
+ ao2_container_count(pool->idle_threads);
if (current_size == num_threads) {
ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
@@ -806,6 +828,12 @@ static int queued_set_size(void *data)
}
if (current_size < num_threads) {
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+ activate_thread, pool);
+
+ /* As the above may have altered the number of current threads update it */
+ current_size = ao2_container_count(pool->active_threads) +
+ ao2_container_count(pool->idle_threads);
grow(pool, num_threads - current_size);
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
activate_thread, pool);
@@ -1117,13 +1145,36 @@ static int worker_idle(struct worker_thread *worker)
*
* The threadpool calls into this function in order to let a worker know
* how it should proceed.
+ *
+ * \retval -1 failure (state transition not permitted)
+ * \retval 0 success
*/
-static void worker_set_state(struct worker_thread *worker, enum worker_state state)
+static int worker_set_state(struct worker_thread *worker, enum worker_state state)
{
SCOPED_MUTEX(lock, &worker->lock);
+
+ switch (state) {
+ case ALIVE:
+ /* This can occur due to a race condition between being told to go active
+ * and an idle timeout happening.
+ */
+ if (worker->state == DEAD) {
+ return -1;
+ }
+ ast_assert(worker->state != ZOMBIE);
+ break;
+ case DEAD:
+ break;
+ case ZOMBIE:
+ ast_assert(worker->state != DEAD);
+ break;
+ }
+
worker->state = state;
worker->wake_up = 1;
ast_cond_signal(&worker->cond);
+
+ return 0;
}
/*! Serializer group shutdown control object. */
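
The unit test mentioned in the commit message is not part of this file's diff. For context, a stress test of this kind typically configures the shortest possible idle timeout and keeps pushing work so that activation repeatedly races the timeout. The sketch below shows roughly what that could look like against the public threadpool API; the exact option fields and the create/push/shutdown usage are assumptions about that API, not the test that was actually added.

	/* A rough sketch only: the API usage here (ast_threadpool_create/push/
	 * shutdown and the option field names) is assumed, not taken from the
	 * unit test added with this change. */
	#include "asterisk.h"
	#include "asterisk/threadpool.h"
	#include <unistd.h>

	static int noop_task(void *data)
	{
		return 0;
	}

	static int thrash_idle_timeout_vs_activation(void)
	{
		struct ast_threadpool_options options = {
			.version = AST_THREADPOOL_OPTIONS_VERSION,
			.idle_timeout = 1,   /* Shortest timeout so idle workers die quickly. */
			.auto_increment = 1, /* Let the pool replace workers that died. */
			.initial_size = 1,
			.max_size = 1,
		};
		struct ast_threadpool *pool;
		int i;

		pool = ast_threadpool_create("thrash", NULL, &options);
		if (!pool) {
			return -1;
		}

		/* Push work right around the idle timeout so activation keeps racing
		 * the worker's transition to DEAD. Before this change, losing that
		 * race could strand a dead worker in the containers; with the fix,
		 * the dead worker is unlinked and the pool grows instead. */
		for (i = 0; i < 30; i++) {
			if (ast_threadpool_push(pool, noop_task, NULL)) {
				ast_threadpool_shutdown(pool);
				return -1;
			}
			usleep(1100000); /* Sleep just past the idle timeout between pushes. */
		}

		ast_threadpool_shutdown(pool);
		return 0;
	}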