diff options
author | Mark Michelson <mmichelson@digium.com> | 2014-06-25 20:57:28 +0000 |
---|---|---|
committer | Mark Michelson <mmichelson@digium.com> | 2014-06-25 20:57:28 +0000 |
commit | bc8c08c6090da176afd48712477f4d66fc71ab2b (patch) | |
tree | 7de55cfa4d0ab5e595acb181172b38f4557280fe /res/res_pjsip_pubsub.c | |
parent | 4a7a36a0a1533304844feac6901a724633bc71cc (diff) |
Abstract PJSIP-specific elements from the pubsub API.
This helps to pave the way for RLS work that is to come.
Since this is a self-contained change and subscription
tests still pass, this work is being committed directly
to trunk instead of a working branch.
ASTERISK-23865 #close
Review: https://reviewboard.asterisk.org/r/3628
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@417233 65c4cc65-6c06-0410-ace0-fbb531ad65f3
Diffstat (limited to 'res/res_pjsip_pubsub.c')
-rw-r--r-- | res/res_pjsip_pubsub.c | 747 |
1 files changed, 469 insertions, 278 deletions
diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index dfca643bc..39846823e 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -120,8 +120,8 @@ static struct pjsip_module pubsub_module = { .on_rx_request = pubsub_on_rx_request, }; -#define MOD_DATA_BODY_GENERATOR "sub_body_generator" #define MOD_DATA_PERSISTENCE "sub_persistence" +#define MOD_DATA_MSG "sub_msg" static const pj_str_t str_event_name = { "Event", 5 }; @@ -249,6 +249,55 @@ struct subscription_persistence { }; /*! + * \brief Real subscription details + * + * A real subscription is one that has a direct link to a + * PJSIP subscription and dialog. + */ +struct ast_sip_real_subscription { + /*! The underlying PJSIP event subscription structure */ + pjsip_evsub *evsub; + /*! The underlying PJSIP dialog */ + pjsip_dialog *dlg; +}; + +/*! + * \brief Virtual subscription details + * + * A virtual subscription is one that does not have a direct + * link to a PJSIP subscription. Instead, it is a descendent + * of an ast_sip_subscription. Following the ancestry will + * eventually lead to a real subscription. + */ +struct ast_sip_virtual_subscription { + struct ast_sip_subscription *parent; +}; + +/*! + * \brief Discriminator between real and virtual subscriptions + */ +enum sip_subscription_type { + /*! + * \brief a "real" subscription. + * + * Real subscriptions are at the root of a tree of subscriptions. + * A real subscription has a corresponding SIP subscription in the + * PJSIP stack. + */ + SIP_SUBSCRIPTION_REAL, + /*! + * \brief a "virtual" subscription. + * + * Virtual subscriptions are the descendents of real subscriptions + * in a tree of subscriptions. Virtual subscriptions do not have + * a corresponding SIP subscription in the PJSIP stack. Instead, + * when a state change happens on a virtual subscription, the + * state change is indicated to the virtual subscription's parent. + */ + SIP_SUBSCRIPTION_VIRTUAL, +}; + +/*! * \brief Structure representing a SIP subscription */ struct ast_sip_subscription { @@ -262,16 +311,23 @@ struct ast_sip_subscription { const struct ast_sip_subscription_handler *handler; /*! The role for this subscription */ enum ast_sip_subscription_role role; - /*! The underlying PJSIP event subscription structure */ - pjsip_evsub *evsub; - /*! The underlying PJSIP dialog */ - pjsip_dialog *dlg; + /*! Indicator of real or virtual subscription */ + enum sip_subscription_type type; + /*! Real and virtual components of the subscription */ + union { + struct ast_sip_real_subscription real; + struct ast_sip_virtual_subscription virtual; + } reality; /*! Body generaator for NOTIFYs */ struct ast_sip_pubsub_body_generator *body_generator; /*! Persistence information */ struct subscription_persistence *persistence; /*! Next item in the list */ AST_LIST_ENTRY(ast_sip_subscription) next; + /*! List of child subscriptions */ + AST_LIST_HEAD_NOLOCK(,ast_sip_subscription) children; + /*! Name of resource being subscribed to */ + char resource[0]; }; static const char *sip_subscription_roles_map[] = { @@ -284,6 +340,16 @@ AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription); AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator); AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement); +static pjsip_evsub *sip_subscription_get_evsub(const struct ast_sip_subscription *sub) +{ + return sub->reality.real.evsub; +} + +static pjsip_dialog *sip_subscription_get_dlg(const struct ast_sip_subscription *sub) +{ + return sub->reality.real.dlg; +} + /*! \brief Destructor for subscription persistence */ static void subscription_persistence_destroy(void *obj) { @@ -310,12 +376,14 @@ static struct subscription_persistence *subscription_persistence_create(struct a struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(), "subscription_persistence", NULL); + pjsip_dialog *dlg = sip_subscription_get_dlg(sub); + if (!persistence) { return NULL; } persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint)); - ast_copy_pj_str(tag, &sub->dlg->local.info->tag, sizeof(tag)); + ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag)); persistence->tag = ast_strdup(tag); ast_sorcery_create(ast_sip_get_sorcery(), persistence); @@ -326,11 +394,14 @@ static struct subscription_persistence *subscription_persistence_create(struct a static void subscription_persistence_update(struct ast_sip_subscription *sub, pjsip_rx_data *rdata) { + pjsip_dialog *dlg; + if (!sub->persistence) { return; } - sub->persistence->cseq = sub->dlg->local.cseq; + dlg = sip_subscription_get_dlg(sub); + sub->persistence->cseq = dlg->local.cseq; if (rdata) { int expires; @@ -410,13 +481,17 @@ static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rda /* If a SUBSCRIBE contains no Accept headers, then we must assume that * the default accept type for the event package is to be used. */ - ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0])); + ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0])); num_accept_headers = 1; } return find_body_generator(accept, num_accept_headers); } +static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler, + struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource, + struct ast_sip_pubsub_body_generator *generator); + /*! \brief Callback function to perform the actual recreation of a subscription */ static int subscription_persistence_recreate(void *obj, void *arg, int flags) { @@ -428,6 +503,10 @@ static int subscription_persistence_recreate(void *obj, void *arg, int flags) RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); struct ast_sip_subscription *sub; struct ast_sip_pubsub_body_generator *generator; + int resp; + char *resource; + size_t resource_size; + pjsip_sip_uri *request_uri; /* If this subscription has already expired remove it */ if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) { @@ -454,6 +533,11 @@ static int subscription_persistence_recreate(void *obj, void *arg, int flags) return 0; } + request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri); + resource_size = pj_strlen(&request_uri->user) + 1; + resource = alloca(resource_size); + ast_copy_pj_str(resource, &request_uri->user, resource_size); + /* Update the expiration header with the new expiration */ expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next); if (!expires_header) { @@ -467,7 +551,7 @@ static int subscription_persistence_recreate(void *obj, void *arg, int flags) expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000); handler = subscription_get_handler_from_rdata(&rdata); - if (!handler) { + if (!handler || !handler->notifier) { ast_sorcery_delete(ast_sip_get_sorcery(), persistence); return 0; } @@ -479,12 +563,11 @@ static int subscription_persistence_recreate(void *obj, void *arg, int flags) } ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data, - pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator); - ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE, persistence); - sub = handler->new_subscribe(endpoint, &rdata); - if (sub) { + resp = handler->notifier->new_subscribe(endpoint, resource); + if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) { + sub = notifier_create_subscription(handler, endpoint, &rdata, resource, generator); sub->persistence = ao2_bump(persistence); subscription_persistence_update(sub, &rdata); } else { @@ -596,11 +679,11 @@ static void sip_subscription_to_ami(struct ast_sip_subscription *sub, ast_str_append(buf, 0, "Endpoint: %s\r\n", ast_sorcery_object_get_id(sub->endpoint)); - ast_copy_pj_str(str, &sub->dlg->call_id->id, sizeof(str)); + ast_copy_pj_str(str, &sip_subscription_get_dlg(sub)->call_id->id, sizeof(str)); ast_str_append(buf, 0, "Callid: %s\r\n", str); ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name( - ast_sip_subscription_get_evsub(sub))); + sip_subscription_get_evsub(sub))); ast_callerid_merge(str, sizeof(str), S_COR(id->self.name.valid, id->self.name.str, NULL), @@ -653,8 +736,8 @@ static int subscription_remove_serializer(void *obj) * subscription is destroyed so that we can guarantee that our attempt to * remove the serializer will be successful. */ - ast_sip_dialog_set_serializer(sub->dlg, NULL); - pjsip_dlg_dec_session(sub->dlg, &pubsub_module); + ast_sip_dialog_set_serializer(sip_subscription_get_dlg(sub), NULL); + pjsip_dlg_dec_session(sip_subscription_get_dlg(sub), &pubsub_module); return 0; } @@ -672,14 +755,14 @@ static void subscription_destructor(void *obj) ao2_cleanup(sub->datastores); ao2_cleanup(sub->endpoint); - if (sub->dlg) { + if (sip_subscription_get_dlg(sub)) { ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub); } ast_taskprocessor_unreference(sub->serializer); } + static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event); -static void pubsub_on_tsx_state(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event); static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body); static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code, @@ -690,41 +773,23 @@ static void pubsub_on_server_timeout(pjsip_evsub *sub); static pjsip_evsub_user pubsub_cb = { .on_evsub_state = pubsub_on_evsub_state, - .on_tsx_state = pubsub_on_tsx_state, .on_rx_refresh = pubsub_on_rx_refresh, .on_rx_notify = pubsub_on_rx_notify, .on_client_refresh = pubsub_on_client_refresh, .on_server_timeout = pubsub_on_server_timeout, }; -static pjsip_evsub *allocate_evsub(const char *event, enum ast_sip_subscription_role role, - struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, pjsip_dialog *dlg) -{ - pjsip_evsub *evsub; - /* PJSIP is kind enough to have some built-in support for certain - * events. We need to use the correct initialization function for the - * built-in events - */ - if (role == AST_SIP_NOTIFIER) { - pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &evsub); - } else { - pj_str_t pj_event; - pj_cstr(&pj_event, event); - pjsip_evsub_create_uac(dlg, &pubsub_cb, &pj_event, 0, &evsub); - } - return evsub; -} - -struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler, - enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata) +static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler, + struct ast_sip_endpoint *endpoint, const char *resource, enum ast_sip_subscription_role role) { - struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor); - pjsip_dialog *dlg; - struct subscription_persistence *persistence; + struct ast_sip_subscription *sub; + sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor); if (!sub) { return NULL; } + strcpy(sub->resource, resource); /* Safe */ + sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp); if (!sub->datastores) { ao2_ref(sub, -1); @@ -735,28 +800,46 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su ao2_ref(sub, -1); return NULL; } - sub->body_generator = ast_sip_mod_data_get(rdata->endpt_info.mod_data, - pubsub_module.id, MOD_DATA_BODY_GENERATOR); sub->role = role; - if (role == AST_SIP_NOTIFIER) { - dlg = ast_sip_create_dialog_uas(endpoint, rdata); - } else { - RAII_VAR(struct ast_sip_contact *, contact, NULL, ao2_cleanup); + sub->type = SIP_SUBSCRIPTION_REAL; + sub->endpoint = ao2_bump(endpoint); + sub->handler = handler; - contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors); - if (!contact || ast_strlen_zero(contact->uri)) { - ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n", - ast_sorcery_object_get_id(endpoint)); - ao2_ref(sub, -1); - return NULL; - } - dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL); + return sub; +} + +static void subscription_setup_dialog(struct ast_sip_subscription *sub, pjsip_dialog *dlg) +{ + /* We keep a reference to the dialog until our subscription is destroyed. See + * the subscription_destructor for more details + */ + pjsip_dlg_inc_session(dlg, &pubsub_module); + sub->reality.real.dlg = dlg; + ast_sip_dialog_set_serializer(dlg, sub->serializer); + pjsip_evsub_set_mod_data(sip_subscription_get_evsub(sub), pubsub_module.id, sub); +} + +static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler, + struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource, + struct ast_sip_pubsub_body_generator *generator) +{ + struct ast_sip_subscription *sub; + pjsip_dialog *dlg; + struct subscription_persistence *persistence; + + sub = allocate_subscription(handler, endpoint, resource, AST_SIP_NOTIFIER); + if (!sub) { + return NULL; } + + sub->body_generator = generator; + dlg = ast_sip_create_dialog_uas(endpoint, rdata); if (!dlg) { ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n"); ao2_ref(sub, -1); return NULL; } + persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE); if (persistence) { @@ -768,62 +851,102 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su dlg->local.cseq = persistence->cseq; dlg->remote.cseq = persistence->cseq; } - sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg); - /* We keep a reference to the dialog until our subscription is destroyed. See - * the subscription_destructor for more details - */ - pjsip_dlg_inc_session(dlg, &pubsub_module); - sub->dlg = dlg; - ast_sip_dialog_set_serializer(dlg, sub->serializer); - pjsip_evsub_set_mod_data(sub->evsub, pubsub_module.id, sub); - ao2_ref(endpoint, +1); - sub->endpoint = endpoint; - sub->handler = handler; + + pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub->reality.real.evsub); + subscription_setup_dialog(sub, dlg); + + ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG, + pjsip_msg_clone(dlg->pool, rdata->msg_info.msg)); add_subscription(sub); return sub; } -struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub) +void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header) { - ast_assert(sub->endpoint != NULL); - ao2_ref(sub->endpoint, +1); - return sub->endpoint; -} + pjsip_dialog *dlg = sip_subscription_get_dlg(sub); + pjsip_msg *msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG); + pj_str_t name; -struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub) -{ - ast_assert(sub->serializer != NULL); - return sub->serializer; + pj_cstr(&name, header); + + return pjsip_msg_find_hdr_by_name(msg, &name, NULL); } -pjsip_evsub *ast_sip_subscription_get_evsub(struct ast_sip_subscription *sub) +struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler, + struct ast_sip_endpoint *endpoint, const char *resource) { - return sub->evsub; + struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor); + pjsip_dialog *dlg; + struct ast_sip_contact *contact; + pj_str_t event; + pjsip_tx_data *tdata; + pjsip_evsub *evsub; + + sub = allocate_subscription(handler, endpoint, resource, AST_SIP_SUBSCRIBER); + if (!sub) { + return NULL; + } + + contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors); + if (!contact || ast_strlen_zero(contact->uri)) { + ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n", + ast_sorcery_object_get_id(endpoint)); + ao2_ref(sub, -1); + ao2_cleanup(contact); + return NULL; + } + + dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL); + ao2_cleanup(contact); + if (!dlg) { + ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n"); + ao2_ref(sub, -1); + return NULL; + } + + pj_cstr(&event, handler->event_name); + pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub->reality.real.evsub); + subscription_setup_dialog(sub, dlg); + + add_subscription(sub); + + evsub = sip_subscription_get_evsub(sub); + + if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) { + pjsip_evsub_send_request(evsub, tdata); + } else { + /* pjsip_evsub_terminate will result in pubsub_on_evsub_state, + * being called and terminating the subscription. Therefore, we don't + * need to decrease the reference count of sub here. + */ + pjsip_evsub_terminate(evsub, PJ_TRUE); + return NULL; + } + + return sub; } -pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub) +struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub) { - return sub->dlg; + ast_assert(sub->endpoint != NULL); + ao2_ref(sub->endpoint, +1); + return sub->endpoint; } -int ast_sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response) +struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub) { - /* If this is a persistence recreation the subscription has already been accepted */ - if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) { - return 0; - } - - return pjsip_evsub_accept(ast_sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1; + ast_assert(sub->serializer != NULL); + return sub->serializer; } -int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata) +static int sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata) { struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub); int res; ao2_ref(sub, +1); - res = pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub), + res = pjsip_evsub_send_request(sip_subscription_get_evsub(sub), tdata) == PJ_SUCCESS ? 0 : -1; subscription_persistence_update(sub, NULL); @@ -831,7 +954,7 @@ int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET", "StateText: %s\r\n" "Endpoint: %s\r\n", - pjsip_evsub_get_state_name(ast_sip_subscription_get_evsub(sub)), + pjsip_evsub_get_state_name(sip_subscription_get_evsub(sub)), ast_sorcery_object_get_id(endpoint)); ao2_cleanup(sub); ao2_cleanup(endpoint); @@ -839,6 +962,83 @@ int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx return res; } +int ast_sip_subscription_notify(struct ast_sip_subscription *sub, void *notify_data, + int terminate) +{ + struct ast_sip_body body = { + .type = ast_sip_subscription_get_body_type(sub), + .subtype = ast_sip_subscription_get_body_subtype(sub), + }; + struct ast_str *body_text = ast_str_create(64); + pjsip_evsub *evsub = sip_subscription_get_evsub(sub); + pjsip_tx_data *tdata; + pjsip_evsub_state state; + + if (!body_text) { + return -1; + } + + if (ast_sip_pubsub_generate_body_content(body.type, body.subtype, notify_data, &body_text)) { + ast_free(body_text); + return -1; + } + + body.body_text = ast_str_buffer(body_text); + + if (terminate) { + state = PJSIP_EVSUB_STATE_TERMINATED; + } else { + state = pjsip_evsub_get_state(evsub) <= PJSIP_EVSUB_STATE_ACTIVE ? + PJSIP_EVSUB_STATE_ACTIVE : PJSIP_EVSUB_STATE_TERMINATED; + } + + ast_log_backtrace(); + + if (pjsip_evsub_notify(evsub, state, NULL, NULL, &tdata) != PJ_SUCCESS) { + ast_free(body_text); + return -1; + } + if (ast_sip_add_body(tdata, &body)) { + ast_free(body_text); + pjsip_tx_data_dec_ref(tdata); + return -1; + } + if (sip_subscription_send_request(sub, tdata)) { + ast_free(body_text); + pjsip_tx_data_dec_ref(tdata); + return -1; + } + + return 0; +} + +void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size) +{ + pjsip_dialog *dlg = sip_subscription_get_dlg(sub); + ast_copy_pj_str(buf, &dlg->local.info_str, size); +} + +void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size) +{ + pjsip_dialog *dlg = sip_subscription_get_dlg(sub); + ast_copy_pj_str(buf, &dlg->remote.info_str, size); +} + +const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub) +{ + return sub->resource; +} + +static int sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response) +{ + /* If this is a persistence recreation the subscription has already been accepted */ + if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) { + return 0; + } + + return pjsip_evsub_accept(sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1; +} + static void subscription_datastore_destroy(void *obj) { struct ast_datastore *datastore = obj; @@ -1019,9 +1219,9 @@ static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(cons int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler) { pj_str_t event; - pj_str_t accept[AST_SIP_MAX_ACCEPT]; + pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, }; struct ast_sip_subscription_handler *existing; - int i; + int i = 0; if (ast_strlen_zero(handler->event_name)) { ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n"); @@ -1117,6 +1317,11 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); struct ast_sip_subscription *sub; struct ast_sip_pubsub_body_generator *generator; + char *resource; + pjsip_uri *request_uri; + pjsip_sip_uri *request_uri_sip; + size_t resource_size; + int resp; endpoint = ast_pjsip_rdata_get_endpoint(rdata); ast_assert(endpoint != NULL); @@ -1127,6 +1332,22 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) return PJ_TRUE; } + request_uri = rdata->msg_info.msg->line.req.uri; + + if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) { + char uri_str[PJSIP_MAX_URL_SIZE]; + + pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str)); + ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL); + return PJ_TRUE; + } + + request_uri_sip = pjsip_uri_get_uri(request_uri); + resource_size = pj_strlen(&request_uri_sip->user) + 1; + resource = alloca(resource_size); + ast_copy_pj_str(resource, &request_uri_sip->user, resource_size); + expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next); if (expires_header) { @@ -1142,7 +1363,7 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL); return PJ_TRUE; } - } + } handler = subscription_get_handler_from_rdata(rdata); if (!handler) { @@ -1156,27 +1377,22 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) return PJ_TRUE; } - ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data, - pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator); + resp = handler->notifier->new_subscribe(endpoint, resource); + if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL); + return PJ_TRUE; + } - sub = handler->new_subscribe(endpoint, rdata); + sub = notifier_create_subscription(handler, endpoint, rdata, resource, generator); if (!sub) { - pjsip_transaction *trans = pjsip_rdata_get_tsx(rdata); - - if (trans) { - pjsip_dialog *dlg = pjsip_rdata_get_dlg(rdata); - pjsip_tx_data *tdata; - - if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, &tdata) != PJ_SUCCESS) { - return PJ_TRUE; - } - pjsip_dlg_send_response(dlg, trans, tdata); - } else { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL); - } + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL); } else { sub->persistence = subscription_persistence_create(sub); subscription_persistence_update(sub, rdata); + sip_subscription_accept(sub, rdata, resp); + if (handler->notifier->notify_required(sub, AST_SIP_SUBSCRIPTION_NOTIFY_REASON_STARTED)) { + pjsip_evsub_terminate(sip_subscription_get_evsub(sub), PJ_TRUE); + } } return PJ_TRUE; @@ -1229,10 +1445,114 @@ static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata, return SIP_PUBLISH_UNKNOWN; } +/*! \brief Internal destructor for publications */ +static void publication_destroy_fn(void *obj) +{ + struct ast_sip_publication *publication = obj; + + ast_debug(3, "Destroying SIP publication\n"); + + ao2_cleanup(publication->datastores); + ao2_cleanup(publication->endpoint); +} + +static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata) +{ + struct ast_sip_publication *publication; + pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); + + ast_assert(endpoint != NULL); + + if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) { + return NULL; + } + + if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) { + ao2_ref(publication, -1); + return NULL; + } + + publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1); + ao2_ref(endpoint, +1); + publication->endpoint = endpoint; + publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES; + publication->sched_id = -1; + + return publication; +} + +static int sip_publication_respond(struct ast_sip_publication *pub, int status_code, + pjsip_rx_data *rdata) +{ + pj_status_t status; + pjsip_tx_data *tdata; + pjsip_transaction *tsx; + + if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) { + return -1; + } + + if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) { + RAII_VAR(char *, entity_tag, NULL, ast_free_ptr); + RAII_VAR(char *, expires, NULL, ast_free_ptr); + + if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) || + (ast_asprintf(&expires, "%d", pub->expires) < 0)) { + pjsip_tx_data_dec_ref(tdata); + return -1; + } + + ast_sip_add_header(tdata, "SIP-ETag", entity_tag); + ast_sip_add_header(tdata, "Expires", expires); + } + + if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) { + return -1; + } + + pjsip_tsx_recv_msg(tsx, rdata); + + if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) { + return -1; + } + + return 0; +} + static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, struct ast_sip_publish_handler *handler) { - struct ast_sip_publication *publication = handler->new_publication(endpoint, rdata); + struct ast_sip_publication *publication; + char *resource; + size_t resource_size; + pjsip_uri *request_uri; + pjsip_sip_uri *request_uri_sip; + int resp; + + request_uri = rdata->msg_info.msg->line.req.uri; + + if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) { + char uri_str[PJSIP_MAX_URL_SIZE]; + + pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str)); + ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL); + return NULL; + } + + request_uri_sip = pjsip_uri_get_uri(request_uri); + resource_size = pj_strlen(&request_uri_sip->user) + 1; + resource = alloca(resource_size); + ast_copy_pj_str(resource, &request_uri_sip->user, resource_size); + + resp = handler->new_publication(endpoint, resource); + + if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL); + return NULL; + } + + publication = sip_create_publication(endpoint, rdata); if (!publication) { pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL); @@ -1240,6 +1560,14 @@ static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoi } publication->handler = handler; + if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body, + AST_SIP_PUBLISH_STATE_INITIALIZED)) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL); + ao2_cleanup(publication); + return NULL; + } + + sip_publication_respond(publication, resp, rdata); return publication; } @@ -1321,14 +1649,19 @@ static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata) publication = publish_request_initial(endpoint, rdata, handler); break; case SIP_PUBLISH_REFRESH: + sip_publication_respond(publication, 200, rdata); case SIP_PUBLISH_MODIFY: - if (handler->publish_refresh(publication, rdata)) { + if (handler->publication_state_change(publication, rdata->msg_info.msg->body, + AST_SIP_PUBLISH_STATE_ACTIVE)) { /* If an error occurs we want to terminate the publication */ expires = 0; } + sip_publication_respond(publication, 200, rdata); break; case SIP_PUBLISH_REMOVE: - handler->publish_termination(publication, rdata); + handler->publication_state_change(publication, rdata->msg_info.msg->body, + AST_SIP_PUBLISH_STATE_TERMINATED); + sip_publication_respond(publication, 200, rdata); break; case SIP_PUBLISH_UNKNOWN: default: @@ -1350,85 +1683,11 @@ static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata) return PJ_TRUE; } -/*! \brief Internal destructor for publications */ -static void publication_destroy_fn(void *obj) -{ - struct ast_sip_publication *publication = obj; - - ast_debug(3, "Destroying SIP publication\n"); - - ao2_cleanup(publication->datastores); - ao2_cleanup(publication->endpoint); -} - -struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata) -{ - struct ast_sip_publication *publication; - pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); - - ast_assert(endpoint != NULL); - - if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) { - return NULL; - } - - if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) { - ao2_ref(publication, -1); - return NULL; - } - - publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1); - ao2_ref(endpoint, +1); - publication->endpoint = endpoint; - publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES; - publication->sched_id = -1; - - return publication; -} - struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub) { return pub->endpoint; } -int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata, - pjsip_tx_data **tdata) -{ - if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, tdata) != PJ_SUCCESS) { - return -1; - } - - if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) { - RAII_VAR(char *, entity_tag, NULL, ast_free_ptr); - RAII_VAR(char *, expires, NULL, ast_free_ptr); - - if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) || - (ast_asprintf(&expires, "%d", pub->expires) < 0)) { - pjsip_tx_data_dec_ref(*tdata); - return -1; - } - - ast_sip_add_header(*tdata, "SIP-ETag", entity_tag); - ast_sip_add_header(*tdata, "Expires", expires); - } - - return 0; -} - -pj_status_t ast_sip_publication_send_response(struct ast_sip_publication *pub, pjsip_rx_data *rdata, - pjsip_tx_data *tdata) -{ - pj_status_t status; - pjsip_transaction *tsx; - - if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) { - return status; - } - - pjsip_tsx_recv_msg(tsx, rdata); - - return pjsip_tsx_send_msg(tsx, tdata); -} int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator) { @@ -1590,123 +1849,53 @@ static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event) pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL); } -static void pubsub_on_tsx_state(pjsip_evsub *evsub, pjsip_transaction *tsx, pjsip_event *event) -{ - struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); - - if (!sub) { - return; - } - - if (sub->handler->notify_response && tsx->role == PJSIP_ROLE_UAC && - event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) { - sub->handler->notify_response(sub, event->body.tsx_state.src.rdata); - } -} - -static void set_parameters_from_response_data(pj_pool_t *pool, int *p_st_code, - pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body, - struct ast_sip_subscription_response_data *response_data) -{ - ast_assert(response_data->status_code >= 200 && response_data->status_code <= 699); - *p_st_code = response_data->status_code; - - if (!ast_strlen_zero(response_data->status_text)) { - pj_strdup2(pool, *p_st_text, response_data->status_text); - } - - if (response_data->headers) { - struct ast_variable *iter; - for (iter = response_data->headers; iter; iter = iter->next) { - pj_str_t header_name; - pj_str_t header_value; - pjsip_generic_string_hdr *hdr; - - pj_cstr(&header_name, iter->name); - pj_cstr(&header_value, iter->value); - hdr = pjsip_generic_string_hdr_create(pool, &header_name, &header_value); - pj_list_insert_before(res_hdr, hdr); - } - } - - if (response_data->body) { - pj_str_t type; - pj_str_t subtype; - pj_str_t body_text; - - pj_cstr(&type, response_data->body->type); - pj_cstr(&subtype, response_data->body->subtype); - pj_cstr(&body_text, response_data->body->body_text); - - *p_body = pjsip_msg_body_create(pool, &type, &subtype, &body_text); - } -} - -static int response_data_changed(struct ast_sip_subscription_response_data *response_data) -{ - if (response_data->status_code != 200 || - !ast_strlen_zero(response_data->status_text) || - response_data->headers || - response_data->body) { - return 1; - } - return 0; -} - static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) { struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); - struct ast_sip_subscription_response_data response_data = { - .status_code = 200, - }; + enum ast_sip_subscription_notify_reason reason; if (!sub) { return; } - if (pjsip_evsub_get_state(sub->evsub) == PJSIP_EVSUB_STATE_TERMINATED) { - sub->handler->subscription_terminated(sub, rdata); - return; + if (pjsip_evsub_get_state(sip_subscription_get_evsub(sub)) == PJSIP_EVSUB_STATE_TERMINATED) { + reason = AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED; + } else { + reason = AST_SIP_SUBSCRIPTION_NOTIFY_REASON_RENEWED; } - - sub->handler->resubscribe(sub, rdata, &response_data); - - if (!response_data_changed(&response_data)) { - return; + if (sub->handler->notifier->notify_required(sub, reason)) { + *p_st_code = 500; } - - set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text, - res_hdr, p_body, &response_data); } static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) { struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); - struct ast_sip_subscription_response_data response_data = { - .status_code = 200, - }; - - if (!sub || !sub->handler->notify_request) { - return; - } - - sub->handler->notify_request(sub, rdata, &response_data); - if (!response_data_changed(&response_data)) { + if (!sub) { return; } - set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text, - res_hdr, p_body, &response_data); + sub->handler->subscriber->state_change(sub, rdata->msg_info.msg->body, + pjsip_evsub_get_state(evsub)); } static int serialized_pubsub_on_client_refresh(void *userdata) { struct ast_sip_subscription *sub = userdata; + pjsip_evsub *evsub; + pjsip_tx_data *tdata; + + evsub = sip_subscription_get_evsub(sub); - sub->handler->refresh_subscription(sub); + if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) { + pjsip_evsub_send_request(evsub, tdata); + } else { + pjsip_evsub_terminate(evsub, PJ_TRUE); + return 0; + } ao2_cleanup(sub); return 0; } @@ -1723,7 +1912,9 @@ static int serialized_pubsub_on_server_timeout(void *userdata) { struct ast_sip_subscription *sub = userdata; - sub->handler->subscription_timeout(sub); + sub->handler->notifier->notify_required(sub, + AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED); + ao2_cleanup(sub); return 0; } |