summaryrefslogtreecommitdiff
path: root/res
diff options
context:
space:
mode:
authorAlexei Gradinari <alex2grad@gmail.com>2016-08-08 13:53:32 -0400
committerAlexei Gradinari <alex2grad@gmail.com>2016-08-08 13:53:32 -0400
commita06a1af0eb17428f5b929e1b6b76854a21a84500 (patch)
tree811a92589f5d3fc1888976dfe558339aa806ebc7 /res
parentbf2135929230bc805743ff86dba31d52849bfea2 (diff)
res_pjsip_mwi: fix unsolicited mwi blocks PJSIP stack
The PJSIP taskprocessors could be overflowed on startup if there are many (thousands) realtime endpoints configured with unsolicited mwi. The PJSIP stack could be totally unresponsive for a few minutes after boot completed. This patch creates a separate PJSIP serializers pool for mwi and makes unsolicited mwi use serializers from this pool. This patch also adds 2 new global options to tune taskprocessor alert levels: 'mwi_tps_queue_high' and 'mwi_tps_queue_low'. This patch also adds new global option 'mwi_disable_initial_unsolicited' to disable sending unsolicited mwi to all endpoints on startup. If disabled then unsolicited mwi will start processing on next endpoint's contact update. ASTERISK-26230 #close Change-Id: I4c8ecb82c249eb887930980a800c9f87f28f861a
Diffstat (limited to 'res')
-rw-r--r--res/res_pjsip.c42
-rw-r--r--res/res_pjsip/config_global.c68
-rw-r--r--res/res_pjsip_mwi.c138
3 files changed, 242 insertions, 6 deletions
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 46f05f991..aafb3a211 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -1496,6 +1496,48 @@
set to this value if there is no better option (such as auth/realm) to be
used.</synopsis>
</configOption>
+ <configOption name="mwi_tps_queue_high" default="500">
+ <synopsis>MWI taskprocessor high water alert trigger level.</synopsis>
+ <description>
+ <para>On a heavily loaded system you may need to adjust the
+ taskprocessor queue limits. If any taskprocessor queue size
+ reaches its high water level then pjsip will stop processing
+ new requests until the alert is cleared. The alert clears
+ when all alerting taskprocessor queues have dropped to their
+ low water clear level.
+ </para>
+ </description>
+ </configOption>
+ <configOption name="mwi_tps_queue_low" default="-1">
+ <synopsis>MWI taskprocessor low water clear alert level.</synopsis>
+ <description>
+ <para>On a heavily loaded system you may need to adjust the
+ taskprocessor queue limits. If any taskprocessor queue size
+ reaches its high water level then pjsip will stop processing
+ new requests until the alert is cleared. The alert clears
+ when all alerting taskprocessor queues have dropped to their
+ low water clear level.
+ </para>
+ <note><para>Set to -1 for the low water level to be 90% of
+ the high water level.</para></note>
+ </description>
+ </configOption>
+ <configOption name="mwi_disable_initial_unsolicited" default="no">
+ <synopsis>Enable/Disable sending unsolicited MWI to all endpoints on startup.</synopsis>
+ <description>
+ <para>When the initial unsolicited MWI notification are
+ enabled on startup then the initial notifications
+ get sent at startup. If you have a lot of endpoints
+ (thousands) that use unsolicited MWI then you may
+ want to consider disabling the initial startup
+ notifications.
+ </para>
+ <para>When the initial unsolicited MWI notifications are
+ disabled on startup then the notifications will start
+ on the endpoint's next contact update.
+ </para>
+ </description>
+ </configOption>
</configObject>
</configFile>
</configInfo>
diff --git a/res/res_pjsip/config_global.c b/res/res_pjsip/config_global.c
index 6bb688804..8a1b0d449 100644
--- a/res/res_pjsip/config_global.c
+++ b/res/res_pjsip/config_global.c
@@ -24,6 +24,7 @@
#include "asterisk/res_pjsip.h"
#include "include/res_pjsip_private.h"
#include "asterisk/sorcery.h"
+#include "asterisk/taskprocessor.h"
#include "asterisk/ast_version.h"
#include "asterisk/res_pjsip_cli.h"
@@ -43,6 +44,9 @@
#define DEFAULT_UNIDENTIFIED_REQUEST_COUNT 5
#define DEFAULT_UNIDENTIFIED_REQUEST_PERIOD 5
#define DEFAULT_UNIDENTIFIED_REQUEST_PRUNE_INTERVAL 30
+#define DEFAULT_MWI_TPS_QUEUE_HIGH AST_TASKPROCESSOR_HIGH_WATER_LEVEL
+#define DEFAULT_MWI_TPS_QUEUE_LOW -1
+#define DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED 0
static char default_useragent[256];
@@ -79,6 +83,14 @@ struct global_config {
unsigned int unidentified_request_period;
/* Interval at which expired unidentifed requests will be pruned */
unsigned int unidentified_request_prune_interval;
+ struct {
+ /*! Taskprocessor high water alert trigger level */
+ unsigned int tps_queue_high;
+ /*! Taskprocessor low water clear alert level. */
+ int tps_queue_low;
+ /*! Nonzero to disable sending unsolicited mwi to all endpoints on startup */
+ unsigned int disable_initial_unsolicited;
+ } mwi;
};
static void global_destructor(void *obj)
@@ -314,6 +326,53 @@ void ast_sip_get_default_from_user(char *from_user, size_t size)
}
}
+
+unsigned int ast_sip_get_mwi_tps_queue_high(void)
+{
+ unsigned int tps_queue_high;
+ struct global_config *cfg;
+
+ cfg = get_global_cfg();
+ if (!cfg) {
+ return DEFAULT_MWI_TPS_QUEUE_HIGH;
+ }
+
+ tps_queue_high = cfg->mwi.tps_queue_high;
+ ao2_ref(cfg, -1);
+ return tps_queue_high;
+}
+
+int ast_sip_get_mwi_tps_queue_low(void)
+{
+ int tps_queue_low;
+ struct global_config *cfg;
+
+ cfg = get_global_cfg();
+ if (!cfg) {
+ return DEFAULT_MWI_TPS_QUEUE_LOW;
+ }
+
+ tps_queue_low = cfg->mwi.tps_queue_low;
+ ao2_ref(cfg, -1);
+ return tps_queue_low;
+}
+
+unsigned int ast_sip_get_mwi_disable_initial_unsolicited(void)
+{
+ unsigned int disable_initial_unsolicited;
+ struct global_config *cfg;
+
+ cfg = get_global_cfg();
+ if (!cfg) {
+ return DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED;
+ }
+
+ disable_initial_unsolicited = cfg->mwi.disable_initial_unsolicited;
+ ao2_ref(cfg, -1);
+ return disable_initial_unsolicited;
+}
+
+
/*!
* \internal
* \brief Observer to set default global object if none exist.
@@ -450,6 +509,15 @@ int ast_sip_initialize_sorcery_global(void)
OPT_UINT_T, 0, FLDSET(struct global_config, unidentified_request_prune_interval));
ast_sorcery_object_field_register(sorcery, "global", "default_realm", DEFAULT_REALM,
OPT_STRINGFIELD_T, 0, STRFLDSET(struct global_config, default_realm));
+ ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_high",
+ __stringify(DEFAULT_MWI_TPS_QUEUE_HIGH),
+ OPT_UINT_T, 0, FLDSET(struct global_config, mwi.tps_queue_high));
+ ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_low",
+ __stringify(DEFAULT_MWI_TPS_QUEUE_LOW),
+ OPT_INT_T, 0, FLDSET(struct global_config, mwi.tps_queue_low));
+ ast_sorcery_object_field_register(sorcery, "global", "mwi_disable_initial_unsolicited",
+ DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED ? "yes" : "no",
+ OPT_BOOL_T, 1, FLDSET(struct global_config, mwi.disable_initial_unsolicited));
if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) {
return -1;
diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c
index d86c96c74..9553cd732 100644
--- a/res/res_pjsip_mwi.c
+++ b/res/res_pjsip_mwi.c
@@ -35,6 +35,7 @@
#include "asterisk/module.h"
#include "asterisk/logger.h"
#include "asterisk/astobj2.h"
+#include "asterisk/taskprocessor.h"
#include "asterisk/sorcery.h"
#include "asterisk/stasis.h"
#include "asterisk/app.h"
@@ -52,6 +53,12 @@ static char *default_voicemail_extension;
#define MWI_DATASTORE "MWI datastore"
+/*! Number of serializers in pool if one not supplied. */
+#define MWI_SERIALIZER_POOL_SIZE 8
+
+/*! Pool of serializers to use if not supplied. */
+static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE];
+
static void mwi_subscription_shutdown(struct ast_sip_subscription *sub);
static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf);
static int mwi_new_subscribe(struct ast_sip_endpoint *endpoint,
@@ -119,6 +126,117 @@ struct mwi_subscription {
char id[1];
};
+/*!
+ * \internal
+ * \brief Shutdown the serializers in the mwi pool.
+ * \since 13.12.0
+ *
+ * \return Nothing
+ */
+static void mwi_serializer_pool_shutdown(void)
+{
+ int idx;
+
+ for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+ ast_taskprocessor_unreference(mwi_serializer_pool[idx]);
+ mwi_serializer_pool[idx] = NULL;
+ }
+}
+
+/*!
+ * \internal
+ * \brief Setup the serializers in the mwi pool.
+ * \since 13.12.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int mwi_serializer_pool_setup(void)
+{
+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+ int idx;
+
+ for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+ /* Create name with seq number appended. */
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi");
+
+ mwi_serializer_pool[idx] = ast_sip_create_serializer_named(tps_name);
+ if (!mwi_serializer_pool[idx]) {
+ mwi_serializer_pool_shutdown();
+ return -1;
+ }
+ }
+ return 0;
+}
+
+/*!
+ * \internal
+ * \brief Pick a mwi serializer from the pool.
+ * \since 13.12.0
+ *
+ * \retval least queue size task processor.
+ */
+static struct ast_taskprocessor *get_mwi_serializer(void)
+{
+ int idx;
+ int pos;
+
+ if (!mwi_serializer_pool[0]) {
+ return NULL;
+ }
+
+ for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+ if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) {
+ pos = idx;
+ }
+ }
+
+ return mwi_serializer_pool[pos];
+}
+
+/*!
+ * \internal
+ * \brief Set taskprocessor alert levels for the serializers in the mwi pool.
+ * \since 13.12.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int mwi_serializer_set_alert_levels(void)
+{
+ int idx;
+ long tps_queue_high;
+ long tps_queue_low;
+
+ if (!mwi_serializer_pool[0]) {
+ return -1;
+ }
+
+ tps_queue_high = ast_sip_get_mwi_tps_queue_high();
+ if (tps_queue_high <= 0) {
+ ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n",
+ tps_queue_high);
+ tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
+ }
+
+ tps_queue_low = ast_sip_get_mwi_tps_queue_low();
+ if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
+ ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n",
+ tps_queue_low);
+ tps_queue_low = -1;
+ }
+
+ for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+ if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) {
+ ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n",
+ idx);
+ }
+ }
+
+ return 0;
+}
+
+
static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_message *msg);
@@ -945,7 +1063,7 @@ static int send_notify(void *obj, void *arg, int flags)
struct mwi_subscription *mwi_sub = obj;
struct ast_taskprocessor *serializer = mwi_sub->is_solicited
? ast_sip_subscription_get_serializer(mwi_sub->sip_sub)
- : NULL;
+ : get_mwi_serializer();
if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) {
ao2_ref(mwi_sub, -1);
@@ -1063,7 +1181,7 @@ static int send_contact_notify(void *obj, void *arg, int flags)
return 0;
}
- if (ast_sip_push_task(NULL, serialized_notify, ao2_bump(mwi_sub))) {
+ if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) {
ao2_ref(mwi_sub, -1);
}
@@ -1149,6 +1267,7 @@ static void global_loaded(const char *object_type)
{
ast_free(default_voicemail_extension);
default_voicemail_extension = ast_sip_get_default_voicemail_extension();
+ mwi_serializer_set_alert_levels();
}
static struct ast_sorcery_observer global_observer = {
@@ -1175,15 +1294,21 @@ static int load_module(void)
return AST_MODULE_LOAD_DECLINE;
}
+ if (mwi_serializer_pool_setup()) {
+ ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n");
+ }
+
create_mwi_subscriptions();
ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer);
ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
- if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
- ast_sip_push_task(NULL, send_initial_notify_all, NULL);
- } else {
- stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+ if (!ast_sip_get_mwi_disable_initial_unsolicited()) {
+ if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
+ ast_sip_push_task(NULL, send_initial_notify_all, NULL);
+ } else {
+ stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+ }
}
return AST_MODULE_LOAD_SUCCESS;
@@ -1193,6 +1318,7 @@ static int unload_module(void)
{
ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL);
ao2_ref(unsolicited_mwi, -1);
+ mwi_serializer_pool_shutdown();
ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
ast_sip_unregister_subscription_handler(&mwi_handler);