From 2b9aa6b5bbe8d3ada6e4a9b0bc614f2f77470cd0 Mon Sep 17 00:00:00 2001 From: George Joseph Date: Sun, 28 Jan 2018 09:10:00 -0700 Subject: res_pjsip_pubsub: Prune subs with reliable transports at startup In an earlier release, inbound registrations on a reliable transport were pruned on Asterisk restart since the TCP connection would have been torn down and become unusable when Asterisk stopped. This same process is now also applied to inbound subscriptions. Also fixed issues in res_pjsip_registrar where it wasn't handling the monitoring correctly when multiple registrations came in over the same transport. To accomplish this, the pjsip_transport_event feature needed to be refactored to allow multiple monitors (multiple subcriptions or registrations from the same endpoint) to exist on the same transport. Since this changed the API, any external modules that may have used the transport monitor feature (highly unlikey) will need to be changed. ASTERISK-27612 Reported by: Ross Beer Change-Id: Iee87cf4eb9b7b2b93d5739a72af52d6ca8fbbe36 --- res/res_pjsip.c | 39 +++++++++++++++ res/res_pjsip/include/res_pjsip_private.h | 14 ++++++ res/res_pjsip/pjsip_transport_events.c | 83 ++++++++++++++++++------------- res/res_pjsip_outbound_registration.c | 13 ++++- res/res_pjsip_pubsub.c | 70 +++++++++++++++++++++++++- res/res_pjsip_registrar.c | 72 +++++++++++++++------------ 6 files changed, 220 insertions(+), 71 deletions(-) (limited to 'res') diff --git a/res/res_pjsip.c b/res/res_pjsip.c index 6a7d918c4..bf859fe88 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -3118,6 +3118,45 @@ pjsip_endpoint *ast_sip_get_pjsip_endpoint(void) return ast_pjsip_endpoint; } +int ast_sip_will_uri_survive_restart(pjsip_sip_uri *uri, struct ast_sip_endpoint *endpoint, + pjsip_rx_data *rdata) +{ + pj_str_t host_name; + int result = 1; + + /* Determine if the contact cannot survive a restart/boot. */ + if (uri->port == rdata->pkt_info.src_port + && !pj_strcmp(&uri->host, + pj_cstr(&host_name, rdata->pkt_info.src_name)) + /* We have already checked if the URI scheme is sip: or sips: */ + && PJSIP_TRANSPORT_IS_RELIABLE(rdata->tp_info.transport)) { + pj_str_t type_name; + + /* Determine the transport parameter value */ + if (!strcasecmp("WSS", rdata->tp_info.transport->type_name)) { + /* WSS is special, as it needs to be ws. */ + pj_cstr(&type_name, "ws"); + } else { + pj_cstr(&type_name, rdata->tp_info.transport->type_name); + } + + if (!pj_stricmp(&uri->transport_param, &type_name) + && (endpoint->nat.rewrite_contact + /* Websockets are always rewritten */ + || !pj_stricmp(&uri->transport_param, + pj_cstr(&type_name, "ws")))) { + /* + * The contact was rewritten to the reliable transport's + * source address. Disconnecting the transport for any + * reason invalidates the contact. + */ + result = 0; + } + } + + return result; +} + int ast_sip_get_transport_name(const struct ast_sip_endpoint *endpoint, pjsip_sip_uri *sip_uri, char *buf, size_t buf_len) { diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h index 7fafd8007..7d434aa95 100644 --- a/res/res_pjsip/include/res_pjsip_private.h +++ b/res/res_pjsip/include/res_pjsip_private.h @@ -347,4 +347,18 @@ int ast_sip_initialize_scheduler(void); */ int ast_sip_destroy_scheduler(void); +/*! + * \internal + * \brief Determines if a uri will still be valid after an asterisk restart + * \since 13.20.0 + * + * \param uri uri to test + * \param endpoint The associated endpoint + * \param rdata The rdata to get transport information from + * + * \retval 1 Yes, 0 No + */ +int ast_sip_will_uri_survive_restart(pjsip_sip_uri *uri, struct ast_sip_endpoint *endpoint, + pjsip_rx_data *rdata); + #endif /* RES_PJSIP_PRIVATE_H_ */ diff --git a/res/res_pjsip/pjsip_transport_events.c b/res/res_pjsip/pjsip_transport_events.c index 0f57303ba..c701b8411 100644 --- a/res/res_pjsip/pjsip_transport_events.c +++ b/res/res_pjsip/pjsip_transport_events.c @@ -135,7 +135,7 @@ static void transport_state_callback(pjsip_transport *transport, break; } monitored->transport = transport; - if (AST_VECTOR_INIT(&monitored->monitors, 2)) { + if (AST_VECTOR_INIT(&monitored->monitors, 5)) { ao2_ref(monitored, -1); break; } @@ -166,6 +166,8 @@ static void transport_state_callback(pjsip_transport *transport, struct transport_monitor_notifier *notifier; notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx); + ast_debug(3, "running callback %p(%p) for transport %s\n", + notifier->cb, notifier->data, transport->obj_name); notifier->cb(notifier->data); } ao2_ref(monitored, -1); @@ -195,43 +197,66 @@ static void transport_state_callback(pjsip_transport *transport, } } -static int transport_monitor_unregister_all(void *obj, void *arg, int flags) +struct callback_data { + ast_transport_monitor_shutdown_cb cb; + void *data; + ast_transport_monitor_data_matcher matches; +}; + +static int transport_monitor_unregister_cb(void *obj, void *arg, int flags) { struct transport_monitor *monitored = obj; - ast_transport_monitor_shutdown_cb cb = arg; + struct callback_data *cb_data = arg; int idx; for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) { struct transport_monitor_notifier *notifier; notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx); - if (notifier->cb == cb) { + if (notifier->cb == cb_data->cb && (!cb_data->data + || cb_data->matches(cb_data->data, notifier->data))) { ao2_cleanup(notifier->data); AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx); - break; + ast_debug(3, "Unregistered monitor %p(%p) from transport %s\n", + notifier->cb, notifier->data, monitored->transport->obj_name); } } return 0; } -void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb) +static int ptr_matcher(void *a, void *b) +{ + return a == b; +} + +void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb, + void *data, ast_transport_monitor_data_matcher matches) { struct ao2_container *transports; + struct callback_data cb_data = { + .cb = cb, + .data = data, + .matches = matches ?: ptr_matcher, + }; + + ast_assert(cb != NULL); transports = ao2_global_obj_ref(active_transports); if (!transports) { return; } - ao2_callback(transports, OBJ_MULTIPLE | OBJ_NODATA, transport_monitor_unregister_all, - cb); + ao2_callback(transports, OBJ_MULTIPLE | OBJ_NODATA, transport_monitor_unregister_cb, &cb_data); ao2_ref(transports, -1); } -void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transport_monitor_shutdown_cb cb) +void ast_sip_transport_monitor_unregister(pjsip_transport *transport, + ast_transport_monitor_shutdown_cb cb, void *data, ast_transport_monitor_data_matcher matches) { struct ao2_container *transports; struct transport_monitor *monitored; + ast_assert(transport != NULL && cb != NULL); + transports = ao2_global_obj_ref(active_transports); if (!transports) { return; @@ -240,18 +265,13 @@ void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transp ao2_lock(transports); monitored = ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK); if (monitored) { - int idx; + struct callback_data cb_data = { + .cb = cb, + .data = data, + .matches = matches ?: ptr_matcher, + }; - for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) { - struct transport_monitor_notifier *notifier; - - notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx); - if (notifier->cb == cb) { - ao2_cleanup(notifier->data); - AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx); - break; - } - } + transport_monitor_unregister_cb(monitored, &cb_data, 0); ao2_ref(monitored, -1); } ao2_unlock(transports); @@ -265,6 +285,8 @@ enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transpor struct transport_monitor *monitored; enum ast_transport_monitor_reg res = AST_TRANSPORT_MONITOR_REG_NOT_FOUND; + ast_assert(transport != NULL && cb != NULL); + transports = ao2_global_obj_ref(active_transports); if (!transports) { return res; @@ -273,31 +295,22 @@ enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transpor ao2_lock(transports); monitored = ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK); if (monitored) { - int idx; struct transport_monitor_notifier new_monitor; - /* Check if the callback monitor already exists */ - for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) { - struct transport_monitor_notifier *notifier; - - notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx); - if (notifier->cb == cb) { - /* The monitor is already in the vector replace with new ao2_data. */ - ao2_replace(notifier->data, ao2_data); - res = AST_TRANSPORT_MONITOR_REG_REPLACED; - goto register_done; - } - } - /* Add new monitor to vector */ new_monitor.cb = cb; new_monitor.data = ao2_bump(ao2_data); if (AST_VECTOR_APPEND(&monitored->monitors, new_monitor)) { ao2_cleanup(ao2_data); res = AST_TRANSPORT_MONITOR_REG_FAILED; + ast_debug(3, "Register monitor %p(%p) to transport %s FAILED\n", + cb, ao2_data, transport->obj_name); + } else { + res = AST_TRANSPORT_MONITOR_REG_SUCCESS; + ast_debug(3, "Registered monitor %p(%p) to transport %s\n", + cb, ao2_data, transport->obj_name); } -register_done: ao2_ref(monitored, -1); } ao2_unlock(transports); diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c index 4baf23c93..d0f754604 100644 --- a/res/res_pjsip_outbound_registration.c +++ b/res/res_pjsip_outbound_registration.c @@ -850,6 +850,14 @@ static void registration_transport_shutdown_cb(void *obj) } } +static int monitor_matcher(void *a, void *b) +{ + char *ma = a; + char *mb = b; + + return strcmp(ma, mb) == 0; +} + static void registration_transport_monitor_setup(pjsip_transport *transport, const char *registration_name) { char *monitor; @@ -950,7 +958,8 @@ static int handle_registration_response(void *data) ast_debug(1, "Outbound unregistration to '%s' with client '%s' successful\n", server_uri, client_uri); update_client_state_status(response->client_state, SIP_REGISTRATION_UNREGISTERED); ast_sip_transport_monitor_unregister(response->rdata->tp_info.transport, - registration_transport_shutdown_cb); + registration_transport_shutdown_cb, response->client_state->registration_name, + monitor_matcher); } } else if (response->client_state->destroy) { /* We need to deal with the pending destruction instead. */ @@ -2149,7 +2158,7 @@ static int unload_module(void) ao2_global_obj_release(current_states); - ast_sip_transport_monitor_unregister_all(registration_transport_shutdown_cb); + ast_sip_transport_monitor_unregister_all(registration_transport_shutdown_cb, NULL, NULL); /* Wait for registration serializers to get destroyed. */ ast_debug(2, "Waiting for registration transactions to complete for unload.\n"); diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index 369e06d4c..c78f20c2b 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -127,6 +127,11 @@ The Contact URI of the dialog for the subscription + + If set, indicates that the contact used a reliable transport + and therefore the subscription must be deleted after an asterisk restart. + + Resource list configuration parameters. @@ -382,6 +387,8 @@ struct subscription_persistence { struct timeval expires; /*! Contact URI */ char contact_uri[PJSIP_MAX_URL_SIZE]; + /*! Prune subscription on restart */ + int prune_on_boot; }; /*! @@ -446,6 +453,10 @@ struct sip_subscription_tree { * capable of restarting the timer. */ struct ast_sip_sched_task *expiration_task; + /*! The transport the subscription was received on. + * Only used for reliable transports. + */ + pjsip_transport *transport; }; /*! @@ -549,6 +560,17 @@ static void *publication_resource_alloc(const char *name) return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy); } +static void sub_tree_transport_cb(void *data) { + struct sip_subscription_tree *sub_tree = data; + + ast_debug(3, "Transport destroyed. Removing subscription '%s->%s' prune on restart: %d\n", + sub_tree->persistence->endpoint, sub_tree->root->resource, + sub_tree->persistence->prune_on_boot); + + sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS; + pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); +} + /*! \brief Destructor for subscription persistence */ static void subscription_persistence_destroy(void *obj) { @@ -599,8 +621,9 @@ static void subscription_persistence_update(struct sip_subscription_tree *sub_tr return; } - ast_debug(3, "Updating persistence for '%s->%s'\n", sub_tree->persistence->endpoint, - sub_tree->root->resource); + ast_debug(3, "Updating persistence for '%s->%s' prune on restart: %s\n", + sub_tree->persistence->endpoint, sub_tree->root->resource, + sub_tree->persistence->prune_on_boot ? "yes" : "no"); dlg = sub_tree->dlg; sub_tree->persistence->cseq = dlg->local.cseq; @@ -614,6 +637,28 @@ static void subscription_persistence_update(struct sip_subscription_tree *sub_tr sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1)); if (contact_hdr) { + if (contact_hdr) { + if (type == SUBSCRIPTION_PERSISTENCE_CREATED) { + sub_tree->persistence->prune_on_boot = + !ast_sip_will_uri_survive_restart( + (pjsip_sip_uri *)pjsip_uri_get_uri(contact_hdr->uri), + sub_tree->endpoint, rdata); + + if (sub_tree->persistence->prune_on_boot) { + ast_debug(3, "adding transport monitor on %s for '%s->%s' prune on restart: %d\n", + rdata->tp_info.transport->obj_name, + sub_tree->persistence->endpoint, sub_tree->root->resource, + sub_tree->persistence->prune_on_boot); + sub_tree->transport = rdata->tp_info.transport; + ast_sip_transport_monitor_register(rdata->tp_info.transport, + sub_tree_transport_cb, sub_tree); + /* + * FYI: ast_sip_transport_monitor_register holds a reference to the sub_tree + */ + } + } + } + pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contact_hdr->uri, sub_tree->persistence->contact_uri, sizeof(sub_tree->persistence->contact_uri)); } else { @@ -656,6 +701,15 @@ static void subscription_persistence_remove(struct sip_subscription_tree *sub_tr return; } + if (sub_tree->persistence->prune_on_boot && sub_tree->transport) { + ast_debug(3, "Unregistering transport monitor on %s '%s->%s'\n", + sub_tree->transport->obj_name, + sub_tree->endpoint ? ast_sorcery_object_get_id(sub_tree->endpoint) : "Unknown", + sub_tree->root ? sub_tree->root->resource : "Unknown"); + ast_sip_transport_monitor_unregister(sub_tree->transport, + sub_tree_transport_cb, sub_tree, NULL); + } + ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence); ao2_ref(sub_tree->persistence, -1); sub_tree->persistence = NULL; @@ -1564,6 +1618,14 @@ static int subscription_persistence_recreate(void *obj, void *arg, int flags) pjsip_rx_data rdata; struct persistence_recreate_data recreate_data; + /* If this subscription used a reliable transport it can't be reestablished so remove it */ + if (persistence->prune_on_boot) { + ast_debug(3, "Deleting subscription marked as 'prune' from persistent store '%s' %s\n", + persistence->endpoint, persistence->tag); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + /* If this subscription has already expired remove it */ if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) { ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n", @@ -5416,6 +5478,8 @@ static int load_module(void) persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0); ast_sorcery_object_field_register(sorcery, "subscription_persistence", "contact_uri", "", OPT_CHAR_ARRAY_T, 0, CHARFLDSET(struct subscription_persistence, contact_uri)); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "prune_on_boot", "0", OPT_UINT_T, 0, + FLDSET(struct subscription_persistence, prune_on_boot)); if (apply_list_configuration(sorcery)) { ast_sched_context_destroy(sched); @@ -5492,6 +5556,8 @@ static int unload_module(void) AST_TEST_UNREGISTER(loop); AST_TEST_UNREGISTER(bad_event); + ast_sip_transport_monitor_unregister_all(sub_tree_transport_cb, NULL, NULL); + ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands)); ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND); diff --git a/res/res_pjsip_registrar.c b/res/res_pjsip_registrar.c index 078e13ee2..2e519b72b 100644 --- a/res/res_pjsip_registrar.c +++ b/res/res_pjsip_registrar.c @@ -327,6 +327,15 @@ struct contact_transport_monitor { char aor_name[0]; }; +static int contact_transport_monitor_matcher(void *a, void *b) +{ + struct contact_transport_monitor *ma = a; + struct contact_transport_monitor *mb = b; + + return strcmp(ma->aor_name, mb->aor_name) == 0 + && strcmp(ma->contact_name, mb->contact_name) == 0; +} + static void register_contact_transport_shutdown_cb(void *data) { struct contact_transport_monitor *monitor = data; @@ -578,8 +587,7 @@ static void register_aor_core(pjsip_rx_data *rdata, contact = ao2_callback(contacts, OBJ_UNLINK, registrar_find_contact, &details); if (!contact) { - int prune_on_boot = 0; - pj_str_t host_name; + int prune_on_boot; /* If they are actually trying to delete a contact that does not exist... be forgiving */ if (!expiration) { @@ -588,35 +596,7 @@ static void register_aor_core(pjsip_rx_data *rdata, continue; } - /* Determine if the contact cannot survive a restart/boot. */ - if (details.uri->port == rdata->pkt_info.src_port - && !pj_strcmp(&details.uri->host, - pj_cstr(&host_name, rdata->pkt_info.src_name)) - /* We have already checked if the URI scheme is sip: or sips: */ - && PJSIP_TRANSPORT_IS_RELIABLE(rdata->tp_info.transport)) { - pj_str_t type_name; - - /* Determine the transport parameter value */ - if (!strcasecmp("WSS", rdata->tp_info.transport->type_name)) { - /* WSS is special, as it needs to be ws. */ - pj_cstr(&type_name, "ws"); - } else { - pj_cstr(&type_name, rdata->tp_info.transport->type_name); - } - - if (!pj_stricmp(&details.uri->transport_param, &type_name) - && (endpoint->nat.rewrite_contact - /* Websockets are always rewritten */ - || !pj_stricmp(&details.uri->transport_param, - pj_cstr(&type_name, "ws")))) { - /* - * The contact was rewritten to the reliable transport's - * source address. Disconnecting the transport for any - * reason invalidates the contact. - */ - prune_on_boot = 1; - } - } + prune_on_boot = !ast_sip_will_uri_survive_restart(details.uri, endpoint, rdata); contact = ast_sip_location_create_contact(aor, contact_uri, ast_tvadd(ast_tvnow(), ast_samp2tv(expiration, 1)), @@ -703,6 +683,21 @@ static void register_aor_core(pjsip_rx_data *rdata, contact_update->user_agent); ao2_cleanup(contact_update); } else { + if (contact->prune_on_boot) { + struct contact_transport_monitor *monitor; + const char *contact_name = + ast_sorcery_object_get_id(contact); + + monitor = ast_alloca(sizeof(*monitor) + 2 + strlen(aor_name) + + strlen(contact_name)); + strcpy(monitor->aor_name, aor_name);/* Safe */ + monitor->contact_name = monitor->aor_name + strlen(aor_name) + 1; + strcpy(monitor->contact_name, contact_name);/* Safe */ + + ast_sip_transport_monitor_unregister(rdata->tp_info.transport, + register_contact_transport_shutdown_cb, monitor, contact_transport_monitor_matcher); + } + /* We want to report the user agent that was actually in the removed contact */ ast_sip_location_delete_contact(contact); ast_verb(3, "Removed contact '%s' from AOR '%s' due to request\n", contact_uri, aor_name); @@ -1114,6 +1109,19 @@ static int expire_contact(void *obj, void *arg, int flags) */ ao2_lock(lock); if (ast_tvdiff_ms(ast_tvnow(), contact->expiration_time) > 0) { + if (contact->prune_on_boot) { + struct contact_transport_monitor *monitor; + const char *contact_name = ast_sorcery_object_get_id(contact); + + monitor = ast_alloca(sizeof(*monitor) + 2 + strlen(contact->aor) + + strlen(contact_name)); + strcpy(monitor->aor_name, contact->aor);/* Safe */ + monitor->contact_name = monitor->aor_name + strlen(contact->aor) + 1; + strcpy(monitor->contact_name, contact_name);/* Safe */ + + ast_sip_transport_monitor_unregister_all(register_contact_transport_shutdown_cb, + monitor, contact_transport_monitor_matcher); + } ast_sip_location_delete_contact(contact); } ao2_unlock(lock); @@ -1221,7 +1229,7 @@ static int unload_module(void) ast_manager_unregister(AMI_SHOW_REGISTRATIONS); ast_manager_unregister(AMI_SHOW_REGISTRATION_CONTACT_STATUSES); ast_sip_unregister_service(®istrar_module); - ast_sip_transport_monitor_unregister_all(register_contact_transport_shutdown_cb); + ast_sip_transport_monitor_unregister_all(register_contact_transport_shutdown_cb, NULL, NULL); return 0; } -- cgit v1.2.3