diff options
Diffstat (limited to 'res/res_pjsip_outbound_publish.c')
-rw-r--r-- | res/res_pjsip_outbound_publish.c | 789 |
1 files changed, 542 insertions, 247 deletions
diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c index 51e8a06be..f37ce2305 100644 --- a/res/res_pjsip_outbound_publish.c +++ b/res/res_pjsip_outbound_publish.c @@ -27,6 +27,7 @@ #include <pjsip.h> #include <pjsip_simple.h> +#include "asterisk/res_pjproject.h" #include "asterisk/res_pjsip.h" #include "asterisk/res_pjsip_outbound_publish.h" #include "asterisk/module.h" @@ -94,6 +95,10 @@ <literal>pjsip.conf</literal>. As with other <literal>res_pjsip</literal> modules, this will use the first available transport of the appropriate type if unconfigured.</para></note> </description> </configOption> + <configOption name="multi_user" default="no"> + <synopsis>Enable multi-user support</synopsis> + <description><para>When enabled the user portion of the server uri is replaced by a dynamically created user</para></description> + </configOption> <configOption name="type"> <synopsis>Must be of type 'outbound-publish'.</synopsis> </configOption> @@ -102,6 +107,8 @@ </configInfo> ***/ +static int pjsip_max_url_size = PJSIP_MAX_URL_SIZE; + /*! \brief Queued outbound publish message */ struct sip_outbound_publish_message { /*! \brief Optional body */ @@ -112,6 +119,39 @@ struct sip_outbound_publish_message { char body_contents[0]; }; +/* + * A note about some of the object types used in this module: + * + * The reason we currently have 4 separate object types that relate to configuration, + * publishing, state, and client information is due to object lifetimes and order of + * destruction dependencies. + * + * Separation of concerns is a good thing and of course it makes sense to have a + * configuration object type as well as an object type wrapper around pjsip's publishing + * client class. There also may be run time state data that needs to be tracked, so + * again having something to handle that is prudent. However, it may be tempting to think + * "why not combine the state and client object types?" Especially seeing as how they have + * a one-to-one relationship. The answer is, it's possible, but it'd make the code a bit + * more awkward. + * + * Currently this module maintains a global container of current state objects. When this + * states container is replaced, or deleted, it un-references all contained objects. Any + * state with a reference left have probably been carried over from a reload/realtime fetch. + * States not carried over are destructed and the associated client (and all its publishers) + * get unpublished. + * + * This "unpublishing" goes through a careful process of unpublishing the client, all its + * publishers, and making sure all the appropriate references are removed in a sane order. + * This process is essentially kicked off with the destruction of the state. If the state + * and client objects were to be merged, where clients became the globally tracked object + * type, this "unpublishing" process would never start because of the multiple references + * held to the client object over it's lifetime. Meaning the global tracking container + * would remove its reference to the client object when done with it, but other sources + * would still be holding a reference to it (namely the datastore and publisher(s)). + * + * Thus at this time it is easier to keep them separate. + */ + /*! \brief Outbound publish information */ struct ast_sip_outbound_publish { /*! \brief Sorcery object details */ @@ -137,30 +177,43 @@ struct ast_sip_outbound_publish { unsigned int max_auth_attempts; /*! \brief Configured authentication credentials */ struct ast_sip_auth_vector outbound_auths; + /*! \brief The publishing client is used for multiple users when true */ + unsigned int multi_user; }; -/*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */ -struct ast_sip_outbound_publish_client { +struct sip_outbound_publisher { + /*! \brief The client object that 'owns' this client + + \note any potential circular reference problems are accounted + for (see publisher alloc for more information) + */ + struct ast_sip_outbound_publish_client *owner; /*! \brief Underlying publish client */ pjsip_publishc *client; /*! \brief Timer entry for refreshing publish */ pj_timer_entry timer; - /*! \brief Publisher datastores set up by handlers */ - struct ao2_container *datastores; /*! \brief The number of auth attempts done */ unsigned int auth_attempts; /*! \brief Queue of outgoing publish messages to send*/ AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue; /*! \brief The message currently being sent */ struct sip_outbound_publish_message *sending; - /*! \brief Publish client has been fully started and event type informed */ - unsigned int started; /*! \brief Publish client should be destroyed */ unsigned int destroy; + /*! \brief User, if any, associated with the publisher */ + char user[0]; +}; + +/*! \brief Outbound publish client state information (persists for lifetime of a publish) */ +struct ast_sip_outbound_publish_client { /*! \brief Outbound publish information */ struct ast_sip_outbound_publish *publish; - /*! \brief The name of the transport to be used for the publish */ - char *transport_name; + /*! \brief Publisher datastores set up by handlers */ + struct ao2_container *datastores; + /*! \brief Container of all the client publishing objects */ + struct ao2_container *publishers; + /*! \brief Publishing has been fully started and event type informed */ + unsigned int started; }; /*! \brief Outbound publish state information (persists for lifetime of a publish) */ @@ -171,6 +224,20 @@ struct ast_sip_outbound_publish_state { char id[0]; }; +/*! + * \brief Used for locking while loading/reloading + * + * Mutli-user configurations make it so publishers can be dynamically added and + * removed. Publishers should not be added or removed during a [re]load since + * it could cause the current_clients container to be out of sync. Thus the + * reason for this lock. + */ +AST_RWLOCK_DEFINE_STATIC(load_lock); + +#define DEFAULT_PUBLISHER_BUCKETS 119 +AO2_STRING_FIELD_HASH_FN(sip_outbound_publisher, user); +AO2_STRING_FIELD_CMP_FN(sip_outbound_publisher, user); + /*! \brief Unloading data */ struct unloading_data { int is_unloading; @@ -238,6 +305,7 @@ static int outbound_publish_state_cmp(void *obj, void *arg, int flags) static struct ao2_container *get_publishes_and_update_state(void) { struct ao2_container *container; + SCOPED_WRLOCK(lock, &load_lock); container = ast_sorcery_retrieve_by_fields( ast_sip_get_sorcery(), "outbound-publish", @@ -274,22 +342,22 @@ static struct ast_sip_event_publisher_handler *find_publisher_handler_for_event_ return iter; } -/*! \brief Helper function which cancels the refresh timer on a client */ -static void cancel_publish_refresh(struct ast_sip_outbound_publish_client *client) +/*! \brief Helper function which cancels the refresh timer on a publisher */ +static void cancel_publish_refresh(struct sip_outbound_publisher *publisher) { - if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &client->timer)) { - /* The timer was successfully cancelled, drop the refcount of the client */ - ao2_ref(client, -1); + if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &publisher->timer)) { + /* The timer was successfully cancelled, drop the refcount of the publisher */ + ao2_ref(publisher, -1); } } /*! \brief Helper function which sets up the timer to send publication */ -static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *client, int expiration) +static void schedule_publish_refresh(struct sip_outbound_publisher *publisher, int expiration) { - struct ast_sip_outbound_publish *publish = ao2_bump(client->publish); + struct ast_sip_outbound_publish *publish = ao2_bump(publisher->owner->publish); pj_time_val delay = { .sec = 0, }; - cancel_publish_refresh(client); + cancel_publish_refresh(publisher); if (expiration > 0) { delay.sec = expiration - PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH; @@ -301,61 +369,83 @@ static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *cli delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH; } - ao2_ref(client, +1); - if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &client->timer, &delay) != PJ_SUCCESS) { + ao2_ref(publisher, +1); + if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &publisher->timer, &delay) != PJ_SUCCESS) { ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n"); - ao2_ref(client, -1); + ao2_ref(publisher, -1); } ao2_ref(publish, -1); } +static int publisher_client_send(void *obj, void *arg, void *data, int flags); + /*! \brief Publish client timer callback function */ static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry) { - struct ast_sip_outbound_publish_client *client = entry->user_data; + struct sip_outbound_publisher *publisher = entry->user_data; - ao2_lock(client); - if (AST_LIST_EMPTY(&client->queue)) { + ao2_lock(publisher); + if (AST_LIST_EMPTY(&publisher->queue)) { + int res; /* If there are no outstanding messages send an empty PUBLISH message so our publication doesn't expire */ - ast_sip_publish_client_send(client, NULL); + publisher_client_send(publisher, NULL, &res, 0); } - ao2_unlock(client); + ao2_unlock(publisher); - ao2_ref(client, -1); + ao2_ref(publisher, -1); } /*! \brief Task for cancelling a refresh timer */ static int cancel_refresh_timer_task(void *data) { - struct ast_sip_outbound_publish_client *client = data; + struct sip_outbound_publisher *publisher = data; - cancel_publish_refresh(client); - ao2_ref(client, -1); + cancel_publish_refresh(publisher); + ao2_ref(publisher, -1); return 0; } +static void set_transport(struct sip_outbound_publisher *publisher, pjsip_tx_data *tdata) +{ + if (!ast_strlen_zero(publisher->owner->publish->transport)) { + pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, }; + ast_sip_set_tpselector_from_transport_name( + publisher->owner->publish->transport, &selector); + pjsip_tx_data_set_transport(tdata, &selector); + } +} + /*! \brief Task for sending an unpublish */ static int send_unpublish_task(void *data) { - struct ast_sip_outbound_publish_client *client = data; + struct sip_outbound_publisher *publisher = data; pjsip_tx_data *tdata; - if (pjsip_publishc_unpublish(client->client, &tdata) == PJ_SUCCESS) { - if (!ast_strlen_zero(client->transport_name)) { - pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, }; - ast_sip_set_tpselector_from_transport_name(client->transport_name, &selector); - pjsip_tx_data_set_transport(tdata, &selector); - } - - pjsip_publishc_send(client->client, tdata); + if (pjsip_publishc_unpublish(publisher->client, &tdata) == PJ_SUCCESS) { + set_transport(publisher, tdata); + pjsip_publishc_send(publisher->client, tdata); } - ao2_ref(client, -1); + ao2_ref(publisher, -1); return 0; } +static void stop_publishing(struct ast_sip_outbound_publish_client *client, + struct ast_sip_event_publisher_handler *handler) +{ + if (!handler) { + handler = find_publisher_handler_for_event_name(client->publish->event); + } + + if (handler) { + handler->stop_publishing(client); + } +} + +static int cancel_and_unpublish(void *obj, void *arg, int flags); + /*! \brief Helper function which starts or stops publish clients when applicable */ static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed) { @@ -389,15 +479,10 @@ static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_hand } else { state->client->started = 1; } - } else if (!handler && removed && !strcmp(publish->event, removed->event_name)) { - /* If the publisher client has been started but it is going away stop it */ - removed->stop_publishing(state->client); + } else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) { + stop_publishing(state->client, removed); + ao2_callback(state->client->publishers, OBJ_NODATA, cancel_and_unpublish, NULL); state->client->started = 0; - if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) { - ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n", - ast_sorcery_object_get_id(publish)); - ao2_ref(state->client, -1); - } } ao2_ref(publish, -1); ao2_ref(state, -1); @@ -583,19 +668,19 @@ void ast_sip_publish_client_remove_datastore(struct ast_sip_outbound_publish_cli ao2_find(client->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA); } -static int sip_publish_client_service_queue(void *data) +static int sip_publisher_service_queue(void *data) { - RAII_VAR(struct ast_sip_outbound_publish_client *, client, data, ao2_cleanup); - SCOPED_AO2LOCK(lock, client); + RAII_VAR(struct sip_outbound_publisher *, publisher, data, ao2_cleanup); + SCOPED_AO2LOCK(lock, publisher); struct sip_outbound_publish_message *message; pjsip_tx_data *tdata; pj_status_t status; - if (client->destroy || client->sending || !(message = AST_LIST_FIRST(&client->queue))) { + if (publisher->destroy || publisher->sending || !(message = AST_LIST_FIRST(&publisher->queue))) { return 0; } - if (pjsip_publishc_publish(client->client, PJ_FALSE, &tdata) != PJ_SUCCESS) { + if (pjsip_publishc_publish(publisher->client, PJ_FALSE, &tdata) != PJ_SUCCESS) { goto fatal; } @@ -605,13 +690,9 @@ static int sip_publish_client_service_queue(void *data) goto fatal; } - if (!ast_strlen_zero(client->transport_name)) { - pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, }; - ast_sip_set_tpselector_from_transport_name(client->transport_name, &selector); - pjsip_tx_data_set_transport(tdata, &selector); - } + set_transport(publisher, tdata); - status = pjsip_publishc_send(client->client, tdata); + status = pjsip_publishc_send(publisher->client, tdata); if (status == PJ_EBUSY) { /* We attempted to send the message but something else got there first */ goto service; @@ -619,30 +700,31 @@ static int sip_publish_client_service_queue(void *data) goto fatal; } - client->sending = message; + publisher->sending = message; return 0; fatal: - AST_LIST_REMOVE_HEAD(&client->queue, entry); + AST_LIST_REMOVE_HEAD(&publisher->queue, entry); ast_free(message); service: - if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) { - ao2_ref(client, -1); + if (ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher))) { + ao2_ref(publisher, -1); } return -1; } -int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client, - const struct ast_sip_body *body) +static int publisher_client_send(void *obj, void *arg, void *data, int flags) { - SCOPED_AO2LOCK(lock, client); + struct sip_outbound_publisher *publisher = obj; + const struct ast_sip_body *body = arg; struct sip_outbound_publish_message *message; size_t type_len = 0, subtype_len = 0, body_text_len = 0; - int res; + int *res = data; - if (!client->client) { + *res = -1; + if (!publisher->client) { return -1; } @@ -668,31 +750,191 @@ int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client, message->body.body_text = strcpy(dst, body->body_text); } - AST_LIST_INSERT_TAIL(&client->queue, message, entry); + AST_LIST_INSERT_TAIL(&publisher->queue, message, entry); - res = ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client)); - if (res) { - ao2_ref(client, -1); + *res = ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher)); + if (*res) { + ao2_ref(publisher, -1); } + return *res; +} + +int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client, + const struct ast_sip_body *body) +{ + SCOPED_AO2LOCK(lock, client); + int res = 0; + + ao2_callback_data(client->publishers, OBJ_NODATA, + publisher_client_send, (void *)body, &res); return res; } +static int sip_outbound_publisher_set_uri( + pj_pool_t *pool, const char *uri, const char *user, pj_str_t *res_uri) +{ + pj_str_t tmp; + pjsip_uri *parsed; + pjsip_sip_uri *parsed_uri; + int size; + + pj_strdup2_with_null(pool, &tmp, uri); + if (!(parsed = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0))) { + return -1; + } + + if (!(parsed_uri = pjsip_uri_get_uri(parsed))) { + return -1; + } + + if (!ast_strlen_zero(user)) { + pj_strdup2(pool, &parsed_uri->user, user); + } + + res_uri->ptr = (char*) pj_pool_alloc(pool, pjsip_max_url_size); + if (!res_uri->ptr) { + return -1; + } + + if ((size = pjsip_uri_print(PJSIP_URI_IN_OTHER, parsed_uri, res_uri->ptr, + pjsip_max_url_size - 1)) <= 0) { + return -1; + } + res_uri->ptr[size] = '\0'; + res_uri->slen = size; + + return 0; +} + +static int sip_outbound_publisher_set_uris( + pj_pool_t *pool, struct sip_outbound_publisher *publisher, + pj_str_t *server_uri, pj_str_t *to_uri, pj_str_t *from_uri) +{ + struct ast_sip_outbound_publish *publish = publisher->owner->publish; + + if (sip_outbound_publisher_set_uri(pool, publish->server_uri, publisher->user, server_uri)) { + ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n", + publish->server_uri, ast_sorcery_object_get_id(publish)); + return -1; + } + + if (ast_strlen_zero(publish->to_uri)) { + to_uri->ptr = server_uri->ptr; + to_uri->slen = server_uri->slen; + } else if (sip_outbound_publisher_set_uri(pool, publish->to_uri, publisher->user, to_uri)) { + ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n", + publish->to_uri, ast_sorcery_object_get_id(publish)); + return -1; + } + + if (ast_strlen_zero(publish->from_uri)) { + from_uri->ptr = server_uri->ptr; + from_uri->slen = server_uri->slen; + } else if (sip_outbound_publisher_set_uri(pool, publish->from_uri, publisher->user, from_uri)) { + ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n", + publish->from_uri, ast_sorcery_object_get_id(publish)); + return -1; + } + + return 0; +} + +static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param); + +/*! \brief Helper function that allocates a pjsip publish client and configures it */ +static int sip_outbound_publisher_init(void *data) +{ + struct sip_outbound_publisher *publisher = data; + RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup); + pjsip_publishc_opt opt = { + .queue_request = PJ_FALSE, + }; + pj_pool_t *pool; + pj_str_t event, server_uri, to_uri, from_uri; + + if (publisher->client) { + return 0; + } + + if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, + ao2_bump(publisher), sip_outbound_publish_callback, + &publisher->client) != PJ_SUCCESS) { + ao2_ref(publisher, -1); + return -1; + } + + publish = ao2_bump(publisher->owner->publish); + + if (!ast_strlen_zero(publish->outbound_proxy)) { + pjsip_route_hdr route_set, *route; + static const pj_str_t ROUTE_HNAME = { "Route", 5 }; + + pj_list_init(&route_set); + + if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(publisher->client), &ROUTE_HNAME, + (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) { + pjsip_publishc_destroy(publisher->client); + return -1; + } + pj_list_insert_nodes_before(&route_set, route); + + pjsip_publishc_set_route_set(publisher->client, &route_set); + } + + pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation", + pjsip_max_url_size, pjsip_max_url_size); + if (!pool) { + ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n", + ast_sorcery_object_get_id(publish)); + pjsip_publishc_destroy(publisher->client); + return -1; + } + + if (sip_outbound_publisher_set_uris(pool, publisher, &server_uri, &from_uri, &to_uri)) { + pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); + pjsip_publishc_destroy(publisher->client); + return -1; + } + + pj_cstr(&event, publish->event); + if (pjsip_publishc_init(publisher->client, &event, &server_uri, &from_uri, &to_uri, + publish->expiration != PJ_SUCCESS)) { + ast_log(LOG_ERROR, "Failed to initialize publishing client on outbound publish '%s'\n", + ast_sorcery_object_get_id(publish)); + pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); + pjsip_publishc_destroy(publisher->client); + return -1; + } + + pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); + return 0; +} + +static int sip_outbound_publisher_reinit(void *obj, void *arg, int flags) +{ + return sip_outbound_publisher_init(obj); +} + +static int sip_outbound_publisher_reinit_all(void *data) +{ + ao2_callback(data, OBJ_NODATA, sip_outbound_publisher_reinit, NULL); + return 0; +} + /*! \brief Destructor function for publish client */ -static void sip_outbound_publish_client_destroy(void *obj) +static void sip_outbound_publisher_destroy(void *obj) { - struct ast_sip_outbound_publish_client *client = obj; + struct sip_outbound_publisher *publisher = obj; struct sip_outbound_publish_message *message; /* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */ - while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) { + while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) { ast_free(message); } - ao2_cleanup(client->datastores); - ao2_cleanup(client->publish); - ast_free(client->transport_name); + ao2_cleanup(publisher->owner); /* if unloading the module and all objects have been unpublished send the signal to finish unloading */ @@ -705,67 +947,195 @@ static void sip_outbound_publish_client_destroy(void *obj) } } +static struct sip_outbound_publisher *sip_outbound_publisher_alloc( + struct ast_sip_outbound_publish_client *client, const char *user) +{ + struct sip_outbound_publisher *publisher; + + publisher = ao2_alloc(sizeof(*publisher) + (user ? strlen(user) : 0) + 1, + sip_outbound_publisher_destroy); + if (!publisher) { + return NULL; + } + + /* + * Bump the ref to the client. This essentially creates a circular reference, + * but it is needed in order to make sure the client object doesn't get pulled + * out from under us when the publisher stops publishing. + * + * The circular reference is alleviated by calling cancel_and_unpublish for + * each client, from the state's destructor. By calling it there all references + * to the publishers should go to zero, thus calling the publisher's destructor. + * This in turn removes the client reference we added here. The state then removes + * its reference to the client, which should take it to zero. + */ + publisher->owner = ao2_bump(client); + publisher->timer.user_data = publisher; + publisher->timer.cb = sip_outbound_publish_timer_cb; + if (user) { + strcpy(publisher->user, user); + } else { + *publisher->user = '\0'; + } + + if (ast_sip_push_task_synchronous(NULL, sip_outbound_publisher_init, publisher)) { + ast_log(LOG_ERROR, "Unable to create publisher for outbound publish '%s'\n", + ast_sorcery_object_get_id(client->publish)); + ao2_ref(publisher, -1); + return NULL; + } + + return publisher; +} + +static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher( + struct ast_sip_outbound_publish_client *client, const char *user) +{ + struct sip_outbound_publisher *publisher = + sip_outbound_publisher_alloc(client, user); + + if (!publisher) { + return NULL; + } + + if (!ao2_link(client->publishers, publisher)) { + /* + * No need to bump the reference here. The task will take care of + * removing the reference. + */ + if (ast_sip_push_task(NULL, cancel_refresh_timer_task, publisher)) { + ao2_ref(publisher, -1); + } + return NULL; + } + + return publisher; +} + +int ast_sip_publish_client_user_send(struct ast_sip_outbound_publish_client *client, + const char *user, const struct ast_sip_body *body) +{ + struct sip_outbound_publisher *publisher; + int res; + + /* + * Lock before searching since there could be a race between searching and adding. + * Just use the load_lock since we might need to lock it anyway (if adding) and + * also it simplifies the code (otherwise we'd have to lock the publishers, no- + * lock the search and pass a flag to 'add publisher to no-lock the potential link). + */ + ast_rwlock_wrlock(&load_lock); + publisher = ao2_find(client->publishers, user, OBJ_SEARCH_KEY); + if (!publisher) { + if (!(publisher = sip_outbound_publish_client_add_publisher(client, user))) { + ast_rwlock_unlock(&load_lock); + return -1; + } + } + ast_rwlock_unlock(&load_lock); + + publisher_client_send(publisher, (void *)body, &res, 0); + ao2_ref(publisher, -1); + return res; +} + +void ast_sip_publish_client_remove(struct ast_sip_outbound_publish_client *client, + const char *user) +{ + SCOPED_WRLOCK(lock, &load_lock); + ao2_find(client->publishers, user, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA); +} + static int explicit_publish_destroy(void *data) { - struct ast_sip_outbound_publish_client *client = data; + struct sip_outbound_publisher *publisher = data; /* * If there is no pjsip publishing client then we obviously don't need * to destroy it. Also, the ref for the Asterisk publishing client that * pjsip had would not exist or should already be gone as well. */ - if (client->client) { - pjsip_publishc_destroy(client->client); - ao2_ref(client, -1); + if (publisher->client) { + pjsip_publishc_destroy(publisher->client); + ao2_ref(publisher, -1); } return 0; } /*! \brief Helper function which cancels and un-publishes a no longer used client */ -static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client) +static int cancel_and_unpublish(void *obj, void *arg, int flags) { - struct ast_sip_event_publisher_handler *handler; - SCOPED_AO2LOCK(lock, client); + struct sip_outbound_publisher *publisher = obj; + struct ast_sip_outbound_publish_client *client = publisher->owner; + + SCOPED_AO2LOCK(lock, publisher); if (!client->started) { - /* If the client was never started, there's nothing to unpublish, so just - * destroy the publication and remove its reference to the client. + /* If the publisher was never started, there's nothing to unpublish, so just + * destroy the publication and remove its reference to the publisher. */ - ast_sip_push_task(NULL, explicit_publish_destroy, client); + ast_sip_push_task(NULL, explicit_publish_destroy, publisher); return 0; } - handler = find_publisher_handler_for_event_name(client->publish->event); - if (handler) { - handler->stop_publishing(client); - } - - client->started = 0; - if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) { + if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(publisher))) { ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n", ast_sorcery_object_get_id(client->publish)); - ao2_ref(client, -1); + ao2_ref(publisher, -1); } /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */ - if (!client->sending) { - if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) { + if (!publisher->sending) { + if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publisher))) { ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n", ast_sorcery_object_get_id(client->publish)); - ao2_ref(client, -1); + ao2_ref(publisher, -1); } } - client->destroy = 1; + publisher->destroy = 1; return 0; } +/*! \brief Destructor function for publish client */ +static void sip_outbound_publish_client_destroy(void *obj) +{ + struct ast_sip_outbound_publish_client *client = obj; + + ao2_cleanup(client->datastores); + + /* + * The client's publishers have already been unpublished and destroyed + * by this point, so it is safe to finally remove the reference to the + * publish object. The client needed to hold a reference to it until + * the publishers were done with it. + */ + ao2_cleanup(client->publish); +} + /*! \brief Destructor function for publish state */ static void sip_outbound_publish_state_destroy(void *obj) { struct ast_sip_outbound_publish_state *state = obj; - cancel_and_unpublish(state->client); + stop_publishing(state->client, NULL); + /* + * Since the state is being destroyed the associated client needs to also + * be destroyed. However simply removing the reference to the client will + * not initiate client destruction since the client's publisher(s) hold a + * reference to the client object as well. So we need to unpublish the + * the client's publishers here, which will remove the publisher's client + * reference during that process. + * + * That being said we don't want to remove the client's reference to the + * publish object just yet. We'll hold off on that until client destruction + * itself. This is because the publishers need access to the client's + * publish object while they are unpublishing. + */ + ao2_callback(state->client->publishers, OBJ_NODATA | OBJ_UNLINK, cancel_and_unpublish, NULL); + ao2_cleanup(state->client->publishers); + + state->client->started = 0; ao2_cleanup(state->client); } @@ -799,126 +1169,31 @@ static int can_reuse_publish(struct ast_sip_outbound_publish *existing, struct a return 1; } -static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param); - -/*! \brief Helper function that allocates a pjsip publish client and configures it */ -static int sip_outbound_publish_client_alloc(void *data) -{ - struct ast_sip_outbound_publish_client *client = data; - RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup); - pjsip_publishc_opt opt = { - .queue_request = PJ_FALSE, - }; - pj_str_t event, server_uri, to_uri, from_uri; - pj_status_t status; - - if (client->client) { - return 0; - } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(client), sip_outbound_publish_callback, - &client->client) != PJ_SUCCESS) { - ao2_ref(client, -1); - return -1; - } - - publish = ao2_bump(client->publish); - - if (!ast_strlen_zero(publish->outbound_proxy)) { - pjsip_route_hdr route_set, *route; - static const pj_str_t ROUTE_HNAME = { "Route", 5 }; - - pj_list_init(&route_set); - - if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(client->client), &ROUTE_HNAME, - (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) { - pjsip_publishc_destroy(client->client); - return -1; - } - pj_list_insert_nodes_before(&route_set, route); - - pjsip_publishc_set_route_set(client->client, &route_set); - } - - pj_cstr(&event, publish->event); - pj_cstr(&server_uri, publish->server_uri); - pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri)); - pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri)); - - status = pjsip_publishc_init(client->client, &event, &server_uri, &from_uri, &to_uri, - publish->expiration); - if (status == PJSIP_EINVALIDURI) { - pj_pool_t *pool; - pj_str_t tmp; - pjsip_uri *uri; - - pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation", 256, 256); - if (!pool) { - ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n", - ast_sorcery_object_get_id(publish)); - pjsip_publishc_destroy(client->client); - return -1; - } - - pj_strdup2_with_null(pool, &tmp, publish->server_uri); - uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0); - if (!uri) { - ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n", - publish->server_uri, ast_sorcery_object_get_id(publish)); - } - - if (!ast_strlen_zero(publish->to_uri)) { - pj_strdup2_with_null(pool, &tmp, publish->to_uri); - uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0); - if (!uri) { - ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n", - publish->to_uri, ast_sorcery_object_get_id(publish)); - } - } - - if (!ast_strlen_zero(publish->from_uri)) { - pj_strdup2_with_null(pool, &tmp, publish->from_uri); - uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0); - if (!uri) { - ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n", - publish->from_uri, ast_sorcery_object_get_id(publish)); - } - } - - pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); - pjsip_publishc_destroy(client->client); - return -1; - } else if (status != PJ_SUCCESS) { - pjsip_publishc_destroy(client->client); - return -1; - } - - return 0; -} - /*! \brief Callback function for publish client responses */ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param) { #define DESTROY_CLIENT() do { \ - pjsip_publishc_destroy(client->client); \ - client->client = NULL; \ - ao2_ref(client, -1); } while (0) + pjsip_publishc_destroy(publisher->client); \ + publisher->client = NULL; \ + ao2_ref(publisher, -1); } while (0) - RAII_VAR(struct ast_sip_outbound_publish_client *, client, ao2_bump(param->token), ao2_cleanup); - RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(client->publish), ao2_cleanup); - SCOPED_AO2LOCK(lock, client); + RAII_VAR(struct sip_outbound_publisher *, publisher, ao2_bump(param->token), ao2_cleanup); + RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(publisher->owner->publish), ao2_cleanup); + SCOPED_AO2LOCK(lock, publisher); pjsip_tx_data *tdata; - if (client->destroy) { - if (client->sending) { - client->sending = NULL; + if (publisher->destroy) { + if (publisher->sending) { + publisher->sending = NULL; - if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) { + if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publisher))) { return; } ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n", ast_sorcery_object_get_id(publish)); - ao2_ref(client, -1); + ao2_ref(publisher, -1); } - /* Once the destroy is called this callback will not get called any longer, so drop the client ref */ + /* Once the destroy is called this callback will not get called any longer, so drop the publisher ref */ DESTROY_CLIENT(); return; } @@ -928,16 +1203,12 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param) if (!ast_sip_create_request_with_auth(&publish->outbound_auths, param->rdata, tsx->last_tx, &tdata)) { - if (!ast_strlen_zero(client->transport_name)) { - pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, }; - ast_sip_set_tpselector_from_transport_name(client->transport_name, &selector); - pjsip_tx_data_set_transport(tdata, &selector); - } - pjsip_publishc_send(client->client, tdata); + set_transport(publisher, tdata); + pjsip_publishc_send(publisher->client, tdata); } - client->auth_attempts++; + publisher->auth_attempts++; - if (client->auth_attempts == publish->max_auth_attempts) { + if (publisher->auth_attempts == publish->max_auth_attempts) { DESTROY_CLIENT(); ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n", ast_sorcery_object_get_id(publish)); @@ -947,18 +1218,18 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param) return; } - client->auth_attempts = 0; + publisher->auth_attempts = 0; if (param->code == 412) { DESTROY_CLIENT(); - if (sip_outbound_publish_client_alloc(client)) { + if (sip_outbound_publisher_init(publisher)) { ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n", ast_sorcery_object_get_id(publish)); goto end; } /* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */ - client->sending = NULL; + publisher->sending = NULL; } else if (param->code == 423) { /* Update the expiration with the new expiration time if available */ pjsip_expires_hdr *expires; @@ -971,33 +1242,33 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param) goto end; } - pjsip_publishc_update_expires(client->client, expires->ivalue); - client->sending = NULL; - } else if (client->sending) { + pjsip_publishc_update_expires(publisher->client, expires->ivalue); + publisher->sending = NULL; + } else if (publisher->sending) { /* Remove the message currently being sent so that when the queue is serviced another will get sent */ - AST_LIST_REMOVE_HEAD(&client->queue, entry); - ast_free(client->sending); - client->sending = NULL; + AST_LIST_REMOVE_HEAD(&publisher->queue, entry); + ast_free(publisher->sending); + publisher->sending = NULL; if (!param->rdata) { ast_log(LOG_NOTICE, "No response received for outbound publish '%s'\n", ast_sorcery_object_get_id(publish)); } } - if (AST_LIST_EMPTY(&client->queue)) { - schedule_publish_refresh(client, param->expiration); + if (AST_LIST_EMPTY(&publisher->queue)) { + schedule_publish_refresh(publisher, param->expiration); } end: - if (!client->client) { + if (!publisher->client) { struct sip_outbound_publish_message *message; - while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) { + while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) { ast_free(message); } } else { - if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) { - ao2_ref(client, -1); + if (ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher))) { + ao2_ref(publisher, -1); } } } @@ -1085,27 +1356,18 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc( return NULL; } - state->client->timer.user_data = state->client; - state->client->timer.cb = sip_outbound_publish_timer_cb; - state->client->transport_name = ast_strdup(publish->transport); + state->client->publishers = ao2_container_alloc(DATASTORE_BUCKETS, sip_outbound_publisher_hash_fn, + sip_outbound_publisher_cmp_fn); + if (!state->client->publishers) { + ao2_ref(state, -1); + return NULL; + } state->client->publish = ao2_bump(publish); strcpy(state->id, id); return state; } -static int initialize_publish_client(struct ast_sip_outbound_publish *publish, - struct ast_sip_outbound_publish_state *state) -{ - if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) { - ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n", - ast_sorcery_object_get_id(publish)); - return -1; - } - - return 0; -} - static int validate_publish_config(struct ast_sip_outbound_publish *publish) { if (ast_strlen_zero(publish->server_uri)) { @@ -1125,6 +1387,15 @@ static int current_state_reusable(struct ast_sip_outbound_publish *publish, { struct ast_sip_outbound_publish *old_publish; + /* + * Don't maintain the old state/client objects if the multi_user option changed. + */ + if ((!publish->multi_user && current_state->client->publish->multi_user) || + (publish->multi_user && !current_state->client->publish->multi_user)) { + return 0; + } + + if (!can_reuse_publish(current_state->client->publish, publish)) { /* * Something significant has changed in the configuration, so we are @@ -1140,12 +1411,15 @@ static int current_state_reusable(struct ast_sip_outbound_publish *publish, */ old_publish = current_state->client->publish; current_state->client->publish = publish; - if (initialize_publish_client(publish, current_state)) { + if (ast_sip_push_task_synchronous( + NULL, sip_outbound_publisher_reinit_all, current_state->client->publishers)) { /* * If the state object fails to re-initialize then swap * the old publish info back in. */ current_state->client->publish = publish; + ast_log(LOG_ERROR, "Unable to reinitialize client(s) for outbound publish '%s'\n", + ast_sorcery_object_get_id(current_state->client->publish)); return -1; } @@ -1170,6 +1444,7 @@ static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *o struct ast_sip_outbound_publish *applied = obj; struct ast_sip_outbound_publish_state *current_state, *new_state; + struct sip_outbound_publisher *publisher = NULL; int res; /* @@ -1216,11 +1491,13 @@ static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *o return -1; }; - if (initialize_publish_client(applied, new_state)) { + if (!applied->multi_user && + !(publisher = sip_outbound_publish_client_add_publisher(new_state->client, NULL))) { ADD_TO_NEW_STATES(current_state); ao2_ref(new_state, -1); return -1; } + ao2_cleanup(publisher); ADD_TO_NEW_STATES(new_state); ao2_cleanup(current_state); @@ -1238,6 +1515,9 @@ static int load_module(void) { CHECK_PJSIP_MODULE_LOADED(); + /* As of pjproject 2.4.5, PJSIP_MAX_URL_SIZE isn't exposed yet but we try anyway. */ + ast_pjproject_get_buildopt("PJSIP_MAX_URL_SIZE", "%d", &pjsip_max_url_size); + ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish"); ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish"); @@ -1257,6 +1537,7 @@ static int load_module(void) ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts)); ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "transport", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, transport)); ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0); + ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "multi_user", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_outbound_publish, multi_user)); ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish"); @@ -1279,6 +1560,13 @@ static int reload_module(void) return 0; } +static int current_publishing_count(void *obj, void *arg, int flags) +{ + struct ast_sip_outbound_publish_state *state = obj; + unloading.count += ao2_container_count(state->client->publishers); + return 0; +} + static int unload_module(void) { struct timeval start = ast_tvnow(); @@ -1289,11 +1577,18 @@ static int unload_module(void) int res = 0; struct ao2_container *states = ao2_global_obj_ref(current_states); - if (!states || !(unloading.count = ao2_container_count(states))) { + if (!states) { return 0; } + + unloading.count = 0; + ao2_callback(states, OBJ_NODATA, current_publishing_count, NULL); ao2_ref(states, -1); + if (!unloading.count) { + return 0; + } + ast_mutex_init(&unloading.lock); ast_cond_init(&unloading.cond, NULL); ast_mutex_lock(&unloading.lock); |