summaryrefslogtreecommitdiff
path: root/main/threadpool.c
diff options
context:
space:
mode:
Diffstat (limited to 'main/threadpool.c')
-rw-r--r--main/threadpool.c87
1 files changed, 86 insertions, 1 deletions
diff --git a/main/threadpool.c b/main/threadpool.c
index adaf8a554..ab390e9d8 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -866,7 +866,7 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
if (!pool) {
return NULL;
}
-
+
tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
if (!tps_listener) {
return NULL;
@@ -1103,3 +1103,88 @@ static void worker_set_state(struct worker_thread *worker, enum worker_state sta
ast_cond_signal(&worker->cond);
}
+struct serializer {
+ struct ast_threadpool *pool;
+};
+
+static void serializer_dtor(void *obj)
+{
+ struct serializer *ser = obj;
+ ao2_cleanup(ser->pool);
+ ser->pool = NULL;
+}
+
+static struct serializer *serializer_create(struct ast_threadpool *pool)
+{
+ struct serializer *ser = ao2_alloc(sizeof(*ser), serializer_dtor);
+ if (!ser) {
+ return NULL;
+ }
+ ao2_ref(pool, +1);
+ ser->pool = pool;
+ return ser;
+}
+
+static int execute_tasks(void *data)
+{
+ struct ast_taskprocessor *tps = data;
+
+ while (ast_taskprocessor_execute(tps)) {
+ /* No-op */
+ }
+
+ ast_taskprocessor_unreference(tps);
+ return 0;
+}
+
+static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) {
+ if (was_empty) {
+ struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+ struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
+ ast_threadpool_push(ser->pool, execute_tasks, tps);
+ }
+};
+
+static int serializer_start(struct ast_taskprocessor_listener *listener)
+{
+ /* No-op */
+ return 0;
+}
+
+static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
+{
+ struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
+ ao2_cleanup(ser);
+}
+
+static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
+ .task_pushed = serializer_task_pushed,
+ .start = serializer_start,
+ .shutdown = serializer_shutdown,
+};
+
+struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
+{
+ RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup);
+ struct ast_taskprocessor *tps = NULL;
+
+ ser = serializer_create(pool);
+ if (!ser) {
+ return NULL;
+ }
+
+ listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
+ if (!listener) {
+ return NULL;
+ }
+ ser = NULL; /* ownership transferred to listener */
+
+ tps = ast_taskprocessor_create_with_listener(name, listener);
+ if (!tps) {
+ return NULL;
+ }
+ listener = NULL; /* ownership transferred to tps */
+
+ return tps;
+}