summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Michelson <mmichelson@digium.com>2012-12-11 16:34:00 +0000
committerMark Michelson <mmichelson@digium.com>2012-12-11 16:34:00 +0000
commit8760e32ae380f65420799c290f7b92d40cd7926c (patch)
tree21f110cf8fd84476baee7e0fea62f4776db12055
parent29fc1227839155658ca899cdfa179673c88e1ee2 (diff)
Add auto-increment option and accompanying test.
This allows for the threadpool to automatically grow if tasks are pushed to it and no idle threads are currently available. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377803 65c4cc65-6c06-0410-ace0-fbb531ad65f3
-rw-r--r--include/asterisk/threadpool.h13
-rw-r--r--main/threadpool.c15
-rw-r--r--tests/test_threadpool.c88
3 files changed, 109 insertions, 7 deletions
diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h
index 9834d9180..39792b1bb 100644
--- a/include/asterisk/threadpool.h
+++ b/include/asterisk/threadpool.h
@@ -86,12 +86,23 @@ struct ast_threadpool_options {
#define AST_THREADPOOL_OPTIONS_VERSION 1
/*! Version of thradpool options in use */
int version;
- /* !
+ /*!
* \brief Time limit in seconds for idle threads
*
* A time of 0 or less will mean an infinite timeout.
*/
int idle_timeout;
+ /*!
+ * \brief Number of threads to increment pool by
+ *
+ * If a task is added into a pool and no idle thread is
+ * available to activate, then the pool can automatically
+ * grow by the given amount.
+ *
+ * Zero is a perfectly valid value to give here if you want
+ * to control threadpool growth yourself via your listener.
+ */
+ int auto_increment;
};
/*!
diff --git a/main/threadpool.c b/main/threadpool.c
index 57520cf11..b3195fc4c 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -416,7 +416,7 @@ static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *po
/*!
* \brief Activate idle threads
*
- * This function always returns CMP_MATCH because all threads that this
+ * This function always returns CMP_MATCH because all workers that this
* function acts on need to be seen as matches so they are unlinked from the
* list of idle threads.
*
@@ -425,7 +425,7 @@ static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *po
* \param arg The pool where the worker belongs
* \retval CMP_MATCH
*/
-static int activate_threads(void *obj, void *arg, int flags)
+static int activate_thread(void *obj, void *arg, int flags)
{
struct worker_thread *worker = obj;
struct ast_threadpool *pool = arg;
@@ -435,6 +435,8 @@ static int activate_threads(void *obj, void *arg, int flags)
return CMP_MATCH;
}
+static void grow(struct ast_threadpool *pool, int delta);
+
/*!
* \brief Queued task called when tasks are pushed into the threadpool
*
@@ -451,8 +453,13 @@ static int queued_task_pushed(void *data)
int was_empty = tpd->was_empty;
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
- activate_threads, pool);
+ if (ao2_container_count(pool->idle_threads) == 0 && pool->options.auto_increment > 0) {
+ grow(pool, pool->options.auto_increment);
+ } else {
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
+ }
+ threadpool_send_state_changed(pool);
ao2_ref(tpd, -1);
return 0;
}
diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c
index 4fe63e222..38b79b58c 100644
--- a/tests/test_threadpool.c
+++ b/tests/test_threadpool.c
@@ -220,7 +220,7 @@ static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, s
}
if (!tld->empty_notice) {
- ast_test_status_update(test, "Test listener never told that threadpool is empty\n");
+ ast_test_status_update(test, "Test listener not notified that threadpool is empty\n");
res = AST_TEST_FAIL;
}
@@ -283,6 +283,7 @@ AST_TEST_DEFINE(threadpool_push)
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
+ .auto_increment = 0,
};
switch (cmd) {
@@ -336,6 +337,7 @@ AST_TEST_DEFINE(threadpool_thread_creation)
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
+ .auto_increment = 0,
};
switch (cmd) {
@@ -385,6 +387,7 @@ AST_TEST_DEFINE(threadpool_thread_destruction)
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
+ .auto_increment = 0,
};
switch (cmd) {
@@ -443,6 +446,7 @@ AST_TEST_DEFINE(threadpool_thread_timeout)
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 2,
+ .auto_increment = 0,
};
switch (cmd) {
@@ -505,6 +509,7 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread)
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
+ .auto_increment = 0,
};
switch (cmd) {
@@ -581,6 +586,7 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task)
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
+ .auto_increment = 0,
};
switch (cmd) {
@@ -645,7 +651,6 @@ end:
ao2_cleanup(listener);
ast_free(std);
return res;
-
}
AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
@@ -660,6 +665,7 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
+ .auto_increment = 0,
};
switch (cmd) {
@@ -737,7 +743,80 @@ end:
ast_free(std2);
ast_free(std3);
return res;
+}
+
+AST_TEST_DEFINE(threadpool_auto_increment)
+{
+ struct ast_threadpool *pool = NULL;
+ struct ast_threadpool_listener *listener = NULL;
+ struct simple_task_data *std = NULL;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct test_listener_data *tld;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 0,
+ .auto_increment = 3,
+ };
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "auto_increment";
+ info->category = "/main/threadpool/";
+ info->summary = "Test that the threadpool grows as tasks are added";
+ info->description =
+ "Create an empty threadpool and push a task to it. Once the task is\n"
+ "pushed, the threadpool should add three threads and be able to\n"
+ "handle the task. The threads should then go idle\n";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ listener = ast_threadpool_listener_alloc(&test_callbacks);
+ if (!listener) {
+ return AST_TEST_FAIL;
+ }
+ tld = listener->private_data;
+
+ pool = ast_threadpool_create(info->name, listener, 0, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ std = simple_task_data_alloc();
+ if (!std) {
+ goto end;
+ }
+
+ ast_threadpool_push(pool, simple_task, std);
+
+ /* Pushing the task should result in the threadpool growing
+ * by three threads. This will allow the task to actually execute
+ */
+ res = wait_for_completion(test, std);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+ res = wait_for_empty_notice(test, tld);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+ res = wait_until_thread_state(test, tld, 0, 3);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+ res = listener_check(test, listener, 1, 1, 1, 0, 3, 1);
+
+end:
+ if (pool) {
+ ast_threadpool_shutdown(pool);
+ }
+ ao2_cleanup(listener);
+ ast_free(std);
+ return res;
}
AST_TEST_DEFINE(threadpool_reactivation)
@@ -751,6 +830,7 @@ AST_TEST_DEFINE(threadpool_reactivation)
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
+ .auto_increment = 0,
};
switch (cmd) {
@@ -913,6 +993,7 @@ AST_TEST_DEFINE(threadpool_task_distribution)
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
+ .auto_increment = 0,
};
switch (cmd) {
@@ -1001,6 +1082,7 @@ AST_TEST_DEFINE(threadpool_more_destruction)
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
+ .auto_increment = 0,
};
switch (cmd) {
@@ -1104,6 +1186,7 @@ static int unload_module(void)
ast_test_unregister(threadpool_one_task_one_thread);
ast_test_unregister(threadpool_one_thread_one_task);
ast_test_unregister(threadpool_one_thread_multiple_tasks);
+ ast_test_unregister(threadpool_auto_increment);
ast_test_unregister(threadpool_reactivation);
ast_test_unregister(threadpool_task_distribution);
ast_test_unregister(threadpool_more_destruction);
@@ -1119,6 +1202,7 @@ static int load_module(void)
ast_test_register(threadpool_one_task_one_thread);
ast_test_register(threadpool_one_thread_one_task);
ast_test_register(threadpool_one_thread_multiple_tasks);
+ ast_test_register(threadpool_auto_increment);
ast_test_register(threadpool_reactivation);
ast_test_register(threadpool_task_distribution);
ast_test_register(threadpool_more_destruction);