From 4590bfd93d7124fbd042f17260738e315e0abb6e Mon Sep 17 00:00:00 2001 From: Mark Michelson Date: Fri, 7 Dec 2012 00:30:35 +0000 Subject: Add new threadpool test and fix some taskprocessor bugs. The new thread creation test fails because Asterisk locks up while trying to lock a taskprocessor. While trying to debug that, I found a race condition during taskprocessor creation where a default taskprocessor listener could try to operate on a partially started taskprocessor. This was fixed by adding a new callback to taskprocessor listeners. Then while testing that change, I found some bugs in the taskprocessor tests where I was not properly unlocking when done with a lock. Scoped locks have spoiled me a bit. I still have not figured out why the threadpool thread creation test is locking up. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377368 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- main/taskprocessor.c | 21 ++++++++++++++++++--- main/threadpool.c | 7 +++++++ 2 files changed, 25 insertions(+), 3 deletions(-) (limited to 'main') diff --git a/main/taskprocessor.c b/main/taskprocessor.c index d83228f33..80875ec4a 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -171,12 +171,20 @@ static void *default_listener_alloc(struct ast_taskprocessor_listener *listener) ast_cond_init(&pvt->cond, NULL); ast_mutex_init(&pvt->lock); pvt->poll_thread = AST_PTHREADT_NULL; - if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) { - return NULL; - } return pvt; } +static int default_listener_start(struct ast_taskprocessor_listener *listener) +{ + struct default_taskprocessor_listener_pvt *pvt = listener->private_data; + + if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) { + return -1; + } + + return 0; +} + static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) { struct default_taskprocessor_listener_pvt *pvt = listener->private_data; @@ -209,6 +217,7 @@ static void default_listener_destroy(void *obj) static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = { .alloc = default_listener_alloc, + .start = default_listener_start, .task_pushed = default_task_pushed, .emptied = default_emptied, .shutdown = default_listener_shutdown, @@ -556,6 +565,12 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam return NULL; } + if (p->listener->callbacks->start(p->listener)) { + ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name); + ast_taskprocessor_unreference(p); + return NULL; + } + /* RAII_VAR will decrement the refcount at the end of the function. * Since we want to pass back a reference to p, we bump the refcount */ diff --git a/main/threadpool.c b/main/threadpool.c index 1da0d0766..1b0477926 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -268,6 +268,11 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener) return pool; } +static int threadpool_tps_start(struct ast_taskprocessor_listener *listener) +{ + return 0; +} + /*! * \brief helper used for queued task when tasks are pushed */ @@ -431,6 +436,7 @@ static void threadpool_destroy(void *private_data) */ static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = { .alloc = threadpool_alloc, + .start = threadpool_tps_start, .task_pushed = threadpool_tps_task_pushed, .emptied = threadpool_tps_emptied, .shutdown = threadpool_tps_shutdown, @@ -623,6 +629,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis pool = tps_listener->private_data; pool->tps = tps; + ast_log(LOG_NOTICE, "The taskprocessor I've created is located at %p\n", pool->tps); ao2_ref(listener, +1); pool->listener = listener; ast_threadpool_set_size(pool, initial_size); -- cgit v1.2.3