summaryrefslogtreecommitdiff
path: root/main/taskprocessor.c
diff options
context:
space:
mode:
authorMark Michelson <mmichelson@digium.com>2012-11-09 22:28:10 +0000
committerMark Michelson <mmichelson@digium.com>2012-11-09 22:28:10 +0000
commitd5716ecae2238c9e96576870c2146606ab46bc6c (patch)
treef050d0d5ec64e242beb18907730a0e1ef06e74ba /main/taskprocessor.c
parent6fc8d830f8474a73436bceaa4f6f131dabd6ff8f (diff)
Genericize the allocation and destruction of taskprocessor listeners.
The goal of this is to take the responsibility away from individual listeners to be sure to properly unref the taskprocessor. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@376121 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/taskprocessor.c')
-rw-r--r--main/taskprocessor.c79
1 files changed, 53 insertions, 26 deletions
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index bd94103d2..4ca01f9ca 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -131,17 +131,11 @@ static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt,
ast_cond_signal(&pvt->cond);
}
-static void default_listener_destroy(void *obj)
+static void listener_destroy(void *obj)
{
struct ast_taskprocessor_listener *listener = obj;
- struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
- default_tps_wake_up(pvt, 1);
- pthread_join(pvt->poll_thread, NULL);
- pvt->poll_thread = AST_PTHREADT_NULL;
- ast_mutex_destroy(&pvt->lock);
- ast_cond_destroy(&pvt->cond);
- ast_free(pvt);
+ listener->callbacks->destroy(listener->private_data);
ao2_ref(listener->tps, -1);
listener->tps = NULL;
@@ -173,6 +167,35 @@ static void *tps_processing_function(void *data)
return NULL;
}
+static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
+{
+ struct default_taskprocessor_listener_pvt *pvt;
+
+ pvt = ast_calloc(1, sizeof(*pvt));
+ if (!pvt) {
+ return NULL;
+ }
+ ast_cond_init(&pvt->cond, NULL);
+ ast_mutex_init(&pvt->lock);
+ pvt->poll_thread = AST_PTHREADT_NULL;
+ if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
+ return NULL;
+ }
+ return pvt;
+}
+
+static void default_listener_destroy(void *obj)
+{
+ struct default_taskprocessor_listener_pvt *pvt = obj;
+
+ default_tps_wake_up(pvt, 1);
+ pthread_join(pvt->poll_thread, NULL);
+ pvt->poll_thread = AST_PTHREADT_NULL;
+ ast_mutex_destroy(&pvt->lock);
+ ast_cond_destroy(&pvt->cond);
+ ast_free(pvt);
+}
+
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
{
struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
@@ -188,8 +211,10 @@ static void default_emptied(struct ast_taskprocessor_listener *listener)
}
static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
+ .alloc = default_listener_alloc,
.task_pushed = default_task_pushed,
.emptied = default_emptied,
+ .destroy = default_listener_destroy,
};
/*! \internal \brief Clean up resources on Asterisk shutdown */
@@ -432,29 +457,22 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
return tps->name;
}
-static struct ast_taskprocessor_listener *default_listener_alloc(void)
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps,
+ struct ast_taskprocessor_listener_callbacks *callbacks)
{
- struct ast_taskprocessor_listener *listener;
- struct default_taskprocessor_listener_pvt *pvt;
-
- listener = ao2_alloc(sizeof(*listener), default_listener_destroy);
+ RAII_VAR(struct ast_taskprocessor_listener *, listener,
+ ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup);
+
if (!listener) {
return NULL;
}
- pvt = ast_calloc(1, sizeof(*pvt));
- if (!pvt) {
- ao2_ref(listener, -1);
- return NULL;
- }
- listener->callbacks = &default_listener_callbacks;
- listener->private_data = pvt;
- ast_cond_init(&pvt->cond, NULL);
- ast_mutex_init(&pvt->lock);
- pvt->poll_thread = AST_PTHREADT_NULL;
- if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
- ao2_ref(listener, -1);
+ listener->callbacks = callbacks;
+ listener->private_data = listener->callbacks->alloc(listener);
+ if (!listener->private_data) {
return NULL;
}
+
+ ao2_ref(listener, +1);
return listener;
}
@@ -480,9 +498,18 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
return NULL;
}
/* Create a new taskprocessor. Start by creating a default listener */
- listener = default_listener_alloc();
+ listener = ast_taskprocessor_listener_alloc(p, &default_listener_callbacks);
+ if (!listener) {
+ return NULL;
+ }
p = ast_taskprocessor_create_with_listener(name, listener);
+ if (!p) {
+ ao2_ref(listener, -1);
+ return NULL;
+ }
+
+ /* Unref listener here since the taskprocessor has gained a reference to the listener */
ao2_ref(listener, -1);
return p;