summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/test_taskprocessor.c183
-rw-r--r--tests/test_threadpool.c232
2 files changed, 407 insertions, 8 deletions
diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c
index e370dd78f..70400a9ec 100644
--- a/tests/test_taskprocessor.c
+++ b/tests/test_taskprocessor.c
@@ -450,11 +450,193 @@ test_exit:
return res;
}
+struct shutdown_data {
+ ast_cond_t in;
+ ast_cond_t out;
+ ast_mutex_t lock;
+ int task_complete;
+ int task_started;
+ int task_stop_waiting;
+};
+
+static void shutdown_data_dtor(void *data)
+{
+ struct shutdown_data *shutdown_data = data;
+ ast_mutex_destroy(&shutdown_data->lock);
+ ast_cond_destroy(&shutdown_data->in);
+ ast_cond_destroy(&shutdown_data->out);
+}
+
+static struct shutdown_data *shutdown_data_create(int dont_wait)
+{
+ RAII_VAR(struct shutdown_data *, shutdown_data, NULL, ao2_cleanup);
+
+ shutdown_data = ao2_alloc(sizeof(*shutdown_data), shutdown_data_dtor);
+ if (!shutdown_data) {
+ return NULL;
+ }
+
+ ast_mutex_init(&shutdown_data->lock);
+ ast_cond_init(&shutdown_data->in, NULL);
+ ast_cond_init(&shutdown_data->out, NULL);
+ shutdown_data->task_stop_waiting = dont_wait;
+ ao2_ref(shutdown_data, +1);
+ return shutdown_data;
+}
+
+static int shutdown_task_exec(void *data)
+{
+ struct shutdown_data *shutdown_data = data;
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+ shutdown_data->task_started = 1;
+ ast_cond_signal(&shutdown_data->out);
+ while (!shutdown_data->task_stop_waiting) {
+ ast_cond_wait(&shutdown_data->in, &shutdown_data->lock);
+ }
+ shutdown_data->task_complete = 1;
+ ast_cond_signal(&shutdown_data->out);
+ return 0;
+}
+
+static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 5,
+ .tv_nsec = start.tv_usec * 1000
+ };
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+
+ while (!shutdown_data->task_complete) {
+ if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ return shutdown_data->task_complete;
+}
+
+static int shutdown_has_completed(struct shutdown_data *shutdown_data)
+{
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+ return shutdown_data->task_complete;
+}
+
+static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 5,
+ .tv_nsec = start.tv_usec * 1000
+ };
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+
+ while (!shutdown_data->task_started) {
+ if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ return shutdown_data->task_started;
+}
+
+static void shutdown_poke(struct shutdown_data *shutdown_data)
+{
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+ shutdown_data->task_stop_waiting = 1;
+ ast_cond_signal(&shutdown_data->in);
+}
+
+static void *tps_shutdown_thread(void *data)
+{
+ struct ast_taskprocessor *tps = data;
+ ast_taskprocessor_unreference(tps);
+ return NULL;
+}
+
+AST_TEST_DEFINE(taskprocessor_shutdown)
+{
+ RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
+ RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup);
+ RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup);
+ int push_res;
+ int wait_res;
+ int pthread_res;
+ pthread_t shutdown_thread;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "taskprocessor_shutdown";
+ info->category = "/main/taskprocessor/";
+ info->summary = "Test of taskproccesor shutdown sequence";
+ info->description =
+ "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT);
+ task1 = shutdown_data_create(0); /* task1 waits to be poked */
+ task2 = shutdown_data_create(1); /* task2 waits for nothing */
+
+ if (!tps || !task1 || !task2) {
+ ast_test_status_update(test, "Allocation error\n");
+ return AST_TEST_FAIL;
+ }
+
+ push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1);
+ if (push_res != 0) {
+ ast_test_status_update(test, "Could not push task1\n");
+ return AST_TEST_FAIL;
+ }
+
+ push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2);
+ if (push_res != 0) {
+ ast_test_status_update(test, "Could not push task2\n");
+ return AST_TEST_FAIL;
+ }
+
+ wait_res = shutdown_waitfor_start(task1);
+ if (!wait_res) {
+ ast_test_status_update(test, "Task1 didn't start\n");
+ return AST_TEST_FAIL;
+ }
+
+ pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
+ if (pthread_res != 0) {
+ ast_test_status_update(test, "Failed to create shutdown thread\n");
+ return AST_TEST_FAIL;
+ }
+ tps = NULL;
+
+ /* Wakeup task1; it should complete */
+ shutdown_poke(task1);
+ wait_res = shutdown_waitfor_completion(task1);
+ if (!wait_res) {
+ ast_test_status_update(test, "Task1 didn't complete\n");
+ return AST_TEST_FAIL;
+ }
+
+ /* Wait for shutdown to complete */
+ pthread_join(shutdown_thread, NULL);
+
+ /* Should have also also completed task2 */
+ wait_res = shutdown_has_completed(task2);
+ if (!wait_res) {
+ ast_test_status_update(test, "Task2 didn't finish\n");
+ return AST_TEST_FAIL;
+ }
+
+ return AST_TEST_PASS;
+}
+
static int unload_module(void)
{
ast_test_unregister(default_taskprocessor);
ast_test_unregister(default_taskprocessor_load);
ast_test_unregister(taskprocessor_listener);
+ ast_test_unregister(taskprocessor_shutdown);
return 0;
}
@@ -463,6 +645,7 @@ static int load_module(void)
ast_test_register(default_taskprocessor);
ast_test_register(default_taskprocessor_load);
ast_test_register(taskprocessor_listener);
+ ast_test_register(taskprocessor_shutdown);
return AST_MODULE_LOAD_SUCCESS;
}
diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c
index 712b8581b..79b369d94 100644
--- a/tests/test_threadpool.c
+++ b/tests/test_threadpool.c
@@ -31,12 +31,13 @@
#include "asterisk.h"
-#include "asterisk/test.h"
-#include "asterisk/threadpool.h"
-#include "asterisk/module.h"
-#include "asterisk/lock.h"
#include "asterisk/astobj2.h"
+#include "asterisk/lock.h"
#include "asterisk/logger.h"
+#include "asterisk/module.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/test.h"
+#include "asterisk/threadpool.h"
struct test_listener_data {
int num_active;
@@ -1124,11 +1125,12 @@ end:
}
struct complex_task_data {
+ int task_started;
int task_executed;
int continue_task;
ast_mutex_t lock;
ast_cond_t stall_cond;
- ast_cond_t done_cond;
+ ast_cond_t notify_cond;
};
static struct complex_task_data *complex_task_data_alloc(void)
@@ -1140,7 +1142,7 @@ static struct complex_task_data *complex_task_data_alloc(void)
}
ast_mutex_init(&ctd->lock);
ast_cond_init(&ctd->stall_cond, NULL);
- ast_cond_init(&ctd->done_cond, NULL);
+ ast_cond_init(&ctd->notify_cond, NULL);
return ctd;
}
@@ -1148,12 +1150,15 @@ static int complex_task(void *data)
{
struct complex_task_data *ctd = data;
SCOPED_MUTEX(lock, &ctd->lock);
+ /* Notify that we started */
+ ctd->task_started = 1;
+ ast_cond_signal(&ctd->notify_cond);
while (!ctd->continue_task) {
ast_cond_wait(&ctd->stall_cond, lock);
}
/* We got poked. Finish up */
ctd->task_executed = 1;
- ast_cond_signal(&ctd->done_cond);
+ ast_cond_signal(&ctd->notify_cond);
return 0;
}
@@ -1164,6 +1169,42 @@ static void poke_worker(struct complex_task_data *ctd)
ast_cond_signal(&ctd->stall_cond);
}
+static int wait_for_complex_start(struct complex_task_data *ctd)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 5,
+ .tv_nsec = start.tv_usec * 1000
+ };
+ SCOPED_MUTEX(lock, &ctd->lock);
+
+ while (!ctd->task_started) {
+ if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ return ctd->task_started;
+}
+
+static int has_complex_started(struct complex_task_data *ctd)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 1,
+ .tv_nsec = start.tv_usec * 1000
+ };
+ SCOPED_MUTEX(lock, &ctd->lock);
+
+ while (!ctd->task_started) {
+ if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ return ctd->task_started;
+}
+
static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
{
struct timeval start = ast_tvnow();
@@ -1175,7 +1216,7 @@ static enum ast_test_result_state wait_for_complex_completion(struct complex_tas
SCOPED_MUTEX(lock, &ctd->lock);
while (!ctd->task_executed) {
- if (ast_cond_timedwait(&ctd->done_cond, lock, &end) == ETIMEDOUT) {
+ if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
break;
}
}
@@ -1391,6 +1432,177 @@ end:
return res;
}
+AST_TEST_DEFINE(threadpool_serializer)
+{
+ int started = 0;
+ int finished = 0;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct ast_threadpool *pool = NULL;
+ struct ast_taskprocessor *uut = NULL;
+ struct complex_task_data *data1 = NULL;
+ struct complex_task_data *data2 = NULL;
+ struct complex_task_data *data3 = NULL;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .initial_size = 2,
+ .max_size = 0,
+ };
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "threadpool_serializer";
+ info->category = "/main/threadpool/";
+ info->summary = "Test that serializers";
+ info->description =
+ "Ensures that tasks enqueued to a serialize execute in sequence.\n";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
+ if (!pool) {
+ ast_test_status_update(test, "Could not create threadpool\n");
+ goto end;
+ }
+ uut = ast_threadpool_serializer("ser1", pool);
+ data1 = complex_task_data_alloc();
+ data2 = complex_task_data_alloc();
+ data3 = complex_task_data_alloc();
+ if (!uut || !data1 || !data2 || !data3) {
+ ast_test_status_update(test, "Allocation failed\n");
+ goto end;
+ }
+
+ /* This should start right away */
+ if (ast_taskprocessor_push(uut, complex_task, data1)) {
+ ast_test_status_update(test, "Failed to enqueue data1\n");
+ goto end;
+ }
+ started = wait_for_complex_start(data1);
+ if (!started) {
+ ast_test_status_update(test, "Failed to start data1\n");
+ goto end;
+ }
+
+ /* This should not start until data 1 is complete */
+ if (ast_taskprocessor_push(uut, complex_task, data2)) {
+ ast_test_status_update(test, "Failed to enqueue data2\n");
+ goto end;
+ }
+ started = has_complex_started(data2);
+ if (started) {
+ ast_test_status_update(test, "data2 started out of order\n");
+ goto end;
+ }
+
+ /* But the free thread in the pool can still run */
+ if (ast_threadpool_push(pool, complex_task, data3)) {
+ ast_test_status_update(test, "Failed to enqueue data3\n");
+ }
+ started = wait_for_complex_start(data3);
+ if (!started) {
+ ast_test_status_update(test, "Failed to start data3\n");
+ goto end;
+ }
+
+ /* Finishing data1 should allow data2 to start */
+ poke_worker(data1);
+ finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
+ if (!finished) {
+ ast_test_status_update(test, "data1 couldn't finish\n");
+ goto end;
+ }
+ started = wait_for_complex_start(data2);
+ if (!started) {
+ ast_test_status_update(test, "Failed to start data2\n");
+ goto end;
+ }
+
+ /* Finish up */
+ poke_worker(data2);
+ finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
+ if (!finished) {
+ ast_test_status_update(test, "data2 couldn't finish\n");
+ goto end;
+ }
+ poke_worker(data3);
+ finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
+ if (!finished) {
+ ast_test_status_update(test, "data3 couldn't finish\n");
+ goto end;
+ }
+
+ res = AST_TEST_PASS;
+
+end:
+ poke_worker(data1);
+ poke_worker(data2);
+ poke_worker(data3);
+ ast_taskprocessor_unreference(uut);
+ ast_threadpool_shutdown(pool);
+ ast_free(data1);
+ ast_free(data2);
+ ast_free(data3);
+ return res;
+}
+
+AST_TEST_DEFINE(threadpool_serializer_dupe)
+{
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct ast_threadpool *pool = NULL;
+ struct ast_taskprocessor *uut = NULL;
+ struct ast_taskprocessor *there_can_be_only_one = NULL;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .initial_size = 2,
+ .max_size = 0,
+ };
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "threadpool_serializer_dupe";
+ info->category = "/main/threadpool/";
+ info->summary = "Test that serializers are uniquely named";
+ info->description =
+ "Creating two serializers with the same name should\n"
+ "result in error.\n";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
+ if (!pool) {
+ ast_test_status_update(test, "Could not create threadpool\n");
+ goto end;
+ }
+
+ uut = ast_threadpool_serializer("highlander", pool);
+ if (!uut) {
+ ast_test_status_update(test, "Allocation failed\n");
+ goto end;
+ }
+
+ there_can_be_only_one = ast_threadpool_serializer("highlander", pool);
+ if (there_can_be_only_one) {
+ ast_taskprocessor_unreference(there_can_be_only_one);
+ ast_test_status_update(test, "Duplicate name error\n");
+ goto end;
+ }
+
+ res = AST_TEST_PASS;
+
+end:
+ ast_taskprocessor_unreference(uut);
+ ast_threadpool_shutdown(pool);
+ return res;
+}
+
static int unload_module(void)
{
ast_test_unregister(threadpool_push);
@@ -1406,6 +1618,8 @@ static int unload_module(void)
ast_test_unregister(threadpool_reactivation);
ast_test_unregister(threadpool_task_distribution);
ast_test_unregister(threadpool_more_destruction);
+ ast_test_unregister(threadpool_serializer);
+ ast_test_unregister(threadpool_serializer_dupe);
return 0;
}
@@ -1424,6 +1638,8 @@ static int load_module(void)
ast_test_register(threadpool_reactivation);
ast_test_register(threadpool_task_distribution);
ast_test_register(threadpool_more_destruction);
+ ast_test_register(threadpool_serializer);
+ ast_test_register(threadpool_serializer_dupe);
return AST_MODULE_LOAD_SUCCESS;
}