diff options
Diffstat (limited to 'res')
-rw-r--r-- | res/res_pjsip_outbound_publish.c | 563 |
1 files changed, 362 insertions, 201 deletions
diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c index 8bf329ab0..323324d60 100644 --- a/res/res_pjsip_outbound_publish.c +++ b/res/res_pjsip_outbound_publish.c @@ -105,26 +105,6 @@ struct sip_outbound_publish_message { char body_contents[0]; }; -/*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */ -struct ast_sip_outbound_publish_client { - /*! \brief Underlying publish client */ - pjsip_publishc *client; - /*! \brief Timer entry for refreshing publish */ - pj_timer_entry timer; - /*! \brief Publisher datastores set up by handlers */ - struct ao2_container *datastores; - /*! \brief The number of auth attempts done */ - unsigned int auth_attempts; - /*! \brief Queue of outgoing publish messages to send*/ - AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue; - /*! \brief The message currently being sent */ - struct sip_outbound_publish_message *sending; - /*! \brief Publish client has been fully started and event type informed */ - unsigned int started; - /*! \brief Publish client should be destroyed */ - unsigned int destroy; -}; - /*! \brief Outbound publish information */ struct ast_sip_outbound_publish { /*! \brief Sorcery object details */ @@ -148,14 +128,122 @@ struct ast_sip_outbound_publish { unsigned int max_auth_attempts; /*! \brief Configured authentication credentials */ struct ast_sip_auth_vector outbound_auths; - /*! \brief Outbound publish state */ - struct ast_sip_outbound_publish_client *state; }; -AST_RWLIST_HEAD_STATIC(publisher_handlers, ast_sip_event_publisher_handler); +/*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */ +struct ast_sip_outbound_publish_client { + /*! \brief Underlying publish client */ + pjsip_publishc *client; + /*! \brief Timer entry for refreshing publish */ + pj_timer_entry timer; + /*! \brief Publisher datastores set up by handlers */ + struct ao2_container *datastores; + /*! \brief The number of auth attempts done */ + unsigned int auth_attempts; + /*! \brief Queue of outgoing publish messages to send*/ + AST_LIST_HEAD_NOLOCK(, sip_outbound_publish_message) queue; + /*! \brief The message currently being sent */ + struct sip_outbound_publish_message *sending; + /*! \brief Publish client has been fully started and event type informed */ + unsigned int started; + /*! \brief Publish client should be destroyed */ + unsigned int destroy; + /*! \brief Outbound publish information */ + struct ast_sip_outbound_publish *publish; +}; + +/*! \brief Outbound publish state information (persists for lifetime of a publish) */ +struct ast_sip_outbound_publish_state { + /*! \brief Outbound publish client */ + struct ast_sip_outbound_publish_client *client; + /* publish state id lookup key - same as publish configuration id */ + char id[0]; +}; -/*! \brief Container of currently active publish clients */ -static AO2_GLOBAL_OBJ_STATIC(active); +/*! \brief Unloading data */ +struct unloading_data { + int is_unloading; + int count; + ast_mutex_t lock; + ast_cond_t cond; +} unloading; + +/*! \brief Default number of client state container buckets */ +#define DEFAULT_STATE_BUCKETS 31 +static AO2_GLOBAL_OBJ_STATIC(current_states); +/*! \brief Used on [re]loads to hold new state data */ +static struct ao2_container *new_states; + +/*! \brief hashing function for state objects */ +static int outbound_publish_state_hash(const void *obj, const int flags) +{ + const struct ast_sip_outbound_publish_state *object; + const char *key; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + key = obj; + break; + case OBJ_SEARCH_OBJECT: + object = obj; + key = object->id; + break; + default: + ast_assert(0); + return 0; + } + return ast_str_hash(key); +} + +/*! \brief comparator function for client objects */ +static int outbound_publish_state_cmp(void *obj, void *arg, int flags) +{ + const struct ast_sip_outbound_publish_state *object_left = obj; + const struct ast_sip_outbound_publish_state *object_right = arg; + const char *right_key = arg; + int cmp; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + right_key = object_right->id; + /* Fall through */ + case OBJ_SEARCH_KEY: + cmp = strcmp(object_left->id, right_key); + break; + case OBJ_SEARCH_PARTIAL_KEY: + /* Not supported by container. */ + ast_assert(0); + return 0; + default: + cmp = 0; + break; + } + if (cmp) { + return 0; + } + return CMP_MATCH; +} + +static struct ao2_container *get_publishes_and_update_state(void) +{ + struct ao2_container *container; + + container = ast_sorcery_retrieve_by_fields( + ast_sip_get_sorcery(), "outbound-publish", + AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL); + + if (!new_states) { + return container; + } + + ao2_global_obj_replace_unref(current_states, new_states); + ao2_cleanup(new_states); + new_states = NULL; + + return container; +} + +AST_RWLIST_HEAD_STATIC(publisher_handlers, ast_sip_event_publisher_handler); static void sub_add_handler(struct ast_sip_event_publisher_handler *handler) { @@ -185,12 +273,13 @@ static void cancel_publish_refresh(struct ast_sip_outbound_publish_client *clien } /*! \brief Helper function which sets up the timer to send publication */ -static void schedule_publish_refresh(struct ast_sip_outbound_publish *publish, pjsip_rx_data *rdata) +static void schedule_publish_refresh(struct ast_sip_outbound_publish_client *client, pjsip_rx_data *rdata) { + struct ast_sip_outbound_publish *publish = ao2_bump(client->publish); pj_time_val delay = { .sec = 0, }; pjsip_expires_hdr *expires; - cancel_publish_refresh(publish->state); + cancel_publish_refresh(client); /* Determine when we should refresh - we favor the Expires header if possible */ expires = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); @@ -204,11 +293,12 @@ static void schedule_publish_refresh(struct ast_sip_outbound_publish *publish, p delay.sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH; } - ao2_ref(publish->state, +1); - if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &publish->state->timer, &delay) != PJ_SUCCESS) { + ao2_ref(client, +1); + if (pjsip_endpt_schedule_timer(ast_sip_get_pjsip_endpoint(), &client->timer, &delay) != PJ_SUCCESS) { ast_log(LOG_WARNING, "Failed to pass timed publish refresh to scheduler\n"); - ao2_ref(publish->state, -1); + ao2_ref(client, -1); } + ao2_ref(publish, -1); } /*! \brief Publish client timer callback function */ @@ -229,10 +319,10 @@ static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj /*! \brief Task for cancelling a refresh timer */ static int cancel_refresh_timer_task(void *data) { - struct ast_sip_outbound_publish_client *state = data; + struct ast_sip_outbound_publish_client *client = data; - cancel_publish_refresh(state); - ao2_ref(state, -1); + cancel_publish_refresh(client); + ao2_ref(client, -1); return 0; } @@ -240,14 +330,14 @@ static int cancel_refresh_timer_task(void *data) /*! \brief Task for sending an unpublish */ static int send_unpublish_task(void *data) { - struct ast_sip_outbound_publish_client *state = data; + struct ast_sip_outbound_publish_client *client = data; pjsip_tx_data *tdata; - if (pjsip_publishc_unpublish(state->client, &tdata) == PJ_SUCCESS) { - pjsip_publishc_send(state->client, tdata); + if (pjsip_publishc_unpublish(client->client, &tdata) == PJ_SUCCESS) { + pjsip_publishc_send(client->client, tdata); } - ao2_ref(state, -1); + ao2_ref(client, -1); return 0; } @@ -255,53 +345,70 @@ static int send_unpublish_task(void *data) /*! \brief Helper function which starts or stops publish clients when applicable */ static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_handler *removed) { - RAII_VAR(struct ao2_container *, publishes, ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "outbound-publish", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL), ao2_cleanup); + RAII_VAR(struct ao2_container *, publishes, get_publishes_and_update_state(), ao2_cleanup); + struct ao2_container *states; struct ao2_iterator i; - struct ast_sip_outbound_publish *publish; + struct ast_sip_outbound_publish_state *state; if (!publishes) { return; } - i = ao2_iterator_init(publishes, 0); - while ((publish = ao2_iterator_next(&i))) { + states = ao2_global_obj_ref(current_states); + if (!states) { + return; + } + + i = ao2_iterator_init(states, 0); + while ((state = ao2_iterator_next(&i))) { + struct ast_sip_outbound_publish *publish = ao2_bump(state->client->publish); struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event); - if (!publish->state->started) { + if (!state->client->started) { /* If the publisher client has not yet been started try to start it */ if (!handler) { ast_debug(2, "Could not find handler for event '%s' for outbound publish client '%s'\n", - publish->event, ast_sorcery_object_get_id(publish)); - } else if (handler->start_publishing(publish, publish->state)) { + publish->event, ast_sorcery_object_get_id(publish)); + } else if (handler->start_publishing(publish, state->client)) { ast_log(LOG_ERROR, "Failed to start outbound publish with event '%s' for client '%s'\n", publish->event, ast_sorcery_object_get_id(publish)); } else { - publish->state->started = 1; + state->client->started = 1; } - } else if (publish->state->started && !handler && removed && !strcmp(publish->event, removed->event_name)) { + } else if (state->client->started && !handler && removed && !strcmp(publish->event, removed->event_name)) { /* If the publisher client has been started but it is going away stop it */ - removed->stop_publishing(publish->state); - publish->state->started = 0; - if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(publish->state))) { + removed->stop_publishing(state->client); + state->client->started = 0; + if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(state->client))) { ast_log(LOG_WARNING, "Could not stop refresh timer on client '%s'\n", ast_sorcery_object_get_id(publish)); - ao2_ref(publish->state, -1); + ao2_ref(state->client, -1); } } ao2_ref(publish, -1); + ao2_ref(state, -1); } ao2_iterator_destroy(&i); + ao2_ref(states, -1); } struct ast_sip_outbound_publish_client *ast_sip_publish_client_get(const char *name) { - RAII_VAR(struct ast_sip_outbound_publish *, publish, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "outbound-publish", name), ao2_cleanup); + RAII_VAR(struct ao2_container *, states, + ao2_global_obj_ref(current_states), ao2_cleanup); + RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup); - if (!publish) { + if (!states) { return NULL; } - return ao2_bump(publish->state); + state = ao2_find(states, name, OBJ_SEARCH_KEY); + if (!state) { + return NULL; + } + + ao2_ref(state->client, +1); + return state->client; } int ast_sip_register_event_publisher_handler(struct ast_sip_event_publisher_handler *handler) @@ -351,16 +458,7 @@ void ast_sip_unregister_event_publisher_handler(struct ast_sip_event_publisher_h static void sip_outbound_publish_destroy(void *obj) { struct ast_sip_outbound_publish *publish = obj; - SCOPED_LOCK(lock, &publisher_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK); - struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(publish->event); - if (handler) { - handler->stop_publishing(publish->state); - } - if (publish->state) { - cancel_publish_refresh(publish->state); - ao2_ref(publish->state, -1); - } ast_sip_auth_vector_destroy(&publish->outbound_auths); ast_string_field_free_memory(publish); @@ -538,19 +636,72 @@ int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client, return res; } -/*! \brief Destructor function for publish state */ +/*! \brief Destructor function for publish client */ static void sip_outbound_publish_client_destroy(void *obj) { - struct ast_sip_outbound_publish_client *state = obj; + struct ast_sip_outbound_publish_client *client = obj; struct sip_outbound_publish_message *message; /* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */ - while ((message = AST_LIST_REMOVE_HEAD(&state->queue, entry))) { + while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) { ast_free(message); } - ao2_cleanup(state->datastores); + ao2_cleanup(client->datastores); + ao2_cleanup(client->publish); + + /* if unloading the module and all objects have been unpublished + send the signal to finish unloading */ + if (unloading.is_unloading) { + ast_mutex_lock(&unloading.lock); + if (--unloading.count == 0) { + ast_cond_signal(&unloading.cond); + } + ast_mutex_unlock(&unloading.lock); + } +} + +/*! \brief Helper function which cancels and un-publishes a no longer used client */ +static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client) +{ + SCOPED_AO2LOCK(lock, client); + + /* If this publish client is currently publishing stop and terminate any refresh timer */ + if (client->started) { + struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(client->publish->event); + + if (handler) { + handler->stop_publishing(client); + } + + client->started = 0; + if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(client))) { + ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n", + ast_sorcery_object_get_id(client->publish)); + ao2_ref(client, -1); + } + } + + /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */ + if (!client->sending) { + if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) { + ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n", + ast_sorcery_object_get_id(client->publish)); + ao2_ref(client, -1); + } + } + client->destroy = 1; + return 0; +} + +/*! \brief Destructor function for publish state */ +static void sip_outbound_publish_state_destroy(void *obj) +{ + struct ast_sip_outbound_publish_state *state = obj; + + cancel_and_unpublish(state->client); + ao2_cleanup(state->client); } /*! @@ -588,35 +739,38 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param); /*! \brief Helper function that allocates a pjsip publish client and configures it */ static int sip_outbound_publish_client_alloc(void *data) { - struct ast_sip_outbound_publish *publish = data; + struct ast_sip_outbound_publish_client *client = data; + RAII_VAR(struct ast_sip_outbound_publish *, publish, NULL, ao2_cleanup); pjsip_publishc_opt opt = { .queue_request = PJ_FALSE, }; pj_str_t event, server_uri, to_uri, from_uri; pj_status_t status; - if (publish->state->client) { + if (client->client) { return 0; - } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(publish), sip_outbound_publish_callback, - &publish->state->client) != PJ_SUCCESS) { - ao2_ref(publish, -1); + } else if (pjsip_publishc_create(ast_sip_get_pjsip_endpoint(), &opt, ao2_bump(client), sip_outbound_publish_callback, + &client->client) != PJ_SUCCESS) { + ao2_ref(client, -1); return -1; } + publish = ao2_bump(client->publish); + if (!ast_strlen_zero(publish->outbound_proxy)) { pjsip_route_hdr route_set, *route; static const pj_str_t ROUTE_HNAME = { "Route", 5 }; pj_list_init(&route_set); - if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(publish->state->client), &ROUTE_HNAME, + if (!(route = pjsip_parse_hdr(pjsip_publishc_get_pool(client->client), &ROUTE_HNAME, (char*)publish->outbound_proxy, strlen(publish->outbound_proxy), NULL))) { - pjsip_publishc_destroy(publish->state->client); + pjsip_publishc_destroy(client->client); return -1; } pj_list_insert_nodes_before(&route_set, route); - pjsip_publishc_set_route_set(publish->state->client, &route_set); + pjsip_publishc_set_route_set(client->client, &route_set); } pj_cstr(&event, publish->event); @@ -624,7 +778,7 @@ static int sip_outbound_publish_client_alloc(void *data) pj_cstr(&to_uri, S_OR(publish->to_uri, publish->server_uri)); pj_cstr(&from_uri, S_OR(publish->from_uri, publish->server_uri)); - status = pjsip_publishc_init(publish->state->client, &event, &server_uri, &from_uri, &to_uri, + status = pjsip_publishc_init(client->client, &event, &server_uri, &from_uri, &to_uri, publish->expiration); if (status == PJSIP_EINVALIDURI) { pj_pool_t *pool; @@ -635,7 +789,7 @@ static int sip_outbound_publish_client_alloc(void *data) if (!pool) { ast_log(LOG_ERROR, "Could not create pool for URI validation on outbound publish '%s'\n", ast_sorcery_object_get_id(publish)); - pjsip_publishc_destroy(publish->state->client); + pjsip_publishc_destroy(client->client); return -1; } @@ -665,10 +819,10 @@ static int sip_outbound_publish_client_alloc(void *data) } pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); - pjsip_publishc_destroy(publish->state->client); + pjsip_publishc_destroy(client->client); return -1; } else if (status != PJ_SUCCESS) { - pjsip_publishc_destroy(publish->state->client); + pjsip_publishc_destroy(client->client); return -1; } @@ -678,51 +832,52 @@ static int sip_outbound_publish_client_alloc(void *data) /*! \brief Callback function for publish client responses */ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param) { - RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(param->token), ao2_cleanup); - SCOPED_AO2LOCK(lock, publish->state); + RAII_VAR(struct ast_sip_outbound_publish_client *, client, ao2_bump(param->token), ao2_cleanup); + RAII_VAR(struct ast_sip_outbound_publish *, publish, ao2_bump(client->publish), ao2_cleanup); + SCOPED_AO2LOCK(lock, client); pjsip_tx_data *tdata; - if (publish->state->destroy) { - if (publish->state->sending) { - publish->state->sending = NULL; - if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publish->state))) { + if (client->destroy) { + if (client->sending) { + client->sending = NULL; + + if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(client))) { return; } ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n", ast_sorcery_object_get_id(publish)); - ao2_ref(publish->state, -1); + ao2_ref(client, -1); } - /* Once the destroy is called this callback will not get called any longer, so drop the publish ref */ - pjsip_publishc_destroy(publish->state->client); - ao2_ref(publish, -1); + /* Once the destroy is called this callback will not get called any longer, so drop the client ref */ + pjsip_publishc_destroy(client->client); + ao2_ref(client, -1); return; } if (param->code == 401 || param->code == 407) { if (!ast_sip_create_request_with_auth(&publish->outbound_auths, param->rdata, pjsip_rdata_get_tsx(param->rdata), &tdata)) { - pjsip_publishc_send(publish->state->client, tdata); + pjsip_publishc_send(client->client, tdata); } - publish->state->auth_attempts++; + client->auth_attempts++; - if (publish->state->auth_attempts == publish->max_auth_attempts) { - pjsip_publishc_destroy(publish->state->client); - publish->state->client = NULL; + if (client->auth_attempts == publish->max_auth_attempts) { + pjsip_publishc_destroy(client->client); + client->client = NULL; ast_log(LOG_ERROR, "Reached maximum number of PUBLISH authentication attempts on outbound publish '%s'\n", ast_sorcery_object_get_id(publish)); goto end; } - return; } - publish->state->auth_attempts = 0; + client->auth_attempts = 0; if (param->code == 412) { - pjsip_publishc_destroy(publish->state->client); - publish->state->client = NULL; + pjsip_publishc_destroy(client->client); + client->client = NULL; if (sip_outbound_publish_client_alloc(publish)) { ast_log(LOG_ERROR, "Failed to create a new outbound publish client for '%s' on 412 response\n", @@ -731,7 +886,7 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param) } /* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */ - publish->state->sending = NULL; + client->sending = NULL; } else if (param->code == 423) { /* Update the expiration with the new expiration time if available */ pjsip_expires_hdr *expires; @@ -740,34 +895,34 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param) if (!expires || !expires->ivalue) { ast_log(LOG_ERROR, "Received 423 response on outbound publish '%s' without a Min-Expires header\n", ast_sorcery_object_get_id(publish)); - pjsip_publishc_destroy(publish->state->client); - publish->state->client = NULL; + pjsip_publishc_destroy(client->client); + client->client = NULL; goto end; } - pjsip_publishc_update_expires(publish->state->client, expires->ivalue); - publish->state->sending = NULL; - } else if (publish->state->sending) { + pjsip_publishc_update_expires(client->client, expires->ivalue); + client->sending = NULL; + } else if (client->sending) { /* Remove the message currently being sent so that when the queue is serviced another will get sent */ - AST_LIST_REMOVE_HEAD(&publish->state->queue, entry); - ast_free(publish->state->sending); - publish->state->sending = NULL; + AST_LIST_REMOVE_HEAD(&client->queue, entry); + ast_free(client->sending); + client->sending = NULL; } - if (AST_LIST_EMPTY(&publish->state->queue)) { - schedule_publish_refresh(publish, param->rdata); + if (AST_LIST_EMPTY(&client->queue)) { + schedule_publish_refresh(client, param->rdata); } end: - if (!publish->state->client) { + if (!client->client) { struct sip_outbound_publish_message *message; - while ((message = AST_LIST_REMOVE_HEAD(&publish->state->queue, entry))) { + while ((message = AST_LIST_REMOVE_HEAD(&client->queue, entry))) { ast_free(message); } } else { - if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(publish->state))) { - ao2_ref(publish->state, -1); + if (ast_sip_push_task(NULL, sip_publish_client_service_queue, ao2_bump(client))) { + ao2_ref(client, -1); } } } @@ -831,31 +986,43 @@ static int datastore_cmp(void *obj, void *arg, int flags) return CMP_MATCH; } -/*! \brief Allocator function for publish client state */ -static struct ast_sip_outbound_publish_client *sip_outbound_publish_state_alloc(void) +/*! \brief Allocator function for publish client */ +static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc( + struct ast_sip_outbound_publish *publish) { - struct ast_sip_outbound_publish_client *state = ao2_alloc(sizeof(*state), sip_outbound_publish_client_destroy); + const char *id = ast_sorcery_object_get_id(publish); + struct ast_sip_outbound_publish_state *state = + ao2_alloc(sizeof(*state) + strlen(id) + 1, sip_outbound_publish_state_destroy); if (!state) { return NULL; } - state->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp); - if (!state->datastores) { + state->client = ao2_alloc(sizeof(*state->client), sip_outbound_publish_client_destroy); + if (!state->client) { + ao2_ref(state, -1); + return NULL; + } + + state->client->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp); + if (!state->client->datastores) { ao2_ref(state, -1); return NULL; } - state->timer.user_data = state; - state->timer.cb = sip_outbound_publish_timer_cb; + state->client->timer.user_data = state->client; + state->client->timer.cb = sip_outbound_publish_timer_cb; + state->client->publish = ao2_bump(publish); + strcpy(state->id, id); return state; } /*! \brief Apply function which finds or allocates a state structure */ static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *obj) { - RAII_VAR(struct ast_sip_outbound_publish *, existing, ast_sorcery_retrieve_by_id(sorcery, "outbound-publish", ast_sorcery_object_get_id(obj)), ao2_cleanup); + RAII_VAR(struct ao2_container *, states, ao2_global_obj_ref(current_states), ao2_cleanup); + RAII_VAR(struct ast_sip_outbound_publish_state *, state, NULL, ao2_cleanup); struct ast_sip_outbound_publish *applied = obj; if (ast_strlen_zero(applied->server_uri)) { @@ -868,24 +1035,47 @@ static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *o return -1; } - if (!existing) { - /* If no existing publish exists we can just start fresh easily */ - applied->state = sip_outbound_publish_state_alloc(); - } else { - /* If there is an existing publish things are more complicated, we can immediately reuse this state if most stuff remains unchanged */ - if (can_reuse_publish(existing, applied)) { - applied->state = existing->state; - ao2_ref(applied->state, +1); - } else { - applied->state = sip_outbound_publish_state_alloc(); + if (!new_states) { + /* make sure new_states has been allocated as we will be adding to it */ + new_states = ao2_container_alloc_options( + AO2_ALLOC_OPT_LOCK_NOLOCK, DEFAULT_STATE_BUCKETS, + outbound_publish_state_hash, outbound_publish_state_cmp); + + if (!new_states) { + ast_log(LOG_ERROR, "Unable to allocate new states container\n"); + return -1; + } + } + + if (states) { + state = ao2_find(states, ast_sorcery_object_get_id(obj), OBJ_SEARCH_KEY); + if (state) { + if (can_reuse_publish(state->client->publish, applied)) { + ao2_replace(state->client->publish, applied); + } else { + ao2_ref(state, -1); + state = NULL; + } } } - if (!applied->state) { + if (!state) { + state = sip_outbound_publish_state_alloc(applied); + if (!state) { + ast_log(LOG_ERROR, "Unable to create state for outbound publish '%s'\n", + ast_sorcery_object_get_id(applied)); + return -1; + }; + } + + if (ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, state->client)) { + ast_log(LOG_ERROR, "Unable to create client for outbound publish '%s'\n", + ast_sorcery_object_get_id(applied)); return -1; } - return ast_sip_push_task_synchronous(NULL, sip_outbound_publish_client_alloc, applied); + ao2_link(new_states, state); + return 0; } static int outbound_auth_handler(const struct aco_option *opt, struct ast_variable *var, void *obj) @@ -895,74 +1085,9 @@ static int outbound_auth_handler(const struct aco_option *opt, struct ast_variab return ast_sip_auth_vector_init(&publish->outbound_auths, var->value); } -/*! \brief Helper function which prunes old publish clients */ -static void prune_publish_clients(const char *object_type) -{ - struct ao2_container *old, *current; - - old = ao2_global_obj_ref(active); - if (old) { - struct ao2_iterator i; - struct ast_sip_outbound_publish *existing; - - i = ao2_iterator_init(old, 0); - for (; (existing = ao2_iterator_next(&i)); ao2_ref(existing, -1)) { - struct ast_sip_outbound_publish *created; - - created = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "outbound-publish", - ast_sorcery_object_get_id(existing)); - if (created) { - if (created->state == existing->state) { - ao2_ref(created, -1); - continue; - } - ao2_ref(created, -1); - } - - ao2_lock(existing->state); - - /* If this publish client is currently publishing stop and terminate any refresh timer */ - if (existing->state->started) { - struct ast_sip_event_publisher_handler *handler = find_publisher_handler_for_event_name(existing->event); - - if (handler) { - handler->stop_publishing(existing->state); - } - - if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(existing->state))) { - ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n", - ast_sorcery_object_get_id(existing)); - ao2_ref(existing->state, -1); - } - } - - /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */ - if (!existing->state->sending) { - if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(existing->state))) { - ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n", - ast_sorcery_object_get_id(existing)); - ao2_ref(existing->state, -1); - } - } - - existing->state->destroy = 1; - ao2_unlock(existing->state); - } - ao2_iterator_destroy(&i); - - ao2_ref(old, -1); - } - - current = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "outbound-publish", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL); - ao2_global_obj_replace_unref(active, current); -} - -static struct ast_sorcery_observer outbound_publish_observer = { - .loaded = prune_publish_clients, -}; - static int load_module(void) { + ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish"); ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish"); if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL, @@ -970,8 +1095,6 @@ static int load_module(void) return AST_MODULE_LOAD_DECLINE; } - ast_sorcery_observer_add(ast_sip_get_sorcery(), "outbound-publish", &outbound_publish_observer); - ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "type", "", OPT_NOOP_T, 0, 0); ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "server_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, server_uri)); ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "from_uri", "", OPT_STRINGFIELD_T, 0, STRFLDSET(struct ast_sip_outbound_publish, from_uri)); @@ -981,6 +1104,7 @@ static int load_module(void) ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "expiration", "3600", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, expiration)); ast_sorcery_object_field_register(ast_sip_get_sorcery(), "outbound-publish", "max_auth_attempts", "5", OPT_UINT_T, 0, FLDSET(struct ast_sip_outbound_publish, max_auth_attempts)); ast_sorcery_object_field_register_custom(ast_sip_get_sorcery(), "outbound-publish", "outbound_auth", "", outbound_auth_handler, NULL, NULL, 0, 0); + ast_sorcery_reload_object(ast_sip_get_sorcery(), "outbound-publish"); AST_RWLIST_RDLOCK(&publisher_handlers); @@ -1004,7 +1128,44 @@ static int reload_module(void) static int unload_module(void) { - return 0; + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 10, + .tv_nsec = start.tv_usec * 1000 + }; + int res = 0; + struct ao2_container *states = ao2_global_obj_ref(current_states); + + if (!states || !(unloading.count = ao2_container_count(states))) { + return 0; + } + ao2_ref(states, -1); + + ast_mutex_init(&unloading.lock); + ast_cond_init(&unloading.cond, NULL); + ast_mutex_lock(&unloading.lock); + + unloading.is_unloading = 1; + ao2_global_obj_release(current_states); + + /* wait for items to unpublish */ + ast_verb(5, "Waiting to complete unpublishing task(s)\n"); + while (unloading.count) { + res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end); + } + ast_mutex_unlock(&unloading.lock); + + ast_mutex_destroy(&unloading.lock); + ast_cond_destroy(&unloading.cond); + + if (res) { + ast_verb(5, "At least %d items were unable to unpublish " + "in the allowed time\n", unloading.count); + } else { + ast_verb(5, "All items successfully unpublished\n"); + } + + return res; } AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support", |