diff options
-rw-r--r-- | include/asterisk/taskprocessor.h | 41 | ||||
-rw-r--r-- | main/taskprocessor.c | 66 |
2 files changed, 57 insertions, 50 deletions
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index f2cf4c63c..7f80e0269 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -22,28 +22,33 @@ * * \author Dwayne M. Hubbard <dhubbard@digium.com> * - * \note A taskprocessor is a named singleton containing a task queue that serializes tasks pushed - * into it by [a] module(s) that reference the taskprocessor. A taskprocessor is created the first - * time its name is requested via the ast_taskprocessor_get() function and destroyed when the - * taskprocessor reference count reaches zero. A taskprocessor also contains an accompanying - * listener that is told when changes in the task queue occur. + * \note A taskprocessor is a named singleton containing a task queue that + * serializes tasks pushed into it by [a] module(s) that reference the taskprocessor. + * A taskprocessor is created the first time its name is requested via the + * ast_taskprocessor_get() function or the ast_taskprocessor_create_with_listener() + * function and destroyed when the taskprocessor reference count reaches zero. A + * taskprocessor also contains an accompanying listener that is notified when changes + * in the task queue occur. * * A task is a wrapper around a task-handling function pointer and a data * pointer. A task is pushed into a taskprocessor queue using the * ast_taskprocessor_push(taskprocessor, taskhandler, taskdata) function and freed by the - * taskprocessor after the task handling function returns. A module releases its reference to a - * taskprocessor using the ast_taskprocessor_unreference() function which may result in the - * destruction of the taskprocessor if the taskprocessor's reference count reaches zero. Tasks waiting - * to be processed in the taskprocessor queue when the taskprocessor reference count reaches zero - * will be purged and released from the taskprocessor queue without being processed. - * - * The taskprocessor listener has the flexibility of doling out tasks to best fit the module's - * needs. For instance, a taskprocessor listener may have a single dispatch thread that handles - * all tasks, or it may dispatch tasks to a thread pool. - * - * There is a default taskprocessor listener that will be used if a taskprocessor is created without - * a listener. This default listener runs tasks sequentially in a single thread. The listener will - * execute tasks as long as there are tasks to be processed. + * taskprocessor after the task handling function returns. A module releases its + * reference to a taskprocessor using the ast_taskprocessor_unreference() function which + * may result in the destruction of the taskprocessor if the taskprocessor's reference + * count reaches zero. When the taskprocessor's reference count reaches zero, its + * listener's shutdown() callback will be called. Any further attempts to execute tasks + * will be denied. + * + * The taskprocessor listener has the flexibility of doling out tasks to best fit the + * module's needs. For instance, a taskprocessor listener may have a single dispatch + * thread that handles all tasks, or it may dispatch tasks to a thread pool. + * + * There is a default taskprocessor listener that will be used if a taskprocessor is + * created without any explicit listener. This default listener runs tasks sequentially + * in a single thread. The listener will execute tasks as long as there are tasks to be + * processed. When the taskprocessor is shut down, the default listener will stop + * processing tasks and join its execution thread. */ #ifndef __AST_TASKPROCESSOR_H__ diff --git a/main/taskprocessor.c b/main/taskprocessor.c index 78118edb5..69bec7ca0 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -76,6 +76,8 @@ struct ast_taskprocessor { /*! \brief Taskprocessor singleton list entry */ AST_LIST_ENTRY(ast_taskprocessor) list; struct ast_taskprocessor_listener *listener; + /*! Indicates if the taskprocessor is in the process of shuting down */ + unsigned int shutting_down:1; }; #define TPS_MAX_BUCKETS 7 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */ @@ -123,6 +125,7 @@ struct default_taskprocessor_listener_pvt { int dead; }; + static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die) { SCOPED_MUTEX(lock, &pvt->lock); @@ -131,20 +134,6 @@ static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, ast_cond_signal(&pvt->cond); } -static void listener_destroy(void *obj) -{ - struct ast_taskprocessor_listener *listener = obj; - - listener->callbacks->destroy(listener->private_data); -} - -static void listener_shutdown(struct ast_taskprocessor_listener *listener) -{ - listener->callbacks->shutdown(listener); - ao2_ref(listener->tps, -1); - listener->tps = NULL; -} - static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt) { SCOPED_MUTEX(lock, &pvt->lock); @@ -188,6 +177,20 @@ static void *default_listener_alloc(struct ast_taskprocessor_listener *listener) return pvt; } +static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) +{ + struct default_taskprocessor_listener_pvt *pvt = listener->private_data; + + if (was_empty) { + default_tps_wake_up(pvt, 0); + } +} + +static void default_emptied(struct ast_taskprocessor_listener *listener) +{ + /* No-op */ +} + static void default_listener_shutdown(struct ast_taskprocessor_listener *listener) { struct default_taskprocessor_listener_pvt *pvt = listener->private_data; @@ -204,20 +207,6 @@ static void default_listener_destroy(void *obj) ast_free(pvt); } -static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) -{ - struct default_taskprocessor_listener_pvt *pvt = listener->private_data; - - if (was_empty) { - default_tps_wake_up(pvt, 0); - } -} - -static void default_emptied(struct ast_taskprocessor_listener *listener) -{ - /* No-op */ -} - static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = { .alloc = default_listener_alloc, .task_pushed = default_task_pushed, @@ -438,16 +427,15 @@ static void tps_taskprocessor_destroy(void *tps) static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps) { struct tps_task *task; + SCOPED_AO2LOCK(lock, tps); - if (!tps) { - ast_log(LOG_ERROR, "missing taskprocessor\n"); + if (tps->shutting_down) { return NULL; } - ao2_lock(tps); + if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) { tps->tps_queue_size--; } - ao2_unlock(tps); return task; } @@ -466,6 +454,20 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps) return tps->name; } +static void listener_destroy(void *obj) +{ + struct ast_taskprocessor_listener *listener = obj; + + listener->callbacks->destroy(listener->private_data); +} + +static void listener_shutdown(struct ast_taskprocessor_listener *listener) +{ + listener->callbacks->shutdown(listener); + ao2_ref(listener->tps, -1); + listener->tps = NULL; +} + struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks) { RAII_VAR(struct ast_taskprocessor_listener *, listener, |