summaryrefslogtreecommitdiff
path: root/main/taskprocessor.c
diff options
context:
space:
mode:
authorMark Michelson <mmichelson@digium.com>2013-01-15 18:40:36 +0000
committerMark Michelson <mmichelson@digium.com>2013-01-15 18:40:36 +0000
commit65c7d6e2c3d4fc3a161a31ff1e89d20256926fb1 (patch)
tree8076acfcfd024cdfa45aea20a5fa908e6af34651 /main/taskprocessor.c
parentc80f86f007f6676f105a768be0ceedb3a8ec8298 (diff)
Remove alloc and destroy callbacks from the taskprocessor.
Now user data is allocated by the creator of the taskprocessor listener and that user data is passed into ast_taskprocessor_listener_alloc(). Similarly, freeing of the user data is left up to the user himself. He can free the data when the taskprocessor shuts down, or he can choose to hold onto it if it makes sense to do so. This, unsurprisingly, makes threadpool allocation a LOT cleaner now. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@379120 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/taskprocessor.c')
-rw-r--r--main/taskprocessor.c85
1 files changed, 35 insertions, 50 deletions
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 95bff720e..911eb76f8 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -151,7 +151,7 @@ static void *tps_processing_function(void *data)
{
struct ast_taskprocessor_listener *listener = data;
struct ast_taskprocessor *tps = listener->tps;
- struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+ struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
int dead = 0;
while (!dead) {
@@ -162,23 +162,9 @@ static void *tps_processing_function(void *data)
return NULL;
}
-static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
-{
- struct default_taskprocessor_listener_pvt *pvt;
-
- pvt = ast_calloc(1, sizeof(*pvt));
- if (!pvt) {
- return NULL;
- }
- ast_cond_init(&pvt->cond, NULL);
- ast_mutex_init(&pvt->lock);
- pvt->poll_thread = AST_PTHREADT_NULL;
- return pvt;
-}
-
static int default_listener_start(struct ast_taskprocessor_listener *listener)
{
- struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+ struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
return -1;
@@ -189,41 +175,33 @@ static int default_listener_start(struct ast_taskprocessor_listener *listener)
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
{
- struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+ struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
if (was_empty) {
default_tps_wake_up(pvt, 0);
}
}
-static void default_emptied(struct ast_taskprocessor_listener *listener)
+static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
{
- /* No-op */
+ ast_mutex_destroy(&pvt->lock);
+ ast_cond_destroy(&pvt->cond);
+ ast_free(pvt);
}
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
{
- struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+ struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
default_tps_wake_up(pvt, 1);
pthread_join(pvt->poll_thread, NULL);
pvt->poll_thread = AST_PTHREADT_NULL;
-}
-
-static void default_listener_destroy(void *obj)
-{
- struct default_taskprocessor_listener_pvt *pvt = obj;
- ast_mutex_destroy(&pvt->lock);
- ast_cond_destroy(&pvt->cond);
- ast_free(pvt);
+ default_listener_pvt_destroy(pvt);
}
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
- .alloc = default_listener_alloc,
.start = default_listener_start,
.task_pushed = default_task_pushed,
- .emptied = default_emptied,
.shutdown = default_listener_shutdown,
- .destroy = default_listener_destroy,
};
/*!
@@ -474,33 +452,41 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
return tps->name;
}
-static void listener_destroy(void *obj)
-{
- struct ast_taskprocessor_listener *listener = obj;
-
- listener->callbacks->destroy(listener->private_data);
-}
-
static void listener_shutdown(struct ast_taskprocessor_listener *listener)
{
listener->callbacks->shutdown(listener);
ao2_ref(listener->tps, -1);
}
-struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks)
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
{
RAII_VAR(struct ast_taskprocessor_listener *, listener,
- ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup);
+ ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
if (!listener) {
return NULL;
}
listener->callbacks = callbacks;
+ listener->user_data = user_data;
ao2_ref(listener, +1);
return listener;
}
+static void *default_listener_pvt_alloc(void)
+{
+ struct default_taskprocessor_listener_pvt *pvt;
+
+ pvt = ast_calloc(1, sizeof(*pvt));
+ if (!pvt) {
+ return NULL;
+ }
+ ast_cond_init(&pvt->cond, NULL);
+ ast_mutex_init(&pvt->lock);
+ pvt->poll_thread = AST_PTHREADT_NULL;
+ return pvt;
+}
+
/* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
* create the taskprocessor if we were told via ast_tps_options to return a reference only
* if it already exists */
@@ -508,6 +494,7 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
{
struct ast_taskprocessor *p;
struct ast_taskprocessor_listener *listener;
+ struct default_taskprocessor_listener_pvt *pvt;
if (ast_strlen_zero(name)) {
ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
@@ -522,13 +509,19 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
return NULL;
}
/* Create a new taskprocessor. Start by creating a default listener */
- listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks);
+ pvt = default_listener_pvt_alloc();
+ if (!pvt) {
+ return NULL;
+ }
+ listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
if (!listener) {
+ default_listener_pvt_destroy(pvt);
return NULL;
}
p = ast_taskprocessor_create_with_listener(name, listener);
if (!p) {
+ default_listener_pvt_destroy(pvt);
ao2_ref(listener, -1);
return NULL;
}
@@ -565,14 +558,6 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam
ao2_ref(p, +1);
listener->tps = p;
- /* Allocation of private data must come after setting taskprocessor parameters
- * so that listeners who rely on taskprocessor data will have access to it.
- */
- listener->private_data = listener->callbacks->alloc(listener);
- if (!listener->private_data) {
- return NULL;
- }
-
if (!(ao2_link(tps_singletons, p))) {
ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
return NULL;
@@ -656,7 +641,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
}
ao2_unlock(tps);
- if (size == 0) {
+ if (size == 0 && tps->listener->callbacks->emptied) {
tps->listener->callbacks->emptied(tps->listener);
return 0;
}