summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Mudgett <rmudgett@digium.com>2016-06-02 16:08:19 -0500
committerRichard Mudgett <rmudgett@digium.com>2016-06-09 10:32:07 -0500
commit2cd67d5b07d28891ea7c86aece702a4d6e436afc (patch)
tree22a35eaf8088ed377a0f1acc1979bde11a0c6543
parentc966a035e03f1dba94fb24c4a7ed56133425c85a (diff)
taskprocessors: Implement high/low water mark alerts.
When taskprocessors get backed up, there is a good chance that we are being overloaded and need to defer adding new work to the system. * Implemented a high/low water alert mechanism for modules to check if the system is being overloaded and take appropriate action. When a taskprocessor is created it has default congestion levels set. A taskprocessor can later have those congestion levels altered for specific needs if stress testing shows that the taskprocessor is a symptom of overloading or needs to handle bursty activity without triggering an overload alert. * Add CLI "core show taskprocessor" low/high water columns. * Fixed __allocate_taskprocessor() to not use RAII_VAR(). RAII_VAR() was never a good thing to use when creating a taskprocessor because of the nature of how its references needed to be cleaned up on a partial creation. * Made res_pjsip's distributor check if the taskprocessor overload alert is active before placing a message representing brand new work onto a distributor serializer. ASTERISK-26088 Reported by: Richard Mudgett Change-Id: I182f1be603529cd665958661c4c05ff9901825fa
-rw-r--r--include/asterisk/taskprocessor.h23
-rw-r--r--main/taskprocessor.c172
-rw-r--r--res/res_pjsip/pjsip_distributor.c38
3 files changed, 176 insertions, 57 deletions
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index af3ce747f..e51122269 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -59,6 +59,7 @@ struct ast_taskprocessor;
/*! \brief Suggested maximum taskprocessor name length (less null terminator). */
#define AST_TASKPROCESSOR_MAX_NAME 45
+/*! Default taskprocessor high water level alert trigger */
#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL 500
/*!
@@ -297,4 +298,26 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps);
*/
long ast_taskprocessor_size(struct ast_taskprocessor *tps);
+/*!
+ * \brief Get the current taskprocessor high water alert count.
+ * \since 13.10.0
+ *
+ * \retval 0 if no taskprocessors are in high water alert.
+ * \retval non-zero if some task processors are in high water alert.
+ */
+unsigned int ast_taskprocessor_alert_get(void);
+
+/*!
+ * \brief Set the high and low alert water marks of the given taskprocessor queue.
+ * \since 13.10.0
+ *
+ * \param tps Taskprocessor to update queue water marks.
+ * \param low_water New queue low water mark. (-1 to set as 90% of high_water)
+ * \param high_water New queue high water mark.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error (water marks not changed).
+ */
+int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water);
+
#endif /* __AST_TASKPROCESSOR_H__ */
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 5b8ff08f1..2f0124045 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -76,6 +76,10 @@ struct ast_taskprocessor {
void *local_data;
/*! \brief Taskprocessor current queue size */
long tps_queue_size;
+ /*! \brief Taskprocessor low water clear alert level */
+ long tps_queue_low;
+ /*! \brief Taskprocessor high water alert trigger level */
+ long tps_queue_high;
/*! \brief Taskprocessor queue */
AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
struct ast_taskprocessor_listener *listener;
@@ -85,6 +89,8 @@ struct ast_taskprocessor {
unsigned int executing:1;
/*! Indicates that a high water warning has been issued on this task processor */
unsigned int high_water_warned:1;
+ /*! Indicates that a high water alert is active on this taskprocessor */
+ unsigned int high_water_alert:1;
};
/*!
@@ -121,15 +127,9 @@ static int tps_hash_cb(const void *obj, const int flags);
/*! \brief The astobj2 compare callback for taskprocessors */
static int tps_cmp_cb(void *obj, void *arg, int flags);
-/*! \brief Destroy the taskprocessor when its refcount reaches zero */
-static void tps_taskprocessor_destroy(void *tps);
-
/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
static int tps_ping_handler(void *datap);
-/*! \brief Remove the front task off the taskprocessor queue */
-static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
-
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -472,8 +472,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
struct ao2_container *sorted_tps;
struct ast_taskprocessor *tps;
struct ao2_iterator iter;
-#define FMT_HEADERS "%-45s %10s %10s %10s\n"
-#define FMT_FIELDS "%-45s %10lu %10lu %10lu\n"
+#define FMT_HEADERS "%-45s %10s %10s %10s %10s %10s\n"
+#define FMT_FIELDS "%-45s %10lu %10lu %10lu %10lu %10lu\n"
switch (cmd) {
case CLI_INIT:
@@ -498,7 +498,7 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
return CLI_FAILURE;
}
- ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth");
+ ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
tcount = 0;
iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
while ((tps = ao2_iterator_next(&iter))) {
@@ -511,7 +511,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
maxqsize = 0;
processed = 0;
}
- ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize);
+ ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize,
+ tps->tps_queue_low, tps->tps_queue_high);
ast_taskprocessor_unreference(tps);
++tcount;
}
@@ -539,28 +540,106 @@ static int tps_cmp_cb(void *obj, void *arg, int flags)
return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
}
+/*! Count of the number of taskprocessors in high water alert. */
+static unsigned int tps_alert_count;
+
+/*! Access protection for tps_alert_count */
+AST_RWLOCK_DEFINE_STATIC(tps_alert_lock);
+
+/*!
+ * \internal
+ * \brief Add a delta to tps_alert_count with protection.
+ * \since 13.10.0
+ *
+ * \param tps Taskprocessor updating queue water mark alert trigger.
+ * \param delta The amount to add to tps_alert_count.
+ *
+ * \return Nothing
+ */
+static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
+{
+ unsigned int old;
+
+ ast_rwlock_wrlock(&tps_alert_lock);
+ old = tps_alert_count;
+ tps_alert_count += delta;
+ if (DEBUG_ATLEAST(3)
+ /* and tps_alert_count becomes zero or non-zero */
+ && !old != !tps_alert_count) {
+ ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
+ tps->name, tps_alert_count ? "triggered" : "cleared");
+ }
+ ast_rwlock_unlock(&tps_alert_lock);
+}
+
+unsigned int ast_taskprocessor_alert_get(void)
+{
+ unsigned int count;
+
+ ast_rwlock_rdlock(&tps_alert_lock);
+ count = tps_alert_count;
+ ast_rwlock_unlock(&tps_alert_lock);
+
+ return count;
+}
+
+int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
+{
+ if (!tps || high_water < 0 || high_water < low_water) {
+ return -1;
+ }
+
+ if (low_water < 0) {
+ /* Set low water level to 90% of high water level */
+ low_water = (high_water * 9) / 10;
+ }
+
+ ao2_lock(tps);
+
+ tps->tps_queue_low = low_water;
+ tps->tps_queue_high = high_water;
+
+ if (tps->high_water_alert) {
+ if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
+ /* Update water mark alert immediately */
+ tps->high_water_alert = 0;
+ tps_alert_add(tps, -1);
+ }
+ } else {
+ if (high_water <= tps->tps_queue_size) {
+ /* Update water mark alert immediately */
+ tps->high_water_alert = 1;
+ tps_alert_add(tps, +1);
+ }
+ }
+
+ ao2_unlock(tps);
+
+ return 0;
+}
+
/* destroy the taskprocessor */
-static void tps_taskprocessor_destroy(void *tps)
+static void tps_taskprocessor_dtor(void *tps)
{
struct ast_taskprocessor *t = tps;
struct tps_task *task;
- if (!tps) {
- ast_log(LOG_ERROR, "missing taskprocessor\n");
- return;
+ while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
+ tps_task_free(task);
}
- ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
- /* free it */
+ t->tps_queue_size = 0;
+
+ if (t->high_water_alert) {
+ t->high_water_alert = 0;
+ tps_alert_add(t, -1);
+ }
+
ast_free(t->stats);
t->stats = NULL;
ast_free((char *) t->name);
- if (t->listener) {
- ao2_ref(t->listener, -1);
- t->listener = NULL;
- }
- while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
- tps_task_free(task);
- }
+ t->name = NULL;
+ ao2_cleanup(t->listener);
+ t->listener = NULL;
}
/* pop the front task and return it */
@@ -569,7 +648,11 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
struct tps_task *task;
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
- tps->tps_queue_size--;
+ --tps->tps_queue_size;
+ if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) {
+ tps->high_water_alert = 0;
+ tps_alert_add(tps, -1);
+ }
}
return task;
}
@@ -648,19 +731,22 @@ static void *default_listener_pvt_alloc(void)
static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
{
- RAII_VAR(struct ast_taskprocessor *, p,
- ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup);
+ struct ast_taskprocessor *p;
+ p = ao2_alloc(sizeof(*p), tps_taskprocessor_dtor);
if (!p) {
ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
return NULL;
}
- if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
- ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
- return NULL;
- }
- if (!(p->name = ast_strdup(name))) {
+ /* Set default congestion water level alert triggers. */
+ p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
+ p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
+
+ p->stats = ast_calloc(1, sizeof(*p->stats));
+ p->name = ast_strdup(name);
+ if (!p->stats || !p->name) {
+ ao2_ref(p, -1);
return NULL;
}
@@ -675,22 +761,18 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru
if (!(ao2_link(tps_singletons, p))) {
ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
listener->tps = NULL;
- ao2_ref(p, -1);
+ ao2_ref(p, -2);
return NULL;
}
if (p->listener->callbacks->start(p->listener)) {
- ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
+ ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
+ p->name);
ast_taskprocessor_unreference(p);
return NULL;
}
- /* RAII_VAR will decrement the refcount at the end of the function.
- * Since we want to pass back a reference to p, we bump the refcount
- */
- ao2_ref(p, +1);
return p;
-
}
/* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
@@ -799,10 +881,16 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
- if (previous_size >= AST_TASKPROCESSOR_HIGH_WATER_LEVEL && !tps->high_water_warned) {
- ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
- tps->name, previous_size);
- tps->high_water_warned = 1;
+ if (previous_size >= tps->tps_queue_high) {
+ if (!tps->high_water_warned) {
+ tps->high_water_warned = 1;
+ ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
+ tps->name, previous_size);
+ }
+ if (!tps->high_water_alert) {
+ tps->high_water_alert = 1;
+ tps_alert_add(tps, +1);
+ }
}
/* The currently executing task counts as still in queue */
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index 75ae461cd..715ecb263 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -369,8 +369,6 @@ static pjsip_module endpoint_mod = {
.on_rx_request = endpoint_lookup,
};
-#define SIP_MAX_QUEUE (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 3)
-
static pj_bool_t distributor(pjsip_rx_data *rdata)
{
pjsip_dialog *dlg;
@@ -408,6 +406,13 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
pjsip_rx_data_get_info(rdata));
serializer = find_request_serializer(rdata);
if (!serializer) {
+ if (ast_taskprocessor_alert_get()) {
+ /* We're overloaded, ignore the unmatched response. */
+ ast_debug(3, "Taskprocessor overload alert: Ignoring unmatched '%s'.\n",
+ pjsip_rx_data_get_info(rdata));
+ return PJ_TRUE;
+ }
+
/*
* Pick a serializer for the unmatched response. Maybe
* the stack can figure out what it is for, or we really
@@ -422,6 +427,21 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
return PJ_TRUE;
} else {
+ if (ast_taskprocessor_alert_get()) {
+ /*
+ * When taskprocessors get backed up, there is a good chance that
+ * we are being overloaded and need to defer adding new work to
+ * the system. To defer the work we will ignore the request and
+ * rely on the peer's transport layer to retransmit the message.
+ * We usually work off the overload within a few seconds. The
+ * alternative is to send back a 503 response to these requests
+ * and be done with it.
+ */
+ ast_debug(3, "Taskprocessor overload alert: Ignoring '%s'.\n",
+ pjsip_rx_data_get_info(rdata));
+ return PJ_TRUE;
+ }
+
/* Pick a serializer for the out-of-dialog request. */
serializer = ast_sip_get_distributor_serializer(rdata);
}
@@ -432,21 +452,9 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
}
- if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) {
- /* When the threadpool is backed up this much, there is a good chance that we have encountered
- * some sort of terrible condition and don't need to be adding more work to the threadpool.
- * It's in our best interest to send back a 503 response and be done with it.
- */
- if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
- pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
- }
+ if (ast_sip_push_task(serializer, distribute, clone)) {
ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
pjsip_rx_data_free_cloned(clone);
- } else {
- if (ast_sip_push_task(serializer, distribute, clone)) {
- ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
- pjsip_rx_data_free_cloned(clone);
- }
}
ast_taskprocessor_unreference(serializer);