summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Colp <jcolp@digium.com>2015-11-11 13:04:08 -0400
committerJoshua Colp <jcolp@digium.com>2015-11-11 14:18:39 -0500
commit2954354404e915aa4d20f1ee400c8a3a835fac7b (patch)
tree009b7d00c9cbb96d00fdaa1f3580c94ea13534d0
parentc1abe8906dd4696bc85697cfbbff2c9fca6af4d6 (diff)
threadpool: Handle worker thread transitioning to dead when going active.
This change adds handling of dead worker threads when moving them to be active. When this happens the worker thread is removed from both the active and idle threads container. If no threads are able to be moved to active then the pool grows as configured. A unit test has also been added which thrashes the idle timeout and thread activation to exploit any race conditions between the two. ASTERISK-25546 #close Change-Id: I6c455f9a40de60d9e86458d447b548fb52ba1143
-rw-r--r--main/threadpool.c67
-rw-r--r--tests/test_threadpool.c83
2 files changed, 142 insertions, 8 deletions
diff --git a/main/threadpool.c b/main/threadpool.c
index d97a7adb8..46de9b7f8 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -168,7 +168,7 @@ static void *worker_start(void *arg);
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static int worker_thread_start(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);
-static void worker_set_state(struct worker_thread *worker, enum worker_state state);
+static int worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);
/*!
@@ -482,7 +482,16 @@ static int activate_thread(void *obj, void *arg, int flags)
worker->id);
return 0;
}
- worker_set_state(worker, ALIVE);
+
+ if (worker_set_state(worker, ALIVE)) {
+ ast_debug(1, "Failed to activate thread %d. It is dead\n",
+ worker->id);
+ /* The worker thread will no longer exist in the active threads or
+ * idle threads container after this.
+ */
+ ao2_unlink(pool->active_threads, worker);
+ }
+
return CMP_MATCH;
}
@@ -538,20 +547,33 @@ static int queued_task_pushed(void *data)
struct task_pushed_data *tpd = data;
struct ast_threadpool *pool = tpd->pool;
int was_empty = tpd->was_empty;
+ unsigned int existing_active;
if (pool->listener && pool->listener->callbacks->task_pushed) {
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
}
- if (ao2_container_count(pool->idle_threads) == 0) {
+
+ existing_active = ao2_container_count(pool->active_threads);
+
+ /* The first pass transitions any existing idle threads to be active, and
+ * will also remove any worker threads that have recently entered the dead
+ * state.
+ */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
+
+ /* If no idle threads could be transitioned to active grow the pool as permitted. */
+ if (ao2_container_count(pool->active_threads) == existing_active) {
if (!pool->options.auto_increment) {
+ ao2_ref(tpd, -1);
return 0;
}
grow(pool, pool->options.auto_increment);
+ /* An optional second pass transitions any newly added threads. */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
}
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
- activate_thread, pool);
-
threadpool_send_state_changed(pool);
ao2_ref(tpd, -1);
return 0;
@@ -797,7 +819,7 @@ static int queued_set_size(void *data)
/* We don't count zombie threads as being "live" when potentially resizing */
unsigned int current_size = ao2_container_count(pool->active_threads) +
- ao2_container_count(pool->idle_threads);
+ ao2_container_count(pool->idle_threads);
if (current_size == num_threads) {
ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
@@ -806,6 +828,12 @@ static int queued_set_size(void *data)
}
if (current_size < num_threads) {
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+ activate_thread, pool);
+
+ /* As the above may have altered the number of current threads update it */
+ current_size = ao2_container_count(pool->active_threads) +
+ ao2_container_count(pool->idle_threads);
grow(pool, num_threads - current_size);
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
activate_thread, pool);
@@ -1117,13 +1145,36 @@ static int worker_idle(struct worker_thread *worker)
*
* The threadpool calls into this function in order to let a worker know
* how it should proceed.
+ *
+ * \retval -1 failure (state transition not permitted)
+ * \retval 0 success
*/
-static void worker_set_state(struct worker_thread *worker, enum worker_state state)
+static int worker_set_state(struct worker_thread *worker, enum worker_state state)
{
SCOPED_MUTEX(lock, &worker->lock);
+
+ switch (state) {
+ case ALIVE:
+ /* This can occur due to a race condition between being told to go active
+ * and an idle timeout happening.
+ */
+ if (worker->state == DEAD) {
+ return -1;
+ }
+ ast_assert(worker->state != ZOMBIE);
+ break;
+ case DEAD:
+ break;
+ case ZOMBIE:
+ ast_assert(worker->state != DEAD);
+ break;
+ }
+
worker->state = state;
worker->wake_up = 1;
ast_cond_signal(&worker->cond);
+
+ return 0;
}
/*! Serializer group shutdown control object. */
diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c
index f5b1073de..42181a25c 100644
--- a/tests/test_threadpool.c
+++ b/tests/test_threadpool.c
@@ -571,6 +571,87 @@ end:
return res;
}
+AST_TEST_DEFINE(threadpool_thread_timeout_thrash)
+{
+ struct ast_threadpool *pool = NULL;
+ struct ast_threadpool_listener *listener = NULL;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct test_listener_data *tld = NULL;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 1,
+ .auto_increment = 1,
+ .initial_size = 0,
+ .max_size = 1,
+ };
+ int iteration;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "thread_timeout_thrash";
+ info->category = "/main/threadpool/";
+ info->summary = "Thrash threadpool thread timeout";
+ info->description =
+ "Repeatedly queue a task when a threadpool thread should timeout.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ tld = test_alloc();
+ if (!tld) {
+ return AST_TEST_FAIL;
+ }
+
+ listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
+ if (!listener) {
+ goto end;
+ }
+
+ pool = ast_threadpool_create(info->name, listener, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ ast_threadpool_set_size(pool, 1);
+
+ for (iteration = 0; iteration < 30; ++iteration) {
+ struct simple_task_data *std = NULL;
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + options.idle_timeout,
+ .tv_nsec = start.tv_usec * 1000
+ };
+
+ std = simple_task_data_alloc();
+ if (!std) {
+ goto end;
+ }
+
+ /* Wait until the threadpool thread should timeout due to being idle */
+ ast_mutex_lock(&tld->lock);
+ while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
+ /* This purposely left empty as we want to loop waiting for a time out */
+ }
+ ast_mutex_unlock(&tld->lock);
+
+ ast_threadpool_push(pool, simple_task, std);
+ }
+
+ res = wait_until_thread_state(test, tld, 0, 0);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+ res = listener_check(test, listener, 1, 1, 30, 0, 0, 1);
+
+end:
+ ast_threadpool_shutdown(pool);
+ ao2_cleanup(listener);
+ ast_free(tld);
+ return res;
+}
+
AST_TEST_DEFINE(threadpool_one_task_one_thread)
{
struct ast_threadpool *pool = NULL;
@@ -1610,6 +1691,7 @@ static int unload_module(void)
ast_test_unregister(threadpool_thread_creation);
ast_test_unregister(threadpool_thread_destruction);
ast_test_unregister(threadpool_thread_timeout);
+ ast_test_unregister(threadpool_thread_timeout_thrash);
ast_test_unregister(threadpool_one_task_one_thread);
ast_test_unregister(threadpool_one_thread_one_task);
ast_test_unregister(threadpool_one_thread_multiple_tasks);
@@ -1630,6 +1712,7 @@ static int load_module(void)
ast_test_register(threadpool_thread_creation);
ast_test_register(threadpool_thread_destruction);
ast_test_register(threadpool_thread_timeout);
+ ast_test_register(threadpool_thread_timeout_thrash);
ast_test_register(threadpool_one_task_one_thread);
ast_test_register(threadpool_one_thread_one_task);
ast_test_register(threadpool_one_thread_multiple_tasks);