summary | refs | log | tree | commit | diff
path: root/main/taskprocessor.c
diff options
context:
space:
mode:
author: Richard Mudgett <rmudgett@digium.com> 2016-06-02 16:08:19 -0500
committer: Richard Mudgett <rmudgett@digium.com> 2016-06-09 10:32:07 -0500
commit: 2cd67d5b07d28891ea7c86aece702a4d6e436afc (patch)
tree: 22a35eaf8088ed377a0f1acc1979bde11a0c6543 /main/taskprocessor.c
parent: c966a035e03f1dba94fb24c4a7ed56133425c85a (diff)
taskprocessors: Implement high/low water mark alerts.
When taskprocessors get backed up, there is a good chance that we are being overloaded and need to defer adding new work to the system.

* Implemented a high/low water alert mechanism for modules to check if the system is being overloaded and take appropriate action. When a taskprocessor is created it has default congestion levels set. A taskprocessor can later have those congestion levels altered for specific needs if stress testing shows that the taskprocessor is a symptom of overloading or needs to handle bursty activity without triggering an overload alert.

* Add CLI "core show taskprocessor" low/high water columns.

* Fixed __allocate_taskprocessor() to not use RAII_VAR(). RAII_VAR() was never a good thing to use when creating a taskprocessor because of the nature of how its references needed to be cleaned up on a partial creation.

* Made res_pjsip's distributor check if the taskprocessor overload alert is active before placing a message representing brand new work onto a distributor serializer.

ASTERISK-26088
Reported by: Richard Mudgett

