diff options
Diffstat (limited to 'main')
-rw-r--r-- | main/taskprocessor.c | 294 |
1 files changed, 195 insertions, 99 deletions
diff --git a/main/taskprocessor.c b/main/taskprocessor.c index 912f891b1..b433ca908 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -83,6 +83,7 @@ struct ast_taskprocessor { AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue; /*! \brief Taskprocessor singleton list entry */ AST_LIST_ENTRY(ast_taskprocessor) list; + struct ast_taskprocessor_listener *listener; }; #define TPS_MAX_BUCKETS 7 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */ @@ -122,6 +123,83 @@ static struct ast_cli_entry taskprocessor_clis[] = { AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"), }; +struct default_taskprocessor_listener_pvt { + pthread_t poll_thread; + ast_mutex_t lock; + ast_cond_t cond; + int wake_up; + int dead; +}; + +static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die) +{ + SCOPED_MUTEX(lock, &pvt->lock); + pvt->wake_up = 1; + pvt->dead = should_die; + ast_cond_signal(&pvt->cond); +} + +static void default_listener_destroy(void *obj) +{ + struct ast_taskprocessor_listener *listener = obj; + struct default_taskprocessor_listener_pvt *pvt = listener->private_data; + + default_tps_wake_up(pvt, 1); + pthread_join(pvt->poll_thread, NULL); + pvt->poll_thread = AST_PTHREADT_NULL; + ast_mutex_destroy(&pvt->lock); + ast_cond_destroy(&pvt->cond); + ast_free(pvt); + + ao2_ref(listener->tps, -1); + listener->tps = NULL; +} + +static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt) +{ + SCOPED_MUTEX(lock, &pvt->lock); + while (!pvt->wake_up) { + ast_cond_wait(&pvt->cond, lock); + } + pvt->wake_up = 0; + return pvt->dead; +} + +/* this is the task processing worker function */ +static void *tps_processing_function(void *data) +{ + struct ast_taskprocessor_listener *listener = data; + struct ast_taskprocessor *tps = listener->tps; + struct default_taskprocessor_listener_pvt *pvt = listener->private_data; + int dead = 0; + + while (!dead) { + if (!ast_taskprocessor_execute(tps)) { + dead = default_tps_idle(pvt); + } + } + return NULL; +} + +static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) +{ + struct default_taskprocessor_listener_pvt *pvt = listener->private_data; + + if (was_empty) { + default_tps_wake_up(pvt, 0); + } +} + +static void default_emptied(struct ast_taskprocessor_listener *listener) +{ + /* No-op */ +} + +static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = { + .task_pushed = default_task_pushed, + .emptied = default_emptied, +}; + /*! \internal \brief Clean up resources on Asterisk shutdown */ static void tps_shutdown(void) { @@ -286,75 +364,22 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg return CLI_SUCCESS; } -/* this is the task processing worker function */ -static void *tps_processing_function(void *data) -{ - struct ast_taskprocessor *i = data; - struct tps_task *t; - int size; - - if (!i) { - ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n"); - return NULL; - } - - while (i->poll_thread_run) { - ast_mutex_lock(&i->taskprocessor_lock); - if (!i->poll_thread_run) { - ast_mutex_unlock(&i->taskprocessor_lock); - break; - } - if (!(size = tps_taskprocessor_depth(i))) { - ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock); - if (!i->poll_thread_run) { - ast_mutex_unlock(&i->taskprocessor_lock); - break; - } - } - ast_mutex_unlock(&i->taskprocessor_lock); - /* stuff is in the queue */ - if (!(t = tps_taskprocessor_pop(i))) { - ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size); - continue; - } - if (!t->execute) { - ast_log(LOG_WARNING, "Task is missing a function to execute!\n"); - tps_task_free(t); - continue; - } - t->execute(t->datap); - - ast_mutex_lock(&i->taskprocessor_lock); - if (i->stats) { - i->stats->_tasks_processed_count++; - if (size > i->stats->max_qsize) { - i->stats->max_qsize = size; - } - } - ast_mutex_unlock(&i->taskprocessor_lock); - - tps_task_free(t); - } - while ((t = tps_taskprocessor_pop(i))) { - tps_task_free(t); - } - return NULL; -} - /* hash callback for astobj2 */ static int tps_hash_cb(const void *obj, const int flags) { const struct ast_taskprocessor *tps = obj; + const char *name = flags & OBJ_KEY ? obj : tps->name; - return ast_str_case_hash(tps->name); + return ast_str_case_hash(name); } /* compare callback for astobj2 */ static int tps_cmp_cb(void *obj, void *arg, int flags) { struct ast_taskprocessor *lhs = obj, *rhs = arg; + const char *rhsname = flags & OBJ_KEY ? arg : rhs->name; - return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0; + return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0; } /* destroy the taskprocessor */ @@ -368,20 +393,21 @@ static void tps_taskprocessor_destroy(void *tps) } ast_debug(1, "destroying taskprocessor '%s'\n", t->name); /* kill it */ - ast_mutex_lock(&t->taskprocessor_lock); - t->poll_thread_run = 0; - ast_cond_signal(&t->poll_cond); - ast_mutex_unlock(&t->taskprocessor_lock); - pthread_join(t->poll_thread, NULL); - t->poll_thread = AST_PTHREADT_NULL; ast_mutex_destroy(&t->taskprocessor_lock); - ast_cond_destroy(&t->poll_cond); /* free it */ if (t->stats) { ast_free(t->stats); t->stats = NULL; } ast_free((char *) t->name); + if (t->listener) { + /* This code should not be reached since the listener + * should have been destroyed before the taskprocessor could + * be destroyed + */ + ao2_ref(t->listener, -1); + t->listener = NULL; + } } /* pop the front task and return it */ @@ -416,80 +442,120 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps) return tps->name; } +static struct ast_taskprocessor_listener *default_listener_alloc(void) +{ + struct ast_taskprocessor_listener *listener; + struct default_taskprocessor_listener_pvt *pvt; + + listener = ao2_alloc(sizeof(*listener), default_listener_destroy); + if (!listener) { + return NULL; + } + pvt = ast_calloc(1, sizeof(*pvt)); + if (!pvt) { + ao2_ref(listener, -1); + return NULL; + } + listener->callbacks = &default_listener_callbacks; + listener->private_data = pvt; + ast_cond_init(&pvt->cond, NULL); + ast_mutex_init(&pvt->lock); + pvt->poll_thread = AST_PTHREADT_NULL; + if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) { + ao2_ref(listener, -1); + return NULL; + } + return listener; +} + /* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't * create the taskprocessor if we were told via ast_tps_options to return a reference only * if it already exists */ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create) { - struct ast_taskprocessor *p, tmp_tps = { - .name = name, - }; + struct ast_taskprocessor *p; + struct ast_taskprocessor_listener *listener; if (ast_strlen_zero(name)) { ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n"); return NULL; } - ao2_lock(tps_singletons); - p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER); + p = ao2_find(tps_singletons, name, OBJ_KEY); if (p) { ao2_unlock(tps_singletons); return p; } if (create & TPS_REF_IF_EXISTS) { /* calling function does not want a new taskprocessor to be created if it doesn't already exist */ - ao2_unlock(tps_singletons); return NULL; } - /* create a new taskprocessor */ - if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) { - ao2_unlock(tps_singletons); + /* Create a new taskprocessor. Start by creating a default listener */ + listener = default_listener_alloc(); + + p = ast_taskprocessor_create_with_listener(name, listener); + ao2_ref(listener, -1); + return p; + +} + +struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener) +{ + RAII_VAR(struct ast_taskprocessor *, p, + ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), + ao2_cleanup); + + if (!p) { ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name); return NULL; } - ast_cond_init(&p->poll_cond, NULL); - ast_mutex_init(&p->taskprocessor_lock); - if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) { - ao2_unlock(tps_singletons); ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name); - ao2_ref(p, -1); return NULL; } if (!(p->name = ast_strdup(name))) { - ao2_unlock(tps_singletons); - ao2_ref(p, -1); - return NULL; - } - p->poll_thread_run = 1; - p->poll_thread = AST_PTHREADT_NULL; - if (ast_pthread_create(&p->poll_thread, NULL, tps_processing_function, p) < 0) { - ao2_unlock(tps_singletons); - ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name); ao2_ref(p, -1); return NULL; } + + ao2_ref(listener, +1); + p->listener = listener; + + ao2_ref(p, +1); + listener->tps = p; + if (!(ao2_link(tps_singletons, p))) { - ao2_unlock(tps_singletons); ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name); - ao2_ref(p, -1); return NULL; } - ao2_unlock(tps_singletons); + + /* RAII_VAR will decrement the refcount at the end of the function. + * Since we want to pass back a reference to p, we bump the refcount + */ + ao2_ref(p, +1); return p; } /* decrement the taskprocessor reference count and unlink from the container if necessary */ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps) { - if (tps) { - ao2_lock(tps_singletons); - ao2_unlink(tps_singletons, tps); - if (ao2_ref(tps, -1) > 1) { - ao2_link(tps_singletons, tps); - } - ao2_unlock(tps_singletons); + struct ast_taskprocessor_listener *listener; + if (!tps) { + return NULL; } + + if (ao2_ref(tps, -1) > 3) { + return NULL; + } + /* If we're down to 3 references, then those must be: + * 1. The reference we just got rid of + * 2. The container + * 3. The listener + */ + ao2_unlink(tps_singletons, tps); + listener = tps->listener; + tps->listener = NULL; + ao2_ref(listener, -1); return NULL; } @@ -497,6 +563,7 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps) int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap) { struct tps_task *t; + int previous_size; if (!tps || !task_exe) { ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor"); @@ -508,9 +575,38 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void * } ast_mutex_lock(&tps->taskprocessor_lock); AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list); - tps->tps_queue_size++; - ast_cond_signal(&tps->poll_cond); + previous_size = tps->tps_queue_size++; ast_mutex_unlock(&tps->taskprocessor_lock); + tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1); return 0; } +int ast_taskprocessor_execute(struct ast_taskprocessor *tps) +{ + struct tps_task *t; + int size; + + if (!(t = tps_taskprocessor_pop(tps))) { + return 0; + } + + t->execute(t->datap); + + tps_task_free(t); + + ast_mutex_lock(&tps->taskprocessor_lock); + size = tps_taskprocessor_depth(tps); + if (tps->stats) { + tps->stats->_tasks_processed_count++; + if (size > tps->stats->max_qsize) { + tps->stats->max_qsize = size; + } + } + ast_mutex_unlock(&tps->taskprocessor_lock); + + if (size == 0) { + tps->listener->callbacks->emptied(tps->listener); + return 0; + } + return 1; +} |