diff options
Diffstat (limited to 'main/taskprocessor.c')
-rw-r--r-- | main/taskprocessor.c | 42 |
1 files changed, 26 insertions, 16 deletions
diff --git a/main/taskprocessor.c b/main/taskprocessor.c index b603e5738..35076b06e 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -75,8 +75,8 @@ struct ast_taskprocessor { /*! \brief Taskprocessor singleton list entry */ AST_LIST_ENTRY(ast_taskprocessor) list; struct ast_taskprocessor_listener *listener; - /*! Indicates if the taskprocessor is in the process of shuting down */ - unsigned int shutting_down:1; + /*! Indicates if the taskprocessor is currently executing a task */ + unsigned int executing:1; }; /*! @@ -197,6 +197,8 @@ static void default_task_pushed(struct ast_taskprocessor_listener *listener, int { struct default_taskprocessor_listener_pvt *pvt = listener->user_data; + ast_assert(!pvt->dead); + if (was_empty) { default_tps_wake_up(pvt, 0); } @@ -447,10 +449,6 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps) struct tps_task *task; SCOPED_AO2LOCK(lock, tps); - if (tps->shutting_down) { - return NULL; - } - if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) { tps->tps_queue_size--; } @@ -643,6 +641,7 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void * { struct tps_task *t; int previous_size; + int was_empty; if (!tps || !task_exe) { ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor"); @@ -655,8 +654,10 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void * ao2_lock(tps); AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list); previous_size = tps->tps_queue_size++; + /* The currently executing task counts as still in queue */ + was_empty = tps->executing ? 0 : previous_size == 0; ao2_unlock(tps); - tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1); + tps->listener->callbacks->task_pushed(tps->listener, was_empty); return 0; } @@ -665,17 +666,26 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) struct tps_task *t; int size; - if (!(t = tps_taskprocessor_pop(tps))) { - return 0; - } + ao2_lock(tps); + tps->executing = 1; + ao2_unlock(tps); - t->execute(t->datap); + t = tps_taskprocessor_pop(tps); - tps_task_free(t); + if (t) { + t->execute(t->datap); + tps_task_free(t); + } ao2_lock(tps); + /* We need to check size in the same critical section where we reset the + * executing bit. Avoids a race condition where a task is pushed right + * after we pop an empty stack. + */ + tps->executing = 0; size = tps_taskprocessor_depth(tps); - if (tps->stats) { + /* If we executed a task, bump the stats */ + if (t && tps->stats) { tps->stats->_tasks_processed_count++; if (size > tps->stats->max_qsize) { tps->stats->max_qsize = size; @@ -683,9 +693,9 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) } ao2_unlock(tps); - if (size == 0 && tps->listener->callbacks->emptied) { + /* If we executed a task, check for the transition to empty */ + if (t && size == 0 && tps->listener->callbacks->emptied) { tps->listener->callbacks->emptied(tps->listener); - return 0; } - return 1; + return size > 0; } |