From f523a96635b8763503aab8c70d1c41683721efeb Mon Sep 17 00:00:00 2001 From: Joshua Colp Date: Sat, 29 Jun 2013 13:42:19 +0000 Subject: Implement the defined PUBLISH ESC API within res_sip_pubsub. (closes issue ASTERISK-21452) Review: https://reviewboard.asterisk.org/r/2630/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@393262 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- include/asterisk/res_sip_pubsub.h | 427 ++++++++++++++++++++---------- res/res_sip_pubsub.c | 532 +++++++++++++++++++++++++++++++++++--- res/res_sip_pubsub.exports.in | 9 + 3 files changed, 795 insertions(+), 173 deletions(-) diff --git a/include/asterisk/res_sip_pubsub.h b/include/asterisk/res_sip_pubsub.h index be443299c..e74f23bf5 100644 --- a/include/asterisk/res_sip_pubsub.h +++ b/include/asterisk/res_sip_pubsub.h @@ -29,19 +29,174 @@ struct ast_sip_endpoint; struct ast_datastore; struct ast_datastore_info; +/*! + * \brief Opaque structure representing a publication + */ +struct ast_sip_publication; + +/*! + * \brief Callbacks that publication handlers will define + */ +struct ast_sip_publish_handler { + /*! \brief The name of the event this handler deals with */ + const char *event_name; + + /*! \brief Publications */ + struct ao2_container *publications; + + /*! + * \brief Called when a PUBLISH to establish a new publication arrives. + * + * \param endpoint The endpoint from whom the PUBLISH arrived + * \param rdata The PUBLISH request + * \retval NULL PUBLISH was not accepted + * \retval non-NULL New publication + * + * \note The callback is expected to send a response for the PUBLISH in success cases. + */ + struct ast_sip_publication *(*new_publication)(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata); + + /*! + * \brief Called when a PUBLISH for an existing publication arrives. + * + * This PUBLISH may be intending to change state or it may be simply renewing + * the publication since the publication is nearing expiration. The callback + * is expected to send a response to the PUBLISH. + * + * \param pub The publication on which the PUBLISH arrived + * \param rdata The PUBLISH request + * \retval 0 Publication was accepted + * \retval non-zero Publication was denied + * + * \note The callback is expected to send a response for the PUBLISH. + */ + int (*publish_refresh)(struct ast_sip_publication *pub, pjsip_rx_data *rdata); + + /*! + * \brief Called when a publication has reached its expiration. + */ + void (*publish_expire)(struct ast_sip_publication *pub); + + /*! + * \brief Called when a PUBLISH arrives to terminate a publication. + * + * \param pub The publication that is terminating + * \param rdata The PUBLISH request terminating the publication + * + * \note The callback is expected to send a response for the PUBLISH. + */ + void (*publish_termination)(struct ast_sip_publication *pub, pjsip_rx_data *rdata); + + AST_LIST_ENTRY(ast_sip_publish_handler) next; +}; + +/*! + * \brief Create a new publication + * + * Publication handlers should call this when a PUBLISH arrives to establish a new publication. + * + * \param endpoint The endpoint from whom the PUBLISHes arrive + * \param rdata The PUBLISH that established the publication + * \retval NULL Failed to create a publication + * \retval non-NULL The newly-created publication + */ +struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata); + +/*! + * \brief Given a publication, get the associated endpoint + * + * \param pub The publication + * \retval NULL Failure + * \retval non-NULL The associated endpoint + */ +struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub); + +/*! + * \brief Create a response to an inbound PUBLISH + * + * The created response must be sent using ast_sip_publication_send_response + * + * \param pub The publication + * \param status code The status code to place in the response + * \param rdata The request to which the response is being made + * \param[out] tdata The created response + */ +int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata, + pjsip_tx_data **tdata); + +/*! + * \brief Send a response for an inbound PUBLISH + * + * \param pub The publication + * \param rdata The request to which the response was made + * \param tdata The response to the request + */ +pj_status_t ast_sip_publication_send_response(struct ast_sip_publication *pub, pjsip_rx_data *rdata, + pjsip_tx_data *tdata); + +/*! + * \brief Register a publish handler + * + * \retval 0 Handler was registered successfully + * \retval non-zero Handler was not registered successfully + */ +int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler); + +/*! + * \brief Unregister a publish handler + */ +void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler); + +/*! + * \brief Add a datastore to a SIP publication + * + * Note that SIP uses reference counted datastores. The datastore passed into this function + * must have been allocated using ao2_alloc() or there will be serious problems. + * + * \param publication The publication to add the datastore to + * \param datastore The datastore to be added to the subscription + * \retval 0 Success + * \retval -1 Failure + */ +int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore); + +/*! + * \brief Retrieve a publication datastore + * + * The datastore retrieved will have its reference count incremented. When the caller is done + * with the datastore, the reference counted needs to be decremented using ao2_ref(). + * + * \param publication The publication from which to retrieve the datastore + * \param name The name of the datastore to retrieve + * \retval NULL Failed to find the specified datastore + * \retval non-NULL The specified datastore + */ +struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name); + +/*! + * \brief Remove a publication datastore from the publication + * + * This operation may cause the datastore's free() callback to be called if the reference + * count reaches zero. + * + * \param publication The publication to remove the datastore from + * \param name The name of the datastore to remove + */ +void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name); + /*! * \brief Opaque structure representing an RFC 3265 SIP subscription */ struct ast_sip_subscription; - + /*! * \brief Role for the subscription that is being created */ enum ast_sip_subscription_role { - /* Sending SUBSCRIBEs, receiving NOTIFYs */ - AST_SIP_SUBSCRIBER, - /* Sending NOTIFYs, receiving SUBSCRIBEs */ - AST_SIP_NOTIFIER, + /* Sending SUBSCRIBEs, receiving NOTIFYs */ + AST_SIP_SUBSCRIBER, + /* Sending NOTIFYs, receiving SUBSCRIBEs */ + AST_SIP_NOTIFIER, }; /*! @@ -56,147 +211,147 @@ enum ast_sip_subscription_role { * not provide it with any additional data. */ struct ast_sip_subscription_response_data { - /*! Status code of the response */ - int status_code; - /*! Optional status text */ - const char *status_text; - /*! Optional additional headers to add to the response */ - struct ast_variable *headers; - /*! Optional body to add to the response */ - struct ast_sip_body *body; + /*! Status code of the response */ + int status_code; + /*! Optional status text */ + const char *status_text; + /*! Optional additional headers to add to the response */ + struct ast_variable *headers; + /*! Optional body to add to the response */ + struct ast_sip_body *body; }; #define AST_SIP_MAX_ACCEPT 32 struct ast_sip_subscription_handler { - /*! The name of the event this handler deals with */ - const char *event_name; - /*! The types of body this handler accepts */ - const char *accept[AST_SIP_MAX_ACCEPT]; - - /*! - * \brief Called when a subscription is to be destroyed - * - * This is a subscriber and notifier callback. - * - * The handler is not expected to send any sort of requests or responses - * during this callback. The handler MUST, however, begin the destruction + /*! The name of the event this handler deals with */ + const char *event_name; + /*! The types of body this handler accepts */ + const char *accept[AST_SIP_MAX_ACCEPT]; + + /*! + * \brief Called when a subscription is to be destroyed + * + * This is a subscriber and notifier callback. + * + * The handler is not expected to send any sort of requests or responses + * during this callback. The handler MUST, however, begin the destruction * process for the subscription during this callback. - */ + */ void (*subscription_shutdown)(struct ast_sip_subscription *subscription); - - /*! - * \brief Called when a SUBSCRIBE arrives in order to create a new subscription - * - * This is a notifier callback. - * - * If the notifier wishes to accept the subscription, then it can create - * a new ast_sip_subscription to do so. - * - * If the notifier chooses to create a new subscription, then it must accept - * the incoming subscription using pjsip_evsub_accept() and it must also - * send an initial NOTIFY with the current subscription state. - * - * \param endpoint The endpoint from which we received the SUBSCRIBE - * \param rdata The SUBSCRIBE request - * \retval NULL The SUBSCRIBE has not been accepted - * \retval non-NULL The newly-created subscription - */ - struct ast_sip_subscription *(*new_subscribe)(struct ast_sip_endpoint *endpoint, - pjsip_rx_data *rdata); - - /*! - * \brief Called when an endpoint renews a subscription. - * - * This is a notifier callback. - * - * Because of the way that the PJSIP evsub framework works, it will automatically - * send a response to the SUBSCRIBE. However, the subscription handler must send + + /*! + * \brief Called when a SUBSCRIBE arrives in order to create a new subscription + * + * This is a notifier callback. + * + * If the notifier wishes to accept the subscription, then it can create + * a new ast_sip_subscription to do so. + * + * If the notifier chooses to create a new subscription, then it must accept + * the incoming subscription using pjsip_evsub_accept() and it must also + * send an initial NOTIFY with the current subscription state. + * + * \param endpoint The endpoint from which we received the SUBSCRIBE + * \param rdata The SUBSCRIBE request + * \retval NULL The SUBSCRIBE has not been accepted + * \retval non-NULL The newly-created subscription + */ + struct ast_sip_subscription *(*new_subscribe)(struct ast_sip_endpoint *endpoint, + pjsip_rx_data *rdata); + + /*! + * \brief Called when an endpoint renews a subscription. + * + * This is a notifier callback. + * + * Because of the way that the PJSIP evsub framework works, it will automatically + * send a response to the SUBSCRIBE. However, the subscription handler must send * a NOTIFY with the current subscription state when this callback is called. * * The response_data that is passed into this callback is used to craft what should * be in the response to the incoming SUBSCRIBE. It is initialized with a 200 status * code and all other parameters are empty. - * - * \param sub The subscription that is being renewed - * \param rdata The SUBSCRIBE request in question + * + * \param sub The subscription that is being renewed + * \param rdata The SUBSCRIBE request in question * \param[out] response_data Data pertaining to the SIP response that should be * sent to the SUBSCRIBE - */ - void (*resubscribe)(struct ast_sip_subscription *sub, - pjsip_rx_data *rdata, struct ast_sip_subscription_response_data *response_data); - - /*! - * \brief Called when a subscription times out. - * - * This is a notifier callback - * - * This indicates that the subscription has timed out. The subscription handler is - * expected to send a NOTIFY that terminates the subscription. - * - * \param sub The subscription that has timed out - */ - void (*subscription_timeout)(struct ast_sip_subscription *sub); - - /*! - * \brief Called when a subscription is terminated via a SUBSCRIBE or NOTIFY request - * - * This is a notifier and subscriber callback. - * - * The PJSIP subscription framework will automatically send the response to the - * request. If a notifier receives this callback, then the subscription handler + */ + void (*resubscribe)(struct ast_sip_subscription *sub, + pjsip_rx_data *rdata, struct ast_sip_subscription_response_data *response_data); + + /*! + * \brief Called when a subscription times out. + * + * This is a notifier callback + * + * This indicates that the subscription has timed out. The subscription handler is + * expected to send a NOTIFY that terminates the subscription. + * + * \param sub The subscription that has timed out + */ + void (*subscription_timeout)(struct ast_sip_subscription *sub); + + /*! + * \brief Called when a subscription is terminated via a SUBSCRIBE or NOTIFY request + * + * This is a notifier and subscriber callback. + * + * The PJSIP subscription framework will automatically send the response to the + * request. If a notifier receives this callback, then the subscription handler * is expected to send a final NOTIFY to terminate the subscription. - * - * \param sub The subscription being terminated - * \param rdata The request that terminated the subscription - */ - void (*subscription_terminated)(struct ast_sip_subscription *sub, pjsip_rx_data *rdata); - - /*! - * \brief Called when a subscription handler's outbound NOTIFY receives a response - * - * This is a notifier callback. - * - * \param sub The subscription - * \param rdata The NOTIFY response - */ - void (*notify_response)(struct ast_sip_subscription *sub, pjsip_rx_data *rdata); - - /*! - * \brief Called when a subscription handler receives an inbound NOTIFY - * - * This is a subscriber callback. - * - * Because of the way that the PJSIP evsub framework works, it will automatically - * send a response to the NOTIFY. By default this will be a 200 OK response, but - * this callback can change details of the response by returning response data - * to use. + * + * \param sub The subscription being terminated + * \param rdata The request that terminated the subscription + */ + void (*subscription_terminated)(struct ast_sip_subscription *sub, pjsip_rx_data *rdata); + + /*! + * \brief Called when a subscription handler's outbound NOTIFY receives a response + * + * This is a notifier callback. + * + * \param sub The subscription + * \param rdata The NOTIFY response + */ + void (*notify_response)(struct ast_sip_subscription *sub, pjsip_rx_data *rdata); + + /*! + * \brief Called when a subscription handler receives an inbound NOTIFY + * + * This is a subscriber callback. + * + * Because of the way that the PJSIP evsub framework works, it will automatically + * send a response to the NOTIFY. By default this will be a 200 OK response, but + * this callback can change details of the response by returning response data + * to use. * * The response_data that is passed into this callback is used to craft what should * be in the response to the incoming SUBSCRIBE. It is initialized with a 200 status * code and all other parameters are empty. - * - * \param sub The subscription - * \param rdata The NOTIFY request + * + * \param sub The subscription + * \param rdata The NOTIFY request * \param[out] response_data Data pertaining to the SIP response that should be * sent to the SUBSCRIBE - */ - void (*notify_request)(struct ast_sip_subscription *sub, - pjsip_rx_data *rdata, struct ast_sip_subscription_response_data *response_data); - - /*! - * \brief Called when it is time for a subscriber to resubscribe - * - * This is a subscriber callback. - * - * The subscriber can reresh the subscription using the pjsip_evsub_initiate() - * function. - * - * \param sub The subscription to refresh - * \retval 0 Success - * \retval non-zero Failure - */ - int (*refresh_subscription)(struct ast_sip_subscription *sub); + */ + void (*notify_request)(struct ast_sip_subscription *sub, + pjsip_rx_data *rdata, struct ast_sip_subscription_response_data *response_data); + + /*! + * \brief Called when it is time for a subscriber to resubscribe + * + * This is a subscriber callback. + * + * The subscriber can reresh the subscription using the pjsip_evsub_initiate() + * function. + * + * \param sub The subscription to refresh + * \retval 0 Success + * \retval non-zero Failure + */ + int (*refresh_subscription)(struct ast_sip_subscription *sub); AST_LIST_ENTRY(ast_sip_subscription_handler) next; }; @@ -221,9 +376,9 @@ struct ast_sip_subscription_handler { * \param rdata If acting as a notifier, the SUBSCRIBE request that triggered subscription creation */ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler, - enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata); - - + enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata); + + /*! * \brief Get the endpoint that is associated with this subscription * @@ -234,7 +389,7 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su * \retval non-NULL The endpoint */ struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub); - + /*! * \brief Get the serializer for the subscription * @@ -246,7 +401,7 @@ struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscr * \retval non-NULL The subscription's serializer */ struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub); - + /*! * \brief Get the underlying PJSIP evsub structure * @@ -289,7 +444,7 @@ pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub); * \retval non-zero Failure */ int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata); - + /*! * \brief Alternative for ast_datastore_alloc() * @@ -307,7 +462,7 @@ int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx * \retval non-NULL Newly allocated datastore */ struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid); - + /*! * \brief Add a datastore to a SIP subscription * @@ -320,7 +475,7 @@ struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_data * \retval -1 Failure */ int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore); - + /*! * \brief Retrieve a subscription datastore * @@ -333,7 +488,7 @@ int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription * \retval non-NULL The specified datastore */ struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name); - + /*! * \brief Remove a subscription datastore from the subscription * @@ -344,7 +499,7 @@ struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscrip * \param name The name of the datastore to remove */ void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name); - + /*! * \brief Register a subscription handler * @@ -352,7 +507,7 @@ void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscrip * \retval non-zero Handler was not registered successfully */ int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler); - + /*! * \brief Unregister a subscription handler */ diff --git a/res/res_sip_pubsub.c b/res/res_sip_pubsub.c index 590c96c49..df6155469 100644 --- a/res/res_sip_pubsub.c +++ b/res/res_sip_pubsub.c @@ -38,14 +38,111 @@ #include "asterisk/datastore.h" #include "asterisk/uuid.h" #include "asterisk/taskprocessor.h" +#include "asterisk/sched.h" #include "asterisk/res_sip.h" -static pj_bool_t sub_on_rx_request(pjsip_rx_data *rdata); +static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata); -static struct pjsip_module sub_module = { +static struct pjsip_module pubsub_module = { .name = { "PubSub Module", 13 }, .priority = PJSIP_MOD_PRIORITY_APPLICATION, - .on_rx_request = sub_on_rx_request, + .on_rx_request = pubsub_on_rx_request, +}; + +static const pj_str_t str_event_name = { "Event", 5 }; + +/*! \brief Scheduler used for automatically expiring publications */ +static struct ast_sched_context *sched; + +/*! \brief Number of buckets for publications (on a per handler) */ +#define PUBLICATIONS_BUCKETS 37 + +/*! \brief Default expiration time for PUBLISH if one is not specified */ +#define DEFAULT_PUBLISH_EXPIRES 3600 + +/*! \brief Defined method for PUBLISH */ +const pjsip_method pjsip_publish_method = +{ + PJSIP_OTHER_METHOD, + { "PUBLISH", 7 } +}; + +/*! + * \brief The types of PUBLISH messages defined in RFC 3903 + */ +enum sip_publish_type { + /*! + * \brief Unknown + * + * \details + * This actually is not defined in RFC 3903. We use this as a constant + * to indicate that an incoming PUBLISH does not fit into any of the + * other categories and is thus invalid. + */ + SIP_PUBLISH_UNKNOWN, + + /*! + * \brief Initial + * + * \details + * The first PUBLISH sent. This will contain a non-zero Expires header + * as well as a body that indicates the current state of the endpoint + * that has sent the message. The initial PUBLISH is the only type + * of PUBLISH to not contain a Sip-If-Match header in it. + */ + SIP_PUBLISH_INITIAL, + + /*! + * \brief Refresh + * + * \details + * Used to keep a published state from expiring. This will contain a + * non-zero Expires header but no body since its purpose is not to + * update state. + */ + SIP_PUBLISH_REFRESH, + + /*! + * \brief Modify + * + * \details + * Used to change state from its previous value. This will contain + * a body updating the published state. May or may not contain an + * Expires header. + */ + SIP_PUBLISH_MODIFY, + + /*! + * \brief Remove + * + * \details + * Used to remove published state from an ESC. This will contain + * an Expires header set to 0 and likely no body. + */ + SIP_PUBLISH_REMOVE, +}; + +/*! + * Used to create new entity IDs by ESCs. + */ +static int esc_etag_counter; + +/*! + * \brief Structure representing a SIP publication + */ +struct ast_sip_publication { + /*! Publication datastores set up by handlers */ + struct ao2_container *datastores; + /*! \brief Entity tag for the publication */ + int entity_tag; + /*! \brief Handler for this publication */ + struct ast_sip_publish_handler *handler; + /*! \brief The endpoint with which the subscription is communicating */ + struct ast_sip_endpoint *endpoint; + /*! \brief Expiration time of the publication */ + int expires; + /*! \brief Scheduled item for expiration of publication */ + int sched_id; }; /*! @@ -114,7 +211,7 @@ static void subscription_destructor(void *obj) * remove the serializer will be successful. */ ast_sip_dialog_set_serializer(sub->dlg, NULL); - pjsip_dlg_dec_session(sub->dlg, &sub_module); + pjsip_dlg_dec_session(sub->dlg, &pubsub_module); } ast_taskprocessor_unreference(sub->serializer); } @@ -165,7 +262,7 @@ static pjsip_evsub *allocate_evsub(const char *event, enum ast_sip_subscription_ } struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler, - enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata) + enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata) { struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor); pjsip_dialog *dlg; @@ -207,23 +304,23 @@ struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_su /* We keep a reference to the dialog until our subscription is destroyed. See * the subscription_destructor for more details */ - pjsip_dlg_inc_session(dlg, &sub_module); + pjsip_dlg_inc_session(dlg, &pubsub_module); sub->dlg = dlg; ast_sip_dialog_set_serializer(dlg, sub->serializer); - pjsip_evsub_set_mod_data(sub->evsub, sub_module.id, sub); + pjsip_evsub_set_mod_data(sub->evsub, pubsub_module.id, sub); ao2_ref(endpoint, +1); sub->endpoint = endpoint; sub->handler = handler; return sub; } - + struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub) { ast_assert(sub->endpoint != NULL); ao2_ref(sub->endpoint, +1); return sub->endpoint; } - + struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub) { ast_assert(sub->serializer != NULL); @@ -294,12 +391,12 @@ struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_data ao2_ref(datastore, +1); return datastore; } - + int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore) { ast_assert(datastore != NULL); ast_assert(datastore->info != NULL); - ast_assert(ast_strlen_zero(datastore->uid) == 0); + ast_assert(!ast_strlen_zero(datastore->uid)); if (!ao2_link(subscription->datastores, datastore)) { return -1; @@ -317,16 +414,100 @@ void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscrip ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name); } +int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore) +{ + ast_assert(datastore != NULL); + ast_assert(datastore->info != NULL); + ast_assert(!ast_strlen_zero(datastore->uid)); + + if (!ao2_link(publication->datastores, datastore)) { + return -1; + } + return 0; +} + +struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name) +{ + return ao2_find(publication->datastores, name, OBJ_KEY); +} + +void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name) +{ + ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name); +} + +AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler); + +static int publication_hash_fn(const void *obj, const int flags) +{ + const struct ast_sip_publication *publication = obj; + const int *entity_tag = obj; + + return flags & OBJ_KEY ? *entity_tag : publication->entity_tag; +} + +static int publication_cmp_fn(void *obj, void *arg, int flags) +{ + const struct ast_sip_publication *publication1 = obj; + const struct ast_sip_publication *publication2 = arg; + const int *entity_tag = arg; + + return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ? + CMP_MATCH | CMP_STOP : 0); +} + +static void publish_add_handler(struct ast_sip_publish_handler *handler) +{ + SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); + AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next); +} + +int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler) +{ + if (ast_strlen_zero(handler->event_name)) { + ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n"); + return -1; + } + + if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS, + publication_hash_fn, publication_cmp_fn))) { + ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n", + handler->event_name); + return -1; + } + + publish_add_handler(handler); + + ast_module_ref(ast_module_info->self); + + return 0; +} + +void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler) +{ + struct ast_sip_publish_handler *iter; + SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); + AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) { + if (handler == iter) { + AST_RWLIST_REMOVE_CURRENT(next); + ao2_cleanup(handler->publications); + ast_module_unref(ast_module_info->self); + break; + } + } + AST_RWLIST_TRAVERSE_SAFE_END; +} + AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler); -static void add_handler(struct ast_sip_subscription_handler *handler) +static void sub_add_handler(struct ast_sip_subscription_handler *handler) { SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next); ast_module_ref(ast_module_info->self); } -static int handler_exists_for_event_name(const char *event_name) +static int sub_handler_exists_for_event_name(const char *event_name) { struct ast_sip_subscription_handler *iter; SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK); @@ -358,7 +539,7 @@ int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *h pj_cstr(&accept[i], handler->accept[i]); } - if (!handler_exists_for_event_name(handler->event_name)) { + if (!sub_handler_exists_for_event_name(handler->event_name)) { pj_str_t event; pj_cstr(&event, handler->event_name); @@ -366,14 +547,14 @@ int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *h if (!strcmp(handler->event_name, "message-summary")) { pjsip_mwi_init_module(ast_sip_get_pjsip_endpoint(), pjsip_evsub_instance()); } else { - pjsip_evsub_register_pkg(&sub_module, &event, DEFAULT_EXPIRES, i, accept); + pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept); } } else { - pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &sub_module, PJSIP_H_ACCEPT, NULL, + pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module, PJSIP_H_ACCEPT, NULL, i, accept); } - add_handler(handler); + sub_add_handler(handler); return 0; } @@ -391,7 +572,7 @@ void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler AST_RWLIST_TRAVERSE_SAFE_END; } -static struct ast_sip_subscription_handler *find_handler(const char *event, char accept[AST_SIP_MAX_ACCEPT][64], size_t num_accept) +static struct ast_sip_subscription_handler *find_sub_handler(const char *event, char accept[AST_SIP_MAX_ACCEPT][64], size_t num_accept) { struct ast_sip_subscription_handler *iter; int match = 0; @@ -429,9 +610,8 @@ static struct ast_sip_subscription_handler *find_handler(const char *event, char return iter; } -static pj_bool_t sub_on_rx_request(pjsip_rx_data *rdata) +static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) { - static const pj_str_t event_name = { "Event", 5 }; char event[32]; char accept[AST_SIP_MAX_ACCEPT][64]; pjsip_accept_hdr *accept_header; @@ -441,14 +621,10 @@ static pj_bool_t sub_on_rx_request(pjsip_rx_data *rdata) struct ast_sip_subscription *sub; int i; - if (pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) { - return PJ_FALSE; - } - endpoint = ast_pjsip_rdata_get_endpoint(rdata); ast_assert(endpoint != NULL); - event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &event_name, rdata->msg_info.msg->hdr.next); + event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next); if (!event_header) { ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n"); pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); @@ -467,9 +643,9 @@ static pj_bool_t sub_on_rx_request(pjsip_rx_data *rdata) ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i])); } - handler = find_handler(event, accept, accept_header->count); + handler = find_sub_handler(event, accept, accept_header->count); if (!handler) { - ast_log(LOG_WARNING, "No registered handler for event %s\n", event); + ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event); pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); return PJ_TRUE; } @@ -492,6 +668,265 @@ static pj_bool_t sub_on_rx_request(pjsip_rx_data *rdata) return PJ_TRUE; } +static struct ast_sip_publish_handler *find_pub_handler(const char *event) +{ + struct ast_sip_publish_handler *iter = NULL; + SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK); + + AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) { + if (strcmp(event, iter->event_name)) { + ast_debug(3, "Event %s does not match %s\n", event, iter->event_name); + continue; + } + ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name); + break; + } + + return iter; +} + +static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata, + pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id) +{ + pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); + + if (etag_hdr) { + char etag[pj_strlen(&etag_hdr->hvalue) + 1]; + + ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag)); + + if (sscanf(etag, "%30d", entity_id) != 1) { + return SIP_PUBLISH_UNKNOWN; + } + } + + *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES; + + if (!(*expires)) { + return SIP_PUBLISH_REMOVE; + } else if (!etag_hdr && rdata->msg_info.msg->body) { + return SIP_PUBLISH_INITIAL; + } else if (etag_hdr && !rdata->msg_info.msg->body) { + return SIP_PUBLISH_REFRESH; + } else if (etag_hdr && rdata->msg_info.msg->body) { + return SIP_PUBLISH_MODIFY; + } + + return SIP_PUBLISH_UNKNOWN; +} + +static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, + struct ast_sip_publish_handler *handler) +{ + struct ast_sip_publication *publication = handler->new_publication(endpoint, rdata); + + if (!publication) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL); + return NULL; + } + + publication->handler = handler; + + return publication; +} + +static int publish_expire_callback(void *data) +{ + RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup); + + publication->handler->publish_expire(publication); + + return 0; +} + +static int publish_expire(const void *data) +{ + struct ast_sip_publication *publication = (struct ast_sip_publication*)data; + + ao2_unlink(publication->handler->publications, publication); + publication->sched_id = -1; + + if (ast_sip_push_task(NULL, publish_expire_callback, publication)) { + ao2_cleanup(publication); + } + + return 0; +} + +static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata) +{ + pjsip_event_hdr *event_header; + struct ast_sip_publish_handler *handler; + RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); + char event[32]; + static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 }; + pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL); + enum sip_publish_type publish_type; + RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup); + int expires = 0, entity_id; + + endpoint = ast_pjsip_rdata_get_endpoint(rdata); + ast_assert(endpoint != NULL); + + event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next); + if (!event_header) { + ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n"); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); + return PJ_TRUE; + } + ast_copy_pj_str(event, &event_header->event_type, sizeof(event)); + + handler = find_pub_handler(event); + if (!handler) { + ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); + return PJ_TRUE; + } + + publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id); + + /* If this is not an initial publish ensure that a publication is present */ + if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) { + if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) { + static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 }; + + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed, + NULL, NULL); + return PJ_TRUE; + } + + /* Per the RFC every response has to have a new entity tag */ + publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1); + + /* Update the expires here so that the created responses will contain the correct value */ + publication->expires = expires; + } + + switch (publish_type) { + case SIP_PUBLISH_INITIAL: + publication = publish_request_initial(endpoint, rdata, handler); + break; + case SIP_PUBLISH_REFRESH: + case SIP_PUBLISH_MODIFY: + if (handler->publish_refresh(publication, rdata)) { + /* If an error occurs we want to terminate the publication */ + expires = 0; + } + break; + case SIP_PUBLISH_REMOVE: + handler->publish_termination(publication, rdata); + break; + case SIP_PUBLISH_UNKNOWN: + default: + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL); + break; + } + + if (publication) { + if (expires) { + ao2_link(handler->publications, publication); + + AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication, + ao2_ref(publication, -1), NULL, ao2_ref(publication, +1)); + } else { + AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1)); + } + } + + return PJ_TRUE; +} + +/*! \brief Internal destructor for publications */ +static void publication_destroy_fn(void *obj) +{ + struct ast_sip_publication *publication = obj; + + ast_debug(3, "Destroying SIP publication\n"); + + ao2_cleanup(publication->datastores); + ao2_cleanup(publication->endpoint); +} + +struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata) +{ + struct ast_sip_publication *publication; + pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); + + ast_assert(endpoint != NULL); + + if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) { + return NULL; + } + + if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) { + ao2_ref(publication, -1); + return NULL; + } + + publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1); + ao2_ref(endpoint, +1); + publication->endpoint = endpoint; + publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES; + publication->sched_id = -1; + + return publication; +} + +struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub) +{ + return pub->endpoint; +} + +int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata, + pjsip_tx_data **tdata) +{ + if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, tdata) != PJ_SUCCESS) { + return -1; + } + + if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) { + RAII_VAR(char *, entity_tag, NULL, ast_free_ptr); + RAII_VAR(char *, expires, NULL, ast_free_ptr); + + if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) || + (ast_asprintf(&expires, "%d", pub->expires) < 0)) { + pjsip_tx_data_dec_ref(*tdata); + return -1; + } + + ast_sip_add_header(*tdata, "SIP-ETag", entity_tag); + ast_sip_add_header(*tdata, "Expires", expires); + } + + return 0; +} + +pj_status_t ast_sip_publication_send_response(struct ast_sip_publication *pub, pjsip_rx_data *rdata, + pjsip_tx_data *tdata) +{ + pj_status_t status; + pjsip_transaction *tsx; + + if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) { + return status; + } + + pjsip_tsx_recv_msg(tsx, rdata); + + return pjsip_tsx_send_msg(tsx, tdata); +} + +static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata) +{ + if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) { + return pubsub_on_rx_subscribe_request(rdata); + } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) { + return pubsub_on_rx_publish_request(rdata); + } + + return PJ_FALSE; +} + static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event) { struct ast_sip_subscription *sub; @@ -499,7 +934,7 @@ static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event) return; } - sub = pjsip_evsub_get_mod_data(evsub, sub_module.id); + sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); if (!sub) { return; } @@ -516,12 +951,12 @@ static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event) if (sub->handler->subscription_shutdown) { sub->handler->subscription_shutdown(sub); } - pjsip_evsub_set_mod_data(evsub, sub_module.id, NULL); + pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL); } static void pubsub_on_tsx_state(pjsip_evsub *evsub, pjsip_transaction *tsx, pjsip_event *event) { - struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, sub_module.id); + struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); if (!sub) { return; @@ -548,7 +983,7 @@ static void set_parameters_from_response_data(pj_pool_t *pool, int *p_st_code, struct ast_variable *iter; for (iter = response_data->headers; iter; iter = iter->next) { pj_str_t header_name; - pj_str_t header_value; + pj_str_t header_value; pjsip_generic_string_hdr *hdr; pj_cstr(&header_name, iter->name); @@ -585,7 +1020,7 @@ static int response_data_changed(struct ast_sip_subscription_response_data *resp static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) { - struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, sub_module.id); + struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); struct ast_sip_subscription_response_data response_data = { .status_code = 200, }; @@ -607,7 +1042,7 @@ static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata, static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) { - struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, sub_module.id); + struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); struct ast_sip_subscription_response_data response_data = { .status_code = 200, }; @@ -637,7 +1072,7 @@ static int serialized_pubsub_on_client_refresh(void *userdata) static void pubsub_on_client_refresh(pjsip_evsub *evsub) { - struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, sub_module.id); + struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); ao2_ref(sub, +1); ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub); @@ -654,7 +1089,7 @@ static int serialized_pubsub_on_server_timeout(void *userdata) static void pubsub_on_server_timeout(pjsip_evsub *evsub) { - struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, sub_module.id); + struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); ao2_ref(sub, +1); ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub); @@ -662,15 +1097,38 @@ static void pubsub_on_server_timeout(pjsip_evsub *evsub) static int load_module(void) { + static const pj_str_t str_PUBLISH = { "PUBLISH", 7 }; + pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint()); - if (ast_sip_register_service(&sub_module)) { - return AST_MODULE_LOAD_DECLINE; + + if (!(sched = ast_sched_context_create())) { + ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n"); + return AST_MODULE_LOAD_FAILURE; + } + + if (ast_sched_start_thread(sched)) { + ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n"); + ast_sched_context_destroy(sched); + return AST_MODULE_LOAD_FAILURE; + } + + pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH); + + if (ast_sip_register_service(&pubsub_module)) { + ast_log(LOG_ERROR, "Could not register pubsub service\n"); + ast_sched_context_destroy(sched); + return AST_MODULE_LOAD_FAILURE; } + return AST_MODULE_LOAD_SUCCESS; } static int unload_module(void) { + if (sched) { + ast_sched_context_destroy(sched); + } + return 0; } diff --git a/res/res_sip_pubsub.exports.in b/res/res_sip_pubsub.exports.in index 0ef193d8a..15838f2dc 100644 --- a/res/res_sip_pubsub.exports.in +++ b/res/res_sip_pubsub.exports.in @@ -12,6 +12,15 @@ LINKER_SYMBOL_PREFIXast_sip_subscription_remove_datastore; LINKER_SYMBOL_PREFIXast_sip_register_subscription_handler; LINKER_SYMBOL_PREFIXast_sip_unregister_subscription_handler; + LINKER_SYMBOL_PREFIXast_sip_create_publication; + LINKER_SYMBOL_PREFIXast_sip_publication_get_endpoint; + LINKER_SYMBOL_PREFIXast_sip_publication_create_response; + LINKER_SYMBOL_PREFIXast_sip_publication_send_response; + LINKER_SYMBOL_PREFIXast_sip_register_publish_handler; + LINKER_SYMBOL_PREFIXast_sip_unregister_publish_handler; + LINKER_SYMBOL_PREFIXast_sip_publication_add_datastore; + LINKER_SYMBOL_PREFIXast_sip_publication_get_datastore; + LINKER_SYMBOL_PREFIXast_sip_publication_remove_datastore; local: *; }; -- cgit v1.2.3