summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Mudgett <rmudgett@digium.com>2016-06-03 11:35:49 -0500
committerRichard Mudgett <rmudgett@digium.com>2016-06-09 10:32:07 -0500
commitdcfef53ee2339d09f839aa4e333419b80b309d8a (patch)
tree76193b7e1f9251bdce3aba0c2e57b270794ab797
parent4879cd875c932817c8259934814dd18cf15f5a1c (diff)
stasis: Add setting subscription congestion levels.
Stasis subscriptions and message routers create taskprocessors to process the event messages. API calls are needed to be able to set the congestion levels of these taskprocessors for selected subscriptions and message routers. * Updated CDR, CEL, and manager's stasis subscription congestion levels based upon stress testing. Increased the congestion levels to reduce the potential for bursty call setup/teardown activity from triggering the taskprocessor overload alert. CDRs in particular need an extra high congestion level because they can take awhile to process the stasis messages. ASTERISK-26088 Reported by: Richard Mudgett Change-Id: Id0a716394b4eee746dd158acc63d703902450244
-rw-r--r--include/asterisk/stasis.h14
-rw-r--r--include/asterisk/stasis_message_router.h14
-rw-r--r--main/cdr.c3
-rw-r--r--main/cel.c3
-rw-r--r--main/manager.c3
-rw-r--r--main/stasis.c12
-rw-r--r--main/stasis_message_router.c12
7 files changed, 61 insertions, 0 deletions
diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h
index 14ab7d93b..62ed1ed1a 100644
--- a/include/asterisk/stasis.h
+++ b/include/asterisk/stasis.h
@@ -591,6 +591,20 @@ struct stasis_subscription *stasis_unsubscribe(
struct stasis_subscription *subscription);
/*!
+ * \brief Set the high and low alert water marks of the stasis subscription.
+ * \since 13.10.0
+ *
+ * \param subscription Pointer to a stasis subscription
+ * \param low_water New queue low water mark. (-1 to set as 90% of high_water)
+ * \param high_water New queue high water mark.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error (water marks not changed).
+ */
+int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
+ long low_water, long high_water);
+
+/*!
* \brief Block until the last message is processed on a subscription.
*
* This function will not return until the \a subscription's callback for the
diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h
index 89657a5ee..50270a788 100644
--- a/include/asterisk/stasis_message_router.h
+++ b/include/asterisk/stasis_message_router.h
@@ -127,6 +127,20 @@ void stasis_message_router_publish_sync(struct stasis_message_router *router,
struct stasis_message *message);
/*!
+ * \brief Set the high and low alert water marks of the stasis message router.
+ * \since 13.10.0
+ *
+ * \param router Pointer to a stasis message router
+ * \param low_water New queue low water mark. (-1 to set as 90% of high_water)
+ * \param high_water New queue high water mark.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error (water marks not changed).
+ */
+int stasis_message_router_set_congestion_limits(struct stasis_message_router *router,
+ long low_water, long high_water);
+
+/*!
* \brief Add a route to a message router.
*
* A particular \a message_type may have at most one route per \a router. If
diff --git a/main/cdr.c b/main/cdr.c
index b43e3610c..3ddf55b6b 100644
--- a/main/cdr.c
+++ b/main/cdr.c
@@ -71,6 +71,7 @@ ASTERISK_REGISTER_FILE()
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_message_router.h"
#include "asterisk/astobj2.h"
+#include "asterisk/taskprocessor.h"
/*** DOCUMENTATION
<configInfo name="cdr" language="en_US">
@@ -4221,6 +4222,8 @@ int ast_cdr_engine_init(void)
if (!stasis_router) {
return -1;
}
+ stasis_message_router_set_congestion_limits(stasis_router, -1,
+ 10 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
if (STASIS_MESSAGE_TYPE_INIT(cdr_sync_message_type)) {
return -1;
diff --git a/main/cel.c b/main/cel.c
index a0d0ad723..887a9e6a5 100644
--- a/main/cel.c
+++ b/main/cel.c
@@ -59,6 +59,7 @@ ASTERISK_REGISTER_FILE()
#include "asterisk/parking.h"
#include "asterisk/pickup.h"
#include "asterisk/core_local.h"
+#include "asterisk/taskprocessor.h"
/*** DOCUMENTATION
<configInfo name="cel" language="en_US">
@@ -1575,6 +1576,8 @@ static int create_routes(void)
if (!cel_state_router) {
return -1;
}
+ stasis_message_router_set_congestion_limits(cel_state_router, -1,
+ 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
ret |= stasis_message_router_add(cel_state_router,
stasis_cache_update_type(),
diff --git a/main/manager.c b/main/manager.c
index 94415b7a0..029da70f7 100644
--- a/main/manager.c
+++ b/main/manager.c
@@ -100,6 +100,7 @@ ASTERISK_REGISTER_FILE()
#include "asterisk/rtp_engine.h"
#include "asterisk/format_cache.h"
#include "asterisk/translate.h"
+#include "asterisk/taskprocessor.h"
/*** DOCUMENTATION
<manager name="Ping" language="en_US">
@@ -8692,6 +8693,8 @@ static int manager_subscriptions_init(void)
if (!stasis_router) {
return -1;
}
+ stasis_message_router_set_congestion_limits(stasis_router, -1,
+ 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
res |= stasis_message_router_set_default(stasis_router,
manager_default_msg_cb, NULL);
diff --git a/main/stasis.c b/main/stasis.c
index 9fe3a2aae..91ad94e76 100644
--- a/main/stasis.c
+++ b/main/stasis.c
@@ -564,6 +564,18 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
return NULL;
}
+int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
+ long low_water, long high_water)
+{
+ int res = -1;
+
+ if (subscription) {
+ res = ast_taskprocessor_alert_set_levels(subscription->mailbox,
+ low_water, high_water);
+ }
+ return res;
+}
+
void stasis_subscription_join(struct stasis_subscription *subscription)
{
if (subscription) {
diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c
index f60180d68..85034bcf9 100644
--- a/main/stasis_message_router.c
+++ b/main/stasis_message_router.c
@@ -289,6 +289,18 @@ void stasis_message_router_publish_sync(struct stasis_message_router *router,
ao2_cleanup(router);
}
+int stasis_message_router_set_congestion_limits(struct stasis_message_router *router,
+ long low_water, long high_water)
+{
+ int res = -1;
+
+ if (router) {
+ res = stasis_subscription_set_congestion_limits(router->subscription,
+ low_water, high_water);
+ }
+ return res;
+}
+
int stasis_message_router_add(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)