Change-Id: I182f1be603529cd665958661c4c05ff9901825fa
Diffstat (limited to 'main/taskprocessor.c')
-rw-r--r-- main/taskprocessor.c 172
1 files changed, 130 insertions, 42 deletions
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 5b8ff08f1..2f0124045 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -76,6 +76,10 @@ struct ast_taskprocessor {
void *local_data;
/*! \brief Taskprocessor current queue size */
long tps_queue_size;
+ /*! \brief Taskprocessor low water clear alert level */
+ long tps_queue_low;
+ /*! \brief Taskprocessor high water alert trigger level */
+ long tps_queue_high;
/*! \brief Taskprocessor queue */
AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
struct ast_taskprocessor_listener *listener;
@@ -85,6 +89,8 @@ struct ast_taskprocessor {
unsigned int executing:1;
/*! Indicates that a high water warning has been issued on this task processor */
unsigned int high_water_warned:1;
+ /*! Indicates that a high water alert is active on this taskprocessor */
+ unsigned int high_water_alert:1;
};
/*!
@@ -121,15 +127,9 @@ static int tps_hash_cb(const void *obj, const int flags);
/*! \brief The astobj2 compare callback for taskprocessors */
static int tps_cmp_cb(void *obj, void *arg, int flags);
-/*! \brief Destroy the taskprocessor when its refcount reaches zero */
-static void tps_taskprocessor_destroy(void *tps);
-
/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
static int tps_ping_handler(void *datap);
-/*! \brief Remove the front task off the taskprocessor queue */
-static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
-
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -472,8 +472,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
struct ao2_container *sorted_tps;
struct ast_taskprocessor *tps;
struct ao2_iterator iter;
-#define FMT_HEADERS "%-45s %10s %10s %10s\n"
-#define FMT_FIELDS "%-45s %10lu %10lu %10lu\n"
+#define FMT_HEADERS "%-45s %10s %10s %10s %10s %10s\n"
+#define FMT_FIELDS "%-45s %10lu %10lu %10lu %10lu %10lu\n"
switch (cmd) {
case CLI_INIT:
@@ -498,7 +498,7 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
return CLI_FAILURE;
}
- ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth");
+ ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
tcount = 0;
iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
while ((tps = ao2_iterator_next(&iter))) {
@@ -511,7 +511,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
maxqsize = 0;
processed = 0;
}
- ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize);
+ ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize,
+ tps->tps_queue_low, tps->tps_queue_high);
ast_taskprocessor_unreference(tps);
++tcount;
}
@@ -539,28 +540,106 @@ static int tps_cmp_cb(void *obj, void *arg, int flags)
return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
}
+/*! Count of the number of taskprocessors in high water alert. */
+static unsigned int tps_alert_count;
+
+/*! Access protection for tps_alert_count */
+AST_RWLOCK_DEFINE_STATIC(tps_alert_lock);
+
+/*!
+ * \internal
+ * \brief Add a delta to tps_alert_count with protection.
+ * \since 13.10.0
+ *
+ * \param tps Taskprocessor updating queue water mark alert trigger.
+ * \param delta The amount to add to tps_alert_count.
+ *
+ * \return Nothing
+ */
+static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
+{
+ unsigned int old;
+
+ ast_rwlock_wrlock(&tps_alert_lock);
+ old = tps_alert_count;
+ tps_alert_count += delta;
+ if (DEBUG_ATLEAST(3)
+ /* and tps_alert_count becomes zero or non-zero */
+ && !old != !tps_alert_count) {
+ ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
+ tps->name, tps_alert_count ? "triggered" : "cleared");
+ }
+ ast_rwlock_unlock(&tps_alert_lock);
+}
+
+unsigned int ast_taskprocessor_alert_get(void)
+{
+ unsigned int count;
+
+ ast_rwlock_rdlock(&tps_alert_lock);
+ count = tps_alert_count;
+ ast_rwlock_unlock(&tps_alert_lock);
+
+ return count;
+}
+
+int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
+{
+ if (!tps || high_water < 0 || high_water < low_water) {
+ return -1;
+ }
+
+ if (low_water < 0) {
+ /* Set low water level to 90% of high water level */
+ low_water = (high_water * 9) / 10;
+ }
+
+ ao2_lock(tps);
+
+ tps->tps_queue_low = low_water;
+ tps->tps_queue_high = high_water;
+
+ if (tps->high_water_alert) {
+ if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
+ /* Update water mark alert immediately */
+ tps->high_water_alert = 0;
+ tps_alert_add(tps, -1);
+ }
+ } else {
+ if (high_water <= tps->tps_queue_size) {
+ /* Update water mark alert immediately */
+ tps->high_water_alert = 1;
+ tps_alert_add(tps, +1);
+ }
+ }
+
+ ao2_unlock(tps);
+
+ return 0;
+}
+
/* destroy the taskprocessor */
-static void tps_taskprocessor_destroy(void *tps)
+static void tps_taskprocessor_dtor(void *tps)
{
struct ast_taskprocessor *t = tps;
struct tps_task *task;
- if (!tps) {
- ast_log(LOG_ERROR, "missing taskprocessor\n");
- return;
+ while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
+ tps_task_free(task);
}
- ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
- /* free it */
+ t->tps_queue_size = 0;
+
+ if (t->high_water_alert) {
+ t->high_water_alert = 0;
+ tps_alert_add(t, -1);
+ }
+
ast_free(t->stats);
t->stats = NULL;
ast_free((char *) t->name);
- if (t->listener) {
- ao2_ref(t->listener, -1);
- t->listener = NULL;
- }
- while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
- tps_task_free(task);
- }
+ t->name = NULL;
+ ao2_cleanup(t->listener);
+ t->listener = NULL;
}
/* pop the front task and return it */
@@ -569,7 +648,11 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
struct tps_task *task;
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
- tps->tps_queue_size--;
+ --tps->tps_queue_size;
+ if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) {
+ tps->high_water_alert = 0;
+ tps_alert_add(tps, -1);
+ }
}
return task;
}
@@ -648,19 +731,22 @@ static void *default_listener_pvt_alloc(void)
static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
{
- RAII_VAR(struct ast_taskprocessor *, p,
- ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup);
+ struct ast_taskprocessor *p;
+ p = ao2_alloc(sizeof(*p), tps_taskprocessor_dtor);
if (!p) {
ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
return NULL;
}
- if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
- ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
- return NULL;
- }
- if (!(p->name = ast_strdup(name))) {
+ /* Set default congestion water level alert triggers. */
+ p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
+ p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
+
+ p->stats = ast_calloc(1, sizeof(*p->stats));
+ p->name = ast_strdup(name);
+ if (!p->stats || !p->name) {
+ ao2_ref(p, -1);
return NULL;
}
@@ -675,22 +761,18 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru
if (!(ao2_link(tps_singletons, p))) {
ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
listener->tps = NULL;
- ao2_ref(p, -1);
+ ao2_ref(p, -2);
return NULL;
}
if (p->listener->callbacks->start(p->listener)) {
- ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
+ ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
+ p->name);
ast_taskprocessor_unreference(p);
return NULL;
}
- /* RAII_VAR will decrement the refcount at the end of the function.
- * Since we want to pass back a reference to p, we bump the refcount
- */
- ao2_ref(p, +1);
return p;
-
}
/* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
@@ -799,10 +881,16 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
- if (previous_size >= AST_TASKPROCESSOR_HIGH_WATER_LEVEL && !tps->high_water_warned) {
- ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
- tps->name, previous_size);
- tps->high_water_warned = 1;
+ if (previous_size >= tps->tps_queue_high) {
+ if (!tps->high_water_warned) {
+ tps->high_water_warned = 1;
+ ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
+ tps->name, previous_size);
+ }
+ if (!tps->high_water_alert) {
+ tps->high_water_alert = 1;
+ tps_alert_add(tps, +1);
+ }
}
/* The currently executing task counts as still in queue */