summaryrefslogtreecommitdiff
path: root/main/taskprocessor.c
diff options
context:
space:
mode:
authorMark Michelson <mmichelson@digium.com>2012-11-08 23:27:16 +0000
committerMark Michelson <mmichelson@digium.com>2012-11-08 23:27:16 +0000
commit77725bf293f470b6697b94472eafb457be83ef14 (patch)
treed92a1a0f57d203184616bce268c83ee470b45d78 /main/taskprocessor.c
parent7c6c20bfc642345f7d2276f1fa89fff4fe90edaf (diff)
Move taskprocessors to use a listener model.
Taskprocessors are now divided into two units: the task queue and their listeners. When a task is added to the queue, the listener is notified and can take whatever action is desired. This means that taskprocessors are no longer confined to having their tasks executed within a single thread. A default taskprocessor listener has been added that mirrors the old taskprocessor behavior. I've tested it by running Asterisk and placing calls. It appears to work as expected. I'm going to do some cleaning up first and then write some unit tests to be sure everything works as expected. git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@376118 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'main/taskprocessor.c')
-rw-r--r--main/taskprocessor.c294
1 files changed, 195 insertions, 99 deletions
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 912f891b1..b433ca908 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -83,6 +83,7 @@ struct ast_taskprocessor {
AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
/*! \brief Taskprocessor singleton list entry */
AST_LIST_ENTRY(ast_taskprocessor) list;
+ struct ast_taskprocessor_listener *listener;
};
#define TPS_MAX_BUCKETS 7
/*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
@@ -122,6 +123,83 @@ static struct ast_cli_entry taskprocessor_clis[] = {
AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
};
+struct default_taskprocessor_listener_pvt {
+ pthread_t poll_thread;
+ ast_mutex_t lock;
+ ast_cond_t cond;
+ int wake_up;
+ int dead;
+};
+
+static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
+{
+ SCOPED_MUTEX(lock, &pvt->lock);
+ pvt->wake_up = 1;
+ pvt->dead = should_die;
+ ast_cond_signal(&pvt->cond);
+}
+
+static void default_listener_destroy(void *obj)
+{
+ struct ast_taskprocessor_listener *listener = obj;
+ struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+
+ default_tps_wake_up(pvt, 1);
+ pthread_join(pvt->poll_thread, NULL);
+ pvt->poll_thread = AST_PTHREADT_NULL;
+ ast_mutex_destroy(&pvt->lock);
+ ast_cond_destroy(&pvt->cond);
+ ast_free(pvt);
+
+ ao2_ref(listener->tps, -1);
+ listener->tps = NULL;
+}
+
+static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
+{
+ SCOPED_MUTEX(lock, &pvt->lock);
+ while (!pvt->wake_up) {
+ ast_cond_wait(&pvt->cond, lock);
+ }
+ pvt->wake_up = 0;
+ return pvt->dead;
+}
+
+/* this is the task processing worker function */
+static void *tps_processing_function(void *data)
+{
+ struct ast_taskprocessor_listener *listener = data;
+ struct ast_taskprocessor *tps = listener->tps;
+ struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+ int dead = 0;
+
+ while (!dead) {
+ if (!ast_taskprocessor_execute(tps)) {
+ dead = default_tps_idle(pvt);
+ }
+ }
+ return NULL;
+}
+
+static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
+{
+ struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
+
+ if (was_empty) {
+ default_tps_wake_up(pvt, 0);
+ }
+}
+
+static void default_emptied(struct ast_taskprocessor_listener *listener)
+{
+ /* No-op */
+}
+
+static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
+ .task_pushed = default_task_pushed,
+ .emptied = default_emptied,
+};
+
/*! \internal \brief Clean up resources on Asterisk shutdown */
static void tps_shutdown(void)
{
@@ -286,75 +364,22 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
return CLI_SUCCESS;
}
-/* this is the task processing worker function */
-static void *tps_processing_function(void *data)
-{
- struct ast_taskprocessor *i = data;
- struct tps_task *t;
- int size;
-
- if (!i) {
- ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n");
- return NULL;
- }
-
- while (i->poll_thread_run) {
- ast_mutex_lock(&i->taskprocessor_lock);
- if (!i->poll_thread_run) {
- ast_mutex_unlock(&i->taskprocessor_lock);
- break;
- }
- if (!(size = tps_taskprocessor_depth(i))) {
- ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock);
- if (!i->poll_thread_run) {
- ast_mutex_unlock(&i->taskprocessor_lock);
- break;
- }
- }
- ast_mutex_unlock(&i->taskprocessor_lock);
- /* stuff is in the queue */
- if (!(t = tps_taskprocessor_pop(i))) {
- ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size);
- continue;
- }
- if (!t->execute) {
- ast_log(LOG_WARNING, "Task is missing a function to execute!\n");
- tps_task_free(t);
- continue;
- }
- t->execute(t->datap);
-
- ast_mutex_lock(&i->taskprocessor_lock);
- if (i->stats) {
- i->stats->_tasks_processed_count++;
- if (size > i->stats->max_qsize) {
- i->stats->max_qsize = size;
- }
- }
- ast_mutex_unlock(&i->taskprocessor_lock);
-
- tps_task_free(t);
- }
- while ((t = tps_taskprocessor_pop(i))) {
- tps_task_free(t);
- }
- return NULL;
-}
-
/* hash callback for astobj2 */
static int tps_hash_cb(const void *obj, const int flags)
{
const struct ast_taskprocessor *tps = obj;
+ const char *name = flags & OBJ_KEY ? obj : tps->name;
- return ast_str_case_hash(tps->name);
+ return ast_str_case_hash(name);
}
/* compare callback for astobj2 */
static int tps_cmp_cb(void *obj, void *arg, int flags)
{
struct ast_taskprocessor *lhs = obj, *rhs = arg;
+ const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
- return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0;
+ return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
}
/* destroy the taskprocessor */
@@ -368,20 +393,21 @@ static void tps_taskprocessor_destroy(void *tps)
}
ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
/* kill it */
- ast_mutex_lock(&t->taskprocessor_lock);
- t->poll_thread_run = 0;
- ast_cond_signal(&t->poll_cond);
- ast_mutex_unlock(&t->taskprocessor_lock);
- pthread_join(t->poll_thread, NULL);
- t->poll_thread = AST_PTHREADT_NULL;
ast_mutex_destroy(&t->taskprocessor_lock);
- ast_cond_destroy(&t->poll_cond);
/* free it */
if (t->stats) {
ast_free(t->stats);
t->stats = NULL;
}
ast_free((char *) t->name);
+ if (t->listener) {
+ /* This code should not be reached since the listener
+ * should have been destroyed before the taskprocessor could
+ * be destroyed
+ */
+ ao2_ref(t->listener, -1);
+ t->listener = NULL;
+ }
}
/* pop the front task and return it */
@@ -416,80 +442,120 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
return tps->name;
}
+static struct ast_taskprocessor_listener *default_listener_alloc(void)
+{
+ struct ast_taskprocessor_listener *listener;
+ struct default_taskprocessor_listener_pvt *pvt;
+
+ listener = ao2_alloc(sizeof(*listener), default_listener_destroy);
+ if (!listener) {
+ return NULL;
+ }
+ pvt = ast_calloc(1, sizeof(*pvt));
+ if (!pvt) {
+ ao2_ref(listener, -1);
+ return NULL;
+ }
+ listener->callbacks = &default_listener_callbacks;
+ listener->private_data = pvt;
+ ast_cond_init(&pvt->cond, NULL);
+ ast_mutex_init(&pvt->lock);
+ pvt->poll_thread = AST_PTHREADT_NULL;
+ if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
+ ao2_ref(listener, -1);
+ return NULL;
+ }
+ return listener;
+}
+
/* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
* create the taskprocessor if we were told via ast_tps_options to return a reference only
* if it already exists */
struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
{
- struct ast_taskprocessor *p, tmp_tps = {
- .name = name,
- };
+ struct ast_taskprocessor *p;
+ struct ast_taskprocessor_listener *listener;
if (ast_strlen_zero(name)) {
ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
return NULL;
}
- ao2_lock(tps_singletons);
- p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER);
+ p = ao2_find(tps_singletons, name, OBJ_KEY);
if (p) {
ao2_unlock(tps_singletons);
return p;
}
if (create & TPS_REF_IF_EXISTS) {
/* calling function does not want a new taskprocessor to be created if it doesn't already exist */
- ao2_unlock(tps_singletons);
return NULL;
}
- /* create a new taskprocessor */
- if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) {
- ao2_unlock(tps_singletons);
+ /* Create a new taskprocessor. Start by creating a default listener */
+ listener = default_listener_alloc();
+
+ p = ast_taskprocessor_create_with_listener(name, listener);
+ ao2_ref(listener, -1);
+ return p;
+
+}
+
+struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
+{
+ RAII_VAR(struct ast_taskprocessor *, p,
+ ao2_alloc(sizeof(*p), tps_taskprocessor_destroy),
+ ao2_cleanup);
+
+ if (!p) {
ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
return NULL;
}
- ast_cond_init(&p->poll_cond, NULL);
- ast_mutex_init(&p->taskprocessor_lock);
-
if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
- ao2_unlock(tps_singletons);
ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
- ao2_ref(p, -1);
return NULL;
}
if (!(p->name = ast_strdup(name))) {
- ao2_unlock(tps_singletons);
- ao2_ref(p, -1);
- return NULL;
- }
- p->poll_thread_run = 1;
- p->poll_thread = AST_PTHREADT_NULL;
- if (ast_pthread_create(&p->poll_thread, NULL, tps_processing_function, p) < 0) {
- ao2_unlock(tps_singletons);
- ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name);
ao2_ref(p, -1);
return NULL;
}
+
+ ao2_ref(listener, +1);
+ p->listener = listener;
+
+ ao2_ref(p, +1);
+ listener->tps = p;
+
if (!(ao2_link(tps_singletons, p))) {
- ao2_unlock(tps_singletons);
ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
- ao2_ref(p, -1);
return NULL;
}
- ao2_unlock(tps_singletons);
+
+ /* RAII_VAR will decrement the refcount at the end of the function.
+ * Since we want to pass back a reference to p, we bump the refcount
+ */
+ ao2_ref(p, +1);
return p;
}
/* decrement the taskprocessor reference count and unlink from the container if necessary */
void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
{
- if (tps) {
- ao2_lock(tps_singletons);
- ao2_unlink(tps_singletons, tps);
- if (ao2_ref(tps, -1) > 1) {
- ao2_link(tps_singletons, tps);
- }
- ao2_unlock(tps_singletons);
+ struct ast_taskprocessor_listener *listener;
+ if (!tps) {
+ return NULL;
}
+
+ if (ao2_ref(tps, -1) > 3) {
+ return NULL;
+ }
+ /* If we're down to 3 references, then those must be:
+ * 1. The reference we just got rid of
+ * 2. The container
+ * 3. The listener
+ */
+ ao2_unlink(tps_singletons, tps);
+ listener = tps->listener;
+ tps->listener = NULL;
+ ao2_ref(listener, -1);
return NULL;
}
@@ -497,6 +563,7 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
{
struct tps_task *t;
+ int previous_size;
if (!tps || !task_exe) {
ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
@@ -508,9 +575,38 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *
}
ast_mutex_lock(&tps->taskprocessor_lock);
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
- tps->tps_queue_size++;
- ast_cond_signal(&tps->poll_cond);
+ previous_size = tps->tps_queue_size++;
ast_mutex_unlock(&tps->taskprocessor_lock);
+ tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1);
return 0;
}
+int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
+{
+ struct tps_task *t;
+ int size;
+
+ if (!(t = tps_taskprocessor_pop(tps))) {
+ return 0;
+ }
+
+ t->execute(t->datap);
+
+ tps_task_free(t);
+
+ ast_mutex_lock(&tps->taskprocessor_lock);
+ size = tps_taskprocessor_depth(tps);
+ if (tps->stats) {
+ tps->stats->_tasks_processed_count++;
+ if (size > tps->stats->max_qsize) {
+ tps->stats->max_qsize = size;
+ }
+ }
+ ast_mutex_unlock(&tps->taskprocessor_lock);
+
+ if (size == 0) {
+ tps->listener->callbacks->emptied(tps->listener);
+ return 0;
+ }
+ return 1;
+}