summaryrefslogtreecommitdiff
path: root/res/res_pjsip_pubsub.c
diff options
context:
space:
mode:
Diffstat (limited to 'res/res_pjsip_pubsub.c')
-rw-r--r--res/res_pjsip_pubsub.c1155
1 files changed, 1155 insertions, 0 deletions
diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c
new file mode 100644
index 000000000..1ccfc1972
--- /dev/null
+++ b/res/res_pjsip_pubsub.c
@@ -0,0 +1,1155 @@
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2013, Digium, Inc.
+ *
+ * Mark Michelson <mmichelson@digium.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+/*!
+ * \brief Opaque structure representing an RFC 3265 SIP subscription
+ */
+
+/*** MODULEINFO
+ <depend>pjproject</depend>
+ <depend>res_pjsip</depend>
+ <support_level>core</support_level>
+ ***/
+
+#include "asterisk.h"
+
+#include <pjsip.h>
+#include <pjsip_simple.h>
+#include <pjlib.h>
+
+#include "asterisk/res_pjsip_pubsub.h"
+#include "asterisk/module.h"
+#include "asterisk/linkedlists.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/datastore.h"
+#include "asterisk/uuid.h"
+#include "asterisk/taskprocessor.h"
+#include "asterisk/sched.h"
+#include "asterisk/res_pjsip.h"
+
+static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
+
+static struct pjsip_module pubsub_module = {
+ .name = { "PubSub Module", 13 },
+ .priority = PJSIP_MOD_PRIORITY_APPLICATION,
+ .on_rx_request = pubsub_on_rx_request,
+};
+
+static const pj_str_t str_event_name = { "Event", 5 };
+
+/*! \brief Scheduler used for automatically expiring publications */
+static struct ast_sched_context *sched;
+
+/*! \brief Number of buckets for publications (on a per handler) */
+#define PUBLICATIONS_BUCKETS 37
+
+/*! \brief Default expiration time for PUBLISH if one is not specified */
+#define DEFAULT_PUBLISH_EXPIRES 3600
+
+/*! \brief Defined method for PUBLISH */
+const pjsip_method pjsip_publish_method =
+{
+ PJSIP_OTHER_METHOD,
+ { "PUBLISH", 7 }
+};
+
+/*!
+ * \brief The types of PUBLISH messages defined in RFC 3903
+ */
+enum sip_publish_type {
+ /*!
+ * \brief Unknown
+ *
+ * \details
+ * This actually is not defined in RFC 3903. We use this as a constant
+ * to indicate that an incoming PUBLISH does not fit into any of the
+ * other categories and is thus invalid.
+ */
+ SIP_PUBLISH_UNKNOWN,
+
+ /*!
+ * \brief Initial
+ *
+ * \details
+ * The first PUBLISH sent. This will contain a non-zero Expires header
+ * as well as a body that indicates the current state of the endpoint
+ * that has sent the message. The initial PUBLISH is the only type
+ * of PUBLISH to not contain a Sip-If-Match header in it.
+ */
+ SIP_PUBLISH_INITIAL,
+
+ /*!
+ * \brief Refresh
+ *
+ * \details
+ * Used to keep a published state from expiring. This will contain a
+ * non-zero Expires header but no body since its purpose is not to
+ * update state.
+ */
+ SIP_PUBLISH_REFRESH,
+
+ /*!
+ * \brief Modify
+ *
+ * \details
+ * Used to change state from its previous value. This will contain
+ * a body updating the published state. May or may not contain an
+ * Expires header.
+ */
+ SIP_PUBLISH_MODIFY,
+
+ /*!
+ * \brief Remove
+ *
+ * \details
+ * Used to remove published state from an ESC. This will contain
+ * an Expires header set to 0 and likely no body.
+ */
+ SIP_PUBLISH_REMOVE,
+};
+
+/*!
+ * Used to create new entity IDs by ESCs.
+ */
+static int esc_etag_counter;
+
+/*!
+ * \brief Structure representing a SIP publication
+ */
+struct ast_sip_publication {
+ /*! Publication datastores set up by handlers */
+ struct ao2_container *datastores;
+ /*! \brief Entity tag for the publication */
+ int entity_tag;
+ /*! \brief Handler for this publication */
+ struct ast_sip_publish_handler *handler;
+ /*! \brief The endpoint with which the subscription is communicating */
+ struct ast_sip_endpoint *endpoint;
+ /*! \brief Expiration time of the publication */
+ int expires;
+ /*! \brief Scheduled item for expiration of publication */
+ int sched_id;
+};
+
+/*!
+ * \brief Structure representing a SIP subscription
+ */
+struct ast_sip_subscription {
+ /*! Subscription datastores set up by handlers */
+ struct ao2_container *datastores;
+ /*! The endpoint with which the subscription is communicating */
+ struct ast_sip_endpoint *endpoint;
+ /*! Serializer on which to place operations for this subscription */
+ struct ast_taskprocessor *serializer;
+ /*! The handler for this subscription */
+ const struct ast_sip_subscription_handler *handler;
+ /*! The role for this subscription */
+ enum ast_sip_subscription_role role;
+ /*! The underlying PJSIP event subscription structure */
+ pjsip_evsub *evsub;
+ /*! The underlying PJSIP dialog */
+ pjsip_dialog *dlg;
+};
+
+#define DATASTORE_BUCKETS 53
+
+#define DEFAULT_EXPIRES 3600
+
+static int datastore_hash(const void *obj, int flags)
+{
+ const struct ast_datastore *datastore = obj;
+ const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
+
+ ast_assert(uid != NULL);
+
+ return ast_str_hash(uid);
+}
+
+static int datastore_cmp(void *obj, void *arg, int flags)
+{
+ const struct ast_datastore *datastore1 = obj;
+ const struct ast_datastore *datastore2 = arg;
+ const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
+
+ ast_assert(datastore1->uid != NULL);
+ ast_assert(uid2 != NULL);
+
+ return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
+}
+
+static void subscription_destructor(void *obj)
+{
+ struct ast_sip_subscription *sub = obj;
+
+ ast_debug(3, "Destroying SIP subscription\n");
+
+ ao2_cleanup(sub->datastores);
+ ao2_cleanup(sub->endpoint);
+
+ if (sub->dlg) {
+ /* This is why we keep the dialog on the subscription. When the subscription
+ * is destroyed, there is no guarantee that the underlying dialog is ready
+ * to be destroyed. Furthermore, there's no guarantee in the opposite direction
+ * either. The dialog could be destroyed before our subscription is. We fix
+ * this problem by keeping a reference to the dialog until it is time to
+ * destroy the subscription. We need to have the dialog available when the
+ * subscription is destroyed so that we can guarantee that our attempt to
+ * remove the serializer will be successful.
+ */
+ ast_sip_dialog_set_serializer(sub->dlg, NULL);
+ pjsip_dlg_dec_session(sub->dlg, &pubsub_module);
+ }
+ ast_taskprocessor_unreference(sub->serializer);
+}
+
+static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
+static void pubsub_on_tsx_state(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event);
+static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
+ int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
+static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
+ pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
+static void pubsub_on_client_refresh(pjsip_evsub *sub);
+static void pubsub_on_server_timeout(pjsip_evsub *sub);
+
+
+static pjsip_evsub_user pubsub_cb = {
+ .on_evsub_state = pubsub_on_evsub_state,
+ .on_tsx_state = pubsub_on_tsx_state,
+ .on_rx_refresh = pubsub_on_rx_refresh,
+ .on_rx_notify = pubsub_on_rx_notify,
+ .on_client_refresh = pubsub_on_client_refresh,
+ .on_server_timeout = pubsub_on_server_timeout,
+};
+
+static pjsip_evsub *allocate_evsub(const char *event, enum ast_sip_subscription_role role,
+ struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, pjsip_dialog *dlg)
+{
+ pjsip_evsub *evsub;
+ /* PJSIP is kind enough to have some built-in support for certain
+ * events. We need to use the correct initialization function for the
+ * built-in events
+ */
+ if (role == AST_SIP_NOTIFIER) {
+ if (!strcmp(event, "message-summary")) {
+ pjsip_mwi_create_uas(dlg, &pubsub_cb, rdata, &evsub);
+ } else {
+ pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &evsub);
+ }
+ } else {
+ if (!strcmp(event, "message-summary")) {
+ pjsip_mwi_create_uac(dlg, &pubsub_cb, 0, &evsub);
+ } else {
+ pj_str_t pj_event;
+ pj_cstr(&pj_event, event);
+ pjsip_evsub_create_uac(dlg, &pubsub_cb, &pj_event, 0, &evsub);
+ }
+ }
+ return evsub;
+}
+
+struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
+ enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
+{
+ struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor);
+ pjsip_dialog *dlg;
+
+ if (!sub) {
+ return NULL;
+ }
+ sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
+ if (!sub->datastores) {
+ ao2_ref(sub, -1);
+ return NULL;
+ }
+ sub->serializer = ast_sip_create_serializer();
+ if (!sub->serializer) {
+ ao2_ref(sub, -1);
+ return NULL;
+ }
+ sub->role = role;
+ if (role == AST_SIP_NOTIFIER) {
+ pjsip_dlg_create_uas(pjsip_ua_instance(), rdata, NULL, &dlg);
+ } else {
+ RAII_VAR(struct ast_sip_contact *, contact, NULL, ao2_cleanup);
+
+ contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
+ if (!contact || ast_strlen_zero(contact->uri)) {
+ ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
+ ast_sorcery_object_get_id(endpoint));
+ ao2_ref(sub, -1);
+ return NULL;
+ }
+ dlg = ast_sip_create_dialog(endpoint, contact->uri, NULL);
+ }
+ if (!dlg) {
+ ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
+ ao2_ref(sub, -1);
+ return NULL;
+ }
+ sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg);
+ /* We keep a reference to the dialog until our subscription is destroyed. See
+ * the subscription_destructor for more details
+ */
+ pjsip_dlg_inc_session(dlg, &pubsub_module);
+ sub->dlg = dlg;
+ ast_sip_dialog_set_serializer(dlg, sub->serializer);
+ pjsip_evsub_set_mod_data(sub->evsub, pubsub_module.id, sub);
+ ao2_ref(endpoint, +1);
+ sub->endpoint = endpoint;
+ sub->handler = handler;
+ return sub;
+}
+
+struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
+{
+ ast_assert(sub->endpoint != NULL);
+ ao2_ref(sub->endpoint, +1);
+ return sub->endpoint;
+}
+
+struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
+{
+ ast_assert(sub->serializer != NULL);
+ return sub->serializer;
+}
+
+pjsip_evsub *ast_sip_subscription_get_evsub(struct ast_sip_subscription *sub)
+{
+ return sub->evsub;
+}
+
+pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub)
+{
+ return sub->dlg;
+}
+
+int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
+{
+ return pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub),
+ tdata) == PJ_SUCCESS ? 0 : -1;
+}
+
+static void subscription_datastore_destroy(void *obj)
+{
+ struct ast_datastore *datastore = obj;
+
+ /* Using the destroy function (if present) destroy the data */
+ if (datastore->info->destroy != NULL && datastore->data != NULL) {
+ datastore->info->destroy(datastore->data);
+ datastore->data = NULL;
+ }
+
+ ast_free((void *) datastore->uid);
+ datastore->uid = NULL;
+}
+
+struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
+{
+ RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
+ const char *uid_ptr = uid;
+
+ if (!info) {
+ return NULL;
+ }
+
+ datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
+ if (!datastore) {
+ return NULL;
+ }
+
+ datastore->info = info;
+ if (ast_strlen_zero(uid)) {
+ /* They didn't provide an ID so we'll provide one ourself */
+ struct ast_uuid *uuid = ast_uuid_generate();
+ char uuid_buf[AST_UUID_STR_LEN];
+ if (!uuid) {
+ return NULL;
+ }
+ uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf));
+ ast_free(uuid);
+ }
+
+ datastore->uid = ast_strdup(uid_ptr);
+ if (!datastore->uid) {
+ return NULL;
+ }
+
+ ao2_ref(datastore, +1);
+ return datastore;
+}
+
+int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
+{
+ ast_assert(datastore != NULL);
+ ast_assert(datastore->info != NULL);
+ ast_assert(!ast_strlen_zero(datastore->uid));
+
+ if (!ao2_link(subscription->datastores, datastore)) {
+ return -1;
+ }
+ return 0;
+}
+
+struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
+{
+ return ao2_find(subscription->datastores, name, OBJ_KEY);
+}
+
+void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
+{
+ ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
+}
+
+int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
+{
+ ast_assert(datastore != NULL);
+ ast_assert(datastore->info != NULL);
+ ast_assert(!ast_strlen_zero(datastore->uid));
+
+ if (!ao2_link(publication->datastores, datastore)) {
+ return -1;
+ }
+ return 0;
+}
+
+struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
+{
+ return ao2_find(publication->datastores, name, OBJ_KEY);
+}
+
+void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
+{
+ ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
+}
+
+AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
+
+static int publication_hash_fn(const void *obj, const int flags)
+{
+ const struct ast_sip_publication *publication = obj;
+ const int *entity_tag = obj;
+
+ return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
+}
+
+static int publication_cmp_fn(void *obj, void *arg, int flags)
+{
+ const struct ast_sip_publication *publication1 = obj;
+ const struct ast_sip_publication *publication2 = arg;
+ const int *entity_tag = arg;
+
+ return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
+ CMP_MATCH | CMP_STOP : 0);
+}
+
+static void publish_add_handler(struct ast_sip_publish_handler *handler)
+{
+ SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+ AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
+}
+
+int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
+{
+ if (ast_strlen_zero(handler->event_name)) {
+ ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
+ return -1;
+ }
+
+ if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
+ publication_hash_fn, publication_cmp_fn))) {
+ ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
+ handler->event_name);
+ return -1;
+ }
+
+ publish_add_handler(handler);
+
+ ast_module_ref(ast_module_info->self);
+
+ return 0;
+}
+
+void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
+{
+ struct ast_sip_publish_handler *iter;
+ SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+ AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
+ if (handler == iter) {
+ AST_RWLIST_REMOVE_CURRENT(next);
+ ao2_cleanup(handler->publications);
+ ast_module_unref(ast_module_info->self);
+ break;
+ }
+ }
+ AST_RWLIST_TRAVERSE_SAFE_END;
+}
+
+AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
+
+static void sub_add_handler(struct ast_sip_subscription_handler *handler)
+{
+ SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+ AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
+ ast_module_ref(ast_module_info->self);
+}
+
+static int sub_handler_exists_for_event_name(const char *event_name)
+{
+ struct ast_sip_subscription_handler *iter;
+ SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
+
+ AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
+ if (!strcmp(iter->event_name, event_name)) {
+ return 1;
+ }
+ }
+ return 0;
+}
+
+int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
+{
+ pj_str_t accept[AST_SIP_MAX_ACCEPT];
+ int i;
+
+ if (ast_strlen_zero(handler->event_name)) {
+ ast_log(LOG_ERROR, "No event package specifief for subscription handler. Cannot register\n");
+ return -1;
+ }
+
+ if (ast_strlen_zero(handler->accept[0])) {
+ ast_log(LOG_ERROR, "Subscription handler must supply at least one 'Accept' format\n");
+ return -1;
+ }
+
+ for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
+ pj_cstr(&accept[i], handler->accept[i]);
+ }
+
+ if (!sub_handler_exists_for_event_name(handler->event_name)) {
+ pj_str_t event;
+
+ pj_cstr(&event, handler->event_name);
+
+ if (!strcmp(handler->event_name, "message-summary")) {
+ pjsip_mwi_init_module(ast_sip_get_pjsip_endpoint(), pjsip_evsub_instance());
+ } else {
+ pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
+ }
+ } else {
+ pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module, PJSIP_H_ACCEPT, NULL,
+ i, accept);
+ }
+
+ sub_add_handler(handler);
+ return 0;
+}
+
+void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
+{
+ struct ast_sip_subscription_handler *iter;
+ SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
+ AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
+ if (handler == iter) {
+ AST_RWLIST_REMOVE_CURRENT(next);
+ ast_module_unref(ast_module_info->self);
+ break;
+ }
+ }
+ AST_RWLIST_TRAVERSE_SAFE_END;
+}
+
+static struct ast_sip_subscription_handler *find_sub_handler(const char *event, char accept[AST_SIP_MAX_ACCEPT][64], size_t num_accept)
+{
+ struct ast_sip_subscription_handler *iter;
+ int match = 0;
+ SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
+ AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
+ int i;
+ int j;
+ if (strcmp(event, iter->event_name)) {
+ ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
+ continue;
+ }
+ ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
+ for (i = 0; i < num_accept; ++i) {
+ for (j = 0; j < num_accept; ++j) {
+ if (ast_strlen_zero(iter->accept[i])) {
+ ast_debug(3, "Breaking because subscription handler has run out of 'accept' types\n");
+ break;
+ }
+ if (!strcmp(accept[j], iter->accept[i])) {
+ ast_debug(3, "Accept headers match: %s = %s\n", accept[j], iter->accept[i]);
+ match = 1;
+ break;
+ }
+ ast_debug(3, "Accept %s does not match %s\n", accept[j], iter->accept[i]);
+ }
+ if (match) {
+ break;
+ }
+ }
+ if (match) {
+ break;
+ }
+ }
+
+ return iter;
+}
+
+static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
+{
+ char event[32];
+ char accept[AST_SIP_MAX_ACCEPT][64];
+ pjsip_accept_hdr *accept_header;
+ pjsip_event_hdr *event_header;
+ pjsip_expires_hdr *expires_header;
+ struct ast_sip_subscription_handler *handler;
+ RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
+ struct ast_sip_subscription *sub;
+ int i;
+
+ endpoint = ast_pjsip_rdata_get_endpoint(rdata);
+ ast_assert(endpoint != NULL);
+
+ if (!endpoint->subscription.allow) {
+ ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
+ return PJ_TRUE;
+ }
+
+ expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
+
+ if (expires_header && expires_header->ivalue < endpoint->subscription.minexpiry) {
+ ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %d\n",
+ expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
+ return PJ_TRUE;
+ }
+
+ event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
+ if (!event_header) {
+ ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
+ return PJ_TRUE;
+ }
+
+ accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
+ if (!accept_header) {
+ ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Accept header\n");
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
+ return PJ_TRUE;
+ }
+
+ ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
+ for (i = 0; i < accept_header->count; ++i) {
+ ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
+ }
+
+ handler = find_sub_handler(event, accept, accept_header->count);
+ if (!handler) {
+ ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
+ return PJ_TRUE;
+ }
+ sub = handler->new_subscribe(endpoint, rdata);
+ if (!sub) {
+ pjsip_transaction *trans = pjsip_rdata_get_tsx(rdata);
+
+ if (trans) {
+ pjsip_dialog *dlg = pjsip_rdata_get_dlg(rdata);
+ pjsip_tx_data *tdata;
+
+ if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, &tdata) != PJ_SUCCESS) {
+ return PJ_TRUE;
+ }
+ pjsip_dlg_send_response(dlg, trans, tdata);
+ } else {
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
+ }
+ }
+ return PJ_TRUE;
+}
+
+static struct ast_sip_publish_handler *find_pub_handler(const char *event)
+{
+ struct ast_sip_publish_handler *iter = NULL;
+ SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
+
+ AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
+ if (strcmp(event, iter->event_name)) {
+ ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
+ continue;
+ }
+ ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
+ break;
+ }
+
+ return iter;
+}
+
+static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
+ pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
+{
+ pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
+
+ if (etag_hdr) {
+ char etag[pj_strlen(&etag_hdr->hvalue) + 1];
+
+ ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
+
+ if (sscanf(etag, "%30d", entity_id) != 1) {
+ return SIP_PUBLISH_UNKNOWN;
+ }
+ }
+
+ *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
+
+ if (!(*expires)) {
+ return SIP_PUBLISH_REMOVE;
+ } else if (!etag_hdr && rdata->msg_info.msg->body) {
+ return SIP_PUBLISH_INITIAL;
+ } else if (etag_hdr && !rdata->msg_info.msg->body) {
+ return SIP_PUBLISH_REFRESH;
+ } else if (etag_hdr && rdata->msg_info.msg->body) {
+ return SIP_PUBLISH_MODIFY;
+ }
+
+ return SIP_PUBLISH_UNKNOWN;
+}
+
+static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
+ struct ast_sip_publish_handler *handler)
+{
+ struct ast_sip_publication *publication = handler->new_publication(endpoint, rdata);
+
+ if (!publication) {
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
+ return NULL;
+ }
+
+ publication->handler = handler;
+
+ return publication;
+}
+
+static int publish_expire_callback(void *data)
+{
+ RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
+
+ publication->handler->publish_expire(publication);
+
+ return 0;
+}
+
+static int publish_expire(const void *data)
+{
+ struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
+
+ ao2_unlink(publication->handler->publications, publication);
+ publication->sched_id = -1;
+
+ if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
+ ao2_cleanup(publication);
+ }
+
+ return 0;
+}
+
+static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
+{
+ pjsip_event_hdr *event_header;
+ struct ast_sip_publish_handler *handler;
+ RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
+ char event[32];
+ static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
+ pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
+ enum sip_publish_type publish_type;
+ RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
+ int expires = 0, entity_id;
+
+ endpoint = ast_pjsip_rdata_get_endpoint(rdata);
+ ast_assert(endpoint != NULL);
+
+ event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
+ if (!event_header) {
+ ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
+ return PJ_TRUE;
+ }
+ ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
+
+ handler = find_pub_handler(event);
+ if (!handler) {
+ ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
+ return PJ_TRUE;
+ }
+
+ publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
+
+ /* If this is not an initial publish ensure that a publication is present */
+ if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
+ if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
+ static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
+
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
+ NULL, NULL);
+ return PJ_TRUE;
+ }
+
+ /* Per the RFC every response has to have a new entity tag */
+ publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
+
+ /* Update the expires here so that the created responses will contain the correct value */
+ publication->expires = expires;
+ }
+
+ switch (publish_type) {
+ case SIP_PUBLISH_INITIAL:
+ publication = publish_request_initial(endpoint, rdata, handler);
+ break;
+ case SIP_PUBLISH_REFRESH:
+ case SIP_PUBLISH_MODIFY:
+ if (handler->publish_refresh(publication, rdata)) {
+ /* If an error occurs we want to terminate the publication */
+ expires = 0;
+ }
+ break;
+ case SIP_PUBLISH_REMOVE:
+ handler->publish_termination(publication, rdata);
+ break;
+ case SIP_PUBLISH_UNKNOWN:
+ default:
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
+ break;
+ }
+
+ if (publication) {
+ if (expires) {
+ ao2_link(handler->publications, publication);
+
+ AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
+ ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
+ } else {
+ AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
+ }
+ }
+
+ return PJ_TRUE;
+}
+
+/*! \brief Internal destructor for publications */
+static void publication_destroy_fn(void *obj)
+{
+ struct ast_sip_publication *publication = obj;
+
+ ast_debug(3, "Destroying SIP publication\n");
+
+ ao2_cleanup(publication->datastores);
+ ao2_cleanup(publication->endpoint);
+}
+
+struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
+{
+ struct ast_sip_publication *publication;
+ pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
+
+ ast_assert(endpoint != NULL);
+
+ if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) {
+ return NULL;
+ }
+
+ if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
+ ao2_ref(publication, -1);
+ return NULL;
+ }
+
+ publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
+ ao2_ref(endpoint, +1);
+ publication->endpoint = endpoint;
+ publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
+ publication->sched_id = -1;
+
+ return publication;
+}
+
+struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
+{
+ return pub->endpoint;
+}
+
+int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata,
+ pjsip_tx_data **tdata)
+{
+ if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, tdata) != PJ_SUCCESS) {
+ return -1;
+ }
+
+ if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
+ RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
+ RAII_VAR(char *, expires, NULL, ast_free_ptr);
+
+ if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
+ (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
+ pjsip_tx_data_dec_ref(*tdata);
+ return -1;
+ }
+
+ ast_sip_add_header(*tdata, "SIP-ETag", entity_tag);
+ ast_sip_add_header(*tdata, "Expires", expires);
+ }
+
+ return 0;
+}
+
+pj_status_t ast_sip_publication_send_response(struct ast_sip_publication *pub, pjsip_rx_data *rdata,
+ pjsip_tx_data *tdata)
+{
+ pj_status_t status;
+ pjsip_transaction *tsx;
+
+ if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
+ return status;
+ }
+
+ pjsip_tsx_recv_msg(tsx, rdata);
+
+ return pjsip_tsx_send_msg(tsx, tdata);
+}
+
+static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
+{
+ if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
+ return pubsub_on_rx_subscribe_request(rdata);
+ } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
+ return pubsub_on_rx_publish_request(rdata);
+ }
+
+ return PJ_FALSE;
+}
+
+static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
+{
+ struct ast_sip_subscription *sub;
+ if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
+ return;
+ }
+
+ sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
+ if (!sub) {
+ return;
+ }
+
+ if (event->type == PJSIP_EVENT_RX_MSG) {
+ sub->handler->subscription_terminated(sub, event->body.rx_msg.rdata);
+ }
+
+ if (event->type == PJSIP_EVENT_TSX_STATE &&
+ event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) {
+ sub->handler->subscription_terminated(sub, event->body.tsx_state.src.rdata);
+ }
+
+ if (sub->handler->subscription_shutdown) {
+ sub->handler->subscription_shutdown(sub);
+ }
+ pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
+}
+
+static void pubsub_on_tsx_state(pjsip_evsub *evsub, pjsip_transaction *tsx, pjsip_event *event)
+{
+ struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
+
+ if (!sub) {
+ return;
+ }
+
+ if (sub->handler->notify_response && tsx->role == PJSIP_ROLE_UAC &&
+ event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) {
+ sub->handler->notify_response(sub, event->body.tsx_state.src.rdata);
+ }
+}
+
+static void set_parameters_from_response_data(pj_pool_t *pool, int *p_st_code,
+ pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body,
+ struct ast_sip_subscription_response_data *response_data)
+{
+ ast_assert(response_data->status_code >= 200 && response_data->status_code <= 699);
+ *p_st_code = response_data->status_code;
+
+ if (!ast_strlen_zero(response_data->status_text)) {
+ pj_strdup2(pool, *p_st_text, response_data->status_text);
+ }
+
+ if (response_data->headers) {
+ struct ast_variable *iter;
+ for (iter = response_data->headers; iter; iter = iter->next) {
+ pj_str_t header_name;
+ pj_str_t header_value;
+ pjsip_generic_string_hdr *hdr;
+
+ pj_cstr(&header_name, iter->name);
+ pj_cstr(&header_value, iter->value);
+ hdr = pjsip_generic_string_hdr_create(pool, &header_name, &header_value);
+ pj_list_insert_before(res_hdr, hdr);
+ }
+ }
+
+ if (response_data->body) {
+ pj_str_t type;
+ pj_str_t subtype;
+ pj_str_t body_text;
+
+ pj_cstr(&type, response_data->body->type);
+ pj_cstr(&subtype, response_data->body->subtype);
+ pj_cstr(&body_text, response_data->body->body_text);
+
+ *p_body = pjsip_msg_body_create(pool, &type, &subtype, &body_text);
+ }
+}
+
+static int response_data_changed(struct ast_sip_subscription_response_data *response_data)
+{
+ if (response_data->status_code != 200 ||
+ !ast_strlen_zero(response_data->status_text) ||
+ response_data->headers ||
+ response_data->body) {
+ return 1;
+ }
+ return 0;
+}
+
+static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
+ int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
+{
+ struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
+ struct ast_sip_subscription_response_data response_data = {
+ .status_code = 200,
+ };
+
+ if (!sub) {
+ return;
+ }
+
+ sub->handler->resubscribe(sub, rdata, &response_data);
+
+ if (!response_data_changed(&response_data)) {
+ return;
+ }
+
+ set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
+ res_hdr, p_body, &response_data);
+}
+
+static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
+ pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
+{
+ struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
+ struct ast_sip_subscription_response_data response_data = {
+ .status_code = 200,
+ };
+
+ if (!sub || !sub->handler->notify_request) {
+ return;
+ }
+
+ sub->handler->notify_request(sub, rdata, &response_data);
+
+ if (!response_data_changed(&response_data)) {
+ return;
+ }
+
+ set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
+ res_hdr, p_body, &response_data);
+}
+
+static int serialized_pubsub_on_client_refresh(void *userdata)
+{
+ struct ast_sip_subscription *sub = userdata;
+
+ sub->handler->refresh_subscription(sub);
+ ao2_cleanup(sub);
+ return 0;
+}
+
+static void pubsub_on_client_refresh(pjsip_evsub *evsub)
+{
+ struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
+
+ ao2_ref(sub, +1);
+ ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub);
+}
+
+static int serialized_pubsub_on_server_timeout(void *userdata)
+{
+ struct ast_sip_subscription *sub = userdata;
+
+ sub->handler->subscription_timeout(sub);
+ ao2_cleanup(sub);
+ return 0;
+}
+
+static void pubsub_on_server_timeout(pjsip_evsub *evsub)
+{
+ struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
+
+ ao2_ref(sub, +1);
+ ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub);
+}
+
+static int load_module(void)
+{
+ static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
+
+ pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
+
+ if (!(sched = ast_sched_context_create())) {
+ ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n");
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
+ if (ast_sched_start_thread(sched)) {
+ ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n");
+ ast_sched_context_destroy(sched);
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
+ pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH);
+
+ if (ast_sip_register_service(&pubsub_module)) {
+ ast_log(LOG_ERROR, "Could not register pubsub service\n");
+ ast_sched_context_destroy(sched);
+ return AST_MODULE_LOAD_FAILURE;
+ }
+
+ return AST_MODULE_LOAD_SUCCESS;
+}
+
+static int unload_module(void)
+{
+ if (sched) {
+ ast_sched_context_destroy(sched);
+ }
+
+ return 0;
+}
+
+AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP event resource",
+ .load = load_module,
+ .unload = unload_module,
+ .load_pri = AST_MODPRI_CHANNEL_DEPEND,
+);