summaryrefslogtreecommitdiff
path: root/res/res_pjsip_pubsub.c
diff options
context:
space:
mode:
Diffstat (limited to 'res/res_pjsip_pubsub.c')
-rw-r--r--res/res_pjsip_pubsub.c476
1 files changed, 444 insertions, 32 deletions
diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c
index 93ed744cf..88e284faf 100644
--- a/res/res_pjsip_pubsub.c
+++ b/res/res_pjsip_pubsub.c
@@ -72,6 +72,44 @@
</para>
</description>
</manager>
+ <configInfo name="res_pjsip_pubsub" language="en_US">
+ <synopsis>Module that implements publish and subscribe support.</synopsis>
+ <configFile name="pjsip.conf">
+ <configObject name="subscription_persistence">
+ <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
+ <configOption name="packet">
+ <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
+ </configOption>
+ <configOption name="src_name">
+ <synopsis>The source address of the subscription</synopsis>
+ </configOption>
+ <configOption name="src_port">
+ <synopsis>The source port of the subscription</synopsis>
+ </configOption>
+ <configOption name="transport_key">
+ <synopsis>The type of transport the subscription was received on</synopsis>
+ </configOption>
+ <configOption name="local_name">
+ <synopsis>The local address the subscription was received on</synopsis>
+ </configOption>
+ <configOption name="local_port">
+ <synopsis>The local port the subscription was received on</synopsis>
+ </configOption>
+ <configOption name="cseq">
+ <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
+ </configOption>
+ <configOption name="tag">
+ <synopsis>The local tag of the dialog for the subscription</synopsis>
+ </configOption>
+ <configOption name="endpoint">
+ <synopsis>The name of the endpoint that subscribed</synopsis>
+ </configOption>
+ <configOption name="expires">
+ <synopsis>The time at which the subscription expires</synopsis>
+ </configOption>
+ </configObject>
+ </configFile>
+ </configInfo>
***/
static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
@@ -83,6 +121,7 @@ static struct pjsip_module pubsub_module = {
};
#define MOD_DATA_BODY_GENERATOR "sub_body_generator"
+#define MOD_DATA_PERSISTENCE "sub_persistence"
static const pj_str_t str_event_name = { "Event", 5 };
@@ -180,6 +219,35 @@ struct ast_sip_publication {
int sched_id;
};
+
+/*!
+ * \brief Structure used for persisting an inbound subscription
+ */
+struct subscription_persistence {
+ /*! Sorcery object details */
+ SORCERY_OBJECT(details);
+ /*! The name of the endpoint involved in the subscrption */
+ char *endpoint;
+ /*! SIP message that creates the subscription */
+ char packet[PJSIP_MAX_PKT_LEN];
+ /*! Source address of the message */
+ char src_name[PJ_INET6_ADDRSTRLEN];
+ /*! Source port of the message */
+ int src_port;
+ /*! Local transport key type */
+ char transport_key[32];
+ /*! Local transport address */
+ char local_name[PJ_INET6_ADDRSTRLEN];
+ /*! Local transport port */
+ int local_port;
+ /*! Next CSeq to use for message */
+ unsigned int cseq;
+ /*! Local tag of the dialog */
+ char *tag;
+ /*! When this subscription expires */
+ struct timeval expires;
+};
+
/*!
* \brief Structure representing a SIP subscription
*/
@@ -200,6 +268,8 @@ struct ast_sip_subscription {
pjsip_dialog *dlg;
/*! Body generaator for NOTIFYs */
struct ast_sip_pubsub_body_generator *body_generator;
+ /*! Persistence information */
+ struct subscription_persistence *persistence;
/*! Next item in the list */
AST_LIST_ENTRY(ast_sip_subscription) next;
};
@@ -214,6 +284,265 @@ AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription);
AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
+/*! \brief Destructor for subscription persistence */
+static void subscription_persistence_destroy(void *obj)
+{
+ struct subscription_persistence *persistence = obj;
+
+ ast_free(persistence->endpoint);
+ ast_free(persistence->tag);
+}
+
+/*! \brief Allocator for subscription persistence */
+static void *subscription_persistence_alloc(const char *name)
+{
+ return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
+}
+
+/*! \brief Function which creates initial persistence information of a subscription in sorcery */
+static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub)
+{
+ char tag[PJ_GUID_STRING_LENGTH + 1];
+
+ /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
+ * look it up by id at all.
+ */
+ struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
+ "subscription_persistence", NULL);
+
+ if (!persistence) {
+ return NULL;
+ }
+
+ persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint));
+ ast_copy_pj_str(tag, &sub->dlg->local.info->tag, sizeof(tag));
+ persistence->tag = ast_strdup(tag);
+
+ ast_sorcery_create(ast_sip_get_sorcery(), persistence);
+ return persistence;
+}
+
+/*! \brief Function which updates persistence information of a subscription in sorcery */
+static void subscription_persistence_update(struct ast_sip_subscription *sub,
+ pjsip_rx_data *rdata)
+{
+ if (!sub->persistence) {
+ return;
+ }
+
+ sub->persistence->cseq = sub->dlg->local.cseq;
+
+ if (rdata) {
+ int expires;
+ pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
+
+ expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
+ sub->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
+
+ ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet));
+ ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name));
+ sub->persistence->src_port = rdata->pkt_info.src_port;
+ ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name,
+ sizeof(sub->persistence->transport_key));
+ ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host,
+ sizeof(sub->persistence->local_name));
+ sub->persistence->local_port = rdata->tp_info.transport->local_name.port;
+ }
+
+ ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence);
+}
+
+/*! \brief Function which removes persistence of a subscription from sorcery */
+static void subscription_persistence_remove(struct ast_sip_subscription *sub)
+{
+ if (!sub->persistence) {
+ return;
+ }
+
+ ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence);
+ ao2_ref(sub->persistence, -1);
+}
+
+
+static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
+static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
+ size_t num_accept);
+
+/*! \brief Retrieve a handler using the Event header of an rdata message */
+static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
+{
+ pjsip_event_hdr *event_header;
+ char event[32];
+ struct ast_sip_subscription_handler *handler;
+
+ event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
+ if (!event_header) {
+ ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
+ return NULL;
+ }
+ ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
+
+ handler = find_sub_handler_for_event_name(event);
+ if (!handler) {
+ ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
+ }
+
+ return handler;
+}
+
+/*! \brief Retrieve a body generator using the Accept header of an rdata message */
+static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
+ const struct ast_sip_subscription_handler *handler)
+{
+ pjsip_accept_hdr *accept_header;
+ char accept[AST_SIP_MAX_ACCEPT][64];
+ size_t num_accept_headers;
+
+ accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
+ if (accept_header) {
+ int i;
+
+ for (i = 0; i < accept_header->count; ++i) {
+ ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
+ }
+ num_accept_headers = accept_header->count;
+ } else {
+ /* If a SUBSCRIBE contains no Accept headers, then we must assume that
+ * the default accept type for the event package is to be used.
+ */
+ ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0]));
+ num_accept_headers = 1;
+ }
+
+ return find_body_generator(accept, num_accept_headers);
+}
+
+/*! \brief Callback function to perform the actual recreation of a subscription */
+static int subscription_persistence_recreate(void *obj, void *arg, int flags)
+{
+ struct subscription_persistence *persistence = obj;
+ pj_pool_t *pool = arg;
+ pjsip_rx_data rdata = { { 0, }, };
+ pjsip_expires_hdr *expires_header;
+ struct ast_sip_subscription_handler *handler;
+ RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
+ struct ast_sip_subscription *sub;
+ struct ast_sip_pubsub_body_generator *generator;
+
+ /* If this subscription has already expired remove it */
+ if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
+ if (!endpoint) {
+ ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
+ persistence->endpoint);
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ pj_pool_reset(pool);
+ rdata.tp_info.pool = pool;
+
+ if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
+ persistence->transport_key, persistence->local_name, persistence->local_port)) {
+ ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
+ persistence->endpoint);
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ /* Update the expiration header with the new expiration */
+ expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
+ if (!expires_header) {
+ expires_header = pjsip_expires_hdr_create(pool, 0);
+ if (!expires_header) {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+ pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
+ }
+ expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
+
+ handler = subscription_get_handler_from_rdata(&rdata);
+ if (!handler) {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ generator = subscription_get_generator_from_rdata(&rdata, handler);
+ if (!generator) {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
+ pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator);
+ ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
+ pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
+
+ sub = handler->new_subscribe(endpoint, &rdata);
+ if (sub) {
+ sub->persistence = ao2_bump(persistence);
+ subscription_persistence_update(sub, &rdata);
+ } else {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ }
+
+ return 0;
+}
+
+/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
+static int subscription_persistence_load(void *data)
+{
+ struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
+ "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
+ pj_pool_t *pool;
+
+ pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
+ PJSIP_POOL_RDATA_INC);
+ if (!pool) {
+ ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
+ return 0;
+ }
+
+ ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
+
+ pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+
+ ao2_ref(persisted_subscriptions, -1);
+ return 0;
+}
+
+/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
+static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+ struct ast_json_payload *payload;
+ const char *type;
+
+ if (stasis_message_type(message) != ast_manager_get_generic_type()) {
+ return;
+ }
+
+ payload = stasis_message_data(message);
+ type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
+
+ /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
+ * recreate SIP subscriptions.
+ */
+ if (strcmp(type, "FullyBooted")) {
+ return;
+ }
+
+ /* This has to be here so the subscription is recreated when the body generator is available */
+ ast_sip_push_task(NULL, subscription_persistence_load, NULL);
+
+ /* Once the system is fully booted we don't care anymore */
+ stasis_unsubscribe(sub);
+}
+
static void add_subscription(struct ast_sip_subscription *obj)
{
SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
@@ -335,6 +664,9 @@ static void subscription_destructor(void *obj)
struct ast_sip_subscription *sub = obj;
ast_debug(3, "Destroying SIP subscription\n");
+
+ subscription_persistence_remove(sub);
+
remove_subscription(sub);
ao2_cleanup(sub->datastores);
@@ -388,6 +720,7 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su
{
struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor);
pjsip_dialog *dlg;
+ struct subscription_persistence *persistence;
if (!sub) {
return NULL;
@@ -424,6 +757,17 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su
ao2_ref(sub, -1);
return NULL;
}
+ persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
+ pubsub_module.id, MOD_DATA_PERSISTENCE);
+ if (persistence) {
+ /* Update the created dialog with the persisted information */
+ pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
+ pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
+ dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
+ pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
+ dlg->local.cseq = persistence->cseq;
+ dlg->remote.cseq = persistence->cseq;
+ }
sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg);
/* We keep a reference to the dialog until our subscription is destroyed. See
* the subscription_destructor for more details
@@ -463,6 +807,16 @@ pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub)
return sub->dlg;
}
+int ast_sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response)
+{
+ /* If this is a persistence recreation the subscription has already been accepted */
+ if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
+ return 0;
+ }
+
+ return pjsip_evsub_accept(ast_sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1;
+}
+
int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
{
struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub);
@@ -472,6 +826,8 @@ int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx
res = pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub),
tdata) == PJ_SUCCESS ? 0 : -1;
+ subscription_persistence_update(sub, NULL);
+
ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
"StateText: %s\r\n"
"Endpoint: %s\r\n",
@@ -688,6 +1044,7 @@ int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *h
pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
sub_add_handler(handler);
+
return 0;
}
@@ -755,15 +1112,10 @@ static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST
static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
{
- char event[32];
- char accept[AST_SIP_MAX_ACCEPT][64];
- pjsip_accept_hdr *accept_header;
- pjsip_event_hdr *event_header;
pjsip_expires_hdr *expires_header;
struct ast_sip_subscription_handler *handler;
RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
struct ast_sip_subscription *sub;
- size_t num_accept_headers;
struct ast_sip_pubsub_body_generator *generator;
endpoint = ast_pjsip_rdata_get_endpoint(rdata);
@@ -784,38 +1136,13 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
return PJ_TRUE;
}
- event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
- if (!event_header) {
- ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
- pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
- return PJ_TRUE;
- }
- ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
-
- handler = find_sub_handler_for_event_name(event);
+ handler = subscription_get_handler_from_rdata(rdata);
if (!handler) {
- ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
return PJ_TRUE;
}
- accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
- if (accept_header) {
- int i;
-
- for (i = 0; i < accept_header->count; ++i) {
- ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
- }
- num_accept_headers = accept_header->count;
- } else {
- /* If a SUBSCRIBE contains no Accept headers, then we must assume that
- * the default accept type for the event package is to be used.
- */
- ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0]));
- num_accept_headers = 1;
- }
-
- generator = find_body_generator(accept, num_accept_headers);
+ generator = subscription_get_generator_from_rdata(rdata, handler);
if (!generator) {
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
return PJ_TRUE;
@@ -839,7 +1166,11 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
} else {
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
}
+ } else {
+ sub->persistence = subscription_persistence_create(sub);
+ subscription_persistence_update(sub, rdata);
}
+
return PJ_TRUE;
}
@@ -1471,9 +1802,54 @@ static int ami_show_subscriptions_outbound(struct mansession *s, const struct me
#define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
#define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
+static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
+{
+ struct subscription_persistence *persistence = obj;
+
+ persistence->endpoint = ast_strdup(var->value);
+ return 0;
+}
+
+static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf)
+{
+ const struct subscription_persistence *persistence = obj;
+
+ *buf = ast_strdup(persistence->endpoint);
+ return 0;
+}
+
+static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
+{
+ struct subscription_persistence *persistence = obj;
+
+ persistence->tag = ast_strdup(var->value);
+ return 0;
+}
+
+static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf)
+{
+ const struct subscription_persistence *persistence = obj;
+
+ *buf = ast_strdup(persistence->tag);
+ return 0;
+}
+
+static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
+{
+ struct subscription_persistence *persistence = obj;
+ return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL);
+}
+
+static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf)
+{
+ const struct subscription_persistence *persistence = obj;
+ return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0;
+}
+
static int load_module(void)
{
static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
+ struct ast_sorcery *sorcery = ast_sip_get_sorcery();
pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
@@ -1496,6 +1872,42 @@ static int load_module(void)
return AST_MODULE_LOAD_FAILURE;
}
+ ast_sorcery_apply_config(sorcery, "res_pjsip_pubsub");
+ ast_sorcery_apply_default(sorcery, "subscription_persistence", "astdb", "subscription_persistence");
+ if (ast_sorcery_object_register(sorcery, "subscription_persistence", subscription_persistence_alloc,
+ NULL, NULL)) {
+ ast_log(LOG_ERROR, "Could not register subscription persistence object support\n");
+ ast_sip_unregister_service(&pubsub_module);
+ ast_sched_context_destroy(sched);
+ return AST_MODULE_LOAD_FAILURE;
+ }
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "packet", "", OPT_CHAR_ARRAY_T, 0,
+ CHARFLDSET(struct subscription_persistence, packet));
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_name", "", OPT_CHAR_ARRAY_T, 0,
+ CHARFLDSET(struct subscription_persistence, src_name));
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_port", "0", OPT_UINT_T, 0,
+ FLDSET(struct subscription_persistence, src_port));
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "transport_key", "0", OPT_CHAR_ARRAY_T, 0,
+ CHARFLDSET(struct subscription_persistence, transport_key));
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_name", "", OPT_CHAR_ARRAY_T, 0,
+ CHARFLDSET(struct subscription_persistence, local_name));
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_port", "0", OPT_UINT_T, 0,
+ FLDSET(struct subscription_persistence, local_port));
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "cseq", "0", OPT_UINT_T, 0,
+ FLDSET(struct subscription_persistence, cseq));
+ ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "endpoint", "",
+ persistence_endpoint_str2struct, persistence_endpoint_struct2str, NULL, 0, 0);
+ ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "tag", "",
+ persistence_tag_str2struct, persistence_tag_struct2str, NULL, 0, 0);
+ ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "expires", "",
+ persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0);
+
+ if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
+ ast_sip_push_task(NULL, subscription_persistence_load, NULL);
+ } else {
+ stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
+ }
+
ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
ami_show_subscriptions_inbound);
ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND, EVENT_FLAG_SYSTEM,