diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_taskprocessor.c | 183 | ||||
-rw-r--r-- | tests/test_threadpool.c | 232 |
2 files changed, 407 insertions, 8 deletions
diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c index e370dd78f..70400a9ec 100644 --- a/tests/test_taskprocessor.c +++ b/tests/test_taskprocessor.c @@ -450,11 +450,193 @@ test_exit: return res; } +struct shutdown_data { + ast_cond_t in; + ast_cond_t out; + ast_mutex_t lock; + int task_complete; + int task_started; + int task_stop_waiting; +}; + +static void shutdown_data_dtor(void *data) +{ + struct shutdown_data *shutdown_data = data; + ast_mutex_destroy(&shutdown_data->lock); + ast_cond_destroy(&shutdown_data->in); + ast_cond_destroy(&shutdown_data->out); +} + +static struct shutdown_data *shutdown_data_create(int dont_wait) +{ + RAII_VAR(struct shutdown_data *, shutdown_data, NULL, ao2_cleanup); + + shutdown_data = ao2_alloc(sizeof(*shutdown_data), shutdown_data_dtor); + if (!shutdown_data) { + return NULL; + } + + ast_mutex_init(&shutdown_data->lock); + ast_cond_init(&shutdown_data->in, NULL); + ast_cond_init(&shutdown_data->out, NULL); + shutdown_data->task_stop_waiting = dont_wait; + ao2_ref(shutdown_data, +1); + return shutdown_data; +} + +static int shutdown_task_exec(void *data) +{ + struct shutdown_data *shutdown_data = data; + SCOPED_MUTEX(lock, &shutdown_data->lock); + shutdown_data->task_started = 1; + ast_cond_signal(&shutdown_data->out); + while (!shutdown_data->task_stop_waiting) { + ast_cond_wait(&shutdown_data->in, &shutdown_data->lock); + } + shutdown_data->task_complete = 1; + ast_cond_signal(&shutdown_data->out); + return 0; +} + +static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 5, + .tv_nsec = start.tv_usec * 1000 + }; + SCOPED_MUTEX(lock, &shutdown_data->lock); + + while (!shutdown_data->task_complete) { + if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) { + break; + } + } + + return shutdown_data->task_complete; +} + +static int shutdown_has_completed(struct shutdown_data *shutdown_data) +{ + SCOPED_MUTEX(lock, &shutdown_data->lock); + return shutdown_data->task_complete; +} + +static int shutdown_waitfor_start(struct shutdown_data *shutdown_data) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 5, + .tv_nsec = start.tv_usec * 1000 + }; + SCOPED_MUTEX(lock, &shutdown_data->lock); + + while (!shutdown_data->task_started) { + if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) { + break; + } + } + + return shutdown_data->task_started; +} + +static void shutdown_poke(struct shutdown_data *shutdown_data) +{ + SCOPED_MUTEX(lock, &shutdown_data->lock); + shutdown_data->task_stop_waiting = 1; + ast_cond_signal(&shutdown_data->in); +} + +static void *tps_shutdown_thread(void *data) +{ + struct ast_taskprocessor *tps = data; + ast_taskprocessor_unreference(tps); + return NULL; +} + +AST_TEST_DEFINE(taskprocessor_shutdown) +{ + RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference); + RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup); + RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup); + int push_res; + int wait_res; + int pthread_res; + pthread_t shutdown_thread; + + switch (cmd) { + case TEST_INIT: + info->name = "taskprocessor_shutdown"; + info->category = "/main/taskprocessor/"; + info->summary = "Test of taskproccesor shutdown sequence"; + info->description = + "Ensures that all tasks run to completion after the taskprocessor has been unref'ed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT); + task1 = shutdown_data_create(0); /* task1 waits to be poked */ + task2 = shutdown_data_create(1); /* task2 waits for nothing */ + + if (!tps || !task1 || !task2) { + ast_test_status_update(test, "Allocation error\n"); + return AST_TEST_FAIL; + } + + push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1); + if (push_res != 0) { + ast_test_status_update(test, "Could not push task1\n"); + return AST_TEST_FAIL; + } + + push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2); + if (push_res != 0) { + ast_test_status_update(test, "Could not push task2\n"); + return AST_TEST_FAIL; + } + + wait_res = shutdown_waitfor_start(task1); + if (!wait_res) { + ast_test_status_update(test, "Task1 didn't start\n"); + return AST_TEST_FAIL; + } + + pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps); + if (pthread_res != 0) { + ast_test_status_update(test, "Failed to create shutdown thread\n"); + return AST_TEST_FAIL; + } + tps = NULL; + + /* Wakeup task1; it should complete */ + shutdown_poke(task1); + wait_res = shutdown_waitfor_completion(task1); + if (!wait_res) { + ast_test_status_update(test, "Task1 didn't complete\n"); + return AST_TEST_FAIL; + } + + /* Wait for shutdown to complete */ + pthread_join(shutdown_thread, NULL); + + /* Should have also also completed task2 */ + wait_res = shutdown_has_completed(task2); + if (!wait_res) { + ast_test_status_update(test, "Task2 didn't finish\n"); + return AST_TEST_FAIL; + } + + return AST_TEST_PASS; +} + static int unload_module(void) { ast_test_unregister(default_taskprocessor); ast_test_unregister(default_taskprocessor_load); ast_test_unregister(taskprocessor_listener); + ast_test_unregister(taskprocessor_shutdown); return 0; } @@ -463,6 +645,7 @@ static int load_module(void) ast_test_register(default_taskprocessor); ast_test_register(default_taskprocessor_load); ast_test_register(taskprocessor_listener); + ast_test_register(taskprocessor_shutdown); return AST_MODULE_LOAD_SUCCESS; } diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index 712b8581b..79b369d94 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -31,12 +31,13 @@ #include "asterisk.h" -#include "asterisk/test.h" -#include "asterisk/threadpool.h" -#include "asterisk/module.h" -#include "asterisk/lock.h" #include "asterisk/astobj2.h" +#include "asterisk/lock.h" #include "asterisk/logger.h" +#include "asterisk/module.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/test.h" +#include "asterisk/threadpool.h" struct test_listener_data { int num_active; @@ -1124,11 +1125,12 @@ end: } struct complex_task_data { + int task_started; int task_executed; int continue_task; ast_mutex_t lock; ast_cond_t stall_cond; - ast_cond_t done_cond; + ast_cond_t notify_cond; }; static struct complex_task_data *complex_task_data_alloc(void) @@ -1140,7 +1142,7 @@ static struct complex_task_data *complex_task_data_alloc(void) } ast_mutex_init(&ctd->lock); ast_cond_init(&ctd->stall_cond, NULL); - ast_cond_init(&ctd->done_cond, NULL); + ast_cond_init(&ctd->notify_cond, NULL); return ctd; } @@ -1148,12 +1150,15 @@ static int complex_task(void *data) { struct complex_task_data *ctd = data; SCOPED_MUTEX(lock, &ctd->lock); + /* Notify that we started */ + ctd->task_started = 1; + ast_cond_signal(&ctd->notify_cond); while (!ctd->continue_task) { ast_cond_wait(&ctd->stall_cond, lock); } /* We got poked. Finish up */ ctd->task_executed = 1; - ast_cond_signal(&ctd->done_cond); + ast_cond_signal(&ctd->notify_cond); return 0; } @@ -1164,6 +1169,42 @@ static void poke_worker(struct complex_task_data *ctd) ast_cond_signal(&ctd->stall_cond); } +static int wait_for_complex_start(struct complex_task_data *ctd) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 5, + .tv_nsec = start.tv_usec * 1000 + }; + SCOPED_MUTEX(lock, &ctd->lock); + + while (!ctd->task_started) { + if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) { + break; + } + } + + return ctd->task_started; +} + +static int has_complex_started(struct complex_task_data *ctd) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 1, + .tv_nsec = start.tv_usec * 1000 + }; + SCOPED_MUTEX(lock, &ctd->lock); + + while (!ctd->task_started) { + if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) { + break; + } + } + + return ctd->task_started; +} + static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd) { struct timeval start = ast_tvnow(); @@ -1175,7 +1216,7 @@ static enum ast_test_result_state wait_for_complex_completion(struct complex_tas SCOPED_MUTEX(lock, &ctd->lock); while (!ctd->task_executed) { - if (ast_cond_timedwait(&ctd->done_cond, lock, &end) == ETIMEDOUT) { + if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) { break; } } @@ -1391,6 +1432,177 @@ end: return res; } +AST_TEST_DEFINE(threadpool_serializer) +{ + int started = 0; + int finished = 0; + enum ast_test_result_state res = AST_TEST_FAIL; + struct ast_threadpool *pool = NULL; + struct ast_taskprocessor *uut = NULL; + struct complex_task_data *data1 = NULL; + struct complex_task_data *data2 = NULL; + struct complex_task_data *data3 = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 2, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "threadpool_serializer"; + info->category = "/main/threadpool/"; + info->summary = "Test that serializers"; + info->description = + "Ensures that tasks enqueued to a serialize execute in sequence.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + pool = ast_threadpool_create("threadpool_serializer", NULL, &options); + if (!pool) { + ast_test_status_update(test, "Could not create threadpool\n"); + goto end; + } + uut = ast_threadpool_serializer("ser1", pool); + data1 = complex_task_data_alloc(); + data2 = complex_task_data_alloc(); + data3 = complex_task_data_alloc(); + if (!uut || !data1 || !data2 || !data3) { + ast_test_status_update(test, "Allocation failed\n"); + goto end; + } + + /* This should start right away */ + if (ast_taskprocessor_push(uut, complex_task, data1)) { + ast_test_status_update(test, "Failed to enqueue data1\n"); + goto end; + } + started = wait_for_complex_start(data1); + if (!started) { + ast_test_status_update(test, "Failed to start data1\n"); + goto end; + } + + /* This should not start until data 1 is complete */ + if (ast_taskprocessor_push(uut, complex_task, data2)) { + ast_test_status_update(test, "Failed to enqueue data2\n"); + goto end; + } + started = has_complex_started(data2); + if (started) { + ast_test_status_update(test, "data2 started out of order\n"); + goto end; + } + + /* But the free thread in the pool can still run */ + if (ast_threadpool_push(pool, complex_task, data3)) { + ast_test_status_update(test, "Failed to enqueue data3\n"); + } + started = wait_for_complex_start(data3); + if (!started) { + ast_test_status_update(test, "Failed to start data3\n"); + goto end; + } + + /* Finishing data1 should allow data2 to start */ + poke_worker(data1); + finished = wait_for_complex_completion(data1) == AST_TEST_PASS; + if (!finished) { + ast_test_status_update(test, "data1 couldn't finish\n"); + goto end; + } + started = wait_for_complex_start(data2); + if (!started) { + ast_test_status_update(test, "Failed to start data2\n"); + goto end; + } + + /* Finish up */ + poke_worker(data2); + finished = wait_for_complex_completion(data2) == AST_TEST_PASS; + if (!finished) { + ast_test_status_update(test, "data2 couldn't finish\n"); + goto end; + } + poke_worker(data3); + finished = wait_for_complex_completion(data3) == AST_TEST_PASS; + if (!finished) { + ast_test_status_update(test, "data3 couldn't finish\n"); + goto end; + } + + res = AST_TEST_PASS; + +end: + poke_worker(data1); + poke_worker(data2); + poke_worker(data3); + ast_taskprocessor_unreference(uut); + ast_threadpool_shutdown(pool); + ast_free(data1); + ast_free(data2); + ast_free(data3); + return res; +} + +AST_TEST_DEFINE(threadpool_serializer_dupe) +{ + enum ast_test_result_state res = AST_TEST_FAIL; + struct ast_threadpool *pool = NULL; + struct ast_taskprocessor *uut = NULL; + struct ast_taskprocessor *there_can_be_only_one = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 2, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "threadpool_serializer_dupe"; + info->category = "/main/threadpool/"; + info->summary = "Test that serializers are uniquely named"; + info->description = + "Creating two serializers with the same name should\n" + "result in error.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + pool = ast_threadpool_create("threadpool_serializer", NULL, &options); + if (!pool) { + ast_test_status_update(test, "Could not create threadpool\n"); + goto end; + } + + uut = ast_threadpool_serializer("highlander", pool); + if (!uut) { + ast_test_status_update(test, "Allocation failed\n"); + goto end; + } + + there_can_be_only_one = ast_threadpool_serializer("highlander", pool); + if (there_can_be_only_one) { + ast_taskprocessor_unreference(there_can_be_only_one); + ast_test_status_update(test, "Duplicate name error\n"); + goto end; + } + + res = AST_TEST_PASS; + +end: + ast_taskprocessor_unreference(uut); + ast_threadpool_shutdown(pool); + return res; +} + static int unload_module(void) { ast_test_unregister(threadpool_push); @@ -1406,6 +1618,8 @@ static int unload_module(void) ast_test_unregister(threadpool_reactivation); ast_test_unregister(threadpool_task_distribution); ast_test_unregister(threadpool_more_destruction); + ast_test_unregister(threadpool_serializer); + ast_test_unregister(threadpool_serializer_dupe); return 0; } @@ -1424,6 +1638,8 @@ static int load_module(void) ast_test_register(threadpool_reactivation); ast_test_register(threadpool_task_distribution); ast_test_register(threadpool_more_destruction); + ast_test_register(threadpool_serializer); + ast_test_register(threadpool_serializer_dupe); return AST_MODULE_LOAD_SUCCESS; } |