From ceb1007ed7b201059aafd4b8dcde8c5dd62e803d Mon Sep 17 00:00:00 2001 From: Richard Mudgett Date: Fri, 27 May 2016 16:28:39 -0500 Subject: res_pjsip_pubsub.c: Recreate subscriptions using distributor serializer. * Resolves potential reentrancy problems if system restarted in the middle of subscription message transactions. * Fixes memory leak recreating persistent subscriptions when the subscription resource tree could not be created. ASTERISK-26088 Reported by: Richard Mudgett Change-Id: I71e34d7ae8ed35a694f1030e820e2548c48697be --- res/res_pjsip_pubsub.c | 183 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 125 insertions(+), 58 deletions(-) (limited to 'res/res_pjsip_pubsub.c') diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index 012fdea26..79019fb25 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -355,7 +355,7 @@ struct ast_sip_publication { struct subscription_persistence { /*! Sorcery object details */ SORCERY_OBJECT(details); - /*! The name of the endpoint involved in the subscrption */ + /*! The name of the endpoint involved in the subscription */ char *endpoint; /*! SIP message that creates the subscription */ char packet[PJSIP_MAX_PKT_LEN]; @@ -1347,109 +1347,176 @@ static struct sip_subscription_tree *create_subscription_tree(const struct ast_s static int initial_notify_task(void *obj); static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state); -/*! \brief Callback function to perform the actual recreation of a subscription */ -static int subscription_persistence_recreate(void *obj, void *arg, int flags) +/*! Persistent subscription recreation continuation under distributor serializer data */ +struct persistence_recreate_data { + struct subscription_persistence *persistence; + pjsip_rx_data *rdata; +}; + +/*! + * \internal + * \brief subscription_persistence_recreate continuation under distributor serializer. + * \since 13.10.0 + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int sub_persistence_recreate(void *obj) { - struct subscription_persistence *persistence = obj; - pj_pool_t *pool = arg; - pjsip_rx_data rdata = { { 0, }, }; - RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); + struct persistence_recreate_data *recreate_data = obj; + struct subscription_persistence *persistence = recreate_data->persistence; + pjsip_rx_data *rdata = recreate_data->rdata; + struct ast_sip_endpoint *endpoint; struct sip_subscription_tree *sub_tree; struct ast_sip_pubsub_body_generator *generator; - int resp; + struct ast_sip_subscription_handler *handler; char *resource; - size_t resource_size; pjsip_sip_uri *request_uri; + size_t resource_size; + int resp; struct resource_tree tree; pjsip_expires_hdr *expires_header; - struct ast_sip_subscription_handler *handler; - /* If this subscription has already expired remove it */ - if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) { - ast_sorcery_delete(ast_sip_get_sorcery(), persistence); - return 0; - } + request_uri = pjsip_uri_get_uri(rdata->msg_info.msg->line.req.uri); + resource_size = pj_strlen(&request_uri->user) + 1; + resource = ast_alloca(resource_size); + ast_copy_pj_str(resource, &request_uri->user, resource_size); - endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint); - if (!endpoint) { - ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n", + handler = subscription_get_handler_from_rdata(rdata); + if (!handler || !handler->notifier) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get subscription handler.\n", persistence->endpoint); ast_sorcery_delete(ast_sip_get_sorcery(), persistence); return 0; } - pj_pool_reset(pool); - rdata.tp_info.pool = pool; - - if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port, - persistence->transport_key, persistence->local_name, persistence->local_port)) { - ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n", + generator = subscription_get_generator_from_rdata(rdata, handler); + if (!generator) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Body generator not available.\n", persistence->endpoint); ast_sorcery_delete(ast_sip_get_sorcery(), persistence); return 0; } - if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) { - ast_log(LOG_NOTICE, "Endpoint %s persisted a SIP response instead of a subscribe request. Unable to reload subscription.\n", - ast_sorcery_object_get_id(endpoint)); + ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data, + pubsub_module.id, MOD_DATA_PERSISTENCE, persistence); + + /* Getting the endpoint may take some time that can affect the expiration. */ + endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", + persistence->endpoint); + if (!endpoint) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The endpoint was not found\n", + persistence->endpoint); ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + ao2_ref(endpoint, -1); return 0; } - request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri); - resource_size = pj_strlen(&request_uri->user) + 1; - resource = ast_alloca(resource_size); - ast_copy_pj_str(resource, &request_uri->user, resource_size); - /* Update the expiration header with the new expiration */ - expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next); + expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, + rdata->msg_info.msg->hdr.next); if (!expires_header) { - expires_header = pjsip_expires_hdr_create(pool, 0); + expires_header = pjsip_expires_hdr_create(rdata->tp_info.pool, 0); if (!expires_header) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not update expires header.\n", + persistence->endpoint); ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + ao2_ref(endpoint, -1); return 0; } - pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header); + pjsip_msg_add_hdr(rdata->msg_info.msg, (pjsip_hdr *) expires_header); } expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000); - - handler = subscription_get_handler_from_rdata(&rdata); - if (!handler || !handler->notifier) { - ast_sorcery_delete(ast_sip_get_sorcery(), persistence); - return 0; - } - - generator = subscription_get_generator_from_rdata(&rdata, handler); - if (!generator) { + if (expires_header->ivalue <= 0) { + /* The subscription expired since we started recreating the subscription. */ ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + ao2_ref(endpoint, -1); return 0; } - ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data, - pubsub_module.id, MOD_DATA_PERSISTENCE, persistence); - memset(&tree, 0, sizeof(tree)); resp = build_resource_tree(endpoint, handler, resource, &tree, - ast_sip_pubsub_has_eventlist_support(&rdata)); + ast_sip_pubsub_has_eventlist_support(rdata)); if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) { pj_status_t dlg_status; - sub_tree = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree, &dlg_status); + sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, + &tree, &dlg_status); if (!sub_tree) { - ast_sorcery_delete(ast_sip_get_sorcery(), persistence); - ast_log(LOG_WARNING, "Failed to re-create subscription for %s\n", persistence->endpoint); - return 0; - } - sub_tree->persistence = ao2_bump(persistence); - subscription_persistence_update(sub_tree, &rdata); - if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ao2_bump(sub_tree))) { - pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); - ao2_ref(sub_tree, -1); + if (dlg_status != PJ_EEXISTS) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not create subscription tree.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + } + } else { + sub_tree->persistence = ao2_bump(persistence); + subscription_persistence_update(sub_tree, rdata); + if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, + ao2_bump(sub_tree))) { + /* Could not send initial subscribe NOTIFY */ + pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); + ao2_ref(sub_tree, -1); + } } } else { ast_sorcery_delete(ast_sip_get_sorcery(), persistence); } resource_tree_destroy(&tree); + ao2_ref(endpoint, -1); + + return 0; +} + +/*! \brief Callback function to perform the actual recreation of a subscription */ +static int subscription_persistence_recreate(void *obj, void *arg, int flags) +{ + struct subscription_persistence *persistence = obj; + pj_pool_t *pool = arg; + struct ast_taskprocessor *serializer; + pjsip_rx_data rdata; + struct persistence_recreate_data recreate_data; + + /* If this subscription has already expired remove it */ + if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) { + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + memset(&rdata, 0, sizeof(rdata)); + pj_pool_reset(pool); + rdata.tp_info.pool = pool; + + if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port, + persistence->transport_key, persistence->local_name, persistence->local_port)) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The message could not be parsed\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) { + ast_log(LOG_NOTICE, "Failed recreating '%s' subscription: Stored a SIP response instead of a request.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + /* Continue the remainder in the distributor serializer */ + serializer = ast_sip_get_distributor_serializer(&rdata); + if (!serializer) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get distributor serializer.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + recreate_data.persistence = persistence; + recreate_data.rdata = &rdata; + if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + } + ast_taskprocessor_unreference(serializer); return 0; } -- cgit v1.2.3