diff options
-rw-r--r-- | include/asterisk/taskprocessor.h | 28 | ||||
-rw-r--r-- | main/taskprocessor.c | 85 | ||||
-rw-r--r-- | main/threadpool.c | 53 | ||||
-rw-r--r-- | tests/test_taskprocessor.c | 41 |
4 files changed, 78 insertions, 129 deletions
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index 7720547d6..c64a8f902 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -75,17 +75,6 @@ struct ast_taskprocessor_listener; struct ast_taskprocessor_listener_callbacks { /*! - * \brief Allocate the listener's private data - * - * This is called during taskprocesor creation. - * It is not necessary to assign the private data to the listener. - * - * \param listener The listener to which the private data belongs - * \retval NULL Error while attempting to initialize private data - * \retval non-NULL Allocated private data - */ - void *(*alloc)(struct ast_taskprocessor_listener *listener); - /*! * \brief The taskprocessor has started completely * * This indicates that the taskprocessor is fully set up and the listener @@ -111,7 +100,8 @@ struct ast_taskprocessor_listener_callbacks { * \brief Indicates the taskprocessor wishes to die. * * All operations on the task processor must to be stopped in - * this callback. + * this callback. This is an opportune time to free the listener's + * user data if it is not going to be used anywhere else. * * After this callback returns, it is NOT safe to operate on the * listener's reference to the taskprocessor. @@ -119,15 +109,6 @@ struct ast_taskprocessor_listener_callbacks { * \param listener The listener */ void (*shutdown)(struct ast_taskprocessor_listener *listener); - /*! - * \brief Destroy the listener's private data - * - * It is required that you free the private data in this callback - * in addition to the private data's individual fields. - * - * \param private_data The listener's private data - */ - void (*destroy)(void *private_data); }; /*! @@ -146,7 +127,7 @@ struct ast_taskprocessor_listener { /*! The taskprocessor that the listener is listening to */ struct ast_taskprocessor *tps; /*! Data private to the listener */ - void *private_data; + void *user_data; }; /*! @@ -158,10 +139,11 @@ struct ast_taskprocessor_listener { * callbacks. * * \param callbacks The callbacks to assign to the listener + * \param user_data The user data for the listener * \retval NULL Failure * \retval non-NULL The newly allocated taskprocessor listener */ -struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks); +struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data); /*! * \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary diff --git a/main/taskprocessor.c b/main/taskprocessor.c index 95bff720e..911eb76f8 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -151,7 +151,7 @@ static void *tps_processing_function(void *data) { struct ast_taskprocessor_listener *listener = data; struct ast_taskprocessor *tps = listener->tps; - struct default_taskprocessor_listener_pvt *pvt = listener->private_data; + struct default_taskprocessor_listener_pvt *pvt = listener->user_data; int dead = 0; while (!dead) { @@ -162,23 +162,9 @@ static void *tps_processing_function(void *data) return NULL; } -static void *default_listener_alloc(struct ast_taskprocessor_listener *listener) -{ - struct default_taskprocessor_listener_pvt *pvt; - - pvt = ast_calloc(1, sizeof(*pvt)); - if (!pvt) { - return NULL; - } - ast_cond_init(&pvt->cond, NULL); - ast_mutex_init(&pvt->lock); - pvt->poll_thread = AST_PTHREADT_NULL; - return pvt; -} - static int default_listener_start(struct ast_taskprocessor_listener *listener) { - struct default_taskprocessor_listener_pvt *pvt = listener->private_data; + struct default_taskprocessor_listener_pvt *pvt = listener->user_data; if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) { return -1; @@ -189,41 +175,33 @@ static int default_listener_start(struct ast_taskprocessor_listener *listener) static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) { - struct default_taskprocessor_listener_pvt *pvt = listener->private_data; + struct default_taskprocessor_listener_pvt *pvt = listener->user_data; if (was_empty) { default_tps_wake_up(pvt, 0); } } -static void default_emptied(struct ast_taskprocessor_listener *listener) +static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt) { - /* No-op */ + ast_mutex_destroy(&pvt->lock); + ast_cond_destroy(&pvt->cond); + ast_free(pvt); } static void default_listener_shutdown(struct ast_taskprocessor_listener *listener) { - struct default_taskprocessor_listener_pvt *pvt = listener->private_data; + struct default_taskprocessor_listener_pvt *pvt = listener->user_data; default_tps_wake_up(pvt, 1); pthread_join(pvt->poll_thread, NULL); pvt->poll_thread = AST_PTHREADT_NULL; -} - -static void default_listener_destroy(void *obj) -{ - struct default_taskprocessor_listener_pvt *pvt = obj; - ast_mutex_destroy(&pvt->lock); - ast_cond_destroy(&pvt->cond); - ast_free(pvt); + default_listener_pvt_destroy(pvt); } static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = { - .alloc = default_listener_alloc, .start = default_listener_start, .task_pushed = default_task_pushed, - .emptied = default_emptied, .shutdown = default_listener_shutdown, - .destroy = default_listener_destroy, }; /*! @@ -474,33 +452,41 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps) return tps->name; } -static void listener_destroy(void *obj) -{ - struct ast_taskprocessor_listener *listener = obj; - - listener->callbacks->destroy(listener->private_data); -} - static void listener_shutdown(struct ast_taskprocessor_listener *listener) { listener->callbacks->shutdown(listener); ao2_ref(listener->tps, -1); } -struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks) +struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data) { RAII_VAR(struct ast_taskprocessor_listener *, listener, - ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup); + ao2_alloc(sizeof(*listener), NULL), ao2_cleanup); if (!listener) { return NULL; } listener->callbacks = callbacks; + listener->user_data = user_data; ao2_ref(listener, +1); return listener; } +static void *default_listener_pvt_alloc(void) +{ + struct default_taskprocessor_listener_pvt *pvt; + + pvt = ast_calloc(1, sizeof(*pvt)); + if (!pvt) { + return NULL; + } + ast_cond_init(&pvt->cond, NULL); + ast_mutex_init(&pvt->lock); + pvt->poll_thread = AST_PTHREADT_NULL; + return pvt; +} + /* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't * create the taskprocessor if we were told via ast_tps_options to return a reference only * if it already exists */ @@ -508,6 +494,7 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o { struct ast_taskprocessor *p; struct ast_taskprocessor_listener *listener; + struct default_taskprocessor_listener_pvt *pvt; if (ast_strlen_zero(name)) { ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n"); @@ -522,13 +509,19 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o return NULL; } /* Create a new taskprocessor. Start by creating a default listener */ - listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks); + pvt = default_listener_pvt_alloc(); + if (!pvt) { + return NULL; + } + listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt); if (!listener) { + default_listener_pvt_destroy(pvt); return NULL; } p = ast_taskprocessor_create_with_listener(name, listener); if (!p) { + default_listener_pvt_destroy(pvt); ao2_ref(listener, -1); return NULL; } @@ -565,14 +558,6 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam ao2_ref(p, +1); listener->tps = p; - /* Allocation of private data must come after setting taskprocessor parameters - * so that listeners who rely on taskprocessor data will have access to it. - */ - listener->private_data = listener->callbacks->alloc(listener); - if (!listener->private_data) { - return NULL; - } - if (!(ao2_link(tps_singletons, p))) { ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name); return NULL; @@ -656,7 +641,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) } ao2_unlock(tps); - if (size == 0) { + if (size == 0 && tps->listener->callbacks->emptied) { tps->listener->callbacks->emptied(tps->listener); return 0; } diff --git a/main/threadpool.c b/main/threadpool.c index 05a5f8dd1..db38daa8f 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -365,23 +365,25 @@ static void threadpool_destructor(void *obj) * is because the threadpool exists as the private data on a taskprocessor * listener. * - * \param listener The taskprocessor listener where the threadpool will live. + * \param name The name of the threadpool. + * \param options The options the threadpool uses. * \retval NULL Could not initialize threadpool properly * \retval non-NULL The newly-allocated threadpool */ -static void *threadpool_alloc(struct ast_taskprocessor_listener *listener) +static void *threadpool_alloc(const char *name, const struct ast_threadpool_options *options) { RAII_VAR(struct ast_threadpool *, pool, ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup); - struct ast_str *name = ast_str_create(64); + struct ast_str *control_tps_name = ast_str_create(64); - if (!name) { + if (!control_tps_name) { return NULL; } - ast_str_set(&name, 0, "%s-control", ast_taskprocessor_name(listener->tps)); + ast_str_set(&control_tps_name, 0, "%s-control", name); - pool->control_tps = ast_taskprocessor_get(ast_str_buffer(name), TPS_REF_DEFAULT); + pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT); + ast_free(control_tps_name); if (!pool->control_tps) { return NULL; } @@ -397,6 +399,7 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener) if (!pool->zombie_threads) { return NULL; } + pool->options = *options; ao2_ref(pool, +1); return pool; @@ -545,7 +548,7 @@ static int queued_task_pushed(void *data) static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) { - struct ast_threadpool *pool = listener->private_data; + struct ast_threadpool *pool = listener->user_data; struct task_pushed_data *tpd; SCOPED_AO2LOCK(lock, pool); @@ -585,7 +588,7 @@ static int queued_emptied(void *data) */ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) { - struct ast_threadpool *pool = listener->private_data; + struct ast_threadpool *pool = listener->user_data; SCOPED_AO2LOCK(lock, pool); if (pool->shutting_down) { @@ -608,26 +611,11 @@ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) */ static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener) { - struct ast_threadpool *pool = listener->private_data; + struct ast_threadpool *pool = listener->user_data; ao2_cleanup(pool->active_threads); ao2_cleanup(pool->idle_threads); ao2_cleanup(pool->zombie_threads); -} - -/*! - * \brief Taskprocessor listener destroy callback - * - * Since the threadpool is an ao2 object, all that is necessary is to - * decrease the refcount. Since the control taskprocessor should already - * be destroyed by this point, this should be the final reference to the - * threadpool. - * - * \param private_data The threadpool to destroy - */ -static void threadpool_destroy(void *private_data) -{ - struct ast_threadpool *pool = private_data; ao2_cleanup(pool); } @@ -635,12 +623,10 @@ static void threadpool_destroy(void *private_data) * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor */ static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = { - .alloc = threadpool_alloc, .start = threadpool_tps_start, .task_pushed = threadpool_tps_task_pushed, .emptied = threadpool_tps_emptied, .shutdown = threadpool_tps_shutdown, - .destroy = threadpool_destroy, }; /*! @@ -854,12 +840,15 @@ struct ast_threadpool *ast_threadpool_create(const char *name, struct ast_threadpool_listener *listener, int initial_size, const struct ast_threadpool_options *options) { - struct ast_threadpool *pool; struct ast_taskprocessor *tps; - RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, - ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks), - ao2_cleanup); + RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup); + RAII_VAR(struct ast_threadpool *, pool, threadpool_alloc(name, options), ao2_cleanup); + if (!pool) { + return NULL; + } + + tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool); if (!tps_listener) { return NULL; } @@ -870,19 +859,17 @@ struct ast_threadpool *ast_threadpool_create(const char *name, } tps = ast_taskprocessor_create_with_listener(name, tps_listener); - if (!tps) { return NULL; } - pool = tps_listener->private_data; pool->tps = tps; if (listener) { ao2_ref(listener, +1); pool->listener = listener; } - pool->options = *options; ast_threadpool_set_size(pool, initial_size); + ao2_ref(pool, +1); return pool; } diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c index 377a2b3e3..c04eeeb36 100644 --- a/tests/test_taskprocessor.c +++ b/tests/test_taskprocessor.c @@ -260,7 +260,7 @@ struct test_listener_pvt { /*! * \brief test taskprocessor listener's alloc callback */ -static void *test_alloc(struct ast_taskprocessor_listener *listener) +static void *test_listener_pvt_alloc(void) { struct test_listener_pvt *pvt; @@ -283,7 +283,7 @@ static int test_start(struct ast_taskprocessor_listener *listener) */ static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) { - struct test_listener_pvt *pvt = listener->private_data; + struct test_listener_pvt *pvt = listener->user_data; ++pvt->num_pushed; if (was_empty) { ++pvt->num_was_empty; @@ -295,7 +295,7 @@ static void test_task_pushed(struct ast_taskprocessor_listener *listener, int wa */ static void test_emptied(struct ast_taskprocessor_listener *listener) { - struct test_listener_pvt *pvt = listener->private_data; + struct test_listener_pvt *pvt = listener->user_data; ++pvt->num_emptied; } @@ -304,26 +304,15 @@ static void test_emptied(struct ast_taskprocessor_listener *listener) */ static void test_shutdown(struct ast_taskprocessor_listener *listener) { - struct test_listener_pvt *pvt = listener->private_data; + struct test_listener_pvt *pvt = listener->user_data; pvt->shutdown = 1; } -/*! - * \brief test taskprocessor listener's destroy callback. - */ -static void test_destroy(void *private_data) -{ - struct test_listener_pvt *pvt = private_data; - ast_free(pvt); -} - static const struct ast_taskprocessor_listener_callbacks test_callbacks = { - .alloc = test_alloc, .start = test_start, .task_pushed = test_task_pushed, .emptied = test_emptied, .shutdown = test_shutdown, - .destroy = test_destroy, }; /*! @@ -381,9 +370,9 @@ static int check_stats(struct ast_test *test, const struct test_listener_pvt *pv */ AST_TEST_DEFINE(taskprocessor_listener) { - struct ast_taskprocessor *tps; - struct ast_taskprocessor_listener *listener; - struct test_listener_pvt *pvt; + struct ast_taskprocessor *tps = NULL; + struct ast_taskprocessor_listener *listener = NULL; + struct test_listener_pvt *pvt = NULL; enum ast_test_result_state res = AST_TEST_PASS; switch (cmd) { @@ -398,10 +387,17 @@ AST_TEST_DEFINE(taskprocessor_listener) break; } - listener = ast_taskprocessor_listener_alloc(&test_callbacks); + pvt = test_listener_pvt_alloc(); + if (!pvt) { + ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n"); + return AST_TEST_FAIL; + } + + listener = ast_taskprocessor_listener_alloc(&test_callbacks, pvt); if (!listener) { ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n"); - return AST_TEST_FAIL; + res = AST_TEST_FAIL; + goto test_exit; } tps = ast_taskprocessor_create_with_listener("test_listener", listener); @@ -411,8 +407,6 @@ AST_TEST_DEFINE(taskprocessor_listener) goto test_exit; } - pvt = listener->private_data; - ast_taskprocessor_push(tps, listener_test_task, NULL); if (check_stats(test, pvt, 1, 0, 1) < 0) { @@ -449,9 +443,10 @@ AST_TEST_DEFINE(taskprocessor_listener) } test_exit: - ao2_ref(listener, -1); + ao2_cleanup(listener); /* This is safe even if tps is NULL */ ast_taskprocessor_unreference(tps); + ast_free(pvt); return res; } |