diff options
Diffstat (limited to 'main')
-rw-r--r-- | main/cli.c | 6 | ||||
-rw-r--r-- | main/format.c | 22 | ||||
-rw-r--r-- | main/format_cap.c | 8 | ||||
-rw-r--r-- | main/hashtab.c | 2 | ||||
-rw-r--r-- | main/taskprocessor.c | 13 | ||||
-rw-r--r-- | main/threadpool.c | 72 |
6 files changed, 102 insertions, 21 deletions
diff --git a/main/cli.c b/main/cli.c index 0d66f3e48..7f86eab3a 100644 --- a/main/cli.c +++ b/main/cli.c @@ -1076,10 +1076,12 @@ static char *handle_chanlist(struct ast_cli_entry *e, int cmd, struct ast_cli_ar char locbuf[40] = "(None)"; char appdata[40] = "(None)"; - if (!cs->context && !cs->exten) + if (!ast_strlen_zero(cs->context) && !ast_strlen_zero(cs->exten)) { snprintf(locbuf, sizeof(locbuf), "%s@%s:%d", cs->exten, cs->context, cs->priority); - if (cs->appl) + } + if (!ast_strlen_zero(cs->appl)) { snprintf(appdata, sizeof(appdata), "%s(%s)", cs->appl, S_OR(cs->data, "")); + } ast_cli(a->fd, FORMAT_STRING, cs->name, locbuf, ast_state2str(cs->state), appdata); } } diff --git a/main/format.c b/main/format.c index 8ac82f0ed..5bf38dfb2 100644 --- a/main/format.c +++ b/main/format.c @@ -302,6 +302,14 @@ const void *ast_format_attribute_get(const struct ast_format *format, const char { const struct ast_format_interface *interface = format->interface; + if (!interface) { + struct format_interface *format_interface = ao2_find(interfaces, format->codec->name, OBJ_SEARCH_KEY); + if (format_interface) { + interface = format_interface->interface; + ao2_ref(format_interface, -1); + } + } + if (!interface || !interface->format_attribute_get) { return NULL; } @@ -330,11 +338,21 @@ struct ast_format *ast_format_parse_sdp_fmtp(const struct ast_format *format, co void ast_format_generate_sdp_fmtp(const struct ast_format *format, unsigned int payload, struct ast_str **str) { - if (!format->interface || !format->interface->format_generate_sdp_fmtp) { + const struct ast_format_interface *interface = format->interface; + + if (!interface) { + struct format_interface *format_interface = ao2_find(interfaces, format->codec->name, OBJ_SEARCH_KEY); + if (format_interface) { + interface = format_interface->interface; + ao2_ref(format_interface, -1); + } + } + + if (!interface || !interface->format_generate_sdp_fmtp) { return; } - format->interface->format_generate_sdp_fmtp(format, payload, str); + interface->format_generate_sdp_fmtp(format, payload, str); } struct ast_codec *ast_format_get_codec(const struct ast_format *format) diff --git a/main/format_cap.c b/main/format_cap.c index 4739efa0a..8d6046ab2 100644 --- a/main/format_cap.c +++ b/main/format_cap.c @@ -265,7 +265,15 @@ int ast_format_cap_append_by_type(struct ast_format_cap *cap, enum ast_media_typ } format = ast_format_cache_get(codec->name); + + if (format == ast_format_none) { + ao2_ref(format, -1); + ao2_ref(codec, -1); + continue; + } + if (!format || (codec != ast_format_get_codec(format))) { + ao2_cleanup(format); format = ast_format_create(codec); } ao2_ref(codec, -1); diff --git a/main/hashtab.c b/main/hashtab.c index 4b765979f..9b334d4a5 100644 --- a/main/hashtab.c +++ b/main/hashtab.c @@ -745,6 +745,8 @@ struct ast_hashtab_iter *ast_hashtab_start_write_traversal(struct ast_hashtab *t void ast_hashtab_end_traversal(struct ast_hashtab_iter *it) { + if (!it) + return; if (it->tab->do_locking) ast_rwlock_unlock(&it->tab->lock); free(it); diff --git a/main/taskprocessor.c b/main/taskprocessor.c index f382814af..7c50089f2 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -130,9 +130,6 @@ static int tps_ping_handler(void *datap); /*! \brief Remove the front task off the taskprocessor queue */ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps); -/*! \brief Return the size of the taskprocessor queue */ -static int tps_taskprocessor_depth(struct ast_taskprocessor *tps); - static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); @@ -508,7 +505,7 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps) return task; } -static int tps_taskprocessor_depth(struct ast_taskprocessor *tps) +long ast_taskprocessor_size(struct ast_taskprocessor *tps) { return (tps) ? tps->tps_queue_size : -1; } @@ -716,8 +713,6 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps) return NULL; } -#define HIGH_WATER_LEVEL 100 - /* push the task into the taskprocessor queue */ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t) { @@ -738,7 +733,7 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t) AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list); previous_size = tps->tps_queue_size++; - if (previous_size >= HIGH_WATER_LEVEL && !tps->high_water_warned) { + if (previous_size >= AST_TASKPROCESSOR_HIGH_WATER_LEVEL && !tps->high_water_warned) { ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n", tps->name, previous_size); tps->high_water_warned = 1; @@ -765,7 +760,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) { struct ast_taskprocessor_local local; struct tps_task *t; - int size; + long size; ao2_lock(tps); t = tps_taskprocessor_pop(tps); @@ -797,7 +792,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) * after we pop an empty stack. */ tps->executing = 0; - size = tps_taskprocessor_depth(tps); + size = ast_taskprocessor_size(tps); /* If we executed a task, bump the stats */ if (tps->stats) { tps->stats->_tasks_processed_count++; diff --git a/main/threadpool.c b/main/threadpool.c index d97a7adb8..60e1e9a3b 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -168,7 +168,7 @@ static void *worker_start(void *arg); static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool); static int worker_thread_start(struct worker_thread *worker); static int worker_idle(struct worker_thread *worker); -static void worker_set_state(struct worker_thread *worker, enum worker_state state); +static int worker_set_state(struct worker_thread *worker, enum worker_state state); static void worker_shutdown(struct worker_thread *worker); /*! @@ -482,7 +482,16 @@ static int activate_thread(void *obj, void *arg, int flags) worker->id); return 0; } - worker_set_state(worker, ALIVE); + + if (worker_set_state(worker, ALIVE)) { + ast_debug(1, "Failed to activate thread %d. It is dead\n", + worker->id); + /* The worker thread will no longer exist in the active threads or + * idle threads container after this. + */ + ao2_unlink(pool->active_threads, worker); + } + return CMP_MATCH; } @@ -538,20 +547,33 @@ static int queued_task_pushed(void *data) struct task_pushed_data *tpd = data; struct ast_threadpool *pool = tpd->pool; int was_empty = tpd->was_empty; + unsigned int existing_active; if (pool->listener && pool->listener->callbacks->task_pushed) { pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty); } - if (ao2_container_count(pool->idle_threads) == 0) { + + existing_active = ao2_container_count(pool->active_threads); + + /* The first pass transitions any existing idle threads to be active, and + * will also remove any worker threads that have recently entered the dead + * state. + */ + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, + activate_thread, pool); + + /* If no idle threads could be transitioned to active grow the pool as permitted. */ + if (ao2_container_count(pool->active_threads) == existing_active) { if (!pool->options.auto_increment) { + ao2_ref(tpd, -1); return 0; } grow(pool, pool->options.auto_increment); + /* An optional second pass transitions any newly added threads. */ + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, + activate_thread, pool); } - ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, - activate_thread, pool); - threadpool_send_state_changed(pool); ao2_ref(tpd, -1); return 0; @@ -797,7 +819,7 @@ static int queued_set_size(void *data) /* We don't count zombie threads as being "live" when potentially resizing */ unsigned int current_size = ao2_container_count(pool->active_threads) + - ao2_container_count(pool->idle_threads); + ao2_container_count(pool->idle_threads); if (current_size == num_threads) { ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n", @@ -806,6 +828,12 @@ static int queued_set_size(void *data) } if (current_size < num_threads) { + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, + activate_thread, pool); + + /* As the above may have altered the number of current threads update it */ + current_size = ao2_container_count(pool->active_threads) + + ao2_container_count(pool->idle_threads); grow(pool, num_threads - current_size); ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, activate_thread, pool); @@ -1117,13 +1145,36 @@ static int worker_idle(struct worker_thread *worker) * * The threadpool calls into this function in order to let a worker know * how it should proceed. + * + * \retval -1 failure (state transition not permitted) + * \retval 0 success */ -static void worker_set_state(struct worker_thread *worker, enum worker_state state) +static int worker_set_state(struct worker_thread *worker, enum worker_state state) { SCOPED_MUTEX(lock, &worker->lock); + + switch (state) { + case ALIVE: + /* This can occur due to a race condition between being told to go active + * and an idle timeout happening. + */ + if (worker->state == DEAD) { + return -1; + } + ast_assert(worker->state != ZOMBIE); + break; + case DEAD: + break; + case ZOMBIE: + ast_assert(worker->state != DEAD); + break; + } + worker->state = state; worker->wake_up = 1; ast_cond_signal(&worker->cond); + + return 0; } /*! Serializer group shutdown control object. */ @@ -1346,3 +1397,8 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast { return ast_threadpool_serializer_group(name, pool, NULL); } + +long ast_threadpool_queue_size(struct ast_threadpool *pool) +{ + return ast_taskprocessor_size(pool->tps); +} |