diff options
-rw-r--r-- | contrib/ast-db-manage/config/versions/c6d929b23a8_create_pjsip_subscription_persistence_.py | 36 | ||||
-rw-r--r-- | include/asterisk/res_pjsip.h | 17 | ||||
-rw-r--r-- | include/asterisk/res_pjsip_pubsub.h | 12 | ||||
-rw-r--r-- | res/res_pjsip.c | 30 | ||||
-rw-r--r-- | res/res_pjsip_exten_state.c | 3 | ||||
-rw-r--r-- | res/res_pjsip_mwi.c | 4 | ||||
-rw-r--r-- | res/res_pjsip_pubsub.c | 476 | ||||
-rw-r--r-- | res/res_pjsip_pubsub.exports.in | 1 |
8 files changed, 542 insertions, 37 deletions
diff --git a/contrib/ast-db-manage/config/versions/c6d929b23a8_create_pjsip_subscription_persistence_.py b/contrib/ast-db-manage/config/versions/c6d929b23a8_create_pjsip_subscription_persistence_.py new file mode 100644 index 000000000..aa5c8c435 --- /dev/null +++ b/contrib/ast-db-manage/config/versions/c6d929b23a8_create_pjsip_subscription_persistence_.py @@ -0,0 +1,36 @@ +"""create pjsip subscription persistence table + +Revision ID: c6d929b23a8 +Revises: e96a0b8071c +Create Date: 2014-06-06 02:17:38.660447 + +""" + +# revision identifiers, used by Alembic. +revision = 'c6d929b23a8' +down_revision = 'e96a0b8071c' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table( + 'ps_subscription_persistence', + sa.Column('id', sa.String(40), nullable=False, unique=True), + sa.Column('packet', sa.String(2048)), + sa.Column('src_name', sa.String(128)), + sa.Column('src_port', sa.Integer), + sa.Column('transport_key', sa.String(64)), + sa.Column('local_name', sa.String(128)), + sa.Column('local_port', sa.Integer), + sa.Column('cseq', sa.Integer), + sa.Column('tag', sa.String(128)), + sa.Column('endpoint', sa.String(40)), + sa.Column('expires', sa.Integer), + ) + + op.create_index('ps_subscription_persistence_id', 'ps_subscription_persistence', ['id']) + +def downgrade(): + op.drop_table('ps_subscription_persistence') diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 36d25fbec..c7e99aded 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -1221,6 +1221,23 @@ pjsip_dialog *ast_sip_create_dialog_uac(const struct ast_sip_endpoint *endpoint, pjsip_dialog *ast_sip_create_dialog_uas(const struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata); /*! + * \brief General purpose method for creating an rdata structure using specific information + * + * \param rdata[out] The rdata structure that will be populated + * \param packet A SIP message + * \param src_name The source IP address of the message + * \param src_port The source port of the message + * \param transport_type The type of transport the message was received on + * \param local_name The local IP address the message was received on + * \param local_port The local port the message was received on + * + * \retval 0 success + * \retval -1 failure + */ +int ast_sip_create_rdata(pjsip_rx_data *rdata, char *packet, const char *src_name, int src_port, char *transport_type, + const char *local_name, int local_port); + +/*! * \brief General purpose method for creating a SIP request * * Its typical use would be to create one-off requests such as an out of dialog diff --git a/include/asterisk/res_pjsip_pubsub.h b/include/asterisk/res_pjsip_pubsub.h index 0b0a49e66..d46cbae30 100644 --- a/include/asterisk/res_pjsip_pubsub.h +++ b/include/asterisk/res_pjsip_pubsub.h @@ -456,6 +456,18 @@ pjsip_evsub *ast_sip_subscription_get_evsub(struct ast_sip_subscription *sub); pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub); /*! + * \brief Accept a subscription request + * + * \param sub The subscription to be accepted + * \param rdata The received subscription request + * \param response The response code to send + * + * \retval 0 Success + * \retval non-zero Failure + */ +int ast_sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response); + +/*! * \brief Send a request created via a PJSIP evsub method * * Callers of this function should take care to do so within a SIP servant diff --git a/res/res_pjsip.c b/res/res_pjsip.c index 5a3fe2fe8..339cf1dfe 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -1613,6 +1613,36 @@ pjsip_dialog *ast_sip_create_dialog_uas(const struct ast_sip_endpoint *endpoint, return dlg; } +int ast_sip_create_rdata(pjsip_rx_data *rdata, char *packet, const char *src_name, int src_port, + char *transport_type, const char *local_name, int local_port) +{ + pj_str_t tmp; + + rdata->tp_info.transport = PJ_POOL_ZALLOC_T(rdata->tp_info.pool, pjsip_transport); + if (!rdata->tp_info.transport) { + return -1; + } + + ast_copy_string(rdata->pkt_info.packet, packet, sizeof(rdata->pkt_info.packet)); + ast_copy_string(rdata->pkt_info.src_name, src_name, sizeof(rdata->pkt_info.src_name)); + rdata->pkt_info.src_port = src_port; + + pjsip_parse_rdata(packet, strlen(packet), rdata); + if (!rdata->msg_info.msg) { + return -1; + } + + pj_strdup2(rdata->tp_info.pool, &rdata->msg_info.via->recvd_param, rdata->pkt_info.src_name); + rdata->msg_info.via->rport_param = -1; + + rdata->tp_info.transport->key.type = pjsip_transport_get_type_from_name(pj_cstr(&tmp, transport_type)); + rdata->tp_info.transport->type_name = transport_type; + pj_strdup2(rdata->tp_info.pool, &rdata->tp_info.transport->local_name.host, local_name); + rdata->tp_info.transport->local_name.port = local_port; + + return 0; +} + /* PJSIP doesn't know about the INFO method, so we have to define it ourselves */ static const pjsip_method info_method = {PJSIP_OTHER_METHOD, {"INFO", 4} }; static const pjsip_method message_method = {PJSIP_OTHER_METHOD, {"MESSAGE", 7} }; diff --git a/res/res_pjsip_exten_state.c b/res/res_pjsip_exten_state.c index 7a9067939..f4bfef772 100644 --- a/res/res_pjsip_exten_state.c +++ b/res/res_pjsip_exten_state.c @@ -445,8 +445,7 @@ static struct ast_sip_subscription *new_subscribe(struct ast_sip_endpoint *endpo return NULL; } - if (pjsip_evsub_accept(ast_sip_subscription_get_evsub(exten_state_sub->sip_sub), - rdata, 200, NULL) != PJ_SUCCESS) { + if (ast_sip_subscription_accept(exten_state_sub->sip_sub, rdata, 200)) { ast_log(LOG_WARNING, "Unable to accept the incoming extension state subscription.\n"); pjsip_evsub_terminate(ast_sip_subscription_get_evsub(exten_state_sub->sip_sub), PJ_FALSE); return NULL; diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index 6e45ded33..f25a7c48f 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -699,7 +699,6 @@ static struct ast_sip_subscription *mwi_new_subscribe(struct ast_sip_endpoint *e RAII_VAR(struct mwi_subscription *, sub, NULL, ao2_cleanup); pjsip_uri *ruri = rdata->msg_info.msg->line.req.uri; pjsip_sip_uri *sip_ruri; - pjsip_evsub *evsub; char aor_name[80]; if (!PJSIP_URI_SCHEME_IS_SIP(ruri) && !PJSIP_URI_SCHEME_IS_SIPS(ruri)) { @@ -715,8 +714,7 @@ static struct ast_sip_subscription *mwi_new_subscribe(struct ast_sip_endpoint *e return NULL; } - evsub = ast_sip_subscription_get_evsub(sub->sip_sub); - pjsip_evsub_accept(evsub, rdata, 200, NULL); + ast_sip_subscription_accept(sub->sip_sub, rdata, 200); send_mwi_notify(sub, PJSIP_EVSUB_STATE_ACTIVE, NULL); return sub->sip_sub; diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index 93ed744cf..88e284faf 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -72,6 +72,44 @@ </para> </description> </manager> + <configInfo name="res_pjsip_pubsub" language="en_US"> + <synopsis>Module that implements publish and subscribe support.</synopsis> + <configFile name="pjsip.conf"> + <configObject name="subscription_persistence"> + <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis> + <configOption name="packet"> + <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis> + </configOption> + <configOption name="src_name"> + <synopsis>The source address of the subscription</synopsis> + </configOption> + <configOption name="src_port"> + <synopsis>The source port of the subscription</synopsis> + </configOption> + <configOption name="transport_key"> + <synopsis>The type of transport the subscription was received on</synopsis> + </configOption> + <configOption name="local_name"> + <synopsis>The local address the subscription was received on</synopsis> + </configOption> + <configOption name="local_port"> + <synopsis>The local port the subscription was received on</synopsis> + </configOption> + <configOption name="cseq"> + <synopsis>The sequence number of the next NOTIFY to be sent</synopsis> + </configOption> + <configOption name="tag"> + <synopsis>The local tag of the dialog for the subscription</synopsis> + </configOption> + <configOption name="endpoint"> + <synopsis>The name of the endpoint that subscribed</synopsis> + </configOption> + <configOption name="expires"> + <synopsis>The time at which the subscription expires</synopsis> + </configOption> + </configObject> + </configFile> + </configInfo> ***/ static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata); @@ -83,6 +121,7 @@ static struct pjsip_module pubsub_module = { }; #define MOD_DATA_BODY_GENERATOR "sub_body_generator" +#define MOD_DATA_PERSISTENCE "sub_persistence" static const pj_str_t str_event_name = { "Event", 5 }; @@ -180,6 +219,35 @@ struct ast_sip_publication { int sched_id; }; + +/*! + * \brief Structure used for persisting an inbound subscription + */ +struct subscription_persistence { + /*! Sorcery object details */ + SORCERY_OBJECT(details); + /*! The name of the endpoint involved in the subscrption */ + char *endpoint; + /*! SIP message that creates the subscription */ + char packet[PJSIP_MAX_PKT_LEN]; + /*! Source address of the message */ + char src_name[PJ_INET6_ADDRSTRLEN]; + /*! Source port of the message */ + int src_port; + /*! Local transport key type */ + char transport_key[32]; + /*! Local transport address */ + char local_name[PJ_INET6_ADDRSTRLEN]; + /*! Local transport port */ + int local_port; + /*! Next CSeq to use for message */ + unsigned int cseq; + /*! Local tag of the dialog */ + char *tag; + /*! When this subscription expires */ + struct timeval expires; +}; + /*! * \brief Structure representing a SIP subscription */ @@ -200,6 +268,8 @@ struct ast_sip_subscription { pjsip_dialog *dlg; /*! Body generaator for NOTIFYs */ struct ast_sip_pubsub_body_generator *body_generator; + /*! Persistence information */ + struct subscription_persistence *persistence; /*! Next item in the list */ AST_LIST_ENTRY(ast_sip_subscription) next; }; @@ -214,6 +284,265 @@ AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription); AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator); AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement); +/*! \brief Destructor for subscription persistence */ +static void subscription_persistence_destroy(void *obj) +{ + struct subscription_persistence *persistence = obj; + + ast_free(persistence->endpoint); + ast_free(persistence->tag); +} + +/*! \brief Allocator for subscription persistence */ +static void *subscription_persistence_alloc(const char *name) +{ + return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy); +} + +/*! \brief Function which creates initial persistence information of a subscription in sorcery */ +static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub) +{ + char tag[PJ_GUID_STRING_LENGTH + 1]; + + /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to + * look it up by id at all. + */ + struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(), + "subscription_persistence", NULL); + + if (!persistence) { + return NULL; + } + + persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint)); + ast_copy_pj_str(tag, &sub->dlg->local.info->tag, sizeof(tag)); + persistence->tag = ast_strdup(tag); + + ast_sorcery_create(ast_sip_get_sorcery(), persistence); + return persistence; +} + +/*! \brief Function which updates persistence information of a subscription in sorcery */ +static void subscription_persistence_update(struct ast_sip_subscription *sub, + pjsip_rx_data *rdata) +{ + if (!sub->persistence) { + return; + } + + sub->persistence->cseq = sub->dlg->local.cseq; + + if (rdata) { + int expires; + pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); + + expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES; + sub->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1)); + + ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet)); + ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name)); + sub->persistence->src_port = rdata->pkt_info.src_port; + ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name, + sizeof(sub->persistence->transport_key)); + ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host, + sizeof(sub->persistence->local_name)); + sub->persistence->local_port = rdata->tp_info.transport->local_name.port; + } + + ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence); +} + +/*! \brief Function which removes persistence of a subscription from sorcery */ +static void subscription_persistence_remove(struct ast_sip_subscription *sub) +{ + if (!sub->persistence) { + return; + } + + ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence); + ao2_ref(sub->persistence, -1); +} + + +static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name); +static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64], + size_t num_accept); + +/*! \brief Retrieve a handler using the Event header of an rdata message */ +static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata) +{ + pjsip_event_hdr *event_header; + char event[32]; + struct ast_sip_subscription_handler *handler; + + event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next); + if (!event_header) { + ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n"); + return NULL; + } + ast_copy_pj_str(event, &event_header->event_type, sizeof(event)); + + handler = find_sub_handler_for_event_name(event); + if (!handler) { + ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event); + } + + return handler; +} + +/*! \brief Retrieve a body generator using the Accept header of an rdata message */ +static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata, + const struct ast_sip_subscription_handler *handler) +{ + pjsip_accept_hdr *accept_header; + char accept[AST_SIP_MAX_ACCEPT][64]; + size_t num_accept_headers; + + accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next); + if (accept_header) { + int i; + + for (i = 0; i < accept_header->count; ++i) { + ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i])); + } + num_accept_headers = accept_header->count; + } else { + /* If a SUBSCRIBE contains no Accept headers, then we must assume that + * the default accept type for the event package is to be used. + */ + ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0])); + num_accept_headers = 1; + } + + return find_body_generator(accept, num_accept_headers); +} + +/*! \brief Callback function to perform the actual recreation of a subscription */ +static int subscription_persistence_recreate(void *obj, void *arg, int flags) +{ + struct subscription_persistence *persistence = obj; + pj_pool_t *pool = arg; + pjsip_rx_data rdata = { { 0, }, }; + pjsip_expires_hdr *expires_header; + struct ast_sip_subscription_handler *handler; + RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); + struct ast_sip_subscription *sub; + struct ast_sip_pubsub_body_generator *generator; + + /* If this subscription has already expired remove it */ + if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) { + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint); + if (!endpoint) { + ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + pj_pool_reset(pool); + rdata.tp_info.pool = pool; + + if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port, + persistence->transport_key, persistence->local_name, persistence->local_port)) { + ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + /* Update the expiration header with the new expiration */ + expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next); + if (!expires_header) { + expires_header = pjsip_expires_hdr_create(pool, 0); + if (!expires_header) { + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header); + } + expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000); + + handler = subscription_get_handler_from_rdata(&rdata); + if (!handler) { + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + generator = subscription_get_generator_from_rdata(&rdata, handler); + if (!generator) { + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data, + pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator); + ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data, + pubsub_module.id, MOD_DATA_PERSISTENCE, persistence); + + sub = handler->new_subscribe(endpoint, &rdata); + if (sub) { + sub->persistence = ao2_bump(persistence); + subscription_persistence_update(sub, &rdata); + } else { + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + } + + return 0; +} + +/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */ +static int subscription_persistence_load(void *data) +{ + struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), + "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL); + pj_pool_t *pool; + + pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN, + PJSIP_POOL_RDATA_INC); + if (!pool) { + ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n"); + return 0; + } + + ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool); + + pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); + + ao2_ref(persisted_subscriptions, -1); + return 0; +} + +/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */ +static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) +{ + struct ast_json_payload *payload; + const char *type; + + if (stasis_message_type(message) != ast_manager_get_generic_type()) { + return; + } + + payload = stasis_message_data(message); + type = ast_json_string_get(ast_json_object_get(payload->json, "type")); + + /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we + * recreate SIP subscriptions. + */ + if (strcmp(type, "FullyBooted")) { + return; + } + + /* This has to be here so the subscription is recreated when the body generator is available */ + ast_sip_push_task(NULL, subscription_persistence_load, NULL); + + /* Once the system is fully booted we don't care anymore */ + stasis_unsubscribe(sub); +} + static void add_subscription(struct ast_sip_subscription *obj) { SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); @@ -335,6 +664,9 @@ static void subscription_destructor(void *obj) struct ast_sip_subscription *sub = obj; ast_debug(3, "Destroying SIP subscription\n"); + + subscription_persistence_remove(sub); + remove_subscription(sub); ao2_cleanup(sub->datastores); @@ -388,6 +720,7 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su { struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor); pjsip_dialog *dlg; + struct subscription_persistence *persistence; if (!sub) { return NULL; @@ -424,6 +757,17 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su ao2_ref(sub, -1); return NULL; } + persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data, + pubsub_module.id, MOD_DATA_PERSISTENCE); + if (persistence) { + /* Update the created dialog with the persisted information */ + pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg); + pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag); + dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag); + pjsip_ua_register_dlg(pjsip_ua_instance(), dlg); + dlg->local.cseq = persistence->cseq; + dlg->remote.cseq = persistence->cseq; + } sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg); /* We keep a reference to the dialog until our subscription is destroyed. See * the subscription_destructor for more details @@ -463,6 +807,16 @@ pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub) return sub->dlg; } +int ast_sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response) +{ + /* If this is a persistence recreation the subscription has already been accepted */ + if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) { + return 0; + } + + return pjsip_evsub_accept(ast_sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1; +} + int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata) { struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub); @@ -472,6 +826,8 @@ int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx res = pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub), tdata) == PJ_SUCCESS ? 0 : -1; + subscription_persistence_update(sub, NULL); + ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET", "StateText: %s\r\n" "Endpoint: %s\r\n", @@ -688,6 +1044,7 @@ int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *h pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept); sub_add_handler(handler); + return 0; } @@ -755,15 +1112,10 @@ static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) { - char event[32]; - char accept[AST_SIP_MAX_ACCEPT][64]; - pjsip_accept_hdr *accept_header; - pjsip_event_hdr *event_header; pjsip_expires_hdr *expires_header; struct ast_sip_subscription_handler *handler; RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); struct ast_sip_subscription *sub; - size_t num_accept_headers; struct ast_sip_pubsub_body_generator *generator; endpoint = ast_pjsip_rdata_get_endpoint(rdata); @@ -784,38 +1136,13 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) return PJ_TRUE; } - event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next); - if (!event_header) { - ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n"); - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); - return PJ_TRUE; - } - ast_copy_pj_str(event, &event_header->event_type, sizeof(event)); - - handler = find_sub_handler_for_event_name(event); + handler = subscription_get_handler_from_rdata(rdata); if (!handler) { - ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event); pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); return PJ_TRUE; } - accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next); - if (accept_header) { - int i; - - for (i = 0; i < accept_header->count; ++i) { - ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i])); - } - num_accept_headers = accept_header->count; - } else { - /* If a SUBSCRIBE contains no Accept headers, then we must assume that - * the default accept type for the event package is to be used. - */ - ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0])); - num_accept_headers = 1; - } - - generator = find_body_generator(accept, num_accept_headers); + generator = subscription_get_generator_from_rdata(rdata, handler); if (!generator) { pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); return PJ_TRUE; @@ -839,7 +1166,11 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) } else { pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL); } + } else { + sub->persistence = subscription_persistence_create(sub); + subscription_persistence_update(sub, rdata); } + return PJ_TRUE; } @@ -1471,9 +1802,54 @@ static int ami_show_subscriptions_outbound(struct mansession *s, const struct me #define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound" #define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound" +static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj) +{ + struct subscription_persistence *persistence = obj; + + persistence->endpoint = ast_strdup(var->value); + return 0; +} + +static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf) +{ + const struct subscription_persistence *persistence = obj; + + *buf = ast_strdup(persistence->endpoint); + return 0; +} + +static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj) +{ + struct subscription_persistence *persistence = obj; + + persistence->tag = ast_strdup(var->value); + return 0; +} + +static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf) +{ + const struct subscription_persistence *persistence = obj; + + *buf = ast_strdup(persistence->tag); + return 0; +} + +static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj) +{ + struct subscription_persistence *persistence = obj; + return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL); +} + +static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf) +{ + const struct subscription_persistence *persistence = obj; + return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0; +} + static int load_module(void) { static const pj_str_t str_PUBLISH = { "PUBLISH", 7 }; + struct ast_sorcery *sorcery = ast_sip_get_sorcery(); pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint()); @@ -1496,6 +1872,42 @@ static int load_module(void) return AST_MODULE_LOAD_FAILURE; } + ast_sorcery_apply_config(sorcery, "res_pjsip_pubsub"); + ast_sorcery_apply_default(sorcery, "subscription_persistence", "astdb", "subscription_persistence"); + if (ast_sorcery_object_register(sorcery, "subscription_persistence", subscription_persistence_alloc, + NULL, NULL)) { + ast_log(LOG_ERROR, "Could not register subscription persistence object support\n"); + ast_sip_unregister_service(&pubsub_module); + ast_sched_context_destroy(sched); + return AST_MODULE_LOAD_FAILURE; + } + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "packet", "", OPT_CHAR_ARRAY_T, 0, + CHARFLDSET(struct subscription_persistence, packet)); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_name", "", OPT_CHAR_ARRAY_T, 0, + CHARFLDSET(struct subscription_persistence, src_name)); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_port", "0", OPT_UINT_T, 0, + FLDSET(struct subscription_persistence, src_port)); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "transport_key", "0", OPT_CHAR_ARRAY_T, 0, + CHARFLDSET(struct subscription_persistence, transport_key)); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_name", "", OPT_CHAR_ARRAY_T, 0, + CHARFLDSET(struct subscription_persistence, local_name)); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_port", "0", OPT_UINT_T, 0, + FLDSET(struct subscription_persistence, local_port)); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "cseq", "0", OPT_UINT_T, 0, + FLDSET(struct subscription_persistence, cseq)); + ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "endpoint", "", + persistence_endpoint_str2struct, persistence_endpoint_struct2str, NULL, 0, 0); + ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "tag", "", + persistence_tag_str2struct, persistence_tag_struct2str, NULL, 0, 0); + ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "expires", "", + persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0); + + if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { + ast_sip_push_task(NULL, subscription_persistence_load, NULL); + } else { + stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL); + } + ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM, ami_show_subscriptions_inbound); ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND, EVENT_FLAG_SYSTEM, diff --git a/res/res_pjsip_pubsub.exports.in b/res/res_pjsip_pubsub.exports.in index f279b5433..0877d5c6c 100644 --- a/res/res_pjsip_pubsub.exports.in +++ b/res/res_pjsip_pubsub.exports.in @@ -5,6 +5,7 @@ LINKER_SYMBOL_PREFIXast_sip_subscription_get_serializer; LINKER_SYMBOL_PREFIXast_sip_subscription_get_evsub; LINKER_SYMBOL_PREFIXast_sip_subscription_get_dlg; + LINKER_SYMBOL_PREFIXast_sip_subscription_accept; LINKER_SYMBOL_PREFIXast_sip_subscription_send_request; LINKER_SYMBOL_PREFIXast_sip_subscription_alloc_datastore; LINKER_SYMBOL_PREFIXast_sip_subscription_add_datastore; |