From d5716ecae2238c9e96576870c2146606ab46bc6c Mon Sep 17 00:00:00 2001 From: Mark Michelson Date: Fri, 9 Nov 2012 22:28:10 +0000 Subject: Genericize the allocation and destruction of taskprocessor listeners. The goal of this is to take the responsibility away from individual listeners to be sure to properly unref the taskprocessor. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@376121 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- main/taskprocessor.c | 79 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 53 insertions(+), 26 deletions(-) (limited to 'main/taskprocessor.c') diff --git a/main/taskprocessor.c b/main/taskprocessor.c index bd94103d2..4ca01f9ca 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -131,17 +131,11 @@ static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, ast_cond_signal(&pvt->cond); } -static void default_listener_destroy(void *obj) +static void listener_destroy(void *obj) { struct ast_taskprocessor_listener *listener = obj; - struct default_taskprocessor_listener_pvt *pvt = listener->private_data; - default_tps_wake_up(pvt, 1); - pthread_join(pvt->poll_thread, NULL); - pvt->poll_thread = AST_PTHREADT_NULL; - ast_mutex_destroy(&pvt->lock); - ast_cond_destroy(&pvt->cond); - ast_free(pvt); + listener->callbacks->destroy(listener->private_data); ao2_ref(listener->tps, -1); listener->tps = NULL; @@ -173,6 +167,35 @@ static void *tps_processing_function(void *data) return NULL; } +static void *default_listener_alloc(struct ast_taskprocessor_listener *listener) +{ + struct default_taskprocessor_listener_pvt *pvt; + + pvt = ast_calloc(1, sizeof(*pvt)); + if (!pvt) { + return NULL; + } + ast_cond_init(&pvt->cond, NULL); + ast_mutex_init(&pvt->lock); + pvt->poll_thread = AST_PTHREADT_NULL; + if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) { + return NULL; + } + return pvt; +} + +static void default_listener_destroy(void *obj) +{ + struct default_taskprocessor_listener_pvt *pvt = obj; + + default_tps_wake_up(pvt, 1); + pthread_join(pvt->poll_thread, NULL); + pvt->poll_thread = AST_PTHREADT_NULL; + ast_mutex_destroy(&pvt->lock); + ast_cond_destroy(&pvt->cond); + ast_free(pvt); +} + static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) { struct default_taskprocessor_listener_pvt *pvt = listener->private_data; @@ -188,8 +211,10 @@ static void default_emptied(struct ast_taskprocessor_listener *listener) } static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = { + .alloc = default_listener_alloc, .task_pushed = default_task_pushed, .emptied = default_emptied, + .destroy = default_listener_destroy, }; /*! \internal \brief Clean up resources on Asterisk shutdown */ @@ -432,29 +457,22 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps) return tps->name; } -static struct ast_taskprocessor_listener *default_listener_alloc(void) +struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps, + struct ast_taskprocessor_listener_callbacks *callbacks) { - struct ast_taskprocessor_listener *listener; - struct default_taskprocessor_listener_pvt *pvt; - - listener = ao2_alloc(sizeof(*listener), default_listener_destroy); + RAII_VAR(struct ast_taskprocessor_listener *, listener, + ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup); + if (!listener) { return NULL; } - pvt = ast_calloc(1, sizeof(*pvt)); - if (!pvt) { - ao2_ref(listener, -1); - return NULL; - } - listener->callbacks = &default_listener_callbacks; - listener->private_data = pvt; - ast_cond_init(&pvt->cond, NULL); - ast_mutex_init(&pvt->lock); - pvt->poll_thread = AST_PTHREADT_NULL; - if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) { - ao2_ref(listener, -1); + listener->callbacks = callbacks; + listener->private_data = listener->callbacks->alloc(listener); + if (!listener->private_data) { return NULL; } + + ao2_ref(listener, +1); return listener; } @@ -480,9 +498,18 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o return NULL; } /* Create a new taskprocessor. Start by creating a default listener */ - listener = default_listener_alloc(); + listener = ast_taskprocessor_listener_alloc(p, &default_listener_callbacks); + if (!listener) { + return NULL; + } p = ast_taskprocessor_create_with_listener(name, listener); + if (!p) { + ao2_ref(listener, -1); + return NULL; + } + + /* Unref listener here since the taskprocessor has gained a reference to the listener */ ao2_ref(listener, -1); return p; -- cgit v1.2.3