author	David M. Lee <dlee@digium.com>	2013-02-12 21:45:59 +0000
committer	David M. Lee <dlee@digium.com>	2013-02-12 21:45:59 +0000
commit	222e8a3afb6aabb31052fe76fa4f57fe26f69688 (patch)
tree	d7f7494c130d9372bdc43592e762ee83b4a78aec /tests/test_threadpool.c
parent	e9ff351f06c1cb7ad0ee59b3ff09b804a23eb3a0 (diff)
Add a serializer interface to the threadpool
This patch adds the ability to create a serializer from a thread pool. A serializer is an ast_taskprocessor with the same contract as a default taskprocessor (tasks execute serially), except that instead of executing out of a dedicated thread, execution occurs in a thread from an ast_threadpool. Think of it as a lightweight thread.

While a serializer guarantees that each task completes before the next one executes, there is no guarantee as to which thread from the pool will execute an individual task. This normally only matters if your code relies on thread-specific information, such as thread locals.

This patch also fixes a bug in how the 'was_empty' parameter is computed for the push callback, and gets rid of the unused 'shutting_down' field.

Review: https://reviewboard.asterisk.org/r/2323/

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@381326 65c4cc65-6c06-0410-ace0-fbb531ad65f3
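The following is a minimal usage sketch assembled only from the calls exercised by the new tests below; the pool name, serializer name, and say_hello task are illustrative and not part of this patch:

#include "asterisk.h"
#include "asterisk/logger.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/threadpool.h"

/* Illustrative task; the tests below use complex_task instead. */
static int say_hello(void *data)
{
	ast_log(LOG_NOTICE, "%s\n", (const char *) data);
	return 0;
}

static void serializer_sketch(void)
{
	struct ast_threadpool_options options = {
		.version = AST_THREADPOOL_OPTIONS_VERSION,
		.idle_timeout = 0,
		.auto_increment = 0,
		.initial_size = 2,
		.max_size = 0,
	};
	struct ast_threadpool *pool;
	struct ast_taskprocessor *ser;

	/* Error handling omitted for brevity; both calls can return NULL. */
	pool = ast_threadpool_create("my_pool", NULL, &options);
	ser = ast_threadpool_serializer("my_pool/ser1", pool);

	/* The two tasks run one after the other, but in whichever
	 * pool thread happens to pick them up. */
	ast_taskprocessor_push(ser, say_hello, "first");
	ast_taskprocessor_push(ser, say_hello, "second");

	/* A serializer is an ordinary ast_taskprocessor, so it is
	 * released the same way. */
	ast_taskprocessor_unreference(ser);
	ast_threadpool_shutdown(pool);
}

As the threadpool_serializer_dupe test below checks, asking for a second serializer with a name that is already in use is expected to fail.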
Diffstat (limited to 'tests/test_threadpool.c')
-rw-r--r--	tests/test_threadpool.c	232
1 file changed, 224 insertions, 8 deletions
diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c
index 712b8581b..79b369d94 100644
--- a/tests/test_threadpool.c
+++ b/tests/test_threadpool.c
@@ -31,12 +31,13 @@
#include "asterisk.h"
-#include "asterisk/test.h"
-#include "asterisk/threadpool.h"
-#include "asterisk/module.h"
-#include "asterisk/lock.h"
#include "asterisk/astobj2.h"
+#include "asterisk/lock.h"
#include "asterisk/logger.h"
+#include "asterisk/module.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/test.h"
+#include "asterisk/threadpool.h"
struct test_listener_data {
int num_active;
@@ -1124,11 +1125,12 @@ end:
}
struct complex_task_data {
+ int task_started;
int task_executed;
int continue_task;
ast_mutex_t lock;
ast_cond_t stall_cond;
- ast_cond_t done_cond;
+ ast_cond_t notify_cond;
};
static struct complex_task_data *complex_task_data_alloc(void)
@@ -1140,7 +1142,7 @@ static struct complex_task_data *complex_task_data_alloc(void)
}
ast_mutex_init(&ctd->lock);
ast_cond_init(&ctd->stall_cond, NULL);
- ast_cond_init(&ctd->done_cond, NULL);
+ ast_cond_init(&ctd->notify_cond, NULL);
return ctd;
}
@@ -1148,12 +1150,15 @@ static int complex_task(void *data)
{
struct complex_task_data *ctd = data;
SCOPED_MUTEX(lock, &ctd->lock);
+ /* Notify that we started */
+ ctd->task_started = 1;
+ ast_cond_signal(&ctd->notify_cond);
while (!ctd->continue_task) {
ast_cond_wait(&ctd->stall_cond, lock);
}
/* We got poked. Finish up */
ctd->task_executed = 1;
- ast_cond_signal(&ctd->done_cond);
+ ast_cond_signal(&ctd->notify_cond);
return 0;
}
@@ -1164,6 +1169,42 @@ static void poke_worker(struct complex_task_data *ctd)
ast_cond_signal(&ctd->stall_cond);
}
+static int wait_for_complex_start(struct complex_task_data *ctd)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 5,
+ .tv_nsec = start.tv_usec * 1000
+ };
+ SCOPED_MUTEX(lock, &ctd->lock);
+
+ while (!ctd->task_started) {
+ if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ return ctd->task_started;
+}
+
+static int has_complex_started(struct complex_task_data *ctd)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 1,
+ .tv_nsec = start.tv_usec * 1000
+ };
+ SCOPED_MUTEX(lock, &ctd->lock);
+
+ while (!ctd->task_started) {
+ if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
+ break;
+ }
+ }
+
+ return ctd->task_started;
+}
+
static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
{
struct timeval start = ast_tvnow();
@@ -1175,7 +1216,7 @@ static enum ast_test_result_state wait_for_complex_completion(struct complex_tas
SCOPED_MUTEX(lock, &ctd->lock);
while (!ctd->task_executed) {
- if (ast_cond_timedwait(&ctd->done_cond, lock, &end) == ETIMEDOUT) {
+ if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
break;
}
}
@@ -1391,6 +1432,177 @@ end:
return res;
}
+AST_TEST_DEFINE(threadpool_serializer)
+{
+ int started = 0;
+ int finished = 0;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct ast_threadpool *pool = NULL;
+ struct ast_taskprocessor *uut = NULL;
+ struct complex_task_data *data1 = NULL;
+ struct complex_task_data *data2 = NULL;
+ struct complex_task_data *data3 = NULL;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .initial_size = 2,
+ .max_size = 0,
+ };
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "threadpool_serializer";
+ info->category = "/main/threadpool/";
+ info->summary = "Test that serializers";
+ info->description =
+ "Ensures that tasks enqueued to a serialize execute in sequence.\n";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
+ if (!pool) {
+ ast_test_status_update(test, "Could not create threadpool\n");
+ goto end;
+ }
+ uut = ast_threadpool_serializer("ser1", pool);
+ data1 = complex_task_data_alloc();
+ data2 = complex_task_data_alloc();
+ data3 = complex_task_data_alloc();
+ if (!uut || !data1 || !data2 || !data3) {
+ ast_test_status_update(test, "Allocation failed\n");
+ goto end;
+ }
+
+ /* This should start right away */
+ if (ast_taskprocessor_push(uut, complex_task, data1)) {
+ ast_test_status_update(test, "Failed to enqueue data1\n");
+ goto end;
+ }
+ started = wait_for_complex_start(data1);
+ if (!started) {
+ ast_test_status_update(test, "Failed to start data1\n");
+ goto end;
+ }
+
+ /* This should not start until data 1 is complete */
+ if (ast_taskprocessor_push(uut, complex_task, data2)) {
+ ast_test_status_update(test, "Failed to enqueue data2\n");
+ goto end;
+ }
+ started = has_complex_started(data2);
+ if (started) {
+ ast_test_status_update(test, "data2 started out of order\n");
+ goto end;
+ }
+
+ /* But the free thread in the pool can still run */
+ if (ast_threadpool_push(pool, complex_task, data3)) {
+ ast_test_status_update(test, "Failed to enqueue data3\n");
+ }
+ started = wait_for_complex_start(data3);
+ if (!started) {
+ ast_test_status_update(test, "Failed to start data3\n");
+ goto end;
+ }
+
+ /* Finishing data1 should allow data2 to start */
+ poke_worker(data1);
+ finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
+ if (!finished) {
+ ast_test_status_update(test, "data1 couldn't finish\n");
+ goto end;
+ }
+ started = wait_for_complex_start(data2);
+ if (!started) {
+ ast_test_status_update(test, "Failed to start data2\n");
+ goto end;
+ }
+
+ /* Finish up */
+ poke_worker(data2);
+ finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
+ if (!finished) {
+ ast_test_status_update(test, "data2 couldn't finish\n");
+ goto end;
+ }
+ poke_worker(data3);
+ finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
+ if (!finished) {
+ ast_test_status_update(test, "data3 couldn't finish\n");
+ goto end;
+ }
+
+ res = AST_TEST_PASS;
+
+end:
+ poke_worker(data1);
+ poke_worker(data2);
+ poke_worker(data3);
+ ast_taskprocessor_unreference(uut);
+ ast_threadpool_shutdown(pool);
+ ast_free(data1);
+ ast_free(data2);
+ ast_free(data3);
+ return res;
+}
+
+AST_TEST_DEFINE(threadpool_serializer_dupe)
+{
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct ast_threadpool *pool = NULL;
+ struct ast_taskprocessor *uut = NULL;
+ struct ast_taskprocessor *there_can_be_only_one = NULL;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 0,
+ .initial_size = 2,
+ .max_size = 0,
+ };
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "threadpool_serializer_dupe";
+ info->category = "/main/threadpool/";
+ info->summary = "Test that serializers are uniquely named";
+ info->description =
+ "Creating two serializers with the same name should\n"
+ "result in error.\n";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
+ if (!pool) {
+ ast_test_status_update(test, "Could not create threadpool\n");
+ goto end;
+ }
+
+ uut = ast_threadpool_serializer("highlander", pool);
+ if (!uut) {
+ ast_test_status_update(test, "Allocation failed\n");
+ goto end;
+ }
+
+ there_can_be_only_one = ast_threadpool_serializer("highlander", pool);
+ if (there_can_be_only_one) {
+ ast_taskprocessor_unreference(there_can_be_only_one);
+ ast_test_status_update(test, "Duplicate name error\n");
+ goto end;
+ }
+
+ res = AST_TEST_PASS;
+
+end:
+ ast_taskprocessor_unreference(uut);
+ ast_threadpool_shutdown(pool);
+ return res;
+}
+
static int unload_module(void)
{
ast_test_unregister(threadpool_push);
@@ -1406,6 +1618,8 @@ static int unload_module(void)
ast_test_unregister(threadpool_reactivation);
ast_test_unregister(threadpool_task_distribution);
ast_test_unregister(threadpool_more_destruction);
+ ast_test_unregister(threadpool_serializer);
+ ast_test_unregister(threadpool_serializer_dupe);
return 0;
}
@@ -1424,6 +1638,8 @@ static int load_module(void)
ast_test_register(threadpool_reactivation);
ast_test_register(threadpool_task_distribution);
ast_test_register(threadpool_more_destruction);
+ ast_test_register(threadpool_serializer);
+ ast_test_register(threadpool_serializer_dupe);
return AST_MODULE_LOAD_SUCCESS;
}