diff options
author | Joshua Colp <jcolp@digium.com> | 2016-06-09 16:45:24 -0500 |
---|---|---|
committer | Gerrit Code Review <gerrit2@gerrit.digium.api> | 2016-06-09 16:45:24 -0500 |
commit | 9acb5e3084485c6757430e29e4645458cf202e7e (patch) | |
tree | dd2cc8b0994a12b767ff68a865115408f03a9051 | |
parent | f5ffcb1b72e941a501c818c148e940ea0003145a (diff) | |
parent | 2ff26e97467fabc390cf887fcda84f2e26b83895 (diff) |
Merge "pjsip_distributor.c: Consistently pick a serializer for messages."
-rw-r--r-- | include/asterisk/res_pjsip.h | 11 | ||||
-rw-r--r-- | res/res_pjsip/pjsip_distributor.c | 160 |
2 files changed, 166 insertions, 5 deletions
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 50d02d980..d1f0c9825 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -1301,6 +1301,17 @@ struct ast_serializer_shutdown_group; struct ast_taskprocessor *ast_sip_create_serializer_group(const char *name, struct ast_serializer_shutdown_group *shutdown_group); /*! + * \brief Determine the distributor serializer for the SIP message. + * \since 13.10.0 + * + * \param rdata The incoming message. + * + * \retval Calculated distributor serializer on success. + * \retval NULL on error. + */ +struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata); + +/*! * \brief Set a serializer on a SIP dialog so requests and responses are automatically serialized * * Passing a NULL serializer is a way to remove a serializer from a dialog. diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c index 288a3e00a..75ae461cd 100644 --- a/res/res_pjsip/pjsip_distributor.c +++ b/res/res_pjsip/pjsip_distributor.c @@ -59,6 +59,12 @@ struct unidentified_request{ char src_name[]; }; +/*! Number of serializers in pool if one not otherwise known. (Best if prime number) */ +#define DISTRIBUTOR_POOL_SIZE 31 + +/*! Pool of serializers to use if not supplied. */ +static struct ast_taskprocessor *distributor_pool[DISTRIBUTOR_POOL_SIZE]; + /*! * \internal * \brief Record the task's serializer name on the tdata structure. @@ -278,6 +284,83 @@ static pjsip_dialog *find_dialog(pjsip_rx_data *rdata) return dlg; } +/*! + * \internal + * \brief Compute a hash value on a pjlib string + * \since 13.10.0 + * + * \param[in] str The pjlib string to add to the hash + * \param[in] hash The hash value to add to + * + * \details + * This version of the function is for when you need to compute a + * string hash of more than one string. + * + * This famous hash algorithm was written by Dan Bernstein and is + * commonly used. + * + * \sa http://www.cse.yorku.ca/~oz/hash.html + */ +static int pjstr_hash_add(pj_str_t *str, int hash) +{ + size_t len; + const char *pos; + + len = pj_strlen(str); + pos = pj_strbuf(str); + while (len--) { + hash = hash * 33 ^ *pos++; + } + + return hash; +} + +/*! + * \internal + * \brief Compute a hash value on a pjlib string + * \since 13.10.0 + * + * \param[in] str The pjlib string to hash + * + * This famous hash algorithm was written by Dan Bernstein and is + * commonly used. + * + * http://www.cse.yorku.ca/~oz/hash.html + */ +static int pjstr_hash(pj_str_t *str) +{ + return pjstr_hash_add(str, 5381); +} + +struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata) +{ + int hash; + pj_str_t *remote_tag; + struct ast_taskprocessor *serializer; + + if (!rdata->msg_info.msg) { + return NULL; + } + + if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) { + remote_tag = &rdata->msg_info.from->tag; + } else { + remote_tag = &rdata->msg_info.to->tag; + } + + /* Compute the hash from the SIP message call-id and remote-tag */ + hash = pjstr_hash(&rdata->msg_info.cid->id); + hash = pjstr_hash_add(remote_tag, hash); + hash = abs(hash); + + serializer = ao2_bump(distributor_pool[hash % ARRAY_LEN(distributor_pool)]); + if (serializer) { + ast_debug(3, "Calculated serializer %s to use for %s\n", + ast_taskprocessor_name(serializer), pjsip_rx_data_get_info(rdata)); + } + return serializer; +} + static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata); static pjsip_module endpoint_mod = { @@ -324,12 +407,23 @@ static pj_bool_t distributor(pjsip_rx_data *rdata) ast_debug(3, "No dialog serializer for response %s. Using request transaction as basis\n", pjsip_rx_data_get_info(rdata)); serializer = find_request_serializer(rdata); + if (!serializer) { + /* + * Pick a serializer for the unmatched response. Maybe + * the stack can figure out what it is for, or we really + * should just toss it regardless. + */ + serializer = ast_sip_get_distributor_serializer(rdata); + } } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method) || !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) { /* We have a BYE or CANCEL request without a serializer. */ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL); return PJ_TRUE; + } else { + /* Pick a serializer for the out-of-dialog request. */ + serializer = ast_sip_get_distributor_serializer(rdata); } pjsip_rx_data_clone(rdata, 0, &clone); @@ -349,7 +443,10 @@ static pj_bool_t distributor(pjsip_rx_data *rdata) ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]); pjsip_rx_data_free_cloned(clone); } else { - ast_sip_push_task(serializer, distribute, clone); + if (ast_sip_push_task(serializer, distribute, clone)) { + ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]); + pjsip_rx_data_free_cloned(clone); + } } ast_taskprocessor_unreference(serializer); @@ -796,6 +893,7 @@ static int cli_unid_print_header(void *obj, void *arg, int flags) return 0; } + static int cli_unid_print_body(void *obj, void *arg, int flags) { struct unidentified_request *unid = obj; @@ -886,6 +984,47 @@ static struct ast_sorcery_observer global_observer = { .loaded = global_loaded, }; +/*! + * \internal + * \brief Shutdown the serializers in the distributor pool. + * \since 13.10.0 + * + * \return Nothing + */ +static void distributor_pool_shutdown(void) +{ + int idx; + + for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) { + ast_taskprocessor_unreference(distributor_pool[idx]); + distributor_pool[idx] = NULL; + } +} + +/*! + * \internal + * \brief Setup the serializers in the distributor pool. + * \since 13.10.0 + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int distributor_pool_setup(void) +{ + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + int idx; + + for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) { + /* Create name with seq number appended. */ + ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/distributor"); + + distributor_pool[idx] = ast_sip_create_serializer(tps_name); + if (!distributor_pool[idx]) { + return -1; + } + } + return 0; +} int ast_sip_initialize_distributor(void) { @@ -895,6 +1034,11 @@ int ast_sip_initialize_distributor(void) return -1; } + if (distributor_pool_setup()) { + ast_sip_destroy_distributor(); + return -1; + } + prune_context = ast_sched_context_create(); if (!prune_context) { ast_sip_destroy_distributor(); @@ -927,8 +1071,10 @@ int ast_sip_initialize_distributor(void) return -1; } - unid_formatter = ao2_alloc(sizeof(struct ast_sip_cli_formatter_entry), NULL); + unid_formatter = ao2_alloc_options(sizeof(struct ast_sip_cli_formatter_entry), NULL, + AO2_ALLOC_OPT_LOCK_NOLOCK); if (!unid_formatter) { + ast_sip_destroy_distributor(); ast_log(LOG_ERROR, "Unable to allocate memory for unid_formatter\n"); return -1; } @@ -940,6 +1086,7 @@ int ast_sip_initialize_distributor(void) unid_formatter->get_id = cli_unid_get_id; unid_formatter->retrieve_by_id = cli_unid_retrieve_by_id; ast_sip_register_cli_formatter(unid_formatter); + ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands)); return 0; @@ -950,17 +1097,20 @@ void ast_sip_destroy_distributor(void) ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands)); ast_sip_unregister_cli_formatter(unid_formatter); - internal_sip_unregister_service(&distributor_mod); - internal_sip_unregister_service(&endpoint_mod); internal_sip_unregister_service(&auth_mod); + internal_sip_unregister_service(&endpoint_mod); + internal_sip_unregister_service(&distributor_mod); ao2_cleanup(artificial_auth); ao2_cleanup(artificial_endpoint); - ao2_cleanup(unidentified_requests); ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer); if (prune_context) { ast_sched_context_destroy(prune_context); } + + distributor_pool_shutdown(); + + ao2_cleanup(unidentified_requests); } |