summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--res/res_pjsip_outbound_publish.c563
1 files changed, 362 insertions, 201 deletions
diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c
index 8bf329ab0..323324d60 100644
--- a/res/res_pjsip_outbound_publish.c
+++ b/res/res_pjsip_outbound_publish.c
@@ -105,26 +105,6 @@ struct sip_outbound_publish_message {
char body_contents[0];
};
-/*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
-struct ast_sip_outbound_publish_client {
- /*! \brief Underlying publish client */
- pjsip_publishc *client;
- /*! \brief Timer entry for refreshing publish */
- pj_timer_entry timer;
- /*! \brief Publisher datastores set up by handlers */
- struct ao2_container *datastores;
- /*! \brief The number of auth attempts done */
- unsigned int auth_attempts;
- /*! \brief Queue of outgoing publish messages to send*/
- AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
- /*! \brief The message currently being sent */
- struct sip_outbound_publish_message *sending;
- /*! \brief Publish client has been fully started and event type informed */
- unsigned int started;
- /*! \brief Publish client should be destroyed */
- unsigned int destroy;
-};
-
/*! \brief Outbound publish information */
struct ast_sip_outbound_publish {
/*! \brief Sorcery object details */
@@ -148,14 +128,122 @@ struct ast_sip_outbound_publish {
unsigned int max_auth_attempts;
/*! \brief Configured authentication credentials */
struct ast_sip_auth_vector outbound_auths;
- /*! \brief Outbound publish state */
- struct ast_sip_outbound_publish_client *state;
};
-AST_RWLIST_HEAD_STATIC(publisher_handlers, ast_sip_event_publisher_handler);
+/*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
+struct ast_sip_outbound_publish_client {
+ /*! \brief Underlying publish client */
+ pjsip_publishc *client;
+ /*! \brief Timer entry for refreshing publish */
+ pj_timer_entry timer;
+ /*! \brief Publisher datastores set up by handlers */
+ struct ao2_container *datastores;
+ /*! \brief The number of auth attempts done */
+ unsigned int auth_attempts;
+ /*! \brief Queue of outgoing publish messages to send*/
+ AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue;
+ /*! \brief The message currently being sent */
+ struct sip_outbound_publish_message *sending;
+ /*! \brief Publish client has been fully started and event type informed */
+ unsigned int started;
+ /*! \brief Publish client should be destroyed */
+ unsigned int destroy;
+ /*! \brief Outbound publish information */
+ struct ast_sip_outbound_publish *publish;
+};
+
+/*! \brief Outbound publish state information (persists for lifetime of a publish) */
+struct ast_sip_outbound_publish_state {
+ /*! \brief Outbound publish client */
+ struct ast_sip_outbound_publish_client *client;
+ /* publish state id lookup key - same as publish configuration id */
+ char id[0];
+};
-/*! \brief Container of currently active publish clients */
-static AO2_GLOBAL_OBJ_STATIC(active);
+/*! \brief Unloading data */
+struct unloading_data {
+ int is_unloading;
+ int count;
+ ast_mutex_t lock;
+ ast_cond_t cond;
+} unloading;
+
+/*! \brief Default number of client state container buckets */
+#define DEFAULT_STATE_BUCKETS 31
+static AO2_GLOBAL_OBJ_STATIC(current_states);
+/*! \brief Used on [re]loads to hold new state data */
+static struct ao2_container *new_states;
+
+/*! \brief hashing function for state objects */
+static int outbound_publish_state_hash(const void *obj, const int flags)
+{
+ const struct ast_sip_outbound_publish_state *object;
+ const char *key;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_KEY:
+ key = obj;
+ break;
+ case OBJ_SEARCH_OBJECT:
+ object = obj;
+ key = object->id;
+ break;
+ default:
+ ast_assert(0);
+ return 0;
+ }
+ return ast_str_hash(key);
+}
+
+/*! \brief comparator function for client objects */
+static int outbound_publish_state_cmp(void *obj, void *arg, int flags)
+{
+ const struct ast_sip_outbound_publish_state *object_left = obj;
+ const struct ast_sip_outbound_publish_state *object_right = arg;
+ const char *right_key = arg;
+ int cmp;
+
+ switch (flags & OBJ_SEARCH_MASK) {
+ case OBJ_SEARCH_OBJECT:
+ right_key = object_right->id;
+ /* Fall through */
+ case OBJ_SEARCH_KEY:
+ cmp = strcmp(object_left->id, right_key);
+ break;
+ case OBJ_SEARCH_PARTIAL_KEY:
+ /* Not supported by container. */
+ ast_assert(0);
+ return 0;
+ default:
+ cmp = 0;
+ break;
+ }
+ if (cmp) {
+ return 0;
+ }
+ return CMP_MATCH;
+}
+
+static struct ao2_container *get_publishes_and_update_state(void)
+{
+ struct ao2_container *container;
+
+ container = ast_sorcery_retrieve_by_fields(
+ ast_sip_get_sorcery(), "outbound-publish",
+ AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
+
+ if (!new_states) {
+ return container;
+ }
+
+ ao2_global_obj_replace_unref(current_states, new_states);
+ ao2_cleanup(new_states);
+ new_states = NULL;
+
+ return container;
+}
+
+AST_RWLIST_HEAD_STATIC(publisher_handlers, ast_sip_event_publisher_handler);
static void sub_add_handler(struct ast_sip_event_publisher_handler *handler)
{
@@ -185,12 +273,13 @@ static void cancel_publish_refresh(struct ast_sip_outbound_publish_client *clien
}
/*! \brief Helper function which sets up the timer to send publication */
-static void schedule_publish_refresh(struct ast_sip_outbound_publish *publish, pjsip_rx_data *rdata)
+static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *client, pjsip_rx_data *rdata)
{
+ struct ast_sip_outbound_publish *publish = ao2_bump(client->publish);
pj_time_val delay = { .sec = 0, };
pjsip_expires_hdr *expires;
- cancel_publish_refresh(publish->state);
+ cancel_publish_refresh(client);
/* Determine when we should refresh - we favor the Expires header if possible */
expires = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
@@ -204,11 +293,12 @@ static void schedule_publish_refresh(struct ast_sip_outbound_publish *publish, p
delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH;
}
- ao2_ref(publish->state, +1);
- if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &publish->state->timer, &delay) != PJ_SUCCESS) {
+ ao2_ref(client, +1);
+ if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &client->timer, &delay) != PJ_SUCCESS) {
ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n");
- ao2_ref(publish->state, -1);
+ ao2_ref(client, -1);
}
+ ao2_ref(publish, -1);
}
/*! \brief Publish client timer callback function */
@@ -229,10 +319,10 @@ static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj
/*! \brief Task for cancelling a refresh timer */
static int cancel_refresh_timer_task(void *data)
{
- struct ast_sip_outbound_publish_client *state = data;
+ struct ast_sip_outbound_publish_client *client = data;
- cancel_publish_refresh(state);
- ao2_ref(state, -1);
+ cancel_publish_refresh(client);
+ ao2_ref(client, -1);
return 0;
}
@@ -240,14 +330,14 @@ static int cancel_refresh_timer_task(void *data)
/*! \brief Task for sending an unpublish */
static int send_unpublish_task(void *data)
{
- struct ast_sip_outbound_publish_client *state = data;
+ struct ast_sip_outbound_publish_client *client = data;
pjsip_tx_data *tdata;
- if (pjsip_publishc_unpublish(state->client, &tdata) == PJ_SUCCESS) {
- pjsip_publishc_send(state->client, tdata);
+ if (pjsip_publishc_unpublish(client->client, &tdata) == PJ_SUCCESS) {
+ pjsip_publishc_send(client->client, tdata);
}
- ao2_ref(state, -1);
+ ao2_ref(client, -1);
return 0;
}
@@ -255,53 +345,70 @@ static int send_unpublish_task(void *data)
/*! \brief Helper function which starts or stops publish clients when applicable */
static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed)
{
- RAII_VAR(struct ao2_container *, publishes, ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "outbound-publish", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL), ao2_cleanup);
+ RAII_VAR(struct ao2_container *, publishes, get_publishes_and_update_state(), ao2_cleanup);
+ struct ao2_container *states;
struct ao2_iterator i;
- struct ast_sip_outbound_publish *publish;
+ struct ast_sip_outbound_publish_state *state;
if (!publishes) {
return;
}
- i = ao2_iterator_init(publishes, 0);
- while ((publish = ao2_iterator_next(&i))) {
+ states = ao2_global_obj_ref(current_states);
+ if (!states) {
+ return;
+ }
+
+ i = ao2_iterator_init(states, 0);
+ while ((state = ao2_iterator_next(&i))) {
+ struct ast_sip_outbound_publish *publish = ao2_bump(state->client->publish);
struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event);
- if (!publish->state->started) {
+ if (!state->client->started) {
/* If the publisher client has not yet been started try to start it */
if (!handler) {
ast_debug(2, "Could not find handler for event '%s' for outbound publish client '%s'\n",
- publish->event, ast_sorcery_object_get_id(publish));
- } else if (handler->start_publishing(publish, publish->state)) {
+ publish->event, ast_sorcery_object_get_id(publish));
+ } else if (handler->start_publishing(publish, state->client)) {
ast_log(LOG_ERROR, "Failed to start outbound publish with event '%s' for client '%s'\n",
publish->event, ast_sorcery_object_get_id(publish));
} else {
- publish->state->started = 1;
+ state->client->started = 1;
}
- } else if (publish->state->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
+ } else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) {
/* If the publisher client has been started but it is going away stop it */
- removed->stop_publishing(publish->state);
- publish->state->started = 0;
- if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(publish->state))) {
+ removed->stop_publishing(state->client);
+ state->client->started = 0;
+ if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) {
ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n",
ast_sorcery_object_get_id(publish));
- ao2_ref(publish->state, -1);
+ ao2_ref(state->client, -1);
}
}
ao2_ref(publish, -1);
+ ao2_ref(state, -1);
}
ao2_iterator_destroy(&i);
+ ao2_ref(states, -1);
}
struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name)
{
- RAII_VAR(struct ast_sip_outbound_publish *, publish, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "outbound-publish", name), ao2_cleanup);
+ RAII_VAR(struct ao2_container *, states,
+ ao2_global_obj_ref(current_states), ao2_cleanup);
+ RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
- if (!publish) {
+ if (!states) {
return NULL;
}
- return ao2_bump(publish->state);
+ state = ao2_find(states, name, OBJ_SEARCH_KEY);
+ if (!state) {
+ return NULL;
+ }
+
+ ao2_ref(state->client, +1);
+ return state->client;
}
int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler)
@@ -351,16 +458,7 @@ void ast_sip_unregister_event_publisher_handler(struct ast_sip_event_publisher_h
static void sip_outbound_publish_destroy(void *obj)
{
struct ast_sip_outbound_publish *publish = obj;
- SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
- struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event);
- if (handler) {
- handler->stop_publishing(publish->state);
- }
- if (publish->state) {
- cancel_publish_refresh(publish->state);
- ao2_ref(publish->state, -1);
- }
ast_sip_auth_vector_destroy(&publish->outbound_auths);
ast_string_field_free_memory(publish);
@@ -538,19 +636,72 @@ int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
return res;
}
-/*! \brief Destructor function for publish state */
+/*! \brief Destructor function for publish client */
static void sip_outbound_publish_client_destroy(void *obj)
{
- struct ast_sip_outbound_publish_client *state = obj;
+ struct ast_sip_outbound_publish_client *client = obj;
struct sip_outbound_publish_message *message;
/* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
- while ((message = AST_LIST_REMOVE_HEAD(&state->queue, entry))) {
+ while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
ast_free(message);
}
- ao2_cleanup(state->datastores);
+ ao2_cleanup(client->datastores);
+ ao2_cleanup(client->publish);
+
+ /* if unloading the module and all objects have been unpublished
+ send the signal to finish unloading */
+ if (unloading.is_unloading) {
+ ast_mutex_lock(&unloading.lock);
+ if (--unloading.count == 0) {
+ ast_cond_signal(&unloading.cond);
+ }
+ ast_mutex_unlock(&unloading.lock);
+ }
+}
+
+/*! \brief Helper function which cancels and un-publishes a no longer used client */
+static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
+{
+ SCOPED_AO2LOCK(lock, client);
+
+ /* If this publish client is currently publishing stop and terminate any refresh timer */
+ if (client->started) {
+ struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(client->publish->event);
+
+ if (handler) {
+ handler->stop_publishing(client);
+ }
+
+ client->started = 0;
+ if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) {
+ ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
+ ast_sorcery_object_get_id(client->publish));
+ ao2_ref(client, -1);
+ }
+ }
+
+ /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
+ if (!client->sending) {
+ if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
+ ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
+ ast_sorcery_object_get_id(client->publish));
+ ao2_ref(client, -1);
+ }
+ }
+ client->destroy = 1;
+ return 0;
+}
+
+/*! \brief Destructor function for publish state */
+static void sip_outbound_publish_state_destroy(void *obj)
+{
+ struct ast_sip_outbound_publish_state *state = obj;
+
+ cancel_and_unpublish(state->client);
+ ao2_cleanup(state->client);
}
/*!
@@ -588,35 +739,38 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
/*! \brief Helper function that allocates a pjsip publish client and configures it */
static int sip_outbound_publish_client_alloc(void *data)
{
- struct ast_sip_outbound_publish *publish = data;
+ struct ast_sip_outbound_publish_client *client = data;
+ RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup);
pjsip_publishc_opt opt = {
.queue_request = PJ_FALSE,
};
pj_str_t event, server_uri, to_uri, from_uri;
pj_status_t status;
- if (publish->state->client) {
+ if (client->client) {
return 0;
- } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(publish), sip_outbound_publish_callback,
- &publish->state->client) != PJ_SUCCESS) {
- ao2_ref(publish, -1);
+ } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(client), sip_outbound_publish_callback,
+ &client->client) != PJ_SUCCESS) {
+ ao2_ref(client, -1);
return -1;
}
+ publish = ao2_bump(client->publish);
+
if (!ast_strlen_zero(publish->outbound_proxy)) {
pjsip_route_hdr route_set, *route;
static const pj_str_t ROUTE_HNAME = { "Route", 5 };
pj_list_init(&route_set);
- if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(publish->state->client), &ROUTE_HNAME,
+ if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(client->client), &ROUTE_HNAME,
(char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) {
- pjsip_publishc_destroy(publish->state->client);
+ pjsip_publishc_destroy(client->client);
return -1;
}
pj_list_insert_nodes_before(&route_set, route);
- pjsip_publishc_set_route_set(publish->state->client, &route_set);
+ pjsip_publishc_set_route_set(client->client, &route_set);
}
pj_cstr(&event, publish->event);
@@ -624,7 +778,7 @@ static int sip_outbound_publish_client_alloc(void *data)
pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri));
pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri));
- status = pjsip_publishc_init(publish->state->client, &event, &server_uri, &from_uri, &to_uri,
+ status = pjsip_publishc_init(client->client, &event, &server_uri, &from_uri, &to_uri,
publish->expiration);
if (status == PJSIP_EINVALIDURI) {
pj_pool_t *pool;
@@ -635,7 +789,7 @@ static int sip_outbound_publish_client_alloc(void *data)
if (!pool) {
ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n",
ast_sorcery_object_get_id(publish));
- pjsip_publishc_destroy(publish->state->client);
+ pjsip_publishc_destroy(client->client);
return -1;
}
@@ -665,10 +819,10 @@ static int sip_outbound_publish_client_alloc(void *data)
}
pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
- pjsip_publishc_destroy(publish->state->client);
+ pjsip_publishc_destroy(client->client);
return -1;
} else if (status != PJ_SUCCESS) {
- pjsip_publishc_destroy(publish->state->client);
+ pjsip_publishc_destroy(client->client);
return -1;
}
@@ -678,51 +832,52 @@ static int sip_outbound_publish_client_alloc(void *data)
/*! \brief Callback function for publish client responses */
static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
{
- RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(param->token), ao2_cleanup);
- SCOPED_AO2LOCK(lock, publish->state);
+ RAII_VAR(struct ast_sip_outbound_publish_client *, client, ao2_bump(param->token), ao2_cleanup);
+ RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(client->publish), ao2_cleanup);
+ SCOPED_AO2LOCK(lock, client);
pjsip_tx_data *tdata;
- if (publish->state->destroy) {
- if (publish->state->sending) {
- publish->state->sending = NULL;
- if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publish->state))) {
+ if (client->destroy) {
+ if (client->sending) {
+ client->sending = NULL;
+
+ if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) {
return;
}
ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
ast_sorcery_object_get_id(publish));
- ao2_ref(publish->state, -1);
+ ao2_ref(client, -1);
}
- /* Once the destroy is called this callback will not get called any longer, so drop the publish ref */
- pjsip_publishc_destroy(publish->state->client);
- ao2_ref(publish, -1);
+ /* Once the destroy is called this callback will not get called any longer, so drop the client ref */
+ pjsip_publishc_destroy(client->client);
+ ao2_ref(client, -1);
return;
}
if (param->code == 401 || param->code == 407) {
if (!ast_sip_create_request_with_auth(&publish->outbound_auths,
param->rdata, pjsip_rdata_get_tsx(param->rdata), &tdata)) {
- pjsip_publishc_send(publish->state->client, tdata);
+ pjsip_publishc_send(client->client, tdata);
}
- publish->state->auth_attempts++;
+ client->auth_attempts++;
- if (publish->state->auth_attempts == publish->max_auth_attempts) {
- pjsip_publishc_destroy(publish->state->client);
- publish->state->client = NULL;
+ if (client->auth_attempts == publish->max_auth_attempts) {
+ pjsip_publishc_destroy(client->client);
+ client->client = NULL;
ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n",
ast_sorcery_object_get_id(publish));
goto end;
}
-
return;
}
- publish->state->auth_attempts = 0;
+ client->auth_attempts = 0;
if (param->code == 412) {
- pjsip_publishc_destroy(publish->state->client);
- publish->state->client = NULL;
+ pjsip_publishc_destroy(client->client);
+ client->client = NULL;
if (sip_outbound_publish_client_alloc(publish)) {
ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n",
@@ -731,7 +886,7 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
}
/* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
- publish->state->sending = NULL;
+ client->sending = NULL;
} else if (param->code == 423) {
/* Update the expiration with the new expiration time if available */
pjsip_expires_hdr *expires;
@@ -740,34 +895,34 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
if (!expires || !expires->ivalue) {
ast_log(LOG_ERROR, "Received 423 response on outbound publish '%s' without a Min-Expires header\n",
ast_sorcery_object_get_id(publish));
- pjsip_publishc_destroy(publish->state->client);
- publish->state->client = NULL;
+ pjsip_publishc_destroy(client->client);
+ client->client = NULL;
goto end;
}
- pjsip_publishc_update_expires(publish->state->client, expires->ivalue);
- publish->state->sending = NULL;
- } else if (publish->state->sending) {
+ pjsip_publishc_update_expires(client->client, expires->ivalue);
+ client->sending = NULL;
+ } else if (client->sending) {
/* Remove the message currently being sent so that when the queue is serviced another will get sent */
- AST_LIST_REMOVE_HEAD(&publish->state->queue, entry);
- ast_free(publish->state->sending);
- publish->state->sending = NULL;
+ AST_LIST_REMOVE_HEAD(&client->queue, entry);
+ ast_free(client->sending);
+ client->sending = NULL;
}
- if (AST_LIST_EMPTY(&publish->state->queue)) {
- schedule_publish_refresh(publish, param->rdata);
+ if (AST_LIST_EMPTY(&client->queue)) {
+ schedule_publish_refresh(client, param->rdata);
}
end:
- if (!publish->state->client) {
+ if (!client->client) {
struct sip_outbound_publish_message *message;
- while ((message = AST_LIST_REMOVE_HEAD(&publish->state->queue, entry))) {
+ while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) {
ast_free(message);
}
} else {
- if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(publish->state))) {
- ao2_ref(publish->state, -1);
+ if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) {
+ ao2_ref(client, -1);
}
}
}
@@ -831,31 +986,43 @@ static int datastore_cmp(void *obj, void *arg, int flags)
return CMP_MATCH;
}
-/*! \brief Allocator function for publish client state */
-static struct ast_sip_outbound_publish_client *sip_outbound_publish_state_alloc(void)
+/*! \brief Allocator function for publish client */
+static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
+ struct ast_sip_outbound_publish *publish)
{
- struct ast_sip_outbound_publish_client *state = ao2_alloc(sizeof(*state), sip_outbound_publish_client_destroy);
+ const char *id = ast_sorcery_object_get_id(publish);
+ struct ast_sip_outbound_publish_state *state =
+ ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy);
if (!state) {
return NULL;
}
- state->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
- if (!state->datastores) {
+ state->client = ao2_alloc(sizeof(*state->client), sip_outbound_publish_client_destroy);
+ if (!state->client) {
+ ao2_ref(state, -1);
+ return NULL;
+ }
+
+ state->client->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
+ if (!state->client->datastores) {
ao2_ref(state, -1);
return NULL;
}
- state->timer.user_data = state;
- state->timer.cb = sip_outbound_publish_timer_cb;
+ state->client->timer.user_data = state->client;
+ state->client->timer.cb = sip_outbound_publish_timer_cb;
+ state->client->publish = ao2_bump(publish);
+ strcpy(state->id, id);
return state;
}
/*! \brief Apply function which finds or allocates a state structure */
static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj)
{
- RAII_VAR(struct ast_sip_outbound_publish *, existing, ast_sorcery_retrieve_by_id(sorcery, "outbound-publish", ast_sorcery_object_get_id(obj)), ao2_cleanup);
+ RAII_VAR(struct ao2_container *, states, ao2_global_obj_ref(current_states), ao2_cleanup);
+ RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup);
struct ast_sip_outbound_publish *applied = obj;
if (ast_strlen_zero(applied->server_uri)) {
@@ -868,24 +1035,47 @@ static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *o
return -1;
}
- if (!existing) {
- /* If no existing publish exists we can just start fresh easily */
- applied->state = sip_outbound_publish_state_alloc();
- } else {
- /* If there is an existing publish things are more complicated, we can immediately reuse this state if most stuff remains unchanged */
- if (can_reuse_publish(existing, applied)) {
- applied->state = existing->state;
- ao2_ref(applied->state, +1);
- } else {
- applied->state = sip_outbound_publish_state_alloc();
+ if (!new_states) {
+ /* make sure new_states has been allocated as we will be adding to it */
+ new_states = ao2_container_alloc_options(
+ AO2_ALLOC_OPT_LOCK_NOLOCK, DEFAULT_STATE_BUCKETS,
+ outbound_publish_state_hash, outbound_publish_state_cmp);
+
+ if (!new_states) {
+ ast_log(LOG_ERROR, "Unable to allocate new states container\n");
+ return -1;
+ }
+ }
+
+ if (states) {
+ state = ao2_find(states, ast_sorcery_object_get_id(obj), OBJ_SEARCH_KEY);
+ if (state) {
+ if (can_reuse_publish(state->client->publish, applied)) {
+ ao2_replace(state->client->publish, applied);
+ } else {
+ ao2_ref(state, -1);
+ state = NULL;
+ }
}
}
- if (!applied->state) {
+ if (!state) {
+ state = sip_outbound_publish_state_alloc(applied);
+ if (!state) {
+ ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n",
+ ast_sorcery_object_get_id(applied));
+ return -1;
+ };
+ }
+
+ if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) {
+ ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n",
+ ast_sorcery_object_get_id(applied));
return -1;
}
- return ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, applied);
+ ao2_link(new_states, state);
+ return 0;
}
static int outbound_auth_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
@@ -895,74 +1085,9 @@ static int outbound_auth_handler(const struct aco_option *opt, struct ast_variab
return ast_sip_auth_vector_init(&publish->outbound_auths, var->value);
}
-/*! \brief Helper function which prunes old publish clients */
-static void prune_publish_clients(const char *object_type)
-{
- struct ao2_container *old, *current;
-
- old = ao2_global_obj_ref(active);
- if (old) {
- struct ao2_iterator i;
- struct ast_sip_outbound_publish *existing;
-
- i = ao2_iterator_init(old, 0);
- for (; (existing = ao2_iterator_next(&i)); ao2_ref(existing, -1)) {
- struct ast_sip_outbound_publish *created;
-
- created = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "outbound-publish",
- ast_sorcery_object_get_id(existing));
- if (created) {
- if (created->state == existing->state) {
- ao2_ref(created, -1);
- continue;
- }
- ao2_ref(created, -1);
- }
-
- ao2_lock(existing->state);
-
- /* If this publish client is currently publishing stop and terminate any refresh timer */
- if (existing->state->started) {
- struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(existing->event);
-
- if (handler) {
- handler->stop_publishing(existing->state);
- }
-
- if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(existing->state))) {
- ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n",
- ast_sorcery_object_get_id(existing));
- ao2_ref(existing->state, -1);
- }
- }
-
- /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
- if (!existing->state->sending) {
- if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(existing->state))) {
- ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n",
- ast_sorcery_object_get_id(existing));
- ao2_ref(existing->state, -1);
- }
- }
-
- existing->state->destroy = 1;
- ao2_unlock(existing->state);
- }
- ao2_iterator_destroy(&i);
-
- ao2_ref(old, -1);
- }
-
- current = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "outbound-publish", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
- ao2_global_obj_replace_unref(active, current);
-}
-
-static struct ast_sorcery_observer outbound_publish_observer = {
- .loaded = prune_publish_clients,
-};
-
static int load_module(void)
{
+ ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish");
ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish");
if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL,
@@ -970,8 +1095,6 @@ static int load_module(void)
return AST_MODULE_LOAD_DECLINE;
}
- ast_sorcery_observer_add(ast_sip_get_sorcery(), "outbound-publish", &outbound_publish_observer);
-
ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "type", "", OPT_NOOP_T, 0, 0);
ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "server_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, server_uri));
ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "from_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, from_uri));
@@ -981,6 +1104,7 @@ static int load_module(void)
ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, expiration));
ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts));
ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0);
+
ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish");
AST_RWLIST_RDLOCK(&publisher_handlers);
@@ -1004,7 +1128,44 @@ static int reload_module(void)
static int unload_module(void)
{
- return 0;
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + 10,
+ .tv_nsec = start.tv_usec * 1000
+ };
+ int res = 0;
+ struct ao2_container *states = ao2_global_obj_ref(current_states);
+
+ if (!states || !(unloading.count = ao2_container_count(states))) {
+ return 0;
+ }
+ ao2_ref(states, -1);
+
+ ast_mutex_init(&unloading.lock);
+ ast_cond_init(&unloading.cond, NULL);
+ ast_mutex_lock(&unloading.lock);
+
+ unloading.is_unloading = 1;
+ ao2_global_obj_release(current_states);
+
+ /* wait for items to unpublish */
+ ast_verb(5, "Waiting to complete unpublishing task(s)\n");
+ while (unloading.count) {
+ res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end);
+ }
+ ast_mutex_unlock(&unloading.lock);
+
+ ast_mutex_destroy(&unloading.lock);
+ ast_cond_destroy(&unloading.cond);
+
+ if (res) {
+ ast_verb(5, "At least %d items were unable to unpublish "
+ "in the allowed time\n", unloading.count);
+ } else {
+ ast_verb(5, "All items successfully unpublished\n");
+ }
+
+ return res;
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support",