summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Michelson <mmichelson@digium.com>2012-11-09 22:28:10 +0000
committerMark Michelson <mmichelson@digium.com>2012-11-09 22:28:10 +0000
commitd5716ecae2238c9e96576870c2146606ab46bc6c (patch)
treef050d0d5ec64e242beb18907730a0e1ef06e74ba
parent6fc8d830f8474a73436bceaa4f6f131dabd6ff8f (diff)
Genericize the allocation and destruction of taskprocessor listeners.
The goal of this is to take the responsibility away from individual listeners to be sure to properly unref the taskprocessor. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@376121 65c4cc65-6c06-0410-ace0-fbb531ad65f3
-rw-r--r--include/asterisk/taskprocessor.h7
-rw-r--r--main/astobj2.c1
-rw-r--r--main/taskprocessor.c79
3 files changed, 61 insertions, 26 deletions
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index df66f59f0..a92e1f31c 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -63,10 +63,14 @@ enum ast_tps_options {
struct ast_taskprocessor_listener;
struct ast_taskprocessor_listener_callbacks {
+ /*! Allocate the listener's private data */
+ void *(*alloc)(struct ast_taskprocessor_listener *listener);
/*! Indicates a task was pushed to the processor */
void (*task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty);
/*! Indicates the task processor has become empty */
void (*emptied)(struct ast_taskprocessor_listener *listener);
+ /*! Destroy the listener's private data */
+ void (*destroy)(void *private_data);
};
struct ast_taskprocessor_listener {
@@ -75,6 +79,9 @@ struct ast_taskprocessor_listener {
void *private_data;
};
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps,
+ struct ast_taskprocessor_listener_callbacks *callbacks);
+
/*!
* \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
*
diff --git a/main/astobj2.c b/main/astobj2.c
index 082dfc038..b36cee837 100644
--- a/main/astobj2.c
+++ b/main/astobj2.c
@@ -431,6 +431,7 @@ static int internal_ao2_ref(void *user_data, int delta, const char *file, int li
int ret;
if (obj == NULL) {
+ ast_backtrace();
ast_assert(0);
return -1;
}
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index bd94103d2..4ca01f9ca 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -131,17 +131,11 @@ static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt,
ast_cond_signal(&pvt->cond);
}
-static void default_listener_destroy(void *obj)
+static void listener_destroy(void *obj)
{
struct ast_taskprocessor_listener *listener = obj;
- struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
- default_tps_wake_up(pvt, 1);
- pthread_join(pvt->poll_thread, NULL);
- pvt->poll_thread = AST_PTHREADT_NULL;
- ast_mutex_destroy(&pvt->lock);
- ast_cond_destroy(&pvt->cond);
- ast_free(pvt);
+ listener->callbacks->destroy(listener->private_data);
ao2_ref(listener->tps, -1);
listener->tps = NULL;
@@ -173,6 +167,35 @@ static void *tps_processing_function(void *data)
return NULL;
}
+static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
+{
+ struct default_taskprocessor_listener_pvt *pvt;
+
+ pvt = ast_calloc(1, sizeof(*pvt));
+ if (!pvt) {
+ return NULL;
+ }
+ ast_cond_init(&pvt->cond, NULL);
+ ast_mutex_init(&pvt->lock);
+ pvt->poll_thread = AST_PTHREADT_NULL;
+ if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
+ return NULL;
+ }
+ return pvt;
+}
+
+static void default_listener_destroy(void *obj)
+{
+ struct default_taskprocessor_listener_pvt *pvt = obj;
+
+ default_tps_wake_up(pvt, 1);
+ pthread_join(pvt->poll_thread, NULL);
+ pvt->poll_thread = AST_PTHREADT_NULL;
+ ast_mutex_destroy(&pvt->lock);
+ ast_cond_destroy(&pvt->cond);
+ ast_free(pvt);
+}
+
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
{
struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
@@ -188,8 +211,10 @@ static void default_emptied(struct ast_taskprocessor_listener *listener)
}
static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
+ .alloc = default_listener_alloc,
.task_pushed = default_task_pushed,
.emptied = default_emptied,
+ .destroy = default_listener_destroy,
};
/*! \internal \brief Clean up resources on Asterisk shutdown */
@@ -432,29 +457,22 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
return tps->name;
}
-static struct ast_taskprocessor_listener *default_listener_alloc(void)
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps,
+ struct ast_taskprocessor_listener_callbacks *callbacks)
{
- struct ast_taskprocessor_listener *listener;
- struct default_taskprocessor_listener_pvt *pvt;
-
- listener = ao2_alloc(sizeof(*listener), default_listener_destroy);
+ RAII_VAR(struct ast_taskprocessor_listener *, listener,
+ ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup);
+
if (!listener) {
return NULL;
}
- pvt = ast_calloc(1, sizeof(*pvt));
- if (!pvt) {
- ao2_ref(listener, -1);
- return NULL;
- }
- listener->callbacks = &default_listener_callbacks;
- listener->private_data = pvt;
- ast_cond_init(&pvt->cond, NULL);
- ast_mutex_init(&pvt->lock);
- pvt->poll_thread = AST_PTHREADT_NULL;
- if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
- ao2_ref(listener, -1);
+ listener->callbacks = callbacks;
+ listener->private_data = listener->callbacks->alloc(listener);
+ if (!listener->private_data) {
return NULL;
}
+
+ ao2_ref(listener, +1);
return listener;
}
@@ -480,9 +498,18 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
return NULL;
}
/* Create a new taskprocessor. Start by creating a default listener */
- listener = default_listener_alloc();
+ listener = ast_taskprocessor_listener_alloc(p, &default_listener_callbacks);
+ if (!listener) {
+ return NULL;
+ }
p = ast_taskprocessor_create_with_listener(name, listener);
+ if (!p) {
+ ao2_ref(listener, -1);
+ return NULL;
+ }
+
+ /* Unref listener here since the taskprocessor has gained a reference to the listener */
ao2_ref(listener, -1);
return p;