summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/asterisk/taskprocessor.h28
-rw-r--r--main/taskprocessor.c85
-rw-r--r--main/threadpool.c53
-rw-r--r--tests/test_taskprocessor.c41
4 files changed, 78 insertions, 129 deletions
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index 7720547d6..c64a8f902 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -75,17 +75,6 @@ struct ast_taskprocessor_listener;
struct ast_taskprocessor_listener_callbacks {
/*!
- * \brief Allocate the listener's private data
- *
- * This is called during taskprocesor creation.
- * It is not necessary to assign the private data to the listener.
- *
- * \param listener The listener to which the private data belongs
- * \retval NULL Error while attempting to initialize private data
- * \retval non-NULL Allocated private data
- */
- void *(*alloc)(struct ast_taskprocessor_listener *listener);
- /*!
* \brief The taskprocessor has started completely
*
* This indicates that the taskprocessor is fully set up and the listener
@@ -111,7 +100,8 @@ struct ast_taskprocessor_listener_callbacks {
* \brief Indicates the taskprocessor wishes to die.
*
* All operations on the task processor must to be stopped in
- * this callback.
+ * this callback. This is an opportune time to free the listener's
+ * user data if it is not going to be used anywhere else.
*
* After this callback returns, it is NOT safe to operate on the
* listener's reference to the taskprocessor.
@@ -119,15 +109,6 @@ struct ast_taskprocessor_listener_callbacks {
* \param listener The listener
*/
void (*shutdown)(struct ast_taskprocessor_listener *listener);
- /*!
- * \brief Destroy the listener's private data
- *
- * It is required that you free the private data in this callback
- * in addition to the private data's individual fields.
- *
- * \param private_data The listener's private data
- */
- void (*destroy)(void *private_data);
};
/*!
@@ -146,7 +127,7 @@ struct ast_taskprocessor_listener {
/*! The taskprocessor that the listener is listening to */
struct ast_taskprocessor *tps;
/*! Data private to the listener */
- void *private_data;
+ void *user_data;
};
/*!
@@ -158,10 +139,11 @@ struct ast_taskprocessor_listener {
* callbacks.
*
* \param callbacks The callbacks to assign to the listener
+ * \param user_data The user data for the listener
* \retval NULL Failure
* \retval non-NULL The newly allocated taskprocessor listener
*/
-struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks);
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data);
/*!
* \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 95bff720e..911eb76f8 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -151,7 +151,7 @@ static void *tps_processing_function(void *data)
{
struct ast_taskprocessor_listener *listener = data;
struct ast_taskprocessor *tps = listener->tps;
- struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+ struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
int dead = 0;
while (!dead) {
@@ -162,23 +162,9 @@ static void *tps_processing_function(void *data)
return NULL;
}
-static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
-{
- struct default_taskprocessor_listener_pvt *pvt;
-
- pvt = ast_calloc(1, sizeof(*pvt));
- if (!pvt) {
- return NULL;
- }
- ast_cond_init(&pvt->cond, NULL);
- ast_mutex_init(&pvt->lock);
- pvt->poll_thread = AST_PTHREADT_NULL;
- return pvt;
-}
-
static int default_listener_start(struct ast_taskprocessor_listener *listener)
{
- struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+ struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
return -1;
@@ -189,41 +175,33 @@ static int default_listener_start(struct ast_taskprocessor_listener *listener)
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
{
- struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+ struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
if (was_empty) {
default_tps_wake_up(pvt, 0);
}
}
-static void default_emptied(struct ast_taskprocessor_listener *listener)
+static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
{
- /* No-op */
+ ast_mutex_destroy(&pvt->lock);
+ ast_cond_destroy(&pvt->cond);
+ ast_free(pvt);
}
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
{
- struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+ struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
default_tps_wake_up(pvt, 1);
pthread_join(pvt->poll_thread, NULL);
pvt->poll_thread = AST_PTHREADT_NULL;
-}
-
-static void default_listener_destroy(void *obj)
-{
- struct default_taskprocessor_listener_pvt *pvt = obj;
- ast_mutex_destroy(&pvt->lock);
- ast_cond_destroy(&pvt->cond);
- ast_free(pvt);
+ default_listener_pvt_destroy(pvt);
}
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
- .alloc = default_listener_alloc,
.start = default_listener_start,
.task_pushed = default_task_pushed,
- .emptied = default_emptied,
.shutdown = default_listener_shutdown,
- .destroy = default_listener_destroy,
};
/*!
@@ -474,33 +452,41 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
return tps->name;
}
-static void listener_destroy(void *obj)
-{
- struct ast_taskprocessor_listener *listener = obj;
-
- listener->callbacks->destroy(listener->private_data);
-}
-
static void listener_shutdown(struct ast_taskprocessor_listener *listener)
{
listener->callbacks->shutdown(listener);
ao2_ref(listener->tps, -1);
}
-struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks)
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
{
RAII_VAR(struct ast_taskprocessor_listener *, listener,
- ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup);
+ ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
if (!listener) {
return NULL;
}
listener->callbacks = callbacks;
+ listener->user_data = user_data;
ao2_ref(listener, +1);
return listener;
}
+static void *default_listener_pvt_alloc(void)
+{
+ struct default_taskprocessor_listener_pvt *pvt;
+
+ pvt = ast_calloc(1, sizeof(*pvt));
+ if (!pvt) {
+ return NULL;
+ }
+ ast_cond_init(&pvt->cond, NULL);
+ ast_mutex_init(&pvt->lock);
+ pvt->poll_thread = AST_PTHREADT_NULL;
+ return pvt;
+}
+
/* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
* create the taskprocessor if we were told via ast_tps_options to return a reference only
* if it already exists */
@@ -508,6 +494,7 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
{
struct ast_taskprocessor *p;
struct ast_taskprocessor_listener *listener;
+ struct default_taskprocessor_listener_pvt *pvt;
if (ast_strlen_zero(name)) {
ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
@@ -522,13 +509,19 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
return NULL;
}
/* Create a new taskprocessor. Start by creating a default listener */
- listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks);
+ pvt = default_listener_pvt_alloc();
+ if (!pvt) {
+ return NULL;
+ }
+ listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
if (!listener) {
+ default_listener_pvt_destroy(pvt);
return NULL;
}
p = ast_taskprocessor_create_with_listener(name, listener);
if (!p) {
+ default_listener_pvt_destroy(pvt);
ao2_ref(listener, -1);
return NULL;
}
@@ -565,14 +558,6 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam
ao2_ref(p, +1);
listener->tps = p;
- /* Allocation of private data must come after setting taskprocessor parameters
- * so that listeners who rely on taskprocessor data will have access to it.
- */
- listener->private_data = listener->callbacks->alloc(listener);
- if (!listener->private_data) {
- return NULL;
- }
-
if (!(ao2_link(tps_singletons, p))) {
ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
return NULL;
@@ -656,7 +641,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
}
ao2_unlock(tps);
- if (size == 0) {
+ if (size == 0 && tps->listener->callbacks->emptied) {
tps->listener->callbacks->emptied(tps->listener);
return 0;
}
diff --git a/main/threadpool.c b/main/threadpool.c
index 05a5f8dd1..db38daa8f 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -365,23 +365,25 @@ static void threadpool_destructor(void *obj)
* is because the threadpool exists as the private data on a taskprocessor
* listener.
*
- * \param listener The taskprocessor listener where the threadpool will live.
+ * \param name The name of the threadpool.
+ * \param options The options the threadpool uses.
* \retval NULL Could not initialize threadpool properly
* \retval non-NULL The newly-allocated threadpool
*/
-static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
+static void *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
{
RAII_VAR(struct ast_threadpool *, pool,
ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup);
- struct ast_str *name = ast_str_create(64);
+ struct ast_str *control_tps_name = ast_str_create(64);
- if (!name) {
+ if (!control_tps_name) {
return NULL;
}
- ast_str_set(&name, 0, "%s-control", ast_taskprocessor_name(listener->tps));
+ ast_str_set(&control_tps_name, 0, "%s-control", name);
- pool->control_tps = ast_taskprocessor_get(ast_str_buffer(name), TPS_REF_DEFAULT);
+ pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
+ ast_free(control_tps_name);
if (!pool->control_tps) {
return NULL;
}
@@ -397,6 +399,7 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
if (!pool->zombie_threads) {
return NULL;
}
+ pool->options = *options;
ao2_ref(pool, +1);
return pool;
@@ -545,7 +548,7 @@ static int queued_task_pushed(void *data)
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
int was_empty)
{
- struct ast_threadpool *pool = listener->private_data;
+ struct ast_threadpool *pool = listener->user_data;
struct task_pushed_data *tpd;
SCOPED_AO2LOCK(lock, pool);
@@ -585,7 +588,7 @@ static int queued_emptied(void *data)
*/
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
{
- struct ast_threadpool *pool = listener->private_data;
+ struct ast_threadpool *pool = listener->user_data;
SCOPED_AO2LOCK(lock, pool);
if (pool->shutting_down) {
@@ -608,26 +611,11 @@ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
*/
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
{
- struct ast_threadpool *pool = listener->private_data;
+ struct ast_threadpool *pool = listener->user_data;
ao2_cleanup(pool->active_threads);
ao2_cleanup(pool->idle_threads);
ao2_cleanup(pool->zombie_threads);
-}
-
-/*!
- * \brief Taskprocessor listener destroy callback
- *
- * Since the threadpool is an ao2 object, all that is necessary is to
- * decrease the refcount. Since the control taskprocessor should already
- * be destroyed by this point, this should be the final reference to the
- * threadpool.
- *
- * \param private_data The threadpool to destroy
- */
-static void threadpool_destroy(void *private_data)
-{
- struct ast_threadpool *pool = private_data;
ao2_cleanup(pool);
}
@@ -635,12 +623,10 @@ static void threadpool_destroy(void *private_data)
* \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
*/
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
- .alloc = threadpool_alloc,
.start = threadpool_tps_start,
.task_pushed = threadpool_tps_task_pushed,
.emptied = threadpool_tps_emptied,
.shutdown = threadpool_tps_shutdown,
- .destroy = threadpool_destroy,
};
/*!
@@ -854,12 +840,15 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
struct ast_threadpool_listener *listener,
int initial_size, const struct ast_threadpool_options *options)
{
- struct ast_threadpool *pool;
struct ast_taskprocessor *tps;
- RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
- ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
- ao2_cleanup);
+ RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_threadpool *, pool, threadpool_alloc(name, options), ao2_cleanup);
+ if (!pool) {
+ return NULL;
+ }
+
+ tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
if (!tps_listener) {
return NULL;
}
@@ -870,19 +859,17 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
}
tps = ast_taskprocessor_create_with_listener(name, tps_listener);
-
if (!tps) {
return NULL;
}
- pool = tps_listener->private_data;
pool->tps = tps;
if (listener) {
ao2_ref(listener, +1);
pool->listener = listener;
}
- pool->options = *options;
ast_threadpool_set_size(pool, initial_size);
+ ao2_ref(pool, +1);
return pool;
}
diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c
index 377a2b3e3..c04eeeb36 100644
--- a/tests/test_taskprocessor.c
+++ b/tests/test_taskprocessor.c
@@ -260,7 +260,7 @@ struct test_listener_pvt {
/*!
* \brief test taskprocessor listener's alloc callback
*/
-static void *test_alloc(struct ast_taskprocessor_listener *listener)
+static void *test_listener_pvt_alloc(void)
{
struct test_listener_pvt *pvt;
@@ -283,7 +283,7 @@ static int test_start(struct ast_taskprocessor_listener *listener)
*/
static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
{
- struct test_listener_pvt *pvt = listener->private_data;
+ struct test_listener_pvt *pvt = listener->user_data;
++pvt->num_pushed;
if (was_empty) {
++pvt->num_was_empty;
@@ -295,7 +295,7 @@ static void test_task_pushed(struct ast_taskprocessor_listener *listener, int wa
*/
static void test_emptied(struct ast_taskprocessor_listener *listener)
{
- struct test_listener_pvt *pvt = listener->private_data;
+ struct test_listener_pvt *pvt = listener->user_data;
++pvt->num_emptied;
}
@@ -304,26 +304,15 @@ static void test_emptied(struct ast_taskprocessor_listener *listener)
*/
static void test_shutdown(struct ast_taskprocessor_listener *listener)
{
- struct test_listener_pvt *pvt = listener->private_data;
+ struct test_listener_pvt *pvt = listener->user_data;
pvt->shutdown = 1;
}
-/*!
- * \brief test taskprocessor listener's destroy callback.
- */
-static void test_destroy(void *private_data)
-{
- struct test_listener_pvt *pvt = private_data;
- ast_free(pvt);
-}
-
static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
- .alloc = test_alloc,
.start = test_start,
.task_pushed = test_task_pushed,
.emptied = test_emptied,
.shutdown = test_shutdown,
- .destroy = test_destroy,
};
/*!
@@ -381,9 +370,9 @@ static int check_stats(struct ast_test *test, const struct test_listener_pvt *pv
*/
AST_TEST_DEFINE(taskprocessor_listener)
{
- struct ast_taskprocessor *tps;
- struct ast_taskprocessor_listener *listener;
- struct test_listener_pvt *pvt;
+ struct ast_taskprocessor *tps = NULL;
+ struct ast_taskprocessor_listener *listener = NULL;
+ struct test_listener_pvt *pvt = NULL;
enum ast_test_result_state res = AST_TEST_PASS;
switch (cmd) {
@@ -398,10 +387,17 @@ AST_TEST_DEFINE(taskprocessor_listener)
break;
}
- listener = ast_taskprocessor_listener_alloc(&test_callbacks);
+ pvt = test_listener_pvt_alloc();
+ if (!pvt) {
+ ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n");
+ return AST_TEST_FAIL;
+ }
+
+ listener = ast_taskprocessor_listener_alloc(&test_callbacks, pvt);
if (!listener) {
ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
- return AST_TEST_FAIL;
+ res = AST_TEST_FAIL;
+ goto test_exit;
}
tps = ast_taskprocessor_create_with_listener("test_listener", listener);
@@ -411,8 +407,6 @@ AST_TEST_DEFINE(taskprocessor_listener)
goto test_exit;
}
- pvt = listener->private_data;
-
ast_taskprocessor_push(tps, listener_test_task, NULL);
if (check_stats(test, pvt, 1, 0, 1) < 0) {
@@ -449,9 +443,10 @@ AST_TEST_DEFINE(taskprocessor_listener)
}
test_exit:
- ao2_ref(listener, -1);
+ ao2_cleanup(listener);
/* This is safe even if tps is NULL */
ast_taskprocessor_unreference(tps);
+ ast_free(pvt);
return res;
}