summaryrefslogtreecommitdiff
path: root/main/threadpool.c
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-02-12 21:45:59 +0000
committerDavid M. Lee <dlee@digium.com>2013-02-12 21:45:59 +0000
commit222e8a3afb6aabb31052fe76fa4f57fe26f69688 (patch)
treed7f7494c130d9372bdc43592e762ee83b4a78aec /main/threadpool.c
parente9ff351f06c1cb7ad0ee59b3ff09b804a23eb3a0 (diff)
Add a serializer interface to the threadpool
This patch adds the ability to create a serializer from a thread pool. A serializer is a ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a thread from a ast_threadpool. Think of it as a lightweight thread. While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relys on thread specific information, such as thread locals. This patch also fixes a bug in how the 'was_empty' parameter is computed for the push callback, and gets rid of the unused 'shutting_down' field. Review: https://reviewboard.asterisk.org/r/2323/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@381326 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/threadpool.c')
-rw-r--r--main/threadpool.c87
1 files changed, 86 insertions, 1 deletions
diff --git a/main/threadpool.c b/main/threadpool.c
index adaf8a554..ab390e9d8 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -866,7 +866,7 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
if (!pool) {
return NULL;
}
-
+
tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
if (!tps_listener) {
return NULL;
@@ -1103,3 +1103,88 @@ static void worker_set_state(struct worker_thread *worker, enum worker_state sta
ast_cond_signal(&worker->cond);
}
+struct serializer {
+ struct ast_threadpool *pool;
+};
+
+static void serializer_dtor(void *obj)
+{
+ struct serializer *ser = obj;
+ ao2_cleanup(ser->pool);
+ ser->pool = NULL;
+}
+
+static struct serializer *serializer_create(struct ast_threadpool *pool)
+{
+ struct serializer *ser = ao2_alloc(sizeof(*ser), serializer_dtor);
+ if (!ser) {
+ return NULL;
+ }
+ ao2_ref(pool, +1);
+ ser->pool = pool;
+ return ser;
+}
+
+static int execute_tasks(void *data)
+{
+ struct ast_taskprocessor *tps = data;
+
+ while (ast_taskprocessor_execute(tps)) {
+ /* No-op */
+ }
+
+ ast_taskprocessor_unreference(tps);
+ return 0;
+}
+
+static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) {
+ if (was_empty) {
+ struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+ struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
+ ast_threadpool_push(ser->pool, execute_tasks, tps);
+ }
+};
+
+static int serializer_start(struct ast_taskprocessor_listener *listener)
+{
+ /* No-op */
+ return 0;
+}
+
+static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
+{
+ struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+ ao2_cleanup(ser);
+}
+
+static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
+ .task_pushed = serializer_task_pushed,
+ .start = serializer_start,
+ .shutdown = serializer_shutdown,
+};
+
+struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
+{
+ RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup);
+ struct ast_taskprocessor *tps = NULL;
+
+ ser = serializer_create(pool);
+ if (!ser) {
+ return NULL;
+ }
+
+ listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
+ if (!listener) {
+ return NULL;
+ }
+ ser = NULL; /* ownership transferred to listener */
+
+ tps = ast_taskprocessor_create_with_listener(name, listener);
+ if (!tps) {
+ return NULL;
+ }
+ listener = NULL; /* ownership transferred to tps */
+
+ return tps;
+}