diff options
Diffstat (limited to 'res/res_pjsip_pubsub.c')
-rw-r--r-- | res/res_pjsip_pubsub.c | 1155 |
1 files changed, 1155 insertions, 0 deletions
diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c new file mode 100644 index 000000000..1ccfc1972 --- /dev/null +++ b/res/res_pjsip_pubsub.c @@ -0,0 +1,1155 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2013, Digium, Inc. + * + * Mark Michelson <mmichelson@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ +/*! + * \brief Opaque structure representing an RFC 3265 SIP subscription + */ + +/*** MODULEINFO + <depend>pjproject</depend> + <depend>res_pjsip</depend> + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +#include <pjsip.h> +#include <pjsip_simple.h> +#include <pjlib.h> + +#include "asterisk/res_pjsip_pubsub.h" +#include "asterisk/module.h" +#include "asterisk/linkedlists.h" +#include "asterisk/astobj2.h" +#include "asterisk/datastore.h" +#include "asterisk/uuid.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/sched.h" +#include "asterisk/res_pjsip.h" + +static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata); + +static struct pjsip_module pubsub_module = { + .name = { "PubSub Module", 13 }, + .priority = PJSIP_MOD_PRIORITY_APPLICATION, + .on_rx_request = pubsub_on_rx_request, +}; + +static const pj_str_t str_event_name = { "Event", 5 }; + +/*! \brief Scheduler used for automatically expiring publications */ +static struct ast_sched_context *sched; + +/*! \brief Number of buckets for publications (on a per handler) */ +#define PUBLICATIONS_BUCKETS 37 + +/*! \brief Default expiration time for PUBLISH if one is not specified */ +#define DEFAULT_PUBLISH_EXPIRES 3600 + +/*! \brief Defined method for PUBLISH */ +const pjsip_method pjsip_publish_method = +{ + PJSIP_OTHER_METHOD, + { "PUBLISH", 7 } +}; + +/*! + * \brief The types of PUBLISH messages defined in RFC 3903 + */ +enum sip_publish_type { + /*! + * \brief Unknown + * + * \details + * This actually is not defined in RFC 3903. We use this as a constant + * to indicate that an incoming PUBLISH does not fit into any of the + * other categories and is thus invalid. + */ + SIP_PUBLISH_UNKNOWN, + + /*! + * \brief Initial + * + * \details + * The first PUBLISH sent. This will contain a non-zero Expires header + * as well as a body that indicates the current state of the endpoint + * that has sent the message. The initial PUBLISH is the only type + * of PUBLISH to not contain a Sip-If-Match header in it. + */ + SIP_PUBLISH_INITIAL, + + /*! + * \brief Refresh + * + * \details + * Used to keep a published state from expiring. This will contain a + * non-zero Expires header but no body since its purpose is not to + * update state. + */ + SIP_PUBLISH_REFRESH, + + /*! + * \brief Modify + * + * \details + * Used to change state from its previous value. This will contain + * a body updating the published state. May or may not contain an + * Expires header. + */ + SIP_PUBLISH_MODIFY, + + /*! + * \brief Remove + * + * \details + * Used to remove published state from an ESC. This will contain + * an Expires header set to 0 and likely no body. + */ + SIP_PUBLISH_REMOVE, +}; + +/*! + * Used to create new entity IDs by ESCs. + */ +static int esc_etag_counter; + +/*! + * \brief Structure representing a SIP publication + */ +struct ast_sip_publication { + /*! Publication datastores set up by handlers */ + struct ao2_container *datastores; + /*! \brief Entity tag for the publication */ + int entity_tag; + /*! \brief Handler for this publication */ + struct ast_sip_publish_handler *handler; + /*! \brief The endpoint with which the subscription is communicating */ + struct ast_sip_endpoint *endpoint; + /*! \brief Expiration time of the publication */ + int expires; + /*! \brief Scheduled item for expiration of publication */ + int sched_id; +}; + +/*! + * \brief Structure representing a SIP subscription + */ +struct ast_sip_subscription { + /*! Subscription datastores set up by handlers */ + struct ao2_container *datastores; + /*! The endpoint with which the subscription is communicating */ + struct ast_sip_endpoint *endpoint; + /*! Serializer on which to place operations for this subscription */ + struct ast_taskprocessor *serializer; + /*! The handler for this subscription */ + const struct ast_sip_subscription_handler *handler; + /*! The role for this subscription */ + enum ast_sip_subscription_role role; + /*! The underlying PJSIP event subscription structure */ + pjsip_evsub *evsub; + /*! The underlying PJSIP dialog */ + pjsip_dialog *dlg; +}; + +#define DATASTORE_BUCKETS 53 + +#define DEFAULT_EXPIRES 3600 + +static int datastore_hash(const void *obj, int flags) +{ + const struct ast_datastore *datastore = obj; + const char *uid = flags & OBJ_KEY ? obj : datastore->uid; + + ast_assert(uid != NULL); + + return ast_str_hash(uid); +} + +static int datastore_cmp(void *obj, void *arg, int flags) +{ + const struct ast_datastore *datastore1 = obj; + const struct ast_datastore *datastore2 = arg; + const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid; + + ast_assert(datastore1->uid != NULL); + ast_assert(uid2 != NULL); + + return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP; +} + +static void subscription_destructor(void *obj) +{ + struct ast_sip_subscription *sub = obj; + + ast_debug(3, "Destroying SIP subscription\n"); + + ao2_cleanup(sub->datastores); + ao2_cleanup(sub->endpoint); + + if (sub->dlg) { + /* This is why we keep the dialog on the subscription. When the subscription + * is destroyed, there is no guarantee that the underlying dialog is ready + * to be destroyed. Furthermore, there's no guarantee in the opposite direction + * either. The dialog could be destroyed before our subscription is. We fix + * this problem by keeping a reference to the dialog until it is time to + * destroy the subscription. We need to have the dialog available when the + * subscription is destroyed so that we can guarantee that our attempt to + * remove the serializer will be successful. + */ + ast_sip_dialog_set_serializer(sub->dlg, NULL); + pjsip_dlg_dec_session(sub->dlg, &pubsub_module); + } + ast_taskprocessor_unreference(sub->serializer); +} + +static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event); +static void pubsub_on_tsx_state(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event); +static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata, + int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body); +static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code, + pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body); +static void pubsub_on_client_refresh(pjsip_evsub *sub); +static void pubsub_on_server_timeout(pjsip_evsub *sub); + + +static pjsip_evsub_user pubsub_cb = { + .on_evsub_state = pubsub_on_evsub_state, + .on_tsx_state = pubsub_on_tsx_state, + .on_rx_refresh = pubsub_on_rx_refresh, + .on_rx_notify = pubsub_on_rx_notify, + .on_client_refresh = pubsub_on_client_refresh, + .on_server_timeout = pubsub_on_server_timeout, +}; + +static pjsip_evsub *allocate_evsub(const char *event, enum ast_sip_subscription_role role, + struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, pjsip_dialog *dlg) +{ + pjsip_evsub *evsub; + /* PJSIP is kind enough to have some built-in support for certain + * events. We need to use the correct initialization function for the + * built-in events + */ + if (role == AST_SIP_NOTIFIER) { + if (!strcmp(event, "message-summary")) { + pjsip_mwi_create_uas(dlg, &pubsub_cb, rdata, &evsub); + } else { + pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &evsub); + } + } else { + if (!strcmp(event, "message-summary")) { + pjsip_mwi_create_uac(dlg, &pubsub_cb, 0, &evsub); + } else { + pj_str_t pj_event; + pj_cstr(&pj_event, event); + pjsip_evsub_create_uac(dlg, &pubsub_cb, &pj_event, 0, &evsub); + } + } + return evsub; +} + +struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler, + enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata) +{ + struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor); + pjsip_dialog *dlg; + + if (!sub) { + return NULL; + } + sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp); + if (!sub->datastores) { + ao2_ref(sub, -1); + return NULL; + } + sub->serializer = ast_sip_create_serializer(); + if (!sub->serializer) { + ao2_ref(sub, -1); + return NULL; + } + sub->role = role; + if (role == AST_SIP_NOTIFIER) { + pjsip_dlg_create_uas(pjsip_ua_instance(), rdata, NULL, &dlg); + } else { + RAII_VAR(struct ast_sip_contact *, contact, NULL, ao2_cleanup); + + contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors); + if (!contact || ast_strlen_zero(contact->uri)) { + ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n", + ast_sorcery_object_get_id(endpoint)); + ao2_ref(sub, -1); + return NULL; + } + dlg = ast_sip_create_dialog(endpoint, contact->uri, NULL); + } + if (!dlg) { + ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n"); + ao2_ref(sub, -1); + return NULL; + } + sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg); + /* We keep a reference to the dialog until our subscription is destroyed. See + * the subscription_destructor for more details + */ + pjsip_dlg_inc_session(dlg, &pubsub_module); + sub->dlg = dlg; + ast_sip_dialog_set_serializer(dlg, sub->serializer); + pjsip_evsub_set_mod_data(sub->evsub, pubsub_module.id, sub); + ao2_ref(endpoint, +1); + sub->endpoint = endpoint; + sub->handler = handler; + return sub; +} + +struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub) +{ + ast_assert(sub->endpoint != NULL); + ao2_ref(sub->endpoint, +1); + return sub->endpoint; +} + +struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub) +{ + ast_assert(sub->serializer != NULL); + return sub->serializer; +} + +pjsip_evsub *ast_sip_subscription_get_evsub(struct ast_sip_subscription *sub) +{ + return sub->evsub; +} + +pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub) +{ + return sub->dlg; +} + +int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata) +{ + return pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub), + tdata) == PJ_SUCCESS ? 0 : -1; +} + +static void subscription_datastore_destroy(void *obj) +{ + struct ast_datastore *datastore = obj; + + /* Using the destroy function (if present) destroy the data */ + if (datastore->info->destroy != NULL && datastore->data != NULL) { + datastore->info->destroy(datastore->data); + datastore->data = NULL; + } + + ast_free((void *) datastore->uid); + datastore->uid = NULL; +} + +struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid) +{ + RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup); + const char *uid_ptr = uid; + + if (!info) { + return NULL; + } + + datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy); + if (!datastore) { + return NULL; + } + + datastore->info = info; + if (ast_strlen_zero(uid)) { + /* They didn't provide an ID so we'll provide one ourself */ + struct ast_uuid *uuid = ast_uuid_generate(); + char uuid_buf[AST_UUID_STR_LEN]; + if (!uuid) { + return NULL; + } + uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf)); + ast_free(uuid); + } + + datastore->uid = ast_strdup(uid_ptr); + if (!datastore->uid) { + return NULL; + } + + ao2_ref(datastore, +1); + return datastore; +} + +int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore) +{ + ast_assert(datastore != NULL); + ast_assert(datastore->info != NULL); + ast_assert(!ast_strlen_zero(datastore->uid)); + + if (!ao2_link(subscription->datastores, datastore)) { + return -1; + } + return 0; +} + +struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name) +{ + return ao2_find(subscription->datastores, name, OBJ_KEY); +} + +void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name) +{ + ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name); +} + +int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore) +{ + ast_assert(datastore != NULL); + ast_assert(datastore->info != NULL); + ast_assert(!ast_strlen_zero(datastore->uid)); + + if (!ao2_link(publication->datastores, datastore)) { + return -1; + } + return 0; +} + +struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name) +{ + return ao2_find(publication->datastores, name, OBJ_KEY); +} + +void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name) +{ + ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name); +} + +AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler); + +static int publication_hash_fn(const void *obj, const int flags) +{ + const struct ast_sip_publication *publication = obj; + const int *entity_tag = obj; + + return flags & OBJ_KEY ? *entity_tag : publication->entity_tag; +} + +static int publication_cmp_fn(void *obj, void *arg, int flags) +{ + const struct ast_sip_publication *publication1 = obj; + const struct ast_sip_publication *publication2 = arg; + const int *entity_tag = arg; + + return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ? + CMP_MATCH | CMP_STOP : 0); +} + +static void publish_add_handler(struct ast_sip_publish_handler *handler) +{ + SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); + AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next); +} + +int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler) +{ + if (ast_strlen_zero(handler->event_name)) { + ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n"); + return -1; + } + + if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS, + publication_hash_fn, publication_cmp_fn))) { + ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n", + handler->event_name); + return -1; + } + + publish_add_handler(handler); + + ast_module_ref(ast_module_info->self); + + return 0; +} + +void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler) +{ + struct ast_sip_publish_handler *iter; + SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); + AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) { + if (handler == iter) { + AST_RWLIST_REMOVE_CURRENT(next); + ao2_cleanup(handler->publications); + ast_module_unref(ast_module_info->self); + break; + } + } + AST_RWLIST_TRAVERSE_SAFE_END; +} + +AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler); + +static void sub_add_handler(struct ast_sip_subscription_handler *handler) +{ + SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); + AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next); + ast_module_ref(ast_module_info->self); +} + +static int sub_handler_exists_for_event_name(const char *event_name) +{ + struct ast_sip_subscription_handler *iter; + SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK); + + AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) { + if (!strcmp(iter->event_name, event_name)) { + return 1; + } + } + return 0; +} + +int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler) +{ + pj_str_t accept[AST_SIP_MAX_ACCEPT]; + int i; + + if (ast_strlen_zero(handler->event_name)) { + ast_log(LOG_ERROR, "No event package specifief for subscription handler. Cannot register\n"); + return -1; + } + + if (ast_strlen_zero(handler->accept[0])) { + ast_log(LOG_ERROR, "Subscription handler must supply at least one 'Accept' format\n"); + return -1; + } + + for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) { + pj_cstr(&accept[i], handler->accept[i]); + } + + if (!sub_handler_exists_for_event_name(handler->event_name)) { + pj_str_t event; + + pj_cstr(&event, handler->event_name); + + if (!strcmp(handler->event_name, "message-summary")) { + pjsip_mwi_init_module(ast_sip_get_pjsip_endpoint(), pjsip_evsub_instance()); + } else { + pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept); + } + } else { + pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module, PJSIP_H_ACCEPT, NULL, + i, accept); + } + + sub_add_handler(handler); + return 0; +} + +void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler) +{ + struct ast_sip_subscription_handler *iter; + SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); + AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) { + if (handler == iter) { + AST_RWLIST_REMOVE_CURRENT(next); + ast_module_unref(ast_module_info->self); + break; + } + } + AST_RWLIST_TRAVERSE_SAFE_END; +} + +static struct ast_sip_subscription_handler *find_sub_handler(const char *event, char accept[AST_SIP_MAX_ACCEPT][64], size_t num_accept) +{ + struct ast_sip_subscription_handler *iter; + int match = 0; + SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK); + AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) { + int i; + int j; + if (strcmp(event, iter->event_name)) { + ast_debug(3, "Event %s does not match %s\n", event, iter->event_name); + continue; + } + ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name); + for (i = 0; i < num_accept; ++i) { + for (j = 0; j < num_accept; ++j) { + if (ast_strlen_zero(iter->accept[i])) { + ast_debug(3, "Breaking because subscription handler has run out of 'accept' types\n"); + break; + } + if (!strcmp(accept[j], iter->accept[i])) { + ast_debug(3, "Accept headers match: %s = %s\n", accept[j], iter->accept[i]); + match = 1; + break; + } + ast_debug(3, "Accept %s does not match %s\n", accept[j], iter->accept[i]); + } + if (match) { + break; + } + } + if (match) { + break; + } + } + + return iter; +} + +static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) +{ + char event[32]; + char accept[AST_SIP_MAX_ACCEPT][64]; + pjsip_accept_hdr *accept_header; + pjsip_event_hdr *event_header; + pjsip_expires_hdr *expires_header; + struct ast_sip_subscription_handler *handler; + RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); + struct ast_sip_subscription *sub; + int i; + + endpoint = ast_pjsip_rdata_get_endpoint(rdata); + ast_assert(endpoint != NULL); + + if (!endpoint->subscription.allow) { + ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint)); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL); + return PJ_TRUE; + } + + expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next); + + if (expires_header && expires_header->ivalue < endpoint->subscription.minexpiry) { + ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %d\n", + expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL); + return PJ_TRUE; + } + + event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next); + if (!event_header) { + ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n"); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); + return PJ_TRUE; + } + + accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next); + if (!accept_header) { + ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Accept header\n"); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL); + return PJ_TRUE; + } + + ast_copy_pj_str(event, &event_header->event_type, sizeof(event)); + for (i = 0; i < accept_header->count; ++i) { + ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i])); + } + + handler = find_sub_handler(event, accept, accept_header->count); + if (!handler) { + ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); + return PJ_TRUE; + } + sub = handler->new_subscribe(endpoint, rdata); + if (!sub) { + pjsip_transaction *trans = pjsip_rdata_get_tsx(rdata); + + if (trans) { + pjsip_dialog *dlg = pjsip_rdata_get_dlg(rdata); + pjsip_tx_data *tdata; + + if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, &tdata) != PJ_SUCCESS) { + return PJ_TRUE; + } + pjsip_dlg_send_response(dlg, trans, tdata); + } else { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL); + } + } + return PJ_TRUE; +} + +static struct ast_sip_publish_handler *find_pub_handler(const char *event) +{ + struct ast_sip_publish_handler *iter = NULL; + SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK); + + AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) { + if (strcmp(event, iter->event_name)) { + ast_debug(3, "Event %s does not match %s\n", event, iter->event_name); + continue; + } + ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name); + break; + } + + return iter; +} + +static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata, + pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id) +{ + pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); + + if (etag_hdr) { + char etag[pj_strlen(&etag_hdr->hvalue) + 1]; + + ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag)); + + if (sscanf(etag, "%30d", entity_id) != 1) { + return SIP_PUBLISH_UNKNOWN; + } + } + + *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES; + + if (!(*expires)) { + return SIP_PUBLISH_REMOVE; + } else if (!etag_hdr && rdata->msg_info.msg->body) { + return SIP_PUBLISH_INITIAL; + } else if (etag_hdr && !rdata->msg_info.msg->body) { + return SIP_PUBLISH_REFRESH; + } else if (etag_hdr && rdata->msg_info.msg->body) { + return SIP_PUBLISH_MODIFY; + } + + return SIP_PUBLISH_UNKNOWN; +} + +static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, + struct ast_sip_publish_handler *handler) +{ + struct ast_sip_publication *publication = handler->new_publication(endpoint, rdata); + + if (!publication) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL); + return NULL; + } + + publication->handler = handler; + + return publication; +} + +static int publish_expire_callback(void *data) +{ + RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup); + + publication->handler->publish_expire(publication); + + return 0; +} + +static int publish_expire(const void *data) +{ + struct ast_sip_publication *publication = (struct ast_sip_publication*)data; + + ao2_unlink(publication->handler->publications, publication); + publication->sched_id = -1; + + if (ast_sip_push_task(NULL, publish_expire_callback, publication)) { + ao2_cleanup(publication); + } + + return 0; +} + +static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata) +{ + pjsip_event_hdr *event_header; + struct ast_sip_publish_handler *handler; + RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); + char event[32]; + static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 }; + pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL); + enum sip_publish_type publish_type; + RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup); + int expires = 0, entity_id; + + endpoint = ast_pjsip_rdata_get_endpoint(rdata); + ast_assert(endpoint != NULL); + + event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next); + if (!event_header) { + ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n"); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); + return PJ_TRUE; + } + ast_copy_pj_str(event, &event_header->event_type, sizeof(event)); + + handler = find_pub_handler(event); + if (!handler) { + ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); + return PJ_TRUE; + } + + publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id); + + /* If this is not an initial publish ensure that a publication is present */ + if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) { + if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) { + static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 }; + + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed, + NULL, NULL); + return PJ_TRUE; + } + + /* Per the RFC every response has to have a new entity tag */ + publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1); + + /* Update the expires here so that the created responses will contain the correct value */ + publication->expires = expires; + } + + switch (publish_type) { + case SIP_PUBLISH_INITIAL: + publication = publish_request_initial(endpoint, rdata, handler); + break; + case SIP_PUBLISH_REFRESH: + case SIP_PUBLISH_MODIFY: + if (handler->publish_refresh(publication, rdata)) { + /* If an error occurs we want to terminate the publication */ + expires = 0; + } + break; + case SIP_PUBLISH_REMOVE: + handler->publish_termination(publication, rdata); + break; + case SIP_PUBLISH_UNKNOWN: + default: + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL); + break; + } + + if (publication) { + if (expires) { + ao2_link(handler->publications, publication); + + AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication, + ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1)); + } else { + AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1)); + } + } + + return PJ_TRUE; +} + +/*! \brief Internal destructor for publications */ +static void publication_destroy_fn(void *obj) +{ + struct ast_sip_publication *publication = obj; + + ast_debug(3, "Destroying SIP publication\n"); + + ao2_cleanup(publication->datastores); + ao2_cleanup(publication->endpoint); +} + +struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata) +{ + struct ast_sip_publication *publication; + pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); + + ast_assert(endpoint != NULL); + + if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) { + return NULL; + } + + if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) { + ao2_ref(publication, -1); + return NULL; + } + + publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1); + ao2_ref(endpoint, +1); + publication->endpoint = endpoint; + publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES; + publication->sched_id = -1; + + return publication; +} + +struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub) +{ + return pub->endpoint; +} + +int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata, + pjsip_tx_data **tdata) +{ + if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, tdata) != PJ_SUCCESS) { + return -1; + } + + if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) { + RAII_VAR(char *, entity_tag, NULL, ast_free_ptr); + RAII_VAR(char *, expires, NULL, ast_free_ptr); + + if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) || + (ast_asprintf(&expires, "%d", pub->expires) < 0)) { + pjsip_tx_data_dec_ref(*tdata); + return -1; + } + + ast_sip_add_header(*tdata, "SIP-ETag", entity_tag); + ast_sip_add_header(*tdata, "Expires", expires); + } + + return 0; +} + +pj_status_t ast_sip_publication_send_response(struct ast_sip_publication *pub, pjsip_rx_data *rdata, + pjsip_tx_data *tdata) +{ + pj_status_t status; + pjsip_transaction *tsx; + + if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) { + return status; + } + + pjsip_tsx_recv_msg(tsx, rdata); + + return pjsip_tsx_send_msg(tsx, tdata); +} + +static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata) +{ + if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) { + return pubsub_on_rx_subscribe_request(rdata); + } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) { + return pubsub_on_rx_publish_request(rdata); + } + + return PJ_FALSE; +} + +static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event) +{ + struct ast_sip_subscription *sub; + if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) { + return; + } + + sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); + if (!sub) { + return; + } + + if (event->type == PJSIP_EVENT_RX_MSG) { + sub->handler->subscription_terminated(sub, event->body.rx_msg.rdata); + } + + if (event->type == PJSIP_EVENT_TSX_STATE && + event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) { + sub->handler->subscription_terminated(sub, event->body.tsx_state.src.rdata); + } + + if (sub->handler->subscription_shutdown) { + sub->handler->subscription_shutdown(sub); + } + pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL); +} + +static void pubsub_on_tsx_state(pjsip_evsub *evsub, pjsip_transaction *tsx, pjsip_event *event) +{ + struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); + + if (!sub) { + return; + } + + if (sub->handler->notify_response && tsx->role == PJSIP_ROLE_UAC && + event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) { + sub->handler->notify_response(sub, event->body.tsx_state.src.rdata); + } +} + +static void set_parameters_from_response_data(pj_pool_t *pool, int *p_st_code, + pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body, + struct ast_sip_subscription_response_data *response_data) +{ + ast_assert(response_data->status_code >= 200 && response_data->status_code <= 699); + *p_st_code = response_data->status_code; + + if (!ast_strlen_zero(response_data->status_text)) { + pj_strdup2(pool, *p_st_text, response_data->status_text); + } + + if (response_data->headers) { + struct ast_variable *iter; + for (iter = response_data->headers; iter; iter = iter->next) { + pj_str_t header_name; + pj_str_t header_value; + pjsip_generic_string_hdr *hdr; + + pj_cstr(&header_name, iter->name); + pj_cstr(&header_value, iter->value); + hdr = pjsip_generic_string_hdr_create(pool, &header_name, &header_value); + pj_list_insert_before(res_hdr, hdr); + } + } + + if (response_data->body) { + pj_str_t type; + pj_str_t subtype; + pj_str_t body_text; + + pj_cstr(&type, response_data->body->type); + pj_cstr(&subtype, response_data->body->subtype); + pj_cstr(&body_text, response_data->body->body_text); + + *p_body = pjsip_msg_body_create(pool, &type, &subtype, &body_text); + } +} + +static int response_data_changed(struct ast_sip_subscription_response_data *response_data) +{ + if (response_data->status_code != 200 || + !ast_strlen_zero(response_data->status_text) || + response_data->headers || + response_data->body) { + return 1; + } + return 0; +} + +static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata, + int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) +{ + struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); + struct ast_sip_subscription_response_data response_data = { + .status_code = 200, + }; + + if (!sub) { + return; + } + + sub->handler->resubscribe(sub, rdata, &response_data); + + if (!response_data_changed(&response_data)) { + return; + } + + set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text, + res_hdr, p_body, &response_data); +} + +static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code, + pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) +{ + struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); + struct ast_sip_subscription_response_data response_data = { + .status_code = 200, + }; + + if (!sub || !sub->handler->notify_request) { + return; + } + + sub->handler->notify_request(sub, rdata, &response_data); + + if (!response_data_changed(&response_data)) { + return; + } + + set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text, + res_hdr, p_body, &response_data); +} + +static int serialized_pubsub_on_client_refresh(void *userdata) +{ + struct ast_sip_subscription *sub = userdata; + + sub->handler->refresh_subscription(sub); + ao2_cleanup(sub); + return 0; +} + +static void pubsub_on_client_refresh(pjsip_evsub *evsub) +{ + struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); + + ao2_ref(sub, +1); + ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub); +} + +static int serialized_pubsub_on_server_timeout(void *userdata) +{ + struct ast_sip_subscription *sub = userdata; + + sub->handler->subscription_timeout(sub); + ao2_cleanup(sub); + return 0; +} + +static void pubsub_on_server_timeout(pjsip_evsub *evsub) +{ + struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); + + ao2_ref(sub, +1); + ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub); +} + +static int load_module(void) +{ + static const pj_str_t str_PUBLISH = { "PUBLISH", 7 }; + + pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint()); + + if (!(sched = ast_sched_context_create())) { + ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n"); + return AST_MODULE_LOAD_FAILURE; + } + + if (ast_sched_start_thread(sched)) { + ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n"); + ast_sched_context_destroy(sched); + return AST_MODULE_LOAD_FAILURE; + } + + pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH); + + if (ast_sip_register_service(&pubsub_module)) { + ast_log(LOG_ERROR, "Could not register pubsub service\n"); + ast_sched_context_destroy(sched); + return AST_MODULE_LOAD_FAILURE; + } + + return AST_MODULE_LOAD_SUCCESS; +} + +static int unload_module(void) +{ + if (sched) { + ast_sched_context_destroy(sched); + } + + return 0; +} + +AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP event resource", + .load = load_module, + .unload = unload_module, + .load_pri = AST_MODPRI_CHANNEL_DEPEND, +); |