summaryrefslogtreecommitdiff
path: root/main/taskprocessor.c
diff options
context:
space:
mode:
Diffstat (limited to 'main/taskprocessor.c')
-rw-r--r--main/taskprocessor.c42
1 files changed, 26 insertions, 16 deletions
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index b603e5738..35076b06e 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -75,8 +75,8 @@ struct ast_taskprocessor {
/*! \brief Taskprocessor singleton list entry */
AST_LIST_ENTRY(ast_taskprocessor) list;
struct ast_taskprocessor_listener *listener;
- /*! Indicates if the taskprocessor is in the process of shuting down */
- unsigned int shutting_down:1;
+ /*! Indicates if the taskprocessor is currently executing a task */
+ unsigned int executing:1;
};
/*!
@@ -197,6 +197,8 @@ static void default_task_pushed(struct ast_taskprocessor_listener *listener, int
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+ ast_assert(!pvt->dead);
+
if (was_empty) {
default_tps_wake_up(pvt, 0);
}
@@ -447,10 +449,6 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
struct tps_task *task;
SCOPED_AO2LOCK(lock, tps);
- if (tps->shutting_down) {
- return NULL;
- }
-
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
tps->tps_queue_size--;
}
@@ -643,6 +641,7 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *
{
struct tps_task *t;
int previous_size;
+ int was_empty;
if (!tps || !task_exe) {
ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
@@ -655,8 +654,10 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *
ao2_lock(tps);
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
+ /* The currently executing task counts as still in queue */
+ was_empty = tps->executing ? 0 : previous_size == 0;
ao2_unlock(tps);
- tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1);
+ tps->listener->callbacks->task_pushed(tps->listener, was_empty);
return 0;
}
@@ -665,17 +666,26 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
struct tps_task *t;
int size;
- if (!(t = tps_taskprocessor_pop(tps))) {
- return 0;
- }
+ ao2_lock(tps);
+ tps->executing = 1;
+ ao2_unlock(tps);
- t->execute(t->datap);
+ t = tps_taskprocessor_pop(tps);
- tps_task_free(t);
+ if (t) {
+ t->execute(t->datap);
+ tps_task_free(t);
+ }
ao2_lock(tps);
+ /* We need to check size in the same critical section where we reset the
+ * executing bit. Avoids a race condition where a task is pushed right
+ * after we pop an empty stack.
+ */
+ tps->executing = 0;
size = tps_taskprocessor_depth(tps);
- if (tps->stats) {
+ /* If we executed a task, bump the stats */
+ if (t && tps->stats) {
tps->stats->_tasks_processed_count++;
if (size > tps->stats->max_qsize) {
tps->stats->max_qsize = size;
@@ -683,9 +693,9 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
}
ao2_unlock(tps);
- if (size == 0 && tps->listener->callbacks->emptied) {
+ /* If we executed a task, check for the transition to empty */
+ if (t && size == 0 && tps->listener->callbacks->emptied) {
tps->listener->callbacks->emptied(tps->listener);
- return 0;
}
- return 1;
+ return size > 0;
}