summaryrefslogtreecommitdiff
path: root/res/res_pjsip_mwi.c
diff options
context:
space:
mode:
authorAlexei Gradinari <alex2grad@gmail.com>2016-08-08 13:53:32 -0400
committerAlexei Gradinari <alex2grad@gmail.com>2016-08-08 13:57:58 -0500
commit403b63571c53719bc050f4aded696cd933995d24 (patch)
treef12ba860945a4259979e2bb1d6497faf185304d7 /res/res_pjsip_mwi.c
parent9042ad40f2a56d6cfd4117897cbc9943253d4e09 (diff)
res_pjsip_mwi: fix unsolicited mwi blocks PJSIP stack
The PJSIP taskprocessors could be overflowed on startup if there are many (thousands) realtime endpoints configured with unsolicited mwi. The PJSIP stack could be totally unresponsive for a few minutes after boot completed. This patch creates a separate PJSIP serializers pool for mwi and makes unsolicited mwi use serializers from this pool. This patch also adds 2 new global options to tune taskprocessor alert levels: 'mwi_tps_queue_high' and 'mwi_tps_queue_low'. This patch also adds new global option 'mwi_disable_initial_unsolicited' to disable sending unsolicited mwi to all endpoints on startup. If disabled then unsolicited mwi will start processing on next endpoint's contact update. ASTERISK-26230 #close Change-Id: I4c8ecb82c249eb887930980a800c9f87f28f861a
Diffstat (limited to 'res/res_pjsip_mwi.c')
-rw-r--r--res/res_pjsip_mwi.c138
1 files changed, 132 insertions, 6 deletions
diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c
index d86c96c74..bf7f04276 100644
--- a/res/res_pjsip_mwi.c
+++ b/res/res_pjsip_mwi.c
@@ -35,6 +35,7 @@
#include "asterisk/module.h"
#include "asterisk/logger.h"
#include "asterisk/astobj2.h"
+#include "asterisk/taskprocessor.h"
#include "asterisk/sorcery.h"
#include "asterisk/stasis.h"
#include "asterisk/app.h"
@@ -52,6 +53,12 @@ static char *default_voicemail_extension;
#define MWI_DATASTORE "MWI datastore"
+/*! Number of serializers in pool if one not supplied. */
+#define MWI_SERIALIZER_POOL_SIZE 8
+
+/*! Pool of serializers to use if not supplied. */
+static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE];
+
static void mwi_subscription_shutdown(struct ast_sip_subscription *sub);
static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf);
static int mwi_new_subscribe(struct ast_sip_endpoint *endpoint,
@@ -119,6 +126,117 @@ struct mwi_subscription {
char id[1];
};
+/*!
+ * \internal
+ * \brief Shutdown the serializers in the mwi pool.
+ * \since 13.12.0
+ *
+ * \return Nothing
+ */
+static void mwi_serializer_pool_shutdown(void)
+{
+ int idx;
+
+ for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+ ast_taskprocessor_unreference(mwi_serializer_pool[idx]);
+ mwi_serializer_pool[idx] = NULL;
+ }
+}
+
+/*!
+ * \internal
+ * \brief Setup the serializers in the mwi pool.
+ * \since 13.12.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int mwi_serializer_pool_setup(void)
+{
+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+ int idx;
+
+ for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+ /* Create name with seq number appended. */
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi");
+
+ mwi_serializer_pool[idx] = ast_sip_create_serializer(tps_name);
+ if (!mwi_serializer_pool[idx]) {
+ mwi_serializer_pool_shutdown();
+ return -1;
+ }
+ }
+ return 0;
+}
+
+/*!
+ * \internal
+ * \brief Pick a mwi serializer from the pool.
+ * \since 13.12.0
+ *
+ * \retval least queue size task processor.
+ */
+static struct ast_taskprocessor *get_mwi_serializer(void)
+{
+ int idx;
+ int pos;
+
+ if (!mwi_serializer_pool[0]) {
+ return NULL;
+ }
+
+ for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+ if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) {
+ pos = idx;
+ }
+ }
+
+ return mwi_serializer_pool[pos];
+}
+
+/*!
+ * \internal
+ * \brief Set taskprocessor alert levels for the serializers in the mwi pool.
+ * \since 13.12.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int mwi_serializer_set_alert_levels(void)
+{
+ int idx;
+ long tps_queue_high;
+ long tps_queue_low;
+
+ if (!mwi_serializer_pool[0]) {
+ return -1;
+ }
+
+ tps_queue_high = ast_sip_get_mwi_tps_queue_high();
+ if (tps_queue_high <= 0) {
+ ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n",
+ tps_queue_high);
+ tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
+ }
+
+ tps_queue_low = ast_sip_get_mwi_tps_queue_low();
+ if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
+ ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n",
+ tps_queue_low);
+ tps_queue_low = -1;
+ }
+
+ for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+ if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) {
+ ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n",
+ idx);
+ }
+ }
+
+ return 0;
+}
+
+
static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_message *msg);
@@ -945,7 +1063,7 @@ static int send_notify(void *obj, void *arg, int flags)
struct mwi_subscription *mwi_sub = obj;
struct ast_taskprocessor *serializer = mwi_sub->is_solicited
? ast_sip_subscription_get_serializer(mwi_sub->sip_sub)
- : NULL;
+ : get_mwi_serializer();
if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) {
ao2_ref(mwi_sub, -1);
@@ -1063,7 +1181,7 @@ static int send_contact_notify(void *obj, void *arg, int flags)
return 0;
}
- if (ast_sip_push_task(NULL, serialized_notify, ao2_bump(mwi_sub))) {
+ if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) {
ao2_ref(mwi_sub, -1);
}
@@ -1149,6 +1267,7 @@ static void global_loaded(const char *object_type)
{
ast_free(default_voicemail_extension);
default_voicemail_extension = ast_sip_get_default_voicemail_extension();
+ mwi_serializer_set_alert_levels();
}
static struct ast_sorcery_observer global_observer = {
@@ -1175,15 +1294,21 @@ static int load_module(void)
return AST_MODULE_LOAD_DECLINE;
}
+ if (mwi_serializer_pool_setup()) {
+ ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n");
+ }
+
create_mwi_subscriptions();
ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer);
ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
- if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
- ast_sip_push_task(NULL, send_initial_notify_all, NULL);
- } else {
- stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+ if (!ast_sip_get_mwi_disable_initial_unsolicited()) {
+ if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
+ ast_sip_push_task(NULL, send_initial_notify_all, NULL);
+ } else {
+ stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+ }
}
return AST_MODULE_LOAD_SUCCESS;
@@ -1193,6 +1318,7 @@ static int unload_module(void)
{
ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL);
ao2_ref(unsolicited_mwi, -1);
+ mwi_serializer_pool_shutdown();
ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
ast_sip_unregister_subscription_handler(&mwi_handler);