diff options
-rw-r--r-- | main/threadpool.c | 39 | ||||
-rw-r--r-- | tests/test_threadpool.c | 1 |
2 files changed, 36 insertions, 4 deletions
diff --git a/main/threadpool.c b/main/threadpool.c index 8d60f878b..45e863805 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -214,7 +214,12 @@ static int queued_active_thread_idle(void *data) static void threadpool_active_thread_idle(struct ast_threadpool *pool, struct worker_thread *worker) { - struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker); + struct thread_worker_pair *pair; + SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { + return; + } + pair = thread_worker_pair_alloc(pool, worker); if (!pair) { return; } @@ -249,7 +254,12 @@ static int queued_zombie_thread_dead(void *data) static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker) { - struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker); + struct thread_worker_pair *pair; + SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { + return; + } + pair = thread_worker_pair_alloc(pool, worker); if (!pair) { return; } @@ -268,9 +278,12 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, */ static int threadpool_execute(struct ast_threadpool *pool) { + ao2_lock(pool); if (!pool->shutting_down) { + ao2_unlock(pool); return ast_taskprocessor_execute(pool->tps); } + ao2_unlock(pool); return 0; } @@ -422,8 +435,13 @@ static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listen int was_empty) { struct ast_threadpool *pool = listener->private_data; - struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty); + struct task_pushed_data *tpd; + SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { + return; + } + tpd = task_pushed_data_alloc(pool, was_empty); if (!tpd) { return; } @@ -456,6 +474,11 @@ static int handle_emptied(void *data) static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) { struct ast_threadpool *pool = listener->private_data; + SCOPED_AO2LOCK(lock, pool); + + if (pool->shutting_down) { + return; + } ast_taskprocessor_push(pool->control_tps, handle_emptied, pool); } @@ -690,6 +713,10 @@ static int queued_set_size(void *data) void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size) { struct set_size_data *ssd; + SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { + return; + } ssd = set_size_data_alloc(pool, size); if (!ssd) { @@ -750,6 +777,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data) { + SCOPED_AO2LOCK(lock, pool); if (!pool->shutting_down) { return ast_taskprocessor_push(pool->tps, task, data); } @@ -761,7 +789,9 @@ void ast_threadpool_shutdown(struct ast_threadpool *pool) /* Shut down the taskprocessors and everything else just * takes care of itself via the taskprocessor callbacks */ - ast_atomic_fetchadd_int(&pool->shutting_down, +1); + ao2_lock(pool); + pool->shutting_down = 1; + ao2_unlock(pool); ast_taskprocessor_unreference(pool->control_tps); ast_taskprocessor_unreference(pool->tps); } @@ -834,6 +864,7 @@ static void worker_shutdown(struct worker_thread *worker) static void worker_thread_destroy(void *obj) { struct worker_thread *worker = obj; + ast_log(LOG_NOTICE, "Worker dying\n"); worker_shutdown(worker); ast_mutex_destroy(&worker->lock); ast_cond_destroy(&worker->cond); diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index af2c0ff44..373d0c028 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -69,6 +69,7 @@ static void test_state_changed(struct ast_threadpool *pool, SCOPED_MUTEX(lock, &tld->lock); tld->num_active = active_threads; tld->num_idle = idle_threads; + ast_log(LOG_NOTICE, "Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle); ast_cond_signal(&tld->cond); } |