summaryrefslogtreecommitdiff
path: root/res
diff options
context:
space:
mode:
authorGeorge Joseph <gjoseph@digium.com>2018-02-01 11:26:49 -0600
committerGerrit Code Review <gerrit2@gerrit.digium.api>2018-02-01 11:26:49 -0600
commitb1484537084ed560df14b8c0bb563948411fcbbb (patch)
treeeef2d65ce59a1cd50f617aef8a7aed9795e74d2b /res
parentb9efe5adf03a12c314d53b27a80709799be9e251 (diff)
parent2b9aa6b5bbe8d3ada6e4a9b0bc614f2f77470cd0 (diff)
Merge "res_pjsip_pubsub: Prune subs with reliable transports at startup"
Diffstat (limited to 'res')
-rw-r--r--res/res_pjsip.c39
-rw-r--r--res/res_pjsip/include/res_pjsip_private.h14
-rw-r--r--res/res_pjsip/pjsip_transport_events.c83
-rw-r--r--res/res_pjsip_outbound_registration.c13
-rw-r--r--res/res_pjsip_pubsub.c70
-rw-r--r--res/res_pjsip_registrar.c72
6 files changed, 220 insertions, 71 deletions
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 6a7d918c4..bf859fe88 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -3118,6 +3118,45 @@ pjsip_endpoint *ast_sip_get_pjsip_endpoint(void)
return ast_pjsip_endpoint;
}
+int ast_sip_will_uri_survive_restart(pjsip_sip_uri *uri, struct ast_sip_endpoint *endpoint,
+ pjsip_rx_data *rdata)
+{
+ pj_str_t host_name;
+ int result = 1;
+
+ /* Determine if the contact cannot survive a restart/boot. */
+ if (uri->port == rdata->pkt_info.src_port
+ && !pj_strcmp(&uri->host,
+ pj_cstr(&host_name, rdata->pkt_info.src_name))
+ /* We have already checked if the URI scheme is sip: or sips: */
+ && PJSIP_TRANSPORT_IS_RELIABLE(rdata->tp_info.transport)) {
+ pj_str_t type_name;
+
+ /* Determine the transport parameter value */
+ if (!strcasecmp("WSS", rdata->tp_info.transport->type_name)) {
+ /* WSS is special, as it needs to be ws. */
+ pj_cstr(&type_name, "ws");
+ } else {
+ pj_cstr(&type_name, rdata->tp_info.transport->type_name);
+ }
+
+ if (!pj_stricmp(&uri->transport_param, &type_name)
+ && (endpoint->nat.rewrite_contact
+ /* Websockets are always rewritten */
+ || !pj_stricmp(&uri->transport_param,
+ pj_cstr(&type_name, "ws")))) {
+ /*
+ * The contact was rewritten to the reliable transport's
+ * source address. Disconnecting the transport for any
+ * reason invalidates the contact.
+ */
+ result = 0;
+ }
+ }
+
+ return result;
+}
+
int ast_sip_get_transport_name(const struct ast_sip_endpoint *endpoint,
pjsip_sip_uri *sip_uri, char *buf, size_t buf_len)
{
diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h
index 7fafd8007..7d434aa95 100644
--- a/res/res_pjsip/include/res_pjsip_private.h
+++ b/res/res_pjsip/include/res_pjsip_private.h
@@ -347,4 +347,18 @@ int ast_sip_initialize_scheduler(void);
*/
int ast_sip_destroy_scheduler(void);
+/*!
+ * \internal
+ * \brief Determines if a uri will still be valid after an asterisk restart
+ * \since 13.20.0
+ *
+ * \param uri uri to test
+ * \param endpoint The associated endpoint
+ * \param rdata The rdata to get transport information from
+ *
+ * \retval 1 Yes, 0 No
+ */
+int ast_sip_will_uri_survive_restart(pjsip_sip_uri *uri, struct ast_sip_endpoint *endpoint,
+ pjsip_rx_data *rdata);
+
#endif /* RES_PJSIP_PRIVATE_H_ */
diff --git a/res/res_pjsip/pjsip_transport_events.c b/res/res_pjsip/pjsip_transport_events.c
index 0f57303ba..c701b8411 100644
--- a/res/res_pjsip/pjsip_transport_events.c
+++ b/res/res_pjsip/pjsip_transport_events.c
@@ -135,7 +135,7 @@ static void transport_state_callback(pjsip_transport *transport,
break;
}
monitored->transport = transport;
- if (AST_VECTOR_INIT(&monitored->monitors, 2)) {
+ if (AST_VECTOR_INIT(&monitored->monitors, 5)) {
ao2_ref(monitored, -1);
break;
}
@@ -166,6 +166,8 @@ static void transport_state_callback(pjsip_transport *transport,
struct transport_monitor_notifier *notifier;
notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);
+ ast_debug(3, "running callback %p(%p) for transport %s\n",
+ notifier->cb, notifier->data, transport->obj_name);
notifier->cb(notifier->data);
}
ao2_ref(monitored, -1);
@@ -195,43 +197,66 @@ static void transport_state_callback(pjsip_transport *transport,
}
}
-static int transport_monitor_unregister_all(void *obj, void *arg, int flags)
+struct callback_data {
+ ast_transport_monitor_shutdown_cb cb;
+ void *data;
+ ast_transport_monitor_data_matcher matches;
+};
+
+static int transport_monitor_unregister_cb(void *obj, void *arg, int flags)
{
struct transport_monitor *monitored = obj;
- ast_transport_monitor_shutdown_cb cb = arg;
+ struct callback_data *cb_data = arg;
int idx;
for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {
struct transport_monitor_notifier *notifier;
notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);
- if (notifier->cb == cb) {
+ if (notifier->cb == cb_data->cb && (!cb_data->data
+ || cb_data->matches(cb_data->data, notifier->data))) {
ao2_cleanup(notifier->data);
AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx);
- break;
+ ast_debug(3, "Unregistered monitor %p(%p) from transport %s\n",
+ notifier->cb, notifier->data, monitored->transport->obj_name);
}
}
return 0;
}
-void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb)
+static int ptr_matcher(void *a, void *b)
+{
+ return a == b;
+}
+
+void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb,
+ void *data, ast_transport_monitor_data_matcher matches)
{
struct ao2_container *transports;
+ struct callback_data cb_data = {
+ .cb = cb,
+ .data = data,
+ .matches = matches ?: ptr_matcher,
+ };
+
+ ast_assert(cb != NULL);
transports = ao2_global_obj_ref(active_transports);
if (!transports) {
return;
}
- ao2_callback(transports, OBJ_MULTIPLE | OBJ_NODATA, transport_monitor_unregister_all,
- cb);
+ ao2_callback(transports, OBJ_MULTIPLE | OBJ_NODATA, transport_monitor_unregister_cb, &cb_data);
ao2_ref(transports, -1);
}
-void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transport_monitor_shutdown_cb cb)
+void ast_sip_transport_monitor_unregister(pjsip_transport *transport,
+ ast_transport_monitor_shutdown_cb cb, void *data, ast_transport_monitor_data_matcher matches)
{
struct ao2_container *transports;
struct transport_monitor *monitored;
+ ast_assert(transport != NULL && cb != NULL);
+
transports = ao2_global_obj_ref(active_transports);
if (!transports) {
return;
@@ -240,18 +265,13 @@ void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transp
ao2_lock(transports);
monitored = ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (monitored) {
- int idx;
+ struct callback_data cb_data = {
+ .cb = cb,
+ .data = data,
+ .matches = matches ?: ptr_matcher,
+ };
- for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {
- struct transport_monitor_notifier *notifier;
-
- notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);
- if (notifier->cb == cb) {
- ao2_cleanup(notifier->data);
- AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx);
- break;
- }
- }
+ transport_monitor_unregister_cb(monitored, &cb_data, 0);
ao2_ref(monitored, -1);
}
ao2_unlock(transports);
@@ -265,6 +285,8 @@ enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transpor
struct transport_monitor *monitored;
enum ast_transport_monitor_reg res = AST_TRANSPORT_MONITOR_REG_NOT_FOUND;
+ ast_assert(transport != NULL && cb != NULL);
+
transports = ao2_global_obj_ref(active_transports);
if (!transports) {
return res;
@@ -273,31 +295,22 @@ enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transpor
ao2_lock(transports);
monitored = ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (monitored) {
- int idx;
struct transport_monitor_notifier new_monitor;
- /* Check if the callback monitor already exists */
- for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) {
- struct transport_monitor_notifier *notifier;
-
- notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx);
- if (notifier->cb == cb) {
- /* The monitor is already in the vector replace with new ao2_data. */
- ao2_replace(notifier->data, ao2_data);
- res = AST_TRANSPORT_MONITOR_REG_REPLACED;
- goto register_done;
- }
- }
-
/* Add new monitor to vector */
new_monitor.cb = cb;
new_monitor.data = ao2_bump(ao2_data);
if (AST_VECTOR_APPEND(&monitored->monitors, new_monitor)) {
ao2_cleanup(ao2_data);
res = AST_TRANSPORT_MONITOR_REG_FAILED;
+ ast_debug(3, "Register monitor %p(%p) to transport %s FAILED\n",
+ cb, ao2_data, transport->obj_name);
+ } else {
+ res = AST_TRANSPORT_MONITOR_REG_SUCCESS;
+ ast_debug(3, "Registered monitor %p(%p) to transport %s\n",
+ cb, ao2_data, transport->obj_name);
}
-register_done:
ao2_ref(monitored, -1);
}
ao2_unlock(transports);
diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c
index 4baf23c93..d0f754604 100644
--- a/res/res_pjsip_outbound_registration.c
+++ b/res/res_pjsip_outbound_registration.c
@@ -850,6 +850,14 @@ static void registration_transport_shutdown_cb(void *obj)
}
}
+static int monitor_matcher(void *a, void *b)
+{
+ char *ma = a;
+ char *mb = b;
+
+ return strcmp(ma, mb) == 0;
+}
+
static void registration_transport_monitor_setup(pjsip_transport *transport, const char *registration_name)
{
char *monitor;
@@ -950,7 +958,8 @@ static int handle_registration_response(void *data)
ast_debug(1, "Outbound unregistration to '%s' with client '%s' successful\n", server_uri, client_uri);
update_client_state_status(response->client_state, SIP_REGISTRATION_UNREGISTERED);
ast_sip_transport_monitor_unregister(response->rdata->tp_info.transport,
- registration_transport_shutdown_cb);
+ registration_transport_shutdown_cb, response->client_state->registration_name,
+ monitor_matcher);
}
} else if (response->client_state->destroy) {
/* We need to deal with the pending destruction instead. */
@@ -2149,7 +2158,7 @@ static int unload_module(void)
ao2_global_obj_release(current_states);
- ast_sip_transport_monitor_unregister_all(registration_transport_shutdown_cb);
+ ast_sip_transport_monitor_unregister_all(registration_transport_shutdown_cb, NULL, NULL);
/* Wait for registration serializers to get destroyed. */
ast_debug(2, "Waiting for registration transactions to complete for unload.\n");
diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c
index 369e06d4c..c78f20c2b 100644
--- a/res/res_pjsip_pubsub.c
+++ b/res/res_pjsip_pubsub.c
@@ -127,6 +127,11 @@
<configOption name="contact_uri">
<synopsis>The Contact URI of the dialog for the subscription</synopsis>
</configOption>
+ <configOption name="prune_on_boot">
+ <synopsis>If set, indicates that the contact used a reliable transport
+ and therefore the subscription must be deleted after an asterisk restart.
+ </synopsis>
+ </configOption>
</configObject>
<configObject name="resource_list">
<synopsis>Resource list configuration parameters.</synopsis>
@@ -382,6 +387,8 @@ struct subscription_persistence {
struct timeval expires;
/*! Contact URI */
char contact_uri[PJSIP_MAX_URL_SIZE];
+ /*! Prune subscription on restart */
+ int prune_on_boot;
};
/*!
@@ -446,6 +453,10 @@ struct sip_subscription_tree {
* capable of restarting the timer.
*/
struct ast_sip_sched_task *expiration_task;
+ /*! The transport the subscription was received on.
+ * Only used for reliable transports.
+ */
+ pjsip_transport *transport;
};
/*!
@@ -549,6 +560,17 @@ static void *publication_resource_alloc(const char *name)
return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
}
+static void sub_tree_transport_cb(void *data) {
+ struct sip_subscription_tree *sub_tree = data;
+
+ ast_debug(3, "Transport destroyed. Removing subscription '%s->%s' prune on restart: %d\n",
+ sub_tree->persistence->endpoint, sub_tree->root->resource,
+ sub_tree->persistence->prune_on_boot);
+
+ sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
+ pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
+}
+
/*! \brief Destructor for subscription persistence */
static void subscription_persistence_destroy(void *obj)
{
@@ -599,8 +621,9 @@ static void subscription_persistence_update(struct sip_subscription_tree *sub_tr
return;
}
- ast_debug(3, "Updating persistence for '%s->%s'\n", sub_tree->persistence->endpoint,
- sub_tree->root->resource);
+ ast_debug(3, "Updating persistence for '%s->%s' prune on restart: %s\n",
+ sub_tree->persistence->endpoint, sub_tree->root->resource,
+ sub_tree->persistence->prune_on_boot ? "yes" : "no");
dlg = sub_tree->dlg;
sub_tree->persistence->cseq = dlg->local.cseq;
@@ -614,6 +637,28 @@ static void subscription_persistence_update(struct sip_subscription_tree *sub_tr
sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
if (contact_hdr) {
+ if (contact_hdr) {
+ if (type == SUBSCRIPTION_PERSISTENCE_CREATED) {
+ sub_tree->persistence->prune_on_boot =
+ !ast_sip_will_uri_survive_restart(
+ (pjsip_sip_uri *)pjsip_uri_get_uri(contact_hdr->uri),
+ sub_tree->endpoint, rdata);
+
+ if (sub_tree->persistence->prune_on_boot) {
+ ast_debug(3, "adding transport monitor on %s for '%s->%s' prune on restart: %d\n",
+ rdata->tp_info.transport->obj_name,
+ sub_tree->persistence->endpoint, sub_tree->root->resource,
+ sub_tree->persistence->prune_on_boot);
+ sub_tree->transport = rdata->tp_info.transport;
+ ast_sip_transport_monitor_register(rdata->tp_info.transport,
+ sub_tree_transport_cb, sub_tree);
+ /*
+ * FYI: ast_sip_transport_monitor_register holds a reference to the sub_tree
+ */
+ }
+ }
+ }
+
pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contact_hdr->uri,
sub_tree->persistence->contact_uri, sizeof(sub_tree->persistence->contact_uri));
} else {
@@ -656,6 +701,15 @@ static void subscription_persistence_remove(struct sip_subscription_tree *sub_tr
return;
}
+ if (sub_tree->persistence->prune_on_boot && sub_tree->transport) {
+ ast_debug(3, "Unregistering transport monitor on %s '%s->%s'\n",
+ sub_tree->transport->obj_name,
+ sub_tree->endpoint ? ast_sorcery_object_get_id(sub_tree->endpoint) : "Unknown",
+ sub_tree->root ? sub_tree->root->resource : "Unknown");
+ ast_sip_transport_monitor_unregister(sub_tree->transport,
+ sub_tree_transport_cb, sub_tree, NULL);
+ }
+
ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
ao2_ref(sub_tree->persistence, -1);
sub_tree->persistence = NULL;
@@ -1564,6 +1618,14 @@ static int subscription_persistence_recreate(void *obj, void *arg, int flags)
pjsip_rx_data rdata;
struct persistence_recreate_data recreate_data;
+ /* If this subscription used a reliable transport it can't be reestablished so remove it */
+ if (persistence->prune_on_boot) {
+ ast_debug(3, "Deleting subscription marked as 'prune' from persistent store '%s' %s\n",
+ persistence->endpoint, persistence->tag);
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
/* If this subscription has already expired remove it */
if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n",
@@ -5416,6 +5478,8 @@ static int load_module(void)
persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0);
ast_sorcery_object_field_register(sorcery, "subscription_persistence", "contact_uri", "", OPT_CHAR_ARRAY_T, 0,
CHARFLDSET(struct subscription_persistence, contact_uri));
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "prune_on_boot", "0", OPT_UINT_T, 0,
+ FLDSET(struct subscription_persistence, prune_on_boot));
if (apply_list_configuration(sorcery)) {
ast_sched_context_destroy(sched);
@@ -5492,6 +5556,8 @@ static int unload_module(void)
AST_TEST_UNREGISTER(loop);
AST_TEST_UNREGISTER(bad_event);
+ ast_sip_transport_monitor_unregister_all(sub_tree_transport_cb, NULL, NULL);
+
ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND);
diff --git a/res/res_pjsip_registrar.c b/res/res_pjsip_registrar.c
index 76d8b04a6..bdee91fb3 100644
--- a/res/res_pjsip_registrar.c
+++ b/res/res_pjsip_registrar.c
@@ -328,6 +328,15 @@ struct contact_transport_monitor {
char aor_name[0];
};
+static int contact_transport_monitor_matcher(void *a, void *b)
+{
+ struct contact_transport_monitor *ma = a;
+ struct contact_transport_monitor *mb = b;
+
+ return strcmp(ma->aor_name, mb->aor_name) == 0
+ && strcmp(ma->contact_name, mb->contact_name) == 0;
+}
+
static void register_contact_transport_shutdown_cb(void *data)
{
struct contact_transport_monitor *monitor = data;
@@ -579,8 +588,7 @@ static void register_aor_core(pjsip_rx_data *rdata,
contact = ao2_callback(contacts, OBJ_UNLINK, registrar_find_contact, &details);
if (!contact) {
- int prune_on_boot = 0;
- pj_str_t host_name;
+ int prune_on_boot;
/* If they are actually trying to delete a contact that does not exist... be forgiving */
if (!expiration) {
@@ -589,35 +597,7 @@ static void register_aor_core(pjsip_rx_data *rdata,
continue;
}
- /* Determine if the contact cannot survive a restart/boot. */
- if (details.uri->port == rdata->pkt_info.src_port
- && !pj_strcmp(&details.uri->host,
- pj_cstr(&host_name, rdata->pkt_info.src_name))
- /* We have already checked if the URI scheme is sip: or sips: */
- && PJSIP_TRANSPORT_IS_RELIABLE(rdata->tp_info.transport)) {
- pj_str_t type_name;
-
- /* Determine the transport parameter value */
- if (!strcasecmp("WSS", rdata->tp_info.transport->type_name)) {
- /* WSS is special, as it needs to be ws. */
- pj_cstr(&type_name, "ws");
- } else {
- pj_cstr(&type_name, rdata->tp_info.transport->type_name);
- }
-
- if (!pj_stricmp(&details.uri->transport_param, &type_name)
- && (endpoint->nat.rewrite_contact
- /* Websockets are always rewritten */
- || !pj_stricmp(&details.uri->transport_param,
- pj_cstr(&type_name, "ws")))) {
- /*
- * The contact was rewritten to the reliable transport's
- * source address. Disconnecting the transport for any
- * reason invalidates the contact.
- */
- prune_on_boot = 1;
- }
- }
+ prune_on_boot = !ast_sip_will_uri_survive_restart(details.uri, endpoint, rdata);
contact = ast_sip_location_create_contact(aor, contact_uri,
ast_tvadd(ast_tvnow(), ast_samp2tv(expiration, 1)),
@@ -704,6 +684,21 @@ static void register_aor_core(pjsip_rx_data *rdata,
contact_update->user_agent);
ao2_cleanup(contact_update);
} else {
+ if (contact->prune_on_boot) {
+ struct contact_transport_monitor *monitor;
+ const char *contact_name =
+ ast_sorcery_object_get_id(contact);
+
+ monitor = ast_alloca(sizeof(*monitor) + 2 + strlen(aor_name)
+ + strlen(contact_name));
+ strcpy(monitor->aor_name, aor_name);/* Safe */
+ monitor->contact_name = monitor->aor_name + strlen(aor_name) + 1;
+ strcpy(monitor->contact_name, contact_name);/* Safe */
+
+ ast_sip_transport_monitor_unregister(rdata->tp_info.transport,
+ register_contact_transport_shutdown_cb, monitor, contact_transport_monitor_matcher);
+ }
+
/* We want to report the user agent that was actually in the removed contact */
ast_sip_location_delete_contact(contact);
ast_verb(3, "Removed contact '%s' from AOR '%s' due to request\n", contact_uri, aor_name);
@@ -1115,6 +1110,19 @@ static int expire_contact(void *obj, void *arg, int flags)
*/
ao2_lock(lock);
if (ast_tvdiff_ms(ast_tvnow(), contact->expiration_time) > 0) {
+ if (contact->prune_on_boot) {
+ struct contact_transport_monitor *monitor;
+ const char *contact_name = ast_sorcery_object_get_id(contact);
+
+ monitor = ast_alloca(sizeof(*monitor) + 2 + strlen(contact->aor)
+ + strlen(contact_name));
+ strcpy(monitor->aor_name, contact->aor);/* Safe */
+ monitor->contact_name = monitor->aor_name + strlen(contact->aor) + 1;
+ strcpy(monitor->contact_name, contact_name);/* Safe */
+
+ ast_sip_transport_monitor_unregister_all(register_contact_transport_shutdown_cb,
+ monitor, contact_transport_monitor_matcher);
+ }
ast_sip_location_delete_contact(contact);
}
ao2_unlock(lock);
@@ -1222,7 +1230,7 @@ static int unload_module(void)
ast_manager_unregister(AMI_SHOW_REGISTRATIONS);
ast_manager_unregister(AMI_SHOW_REGISTRATION_CONTACT_STATUSES);
ast_sip_unregister_service(&registrar_module);
- ast_sip_transport_monitor_unregister_all(register_contact_transport_shutdown_cb);
+ ast_sip_transport_monitor_unregister_all(register_contact_transport_shutdown_cb, NULL, NULL);
return 0;
}