diff options
Diffstat (limited to 'res/res_pjsip_mwi.c')
-rw-r--r-- | res/res_pjsip_mwi.c | 188 |
1 files changed, 148 insertions, 40 deletions
diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index 9eba335b5..fb6e6b8f6 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -35,6 +35,7 @@ #include "asterisk/module.h" #include "asterisk/logger.h" #include "asterisk/astobj2.h" +#include "asterisk/taskprocessor.h" #include "asterisk/sorcery.h" #include "asterisk/stasis.h" #include "asterisk/app.h" @@ -52,6 +53,12 @@ static char *default_voicemail_extension; #define MWI_DATASTORE "MWI datastore" +/*! Number of serializers in pool if one not supplied. */ +#define MWI_SERIALIZER_POOL_SIZE 8 + +/*! Pool of serializers to use if not supplied. */ +static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE]; + static void mwi_subscription_shutdown(struct ast_sip_subscription *sub); static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf); static int mwi_new_subscribe(struct ast_sip_endpoint *endpoint, @@ -119,6 +126,117 @@ struct mwi_subscription { char id[1]; }; +/*! + * \internal + * \brief Shutdown the serializers in the mwi pool. + * \since 13.12.0 + * + * \return Nothing + */ +static void mwi_serializer_pool_shutdown(void) +{ + int idx; + + for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { + ast_taskprocessor_unreference(mwi_serializer_pool[idx]); + mwi_serializer_pool[idx] = NULL; + } +} + +/*! + * \internal + * \brief Setup the serializers in the mwi pool. + * \since 13.12.0 + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int mwi_serializer_pool_setup(void) +{ + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + int idx; + + for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { + /* Create name with seq number appended. */ + ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi"); + + mwi_serializer_pool[idx] = ast_sip_create_serializer_named(tps_name); + if (!mwi_serializer_pool[idx]) { + mwi_serializer_pool_shutdown(); + return -1; + } + } + return 0; +} + +/*! + * \internal + * \brief Pick a mwi serializer from the pool. + * \since 13.12.0 + * + * \retval least queue size task processor. + */ +static struct ast_taskprocessor *get_mwi_serializer(void) +{ + int idx; + int pos; + + if (!mwi_serializer_pool[0]) { + return NULL; + } + + for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { + if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) { + pos = idx; + } + } + + return mwi_serializer_pool[pos]; +} + +/*! + * \internal + * \brief Set taskprocessor alert levels for the serializers in the mwi pool. + * \since 13.12.0 + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int mwi_serializer_set_alert_levels(void) +{ + int idx; + long tps_queue_high; + long tps_queue_low; + + if (!mwi_serializer_pool[0]) { + return -1; + } + + tps_queue_high = ast_sip_get_mwi_tps_queue_high(); + if (tps_queue_high <= 0) { + ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n", + tps_queue_high); + tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL; + } + + tps_queue_low = ast_sip_get_mwi_tps_queue_low(); + if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) { + ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n", + tps_queue_low); + tps_queue_low = -1; + } + + for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { + if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) { + ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n", + idx); + } + } + + return 0; +} + + static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg); @@ -630,7 +748,6 @@ static int endpoint_receives_unsolicited_mwi_for_mailbox(struct ast_sip_endpoint int ret = 0; mwi_subs = ao2_find(unsolicited_mwi, endpoint_id, OBJ_SEARCH_KEY | OBJ_MULTIPLE); - if (!mwi_subs) { return 0; } @@ -945,7 +1062,7 @@ static int send_notify(void *obj, void *arg, int flags) struct mwi_subscription *mwi_sub = obj; struct ast_taskprocessor *serializer = mwi_sub->is_solicited ? ast_sip_subscription_get_serializer(mwi_sub->sip_sub) - : NULL; + : get_mwi_serializer(); if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); @@ -971,43 +1088,17 @@ static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, } } -/*! \note Called with the unsolicited_mwi conainer lock held. */ +/*! \note Called with the unsolicited_mwi container lock held. */ static int create_mwi_subscriptions_for_endpoint(void *obj, void *arg, int flags) { RAII_VAR(struct mwi_subscription *, aggregate_sub, NULL, ao2_cleanup); struct ast_sip_endpoint *endpoint = obj; - char *endpoint_aors, *aor_name, *mailboxes, *mailbox; - struct ao2_container *contacts = NULL; + char *mailboxes, *mailbox; if (ast_strlen_zero(endpoint->subscription.mwi.mailboxes)) { return 0; } - endpoint_aors = ast_strdupa(endpoint->aors); - - while ((aor_name = ast_strip(strsep(&endpoint_aors, ",")))) { - RAII_VAR(struct ast_sip_aor *, aor, ast_sip_location_retrieve_aor(aor_name), ao2_cleanup); - - if (!aor) { - continue; - } - - contacts = ast_sip_location_retrieve_aor_contacts(aor); - if (!contacts || (ao2_container_count(contacts) == 0)) { - ao2_cleanup(contacts); - contacts = NULL; - continue; - } - - break; - } - - if (!contacts) { - return 0; - } - - ao2_ref(contacts, -1); - if (endpoint->subscription.mwi.aggregate) { aggregate_sub = mwi_subscription_alloc(endpoint, 0, NULL); if (!aggregate_sub) { @@ -1089,7 +1180,7 @@ static int send_contact_notify(void *obj, void *arg, int flags) return 0; } - if (ast_sip_push_task(NULL, serialized_notify, ao2_bump(mwi_sub))) { + if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); } @@ -1120,7 +1211,8 @@ static void mwi_contact_added(const void *object) ao2_lock(unsolicited_mwi); - mwi_subs = ao2_find(unsolicited_mwi, endpoint_id, OBJ_SEARCH_KEY | OBJ_MULTIPLE | OBJ_NOLOCK | OBJ_UNLINK); + mwi_subs = ao2_find(unsolicited_mwi, endpoint_id, + OBJ_SEARCH_KEY | OBJ_MULTIPLE | OBJ_NOLOCK | OBJ_UNLINK); if (mwi_subs) { for (; (mwi_sub = ao2_iterator_next(mwi_subs)); ao2_cleanup(mwi_sub)) { unsubscribe(mwi_sub, NULL, 0); @@ -1175,6 +1267,7 @@ static void global_loaded(const char *object_type) { ast_free(default_voicemail_extension); default_voicemail_extension = ast_sip_get_default_voicemail_extension(); + mwi_serializer_set_alert_levels(); } static struct ast_sorcery_observer global_observer = { @@ -1195,21 +1288,29 @@ static int load_module(void) return AST_MODULE_LOAD_DECLINE; } - unsolicited_mwi = ao2_container_alloc(MWI_BUCKETS, mwi_sub_hash, mwi_sub_cmp); + if (mwi_serializer_pool_setup()) { + ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n"); + } + + unsolicited_mwi = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS, + mwi_sub_hash, NULL, mwi_sub_cmp); if (!unsolicited_mwi) { + mwi_serializer_pool_shutdown(); ast_sip_unregister_subscription_handler(&mwi_handler); return AST_MODULE_LOAD_DECLINE; } - create_mwi_subscriptions(); + ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer); ast_sorcery_reload_object(ast_sip_get_sorcery(), "global"); - if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { - ast_sip_push_task(NULL, send_initial_notify_all, NULL); - } else { - stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL); + if (!ast_sip_get_mwi_disable_initial_unsolicited()) { + if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { + ast_sip_push_task(NULL, send_initial_notify_all, NULL); + } else { + stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL); + } } return AST_MODULE_LOAD_SUCCESS; @@ -1217,12 +1318,19 @@ static int load_module(void) static int unload_module(void) { - ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); - ao2_ref(unsolicited_mwi, -1); ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer); ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); + + ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); + ao2_ref(unsolicited_mwi, -1); + unsolicited_mwi = NULL; + + mwi_serializer_pool_shutdown(); + ast_sip_unregister_subscription_handler(&mwi_handler); + ast_free(default_voicemail_extension); + default_voicemail_extension = NULL; return 0; } |