summaryrefslogtreecommitdiff
path: root/main/threadpool.c
diff options
context:
space:
mode:
authorMark Michelson <mmichelson@digium.com>2012-12-10 05:25:38 +0000
committerMark Michelson <mmichelson@digium.com>2012-12-10 05:25:38 +0000
commit5dd22df050264299b42160daeaa7701c81488ceb (patch)
tree5534dc1df28fa6e820261ae05fc88f6d60913e02 /main/threadpool.c
parent03d617040a1749c8efb90cc7d125583256e8cbb9 (diff)
Improve shutdown procedure.
This helps tests to pass more often than before. They are far less likely to queue extra processes into the control taskprocessor since they are prevented once the threadpool begins to shut down. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377578 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/threadpool.c')
-rw-r--r--main/threadpool.c39
1 files changed, 35 insertions, 4 deletions
diff --git a/main/threadpool.c b/main/threadpool.c
index 8d60f878b..45e863805 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -214,7 +214,12 @@ static int queued_active_thread_idle(void *data)
static void threadpool_active_thread_idle(struct ast_threadpool *pool,
struct worker_thread *worker)
{
- struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
+ struct thread_worker_pair *pair;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
+ pair = thread_worker_pair_alloc(pool, worker);
if (!pair) {
return;
}
@@ -249,7 +254,12 @@ static int queued_zombie_thread_dead(void *data)
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
struct worker_thread *worker)
{
- struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
+ struct thread_worker_pair *pair;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
+ pair = thread_worker_pair_alloc(pool, worker);
if (!pair) {
return;
}
@@ -268,9 +278,12 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
*/
static int threadpool_execute(struct ast_threadpool *pool)
{
+ ao2_lock(pool);
if (!pool->shutting_down) {
+ ao2_unlock(pool);
return ast_taskprocessor_execute(pool->tps);
}
+ ao2_unlock(pool);
return 0;
}
@@ -422,8 +435,13 @@ static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listen
int was_empty)
{
struct ast_threadpool *pool = listener->private_data;
- struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
+ struct task_pushed_data *tpd;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
+ tpd = task_pushed_data_alloc(pool, was_empty);
if (!tpd) {
return;
}
@@ -456,6 +474,11 @@ static int handle_emptied(void *data)
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
{
struct ast_threadpool *pool = listener->private_data;
+ SCOPED_AO2LOCK(lock, pool);
+
+ if (pool->shutting_down) {
+ return;
+ }
ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
}
@@ -690,6 +713,10 @@ static int queued_set_size(void *data)
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
{
struct set_size_data *ssd;
+ SCOPED_AO2LOCK(lock, pool);
+ if (pool->shutting_down) {
+ return;
+ }
ssd = set_size_data_alloc(pool, size);
if (!ssd) {
@@ -750,6 +777,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
{
+ SCOPED_AO2LOCK(lock, pool);
if (!pool->shutting_down) {
return ast_taskprocessor_push(pool->tps, task, data);
}
@@ -761,7 +789,9 @@ void ast_threadpool_shutdown(struct ast_threadpool *pool)
/* Shut down the taskprocessors and everything else just
* takes care of itself via the taskprocessor callbacks
*/
- ast_atomic_fetchadd_int(&pool->shutting_down, +1);
+ ao2_lock(pool);
+ pool->shutting_down = 1;
+ ao2_unlock(pool);
ast_taskprocessor_unreference(pool->control_tps);
ast_taskprocessor_unreference(pool->tps);
}
@@ -834,6 +864,7 @@ static void worker_shutdown(struct worker_thread *worker)
static void worker_thread_destroy(void *obj)
{
struct worker_thread *worker = obj;
+ ast_log(LOG_NOTICE, "Worker dying\n");
worker_shutdown(worker);
ast_mutex_destroy(&worker->lock);
ast_cond_destroy(&worker->cond);