summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/ast-db-manage/config/versions/c6d929b23a8_create_pjsip_subscription_persistence_.py36
-rw-r--r--include/asterisk/res_pjsip.h17
-rw-r--r--include/asterisk/res_pjsip_pubsub.h12
-rw-r--r--res/res_pjsip.c30
-rw-r--r--res/res_pjsip_exten_state.c3
-rw-r--r--res/res_pjsip_mwi.c4
-rw-r--r--res/res_pjsip_pubsub.c476
-rw-r--r--res/res_pjsip_pubsub.exports.in1
8 files changed, 542 insertions, 37 deletions
diff --git a/contrib/ast-db-manage/config/versions/c6d929b23a8_create_pjsip_subscription_persistence_.py b/contrib/ast-db-manage/config/versions/c6d929b23a8_create_pjsip_subscription_persistence_.py
new file mode 100644
index 000000000..aa5c8c435
--- /dev/null
+++ b/contrib/ast-db-manage/config/versions/c6d929b23a8_create_pjsip_subscription_persistence_.py
@@ -0,0 +1,36 @@
+"""create pjsip subscription persistence table
+
+Revision ID: c6d929b23a8
+Revises: e96a0b8071c
+Create Date: 2014-06-06 02:17:38.660447
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'c6d929b23a8'
+down_revision = 'e96a0b8071c'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+ op.create_table(
+ 'ps_subscription_persistence',
+ sa.Column('id', sa.String(40), nullable=False, unique=True),
+ sa.Column('packet', sa.String(2048)),
+ sa.Column('src_name', sa.String(128)),
+ sa.Column('src_port', sa.Integer),
+ sa.Column('transport_key', sa.String(64)),
+ sa.Column('local_name', sa.String(128)),
+ sa.Column('local_port', sa.Integer),
+ sa.Column('cseq', sa.Integer),
+ sa.Column('tag', sa.String(128)),
+ sa.Column('endpoint', sa.String(40)),
+ sa.Column('expires', sa.Integer),
+ )
+
+ op.create_index('ps_subscription_persistence_id', 'ps_subscription_persistence', ['id'])
+
+def downgrade():
+ op.drop_table('ps_subscription_persistence')
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index 36d25fbec..c7e99aded 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -1221,6 +1221,23 @@ pjsip_dialog *ast_sip_create_dialog_uac(const struct ast_sip_endpoint *endpoint,
pjsip_dialog *ast_sip_create_dialog_uas(const struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata);
/*!
+ * \brief General purpose method for creating an rdata structure using specific information
+ *
+ * \param rdata[out] The rdata structure that will be populated
+ * \param packet A SIP message
+ * \param src_name The source IP address of the message
+ * \param src_port The source port of the message
+ * \param transport_type The type of transport the message was received on
+ * \param local_name The local IP address the message was received on
+ * \param local_port The local port the message was received on
+ *
+ * \retval 0 success
+ * \retval -1 failure
+ */
+int ast_sip_create_rdata(pjsip_rx_data *rdata, char *packet, const char *src_name, int src_port, char *transport_type,
+ const char *local_name, int local_port);
+
+/*!
* \brief General purpose method for creating a SIP request
*
* Its typical use would be to create one-off requests such as an out of dialog
diff --git a/include/asterisk/res_pjsip_pubsub.h b/include/asterisk/res_pjsip_pubsub.h
index 0b0a49e66..d46cbae30 100644
--- a/include/asterisk/res_pjsip_pubsub.h
+++ b/include/asterisk/res_pjsip_pubsub.h
@@ -456,6 +456,18 @@ pjsip_evsub *ast_sip_subscription_get_evsub(struct ast_sip_subscription *sub);
pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub);
/*!
+ * \brief Accept a subscription request
+ *
+ * \param sub The subscription to be accepted
+ * \param rdata The received subscription request
+ * \param response The response code to send
+ *
+ * \retval 0 Success
+ * \retval non-zero Failure
+ */
+int ast_sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response);
+
+/*!
* \brief Send a request created via a PJSIP evsub method
*
* Callers of this function should take care to do so within a SIP servant
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index 5a3fe2fe8..339cf1dfe 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -1613,6 +1613,36 @@ pjsip_dialog *ast_sip_create_dialog_uas(const struct ast_sip_endpoint *endpoint,
return dlg;
}
+int ast_sip_create_rdata(pjsip_rx_data *rdata, char *packet, const char *src_name, int src_port,
+ char *transport_type, const char *local_name, int local_port)
+{
+ pj_str_t tmp;
+
+ rdata->tp_info.transport = PJ_POOL_ZALLOC_T(rdata->tp_info.pool, pjsip_transport);
+ if (!rdata->tp_info.transport) {
+ return -1;
+ }
+
+ ast_copy_string(rdata->pkt_info.packet, packet, sizeof(rdata->pkt_info.packet));
+ ast_copy_string(rdata->pkt_info.src_name, src_name, sizeof(rdata->pkt_info.src_name));
+ rdata->pkt_info.src_port = src_port;
+
+ pjsip_parse_rdata(packet, strlen(packet), rdata);
+ if (!rdata->msg_info.msg) {
+ return -1;
+ }
+
+ pj_strdup2(rdata->tp_info.pool, &rdata->msg_info.via->recvd_param, rdata->pkt_info.src_name);
+ rdata->msg_info.via->rport_param = -1;
+
+ rdata->tp_info.transport->key.type = pjsip_transport_get_type_from_name(pj_cstr(&tmp, transport_type));
+ rdata->tp_info.transport->type_name = transport_type;
+ pj_strdup2(rdata->tp_info.pool, &rdata->tp_info.transport->local_name.host, local_name);
+ rdata->tp_info.transport->local_name.port = local_port;
+
+ return 0;
+}
+
/* PJSIP doesn't know about the INFO method, so we have to define it ourselves */
static const pjsip_method info_method = {PJSIP_OTHER_METHOD, {"INFO", 4} };
static const pjsip_method message_method = {PJSIP_OTHER_METHOD, {"MESSAGE", 7} };
diff --git a/res/res_pjsip_exten_state.c b/res/res_pjsip_exten_state.c
index 7a9067939..f4bfef772 100644
--- a/res/res_pjsip_exten_state.c
+++ b/res/res_pjsip_exten_state.c
@@ -445,8 +445,7 @@ static struct ast_sip_subscription *new_subscribe(struct ast_sip_endpoint *endpo
return NULL;
}
- if (pjsip_evsub_accept(ast_sip_subscription_get_evsub(exten_state_sub->sip_sub),
- rdata, 200, NULL) != PJ_SUCCESS) {
+ if (ast_sip_subscription_accept(exten_state_sub->sip_sub, rdata, 200)) {
ast_log(LOG_WARNING, "Unable to accept the incoming extension state subscription.\n");
pjsip_evsub_terminate(ast_sip_subscription_get_evsub(exten_state_sub->sip_sub), PJ_FALSE);
return NULL;
diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c
index 6e45ded33..f25a7c48f 100644
--- a/res/res_pjsip_mwi.c
+++ b/res/res_pjsip_mwi.c
@@ -699,7 +699,6 @@ static struct ast_sip_subscription *mwi_new_subscribe(struct ast_sip_endpoint *e
RAII_VAR(struct mwi_subscription *, sub, NULL, ao2_cleanup);
pjsip_uri *ruri = rdata->msg_info.msg->line.req.uri;
pjsip_sip_uri *sip_ruri;
- pjsip_evsub *evsub;
char aor_name[80];
if (!PJSIP_URI_SCHEME_IS_SIP(ruri) && !PJSIP_URI_SCHEME_IS_SIPS(ruri)) {
@@ -715,8 +714,7 @@ static struct ast_sip_subscription *mwi_new_subscribe(struct ast_sip_endpoint *e
return NULL;
}
- evsub = ast_sip_subscription_get_evsub(sub->sip_sub);
- pjsip_evsub_accept(evsub, rdata, 200, NULL);
+ ast_sip_subscription_accept(sub->sip_sub, rdata, 200);
send_mwi_notify(sub, PJSIP_EVSUB_STATE_ACTIVE, NULL);
return sub->sip_sub;
diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c
index 93ed744cf..88e284faf 100644
--- a/res/res_pjsip_pubsub.c
+++ b/res/res_pjsip_pubsub.c
@@ -72,6 +72,44 @@
</para>
</description>
</manager>
+ <configInfo name="res_pjsip_pubsub" language="en_US">
+ <synopsis>Module that implements publish and subscribe support.</synopsis>
+ <configFile name="pjsip.conf">
+ <configObject name="subscription_persistence">
+ <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
+ <configOption name="packet">
+ <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
+ </configOption>
+ <configOption name="src_name">
+ <synopsis>The source address of the subscription</synopsis>
+ </configOption>
+ <configOption name="src_port">
+ <synopsis>The source port of the subscription</synopsis>
+ </configOption>
+ <configOption name="transport_key">
+ <synopsis>The type of transport the subscription was received on</synopsis>
+ </configOption>
+ <configOption name="local_name">
+ <synopsis>The local address the subscription was received on</synopsis>
+ </configOption>
+ <configOption name="local_port">
+ <synopsis>The local port the subscription was received on</synopsis>
+ </configOption>
+ <configOption name="cseq">
+ <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
+ </configOption>
+ <configOption name="tag">
+ <synopsis>The local tag of the dialog for the subscription</synopsis>
+ </configOption>
+ <configOption name="endpoint">
+ <synopsis>The name of the endpoint that subscribed</synopsis>
+ </configOption>
+ <configOption name="expires">
+ <synopsis>The time at which the subscription expires</synopsis>
+ </configOption>
+ </configObject>
+ </configFile>
+ </configInfo>
***/
static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
@@ -83,6 +121,7 @@ static struct pjsip_module pubsub_module = {
};
#define MOD_DATA_BODY_GENERATOR "sub_body_generator"
+#define MOD_DATA_PERSISTENCE "sub_persistence"
static const pj_str_t str_event_name = { "Event", 5 };
@@ -180,6 +219,35 @@ struct ast_sip_publication {
int sched_id;
};
+
+/*!
+ * \brief Structure used for persisting an inbound subscription
+ */
+struct subscription_persistence {
+ /*! Sorcery object details */
+ SORCERY_OBJECT(details);
+ /*! The name of the endpoint involved in the subscrption */
+ char *endpoint;
+ /*! SIP message that creates the subscription */
+ char packet[PJSIP_MAX_PKT_LEN];
+ /*! Source address of the message */
+ char src_name[PJ_INET6_ADDRSTRLEN];
+ /*! Source port of the message */
+ int src_port;
+ /*! Local transport key type */
+ char transport_key[32];
+ /*! Local transport address */
+ char local_name[PJ_INET6_ADDRSTRLEN];
+ /*! Local transport port */
+ int local_port;
+ /*! Next CSeq to use for message */
+ unsigned int cseq;
+ /*! Local tag of the dialog */
+ char *tag;
+ /*! When this subscription expires */
+ struct timeval expires;
+};
+
/*!
* \brief Structure representing a SIP subscription
*/
@@ -200,6 +268,8 @@ struct ast_sip_subscription {
pjsip_dialog *dlg;
/*! Body generaator for NOTIFYs */
struct ast_sip_pubsub_body_generator *body_generator;
+ /*! Persistence information */
+ struct subscription_persistence *persistence;
/*! Next item in the list */
AST_LIST_ENTRY(ast_sip_subscription) next;
};
@@ -214,6 +284,265 @@ AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription);
AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
+/*! \brief Destructor for subscription persistence */
+static void subscription_persistence_destroy(void *obj)
+{
+ struct subscription_persistence *persistence = obj;
+
+ ast_free(persistence->endpoint);
+ ast_free(persistence->tag);
+}
+
+/*! \brief Allocator for subscription persistence */
+static void *subscription_persistence_alloc(const char *name)
+{
+ return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
+}
+
+/*! \brief Function which creates initial persistence information of a subscription in sorcery */
+static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub)
+{
+ char tag[PJ_GUID_STRING_LENGTH + 1];
+
+ /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
+ * look it up by id at all.
+ */
+ struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
+ "subscription_persistence", NULL);
+
+ if (!persistence) {
+ return NULL;
+ }
+
+ persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint));
+ ast_copy_pj_str(tag, &sub->dlg->local.info->tag, sizeof(tag));
+ persistence->tag = ast_strdup(tag);
+
+ ast_sorcery_create(ast_sip_get_sorcery(), persistence);
+ return persistence;
+}
+
+/*! \brief Function which updates persistence information of a subscription in sorcery */
+static void subscription_persistence_update(struct ast_sip_subscription *sub,
+ pjsip_rx_data *rdata)
+{
+ if (!sub->persistence) {
+ return;
+ }
+
+ sub->persistence->cseq = sub->dlg->local.cseq;
+
+ if (rdata) {
+ int expires;
+ pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
+
+ expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
+ sub->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
+
+ ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet));
+ ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name));
+ sub->persistence->src_port = rdata->pkt_info.src_port;
+ ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name,
+ sizeof(sub->persistence->transport_key));
+ ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host,
+ sizeof(sub->persistence->local_name));
+ sub->persistence->local_port = rdata->tp_info.transport->local_name.port;
+ }
+
+ ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence);
+}
+
+/*! \brief Function which removes persistence of a subscription from sorcery */
+static void subscription_persistence_remove(struct ast_sip_subscription *sub)
+{
+ if (!sub->persistence) {
+ return;
+ }
+
+ ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence);
+ ao2_ref(sub->persistence, -1);
+}
+
+
+static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
+static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
+ size_t num_accept);
+
+/*! \brief Retrieve a handler using the Event header of an rdata message */
+static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
+{
+ pjsip_event_hdr *event_header;
+ char event[32];
+ struct ast_sip_subscription_handler *handler;
+
+ event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
+ if (!event_header) {
+ ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
+ return NULL;
+ }
+ ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
+
+ handler = find_sub_handler_for_event_name(event);
+ if (!handler) {
+ ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
+ }
+
+ return handler;
+}
+
+/*! \brief Retrieve a body generator using the Accept header of an rdata message */
+static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
+ const struct ast_sip_subscription_handler *handler)
+{
+ pjsip_accept_hdr *accept_header;
+ char accept[AST_SIP_MAX_ACCEPT][64];
+ size_t num_accept_headers;
+
+ accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
+ if (accept_header) {
+ int i;
+
+ for (i = 0; i < accept_header->count; ++i) {
+ ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
+ }
+ num_accept_headers = accept_header->count;
+ } else {
+ /* If a SUBSCRIBE contains no Accept headers, then we must assume that
+ * the default accept type for the event package is to be used.
+ */
+ ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0]));
+ num_accept_headers = 1;
+ }
+
+ return find_body_generator(accept, num_accept_headers);
+}
+
+/*! \brief Callback function to perform the actual recreation of a subscription */
+static int subscription_persistence_recreate(void *obj, void *arg, int flags)
+{
+ struct subscription_persistence *persistence = obj;
+ pj_pool_t *pool = arg;
+ pjsip_rx_data rdata = { { 0, }, };
+ pjsip_expires_hdr *expires_header;
+ struct ast_sip_subscription_handler *handler;
+ RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
+ struct ast_sip_subscription *sub;
+ struct ast_sip_pubsub_body_generator *generator;
+
+ /* If this subscription has already expired remove it */
+ if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
+ if (!endpoint) {
+ ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
+ persistence->endpoint);
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ pj_pool_reset(pool);
+ rdata.tp_info.pool = pool;
+
+ if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
+ persistence->transport_key, persistence->local_name, persistence->local_port)) {
+ ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
+ persistence->endpoint);
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ /* Update the expiration header with the new expiration */
+ expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
+ if (!expires_header) {
+ expires_header = pjsip_expires_hdr_create(pool, 0);
+ if (!expires_header) {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+ pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
+ }
+ expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
+
+ handler = subscription_get_handler_from_rdata(&rdata);
+ if (!handler) {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ generator = subscription_get_generator_from_rdata(&rdata, handler);
+ if (!generator) {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ return 0;
+ }
+
+ ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
+ pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator);
+ ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
+ pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
+
+ sub = handler->new_subscribe(endpoint, &rdata);
+ if (sub) {
+ sub->persistence = ao2_bump(persistence);
+ subscription_persistence_update(sub, &rdata);
+ } else {
+ ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
+ }
+
+ return 0;
+}
+
+/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
+static int subscription_persistence_load(void *data)
+{
+ struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
+ "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
+ pj_pool_t *pool;
+
+ pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
+ PJSIP_POOL_RDATA_INC);
+ if (!pool) {
+ ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
+ return 0;
+ }
+
+ ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
+
+ pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+
+ ao2_ref(persisted_subscriptions, -1);
+ return 0;
+}
+
+/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
+static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
+{
+ struct ast_json_payload *payload;
+ const char *type;
+
+ if (stasis_message_type(message) != ast_manager_get_generic_type()) {
+ return;
+ }
+
+ payload = stasis_message_data(message);
+ type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
+
+ /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
+ * recreate SIP subscriptions.
+ */
+ if (strcmp(type, "FullyBooted")) {
+ return;
+ }
+
+ /* This has to be here so the subscription is recreated when the body generator is available */
+ ast_sip_push_task(NULL, subscription_persistence_load, NULL);
+
+ /* Once the system is fully booted we don't care anymore */
+ stasis_unsubscribe(sub);
+}
+
static void add_subscription(struct ast_sip_subscription *obj)
{
SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
@@ -335,6 +664,9 @@ static void subscription_destructor(void *obj)
struct ast_sip_subscription *sub = obj;
ast_debug(3, "Destroying SIP subscription\n");
+
+ subscription_persistence_remove(sub);
+
remove_subscription(sub);
ao2_cleanup(sub->datastores);
@@ -388,6 +720,7 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su
{
struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor);
pjsip_dialog *dlg;
+ struct subscription_persistence *persistence;
if (!sub) {
return NULL;
@@ -424,6 +757,17 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su
ao2_ref(sub, -1);
return NULL;
}
+ persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
+ pubsub_module.id, MOD_DATA_PERSISTENCE);
+ if (persistence) {
+ /* Update the created dialog with the persisted information */
+ pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
+ pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
+ dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
+ pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
+ dlg->local.cseq = persistence->cseq;
+ dlg->remote.cseq = persistence->cseq;
+ }
sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg);
/* We keep a reference to the dialog until our subscription is destroyed. See
* the subscription_destructor for more details
@@ -463,6 +807,16 @@ pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub)
return sub->dlg;
}
+int ast_sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response)
+{
+ /* If this is a persistence recreation the subscription has already been accepted */
+ if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
+ return 0;
+ }
+
+ return pjsip_evsub_accept(ast_sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1;
+}
+
int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
{
struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub);
@@ -472,6 +826,8 @@ int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx
res = pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub),
tdata) == PJ_SUCCESS ? 0 : -1;
+ subscription_persistence_update(sub, NULL);
+
ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
"StateText: %s\r\n"
"Endpoint: %s\r\n",
@@ -688,6 +1044,7 @@ int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *h
pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
sub_add_handler(handler);
+
return 0;
}
@@ -755,15 +1112,10 @@ static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST
static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
{
- char event[32];
- char accept[AST_SIP_MAX_ACCEPT][64];
- pjsip_accept_hdr *accept_header;
- pjsip_event_hdr *event_header;
pjsip_expires_hdr *expires_header;
struct ast_sip_subscription_handler *handler;
RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
struct ast_sip_subscription *sub;
- size_t num_accept_headers;
struct ast_sip_pubsub_body_generator *generator;
endpoint = ast_pjsip_rdata_get_endpoint(rdata);
@@ -784,38 +1136,13 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
return PJ_TRUE;
}
- event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
- if (!event_header) {
- ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
- pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
- return PJ_TRUE;
- }
- ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
-
- handler = find_sub_handler_for_event_name(event);
+ handler = subscription_get_handler_from_rdata(rdata);
if (!handler) {
- ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
return PJ_TRUE;
}
- accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
- if (accept_header) {
- int i;
-
- for (i = 0; i < accept_header->count; ++i) {
- ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
- }
- num_accept_headers = accept_header->count;
- } else {
- /* If a SUBSCRIBE contains no Accept headers, then we must assume that
- * the default accept type for the event package is to be used.
- */
- ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0]));
- num_accept_headers = 1;
- }
-
- generator = find_body_generator(accept, num_accept_headers);
+ generator = subscription_get_generator_from_rdata(rdata, handler);
if (!generator) {
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
return PJ_TRUE;
@@ -839,7 +1166,11 @@ static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
} else {
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
}
+ } else {
+ sub->persistence = subscription_persistence_create(sub);
+ subscription_persistence_update(sub, rdata);
}
+
return PJ_TRUE;
}
@@ -1471,9 +1802,54 @@ static int ami_show_subscriptions_outbound(struct mansession *s, const struct me
#define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
#define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
+static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
+{
+ struct subscription_persistence *persistence = obj;
+
+ persistence->endpoint = ast_strdup(var->value);
+ return 0;
+}
+
+static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf)
+{
+ const struct subscription_persistence *persistence = obj;
+
+ *buf = ast_strdup(persistence->endpoint);
+ return 0;
+}
+
+static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
+{
+ struct subscription_persistence *persistence = obj;
+
+ persistence->tag = ast_strdup(var->value);
+ return 0;
+}
+
+static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf)
+{
+ const struct subscription_persistence *persistence = obj;
+
+ *buf = ast_strdup(persistence->tag);
+ return 0;
+}
+
+static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
+{
+ struct subscription_persistence *persistence = obj;
+ return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL);
+}
+
+static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf)
+{
+ const struct subscription_persistence *persistence = obj;
+ return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0;
+}
+
static int load_module(void)
{
static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
+ struct ast_sorcery *sorcery = ast_sip_get_sorcery();
pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
@@ -1496,6 +1872,42 @@ static int load_module(void)
return AST_MODULE_LOAD_FAILURE;
}
+ ast_sorcery_apply_config(sorcery, "res_pjsip_pubsub");
+ ast_sorcery_apply_default(sorcery, "subscription_persistence", "astdb", "subscription_persistence");
+ if (ast_sorcery_object_register(sorcery, "subscription_persistence", subscription_persistence_alloc,
+ NULL, NULL)) {
+ ast_log(LOG_ERROR, "Could not register subscription persistence object support\n");
+ ast_sip_unregister_service(&pubsub_module);
+ ast_sched_context_destroy(sched);
+ return AST_MODULE_LOAD_FAILURE;
+ }
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "packet", "", OPT_CHAR_ARRAY_T, 0,
+ CHARFLDSET(struct subscription_persistence, packet));
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_name", "", OPT_CHAR_ARRAY_T, 0,
+ CHARFLDSET(struct subscription_persistence, src_name));
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_port", "0", OPT_UINT_T, 0,
+ FLDSET(struct subscription_persistence, src_port));
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "transport_key", "0", OPT_CHAR_ARRAY_T, 0,
+ CHARFLDSET(struct subscription_persistence, transport_key));
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_name", "", OPT_CHAR_ARRAY_T, 0,
+ CHARFLDSET(struct subscription_persistence, local_name));
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_port", "0", OPT_UINT_T, 0,
+ FLDSET(struct subscription_persistence, local_port));
+ ast_sorcery_object_field_register(sorcery, "subscription_persistence", "cseq", "0", OPT_UINT_T, 0,
+ FLDSET(struct subscription_persistence, cseq));
+ ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "endpoint", "",
+ persistence_endpoint_str2struct, persistence_endpoint_struct2str, NULL, 0, 0);
+ ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "tag", "",
+ persistence_tag_str2struct, persistence_tag_struct2str, NULL, 0, 0);
+ ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "expires", "",
+ persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0);
+
+ if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
+ ast_sip_push_task(NULL, subscription_persistence_load, NULL);
+ } else {
+ stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
+ }
+
ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
ami_show_subscriptions_inbound);
ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND, EVENT_FLAG_SYSTEM,
diff --git a/res/res_pjsip_pubsub.exports.in b/res/res_pjsip_pubsub.exports.in
index f279b5433..0877d5c6c 100644
--- a/res/res_pjsip_pubsub.exports.in
+++ b/res/res_pjsip_pubsub.exports.in
@@ -5,6 +5,7 @@
LINKER_SYMBOL_PREFIXast_sip_subscription_get_serializer;
LINKER_SYMBOL_PREFIXast_sip_subscription_get_evsub;
LINKER_SYMBOL_PREFIXast_sip_subscription_get_dlg;
+ LINKER_SYMBOL_PREFIXast_sip_subscription_accept;
LINKER_SYMBOL_PREFIXast_sip_subscription_send_request;
LINKER_SYMBOL_PREFIXast_sip_subscription_alloc_datastore;
LINKER_SYMBOL_PREFIXast_sip_subscription_add_datastore;