summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--main/threadpool.c67
-rw-r--r--tests/test_threadpool.c83
2 files changed, 142 insertions, 8 deletions
diff --git a/main/threadpool.c b/main/threadpool.c
index d97a7adb8..46de9b7f8 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -168,7 +168,7 @@ static void *worker_start(void *arg);
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static int worker_thread_start(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);
-static void worker_set_state(struct worker_thread *worker, enum worker_state state);
+static int worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);
/*!
@@ -482,7 +482,16 @@ static int activate_thread(void *obj, void *arg, int flags)
worker->id);
return 0;
}
- worker_set_state(worker, ALIVE);
+
+ if (worker_set_state(worker, ALIVE)) {
+ ast_debug(1, "Failed to activate thread %d. It is dead\n",
+ worker->id);
+ /* The worker thread will no longer exist in the active threads or
+ * idle threads container after this.
+ */
+ ao2_unlink(pool->active_threads, worker);
+ }
+
return CMP_MATCH;
}
@@ -538,20 +547,33 @@ static int queued_task_pushed(void *data)
struct task_pushed_data *tpd = data;
struct ast_threadpool *pool = tpd->pool;
int was_empty = tpd->was_empty;
+ unsigned int existing_active;
if (pool->listener && pool->listener->callbacks->task_pushed) {
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
}
- if (ao2_container_count(pool->idle_threads) == 0) {
+
+ existing_active = ao2_container_count(pool->active_threads);
+
+ /* The first pass transitions any existing idle threads to be active, and
+ * will also remove any worker threads that have recently entered the dead
+ * state.
+ */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
+
+ /* If no idle threads could be transitioned to active grow the pool as permitted. */
+ if (ao2_container_count(pool->active_threads) == existing_active) {
if (!pool->options.auto_increment) {
+ ao2_ref(tpd, -1);
return 0;
}
grow(pool, pool->options.auto_increment);
+ /* An optional second pass transitions any newly added threads. */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
}
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
- activate_thread, pool);
-
threadpool_send_state_changed(pool);
ao2_ref(tpd, -1);
return 0;
@@ -797,7 +819,7 @@ static int queued_set_size(void *data)
/* We don't count zombie threads as being "live" when potentially resizing */
unsigned int current_size = ao2_container_count(pool->active_threads) +
- ao2_container_count(pool->idle_threads);
+ ao2_container_count(pool->idle_threads);
if (current_size == num_threads) {
ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
@@ -806,6 +828,12 @@ static int queued_set_size(void *data)
}
if (current_size < num_threads) {
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+ activate_thread, pool);
+
+ /* As the above may have altered the number of current threads update it */
+ current_size = ao2_container_count(pool->active_threads) +
+ ao2_container_count(pool->idle_threads);
grow(pool, num_threads - current_size);
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
activate_thread, pool);
@@ -1117,13 +1145,36 @@ static int worker_idle(struct worker_thread *worker)
*
* The threadpool calls into this function in order to let a worker know
* how it should proceed.
+ *
+ * \retval -1 failure (state transition not permitted)
+ * \retval 0 success
*/
-static void worker_set_state(struct worker_thread *worker, enum worker_state state)
+static int worker_set_state(struct worker_thread *worker, enum worker_state state)
{
SCOPED_MUTEX(lock, &worker->lock);
+
+ switch (state) {
+ case ALIVE:
+ /* This can occur due to a race condition between being told to go active
+ * and an idle timeout happening.
+ */
+ if (worker->state == DEAD) {
+ return -1;
+ }
+ ast_assert(worker->state != ZOMBIE);
+ break;
+ case DEAD:
+ break;
+ case ZOMBIE:
+ ast_assert(worker->state != DEAD);
+ break;
+ }
+
worker->state = state;
worker->wake_up = 1;
ast_cond_signal(&worker->cond);
+
+ return 0;
}
/*! Serializer group shutdown control object. */
diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c
index f5b1073de..42181a25c 100644
--- a/tests/test_threadpool.c
+++ b/tests/test_threadpool.c
@@ -571,6 +571,87 @@ end:
return res;
}
+AST_TEST_DEFINE(threadpool_thread_timeout_thrash)
+{
+ struct ast_threadpool *pool = NULL;
+ struct ast_threadpool_listener *listener = NULL;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct test_listener_data *tld = NULL;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 1,
+ .auto_increment = 1,
+ .initial_size = 0,
+ .max_size = 1,
+ };
+ int iteration;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "thread_timeout_thrash";
+ info->category = "/main/threadpool/";
+ info->summary = "Thrash threadpool thread timeout";
+ info->description =
+ "Repeatedly queue a task when a threadpool thread should timeout.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ tld = test_alloc();
+ if (!tld) {
+ return AST_TEST_FAIL;
+ }
+
+ listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
+ if (!listener) {
+ goto end;
+ }
+
+ pool = ast_threadpool_create(info->name, listener, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ ast_threadpool_set_size(pool, 1);
+
+ for (iteration = 0; iteration < 30; ++iteration) {
+ struct simple_task_data *std = NULL;
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + options.idle_timeout,
+ .tv_nsec = start.tv_usec * 1000
+ };
+
+ std = simple_task_data_alloc();
+ if (!std) {
+ goto end;
+ }
+
+ /* Wait until the threadpool thread should timeout due to being idle */
+ ast_mutex_lock(&tld->lock);
+ while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
+ /* This purposely left empty as we want to loop waiting for a time out */
+ }
+ ast_mutex_unlock(&tld->lock);
+
+ ast_threadpool_push(pool, simple_task, std);
+ }
+
+ res = wait_until_thread_state(test, tld, 0, 0);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+ res = listener_check(test, listener, 1, 1, 30, 0, 0, 1);
+
+end:
+ ast_threadpool_shutdown(pool);
+ ao2_cleanup(listener);
+ ast_free(tld);
+ return res;
+}
+
AST_TEST_DEFINE(threadpool_one_task_one_thread)
{
struct ast_threadpool *pool = NULL;
@@ -1610,6 +1691,7 @@ static int unload_module(void)
ast_test_unregister(threadpool_thread_creation);
ast_test_unregister(threadpool_thread_destruction);
ast_test_unregister(threadpool_thread_timeout);
+ ast_test_unregister(threadpool_thread_timeout_thrash);
ast_test_unregister(threadpool_one_task_one_thread);
ast_test_unregister(threadpool_one_thread_one_task);
ast_test_unregister(threadpool_one_thread_multiple_tasks);
@@ -1630,6 +1712,7 @@ static int load_module(void)
ast_test_register(threadpool_thread_creation);
ast_test_register(threadpool_thread_destruction);
ast_test_register(threadpool_thread_timeout);
+ ast_test_register(threadpool_thread_timeout_thrash);
ast_test_register(threadpool_one_task_one_thread);
ast_test_register(threadpool_one_thread_one_task);
ast_test_register(threadpool_one_thread_multiple_tasks);