diff options
author | Joshua Colp <jcolp@digium.com> | 2016-08-04 15:16:33 +0000 |
---|---|---|
committer | Joshua Colp <jcolp@digium.com> | 2016-08-05 06:31:14 -0500 |
commit | 54869e48231a8b24da4b91843e98c4779fb58abd (patch) | |
tree | 9381fbc0233a1ee2da7383f04d31714fed1b58ae /res/res_pjsip_outbound_publish.c | |
parent | e711e57106369fabfe46eefb43f7c30134ffa4e9 (diff) |
res_pjsip_outbound_publish: Use a serializer shutdown group for unload.
This change replaces the custom unload process for the outbound
publish module with the common serializer shutdown group.
ASTERISK-25217 #close
Change-Id: I280a0384d860c486202d87d2d674394cca77ffb6
Diffstat (limited to 'res/res_pjsip_outbound_publish.c')
-rw-r--r-- | res/res_pjsip_outbound_publish.c | 143 |
1 files changed, 61 insertions, 82 deletions
diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c index 53e15a0a4..0aad5fcdb 100644 --- a/res/res_pjsip_outbound_publish.c +++ b/res/res_pjsip_outbound_publish.c @@ -32,6 +32,7 @@ #include "asterisk/res_pjsip_outbound_publish.h" #include "asterisk/module.h" #include "asterisk/taskprocessor.h" +#include "asterisk/threadpool.h" #include "asterisk/datastore.h" /*** DOCUMENTATION @@ -204,6 +205,8 @@ struct sip_outbound_publisher { struct sip_outbound_publish_message *sending; /*! \brief Publish client should be destroyed */ unsigned int destroy; + /*! \brief Serializer for stuff and things */ + struct ast_taskprocessor *serializer; /*! \brief User, if any, associated with the publisher */ char user[0]; }; @@ -242,13 +245,11 @@ AST_RWLOCK_DEFINE_STATIC(load_lock); AO2_STRING_FIELD_HASH_FN(sip_outbound_publisher, user); AO2_STRING_FIELD_CMP_FN(sip_outbound_publisher, user); -/*! \brief Unloading data */ -struct unloading_data { - int is_unloading; - int count; - ast_mutex_t lock; - ast_cond_t cond; -} unloading; +/*! Time needs to be long enough for a transaction to timeout if nothing replies. */ +#define MAX_UNLOAD_TIMEOUT_TIME 35 /* Seconds */ + +/*! Shutdown group to monitor sip_outbound_registration_client_state serializers. */ +static struct ast_serializer_shutdown_group *shutdown_group; /*! \brief Default number of client state container buckets */ #define DEFAULT_STATE_BUCKETS 31 @@ -772,7 +773,7 @@ fatal: ast_free(message); service: - if (ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher))) { + if (ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher))) { ao2_ref(publisher, -1); } return -1; @@ -815,7 +816,7 @@ static int publisher_client_send(void *obj, void *arg, void *data, int flags) AST_LIST_INSERT_TAIL(&publisher->queue, message, entry); - *res = ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher)); + *res = ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher)); if (*res) { ao2_ref(publisher, -1); } @@ -1008,25 +1009,17 @@ static void sip_outbound_publisher_destroy(void *obj) } ao2_cleanup(publisher->owner); - ast_free(publisher->from_uri); ast_free(publisher->to_uri); - /* if unloading the module and all objects have been unpublished - send the signal to finish unloading */ - if (unloading.is_unloading) { - ast_mutex_lock(&unloading.lock); - if (--unloading.count == 0) { - ast_cond_signal(&unloading.cond); - } - ast_mutex_unlock(&unloading.lock); - } + ast_taskprocessor_unreference(publisher->serializer); } static struct sip_outbound_publisher *sip_outbound_publisher_alloc( struct ast_sip_outbound_publish_client *client, const char *user) { struct sip_outbound_publisher *publisher; + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; publisher = ao2_alloc(sizeof(*publisher) + (user ? strlen(user) : 0) + 1, sip_outbound_publisher_destroy); @@ -1054,6 +1047,16 @@ static struct sip_outbound_publisher *sip_outbound_publisher_alloc( *publisher->user = '\0'; } + ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/outpub/%s", + ast_sorcery_object_get_id(client->publish)); + + publisher->serializer = ast_sip_create_serializer_group(tps_name, + shutdown_group); + if (!publisher->serializer) { + ao2_ref(publisher, -1); + return NULL; + } + if (ast_sip_push_task_synchronous(NULL, sip_outbound_publisher_init, publisher)) { ast_log(LOG_ERROR, "Unable to create publisher for outbound publish '%s'\n", ast_sorcery_object_get_id(client->publish)); @@ -1079,7 +1082,7 @@ static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher( * No need to bump the reference here. The task will take care of * removing the reference. */ - if (ast_sip_push_task(NULL, cancel_refresh_timer_task, publisher)) { + if (ast_sip_push_task(publisher->serializer, cancel_refresh_timer_task, publisher)) { ao2_ref(publisher, -1); } return NULL; @@ -1142,13 +1145,13 @@ static int cancel_and_unpublish(void *obj, void *arg, int flags) /* If the publisher was never started, there's nothing to unpublish, so just * destroy the publication and remove its reference to the publisher. */ - if (ast_sip_push_task(NULL, explicit_publish_destroy, ao2_bump(publisher))) { + if (ast_sip_push_task(publisher->serializer, explicit_publish_destroy, ao2_bump(publisher))) { ao2_ref(publisher, -1); } return 0; } - if (ast_sip_push_task(NULL, cancel_refresh_timer_task, ao2_bump(publisher))) { + if (ast_sip_push_task(publisher->serializer, cancel_refresh_timer_task, ao2_bump(publisher))) { ast_log(LOG_WARNING, "Could not stop refresh timer on outbound publish '%s'\n", ast_sorcery_object_get_id(client->publish)); ao2_ref(publisher, -1); @@ -1156,7 +1159,7 @@ static int cancel_and_unpublish(void *obj, void *arg, int flags) /* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */ if (!publisher->sending) { - if (ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publisher))) { + if (ast_sip_push_task(publisher->serializer, send_unpublish_task, ao2_bump(publisher))) { ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n", ast_sorcery_object_get_id(client->publish)); ao2_ref(publisher, -1); @@ -1255,7 +1258,7 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param) if (publisher->sending) { publisher->sending = NULL; - if (!ast_sip_push_task(NULL, send_unpublish_task, ao2_bump(publisher))) { + if (!ast_sip_push_task(publisher->serializer, send_unpublish_task, ao2_bump(publisher))) { return; } ast_log(LOG_WARNING, "Could not send unpublish message on outbound publish '%s'\n", @@ -1336,7 +1339,7 @@ end: ast_free(message); } } else { - if (ast_sip_push_task(NULL, sip_publisher_service_queue, ao2_bump(publisher))) { + if (ast_sip_push_task(publisher->serializer, sip_publisher_service_queue, ao2_bump(publisher))) { ao2_ref(publisher, -1); } } @@ -1431,6 +1434,7 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc( ao2_ref(state, -1); return NULL; } + state->client->publish = ao2_bump(publish); strcpy(state->id, id); @@ -1580,6 +1584,32 @@ static int outbound_auth_handler(const struct aco_option *opt, struct ast_variab return ast_sip_auth_vector_init(&publish->outbound_auths, var->value); } + +static int unload_module(void) +{ + int remaining; + + ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish"); + + ao2_global_obj_release(current_states); + + /* Wait for publication serializers to get destroyed. */ + ast_debug(2, "Waiting for publication to complete for unload.\n"); + remaining = ast_serializer_shutdown_group_join(shutdown_group, MAX_UNLOAD_TIMEOUT_TIME); + if (remaining) { + ast_log(LOG_WARNING, "Unload incomplete. Could not stop %d outbound publications. Try again later.\n", + remaining); + return -1; + } + + ast_debug(2, "Successful shutdown.\n"); + + ao2_cleanup(shutdown_group); + shutdown_group = NULL; + + return 0; +} + static int load_module(void) { CHECK_PJSIP_MODULE_LOADED(); @@ -1587,12 +1617,18 @@ static int load_module(void) /* As of pjproject 2.4.5, PJSIP_MAX_URL_SIZE isn't exposed yet but we try anyway. */ ast_pjproject_get_buildopt("PJSIP_MAX_URL_SIZE", "%d", &pjsip_max_url_size); + shutdown_group = ast_serializer_shutdown_group_alloc(); + if (!shutdown_group) { + return AST_MODULE_LOAD_FAILURE; + } + ast_sorcery_apply_config(ast_sip_get_sorcery(), "res_pjsip_outbound_publish"); ast_sorcery_apply_default(ast_sip_get_sorcery(), "outbound-publish", "config", "pjsip.conf,criteria=type=outbound-publish"); if (ast_sorcery_object_register(ast_sip_get_sorcery(), "outbound-publish", sip_outbound_publish_alloc, NULL, sip_outbound_publish_apply)) { ast_log(LOG_ERROR, "Unable to register 'outbound-publish' type with sorcery\n"); + unload_module(); return AST_MODULE_LOAD_DECLINE; } @@ -1629,63 +1665,6 @@ static int reload_module(void) return 0; } -static int current_publishing_count(void *obj, void *arg, int flags) -{ - struct ast_sip_outbound_publish_state *state = obj; - unloading.count += ao2_container_count(state->client->publishers); - return 0; -} - -static int unload_module(void) -{ - struct timeval start = ast_tvnow(); - struct timespec end = { - .tv_sec = start.tv_sec + 10, - .tv_nsec = start.tv_usec * 1000 - }; - int res = 0; - struct ao2_container *states = ao2_global_obj_ref(current_states); - - if (!states) { - return 0; - } - - unloading.count = 0; - ao2_callback(states, OBJ_NODATA, current_publishing_count, NULL); - ao2_ref(states, -1); - - if (!unloading.count) { - return 0; - } - - ast_mutex_init(&unloading.lock); - ast_cond_init(&unloading.cond, NULL); - ast_mutex_lock(&unloading.lock); - - unloading.is_unloading = 1; - ao2_global_obj_release(current_states); - - /* wait for items to unpublish */ - ast_verb(5, "Waiting to complete unpublishing task(s)\n"); - while (unloading.count && !res) { - res = ast_cond_timedwait(&unloading.cond, &unloading.lock, &end); - } - ast_mutex_unlock(&unloading.lock); - - ast_mutex_destroy(&unloading.lock); - ast_cond_destroy(&unloading.cond); - - if (res) { - ast_verb(5, "At least %d items were unable to unpublish " - "in the allowed time\n", unloading.count); - } else { - ast_verb(5, "All items successfully unpublished\n"); - ast_sorcery_object_unregister(ast_sip_get_sorcery(), "outbound-publish"); - } - - return res; -} - AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP Outbound Publish Support", .load = load_module, .reload = reload_module, |