author     Richard Mudgett <rmudgett@digium.com>  2016-05-26 17:35:04 -0500
committer  Richard Mudgett <rmudgett@digium.com>  2016-06-07 13:16:19 -0500
commit     16b08444dae47eded183f63300908b49cf6b2c1a (patch)
tree       32976263d71b7f2528ee29b2a4bb27de107db6f2 /res/res_pjsip
parent     993b769524b5e39aa5749a64e0389f01427235ee (diff)
pjsip_distributor.c: Consistently pick a serializer for messages.
Incoming messages that are not part of a dialog or a recognized response
to one of our requests need to be sent to a consistent serializer.  Under
load we may be queueing retransmissions before we can process the
original message.  We don't need to throw these messages onto random
serializers and cause reentrancy and message sequencing problems.

* Created a pool of pjsip/distributor serializers that get picked by
hashing the call-id and remote tag strings of the received messages.

* Made ast_sip_destroy_distributor() destroy items in the reverse order
of creation.

ASTERISK-26088
Reported by: Richard Mudgett

Change-Id: I2ce769389fc060d9f379977f559026fbcb632407
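In short, the new selection logic reduces to a djb2 hash (Dan Bernstein) of the
Call-ID folded together with the remote tag, taken modulo the pool size.  The
standalone sketch below illustrates the idea with plain C strings instead of
pj_str_t; hash_add(), pick_serializer_index(), and the sample Call-ID/tag values
are illustrative only and not part of the patch (the authoritative code is in
the diff that follows).

#include <stdio.h>
#include <stdlib.h>

/* Same pool size the patch uses (a prime number works best). */
#define DISTRIBUTOR_POOL_SIZE 31

/* djb2 hash (Dan Bernstein), folding a string into a running hash value. */
static int hash_add(const char *str, int hash)
{
	while (*str) {
		hash = hash * 33 ^ *str++;
	}
	return hash;
}

/* Map a Call-ID and remote tag to an index into the serializer pool. */
static int pick_serializer_index(const char *call_id, const char *remote_tag)
{
	int hash = hash_add(call_id, 5381);

	hash = hash_add(remote_tag, hash);
	return abs(hash) % DISTRIBUTOR_POOL_SIZE;
}

int main(void)
{
	/* Sample values for illustration only. */
	printf("serializer index: %d\n",
		pick_serializer_index("a84b4c76e66710@pc33.example.com", "1928301774"));
	return 0;
}

Because the same Call-ID/remote-tag pair always maps to the same serializer,
retransmissions queued under load land behind the original message instead of
racing it on another serializer.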
Diffstat (limited to 'res/res_pjsip')
-rw-r--r--  res/res_pjsip/pjsip_distributor.c  160
1 file changed, 155 insertions(+), 5 deletions(-)
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index 288a3e00a..528ccb627 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -59,6 +59,12 @@ struct unidentified_request{
char src_name[];
};
+/*! Number of serializers in pool if one not otherwise known. (Best if prime number) */
+#define DISTRIBUTOR_POOL_SIZE 31
+
+/*! Pool of serializers to use if not supplied. */
+static struct ast_taskprocessor *distributor_pool[DISTRIBUTOR_POOL_SIZE];
+
/*!
* \internal
* \brief Record the task's serializer name on the tdata structure.
@@ -278,6 +284,83 @@ static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
return dlg;
}
+/*!
+ * \internal
+ * \brief Compute a hash value on a pjlib string
+ * \since 13.10.0
+ *
+ * \param[in] str The pjlib string to add to the hash
+ * \param[in] hash The accumulated hash value to add the string to
+ *
+ * \details
+ * This version of the function is for when you need to compute a
+ * single hash over more than one string.
+ *
+ * This famous hash algorithm was written by Dan Bernstein and is
+ * commonly used.
+ *
+ * \sa http://www.cse.yorku.ca/~oz/hash.html
+ */
+static int pjstr_hash_add(pj_str_t *str, int hash)
+{
+ size_t len;
+ const char *pos;
+
+ len = pj_strlen(str);
+ pos = pj_strbuf(str);
+ while (len--) {
+ hash = hash * 33 ^ *pos++;
+ }
+
+ return hash;
+}
+
+/*!
+ * \internal
+ * \brief Compute a hash value on a pjlib string
+ * \since 13.10.0
+ *
+ * \param[in] str The pjlib string to hash
+ *
+ * This famous hash algorithm was written by Dan Bernstein and is
+ * commonly used.
+ *
+ * http://www.cse.yorku.ca/~oz/hash.html
+ */
+static int pjstr_hash(pj_str_t *str)
+{
+ return pjstr_hash_add(str, 5381);
+}
+
+struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata)
+{
+ int hash;
+ pj_str_t *remote_tag;
+ struct ast_taskprocessor *serializer;
+
+ if (!rdata->msg_info.msg) {
+ return NULL;
+ }
+
+ if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
+ remote_tag = &rdata->msg_info.from->tag;
+ } else {
+ remote_tag = &rdata->msg_info.to->tag;
+ }
+
+ /* Compute the hash from the SIP message call-id and remote-tag */
+ hash = pjstr_hash(&rdata->msg_info.cid->id);
+ hash = pjstr_hash_add(remote_tag, hash);
+ hash = abs(hash);
+
+ serializer = ao2_bump(distributor_pool[hash % ARRAY_LEN(distributor_pool)]);
+ if (serializer) {
+ ast_debug(3, "Calculated serializer %s to use for %s\n",
+ ast_taskprocessor_name(serializer), pjsip_rx_data_get_info(rdata));
+ }
+ return serializer;
+}
+
static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
static pjsip_module endpoint_mod = {
@@ -324,12 +407,23 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
ast_debug(3, "No dialog serializer for response %s. Using request transaction as basis\n",
pjsip_rx_data_get_info(rdata));
serializer = find_request_serializer(rdata);
+ if (!serializer) {
+ /*
+ * Pick a serializer for the unmatched response. Maybe
+ * the stack can figure out what it is for, or we really
+ * should just toss it regardless.
+ */
+ serializer = ast_sip_get_distributor_serializer(rdata);
+ }
} else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method)
|| !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) {
/* We have a BYE or CANCEL request without a serializer. */
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
return PJ_TRUE;
+ } else {
+ /* Pick a serializer for the out-of-dialog request. */
+ serializer = ast_sip_get_distributor_serializer(rdata);
}
pjsip_rx_data_clone(rdata, 0, &clone);
@@ -349,7 +443,10 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
pjsip_rx_data_free_cloned(clone);
} else {
- ast_sip_push_task(serializer, distribute, clone);
+ if (ast_sip_push_task(serializer, distribute, clone)) {
+ ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
+ pjsip_rx_data_free_cloned(clone);
+ }
}
ast_taskprocessor_unreference(serializer);
@@ -796,6 +893,7 @@ static int cli_unid_print_header(void *obj, void *arg, int flags)
return 0;
}
+
static int cli_unid_print_body(void *obj, void *arg, int flags)
{
struct unidentified_request *unid = obj;
@@ -886,6 +984,47 @@ static struct ast_sorcery_observer global_observer = {
.loaded = global_loaded,
};
+/*!
+ * \internal
+ * \brief Shutdown the serializers in the distributor pool.
+ * \since 13.10.0
+ *
+ * \return Nothing
+ */
+static void distributor_pool_shutdown(void)
+{
+ int idx;
+
+ for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
+ ast_taskprocessor_unreference(distributor_pool[idx]);
+ distributor_pool[idx] = NULL;
+ }
+}
+
+/*!
+ * \internal
+ * \brief Setup the serializers in the distributor pool.
+ * \since 13.10.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int distributor_pool_setup(void)
+{
+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+ int idx;
+
+ for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
+ /* Create name with seq number appended. */
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/distributor");
+
+ distributor_pool[idx] = ast_sip_create_serializer_named(tps_name);
+ if (!distributor_pool[idx]) {
+ return -1;
+ }
+ }
+ return 0;
+}
int ast_sip_initialize_distributor(void)
{
@@ -895,6 +1034,11 @@ int ast_sip_initialize_distributor(void)
return -1;
}
+ if (distributor_pool_setup()) {
+ ast_sip_destroy_distributor();
+ return -1;
+ }
+
prune_context = ast_sched_context_create();
if (!prune_context) {
ast_sip_destroy_distributor();
@@ -927,8 +1071,10 @@ int ast_sip_initialize_distributor(void)
return -1;
}
- unid_formatter = ao2_alloc(sizeof(struct ast_sip_cli_formatter_entry), NULL);
+ unid_formatter = ao2_alloc_options(sizeof(struct ast_sip_cli_formatter_entry), NULL,
+ AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!unid_formatter) {
+ ast_sip_destroy_distributor();
ast_log(LOG_ERROR, "Unable to allocate memory for unid_formatter\n");
return -1;
}
@@ -940,6 +1086,7 @@ int ast_sip_initialize_distributor(void)
unid_formatter->get_id = cli_unid_get_id;
unid_formatter->retrieve_by_id = cli_unid_retrieve_by_id;
ast_sip_register_cli_formatter(unid_formatter);
+
ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
return 0;
@@ -950,17 +1097,20 @@ void ast_sip_destroy_distributor(void)
ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
ast_sip_unregister_cli_formatter(unid_formatter);
- internal_sip_unregister_service(&distributor_mod);
- internal_sip_unregister_service(&endpoint_mod);
internal_sip_unregister_service(&auth_mod);
+ internal_sip_unregister_service(&endpoint_mod);
+ internal_sip_unregister_service(&distributor_mod);
ao2_cleanup(artificial_auth);
ao2_cleanup(artificial_endpoint);
- ao2_cleanup(unidentified_requests);
ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
if (prune_context) {
ast_sched_context_destroy(prune_context);
}
+
+ distributor_pool_shutdown();
+
+ ao2_cleanup(unidentified_requests);
}