summaryrefslogtreecommitdiff
path: root/tests/test_taskprocessor.c
diff options
context:
space:
mode:
authorDavid M. Lee <dlee@digium.com>2013-02-12 21:45:59 +0000
committerDavid M. Lee <dlee@digium.com>2013-02-12 21:45:59 +0000
commit222e8a3afb6aabb31052fe76fa4f57fe26f69688 (patch)
treed7f7494c130d9372bdc43592e762ee83b4a78aec /tests/test_taskprocessor.c
parente9ff351f06c1cb7ad0ee59b3ff09b804a23eb3a0 (diff)
Add a serializer interface to the threadpool
This patch adds the ability to create a serializer from a thread pool. A serializer is a ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially) except instead of executing out of a dedicated thread, execution occurs in a thread from a ast_threadpool. Think of it as a lightweight thread. While it guarantees that each task will complete before executing the next, there is no guarantee as to which thread from the pool individual tasks will execute. This normally only matters if your code relys on thread specific information, such as thread locals. This patch also fixes a bug in how the 'was_empty' parameter is computed for the push callback, and gets rid of the unused 'shutting_down' field. Review: https://reviewboard.asterisk.org/r/2323/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@381326 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'tests/test_taskprocessor.c')
-rw-r--r--tests/test_taskprocessor.c183
1 files changed, 183 insertions, 0 deletions
diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c
index e370dd78f..70400a9ec 100644
--- a/tests/test_taskprocessor.c
+++ b/tests/test_taskprocessor.c
@@ -450,11 +450,193 @@ test_exit:
return res;
}
+struct shutdown_data {
+ ast_cond_t in;
+ ast_cond_t out;
+ ast_mutex_t lock;
+ int task_complete;
+ int task_started;
+ int task_stop_waiting;
+};
+
+static void shutdown_data_dtor(void *data)
+{
+ struct shutdown_data *shutdown_data = data;
+ ast_mutex_destroy(&shutdown_data->lock);
+ ast_cond_destroy(&shutdown_data->in);
+ ast_cond_destroy(&shutdown_data->out);
+}
+
+static struct shutdown_data *shutdown_data_create(int dont_wait)
+{
+ RAII_VAR(struct shutdown_data *, shutdown_data, NULL, ao2_cleanup);
+
+ shutdown_data = ao2_alloc(sizeof(*shutdown_data), shutdown_data_dtor);
+ if (!shutdown_data) {
+ return NULL;
+ }
+
+ ast_mutex_init(&shutdown_data->lock);
+ ast_cond_init(&shutdown_data->in, NULL);
+ ast_cond_init(&shutdown_data->out, NULL);
+ shutdown_data->task_stop_waiting = dont_wait;
+ ao2_ref(shutdown_data, +1);
+ return shutdown_data;
+}
+
+static int shutdown_task_exec(void *data)
+{
+ struct shutdown_data *shutdown_data = data;
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+ shutdown_data->task_started = 1;
+ ast_cond_signal(&shutdown_data->out);
+ while (!shutdown_data->task_stop_waiting) {
+ ast_cond_wait(&shutdown_data->in, &shutdown_data->lock);
+ }
+ shutdown_data->task_complete = 1;
+ ast_cond_signal(&shutdown_data->out);
+ return 0;
+}
+
+static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 5,
+ .tv_nsec = start.tv_usec * 1000
+ };
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+
+ while (!shutdown_data->task_complete) {
+ if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ return shutdown_data->task_complete;
+}
+
+static int shutdown_has_completed(struct shutdown_data *shutdown_data)
+{
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+ return shutdown_data->task_complete;
+}
+
+static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 5,
+ .tv_nsec = start.tv_usec * 1000
+ };
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+
+ while (!shutdown_data->task_started) {
+ if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ return shutdown_data->task_started;
+}
+
+static void shutdown_poke(struct shutdown_data *shutdown_data)
+{
+ SCOPED_MUTEX(lock, &shutdown_data->lock);
+ shutdown_data->task_stop_waiting = 1;
+ ast_cond_signal(&shutdown_data->in);
+}
+
+static void *tps_shutdown_thread(void *data)
+{
+ struct ast_taskprocessor *tps = data;
+ ast_taskprocessor_unreference(tps);
+ return NULL;
+}
+
+AST_TEST_DEFINE(taskprocessor_shutdown)
+{
+ RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
+ RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup);
+ RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup);
+ int push_res;
+ int wait_res;
+ int pthread_res;
+ pthread_t shutdown_thread;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "taskprocessor_shutdown";
+ info->category = "/main/taskprocessor/";
+ info->summary = "Test of taskproccesor shutdown sequence";
+ info->description =
+ "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT);
+ task1 = shutdown_data_create(0); /* task1 waits to be poked */
+ task2 = shutdown_data_create(1); /* task2 waits for nothing */
+
+ if (!tps || !task1 || !task2) {
+ ast_test_status_update(test, "Allocation error\n");
+ return AST_TEST_FAIL;
+ }
+
+ push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1);
+ if (push_res != 0) {
+ ast_test_status_update(test, "Could not push task1\n");
+ return AST_TEST_FAIL;
+ }
+
+ push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2);
+ if (push_res != 0) {
+ ast_test_status_update(test, "Could not push task2\n");
+ return AST_TEST_FAIL;
+ }
+
+ wait_res = shutdown_waitfor_start(task1);
+ if (!wait_res) {
+ ast_test_status_update(test, "Task1 didn't start\n");
+ return AST_TEST_FAIL;
+ }
+
+ pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
+ if (pthread_res != 0) {
+ ast_test_status_update(test, "Failed to create shutdown thread\n");
+ return AST_TEST_FAIL;
+ }
+ tps = NULL;
+
+ /* Wakeup task1; it should complete */
+ shutdown_poke(task1);
+ wait_res = shutdown_waitfor_completion(task1);
+ if (!wait_res) {
+ ast_test_status_update(test, "Task1 didn't complete\n");
+ return AST_TEST_FAIL;
+ }
+
+ /* Wait for shutdown to complete */
+ pthread_join(shutdown_thread, NULL);
+
+ /* Should have also also completed task2 */
+ wait_res = shutdown_has_completed(task2);
+ if (!wait_res) {
+ ast_test_status_update(test, "Task2 didn't finish\n");
+ return AST_TEST_FAIL;
+ }
+
+ return AST_TEST_PASS;
+}
+
static int unload_module(void)
{
ast_test_unregister(default_taskprocessor);
ast_test_unregister(default_taskprocessor_load);
ast_test_unregister(taskprocessor_listener);
+ ast_test_unregister(taskprocessor_shutdown);
return 0;
}
@@ -463,6 +645,7 @@ static int load_module(void)
ast_test_register(default_taskprocessor);
ast_test_register(default_taskprocessor_load);
ast_test_register(taskprocessor_listener);
+ ast_test_register(taskprocessor_shutdown);
return AST_MODULE_LOAD_SUCCESS;
}