summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Colp <jcolp@digium.com>2015-11-11 13:04:08 -0400
committerJoshua Colp <jcolp@digium.com>2015-11-11 15:06:36 -0400
commitb818d70533916aa80af7607f225e0b1e98f41648 (patch)
treee5533fd10088ba8e9a00430a3e436fa0ff5f1a6c
parentdac0bf063c756ce737aa84b246a7c4e6c317b696 (diff)
threadpool: Handle worker thread transitioning to dead when going active.
This change adds handling of dead worker threads when moving them to be active. When this happens the worker thread is removed from both the active and idle threads container. If no threads are able to be moved to active then the pool grows as configured. A unit test has also been added which thrashes the idle timeout and thread activation to exploit any race conditions between the two. ASTERISK-25546 #close Change-Id: I6c455f9a40de60d9e86458d447b548fb52ba1143
-rw-r--r--main/threadpool.c67
-rw-r--r--tests/test_threadpool.c83
2 files changed, 142 insertions, 8 deletions
diff --git a/main/threadpool.c b/main/threadpool.c
index d97a7adb8..46de9b7f8 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -168,7 +168,7 @@ static void *worker_start(void *arg);
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static int worker_thread_start(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);
-static void worker_set_state(struct worker_thread *worker, enum worker_state state);
+static int worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);
/*!
@@ -482,7 +482,16 @@ static int activate_thread(void *obj, void *arg, int flags)
worker->id);
return 0;
}
- worker_set_state(worker, ALIVE);
+
+ if (worker_set_state(worker, ALIVE)) {
+ ast_debug(1, "Failed to activate thread %d. It is dead\n",
+ worker->id);
+ /* The worker thread will no longer exist in the active threads or
+ * idle threads container after this.
+ */
+ ao2_unlink(pool->active_threads, worker);
+ }
+
return CMP_MATCH;
}
@@ -538,20 +547,33 @@ static int queued_task_pushed(void *data)
struct task_pushed_data *tpd = data;
struct ast_threadpool *pool = tpd->pool;
int was_empty = tpd->was_empty;
+ unsigned int existing_active;
if (pool->listener && pool->listener->callbacks->task_pushed) {
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
}
- if (ao2_container_count(pool->idle_threads) == 0) {
+
+ existing_active = ao2_container_count(pool->active_threads);
+
+ /* The first pass transitions any existing idle threads to be active, and
+ * will also remove any worker threads that have recently entered the dead
+ * state.
+ */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
+
+ /* If no idle threads could be transitioned to active grow the pool as permitted. */
+ if (ao2_container_count(pool->active_threads) == existing_active) {
if (!pool->options.auto_increment) {
+ ao2_ref(tpd, -1);
return 0;
}
grow(pool, pool->options.auto_increment);
+ /* An optional second pass transitions any newly added threads. */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
}
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
- activate_thread, pool);
-
threadpool_send_state_changed(pool);
ao2_ref(tpd, -1);
return 0;
@@ -797,7 +819,7 @@ static int queued_set_size(void *data)
/* We don't count zombie threads as being "live" when potentially resizing */
unsigned int current_size = ao2_container_count(pool->active_threads) +
- ao2_container_count(pool->idle_threads);
+ ao2_container_count(pool->idle_threads);
if (current_size == num_threads) {
ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
@@ -806,6 +828,12 @@ static int queued_set_size(void *data)
}
if (current_size < num_threads) {
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+ activate_thread, pool);
+
+ /* As the above may have altered the number of current threads update it */
+ current_size = ao2_container_count(pool->active_threads) +
+ ao2_container_count(pool->idle_threads);
grow(pool, num_threads - current_size);
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
activate_thread, pool);
@@ -1117,13 +1145,36 @@ static int worker_idle(struct worker_thread *worker)
*
* The threadpool calls into this function in order to let a worker know
* how it should proceed.
+ *
+ * \retval -1 failure (state transition not permitted)
+ * \retval 0 success
*/
-static void worker_set_state(struct worker_thread *worker, enum worker_state state)
+static int worker_set_state(struct worker_thread *worker, enum worker_state state)
{
SCOPED_MUTEX(lock, &worker->lock);
+
+ switch (state) {
+ case ALIVE:
+ /* This can occur due to a race condition between being told to go active
+ * and an idle timeout happening.
+ */
+ if (worker->state == DEAD) {
+ return -1;
+ }
+ ast_assert(worker->state != ZOMBIE);
+ break;
+ case DEAD:
+ break;
+ case ZOMBIE:
+ ast_assert(worker->state != DEAD);
+ break;
+ }
+
worker->state = state;
worker->wake_up = 1;
ast_cond_signal(&worker->cond);
+
+ return 0;
}
/*! Serializer group shutdown control object. */
diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c
index f5b1073de..42181a25c 100644
--- a/tests/test_threadpool.c
+++ b/tests/test_threadpool.c
@@ -571,6 +571,87 @@ end:
return res;
}
+AST_TEST_DEFINE(threadpool_thread_timeout_thrash)
+{
+ struct ast_threadpool *pool = NULL;
+ struct ast_threadpool_listener *listener = NULL;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct test_listener_data *tld = NULL;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 1,
+ .auto_increment = 1,
+ .initial_size = 0,
+ .max_size = 1,
+ };
+ int iteration;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "thread_timeout_thrash";
+ info->category = "/main/threadpool/";
+ info->summary = "Thrash threadpool thread timeout";
+ info->description =
+ "Repeatedly queue a task when a threadpool thread should timeout.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ tld = test_alloc();
+ if (!tld) {
+ return AST_TEST_FAIL;
+ }
+
+ listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
+ if (!listener) {
+ goto end;
+ }
+
+ pool = ast_threadpool_create(info->name, listener, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ ast_threadpool_set_size(pool, 1);
+
+ for (iteration = 0; iteration < 30; ++iteration) {
+ struct simple_task_data *std = NULL;
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + options.idle_timeout,
+ .tv_nsec = start.tv_usec * 1000
+ };
+
+ std = simple_task_data_alloc();
+ if (!std) {
+ goto end;
+ }
+
+ /* Wait until the threadpool thread should timeout due to being idle */
+ ast_mutex_lock(&tld->lock);
+ while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
+ /* This purposely left empty as we want to loop waiting for a time out */
+ }
+ ast_mutex_unlock(&tld->lock);
+
+ ast_threadpool_push(pool, simple_task, std);
+ }
+
+ res = wait_until_thread_state(test, tld, 0, 0);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+ res = listener_check(test, listener, 1, 1, 30, 0, 0, 1);
+
+end:
+ ast_threadpool_shutdown(pool);
+ ao2_cleanup(listener);
+ ast_free(tld);
+ return res;
+}
+
AST_TEST_DEFINE(threadpool_one_task_one_thread)
{
struct ast_threadpool *pool = NULL;
@@ -1610,6 +1691,7 @@ static int unload_module(void)
ast_test_unregister(threadpool_thread_creation);
ast_test_unregister(threadpool_thread_destruction);
ast_test_unregister(threadpool_thread_timeout);
+ ast_test_unregister(threadpool_thread_timeout_thrash);
ast_test_unregister(threadpool_one_task_one_thread);
ast_test_unregister(threadpool_one_thread_one_task);
ast_test_unregister(threadpool_one_thread_multiple_tasks);
@@ -1630,6 +1712,7 @@ static int load_module(void)
ast_test_register(threadpool_thread_creation);
ast_test_register(threadpool_thread_destruction);
ast_test_register(threadpool_thread_timeout);
+ ast_test_register(threadpool_thread_timeout_thrash);
ast_test_register(threadpool_one_task_one_thread);
ast_test_register(threadpool_one_thread_one_task);
ast_test_register(threadpool_one_thread_multiple_tasks);