diff options
author | George Joseph <gjoseph@digium.com> | 2017-02-07 12:17:12 -0700 |
---|---|---|
committer | George Joseph <gjoseph@digium.com> | 2017-02-15 13:11:46 -0600 |
commit | 4bdf5d329fb10d90d3ba53188834c58f54a5324c (patch) | |
tree | 1fa70b8698d938da159efd69350ff19551ff4988 /res/res_pjsip_pubsub.c | |
parent | 0b5a17082be78be17c5191d86f806936dcdc7d61 (diff) |
res_pjsip_pubsub: Correctly implement persisted subscriptions
This patch fixes 2 original issues and more that those 2 exposed.
* When we send a NOTIFY, and the client either doesn't respond or
responds with a non OK, pjproject only calls our
pubsub_on_evsub_state callback, no others. Since
pubsub_on_evsub_state (which does the sub_tree cleanup) does not
expect to be called back without the other callbacks being called
first, it just returns leaving the sub_tree orphaned. Now
pubsub_on_evsub_state checks the event for PJSIP_EVENT_TSX_STATE
which is what pjproject will set to tell us that it was the
transaction that timed out or failed and not the subscription
itself timing out or being terminated by the client. If it is
TSX_STATE, pubsub_on_evsub_state now does the proper cleanup
regardless of the state of the subscription.
* When a client renews a subscription, we don't update the
persisted subscription with the new expires timestamp. This causes
subscription_persistence_recreate to prune the subscription if/when
asterisk restarts. Now, pubsub_on_rx_refresh calls
subscription_persistence_update to apply the new expires timestamp.
This exposed other issues however...
* When creating a dialog from rdata (which sub_persistence_recreate
does from the packet buffer) there must NOT be a tag on the To
header (which there will be when a client refreshes a
subscription). If there is one, pjsip_dlg_create_uas will fail.
To address this, subscription_persistence_update now accepts a flag
that indicates that the original packet buffer must not be updated.
New subscribes don't set the flag and renews do. This makes sure
that when the rdata is recreated on asterisk startup, it's done
from the original subscribe packet which won't have the tag on To.
* When creating a dialog from rdata, we were setting the dialog's
remote (SUBSCRIBE) cseq to be the same as the local (NOTIFY) cseq.
When the client tried to resubscribe after a restart with the
correct cseq, we'd reject the request with an Invalid CSeq error.
* The acts of creating a dialog and evsub by themselves when
recreating a subscription do NOT restart pjproject's subscription
timer. The result was that even if we did correctly recreate the
subscription, we never removed it if the client happened to go away
or send a non-OK response to a NOTIFY. However, there is no
pjproject function exposed to just set the timer on an evsub that
wasn't created by an incoming subscribe request. To address this,
we create our own timer using ast_sip_schedule_task. This timer is
used only for re-establishing subscriptions after a restart.
An earlier approach was to add support for setting pjproject's
timer (via a pjproject patch) and while that patch is still included
here, we don't use that call at the moment.
While addressing these issues, additional debugging was added and
some existing messages made more useful. A few formatting changes
were also made to 'pjsip show scheduled tasks' to make displaying
the subscription timers a little more friendly.
ASTERISK-26696
ASTERISK-26756
Change-Id: I8c605fc1e3923f466a74db087d5ab6f90abce68e
Diffstat (limited to 'res/res_pjsip_pubsub.c')
-rw-r--r-- | res/res_pjsip_pubsub.c | 231 |
1 files changed, 189 insertions, 42 deletions
diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index 42f0dc11e..709dc6640 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -392,6 +392,13 @@ enum sip_subscription_tree_state { SIP_SUB_TREE_TERMINATED, }; +static char *sub_tree_state_description[] = { + "Normal", + "TerminatePending", + "TerminateInProgress", + "Terminated" +}; + /*! * \brief A tree of SIP subscriptions * @@ -428,6 +435,11 @@ struct sip_subscription_tree { AST_LIST_ENTRY(sip_subscription_tree) next; /*! Subscription tree state */ enum sip_subscription_tree_state state; + /*! On asterisk restart, this is the task data used + * to restart the expiration timer if pjproject isn't + * capable of restarting the timer. + */ + struct ast_sip_sched_task *expiration_task; }; /*! @@ -482,6 +494,17 @@ static const char *sip_subscription_roles_map[] = { [AST_SIP_NOTIFIER] = "Notifier" }; +enum sip_persistence_update_type { + /*! Called from send request */ + SUBSCRIPTION_PERSISTENCE_SEND_REQUEST = 0, + /*! Subscription created from initial client request */ + SUBSCRIPTION_PERSISTENCE_CREATED, + /*! Subscription recreated by asterisk on startup */ + SUBSCRIPTION_PERSISTENCE_RECREATED, + /*! Subscription created from client refresh */ + SUBSCRIPTION_PERSISTENCE_REFRESHED, +}; + AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree); AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator); @@ -560,7 +583,7 @@ static struct subscription_persistence *subscription_persistence_create(struct s /*! 
\brief Function which updates persistence information of a subscription in sorcery */ static void subscription_persistence_update(struct sip_subscription_tree *sub_tree, - pjsip_rx_data *rdata) + pjsip_rx_data *rdata, enum sip_persistence_update_type type) { pjsip_dialog *dlg; @@ -568,6 +591,9 @@ static void subscription_persistence_update(struct sip_subscription_tree *sub_tr return; } + ast_debug(3, "Updating persistence for '%s->%s'\n", + ast_sorcery_object_get_id(sub_tree->endpoint), sub_tree->root->resource); + dlg = sub_tree->dlg; sub_tree->persistence->cseq = dlg->local.cseq; @@ -584,12 +610,15 @@ static void subscription_persistence_update(struct sip_subscription_tree *sub_tr * persistence that is pulled from persistent storage, though, the rdata->pkt_info.packet will * only ever have a single SIP message on it, and so we base persistence on that. */ - if (rdata->msg_info.msg_buf) { - ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf, - MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len)); - } else { - ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet, - sizeof(sub_tree->persistence->packet)); + if (type == SUBSCRIPTION_PERSISTENCE_CREATED + || type == SUBSCRIPTION_PERSISTENCE_RECREATED) { + if (rdata->msg_info.msg_buf) { + ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf, + MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len)); + } else { + ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet, + sizeof(sub_tree->persistence->packet)); + } } ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub_tree->persistence->src_name)); @@ -986,7 +1015,8 @@ static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct a struct resources visited; if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) { - ast_debug(2, "Subscription to resource %s is not to a list\n", resource); + 
ast_debug(2, "Subscription '%s->%s' is not to a list\n", + ast_sorcery_object_get_id(endpoint), resource); tree->root = tree_node_alloc(resource, NULL, 0); if (!tree->root) { return 500; @@ -994,7 +1024,8 @@ static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct a return handler->notifier->new_subscribe(endpoint, resource); } - ast_debug(2, "Subscription to resource %s is a list\n", resource); + ast_debug(2, "Subscription '%s->%s' is a list\n", + ast_sorcery_object_get_id(endpoint), resource); if (AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items))) { return 500; } @@ -1033,8 +1064,8 @@ static void remove_subscription(struct sip_subscription_tree *obj) if (i == obj) { AST_RWLIST_REMOVE_CURRENT(next); if (i->root) { - ast_debug(2, "Removing subscription to resource %s from list of subscriptions\n", - ast_sip_subscription_get_resource_name(i->root)); + ast_debug(2, "Removing subscription '%s->%s' from list of subscriptions\n", + ast_sorcery_object_get_id(i->endpoint), ast_sip_subscription_get_resource_name(i->root)); } break; } @@ -1045,7 +1076,8 @@ static void remove_subscription(struct sip_subscription_tree *obj) static void destroy_subscription(struct ast_sip_subscription *sub) { - ast_debug(3, "Destroying SIP subscription to resource %s\n", sub->resource); + ast_debug(3, "Destroying SIP subscription from '%s->%s'\n", + ast_sorcery_object_get_id(sub->tree->endpoint), sub->resource); ast_free(sub->body_text); AST_VECTOR_FREE(&sub->children); @@ -1197,7 +1229,10 @@ static void subscription_tree_destructor(void *obj) { struct sip_subscription_tree *sub_tree = obj; - ast_debug(3, "Destroying subscription tree %p\n", sub_tree); + ast_debug(3, "Destroying subscription tree %p '%s->%s'\n", + sub_tree, + sub_tree->endpoint ? ast_sorcery_object_get_id(sub_tree->endpoint) : "Unknown", + sub_tree->root ? 
sub_tree->root->resource : "Unknown"); ao2_cleanup(sub_tree->endpoint); @@ -1213,7 +1248,8 @@ static void subscription_tree_destructor(void *obj) void ast_sip_subscription_destroy(struct ast_sip_subscription *sub) { - ast_debug(3, "Removing subscription %p reference to subscription tree %p\n", sub, sub->tree); + ast_debug(3, "Removing subscription %p '%s->%s' reference to subscription tree %p\n", + sub, ast_sorcery_object_get_id(sub->tree->endpoint), sub->resource, sub->tree); ao2_cleanup(sub->tree); } @@ -1320,7 +1356,6 @@ static struct sip_subscription_tree *create_subscription_tree(const struct ast_s dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag); pjsip_ua_register_dlg(pjsip_ua_instance(), dlg); dlg->local.cseq = persistence->cseq; - dlg->remote.cseq = persistence->cseq; } pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub); @@ -1345,6 +1380,12 @@ static struct sip_subscription_tree *create_subscription_tree(const struct ast_s return sub_tree; } +/*! Wrapper structure for initial_notify_task */ +struct initial_notify_data { + struct sip_subscription_tree *sub_tree; + int expires; +}; + static int initial_notify_task(void *obj); static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state); @@ -1433,9 +1474,12 @@ static int sub_persistence_recreate(void *obj) } pjsip_msg_add_hdr(rdata->msg_info.msg, (pjsip_hdr *) expires_header); } + expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000); if (expires_header->ivalue <= 0) { /* The subscription expired since we started recreating the subscription. 
*/ + ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n", + persistence->endpoint, persistence->tag); ast_sorcery_delete(ast_sip_get_sorcery(), persistence); ao2_ref(endpoint, -1); return 0; @@ -1456,18 +1500,30 @@ static int sub_persistence_recreate(void *obj) ast_sorcery_delete(ast_sip_get_sorcery(), persistence); } } else { + struct initial_notify_data *ind = ast_malloc(sizeof(*ind)); + + if (!ind) { + pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); + goto error; + } + + ind->sub_tree = ao2_bump(sub_tree); + ind->expires = expires_header->ivalue; + sub_tree->persistence = ao2_bump(persistence); - subscription_persistence_update(sub_tree, rdata); - if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, - ao2_bump(sub_tree))) { + subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_RECREATED); + if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) { /* Could not send initial subscribe NOTIFY */ pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); ao2_ref(sub_tree, -1); + ast_free(ind); } } } else { ast_sorcery_delete(ast_sip_get_sorcery(), persistence); } + +error: resource_tree_destroy(&tree); ao2_ref(endpoint, -1); @@ -1485,6 +1541,8 @@ static int subscription_persistence_recreate(void *obj, void *arg, int flags) /* If this subscription has already expired remove it */ if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) { + ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n", + persistence->endpoint, persistence->tag); ast_sorcery_delete(ast_sip_get_sorcery(), persistence); return 0; } @@ -1814,7 +1872,7 @@ static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree, res = internal_pjsip_evsub_send_request(sub_tree, tdata); - subscription_persistence_update(sub_tree, NULL); + subscription_persistence_update(sub_tree, NULL, SUBSCRIPTION_PERSISTENCE_SEND_REQUEST); ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET", "StateText: %s\r\n" @@ 
-2713,21 +2771,45 @@ static int generate_initial_notify(struct ast_sip_subscription *sub) return res; } +static int pubsub_on_refresh_timeout(void *userdata); + static int initial_notify_task(void * obj) { - struct sip_subscription_tree *sub_tree; + struct initial_notify_data *ind = obj; - sub_tree = obj; - if (generate_initial_notify(sub_tree->root)) { - pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); + if (generate_initial_notify(ind->sub_tree->root)) { + pjsip_evsub_terminate(ind->sub_tree->evsub, PJ_TRUE); } else { - send_notify(sub_tree, 1); + send_notify(ind->sub_tree, 1); ast_test_suite_event_notify("SUBSCRIPTION_ESTABLISHED", "Resource: %s", - sub_tree->root->resource); + ind->sub_tree->root->resource); + } + + if (ind->expires > -1) { + char *name = ast_alloca(strlen("->/ ") + + strlen(ind->sub_tree->persistence->endpoint) + + strlen(ind->sub_tree->root->resource) + + strlen(ind->sub_tree->root->handler->event_name) + + ind->sub_tree->dlg->call_id->id.slen + 1); + + sprintf(name, "%s->%s/%s %.*s", ind->sub_tree->persistence->endpoint, + ind->sub_tree->root->resource, ind->sub_tree->root->handler->event_name, + (int)ind->sub_tree->dlg->call_id->id.slen, ind->sub_tree->dlg->call_id->id.ptr); + + ast_debug(3, "Scheduling timer: %s\n", name); + ind->sub_tree->expiration_task = ast_sip_schedule_task(ind->sub_tree->serializer, + ind->expires * 1000, pubsub_on_refresh_timeout, name, + ind->sub_tree, AST_SIP_SCHED_TASK_FIXED | AST_SIP_SCHED_TASK_DATA_AO2); + if (!ind->sub_tree->expiration_task) { + ast_log(LOG_ERROR, "Unable to create expiration timer of %d seconds for %s\n", + ind->expires, name); + } } - ao2_ref(sub_tree, -1); + ao2_ref(ind->sub_tree, -1); + ast_free(ind); + return 0; } @@ -2820,12 +2902,25 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL); } } else { + struct initial_notify_data *ind = ast_malloc(sizeof(*ind)); + + if (!ind) { + 
pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); + resource_tree_destroy(&tree); + return PJ_TRUE; + } + + ind->sub_tree = ao2_bump(sub_tree); + /* Since this is a normal subscribe, pjproject takes care of the timer */ + ind->expires = -1; + sub_tree->persistence = subscription_persistence_create(sub_tree); - subscription_persistence_update(sub_tree, rdata); + subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_CREATED); sip_subscription_accept(sub_tree, rdata, resp); - if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ao2_bump(sub_tree))) { + if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) { pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); ao2_ref(sub_tree, -1); + ast_free(ind); } } @@ -3360,7 +3455,7 @@ static void set_state_terminated(struct ast_sip_subscription *sub) * send_notify ultimately calls pjsip_evsub_send_request * pjsip_evsub_send_request calls evsub's set_state * set_state calls pubsub_evsub_set_state - * pubsub_evsub_set_state checks state == TERMINATE_IN_PROGRESS + * pubsub_on_evsub_state checks state == TERMINATE_IN_PROGRESS * removes the subscriptions * cleans up references to evsub * sets state = TERMINATED @@ -3378,6 +3473,15 @@ static void set_state_terminated(struct ast_sip_subscription *sub) * serialized_pubsub_on_refresh_timeout starts * See (1) Above * + * * Transmission failure sending NOTIFY or error response from client + * pjproject transaction timer expires or non OK response + * pjproject locks dialog + * calls pubsub_on_evsub_state with event TSX_STATE + * pubsub_on_evsub_state checks event == TSX_STATE + * removes the subscriptions + * cleans up references to evsub + * sets state = TERMINATED + * pjproject unlocks dialog * * * ast_sip_subscription_notify is called * checks state == NORMAL @@ -3403,25 +3507,41 @@ static void set_state_terminated(struct ast_sip_subscription *sub) * * Although this function is called for every state change, we only care * about the TERMINATED 
state, and only when we're actually processing the final - * notify (SIP_SUB_TREE_TERMINATE_IN_PROGRESS). In this case, we do all - * the subscription tree cleanup tasks and decrement the evsub reference. + * notify (SIP_SUB_TREE_TERMINATE_IN_PROGRESS) OR when a transmission failure + * occurs (PJSIP_EVENT_TSX_STATE). In this case, we do all the subscription tree + * cleanup tasks and decrement the evsub reference. */ static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event) { - struct sip_subscription_tree *sub_tree; + struct sip_subscription_tree *sub_tree = + pjsip_evsub_get_mod_data(evsub, pubsub_module.id); - ast_debug(3, "on_evsub_state called with state %s\n", pjsip_evsub_get_state_name(evsub)); + ast_debug(3, "evsub %p state %s event %s sub_tree %p sub_tree state %s\n", evsub, + pjsip_evsub_get_state_name(evsub), pjsip_event_str(event->type), sub_tree, + (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN")); - if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) { + if (!sub_tree || pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) { return; } - sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); - if (!sub_tree || sub_tree->state != SIP_SUB_TREE_TERMINATE_IN_PROGRESS) { - ast_debug(1, "Possible terminate race prevented %p\n", sub_tree); + /* It's easier to write this as what we WANT to process, then negate it. 
*/ + if (!(sub_tree->state == SIP_SUB_TREE_TERMINATE_IN_PROGRESS + || (event->type == PJSIP_EVENT_TSX_STATE && sub_tree->state == SIP_SUB_TREE_NORMAL) + )) { + ast_debug(3, "Do nothing.\n"); return; } + if (sub_tree->expiration_task) { + char task_name[256]; + + ast_sip_sched_task_get_name(sub_tree->expiration_task, task_name, sizeof(task_name)); + ast_debug(3, "Cancelling timer: %s\n", task_name); + ast_sip_sched_task_cancel(sub_tree->expiration_task); + ao2_cleanup(sub_tree->expiration_task); + sub_tree->expiration_task = NULL; + } + remove_subscription(sub_tree); pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL); @@ -3443,16 +3563,17 @@ static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event) ao2_ref(sub_tree, -1); } -static int serialized_pubsub_on_refresh_timeout(void *userdata) +static int pubsub_on_refresh_timeout(void *userdata) { struct sip_subscription_tree *sub_tree = userdata; pjsip_dialog *dlg = sub_tree->dlg; + ast_debug(3, "sub_tree %p sub_tree state %s\n", sub_tree, + (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN")); + pjsip_dlg_inc_lock(dlg); if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS) { - ast_debug(1, "Possible terminate race prevented %p %d\n", sub_tree->evsub, sub_tree->state); pjsip_dlg_dec_lock(dlg); - ao2_cleanup(sub_tree); return 0; } @@ -3468,7 +3589,20 @@ static int serialized_pubsub_on_refresh_timeout(void *userdata) "Resource: %s", sub_tree->root->resource); pjsip_dlg_dec_lock(dlg); + + return 0; +} + +static int serialized_pubsub_on_refresh_timeout(void *userdata) +{ + struct sip_subscription_tree *sub_tree = userdata; + + ast_debug(3, "sub_tree %p sub_tree state %s\n", sub_tree, + (sub_tree ? 
sub_tree_state_description[sub_tree->state] : "UNKNOWN")); + + pubsub_on_refresh_timeout(userdata); ao2_cleanup(sub_tree); + return 0; } @@ -3487,11 +3621,23 @@ static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata, struct sip_subscription_tree *sub_tree; sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); + ast_debug(3, "evsub %p sub_tree %p sub_tree state %s\n", evsub, sub_tree, + (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN")); + if (!sub_tree || sub_tree->state != SIP_SUB_TREE_NORMAL) { - ast_debug(1, "Possible terminate race prevented %p %d\n", sub_tree, sub_tree ? sub_tree->state : -1 ); return; } + if (sub_tree->expiration_task) { + char task_name[256]; + + ast_sip_sched_task_get_name(sub_tree->expiration_task, task_name, sizeof(task_name)); + ast_debug(3, "Cancelling timer: %s\n", task_name); + ast_sip_sched_task_cancel(sub_tree->expiration_task); + ao2_cleanup(sub_tree->expiration_task); + sub_tree->expiration_task = NULL; + } + /* PJSIP will set the evsub's state to terminated before calling into this function * if the Expires value of the incoming SUBSCRIBE is 0. */ @@ -3500,6 +3646,8 @@ static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata, sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING; } + subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_REFRESHED); + if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_refresh_timeout, ao2_bump(sub_tree))) { /* If we can't push the NOTIFY refreshing task...we'll just go with it. */ ast_log(LOG_ERROR, "Failed to push task to send NOTIFY.\n"); @@ -3577,7 +3725,6 @@ static void pubsub_on_server_timeout(pjsip_evsub *evsub) sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); if (!sub_tree || sub_tree->state != SIP_SUB_TREE_NORMAL) { - ast_debug(1, "Possible terminate race prevented %p %d\n", sub_tree, sub_tree ? sub_tree->state : -1 ); return; } |