summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--main/threadpool.c39
-rw-r--r--tests/test_threadpool.c1
2 files changed, 36 insertions, 4 deletions
diff --git a/main/threadpool.c b/main/threadpool.c
index 8d60f878b..45e863805 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -214,7 +214,12 @@ static int queued_active_thread_idle(void *data)
static void threadpool_active_thread_idle(struct ast_threadpool *pool,
struct worker_thread *worker)
{
- struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
+ struct thread_worker_pair *pair;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
+ pair = thread_worker_pair_alloc(pool, worker);
if (!pair) {
return;
}
@@ -249,7 +254,12 @@ static int queued_zombie_thread_dead(void *data)
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
struct worker_thread *worker)
{
- struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
+ struct thread_worker_pair *pair;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
+ pair = thread_worker_pair_alloc(pool, worker);
if (!pair) {
return;
}
@@ -268,9 +278,12 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
*/
static int threadpool_execute(struct ast_threadpool *pool)
{
+ ao2_lock(pool);
if (!pool->shutting_down) {
+ ao2_unlock(pool);
return ast_taskprocessor_execute(pool->tps);
}
+ ao2_unlock(pool);
return 0;
}
@@ -422,8 +435,13 @@ static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listen
int was_empty)
{
struct ast_threadpool *pool = listener->private_data;
- struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
+ struct task_pushed_data *tpd;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
+ tpd = task_pushed_data_alloc(pool, was_empty);
if (!tpd) {
return;
}
@@ -456,6 +474,11 @@ static int handle_emptied(void *data)
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
{
struct ast_threadpool *pool = listener->private_data;
+ SCOPED_AO2LOCK(lock, pool);
+
+ if (pool->shutting_down) {
+ return;
+ }
ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
}
@@ -690,6 +713,10 @@ static int queued_set_size(void *data)
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
{
struct set_size_data *ssd;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
ssd = set_size_data_alloc(pool, size);
if (!ssd) {
@@ -750,6 +777,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
{
+ SCOPED_AO2LOCK(lock, pool);
if (!pool->shutting_down) {
return ast_taskprocessor_push(pool->tps, task, data);
}
@@ -761,7 +789,9 @@ void ast_threadpool_shutdown(struct ast_threadpool *pool)
/* Shut down the taskprocessors and everything else just
* takes care of itself via the taskprocessor callbacks
*/
- ast_atomic_fetchadd_int(&pool->shutting_down, +1);
+ ao2_lock(pool);
+ pool->shutting_down = 1;
+ ao2_unlock(pool);
ast_taskprocessor_unreference(pool->control_tps);
ast_taskprocessor_unreference(pool->tps);
}
@@ -834,6 +864,7 @@ static void worker_shutdown(struct worker_thread *worker)
static void worker_thread_destroy(void *obj)
{
struct worker_thread *worker = obj;
+ ast_log(LOG_NOTICE, "Worker dying\n");
worker_shutdown(worker);
ast_mutex_destroy(&worker->lock);
ast_cond_destroy(&worker->cond);
diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c
index af2c0ff44..373d0c028 100644
--- a/tests/test_threadpool.c
+++ b/tests/test_threadpool.c
@@ -69,6 +69,7 @@ static void test_state_changed(struct ast_threadpool *pool,
SCOPED_MUTEX(lock, &tld->lock);
tld->num_active = active_threads;
tld->num_idle = idle_threads;
+ ast_log(LOG_NOTICE, "Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle);
ast_cond_signal(&tld->cond);
}