summaryrefslogtreecommitdiff
path: root/main/taskprocessor.c
diff options
context:
space:
mode:
Diffstat (limited to 'main/taskprocessor.c')
-rw-r--r--main/taskprocessor.c234
1 files changed, 164 insertions, 70 deletions
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index a8d1c80f9..189219d66 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -37,6 +37,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/taskprocessor.h"
+#include "asterisk/sem.h"
/*!
* \brief tps_task structure is queued to a taskprocessor
@@ -47,11 +48,15 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
*/
struct tps_task {
/*! \brief The execute() task callback function pointer */
- int (*execute)(void *datap);
+ union {
+ int (*execute)(void *datap);
+ int (*execute_local)(struct ast_taskprocessor_local *local);
+ } callback;
/*! \brief The data pointer for the task execute() function */
void *datap;
/*! \brief AST_LIST_ENTRY overhead */
AST_LIST_ENTRY(tps_task) list;
+ unsigned int wants_local:1;
};
/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
@@ -68,6 +73,7 @@ struct ast_taskprocessor {
const char *name;
/*! \brief Taskprocessor statistics */
struct tps_taskprocessor_stats *stats;
+ void *local_data;
/*! \brief Taskprocessor current queue size */
long tps_queue_size;
/*! \brief Taskprocessor queue */
@@ -113,9 +119,6 @@ static int tps_hash_cb(const void *obj, const int flags);
/*! \brief The astobj2 compare callback for taskprocessors */
static int tps_cmp_cb(void *obj, void *arg, int flags);
-/*! \brief The task processing function executed by a taskprocessor */
-static void *tps_processing_function(void *data);
-
/*! \brief Destroy the taskprocessor when its refcount reaches zero */
static void tps_taskprocessor_destroy(void *tps);
@@ -138,47 +141,56 @@ static struct ast_cli_entry taskprocessor_clis[] = {
struct default_taskprocessor_listener_pvt {
pthread_t poll_thread;
- ast_mutex_t lock;
- ast_cond_t cond;
- int wake_up;
int dead;
+ struct ast_sem sem;
};
-
-static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
+static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
{
- SCOPED_MUTEX(lock, &pvt->lock);
- pvt->wake_up = 1;
- pvt->dead = should_die;
- ast_cond_signal(&pvt->cond);
+ ast_assert(pvt->dead);
+ ast_sem_destroy(&pvt->sem);
+ ast_free(pvt);
}
-static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
+static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
{
- SCOPED_MUTEX(lock, &pvt->lock);
- while (!pvt->wake_up) {
- ast_cond_wait(&pvt->cond, lock);
- }
- pvt->wake_up = 0;
- return pvt->dead;
+ struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
+
+ default_listener_pvt_destroy(pvt);
+
+ listener->user_data = NULL;
}
/*!
* \brief Function that processes tasks in the taskprocessor
* \internal
*/
-static void *tps_processing_function(void *data)
+static void *default_tps_processing_function(void *data)
{
struct ast_taskprocessor_listener *listener = data;
struct ast_taskprocessor *tps = listener->tps;
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- int dead = 0;
-
- while (!dead) {
- if (!ast_taskprocessor_execute(tps)) {
- dead = default_tps_idle(pvt);
+ int sem_value;
+ int res;
+
+ while (!pvt->dead) {
+ res = ast_sem_wait(&pvt->sem);
+ if (res != 0 && errno != EINTR) {
+ ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
+ strerror(errno));
+ /* Just give up */
+ break;
}
+ ast_taskprocessor_execute(tps);
}
+
+ /* No posting to a dead taskprocessor! */
+ res = ast_sem_getvalue(&pvt->sem, &sem_value);
+ ast_assert(res == 0 && sem_value == 0);
+
+ /* Free the shutdown reference (see default_listener_shutdown) */
+ ao2_t_ref(listener->tps, -1, "tps-shutdown");
+
return NULL;
}
@@ -186,7 +198,7 @@ static int default_listener_start(struct ast_taskprocessor_listener *listener)
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
+ if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
return -1;
}
@@ -197,33 +209,50 @@ static void default_task_pushed(struct ast_taskprocessor_listener *listener, int
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- ast_assert(!pvt->dead);
-
- if (was_empty) {
- default_tps_wake_up(pvt, 0);
+ if (ast_sem_post(&pvt->sem) != 0) {
+ ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
+ strerror(errno));
}
}
-static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
+static int default_listener_die(void *data)
{
- ast_mutex_destroy(&pvt->lock);
- ast_cond_destroy(&pvt->cond);
- ast_free(pvt);
+ struct default_taskprocessor_listener_pvt *pvt = data;
+ pvt->dead = 1;
+ return 0;
}
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- default_tps_wake_up(pvt, 1);
- pthread_join(pvt->poll_thread, NULL);
+ int res;
+
+ /* Hold a reference during shutdown */
+ ao2_t_ref(listener->tps, +1, "tps-shutdown");
+
+ ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
+
+ if (pthread_self() == pvt->poll_thread) {
+ res = pthread_detach(pvt->poll_thread);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "pthread_detach(): %s\n",
+ strerror(errno));
+ }
+ } else {
+ res = pthread_join(pvt->poll_thread, NULL);
+ if (res != 0) {
+ ast_log(LOG_ERROR, "pthread_join(): %s\n",
+ strerror(errno));
+ }
+ }
pvt->poll_thread = AST_PTHREADT_NULL;
- default_listener_pvt_destroy(pvt);
}
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
.start = default_listener_start,
.task_pushed = default_task_pushed,
.shutdown = default_listener_shutdown,
+ .dtor = default_listener_pvt_dtor,
};
/*!
@@ -258,19 +287,48 @@ int ast_tps_init(void)
static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
{
struct tps_task *t;
- if ((t = ast_calloc(1, sizeof(*t)))) {
- t->execute = task_exe;
- t->datap = datap;
+ if (!task_exe) {
+ ast_log(LOG_ERROR, "task_exe is NULL!\n");
+ return NULL;
+ }
+
+ t = ast_calloc(1, sizeof(*t));
+ if (!t) {
+ ast_log(LOG_ERROR, "failed to allocate task!\n");
+ return NULL;
+ }
+
+ t->callback.execute = task_exe;
+ t->datap = datap;
+
+ return t;
+}
+
+static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+{
+ struct tps_task *t;
+ if (!task_exe) {
+ ast_log(LOG_ERROR, "task_exe is NULL!\n");
+ return NULL;
+ }
+
+ t = ast_calloc(1, sizeof(*t));
+ if (!t) {
+ ast_log(LOG_ERROR, "failed to allocate task!\n");
+ return NULL;
}
+
+ t->callback.execute_local = task_exe;
+ t->datap = datap;
+ t->wants_local = 1;
+
return t;
}
/* release task resources */
static void *tps_task_free(struct tps_task *task)
{
- if (task) {
- ast_free(task);
- }
+ ast_free(task);
return NULL;
}
@@ -425,16 +483,10 @@ static void tps_taskprocessor_destroy(void *tps)
}
ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
/* free it */
- if (t->stats) {
- ast_free(t->stats);
- t->stats = NULL;
- }
+ ast_free(t->stats);
+ t->stats = NULL;
ast_free((char *) t->name);
if (t->listener) {
- /* This code should not be reached since the listener
- * should have been destroyed before the taskprocessor could
- * be destroyed
- */
ao2_ref(t->listener, -1);
t->listener = NULL;
}
@@ -447,7 +499,6 @@ static void tps_taskprocessor_destroy(void *tps)
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
{
struct tps_task *task;
- SCOPED_AO2LOCK(lock, tps);
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
tps->tps_queue_size--;
@@ -476,10 +527,21 @@ static void listener_shutdown(struct ast_taskprocessor_listener *listener)
ao2_ref(listener->tps, -1);
}
+static void taskprocessor_listener_dtor(void *obj)
+{
+ struct ast_taskprocessor_listener *listener = obj;
+
+ if (listener->callbacks->dtor) {
+ listener->callbacks->dtor(listener);
+ }
+}
+
struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
{
RAII_VAR(struct ast_taskprocessor_listener *, listener,
- ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
+ NULL, ao2_cleanup);
+
+ listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
if (!listener) {
return NULL;
@@ -510,9 +572,12 @@ static void *default_listener_pvt_alloc(void)
if (!pvt) {
return NULL;
}
- ast_cond_init(&pvt->cond, NULL);
- ast_mutex_init(&pvt->lock);
pvt->poll_thread = AST_PTHREADT_NULL;
+ if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
+ ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
+ ast_free(pvt);
+ return NULL;
+ }
return pvt;
}
@@ -594,7 +659,6 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
p = __allocate_taskprocessor(name, listener);
if (!p) {
- default_listener_pvt_destroy(pvt);
ao2_ref(listener, -1);
return NULL;
}
@@ -615,6 +679,13 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam
return __allocate_taskprocessor(name, listener);
}
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
+ void *local_data)
+{
+ SCOPED_AO2LOCK(lock, tps);
+ tps->local_data = local_data;
+}
+
/* decrement the taskprocessor reference count and unlink from the container if necessary */
void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
{
@@ -636,20 +707,21 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
}
/* push the task into the taskprocessor queue */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
{
- struct tps_task *t;
int previous_size;
int was_empty;
- if (!tps || !task_exe) {
- ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
+ if (!tps) {
+ ast_log(LOG_ERROR, "tps is NULL!\n");
return -1;
}
- if (!(t = tps_task_alloc(task_exe, datap))) {
- ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name);
+
+ if (!t) {
+ ast_log(LOG_ERROR, "t is NULL!\n");
return -1;
}
+
ao2_lock(tps);
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
@@ -660,21 +732,43 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *
return 0;
}
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+{
+ return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
+}
+
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
+{
+ return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
+}
+
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
{
+ struct ast_taskprocessor_local local;
struct tps_task *t;
int size;
ao2_lock(tps);
+ t = tps_taskprocessor_pop(tps);
+ if (!t) {
+ ao2_unlock(tps);
+ return 0;
+ }
+
tps->executing = 1;
- ao2_unlock(tps);
- t = tps_taskprocessor_pop(tps);
+ if (t->wants_local) {
+ local.local_data = tps->local_data;
+ local.data = t->datap;
+ }
+ ao2_unlock(tps);
- if (t) {
- t->execute(t->datap);
- tps_task_free(t);
+ if (t->wants_local) {
+ t->callback.execute_local(&local);
+ } else {
+ t->callback.execute(t->datap);
}
+ tps_task_free(t);
ao2_lock(tps);
/* We need to check size in the same critical section where we reset the
@@ -684,7 +778,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
tps->executing = 0;
size = tps_taskprocessor_depth(tps);
/* If we executed a task, bump the stats */
- if (t && tps->stats) {
+ if (tps->stats) {
tps->stats->_tasks_processed_count++;
if (size > tps->stats->max_qsize) {
tps->stats->max_qsize = size;
@@ -693,7 +787,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
ao2_unlock(tps);
/* If we executed a task, check for the transition to empty */
- if (t && size == 0 && tps->listener->callbacks->emptied) {
+ if (size == 0 && tps->listener->callbacks->emptied) {
tps->listener->callbacks->emptied(tps->listener);
}
return size > 0;