summaryrefslogtreecommitdiff
path: root/main/taskprocessor.c
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-02-12 21:45:59 +0000
committerDavid M. Lee <dlee@digium.com>2013-02-12 21:45:59 +0000
commit222e8a3afb6aabb31052fe76fa4f57fe26f69688 (patch)
treed7f7494c130d9372bdc43592e762ee83b4a78aec /main/taskprocessor.c
parente9ff351f06c1cb7ad0ee59b3ff09b804a23eb3a0 (diff)
Add a serializer interface to the threadpool
This patch adds the ability to create a serializer from a thread pool. A serializer is a ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a thread from a ast_threadpool. Think of it as a lightweight thread. While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relys on thread specific information, such as thread locals. This patch also fixes a bug in how the 'was_empty' parameter is computed for the push callback, and gets rid of the unused 'shutting_down' field. Review: https://reviewboard.asterisk.org/r/2323/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@381326 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/taskprocessor.c')
-rw-r--r--main/taskprocessor.c42
1 files changed, 26 insertions, 16 deletions
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index b603e5738..35076b06e 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -75,8 +75,8 @@ struct ast_taskprocessor {
/*! \brief Taskprocessor singleton list entry */
AST_LIST_ENTRY(ast_taskprocessor) list;
struct ast_taskprocessor_listener *listener;
- /*! Indicates if the taskprocessor is in the process of shuting down */
- unsigned int shutting_down:1;
+ /*! Indicates if the taskprocessor is currently executing a task */
+ unsigned int executing:1;
};
/*!
@@ -197,6 +197,8 @@ static void default_task_pushed(struct ast_taskprocessor_listener *listener, int
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+ ast_assert(!pvt->dead);
+
if (was_empty) {
default_tps_wake_up(pvt, 0);
}
@@ -447,10 +449,6 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
struct tps_task *task;
SCOPED_AO2LOCK(lock, tps);
- if (tps->shutting_down) {
- return NULL;
- }
-
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
tps->tps_queue_size--;
}
@@ -643,6 +641,7 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *
{
struct tps_task *t;
int previous_size;
+ int was_empty;
if (!tps || !task_exe) {
ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
@@ -655,8 +654,10 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *
ao2_lock(tps);
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
+ /* The currently executing task counts as still in queue */
+ was_empty = tps->executing ? 0 : previous_size == 0;
ao2_unlock(tps);
- tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1);
+ tps->listener->callbacks->task_pushed(tps->listener, was_empty);
return 0;
}
@@ -665,17 +666,26 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
struct tps_task *t;
int size;
- if (!(t = tps_taskprocessor_pop(tps))) {
- return 0;
- }
+ ao2_lock(tps);
+ tps->executing = 1;
+ ao2_unlock(tps);
- t->execute(t->datap);
+ t = tps_taskprocessor_pop(tps);
- tps_task_free(t);
+ if (t) {
+ t->execute(t->datap);
+ tps_task_free(t);
+ }
ao2_lock(tps);
+ /* We need to check size in the same critical section where we reset the
+ * executing bit. Avoids a race condition where a task is pushed right
+ * after we pop an empty stack.
+ */
+ tps->executing = 0;
size = tps_taskprocessor_depth(tps);
- if (tps->stats) {
+ /* If we executed a task, bump the stats */
+ if (t && tps->stats) {
tps->stats->_tasks_processed_count++;
if (size > tps->stats->max_qsize) {
tps->stats->max_qsize = size;
@@ -683,9 +693,9 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
}
ao2_unlock(tps);
- if (size == 0 && tps->listener->callbacks->emptied) {
+ /* If we executed a task, check for the transition to empty */
+ if (t && size == 0 && tps->listener->callbacks->emptied) {
tps->listener->callbacks->emptied(tps->listener);
- return 0;
}
- return 1;
+ return size > 0;
}