diff options
Diffstat (limited to 'main/threadpool.c')
-rw-r--r-- | main/threadpool.c | 146 |
1 files changed, 132 insertions, 14 deletions
diff --git a/main/threadpool.c b/main/threadpool.c index 5ed9a42d1..1443716d4 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -46,6 +46,11 @@ struct ast_threadpool { */ struct ao2_container *idle_threads; /*! + * \brief The container of zombie threads. + * Zombie threads may be running tasks, but they are scheduled to die soon + */ + struct ao2_container *zombie_threads; + /*! * \brief The main taskprocessor * * Tasks that are queued in this taskprocessor are @@ -79,7 +84,7 @@ struct ast_threadpool { * This is done for three main reasons * 1) It ensures that listeners are given an accurate portrayal * of the threadpool's current state. In other words, when a listener - * gets told a count of active and idle threads, it does not + * gets told a count of active, idle and zombie threads, it does not * need to worry that internal state of the threadpool might be different * from what it has been told. * 2) It minimizes the locking required in both the threadpool and in @@ -96,7 +101,20 @@ struct ast_threadpool { enum worker_state { /*! The worker is either active or idle */ ALIVE, - /*! The worker has been asked to shut down. */ + /*! + * The worker has been asked to shut down but + * may still be in the process of executing tasks. + * This transition happens when the threadpool needs + * to shrink and needs to kill active threads in order + * to do so. + */ + ZOMBIE, + /*! + * The worker has been asked to shut down. Typically + * only idle threads go to this state directly, but + * active threads may go straight to this state when + * the threadpool is shut down. + */ DEAD, }; @@ -202,6 +220,41 @@ static void threadpool_active_thread_idle(struct ast_threadpool *pool, } /*! + * \brief Kill a zombie thread + * + * This runs from the threadpool's control taskprocessor thread. + * + * \param data A thread_worker_pair containing the threadpool and the zombie thread + * \return 0 + */ +static int queued_zombie_thread_dead(void *data) +{ + struct thread_worker_pair *pair = data; + + ao2_unlink(pair->pool->zombie_threads, pair->worker); + threadpool_send_state_changed(pair->pool); + + ao2_ref(pair, -1); + return 0; +} + +/*! + * \brief Queue a task to kill a zombie thread + * + * This is called by a worker thread when it acknowledges that it is time for + * it to die. + */ +static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, + struct worker_thread *worker) +{ + struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker); + if (!pair) { + return; + } + ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair); +} + +/*! * \brief Execute a task in the threadpool * * This is the function that worker threads call in order to execute tasks @@ -263,6 +316,10 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener) if (!pool->idle_threads) { return NULL; } + pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp); + if (!pool->zombie_threads) { + return NULL; + } ao2_ref(pool, +1); return pool; @@ -413,6 +470,7 @@ static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener) ao2_cleanup(pool->active_threads); ao2_cleanup(pool->idle_threads); + ao2_cleanup(pool->zombie_threads); } /*! @@ -459,6 +517,7 @@ static void grow(struct ast_threadpool *pool, int delta) return; } ao2_link(pool->active_threads, worker); + ao2_ref(worker, -1); } } @@ -478,7 +537,46 @@ static int kill_threads(void *obj, void *arg, int flags) { int *num_to_kill = arg; - if ((*num_to_kill)-- > 0) { + ast_log(LOG_NOTICE, "num to kill is %d\n", *num_to_kill); + + if (*num_to_kill > 0) { + --(*num_to_kill); + ast_log(LOG_NOTICE, "Should be killing a thread\n"); + return CMP_MATCH; + } else { + return CMP_STOP; + } +} + +/*! + * \brief ao2 callback to zombify a set number of threads. + * + * Threads will be zombified as long as as the counter has not reached + * zero. The counter is decremented with each thread that is zombified. + * + * Zombifying a thread involves removing it from its current container, + * adding it to the zombie container, and changing the state of the + * worker to a zombie + * + * This callback is called from the threadpool control taskprocessor thread. + * + * \param obj The worker thread that may be zombified + * \param arg The pool to which the worker belongs + * \param data The counter + * \param flags Unused + * \retval CMP_MATCH The zombified thread should be removed from its current container + * \retval CMP_STOP Stop attempting to zombify threads + */ +static int zombify_threads(void *obj, void *arg, void *data, int flags) +{ + struct worker_thread *worker = obj; + struct ast_threadpool *pool = arg; + int *num_to_zombify = data; + + if ((*num_to_zombify)-- > 0) { + ast_log(LOG_NOTICE, "Should be zombifying a thread\n"); + ao2_link(pool->zombie_threads, worker); + worker_set_state(worker, ZOMBIE); return CMP_MATCH; } else { return CMP_STOP; @@ -490,7 +588,7 @@ static int kill_threads(void *obj, void *arg, int flags) * * The preference is to kill idle threads. However, if there are * more threads to remove than there are idle threads, then active - * threads will be removed too. + * threads will be zombified instead. * * This function is called from the threadpool control taskprocessor thread. * @@ -499,15 +597,21 @@ static int kill_threads(void *obj, void *arg, int flags) */ static void shrink(struct ast_threadpool *pool, int delta) { + /* + * Preference is to kill idle threads, but + * we'll move on to deactivating active threads + * if we have to + */ int idle_threads = ao2_container_count(pool->idle_threads); int idle_threads_to_kill = MIN(delta, idle_threads); - int active_threads_to_kill = delta - idle_threads_to_kill; + int active_threads_to_zombify = delta - idle_threads_to_kill; - ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK, + ast_log(LOG_NOTICE, "Going to kill off %d idle threads\n", idle_threads_to_kill); + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, kill_threads, &idle_threads_to_kill); - ao2_callback(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK, - kill_threads, &active_threads_to_kill); + ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, + zombify_threads, pool, &active_threads_to_zombify); } /*! @@ -553,20 +657,22 @@ static int queued_set_size(void *data) { struct set_size_data *ssd = data; struct ast_threadpool *pool = ssd->pool; - unsigned int new_size = ssd->size; + unsigned int num_threads = ssd->size; + + /* We don't count zombie threads as being "live when potentially resizing */ unsigned int current_size = ao2_container_count(pool->active_threads) + ao2_container_count(pool->idle_threads); - if (current_size == new_size) { + if (current_size == num_threads) { ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n", - new_size, current_size); + num_threads, current_size); return 0; } - if (current_size < new_size) { - grow(pool, new_size - current_size); + if (current_size < num_threads) { + grow(pool, num_threads - current_size); } else { - shrink(pool, current_size - new_size); + shrink(pool, current_size - num_threads); } threadpool_send_state_changed(pool); @@ -788,6 +894,18 @@ static void worker_active(struct worker_thread *worker) alive = worker_idle(worker); } } + + /* Reaching this portion means the thread is + * on death's door. It may have been killed while + * it was idle, in which case it can just die + * peacefully. If it's a zombie, though, then + * it needs to let the pool know so + * that the thread can be removed from the + * list of zombie threads. + */ + if (worker->state == ZOMBIE) { + threadpool_zombie_thread_dead(worker->pool, worker); + } } /*! |