diff options
Diffstat (limited to 'main')
-rw-r--r-- | main/cdr.c | 3 | ||||
-rw-r--r-- | main/cel.c | 3 | ||||
-rw-r--r-- | main/manager.c | 3 | ||||
-rw-r--r-- | main/sorcery.c | 14 | ||||
-rw-r--r-- | main/stasis.c | 12 | ||||
-rw-r--r-- | main/stasis_message_router.c | 12 | ||||
-rw-r--r-- | main/taskprocessor.c | 172 |
7 files changed, 177 insertions, 42 deletions
diff --git a/main/cdr.c b/main/cdr.c index 8658710d5..586a10684 100644 --- a/main/cdr.c +++ b/main/cdr.c @@ -71,6 +71,7 @@ ASTERISK_REGISTER_FILE() #include "asterisk/stasis_bridges.h" #include "asterisk/stasis_message_router.h" #include "asterisk/astobj2.h" +#include "asterisk/taskprocessor.h" /*** DOCUMENTATION <configInfo name="cdr" language="en_US"> @@ -4219,6 +4220,8 @@ int ast_cdr_engine_init(void) if (!stasis_router) { return -1; } + stasis_message_router_set_congestion_limits(stasis_router, -1, + 10 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); if (STASIS_MESSAGE_TYPE_INIT(cdr_sync_message_type)) { return -1; diff --git a/main/cel.c b/main/cel.c index 85311d290..ad75c0186 100644 --- a/main/cel.c +++ b/main/cel.c @@ -59,6 +59,7 @@ ASTERISK_REGISTER_FILE() #include "asterisk/parking.h" #include "asterisk/pickup.h" #include "asterisk/core_local.h" +#include "asterisk/taskprocessor.h" /*** DOCUMENTATION <configInfo name="cel" language="en_US"> @@ -1593,6 +1594,8 @@ static int create_routes(void) if (!cel_state_router) { return -1; } + stasis_message_router_set_congestion_limits(cel_state_router, -1, + 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); ret |= stasis_message_router_add(cel_state_router, stasis_cache_update_type(), diff --git a/main/manager.c b/main/manager.c index 94415b7a0..029da70f7 100644 --- a/main/manager.c +++ b/main/manager.c @@ -100,6 +100,7 @@ ASTERISK_REGISTER_FILE() #include "asterisk/rtp_engine.h" #include "asterisk/format_cache.h" #include "asterisk/translate.h" +#include "asterisk/taskprocessor.h" /*** DOCUMENTATION <manager name="Ping" language="en_US"> @@ -8692,6 +8693,8 @@ static int manager_subscriptions_init(void) if (!stasis_router) { return -1; } + stasis_message_router_set_congestion_limits(stasis_router, -1, + 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); res |= stasis_message_router_set_default(stasis_router, manager_default_msg_cb, NULL); diff --git a/main/sorcery.c b/main/sorcery.c index ec340e827..6a7b54bbf 100644 --- a/main/sorcery.c +++ b/main/sorcery.c @@ -1164,6 +1164,20 @@ int __ast_sorcery_object_register(struct ast_sorcery *sorcery, const char *type, return 0; } +int ast_sorcery_object_set_congestion_levels(struct ast_sorcery *sorcery, const char *type, long low_water, long high_water) +{ + struct ast_sorcery_object_type *object_type; + int res = -1; + + object_type = ao2_find(sorcery->types, type, OBJ_SEARCH_KEY); + if (object_type) { + res = ast_taskprocessor_alert_set_levels(object_type->serializer, + low_water, high_water); + ao2_ref(object_type, -1); + } + return res; +} + void ast_sorcery_object_set_copy_handler(struct ast_sorcery *sorcery, const char *type, sorcery_copy_handler copy) { RAII_VAR(struct ast_sorcery_object_type *, object_type, ao2_find(sorcery->types, type, OBJ_KEY), ao2_cleanup); diff --git a/main/stasis.c b/main/stasis.c index 9fe3a2aae..91ad94e76 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -564,6 +564,18 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) return NULL; } +int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription, + long low_water, long high_water) +{ + int res = -1; + + if (subscription) { + res = ast_taskprocessor_alert_set_levels(subscription->mailbox, + low_water, high_water); + } + return res; +} + void stasis_subscription_join(struct stasis_subscription *subscription) { if (subscription) { diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index f60180d68..85034bcf9 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -289,6 +289,18 @@ void stasis_message_router_publish_sync(struct stasis_message_router *router, ao2_cleanup(router); } +int stasis_message_router_set_congestion_limits(struct stasis_message_router *router, + long low_water, long high_water) +{ + int res = -1; + + if (router) { + res = stasis_subscription_set_congestion_limits(router->subscription, + low_water, high_water); + } + return res; +} + int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) diff --git a/main/taskprocessor.c b/main/taskprocessor.c index 5b8ff08f1..2f0124045 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -76,6 +76,10 @@ struct ast_taskprocessor { void *local_data; /*! \brief Taskprocessor current queue size */ long tps_queue_size; + /*! \brief Taskprocessor low water clear alert level */ + long tps_queue_low; + /*! \brief Taskprocessor high water alert trigger level */ + long tps_queue_high; /*! \brief Taskprocessor queue */ AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue; struct ast_taskprocessor_listener *listener; @@ -85,6 +89,8 @@ struct ast_taskprocessor { unsigned int executing:1; /*! Indicates that a high water warning has been issued on this task processor */ unsigned int high_water_warned:1; + /*! Indicates that a high water alert is active on this taskprocessor */ + unsigned int high_water_alert:1; }; /*! @@ -121,15 +127,9 @@ static int tps_hash_cb(const void *obj, const int flags); /*! \brief The astobj2 compare callback for taskprocessors */ static int tps_cmp_cb(void *obj, void *arg, int flags); -/*! \brief Destroy the taskprocessor when its refcount reaches zero */ -static void tps_taskprocessor_destroy(void *tps); - /*! \brief CLI <example>taskprocessor ping <blah></example> handler function */ static int tps_ping_handler(void *datap); -/*! \brief Remove the front task off the taskprocessor queue */ -static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps); - static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); @@ -472,8 +472,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg struct ao2_container *sorted_tps; struct ast_taskprocessor *tps; struct ao2_iterator iter; -#define FMT_HEADERS "%-45s %10s %10s %10s\n" -#define FMT_FIELDS "%-45s %10lu %10lu %10lu\n" +#define FMT_HEADERS "%-45s %10s %10s %10s %10s %10s\n" +#define FMT_FIELDS "%-45s %10lu %10lu %10lu %10lu %10lu\n" switch (cmd) { case CLI_INIT: @@ -498,7 +498,7 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg return CLI_FAILURE; } - ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth"); + ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water"); tcount = 0; iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK); while ((tps = ao2_iterator_next(&iter))) { @@ -511,7 +511,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg maxqsize = 0; processed = 0; } - ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize); + ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize, + tps->tps_queue_low, tps->tps_queue_high); ast_taskprocessor_unreference(tps); ++tcount; } @@ -539,28 +540,106 @@ static int tps_cmp_cb(void *obj, void *arg, int flags) return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0; } +/*! Count of the number of taskprocessors in high water alert. */ +static unsigned int tps_alert_count; + +/*! Access protection for tps_alert_count */ +AST_RWLOCK_DEFINE_STATIC(tps_alert_lock); + +/*! + * \internal + * \brief Add a delta to tps_alert_count with protection. + * \since 13.10.0 + * + * \param tps Taskprocessor updating queue water mark alert trigger. + * \param delta The amount to add to tps_alert_count. + * + * \return Nothing + */ +static void tps_alert_add(struct ast_taskprocessor *tps, int delta) +{ + unsigned int old; + + ast_rwlock_wrlock(&tps_alert_lock); + old = tps_alert_count; + tps_alert_count += delta; + if (DEBUG_ATLEAST(3) + /* and tps_alert_count becomes zero or non-zero */ + && !old != !tps_alert_count) { + ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n", + tps->name, tps_alert_count ? "triggered" : "cleared"); + } + ast_rwlock_unlock(&tps_alert_lock); +} + +unsigned int ast_taskprocessor_alert_get(void) +{ + unsigned int count; + + ast_rwlock_rdlock(&tps_alert_lock); + count = tps_alert_count; + ast_rwlock_unlock(&tps_alert_lock); + + return count; +} + +int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water) +{ + if (!tps || high_water < 0 || high_water < low_water) { + return -1; + } + + if (low_water < 0) { + /* Set low water level to 90% of high water level */ + low_water = (high_water * 9) / 10; + } + + ao2_lock(tps); + + tps->tps_queue_low = low_water; + tps->tps_queue_high = high_water; + + if (tps->high_water_alert) { + if (!tps->tps_queue_size || tps->tps_queue_size < low_water) { + /* Update water mark alert immediately */ + tps->high_water_alert = 0; + tps_alert_add(tps, -1); + } + } else { + if (high_water <= tps->tps_queue_size) { + /* Update water mark alert immediately */ + tps->high_water_alert = 1; + tps_alert_add(tps, +1); + } + } + + ao2_unlock(tps); + + return 0; +} + /* destroy the taskprocessor */ -static void tps_taskprocessor_destroy(void *tps) +static void tps_taskprocessor_dtor(void *tps) { struct ast_taskprocessor *t = tps; struct tps_task *task; - if (!tps) { - ast_log(LOG_ERROR, "missing taskprocessor\n"); - return; + while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) { + tps_task_free(task); } - ast_debug(1, "destroying taskprocessor '%s'\n", t->name); - /* free it */ + t->tps_queue_size = 0; + + if (t->high_water_alert) { + t->high_water_alert = 0; + tps_alert_add(t, -1); + } + ast_free(t->stats); t->stats = NULL; ast_free((char *) t->name); - if (t->listener) { - ao2_ref(t->listener, -1); - t->listener = NULL; - } - while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) { - tps_task_free(task); - } + t->name = NULL; + ao2_cleanup(t->listener); + t->listener = NULL; } /* pop the front task and return it */ @@ -569,7 +648,11 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps) struct tps_task *task; if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) { - tps->tps_queue_size--; + --tps->tps_queue_size; + if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) { + tps->high_water_alert = 0; + tps_alert_add(tps, -1); + } } return task; } @@ -648,19 +731,22 @@ static void *default_listener_pvt_alloc(void) static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener) { - RAII_VAR(struct ast_taskprocessor *, p, - ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup); + struct ast_taskprocessor *p; + p = ao2_alloc(sizeof(*p), tps_taskprocessor_dtor); if (!p) { ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name); return NULL; } - if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) { - ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name); - return NULL; - } - if (!(p->name = ast_strdup(name))) { + /* Set default congestion water level alert triggers. */ + p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10; + p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL; + + p->stats = ast_calloc(1, sizeof(*p->stats)); + p->name = ast_strdup(name); + if (!p->stats || !p->name) { + ao2_ref(p, -1); return NULL; } @@ -675,22 +761,18 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru if (!(ao2_link(tps_singletons, p))) { ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name); listener->tps = NULL; - ao2_ref(p, -1); + ao2_ref(p, -2); return NULL; } if (p->listener->callbacks->start(p->listener)) { - ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name); + ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", + p->name); ast_taskprocessor_unreference(p); return NULL; } - /* RAII_VAR will decrement the refcount at the end of the function. - * Since we want to pass back a reference to p, we bump the refcount - */ - ao2_ref(p, +1); return p; - } /* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't @@ -799,10 +881,16 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t) AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list); previous_size = tps->tps_queue_size++; - if (previous_size >= AST_TASKPROCESSOR_HIGH_WATER_LEVEL && !tps->high_water_warned) { - ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n", - tps->name, previous_size); - tps->high_water_warned = 1; + if (previous_size >= tps->tps_queue_high) { + if (!tps->high_water_warned) { + tps->high_water_warned = 1; + ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n", + tps->name, previous_size); + } + if (!tps->high_water_alert) { + tps->high_water_alert = 1; + tps_alert_add(tps, +1); + } } /* The currently executing task counts as still in queue */ |