diff options
Diffstat (limited to 'res/res_pjsip_pubsub.c')
-rw-r--r-- | res/res_pjsip_pubsub.c | 476 |
1 files changed, 444 insertions, 32 deletions
diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index 93ed744cf..88e284faf 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -72,6 +72,44 @@ </para> </description> </manager> + <configInfo name="res_pjsip_pubsub" language="en_US"> + <synopsis>Module that implements publish and subscribe support.</synopsis> + <configFile name="pjsip.conf"> + <configObject name="subscription_persistence"> + <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis> + <configOption name="packet"> + <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis> + </configOption> + <configOption name="src_name"> + <synopsis>The source address of the subscription</synopsis> + </configOption> + <configOption name="src_port"> + <synopsis>The source port of the subscription</synopsis> + </configOption> + <configOption name="transport_key"> + <synopsis>The type of transport the subscription was received on</synopsis> + </configOption> + <configOption name="local_name"> + <synopsis>The local address the subscription was received on</synopsis> + </configOption> + <configOption name="local_port"> + <synopsis>The local port the subscription was received on</synopsis> + </configOption> + <configOption name="cseq"> + <synopsis>The sequence number of the next NOTIFY to be sent</synopsis> + </configOption> + <configOption name="tag"> + <synopsis>The local tag of the dialog for the subscription</synopsis> + </configOption> + <configOption name="endpoint"> + <synopsis>The name of the endpoint that subscribed</synopsis> + </configOption> + <configOption name="expires"> + <synopsis>The time at which the subscription expires</synopsis> + </configOption> + </configObject> + </configFile> + </configInfo> ***/ static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata); @@ -83,6 +121,7 @@ static struct pjsip_module pubsub_module = { }; #define MOD_DATA_BODY_GENERATOR "sub_body_generator" +#define MOD_DATA_PERSISTENCE "sub_persistence" static const pj_str_t str_event_name = { "Event", 5 }; @@ -180,6 +219,35 @@ struct ast_sip_publication { int sched_id; }; + +/*! + * \brief Structure used for persisting an inbound subscription + */ +struct subscription_persistence { + /*! Sorcery object details */ + SORCERY_OBJECT(details); + /*! The name of the endpoint involved in the subscrption */ + char *endpoint; + /*! SIP message that creates the subscription */ + char packet[PJSIP_MAX_PKT_LEN]; + /*! Source address of the message */ + char src_name[PJ_INET6_ADDRSTRLEN]; + /*! Source port of the message */ + int src_port; + /*! Local transport key type */ + char transport_key[32]; + /*! Local transport address */ + char local_name[PJ_INET6_ADDRSTRLEN]; + /*! Local transport port */ + int local_port; + /*! Next CSeq to use for message */ + unsigned int cseq; + /*! Local tag of the dialog */ + char *tag; + /*! When this subscription expires */ + struct timeval expires; +}; + /*! * \brief Structure representing a SIP subscription */ @@ -200,6 +268,8 @@ struct ast_sip_subscription { pjsip_dialog *dlg; /*! Body generaator for NOTIFYs */ struct ast_sip_pubsub_body_generator *body_generator; + /*! Persistence information */ + struct subscription_persistence *persistence; /*! Next item in the list */ AST_LIST_ENTRY(ast_sip_subscription) next; }; @@ -214,6 +284,265 @@ AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription); AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator); AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement); +/*! \brief Destructor for subscription persistence */ +static void subscription_persistence_destroy(void *obj) +{ + struct subscription_persistence *persistence = obj; + + ast_free(persistence->endpoint); + ast_free(persistence->tag); +} + +/*! \brief Allocator for subscription persistence */ +static void *subscription_persistence_alloc(const char *name) +{ + return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy); +} + +/*! \brief Function which creates initial persistence information of a subscription in sorcery */ +static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub) +{ + char tag[PJ_GUID_STRING_LENGTH + 1]; + + /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to + * look it up by id at all. + */ + struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(), + "subscription_persistence", NULL); + + if (!persistence) { + return NULL; + } + + persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint)); + ast_copy_pj_str(tag, &sub->dlg->local.info->tag, sizeof(tag)); + persistence->tag = ast_strdup(tag); + + ast_sorcery_create(ast_sip_get_sorcery(), persistence); + return persistence; +} + +/*! \brief Function which updates persistence information of a subscription in sorcery */ +static void subscription_persistence_update(struct ast_sip_subscription *sub, + pjsip_rx_data *rdata) +{ + if (!sub->persistence) { + return; + } + + sub->persistence->cseq = sub->dlg->local.cseq; + + if (rdata) { + int expires; + pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); + + expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES; + sub->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1)); + + ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet)); + ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name)); + sub->persistence->src_port = rdata->pkt_info.src_port; + ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name, + sizeof(sub->persistence->transport_key)); + ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host, + sizeof(sub->persistence->local_name)); + sub->persistence->local_port = rdata->tp_info.transport->local_name.port; + } + + ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence); +} + +/*! \brief Function which removes persistence of a subscription from sorcery */ +static void subscription_persistence_remove(struct ast_sip_subscription *sub) +{ + if (!sub->persistence) { + return; + } + + ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence); + ao2_ref(sub->persistence, -1); +} + + +static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name); +static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64], + size_t num_accept); + +/*! \brief Retrieve a handler using the Event header of an rdata message */ +static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata) +{ + pjsip_event_hdr *event_header; + char event[32]; + struct ast_sip_subscription_handler *handler; + + event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next); + if (!event_header) { + ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n"); + return NULL; + } + ast_copy_pj_str(event, &event_header->event_type, sizeof(event)); + + handler = find_sub_handler_for_event_name(event); + if (!handler) { + ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event); + } + + return handler; +} + +/*! \brief Retrieve a body generator using the Accept header of an rdata message */ +static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata, + const struct ast_sip_subscription_handler *handler) +{ + pjsip_accept_hdr *accept_header; + char accept[AST_SIP_MAX_ACCEPT][64]; + size_t num_accept_headers; + + accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next); + if (accept_header) { + int i; + + for (i = 0; i < accept_header->count; ++i) { + ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i])); + } + num_accept_headers = accept_header->count; + } else { + /* If a SUBSCRIBE contains no Accept headers, then we must assume that + * the default accept type for the event package is to be used. + */ + ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0])); + num_accept_headers = 1; + } + + return find_body_generator(accept, num_accept_headers); +} + +/*! \brief Callback function to perform the actual recreation of a subscription */ +static int subscription_persistence_recreate(void *obj, void *arg, int flags) +{ + struct subscription_persistence *persistence = obj; + pj_pool_t *pool = arg; + pjsip_rx_data rdata = { { 0, }, }; + pjsip_expires_hdr *expires_header; + struct ast_sip_subscription_handler *handler; + RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); + struct ast_sip_subscription *sub; + struct ast_sip_pubsub_body_generator *generator; + + /* If this subscription has already expired remove it */ + if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) { + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint); + if (!endpoint) { + ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + pj_pool_reset(pool); + rdata.tp_info.pool = pool; + + if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port, + persistence->transport_key, persistence->local_name, persistence->local_port)) { + ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + /* Update the expiration header with the new expiration */ + expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next); + if (!expires_header) { + expires_header = pjsip_expires_hdr_create(pool, 0); + if (!expires_header) { + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header); + } + expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000); + + handler = subscription_get_handler_from_rdata(&rdata); + if (!handler) { + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + generator = subscription_get_generator_from_rdata(&rdata, handler); + if (!generator) { + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } + + ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data, + pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator); + ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data, + pubsub_module.id, MOD_DATA_PERSISTENCE, persistence); + + sub = handler->new_subscribe(endpoint, &rdata); + if (sub) { + sub->persistence = ao2_bump(persistence); + subscription_persistence_update(sub, &rdata); + } else { + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + } + + return 0; +} + +/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */ +static int subscription_persistence_load(void *data) +{ + struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), + "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL); + pj_pool_t *pool; + + pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN, + PJSIP_POOL_RDATA_INC); + if (!pool) { + ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n"); + return 0; + } + + ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool); + + pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); + + ao2_ref(persisted_subscriptions, -1); + return 0; +} + +/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */ +static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) +{ + struct ast_json_payload *payload; + const char *type; + + if (stasis_message_type(message) != ast_manager_get_generic_type()) { + return; + } + + payload = stasis_message_data(message); + type = ast_json_string_get(ast_json_object_get(payload->json, "type")); + + /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we + * recreate SIP subscriptions. + */ + if (strcmp(type, "FullyBooted")) { + return; + } + + /* This has to be here so the subscription is recreated when the body generator is available */ + ast_sip_push_task(NULL, subscription_persistence_load, NULL); + + /* Once the system is fully booted we don't care anymore */ + stasis_unsubscribe(sub); +} + static void add_subscription(struct ast_sip_subscription *obj) { SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); @@ -335,6 +664,9 @@ static void subscription_destructor(void *obj) struct ast_sip_subscription *sub = obj; ast_debug(3, "Destroying SIP subscription\n"); + + subscription_persistence_remove(sub); + remove_subscription(sub); ao2_cleanup(sub->datastores); @@ -388,6 +720,7 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su { struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor); pjsip_dialog *dlg; + struct subscription_persistence *persistence; if (!sub) { return NULL; @@ -424,6 +757,17 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su ao2_ref(sub, -1); return NULL; } + persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data, + pubsub_module.id, MOD_DATA_PERSISTENCE); + if (persistence) { + /* Update the created dialog with the persisted information */ + pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg); + pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag); + dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag); + pjsip_ua_register_dlg(pjsip_ua_instance(), dlg); + dlg->local.cseq = persistence->cseq; + dlg->remote.cseq = persistence->cseq; + } sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg); /* We keep a reference to the dialog until our subscription is destroyed. See * the subscription_destructor for more details @@ -463,6 +807,16 @@ pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub) return sub->dlg; } +int ast_sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response) +{ + /* If this is a persistence recreation the subscription has already been accepted */ + if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) { + return 0; + } + + return pjsip_evsub_accept(ast_sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1; +} + int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata) { struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub); @@ -472,6 +826,8 @@ int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx res = pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub), tdata) == PJ_SUCCESS ? 0 : -1; + subscription_persistence_update(sub, NULL); + ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET", "StateText: %s\r\n" "Endpoint: %s\r\n", @@ -688,6 +1044,7 @@ int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *h pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept); sub_add_handler(handler); + return 0; } @@ -755,15 +1112,10 @@ static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) { - char event[32]; - char accept[AST_SIP_MAX_ACCEPT][64]; - pjsip_accept_hdr *accept_header; - pjsip_event_hdr *event_header; pjsip_expires_hdr *expires_header; struct ast_sip_subscription_handler *handler; RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); struct ast_sip_subscription *sub; - size_t num_accept_headers; struct ast_sip_pubsub_body_generator *generator; endpoint = ast_pjsip_rdata_get_endpoint(rdata); @@ -784,38 +1136,13 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) return PJ_TRUE; } - event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next); - if (!event_header) { - ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n"); - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); - return PJ_TRUE; - } - ast_copy_pj_str(event, &event_header->event_type, sizeof(event)); - - handler = find_sub_handler_for_event_name(event); + handler = subscription_get_handler_from_rdata(rdata); if (!handler) { - ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event); pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); return PJ_TRUE; } - accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next); - if (accept_header) { - int i; - - for (i = 0; i < accept_header->count; ++i) { - ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i])); - } - num_accept_headers = accept_header->count; - } else { - /* If a SUBSCRIBE contains no Accept headers, then we must assume that - * the default accept type for the event package is to be used. - */ - ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0])); - num_accept_headers = 1; - } - - generator = find_body_generator(accept, num_accept_headers); + generator = subscription_get_generator_from_rdata(rdata, handler); if (!generator) { pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); return PJ_TRUE; @@ -839,7 +1166,11 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) } else { pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL); } + } else { + sub->persistence = subscription_persistence_create(sub); + subscription_persistence_update(sub, rdata); } + return PJ_TRUE; } @@ -1471,9 +1802,54 @@ static int ami_show_subscriptions_outbound(struct mansession *s, const struct me #define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound" #define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound" +static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj) +{ + struct subscription_persistence *persistence = obj; + + persistence->endpoint = ast_strdup(var->value); + return 0; +} + +static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf) +{ + const struct subscription_persistence *persistence = obj; + + *buf = ast_strdup(persistence->endpoint); + return 0; +} + +static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj) +{ + struct subscription_persistence *persistence = obj; + + persistence->tag = ast_strdup(var->value); + return 0; +} + +static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf) +{ + const struct subscription_persistence *persistence = obj; + + *buf = ast_strdup(persistence->tag); + return 0; +} + +static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj) +{ + struct subscription_persistence *persistence = obj; + return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL); +} + +static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf) +{ + const struct subscription_persistence *persistence = obj; + return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0; +} + static int load_module(void) { static const pj_str_t str_PUBLISH = { "PUBLISH", 7 }; + struct ast_sorcery *sorcery = ast_sip_get_sorcery(); pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint()); @@ -1496,6 +1872,42 @@ static int load_module(void) return AST_MODULE_LOAD_FAILURE; } + ast_sorcery_apply_config(sorcery, "res_pjsip_pubsub"); + ast_sorcery_apply_default(sorcery, "subscription_persistence", "astdb", "subscription_persistence"); + if (ast_sorcery_object_register(sorcery, "subscription_persistence", subscription_persistence_alloc, + NULL, NULL)) { + ast_log(LOG_ERROR, "Could not register subscription persistence object support\n"); + ast_sip_unregister_service(&pubsub_module); + ast_sched_context_destroy(sched); + return AST_MODULE_LOAD_FAILURE; + } + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "packet", "", OPT_CHAR_ARRAY_T, 0, + CHARFLDSET(struct subscription_persistence, packet)); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_name", "", OPT_CHAR_ARRAY_T, 0, + CHARFLDSET(struct subscription_persistence, src_name)); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_port", "0", OPT_UINT_T, 0, + FLDSET(struct subscription_persistence, src_port)); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "transport_key", "0", OPT_CHAR_ARRAY_T, 0, + CHARFLDSET(struct subscription_persistence, transport_key)); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_name", "", OPT_CHAR_ARRAY_T, 0, + CHARFLDSET(struct subscription_persistence, local_name)); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_port", "0", OPT_UINT_T, 0, + FLDSET(struct subscription_persistence, local_port)); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "cseq", "0", OPT_UINT_T, 0, + FLDSET(struct subscription_persistence, cseq)); + ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "endpoint", "", + persistence_endpoint_str2struct, persistence_endpoint_struct2str, NULL, 0, 0); + ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "tag", "", + persistence_tag_str2struct, persistence_tag_struct2str, NULL, 0, 0); + ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "expires", "", + persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0); + + if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { + ast_sip_push_task(NULL, subscription_persistence_load, NULL); + } else { + stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL); + } + ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM, ami_show_subscriptions_inbound); ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND, EVENT_FLAG_SYSTEM, |