summaryrefslogtreecommitdiff
path: root/main
diff options
context:
space:
mode:
Diffstat (limited to 'main')
-rw-r--r--main/cli.c6
-rw-r--r--main/format.c22
-rw-r--r--main/format_cap.c8
-rw-r--r--main/hashtab.c2
-rw-r--r--main/taskprocessor.c13
-rw-r--r--main/threadpool.c72
6 files changed, 102 insertions, 21 deletions
diff --git a/main/cli.c b/main/cli.c
index 0d66f3e48..7f86eab3a 100644
--- a/main/cli.c
+++ b/main/cli.c
@@ -1076,10 +1076,12 @@ static char *handle_chanlist(struct ast_cli_entry *e, int cmd, struct ast_cli_ar
char locbuf[40] = "(None)";
char appdata[40] = "(None)";
- if (!cs->context && !cs->exten)
+ if (!ast_strlen_zero(cs->context) && !ast_strlen_zero(cs->exten)) {
snprintf(locbuf, sizeof(locbuf), "%s@%s:%d", cs->exten, cs->context, cs->priority);
- if (cs->appl)
+ }
+ if (!ast_strlen_zero(cs->appl)) {
snprintf(appdata, sizeof(appdata), "%s(%s)", cs->appl, S_OR(cs->data, ""));
+ }
ast_cli(a->fd, FORMAT_STRING, cs->name, locbuf, ast_state2str(cs->state), appdata);
}
}
diff --git a/main/format.c b/main/format.c
index 8ac82f0ed..5bf38dfb2 100644
--- a/main/format.c
+++ b/main/format.c
@@ -302,6 +302,14 @@ const void *ast_format_attribute_get(const struct ast_format *format, const char
{
const struct ast_format_interface *interface = format->interface;
+ if (!interface) {
+ struct format_interface *format_interface = ao2_find(interfaces, format->codec->name, OBJ_SEARCH_KEY);
+ if (format_interface) {
+ interface = format_interface->interface;
+ ao2_ref(format_interface, -1);
+ }
+ }
+
if (!interface || !interface->format_attribute_get) {
return NULL;
}
@@ -330,11 +338,21 @@ struct ast_format *ast_format_parse_sdp_fmtp(const struct ast_format *format, co
void ast_format_generate_sdp_fmtp(const struct ast_format *format, unsigned int payload, struct ast_str **str)
{
- if (!format->interface || !format->interface->format_generate_sdp_fmtp) {
+ const struct ast_format_interface *interface = format->interface;
+
+ if (!interface) {
+ struct format_interface *format_interface = ao2_find(interfaces, format->codec->name, OBJ_SEARCH_KEY);
+ if (format_interface) {
+ interface = format_interface->interface;
+ ao2_ref(format_interface, -1);
+ }
+ }
+
+ if (!interface || !interface->format_generate_sdp_fmtp) {
return;
}
- format->interface->format_generate_sdp_fmtp(format, payload, str);
+ interface->format_generate_sdp_fmtp(format, payload, str);
}
struct ast_codec *ast_format_get_codec(const struct ast_format *format)
diff --git a/main/format_cap.c b/main/format_cap.c
index 4739efa0a..8d6046ab2 100644
--- a/main/format_cap.c
+++ b/main/format_cap.c
@@ -265,7 +265,15 @@ int ast_format_cap_append_by_type(struct ast_format_cap *cap, enum ast_media_typ
}
format = ast_format_cache_get(codec->name);
+
+ if (format == ast_format_none) {
+ ao2_ref(format, -1);
+ ao2_ref(codec, -1);
+ continue;
+ }
+
if (!format || (codec != ast_format_get_codec(format))) {
+ ao2_cleanup(format);
format = ast_format_create(codec);
}
ao2_ref(codec, -1);
diff --git a/main/hashtab.c b/main/hashtab.c
index 4b765979f..9b334d4a5 100644
--- a/main/hashtab.c
+++ b/main/hashtab.c
@@ -745,6 +745,8 @@ struct ast_hashtab_iter *ast_hashtab_start_write_traversal(struct ast_hashtab *t
void ast_hashtab_end_traversal(struct ast_hashtab_iter *it)
{
+ if (!it)
+ return;
if (it->tab->do_locking)
ast_rwlock_unlock(&it->tab->lock);
free(it);
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index f382814af..7c50089f2 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -130,9 +130,6 @@ static int tps_ping_handler(void *datap);
/*! \brief Remove the front task off the taskprocessor queue */
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
-/*! \brief Return the size of the taskprocessor queue */
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
-
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -508,7 +505,7 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
return task;
}
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
+long ast_taskprocessor_size(struct ast_taskprocessor *tps)
{
return (tps) ? tps->tps_queue_size : -1;
}
@@ -716,8 +713,6 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
return NULL;
}
-#define HIGH_WATER_LEVEL 100
-
/* push the task into the taskprocessor queue */
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
{
@@ -738,7 +733,7 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
- if (previous_size >= HIGH_WATER_LEVEL && !tps->high_water_warned) {
+ if (previous_size >= AST_TASKPROCESSOR_HIGH_WATER_LEVEL && !tps->high_water_warned) {
ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
tps->name, previous_size);
tps->high_water_warned = 1;
@@ -765,7 +760,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
{
struct ast_taskprocessor_local local;
struct tps_task *t;
- int size;
+ long size;
ao2_lock(tps);
t = tps_taskprocessor_pop(tps);
@@ -797,7 +792,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
* after we pop an empty stack.
*/
tps->executing = 0;
- size = tps_taskprocessor_depth(tps);
+ size = ast_taskprocessor_size(tps);
/* If we executed a task, bump the stats */
if (tps->stats) {
tps->stats->_tasks_processed_count++;
diff --git a/main/threadpool.c b/main/threadpool.c
index d97a7adb8..60e1e9a3b 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -168,7 +168,7 @@ static void *worker_start(void *arg);
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static int worker_thread_start(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);
-static void worker_set_state(struct worker_thread *worker, enum worker_state state);
+static int worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);
/*!
@@ -482,7 +482,16 @@ static int activate_thread(void *obj, void *arg, int flags)
worker->id);
return 0;
}
- worker_set_state(worker, ALIVE);
+
+ if (worker_set_state(worker, ALIVE)) {
+ ast_debug(1, "Failed to activate thread %d. It is dead\n",
+ worker->id);
+ /* The worker thread will no longer exist in the active threads or
+ * idle threads container after this.
+ */
+ ao2_unlink(pool->active_threads, worker);
+ }
+
return CMP_MATCH;
}
@@ -538,20 +547,33 @@ static int queued_task_pushed(void *data)
struct task_pushed_data *tpd = data;
struct ast_threadpool *pool = tpd->pool;
int was_empty = tpd->was_empty;
+ unsigned int existing_active;
if (pool->listener && pool->listener->callbacks->task_pushed) {
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
}
- if (ao2_container_count(pool->idle_threads) == 0) {
+
+ existing_active = ao2_container_count(pool->active_threads);
+
+ /* The first pass transitions any existing idle threads to be active, and
+ * will also remove any worker threads that have recently entered the dead
+ * state.
+ */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
+
+ /* If no idle threads could be transitioned to active grow the pool as permitted. */
+ if (ao2_container_count(pool->active_threads) == existing_active) {
if (!pool->options.auto_increment) {
+ ao2_ref(tpd, -1);
return 0;
}
grow(pool, pool->options.auto_increment);
+ /* An optional second pass transitions any newly added threads. */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
}
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
- activate_thread, pool);
-
threadpool_send_state_changed(pool);
ao2_ref(tpd, -1);
return 0;
@@ -797,7 +819,7 @@ static int queued_set_size(void *data)
/* We don't count zombie threads as being "live" when potentially resizing */
unsigned int current_size = ao2_container_count(pool->active_threads) +
- ao2_container_count(pool->idle_threads);
+ ao2_container_count(pool->idle_threads);
if (current_size == num_threads) {
ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
@@ -806,6 +828,12 @@ static int queued_set_size(void *data)
}
if (current_size < num_threads) {
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+ activate_thread, pool);
+
+ /* As the above may have altered the number of current threads update it */
+ current_size = ao2_container_count(pool->active_threads) +
+ ao2_container_count(pool->idle_threads);
grow(pool, num_threads - current_size);
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
activate_thread, pool);
@@ -1117,13 +1145,36 @@ static int worker_idle(struct worker_thread *worker)
*
* The threadpool calls into this function in order to let a worker know
* how it should proceed.
+ *
+ * \retval -1 failure (state transition not permitted)
+ * \retval 0 success
*/
-static void worker_set_state(struct worker_thread *worker, enum worker_state state)
+static int worker_set_state(struct worker_thread *worker, enum worker_state state)
{
SCOPED_MUTEX(lock, &worker->lock);
+
+ switch (state) {
+ case ALIVE:
+ /* This can occur due to a race condition between being told to go active
+ * and an idle timeout happening.
+ */
+ if (worker->state == DEAD) {
+ return -1;
+ }
+ ast_assert(worker->state != ZOMBIE);
+ break;
+ case DEAD:
+ break;
+ case ZOMBIE:
+ ast_assert(worker->state != DEAD);
+ break;
+ }
+
worker->state = state;
worker->wake_up = 1;
ast_cond_signal(&worker->cond);
+
+ return 0;
}
/*! Serializer group shutdown control object. */
@@ -1346,3 +1397,8 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast
{
return ast_threadpool_serializer_group(name, pool, NULL);
}
+
+long ast_threadpool_queue_size(struct ast_threadpool *pool)
+{
+ return ast_taskprocessor_size(pool->tps);
+}