summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/asterisk/taskprocessor.h9
-rw-r--r--main/taskprocessor.c21
-rw-r--r--main/threadpool.c7
-rw-r--r--tests/test_taskprocessor.c11
-rw-r--r--tests/test_threadpool.c61
5 files changed, 106 insertions, 3 deletions
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index 6359c057e..4f61939f8 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -85,6 +85,15 @@ struct ast_taskprocessor_listener_callbacks {
*/
void *(*alloc)(struct ast_taskprocessor_listener *listener);
/*!
+ * \brief The taskprocessor has started completely
+ *
+ * This indicates that the taskprocessor is fully set up and the listener
+ * can now start interacting with it.
+ *
+ * \param listener The listener to start
+ */
+ int (*start)(struct ast_taskprocessor_listener *listener);
+ /*!
* \brief Indicates a task was pushed to the processor
*
* \param listener The listener
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index d83228f33..80875ec4a 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -171,12 +171,20 @@ static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
ast_cond_init(&pvt->cond, NULL);
ast_mutex_init(&pvt->lock);
pvt->poll_thread = AST_PTHREADT_NULL;
- if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
- return NULL;
- }
return pvt;
}
+static int default_listener_start(struct ast_taskprocessor_listener *listener)
+{
+ struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+
+ if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
+ return -1;
+ }
+
+ return 0;
+}
+
static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
{
struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
@@ -209,6 +217,7 @@ static void default_listener_destroy(void *obj)
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
.alloc = default_listener_alloc,
+ .start = default_listener_start,
.task_pushed = default_task_pushed,
.emptied = default_emptied,
.shutdown = default_listener_shutdown,
@@ -556,6 +565,12 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam
return NULL;
}
+ if (p->listener->callbacks->start(p->listener)) {
+ ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
+ ast_taskprocessor_unreference(p);
+ return NULL;
+ }
+
/* RAII_VAR will decrement the refcount at the end of the function.
* Since we want to pass back a reference to p, we bump the refcount
*/
diff --git a/main/threadpool.c b/main/threadpool.c
index 1da0d0766..1b0477926 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -268,6 +268,11 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
return pool;
}
+static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
+{
+ return 0;
+}
+
/*!
* \brief helper used for queued task when tasks are pushed
*/
@@ -431,6 +436,7 @@ static void threadpool_destroy(void *private_data)
*/
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
.alloc = threadpool_alloc,
+ .start = threadpool_tps_start,
.task_pushed = threadpool_tps_task_pushed,
.emptied = threadpool_tps_emptied,
.shutdown = threadpool_tps_shutdown,
@@ -623,6 +629,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis
pool = tps_listener->private_data;
pool->tps = tps;
+ ast_log(LOG_NOTICE, "The taskprocessor I've created is located at %p\n", pool->tps);
ao2_ref(listener, +1);
pool->listener = listener;
ast_threadpool_set_size(pool, initial_size);
diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c
index cbab754a9..377a2b3e3 100644
--- a/tests/test_taskprocessor.c
+++ b/tests/test_taskprocessor.c
@@ -116,6 +116,7 @@ AST_TEST_DEFINE(default_taskprocessor)
break;
}
}
+ ast_mutex_unlock(&task_data.lock);
if (!task_data.task_complete) {
ast_test_status_update(test, "Queued task did not execute!\n");
@@ -218,6 +219,7 @@ AST_TEST_DEFINE(default_taskprocessor_load)
break;
}
}
+ ast_mutex_unlock(&load_task_results.lock);
if (load_task_results.tasks_completed != NUM_TASKS) {
ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
@@ -267,6 +269,14 @@ static void *test_alloc(struct ast_taskprocessor_listener *listener)
}
/*!
+ * \brief test taskprocessor listener's start callback
+ */
+static int test_start(struct ast_taskprocessor_listener *listener)
+{
+ return 0;
+}
+
+/*!
* \brief test taskprocessor listener's task_pushed callback
*
* Adjusts private data's stats as indicated by the parameters.
@@ -309,6 +319,7 @@ static void test_destroy(void *private_data)
static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
.alloc = test_alloc,
+ .start = test_start,
.task_pushed = test_task_pushed,
.emptied = test_emptied,
.shutdown = test_shutdown,
diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c
index eed4dc20f..fbbe670bb 100644
--- a/tests/test_threadpool.c
+++ b/tests/test_threadpool.c
@@ -36,6 +36,7 @@
#include "asterisk/module.h"
#include "asterisk/lock.h"
#include "asterisk/astobj2.h"
+#include "asterisk/logger.h"
struct test_listener_data {
int num_active;
@@ -66,6 +67,7 @@ static void test_state_changed(struct ast_threadpool *pool,
{
struct test_listener_data *tld = listener->private_data;
SCOPED_MUTEX(lock, &tld->lock);
+ ast_log(LOG_NOTICE, "State changed: num_active: %d, num_idle: %d\n", active_threads, idle_threads);
tld->num_active = active_threads;
tld->num_idle = idle_threads;
ast_cond_signal(&tld->cond);
@@ -95,6 +97,7 @@ static void test_emptied(struct ast_threadpool *pool,
static void test_destroy(void *private_data)
{
struct test_listener_data *tld = private_data;
+ ast_debug(1, "Poop\n");
ast_cond_destroy(&tld->cond);
ast_mutex_destroy(&tld->lock);
ast_free(tld);
@@ -135,6 +138,15 @@ static int simple_task(void *data)
return 0;
}
+#define WAIT_WHILE(tld, condition) \
+{\
+ ast_mutex_lock(&tld->lock);\
+ while ((condition)) {\
+ ast_cond_wait(&tld->cond, &tld->lock);\
+ }\
+ ast_mutex_unlock(&tld->lock);\
+}\
+
static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
{
struct test_listener_data *tld = listener->private_data;
@@ -246,15 +258,64 @@ end:
return res;
}
+AST_TEST_DEFINE(threadpool_thread_creation)
+{
+ struct ast_threadpool *pool = NULL;
+ struct ast_threadpool_listener *listener = NULL;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct test_listener_data *tld;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "threadpool_thread_creation";
+ info->category = "/main/threadpool_thread_creation/";
+ info->summary = "Test threadpool thread creation";
+ info->description =
+ "Ensure that threads can be added to a threadpool";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ listener = ast_threadpool_listener_alloc(&test_callbacks);
+ if (!listener) {
+ return AST_TEST_FAIL;
+ }
+ tld = listener->private_data;
+
+ pool = ast_threadpool_create(listener, 0);
+ if (!pool) {
+ goto end;
+ }
+
+ /* Now let's create a thread. It should start active, then go
+ * idle immediately
+ */
+ ast_threadpool_set_size(pool, 1);
+
+ WAIT_WHILE(tld, tld->num_idle == 0);
+
+ res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
+
+end:
+ if (pool) {
+ ast_threadpool_shutdown(pool);
+ }
+ ao2_cleanup(listener);
+ return res;
+}
+
static int unload_module(void)
{
ast_test_unregister(threadpool_push);
+ ast_test_unregister(threadpool_thread_creation);
return 0;
}
static int load_module(void)
{
ast_test_register(threadpool_push);
+ ast_test_register(threadpool_thread_creation);
return AST_MODULE_LOAD_SUCCESS;
}