diff options
Diffstat (limited to 'main/threadpool.c')
-rw-r--r-- | main/threadpool.c | 87 |
1 files changed, 86 insertions, 1 deletions
diff --git a/main/threadpool.c b/main/threadpool.c index adaf8a554..ab390e9d8 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -866,7 +866,7 @@ struct ast_threadpool *ast_threadpool_create(const char *name, if (!pool) { return NULL; } - + tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool); if (!tps_listener) { return NULL; @@ -1103,3 +1103,88 @@ static void worker_set_state(struct worker_thread *worker, enum worker_state sta ast_cond_signal(&worker->cond); } +struct serializer { + struct ast_threadpool *pool; +}; + +static void serializer_dtor(void *obj) +{ + struct serializer *ser = obj; + ao2_cleanup(ser->pool); + ser->pool = NULL; +} + +static struct serializer *serializer_create(struct ast_threadpool *pool) +{ + struct serializer *ser = ao2_alloc(sizeof(*ser), serializer_dtor); + if (!ser) { + return NULL; + } + ao2_ref(pool, +1); + ser->pool = pool; + return ser; +} + +static int execute_tasks(void *data) +{ + struct ast_taskprocessor *tps = data; + + while (ast_taskprocessor_execute(tps)) { + /* No-op */ + } + + ast_taskprocessor_unreference(tps); + return 0; +} + +static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) { + if (was_empty) { + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener); + ast_threadpool_push(ser->pool, execute_tasks, tps); + } +}; + +static int serializer_start(struct ast_taskprocessor_listener *listener) +{ + /* No-op */ + return 0; +} + +static void serializer_shutdown(struct ast_taskprocessor_listener *listener) +{ + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + ao2_cleanup(ser); +} + +static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = { + .task_pushed = serializer_task_pushed, + .start = serializer_start, + .shutdown = serializer_shutdown, +}; + +struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool) +{ + RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup); + RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup); + struct ast_taskprocessor *tps = NULL; + + ser = serializer_create(pool); + if (!ser) { + return NULL; + } + + listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser); + if (!listener) { + return NULL; + } + ser = NULL; /* ownership transferred to listener */ + + tps = ast_taskprocessor_create_with_listener(name, listener); + if (!tps) { + return NULL; + } + listener = NULL; /* ownership transferred to tps */ + + return tps; +} |