From b3c787d1ddd36eb7dfa0906b92a83ab23d8741cd Mon Sep 17 00:00:00 2001 From: Alexei Gradinari Date: Thu, 2 Jun 2016 18:26:09 -0400 Subject: res_pjsip: improve realtime performance #2 The patch removes updating all Endpoints' status on startup. Instead, only non-qualified aors with static contact and non-qualified non-expired contacts are retrieved from the realtime to update the endpoint status to ONLINE. The endpoint name was added to the contact object to simply find the endpoint that created this contact. The status of endpoints with qualified aors will be updated by 'qualify' functions. ASTERISK-26061 #close Change-Id: Id324c1776fa55d3741e0c5457ecac0304cb1a0df --- res/res_pjsip/location.c | 6 ++ res/res_pjsip/pjsip_configuration.c | 126 ++++++++++++++++++++++------------- res/res_pjsip/pjsip_options.c | 128 +++++++++++++++++++++++++++++++++++- 3 files changed, 214 insertions(+), 46 deletions(-) (limited to 'res/res_pjsip') diff --git a/res/res_pjsip/location.c b/res/res_pjsip/location.c index f55bd0fb4..1b7850f5f 100644 --- a/res/res_pjsip/location.c +++ b/res/res_pjsip/location.c @@ -121,6 +121,7 @@ static void *contact_alloc(const char *name) return NULL; } + ast_string_field_init_extended(contact, endpoint_name); ast_string_field_init_extended(contact, reg_server); ast_string_field_init_extended(contact, via_addr); ast_string_field_init_extended(contact, call_id); @@ -356,6 +357,10 @@ int ast_sip_location_add_contact_nolock(struct ast_sip_aor *aor, const char *uri contact->endpoint = ao2_bump(endpoint); + if (endpoint) { + ast_string_field_set(contact, endpoint_name, ast_sorcery_object_get_id(endpoint)); + } + return ast_sorcery_create(ast_sip_get_sorcery(), contact); } @@ -1138,6 +1143,7 @@ int ast_sip_initialize_sorcery_location(void) ast_sorcery_object_field_register(sorcery, "contact", "authenticate_qualify", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_contact, authenticate_qualify)); ast_sorcery_object_field_register(sorcery, "contact", "outbound_proxy", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_contact, outbound_proxy)); ast_sorcery_object_field_register(sorcery, "contact", "user_agent", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_contact, user_agent)); + ast_sorcery_object_field_register(sorcery, "contact", "endpoint", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_contact, endpoint_name)); ast_sorcery_object_field_register(sorcery, "contact", "reg_server", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_contact, reg_server)); ast_sorcery_object_field_register(sorcery, "contact", "via_addr", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_contact, via_addr)); ast_sorcery_object_field_register(sorcery, "contact", "via_port", "0", OPT_UINT_T, 0, FLDSET(struct ast_sip_contact, via_port)); diff --git a/res/res_pjsip/pjsip_configuration.c b/res/res_pjsip/pjsip_configuration.c index 368f0d8a2..8791816c3 100644 --- a/res/res_pjsip/pjsip_configuration.c +++ b/res/res_pjsip/pjsip_configuration.c @@ -58,6 +58,53 @@ static int persistent_endpoint_cmp(void *obj, void *arg, int flags) return !strcmp(ast_endpoint_get_resource(persistent1->endpoint), id) ? CMP_MATCH | CMP_STOP : 0; } +/*! \brief Internal function for changing the state of an endpoint */ +static void endpoint_update_state(struct ast_endpoint *endpoint, enum ast_endpoint_state state) +{ + struct ast_json *blob; + char *regcontext; + + /* If there was no state change, don't publish anything. */ + if (ast_endpoint_get_state(endpoint) == state) { + return; + } + + regcontext = ast_sip_get_regcontext(); + + if (state == AST_ENDPOINT_ONLINE) { + ast_endpoint_set_state(endpoint, AST_ENDPOINT_ONLINE); + blob = ast_json_pack("{s: s}", "peer_status", "Reachable"); + + if (!ast_strlen_zero(regcontext)) { + if (!ast_exists_extension(NULL, regcontext, ast_endpoint_get_resource(endpoint), 1, NULL)) { + ast_add_extension(regcontext, 1, ast_endpoint_get_resource(endpoint), 1, NULL, NULL, + "Noop", ast_strdup(ast_endpoint_get_resource(endpoint)), ast_free_ptr, "SIP"); + } + } + + ast_verb(2, "Endpoint %s is now Reachable\n", ast_endpoint_get_resource(endpoint)); + } else { + ast_endpoint_set_state(endpoint, AST_ENDPOINT_OFFLINE); + blob = ast_json_pack("{s: s}", "peer_status", "Unreachable"); + + if (!ast_strlen_zero(regcontext)) { + struct pbx_find_info q = { .stacklen = 0 }; + + if (pbx_find_extension(NULL, NULL, &q, regcontext, ast_endpoint_get_resource(endpoint), 1, NULL, "", E_MATCH)) { + ast_context_remove_extension(regcontext, ast_endpoint_get_resource(endpoint), 1, NULL); + } + } + + ast_verb(2, "Endpoint %s is now Unreachable\n", ast_endpoint_get_resource(endpoint)); + } + + ast_free(regcontext); + + ast_endpoint_blob_publish(endpoint, ast_endpoint_state_type(), blob); + ast_json_unref(blob); + ast_devstate_changed(AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "PJSIP/%s", ast_endpoint_get_resource(endpoint)); +} + /*! \brief Callback function for changing the state of an endpoint */ static int persistent_endpoint_update_state(void *obj, void *arg, int flags) { @@ -69,7 +116,6 @@ static int persistent_endpoint_update_state(void *obj, void *arg, int flags) struct ao2_iterator i; struct ast_sip_contact *contact; enum ast_endpoint_state state = AST_ENDPOINT_OFFLINE; - char *regcontext; if (status) { char rtt[32]; @@ -113,45 +159,7 @@ static int persistent_endpoint_update_state(void *obj, void *arg, int flags) ao2_ref(contacts, -1); } - /* If there was no state change, don't publish anything. */ - if (ast_endpoint_get_state(endpoint) == state) { - return 0; - } - - regcontext = ast_sip_get_regcontext(); - - if (state == AST_ENDPOINT_ONLINE) { - ast_endpoint_set_state(endpoint, AST_ENDPOINT_ONLINE); - blob = ast_json_pack("{s: s}", "peer_status", "Reachable"); - - if (!ast_strlen_zero(regcontext)) { - if (!ast_exists_extension(NULL, regcontext, ast_endpoint_get_resource(endpoint), 1, NULL)) { - ast_add_extension(regcontext, 1, ast_endpoint_get_resource(endpoint), 1, NULL, NULL, - "Noop", ast_strdup(ast_endpoint_get_resource(endpoint)), ast_free_ptr, "SIP"); - } - } - - ast_verb(2, "Endpoint %s is now Reachable\n", ast_endpoint_get_resource(endpoint)); - } else { - ast_endpoint_set_state(endpoint, AST_ENDPOINT_OFFLINE); - blob = ast_json_pack("{s: s}", "peer_status", "Unreachable"); - - if (!ast_strlen_zero(regcontext)) { - struct pbx_find_info q = { .stacklen = 0 }; - - if (pbx_find_extension(NULL, NULL, &q, regcontext, ast_endpoint_get_resource(endpoint), 1, NULL, "", E_MATCH)) { - ast_context_remove_extension(regcontext, ast_endpoint_get_resource(endpoint), 1, NULL); - } - } - - ast_verb(2, "Endpoint %s is now Unreachable\n", ast_endpoint_get_resource(endpoint)); - } - - ast_free(regcontext); - - ast_endpoint_blob_publish(endpoint, ast_endpoint_state_type(), blob); - ast_json_unref(blob); - ast_devstate_changed(AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "PJSIP/%s", ast_endpoint_get_resource(endpoint)); + endpoint_update_state(endpoint,state); return 0; } @@ -1184,6 +1192,20 @@ static void persistent_endpoint_destroy(void *obj) ast_free(persistent->aors); } +int ast_sip_persistent_endpoint_update_state(const char *endpoint_name, enum ast_endpoint_state state) +{ + RAII_VAR(struct sip_persistent_endpoint *, persistent, NULL, ao2_cleanup); + SCOPED_AO2LOCK(lock, persistent_endpoints); + + if (!(persistent = ao2_find(persistent_endpoints, endpoint_name, OBJ_KEY | OBJ_NOLOCK))) { + return -1; + } + + endpoint_update_state(persistent->endpoint, state); + + return 0; +} + /*! \brief Internal function which finds (or creates) persistent endpoint information */ static struct ast_endpoint *persistent_endpoint_find_or_create(const struct ast_sip_endpoint *endpoint) { @@ -1201,11 +1223,7 @@ static struct ast_endpoint *persistent_endpoint_find_or_create(const struct ast_ persistent->aors = ast_strdup(endpoint->aors); - if (ast_strlen_zero(persistent->aors)) { - ast_endpoint_set_state(persistent->endpoint, AST_ENDPOINT_UNKNOWN); - } else { - persistent_endpoint_update_state(persistent, NULL, 0); - } + ast_endpoint_set_state(persistent->endpoint, AST_ENDPOINT_UNKNOWN); ao2_link_flags(persistent_endpoints, persistent, OBJ_NOLOCK); } @@ -1686,6 +1704,22 @@ static struct ast_cli_entry cli_commands[] = { struct ast_sip_cli_formatter_entry *channel_formatter; struct ast_sip_cli_formatter_entry *endpoint_formatter; +static int on_load_endpoint(void *obj, void *arg, int flags) +{ + return sip_endpoint_apply_handler(sip_sorcery, obj); +} + +static void load_all_endpoints(void) +{ + struct ao2_container *endpoints; + + endpoints = ast_sorcery_retrieve_by_fields(sip_sorcery, "endpoint", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL); + if (endpoints) { + ao2_callback(endpoints, OBJ_NODATA, on_load_endpoint, NULL); + ao2_ref(endpoints, -1); + } +} + int ast_res_pjsip_initialize_configuration(const struct ast_module_info *ast_module_info) { if (ast_manager_register_xml(AMI_SHOW_ENDPOINTS, EVENT_FLAG_SYSTEM, ami_show_endpoints) || @@ -1887,6 +1921,8 @@ int ast_res_pjsip_initialize_configuration(const struct ast_module_info *ast_mod ast_sorcery_load(sip_sorcery); + load_all_endpoints(); + return 0; } diff --git a/res/res_pjsip/pjsip_options.c b/res/res_pjsip/pjsip_options.c index d73766cb2..ede0d5eba 100644 --- a/res/res_pjsip/pjsip_options.c +++ b/res/res_pjsip/pjsip_options.c @@ -340,7 +340,12 @@ static int qualify_contact(struct ast_sip_endpoint *endpoint, struct ast_sip_con if (endpoint) { endpoint_local = ao2_bump(endpoint); } else { - endpoint_local = find_an_endpoint(contact); + if (!ast_strlen_zero(contact->endpoint_name)) { + endpoint_local = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", contact->endpoint_name); + } + if (!endpoint_local) { + endpoint_local = find_an_endpoint(contact); + } if (!endpoint_local) { ast_log(LOG_ERROR, "Unable to find an endpoint to qualify contact %s\n", contact->uri); @@ -1248,6 +1253,126 @@ static const struct ast_sorcery_observer observer_callbacks_options = { .deleted = aor_observer_deleted }; +static int aor_update_endpoint_state(void *obj, void *arg, int flags) +{ + struct ast_sip_endpoint *endpoint = obj; + const char *endpoint_name = ast_sorcery_object_get_id(endpoint); + char *aor = arg; + char *endpoint_aor; + char *endpoint_aors; + + if (ast_strlen_zero(aor) || ast_strlen_zero(endpoint->aors)) { + return 0; + } + + endpoint_aors = ast_strdupa(endpoint->aors); + while ((endpoint_aor = ast_strip(strsep(&endpoint_aors, ",")))) { + if (!strcmp(aor, endpoint_aor)) { + if (ast_sip_persistent_endpoint_update_state(endpoint_name, AST_ENDPOINT_ONLINE) == -1) { + ast_log(LOG_WARNING, "Unable to find persistent endpoint '%s' for aor '%s'\n", + endpoint_name, aor); + } + } + } + + return 0; +} + +static int on_aor_update_endpoint_state(void *obj, void *arg, int flags) +{ + struct ast_sip_aor *aor = obj; + struct ao2_container *endpoints; + RAII_VAR(struct ast_variable *, var, NULL, ast_variables_destroy); + const char *aor_name = ast_sorcery_object_get_id(aor); + char *aor_like; + + if (ast_strlen_zero(aor_name)) { + return -1; + } + + if (aor->permanent_contacts && ((int)(aor->qualify_frequency * 1000)) <= 0) { + aor_like = ast_alloca(strlen(aor_name) + 3); + sprintf(aor_like, "%%%s%%", aor_name); + var = ast_variable_new("aors LIKE", aor_like, ""); + if (!var) { + return -1; + } + endpoints = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), + "endpoint", AST_RETRIEVE_FLAG_MULTIPLE, var); + + if (endpoints) { + /* + * Because aors are a string list, we have to use a pattern match but since a simple + * pattern match could return an endpoint that has an aor of "aaabccc" when searching + * for "abc", we still have to iterate over them to find an exact aor match. + */ + ao2_callback(endpoints, 0, aor_update_endpoint_state, (char *)aor_name); + ao2_ref(endpoints, -1); + } + } + + return 0; +} + +static int contact_update_endpoint_state(void *obj, void *arg, int flags) +{ + const struct ast_sip_contact *contact = obj; + struct timeval tv = ast_tvnow(); + + if (!ast_strlen_zero(contact->endpoint_name) && ((int)(contact->qualify_frequency * 1000)) <= 0 && + contact->expiration_time.tv_sec > tv.tv_sec) { + + if (ast_sip_persistent_endpoint_update_state(contact->endpoint_name, AST_ENDPOINT_ONLINE) == -1) { + ast_log(LOG_WARNING, "Unable to find persistent endpoint '%s' for contact '%s/%s'\n", + contact->endpoint_name, contact->aor, contact->uri); + return -1; + } + } + + return 0; +} + +static void update_all_unqualified_endpoints(void) +{ + struct ao2_container *aors; + struct ao2_container *contacts; + RAII_VAR(struct ast_variable *, var_aor, NULL, ast_variables_destroy); + RAII_VAR(struct ast_variable *, var_contact, NULL, ast_variables_destroy); + RAII_VAR(char *, time_now, NULL, ast_free); + struct timeval tv = ast_tvnow(); + + if (!(var_aor = ast_variable_new("contact !=", "", ""))) { + return; + } + if (!(var_aor->next = ast_variable_new("qualify_frequency <=", "0", ""))) { + return; + } + + if (ast_asprintf(&time_now, "%ld", tv.tv_sec) == -1) { + return; + } + if (!(var_contact = ast_variable_new("expiration_time >", time_now, ""))) { + return; + } + if (!(var_contact->next = ast_variable_new("qualify_frequency <=", "0", ""))) { + return; + } + + aors = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), + "aor", AST_RETRIEVE_FLAG_MULTIPLE, var_aor); + if (aors) { + ao2_callback(aors, OBJ_NODATA, on_aor_update_endpoint_state, NULL); + ao2_ref(aors, -1); + } + + contacts = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), + "contact", AST_RETRIEVE_FLAG_MULTIPLE, var_contact); + if (contacts) { + ao2_callback(contacts, OBJ_NODATA, contact_update_endpoint_state, NULL); + ao2_ref(contacts, -1); + } +} + int ast_res_pjsip_init_options_handling(int reload) { static const pj_str_t STR_OPTIONS = { "OPTIONS", 7 }; @@ -1289,6 +1414,7 @@ int ast_res_pjsip_init_options_handling(int reload) ast_manager_register2("PJSIPQualify", EVENT_FLAG_SYSTEM | EVENT_FLAG_REPORTING, ami_sip_qualify, NULL, NULL, NULL); ast_cli_register_multiple(cli_options, ARRAY_LEN(cli_options)); + update_all_unqualified_endpoints(); qualify_and_schedule_all(); return 0; -- cgit v1.2.3