summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES4
-rw-r--r--include/asterisk/res_pjsip_outbound_publish.h22
-rw-r--r--res/res_pjsip_outbound_publish.c789
3 files changed, 568 insertions, 247 deletions
diff --git a/CHANGES b/CHANGES
index 9c60b2443..628bde2ad 100644
--- a/CHANGES
+++ b/CHANGES
@@ -259,6 +259,10 @@ res_pjsip_outbound_registration
outbound registration, registration is retried at the given interval up to
'max_retries'.
+res_pjsip_outbound_publish
+------------------
+ * Added a new multi_user option that when set to 'yes' allows a given configuration
+ to be used for multiple users.
CEL Backends
------------------
diff --git a/include/asterisk/res_pjsip_outbound_publish.h b/include/asterisk/res_pjsip_outbound_publish.h
index b2038f58b..2831afb35 100644
--- a/include/asterisk/res_pjsip_outbound_publish.h
+++ b/include/asterisk/res_pjsip_outbound_publish.h
@@ -184,4 +184,26 @@ void ast_sip_publish_client_remove_datastore(struct ast_sip_outbound_publish_cli
int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
const struct ast_sip_body *body);
+/*!
+* \brief Send an outgoing PUBLISH message based on the user
+*
+* \param client The publication client to send from
+* \param user The user to send to
+* \param body An optional body to add to the PUBLISH
+*
+* \retval -1 failure
+* \retval 0 success
+*/
+int ast_sip_publish_client_user_send(struct ast_sip_outbound_publish_client *client,
+ const char *user, const struct ast_sip_body *body);
+
+/*!
+* \brief Remove the user from the client (stopping it from publishing)
+*
+* \param client The publication client
+* \param user The user to remove
+*/
+void ast_sip_publish_client_remove(struct ast_sip_outbound_publish_client *client,
+ const char *user);
+
#endif /* RES_PJSIP_OUTBOUND_PUBLISH_H */
diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c
index 51e8a06be..f37ce2305 100644
--- a/res/res_pjsip_outbound_publish.c
+++ b/res/res_pjsip_outbound_publish.c
@@ -27,6 +27,7 @@
#include <pjsip.h>
#include <pjsip_simple.h>
+#include "asterisk/res_pjproject.h"
#include "asterisk/res_pjsip.h"
#include "asterisk/res_pjsip_outbound_publish.h"
#include "asterisk/module.h"
@@ -94,6 +95,10 @@
<literal>pjsip.conf</literal>. As with other <literal>res_pjsip</literal> modules, this will use the first available transport of the appropriate type if unconfigured.</para></note>
</description>
</configOption>
+ <configOption name="multi_user" default="no">
+ <synopsis>Enable multi-user support</synopsis>
+ <description><para>When enabled the user portion of the server uri is replaced by a dynamically created user</para></description>
+ </configOption>
<configOption name="type">
<synopsis>Must be of type 'outbound-publish'.</synopsis>
</configOption>
@@ -102,6 +107,8 @@
</configInfo>
***/
+static int pjsip_max_url_size = PJSIP_MAX_URL_SIZE;
+
/*! \brief Queued outbound publish message */
struct sip_outbound_publish_message {
/*! \brief Optional body */
@@ -112,6 +119,39 @@ struct sip_outbound_publish_message {
char body_contents[0];
};
+/*
+ * A note about some of the object types used in this module:
+ *
+ * The reason we currently have 4 separate object types that relate to configuration,
+ * publishing, state, and client information is due to object lifetimes and order of
+ * destruction dependencies.
+ *
+ * Separation of concerns is a good thing and of course it makes sense to have a
+ * configuration object type as well as an object type wrapper around pjsip's publishing
+ * client class. There also may be run time state data that needs to be tracked, so
+ * again having something to handle that is prudent. However, it may be tempting to think
+ * "why not combine the state and client object types?" Especially seeing as how they have
+ * a one-to-one relationship. The answer is, it's possible, but it'd make the code a bit
+ * more awkward.
+ *
+ * Currently this module maintains a global container of current state objects. When this
+ * states container is replaced, or deleted, it un-references all contained objects. Any
+ * state with a reference left have probably been carried over from a reload/realtime fetch.
+ * States not carried over are destructed and the associated client (and all its publishers)
+ * get unpublished.
+ *
+ * This "unpublishing" goes through a careful process of unpublishing the client, all its
+ * publishers, and making sure all the appropriate references are removed in a sane order.
+ * This process is essentially kicked off with the destruction of the state. If the state
+ * and client objects were to be merged, where clients became the globally tracked object
+ * type, this "unpublishing" process would never start because of the multiple references
+ * held to the client object over it's lifetime. Meaning the global tracking container
+ * would remove its reference to the client object when done with it, but other sources
+ * would still be holding a reference to it (namely the datastore and publisher(s)).
+ *
+ * Thus at this time it is easier to keep them separate.
+ */
+
/*! \brief Outbound publish information */
struct ast_sip_outbound_publish {
/*! \brief Sorcery object details */
@@ -137,30 +177,43 @@ struct ast_sip_outbound_publish {
unsigned int max_auth_attempts;
/*! \brief Configured authentication credentials */
struct ast_sip_auth_vector outbound_auths;
+ /*! \brief The publishing client is used for multiple users when true */
+ unsigned int multi_user;
};
-/*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
-struct ast_sip_outbound_publish_client {
+struct sip_outbound_publisher {
+ /*! \brief The client object that 'owns' this client
+
+ \note any potential circular reference problems are accounted
+ for (see publisher alloc for more information)
+ */
+ struct ast_sip_outbound_publish_client *owner;
/*! \brief Underlying publish client */
pjsip_publishc *client;
/*! \brief Timer entry for refreshing publish */
pj_timer_entry timer;
- /*! \brief Publisher datastores set up by handlers */
- struct ao2_container *datastores;
/*! \brief The number of auth attempts done */
unsigned int auth_attempts;
/*! \brief Queue of outgoing publish messages to send*/
AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
/*! \brief The message currently being sent */
struct sip_outbound_publish_message *sending;
- /*! \brief Publish client has been fully started and event type informed */
- unsigned int started;
/*! \brief Publish client should be destroyed */
unsigned int destroy;
+ /*! \brief User, if any, associated with the publisher */
+ char user[0];
+};
+
+/*! \brief Outbound publish client state information (persists for lifetime of a publish) */
+struct ast_sip_outbound_publish_client {
/*! \brief Outbound publish information */
struct ast_sip_outbound_publish *publish;
- /*! \brief The name of the transport to be used for the publish */
- char *transport_name;
+ /*! \brief Publisher datastores set up by handlers */
+ struct ao2_container *datastores;
+ /*! \brief Container of all the client publishing objects */
+ struct ao2_container *publishers;
+ /*! \brief Publishing has been fully started and event type informed */
+ unsigned int started;
};
/*! \brief Outbound publish state information (persists for lifetime of a publish) */
@@ -171,6 +224,20 @@ struct ast_sip_outbound_publish_state {
char id[0];
};
+/*!
+ * \brief Used for locking while loading/reloading
+ *
+ * Mutli-user configurations make it so publishers can be dynamically added and
+ * removed. Publishers should not be added or removed during a [re]load since
+ * it could cause the current_clients container to be out of sync. Thus the
+ * reason for this lock.
+ */
+AST_RWLOCK_DEFINE_STATIC(load_lock);
+
+#define DEFAULT_PUBLISHER_BUCKETS 119
+AO2_STRING_FIELD_HASH_FN(sip_outbound_publisher, user);
+AO2_STRING_FIELD_CMP_FN(sip_outbound_publisher, user);
+
/*! \brief Unloading data */
struct unloading_data {
int is_unloading;
@@ -238,6 +305,7 @@ static int outbound_publish_state_cmp(void *obj, void *arg, int flags)
static struct ao2_container *get_publishes_and_update_state(void)
{
struct ao2_container *container;
+ SCOPED_WRLOCK(lock, &load_lock);
container = ast_sorcery_retrieve_by_fields(
ast_sip_get_sorcery(), "outbound-publish",
@@ -274,22 +342,22 @@ static struct ast_sip_event_publisher_handler *find_publisher_handler_for_event_
return iter;
}
-/*! \brief Helper function which cancels the refresh timer on a client */
-static void cancel_publish_refresh(struct ast_sip_outbound_publish_client *client)
+/*! \brief Helper function which cancels the refresh timer on a publisher */
+static void cancel_publish_refresh(struct sip_outbound_publisher *publisher)
{
- if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &client->timer)) {
- /* The timer was successfully cancelled, drop the refcount of the client */
- ao2_ref(client, -1);
+ if (pj_timer_heap_cancel(pjsip_endpt_get_timer_heap(ast_sip_get_pjsip_endpoint()), &publisher->timer)) {
+ /* The timer was successfully cancelled, drop the refcount of the publisher */
+ ao2_ref(publisher, -1);
}
}
/*! \brief Helper function which sets up the timer to send publication */
-static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *client, int expiration)
+static void schedule_publish_refresh(struct sip_outbound_publisher *publisher, int expiration)
{
- struct ast_sip_outbound_publish *publish = ao2_bump(client->publish);
+ struct ast_sip_outbound_publish *publish = ao2_bump(publisher->owner->publish);
pj_time_val delay = { .sec = 0, };
- cancel_publish_refresh(client);
+ cancel_publish_refresh(publisher);
if (expiration > 0) {
delay.sec = expiration - PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
@@ -301,61 +369,83 @@ static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *cli
delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
}
- ao2_ref(client, +1);
- if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &client->timer, &delay) != PJ_SUCCESS) {
+ ao2_ref(publisher, +1);
+ if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &publisher->timer, &delay) != PJ_SUCCESS) {
ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n");
- ao2_ref(client, -1);
+ ao2_ref(publisher, -1);
}
ao2_ref(publish, -1);
}
+static int publisher_client_send(void *obj, void *arg, void *data, int flags);
+
/*! \brief Publish client timer callback function */
static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj_timer_entry *entry)
{
- struct ast_sip_outbound_publish_client *client = entry->user_data;
+ struct sip_outbound_publisher *publisher = entry->user_data;
- ao2_lock(client);
- if (AST_LIST_EMPTY(&client->queue)) {
+ ao2_lock(publisher);
+ if (AST_LIST_EMPTY(&publisher->queue)) {
+ int res;
/* If there are no outstanding messages send an empty PUBLISH message so our publication doesn't expire */
- ast_sip_publish_client_send(client, NULL);
+ publisher_client_send(publisher, NULL, &res, 0);
}
- ao2_unlock(client);
+ ao2_unlock(publisher);
- ao2_ref(client, -1);
+ ao2_ref(publisher, -1);
}
/*! \brief Task for cancelling a refresh timer */
static int cancel_refresh_timer_task(void *data)
{
- struct ast_sip_outbound_publish_client *client = data;
+ struct sip_outbound_publisher *publisher = data;
- cancel_publish_refresh(client);
- ao2_ref(client, -1);
+ cancel_publish_refresh(publisher);
+ ao2_ref(publisher, -1);
return 0;
}
+static void set_transport(struct sip_outbound_publisher *publisher, pjsip_tx_data *tdata)
+{
+ if (!ast_strlen_zero(publisher->owner->publish->transport)) {
+ pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
+ ast_sip_set_tpselector_from_transport_name(
+ publisher->owner->publish->transport, &selector);
+ pjsip_tx_data_set_transport(tdata, &selector);
+ }
+}
+
/*! \brief Task for sending an unpublish */
static int send_unpublish_task(void *data)
{
- struct ast_sip_outbound_publish_client *client = data;
+ struct sip_outbound_publisher *publisher = data;
pjsip_tx_data *tdata;
- if (pjsip_publishc_unpublish(client->client, &tdata) == PJ_SUCCESS) {
- if (!ast_strlen_zero(client->transport_name)) {
- pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
- ast_sip_set_tpselector_from_transport_name(client->transport_name, &selector);
- pjsip_tx_data_set_transport(tdata, &selector);
- }
-
- pjsip_publishc_send(client->client, tdata);
+ if (pjsip_publishc_unpublish(publisher->client, &tdata) == PJ_SUCCESS) {
+ set_transport(publisher, tdata);
+ pjsip_publishc_send(publisher->client, tdata);
}
- ao2_ref(client, -1);
+ ao2_ref(publisher, -1);
return 0;
}
+static void stop_publishing(struct ast_sip_outbound_publish_client *client,
+ struct ast_sip_event_publisher_handler *handler)
+{
+ if (!handler) {
+ handler = find_publisher_handler_for_event_name(client->publish->event);
+ }
+
+ if (handler) {
+ handler->stop_publishing(client);
+ }
+}
+
+static int cancel_and_unpublish(void *obj, void *arg, int flags);
+
/*! \brief Helper function which starts or stops publish clients when applicable */
static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed)
{
@@ -389,15 +479,10 @@ static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_hand
} else {
state->client->started = 1;
}
- } else if (!handler && removed && !strcmp(publish->event, removed->event_name)) {
- /* If the publisher client has been started but it is going away stop it */
- removed->stop_publishing(state->client);
+ } else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
+ stop_publishing(state->client, removed);
+ ao2_callback(state->client->publishers, OBJ_NODATA, cancel_and_unpublish, NULL);
state->client->started = 0;
- if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) {
- ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
- ast_sorcery_object_get_id(publish));
- ao2_ref(state->client, -1);
- }
}
ao2_ref(publish, -1);
ao2_ref(state, -1);
@@ -583,19 +668,19 @@ void ast_sip_publish_client_remove_datastore(struct ast_sip_outbound_publish_cli
ao2_find(client->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
}
-static int sip_publish_client_service_queue(void *data)
+static int sip_publisher_service_queue(void *data)
{
- RAII_VAR(struct ast_sip_outbound_publish_client *, client, data, ao2_cleanup);
- SCOPED_AO2LOCK(lock, client);
+ RAII_VAR(struct sip_outbound_publisher *, publisher, data, ao2_cleanup);
+ SCOPED_AO2LOCK(lock, publisher);
struct sip_outbound_publish_message *message;
pjsip_tx_data *tdata;
pj_status_t status;
- if (client->destroy || client->sending || !(message = AST_LIST_FIRST(&client->queue))) {
+ if (publisher->destroy || publisher->sending || !(message = AST_LIST_FIRST(&publisher->queue))) {
return 0;
}
- if (pjsip_publishc_publish(client->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
+ if (pjsip_publishc_publish(publisher->client, PJ_FALSE, &tdata) != PJ_SUCCESS) {
goto fatal;
}
@@ -605,13 +690,9 @@ static int sip_publish_client_service_queue(void *data)
goto fatal;
}
- if (!ast_strlen_zero(client->transport_name)) {
- pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
- ast_sip_set_tpselector_from_transport_name(client->transport_name, &selector);
- pjsip_tx_data_set_transport(tdata, &selector);
- }
+ set_transport(publisher, tdata);
- status = pjsip_publishc_send(client->client, tdata);
+ status = pjsip_publishc_send(publisher->client, tdata);
if (status == PJ_EBUSY) {
/* We attempted to send the message but something else got there first */
goto service;
@@ -619,30 +700,31 @@ static int sip_publish_client_service_queue(void *data)
goto fatal;
}
- client->sending = message;
+ publisher->sending = message;
return 0;
fatal:
- AST_LIST_REMOVE_HEAD(&client->queue, entry);
+ AST_LIST_REMOVE_HEAD(&publisher->queue, entry);
ast_free(message);
service:
- if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
- ao2_ref(client, -1);
+ if (ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher))) {
+ ao2_ref(publisher, -1);
}
return -1;
}
-int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
- const struct ast_sip_body *body)
+static int publisher_client_send(void *obj, void *arg, void *data, int flags)
{
- SCOPED_AO2LOCK(lock, client);
+ struct sip_outbound_publisher *publisher = obj;
+ const struct ast_sip_body *body = arg;
struct sip_outbound_publish_message *message;
size_t type_len = 0, subtype_len = 0, body_text_len = 0;
- int res;
+ int *res = data;
- if (!client->client) {
+ *res = -1;
+ if (!publisher->client) {
return -1;
}
@@ -668,31 +750,191 @@ int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
message->body.body_text = strcpy(dst, body->body_text);
}
- AST_LIST_INSERT_TAIL(&client->queue, message, entry);
+ AST_LIST_INSERT_TAIL(&publisher->queue, message, entry);
- res = ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client));
- if (res) {
- ao2_ref(client, -1);
+ *res = ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher));
+ if (*res) {
+ ao2_ref(publisher, -1);
}
+ return *res;
+}
+
+int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
+ const struct ast_sip_body *body)
+{
+ SCOPED_AO2LOCK(lock, client);
+ int res = 0;
+
+ ao2_callback_data(client->publishers, OBJ_NODATA,
+ publisher_client_send, (void *)body, &res);
return res;
}
+static int sip_outbound_publisher_set_uri(
+ pj_pool_t *pool, const char *uri, const char *user, pj_str_t *res_uri)
+{
+ pj_str_t tmp;
+ pjsip_uri *parsed;
+ pjsip_sip_uri *parsed_uri;
+ int size;
+
+ pj_strdup2_with_null(pool, &tmp, uri);
+ if (!(parsed = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0))) {
+ return -1;
+ }
+
+ if (!(parsed_uri = pjsip_uri_get_uri(parsed))) {
+ return -1;
+ }
+
+ if (!ast_strlen_zero(user)) {
+ pj_strdup2(pool, &parsed_uri->user, user);
+ }
+
+ res_uri->ptr = (char*) pj_pool_alloc(pool, pjsip_max_url_size);
+ if (!res_uri->ptr) {
+ return -1;
+ }
+
+ if ((size = pjsip_uri_print(PJSIP_URI_IN_OTHER, parsed_uri, res_uri->ptr,
+ pjsip_max_url_size - 1)) <= 0) {
+ return -1;
+ }
+ res_uri->ptr[size] = '\0';
+ res_uri->slen = size;
+
+ return 0;
+}
+
+static int sip_outbound_publisher_set_uris(
+ pj_pool_t *pool, struct sip_outbound_publisher *publisher,
+ pj_str_t *server_uri, pj_str_t *to_uri, pj_str_t *from_uri)
+{
+ struct ast_sip_outbound_publish *publish = publisher->owner->publish;
+
+ if (sip_outbound_publisher_set_uri(pool, publish->server_uri, publisher->user, server_uri)) {
+ ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
+ publish->server_uri, ast_sorcery_object_get_id(publish));
+ return -1;
+ }
+
+ if (ast_strlen_zero(publish->to_uri)) {
+ to_uri->ptr = server_uri->ptr;
+ to_uri->slen = server_uri->slen;
+ } else if (sip_outbound_publisher_set_uri(pool, publish->to_uri, publisher->user, to_uri)) {
+ ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
+ publish->to_uri, ast_sorcery_object_get_id(publish));
+ return -1;
+ }
+
+ if (ast_strlen_zero(publish->from_uri)) {
+ from_uri->ptr = server_uri->ptr;
+ from_uri->slen = server_uri->slen;
+ } else if (sip_outbound_publisher_set_uri(pool, publish->from_uri, publisher->user, from_uri)) {
+ ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
+ publish->from_uri, ast_sorcery_object_get_id(publish));
+ return -1;
+ }
+
+ return 0;
+}
+
+static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
+
+/*! \brief Helper function that allocates a pjsip publish client and configures it */
+static int sip_outbound_publisher_init(void *data)
+{
+ struct sip_outbound_publisher *publisher = data;
+ RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
+ pjsip_publishc_opt opt = {
+ .queue_request = PJ_FALSE,
+ };
+ pj_pool_t *pool;
+ pj_str_t event, server_uri, to_uri, from_uri;
+
+ if (publisher->client) {
+ return 0;
+ }
+
+ if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt,
+ ao2_bump(publisher), sip_outbound_publish_callback,
+ &publisher->client) != PJ_SUCCESS) {
+ ao2_ref(publisher, -1);
+ return -1;
+ }
+
+ publish = ao2_bump(publisher->owner->publish);
+
+ if (!ast_strlen_zero(publish->outbound_proxy)) {
+ pjsip_route_hdr route_set, *route;
+ static const pj_str_t ROUTE_HNAME = { "Route", 5 };
+
+ pj_list_init(&route_set);
+
+ if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(publisher->client), &ROUTE_HNAME,
+ (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
+ pjsip_publishc_destroy(publisher->client);
+ return -1;
+ }
+ pj_list_insert_nodes_before(&route_set, route);
+
+ pjsip_publishc_set_route_set(publisher->client, &route_set);
+ }
+
+ pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation",
+ pjsip_max_url_size, pjsip_max_url_size);
+ if (!pool) {
+ ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
+ ast_sorcery_object_get_id(publish));
+ pjsip_publishc_destroy(publisher->client);
+ return -1;
+ }
+
+ if (sip_outbound_publisher_set_uris(pool, publisher, &server_uri, &from_uri, &to_uri)) {
+ pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+ pjsip_publishc_destroy(publisher->client);
+ return -1;
+ }
+
+ pj_cstr(&event, publish->event);
+ if (pjsip_publishc_init(publisher->client, &event, &server_uri, &from_uri, &to_uri,
+ publish->expiration != PJ_SUCCESS)) {
+ ast_log(LOG_ERROR, "Failed to initialize publishing client on outbound publish '%s'\n",
+ ast_sorcery_object_get_id(publish));
+ pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+ pjsip_publishc_destroy(publisher->client);
+ return -1;
+ }
+
+ pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
+ return 0;
+}
+
+static int sip_outbound_publisher_reinit(void *obj, void *arg, int flags)
+{
+ return sip_outbound_publisher_init(obj);
+}
+
+static int sip_outbound_publisher_reinit_all(void *data)
+{
+ ao2_callback(data, OBJ_NODATA, sip_outbound_publisher_reinit, NULL);
+ return 0;
+}
+
/*! \brief Destructor function for publish client */
-static void sip_outbound_publish_client_destroy(void *obj)
+static void sip_outbound_publisher_destroy(void *obj)
{
- struct ast_sip_outbound_publish_client *client = obj;
+ struct sip_outbound_publisher *publisher = obj;
struct sip_outbound_publish_message *message;
/* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
- while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
+ while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) {
ast_free(message);
}
- ao2_cleanup(client->datastores);
- ao2_cleanup(client->publish);
- ast_free(client->transport_name);
+ ao2_cleanup(publisher->owner);
/* if unloading the module and all objects have been unpublished
send the signal to finish unloading */
@@ -705,67 +947,195 @@ static void sip_outbound_publish_client_destroy(void *obj)
}
}
+static struct sip_outbound_publisher *sip_outbound_publisher_alloc(
+ struct ast_sip_outbound_publish_client *client, const char *user)
+{
+ struct sip_outbound_publisher *publisher;
+
+ publisher = ao2_alloc(sizeof(*publisher) + (user ? strlen(user) : 0) + 1,
+ sip_outbound_publisher_destroy);
+ if (!publisher) {
+ return NULL;
+ }
+
+ /*
+ * Bump the ref to the client. This essentially creates a circular reference,
+ * but it is needed in order to make sure the client object doesn't get pulled
+ * out from under us when the publisher stops publishing.
+ *
+ * The circular reference is alleviated by calling cancel_and_unpublish for
+ * each client, from the state's destructor. By calling it there all references
+ * to the publishers should go to zero, thus calling the publisher's destructor.
+ * This in turn removes the client reference we added here. The state then removes
+ * its reference to the client, which should take it to zero.
+ */
+ publisher->owner = ao2_bump(client);
+ publisher->timer.user_data = publisher;
+ publisher->timer.cb = sip_outbound_publish_timer_cb;
+ if (user) {
+ strcpy(publisher->user, user);
+ } else {
+ *publisher->user = '\0';
+ }
+
+ if (ast_sip_push_task_synchronous(NULL, sip_outbound_publisher_init, publisher)) {
+ ast_log(LOG_ERROR, "Unable to create publisher for outbound publish '%s'\n",
+ ast_sorcery_object_get_id(client->publish));
+ ao2_ref(publisher, -1);
+ return NULL;
+ }
+
+ return publisher;
+}
+
+static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher(
+ struct ast_sip_outbound_publish_client *client, const char *user)
+{
+ struct sip_outbound_publisher *publisher =
+ sip_outbound_publisher_alloc(client, user);
+
+ if (!publisher) {
+ return NULL;
+ }
+
+ if (!ao2_link(client->publishers, publisher)) {
+ /*
+ * No need to bump the reference here. The task will take care of
+ * removing the reference.
+ */
+ if (ast_sip_push_task(NULL, cancel_refresh_timer_task, publisher)) {
+ ao2_ref(publisher, -1);
+ }
+ return NULL;
+ }
+
+ return publisher;
+}
+
+int ast_sip_publish_client_user_send(struct ast_sip_outbound_publish_client *client,
+ const char *user, const struct ast_sip_body *body)
+{
+ struct sip_outbound_publisher *publisher;
+ int res;
+
+ /*
+ * Lock before searching since there could be a race between searching and adding.
+ * Just use the load_lock since we might need to lock it anyway (if adding) and
+ * also it simplifies the code (otherwise we'd have to lock the publishers, no-
+ * lock the search and pass a flag to 'add publisher to no-lock the potential link).
+ */
+ ast_rwlock_wrlock(&load_lock);
+ publisher = ao2_find(client->publishers, user, OBJ_SEARCH_KEY);
+ if (!publisher) {
+ if (!(publisher = sip_outbound_publish_client_add_publisher(client, user))) {
+ ast_rwlock_unlock(&load_lock);
+ return -1;
+ }
+ }
+ ast_rwlock_unlock(&load_lock);
+
+ publisher_client_send(publisher, (void *)body, &res, 0);
+ ao2_ref(publisher, -1);
+ return res;
+}
+
+void ast_sip_publish_client_remove(struct ast_sip_outbound_publish_client *client,
+ const char *user)
+{
+ SCOPED_WRLOCK(lock, &load_lock);
+ ao2_find(client->publishers, user, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
+}
+
static int explicit_publish_destroy(void *data)
{
- struct ast_sip_outbound_publish_client *client = data;
+ struct sip_outbound_publisher *publisher = data;
/*
* If there is no pjsip publishing client then we obviously don't need
* to destroy it. Also, the ref for the Asterisk publishing client that
* pjsip had would not exist or should already be gone as well.
*/
- if (client->client) {
- pjsip_publishc_destroy(client->client);
- ao2_ref(client, -1);
+ if (publisher->client) {
+ pjsip_publishc_destroy(publisher->client);
+ ao2_ref(publisher, -1);
}
return 0;
}
/*! \brief Helper function which cancels and un-publishes a no longer used client */
-static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
+static int cancel_and_unpublish(void *obj, void *arg, int flags)
{
- struct ast_sip_event_publisher_handler *handler;
- SCOPED_AO2LOCK(lock, client);
+ struct sip_outbound_publisher *publisher = obj;
+ struct ast_sip_outbound_publish_client *client = publisher->owner;
+
+ SCOPED_AO2LOCK(lock, publisher);
if (!client->started) {
- /* If the client was never started, there's nothing to unpublish, so just
- * destroy the publication and remove its reference to the client.
+ /* If the publisher was never started, there's nothing to unpublish, so just
+ * destroy the publication and remove its reference to the publisher.
*/
- ast_sip_push_task(NULL, explicit_publish_destroy, client);
+ ast_sip_push_task(NULL, explicit_publish_destroy, publisher);
return 0;
}
- handler = find_publisher_handler_for_event_name(client->publish->event);
- if (handler) {
- handler->stop_publishing(client);
- }
-
- client->started = 0;
- if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) {
+ if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(publisher))) {
ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
ast_sorcery_object_get_id(client->publish));
- ao2_ref(client, -1);
+ ao2_ref(publisher, -1);
}
/* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
- if (!client->sending) {
- if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
+ if (!publisher->sending) {
+ if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publisher))) {
ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
ast_sorcery_object_get_id(client->publish));
- ao2_ref(client, -1);
+ ao2_ref(publisher, -1);
}
}
- client->destroy = 1;
+ publisher->destroy = 1;
return 0;
}
+/*! \brief Destructor function for publish client */
+static void sip_outbound_publish_client_destroy(void *obj)
+{
+ struct ast_sip_outbound_publish_client *client = obj;
+
+ ao2_cleanup(client->datastores);
+
+ /*
+ * The client's publishers have already been unpublished and destroyed
+ * by this point, so it is safe to finally remove the reference to the
+ * publish object. The client needed to hold a reference to it until
+ * the publishers were done with it.
+ */
+ ao2_cleanup(client->publish);
+}
+
/*! \brief Destructor function for publish state */
static void sip_outbound_publish_state_destroy(void *obj)
{
struct ast_sip_outbound_publish_state *state = obj;
- cancel_and_unpublish(state->client);
+ stop_publishing(state->client, NULL);
+ /*
+ * Since the state is being destroyed the associated client needs to also
+ * be destroyed. However simply removing the reference to the client will
+ * not initiate client destruction since the client's publisher(s) hold a
+ * reference to the client object as well. So we need to unpublish the
+ * the client's publishers here, which will remove the publisher's client
+ * reference during that process.
+ *
+ * That being said we don't want to remove the client's reference to the
+ * publish object just yet. We'll hold off on that until client destruction
+ * itself. This is because the publishers need access to the client's
+ * publish object while they are unpublishing.
+ */
+ ao2_callback(state->client->publishers, OBJ_NODATA | OBJ_UNLINK, cancel_and_unpublish, NULL);
+ ao2_cleanup(state->client->publishers);
+
+ state->client->started = 0;
ao2_cleanup(state->client);
}
@@ -799,126 +1169,31 @@ static int can_reuse_publish(struct ast_sip_outbound_publish *existing, struct a
return 1;
}
-static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
-
-/*! \brief Helper function that allocates a pjsip publish client and configures it */
-static int sip_outbound_publish_client_alloc(void *data)
-{
- struct ast_sip_outbound_publish_client *client = data;
- RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
- pjsip_publishc_opt opt = {
- .queue_request = PJ_FALSE,
- };
- pj_str_t event, server_uri, to_uri, from_uri;
- pj_status_t status;
-
- if (client->client) {
- return 0;
- } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(client), sip_outbound_publish_callback,
- &client->client) != PJ_SUCCESS) {
- ao2_ref(client, -1);
- return -1;
- }
-
- publish = ao2_bump(client->publish);
-
- if (!ast_strlen_zero(publish->outbound_proxy)) {
- pjsip_route_hdr route_set, *route;
- static const pj_str_t ROUTE_HNAME = { "Route", 5 };
-
- pj_list_init(&route_set);
-
- if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(client->client), &ROUTE_HNAME,
- (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
- pjsip_publishc_destroy(client->client);
- return -1;
- }
- pj_list_insert_nodes_before(&route_set, route);
-
- pjsip_publishc_set_route_set(client->client, &route_set);
- }
-
- pj_cstr(&event, publish->event);
- pj_cstr(&server_uri, publish->server_uri);
- pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri));
- pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri));
-
- status = pjsip_publishc_init(client->client, &event, &server_uri, &from_uri, &to_uri,
- publish->expiration);
- if (status == PJSIP_EINVALIDURI) {
- pj_pool_t *pool;
- pj_str_t tmp;
- pjsip_uri *uri;
-
- pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "URI Validation", 256, 256);
- if (!pool) {
- ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
- ast_sorcery_object_get_id(publish));
- pjsip_publishc_destroy(client->client);
- return -1;
- }
-
- pj_strdup2_with_null(pool, &tmp, publish->server_uri);
- uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
- if (!uri) {
- ast_log(LOG_ERROR, "Invalid server URI '%s' specified on outbound publish '%s'\n",
- publish->server_uri, ast_sorcery_object_get_id(publish));
- }
-
- if (!ast_strlen_zero(publish->to_uri)) {
- pj_strdup2_with_null(pool, &tmp, publish->to_uri);
- uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
- if (!uri) {
- ast_log(LOG_ERROR, "Invalid to URI '%s' specified on outbound publish '%s'\n",
- publish->to_uri, ast_sorcery_object_get_id(publish));
- }
- }
-
- if (!ast_strlen_zero(publish->from_uri)) {
- pj_strdup2_with_null(pool, &tmp, publish->from_uri);
- uri = pjsip_parse_uri(pool, tmp.ptr, tmp.slen, 0);
- if (!uri) {
- ast_log(LOG_ERROR, "Invalid from URI '%s' specified on outbound publish '%s'\n",
- publish->from_uri, ast_sorcery_object_get_id(publish));
- }
- }
-
- pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
- pjsip_publishc_destroy(client->client);
- return -1;
- } else if (status != PJ_SUCCESS) {
- pjsip_publishc_destroy(client->client);
- return -1;
- }
-
- return 0;
-}
-
/*! \brief Callback function for publish client responses */
static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
{
#define DESTROY_CLIENT() do { \
- pjsip_publishc_destroy(client->client); \
- client->client = NULL; \
- ao2_ref(client, -1); } while (0)
+ pjsip_publishc_destroy(publisher->client); \
+ publisher->client = NULL; \
+ ao2_ref(publisher, -1); } while (0)
- RAII_VAR(struct ast_sip_outbound_publish_client *, client, ao2_bump(param->token), ao2_cleanup);
- RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(client->publish), ao2_cleanup);
- SCOPED_AO2LOCK(lock, client);
+ RAII_VAR(struct sip_outbound_publisher *, publisher, ao2_bump(param->token), ao2_cleanup);
+ RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(publisher->owner->publish), ao2_cleanup);
+ SCOPED_AO2LOCK(lock, publisher);
pjsip_tx_data *tdata;
- if (client->destroy) {
- if (client->sending) {
- client->sending = NULL;
+ if (publisher->destroy) {
+ if (publisher->sending) {
+ publisher->sending = NULL;
- if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
+ if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publisher))) {
return;
}
ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
ast_sorcery_object_get_id(publish));
- ao2_ref(client, -1);
+ ao2_ref(publisher, -1);
}
- /* Once the destroy is called this callback will not get called any longer, so drop the client ref */
+ /* Once the destroy is called this callback will not get called any longer, so drop the publisher ref */
DESTROY_CLIENT();
return;
}
@@ -928,16 +1203,12 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
if (!ast_sip_create_request_with_auth(&publish->outbound_auths,
param->rdata, tsx->last_tx, &tdata)) {
- if (!ast_strlen_zero(client->transport_name)) {
- pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
- ast_sip_set_tpselector_from_transport_name(client->transport_name, &selector);
- pjsip_tx_data_set_transport(tdata, &selector);
- }
- pjsip_publishc_send(client->client, tdata);
+ set_transport(publisher, tdata);
+ pjsip_publishc_send(publisher->client, tdata);
}
- client->auth_attempts++;
+ publisher->auth_attempts++;
- if (client->auth_attempts == publish->max_auth_attempts) {
+ if (publisher->auth_attempts == publish->max_auth_attempts) {
DESTROY_CLIENT();
ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n",
ast_sorcery_object_get_id(publish));
@@ -947,18 +1218,18 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
return;
}
- client->auth_attempts = 0;
+ publisher->auth_attempts = 0;
if (param->code == 412) {
DESTROY_CLIENT();
- if (sip_outbound_publish_client_alloc(client)) {
+ if (sip_outbound_publisher_init(publisher)) {
ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n",
ast_sorcery_object_get_id(publish));
goto end;
}
/* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
- client->sending = NULL;
+ publisher->sending = NULL;
} else if (param->code == 423) {
/* Update the expiration with the new expiration time if available */
pjsip_expires_hdr *expires;
@@ -971,33 +1242,33 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
goto end;
}
- pjsip_publishc_update_expires(client->client, expires->ivalue);
- client->sending = NULL;
- } else if (client->sending) {
+ pjsip_publishc_update_expires(publisher->client, expires->ivalue);
+ publisher->sending = NULL;
+ } else if (publisher->sending) {
/* Remove the message currently being sent so that when the queue is serviced another will get sent */
- AST_LIST_REMOVE_HEAD(&client->queue, entry);
- ast_free(client->sending);
- client->sending = NULL;
+ AST_LIST_REMOVE_HEAD(&publisher->queue, entry);
+ ast_free(publisher->sending);
+ publisher->sending = NULL;
if (!param->rdata) {
ast_log(LOG_NOTICE, "No response received for outbound publish '%s'\n",
ast_sorcery_object_get_id(publish));
}
}
- if (AST_LIST_EMPTY(&client->queue)) {
- schedule_publish_refresh(client, param->expiration);
+ if (AST_LIST_EMPTY(&publisher->queue)) {
+ schedule_publish_refresh(publisher, param->expiration);
}
end:
- if (!client->client) {
+ if (!publisher->client) {
struct sip_outbound_publish_message *message;
- while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
+ while ((message = AST_LIST_REMOVE_HEAD(&publisher->queue, entry))) {
ast_free(message);
}
} else {
- if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
- ao2_ref(client, -1);
+ if (ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher))) {
+ ao2_ref(publisher, -1);
}
}
}
@@ -1085,27 +1356,18 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
return NULL;
}
- state->client->timer.user_data = state->client;
- state->client->timer.cb = sip_outbound_publish_timer_cb;
- state->client->transport_name = ast_strdup(publish->transport);
+ state->client->publishers = ao2_container_alloc(DATASTORE_BUCKETS, sip_outbound_publisher_hash_fn,
+ sip_outbound_publisher_cmp_fn);
+ if (!state->client->publishers) {
+ ao2_ref(state, -1);
+ return NULL;
+ }
state->client->publish = ao2_bump(publish);
strcpy(state->id, id);
return state;
}
-static int initialize_publish_client(struct ast_sip_outbound_publish *publish,
- struct ast_sip_outbound_publish_state *state)
-{
- if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
- ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
- ast_sorcery_object_get_id(publish));
- return -1;
- }
-
- return 0;
-}
-
static int validate_publish_config(struct ast_sip_outbound_publish *publish)
{
if (ast_strlen_zero(publish->server_uri)) {
@@ -1125,6 +1387,15 @@ static int current_state_reusable(struct ast_sip_outbound_publish *publish,
{
struct ast_sip_outbound_publish *old_publish;
+ /*
+ * Don't maintain the old state/client objects if the multi_user option changed.
+ */
+ if ((!publish->multi_user && current_state->client->publish->multi_user) ||
+ (publish->multi_user && !current_state->client->publish->multi_user)) {
+ return 0;
+ }
+
+
if (!can_reuse_publish(current_state->client->publish, publish)) {
/*
* Something significant has changed in the configuration, so we are
@@ -1140,12 +1411,15 @@ static int current_state_reusable(struct ast_sip_outbound_publish *publish,
*/
old_publish = current_state->client->publish;
current_state->client->publish = publish;
- if (initialize_publish_client(publish, current_state)) {
+ if (ast_sip_push_task_synchronous(
+ NULL, sip_outbound_publisher_reinit_all, current_state->client->publishers)) {
/*
* If the state object fails to re-initialize then swap
* the old publish info back in.
*/
current_state->client->publish = publish;
+ ast_log(LOG_ERROR, "Unable to reinitialize client(s) for outbound publish '%s'\n",
+ ast_sorcery_object_get_id(current_state->client->publish));
return -1;
}
@@ -1170,6 +1444,7 @@ static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *o
struct ast_sip_outbound_publish *applied = obj;
struct ast_sip_outbound_publish_state *current_state, *new_state;
+ struct sip_outbound_publisher *publisher = NULL;
int res;
/*
@@ -1216,11 +1491,13 @@ static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *o
return -1;
};
- if (initialize_publish_client(applied, new_state)) {
+ if (!applied->multi_user &&
+ !(publisher = sip_outbound_publish_client_add_publisher(new_state->client, NULL))) {
ADD_TO_NEW_STATES(current_state);
ao2_ref(new_state, -1);
return -1;
}
+ ao2_cleanup(publisher);
ADD_TO_NEW_STATES(new_state);
ao2_cleanup(current_state);
@@ -1238,6 +1515,9 @@ static int load_module(void)
{
CHECK_PJSIP_MODULE_LOADED();
+ /* As of pjproject 2.4.5, PJSIP_MAX_URL_SIZE isn't exposed yet but we try anyway. */
+ ast_pjproject_get_buildopt("PJSIP_MAX_URL_SIZE", "%d", &pjsip_max_url_size);
+
ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
@@ -1257,6 +1537,7 @@ static int load_module(void)
ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "transport", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, transport));
ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
+ ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "multi_user", "no", OPT_BOOL_T, 1, FLDSET(struct ast_sip_outbound_publish, multi_user));
ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
@@ -1279,6 +1560,13 @@ static int reload_module(void)
return 0;
}
+static int current_publishing_count(void *obj, void *arg, int flags)
+{
+ struct ast_sip_outbound_publish_state *state = obj;
+ unloading.count += ao2_container_count(state->client->publishers);
+ return 0;
+}
+
static int unload_module(void)
{
struct timeval start = ast_tvnow();
@@ -1289,11 +1577,18 @@ static int unload_module(void)
int res = 0;
struct ao2_container *states = ao2_global_obj_ref(current_states);
- if (!states || !(unloading.count = ao2_container_count(states))) {
+ if (!states) {
return 0;
}
+
+ unloading.count = 0;
+ ao2_callback(states, OBJ_NODATA, current_publishing_count, NULL);
ao2_ref(states, -1);
+ if (!unloading.count) {
+ return 0;
+ }
+
ast_mutex_init(&unloading.lock);
ast_cond_init(&unloading.cond, NULL);
ast_mutex_lock(&unloading.lock);