summaryrefslogtreecommitdiff
path: root/res/res_pjsip/pjsip_distributor.c
diff options
context:
space:
mode:
Diffstat (limited to 'res/res_pjsip/pjsip_distributor.c')
-rw-r--r--res/res_pjsip/pjsip_distributor.c160
1 files changed, 155 insertions, 5 deletions
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index 288a3e00a..528ccb627 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -59,6 +59,12 @@ struct unidentified_request{
char src_name[];
};
+/*! Number of serializers in pool if one not otherwise known. (Best if prime number) */
+#define DISTRIBUTOR_POOL_SIZE 31
+
+/*! Pool of serializers to use if not supplied. */
+static struct ast_taskprocessor *distributor_pool[DISTRIBUTOR_POOL_SIZE];
+
/*!
* \internal
* \brief Record the task's serializer name on the tdata structure.
@@ -278,6 +284,83 @@ static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
return dlg;
}
+/*!
+ * \internal
+ * \brief Compute a hash value on a pjlib string
+ * \since 13.10.0
+ *
+ * \param[in] str The pjlib string to add to the hash
+ * \param[in] hash The hash value to add to
+ *
+ * \details
+ * This version of the function is for when you need to compute a
+ * string hash of more than one string.
+ *
+ * This famous hash algorithm was written by Dan Bernstein and is
+ * commonly used.
+ *
+ * \sa http://www.cse.yorku.ca/~oz/hash.html
+ */
+static int pjstr_hash_add(pj_str_t *str, int hash)
+{
+ size_t len;
+ const char *pos;
+
+ len = pj_strlen(str);
+ pos = pj_strbuf(str);
+ while (len--) {
+ hash = hash * 33 ^ *pos++;
+ }
+
+ return hash;
+}
+
+/*!
+ * \internal
+ * \brief Compute a hash value on a pjlib string
+ * \since 13.10.0
+ *
+ * \param[in] str The pjlib string to hash
+ *
+ * This famous hash algorithm was written by Dan Bernstein and is
+ * commonly used.
+ *
+ * http://www.cse.yorku.ca/~oz/hash.html
+ */
+static int pjstr_hash(pj_str_t *str)
+{
+ return pjstr_hash_add(str, 5381);
+}
+
+struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata)
+{
+ int hash;
+ pj_str_t *remote_tag;
+ struct ast_taskprocessor *serializer;
+
+ if (!rdata->msg_info.msg) {
+ return NULL;
+ }
+
+ if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
+ remote_tag = &rdata->msg_info.from->tag;
+ } else {
+ remote_tag = &rdata->msg_info.to->tag;
+ }
+
+ /* Compute the hash from the SIP message call-id and remote-tag */
+ hash = pjstr_hash(&rdata->msg_info.cid->id);
+ hash = pjstr_hash_add(remote_tag, hash);
+ hash = abs(hash);
+
+ serializer = ao2_bump(distributor_pool[hash % ARRAY_LEN(distributor_pool)]);
+ if (serializer) {
+ ast_debug(3, "Calculated serializer %s to use for %s\n",
+ ast_taskprocessor_name(serializer), pjsip_rx_data_get_info(rdata));
+ }
+ return serializer;
+}
+
static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
static pjsip_module endpoint_mod = {
@@ -324,12 +407,23 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
ast_debug(3, "No dialog serializer for response %s. Using request transaction as basis\n",
pjsip_rx_data_get_info(rdata));
serializer = find_request_serializer(rdata);
+ if (!serializer) {
+ /*
+ * Pick a serializer for the unmatched response. Maybe
+ * the stack can figure out what it is for, or we really
+ * should just toss it regardless.
+ */
+ serializer = ast_sip_get_distributor_serializer(rdata);
+ }
} else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method)
|| !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) {
/* We have a BYE or CANCEL request without a serializer. */
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
return PJ_TRUE;
+ } else {
+ /* Pick a serializer for the out-of-dialog request. */
+ serializer = ast_sip_get_distributor_serializer(rdata);
}
pjsip_rx_data_clone(rdata, 0, &clone);
@@ -349,7 +443,10 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
pjsip_rx_data_free_cloned(clone);
} else {
- ast_sip_push_task(serializer, distribute, clone);
+ if (ast_sip_push_task(serializer, distribute, clone)) {
+ ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
+ pjsip_rx_data_free_cloned(clone);
+ }
}
ast_taskprocessor_unreference(serializer);
@@ -796,6 +893,7 @@ static int cli_unid_print_header(void *obj, void *arg, int flags)
return 0;
}
+
static int cli_unid_print_body(void *obj, void *arg, int flags)
{
struct unidentified_request *unid = obj;
@@ -886,6 +984,47 @@ static struct ast_sorcery_observer global_observer = {
.loaded = global_loaded,
};
+/*!
+ * \internal
+ * \brief Shutdown the serializers in the distributor pool.
+ * \since 13.10.0
+ *
+ * \return Nothing
+ */
+static void distributor_pool_shutdown(void)
+{
+ int idx;
+
+ for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
+ ast_taskprocessor_unreference(distributor_pool[idx]);
+ distributor_pool[idx] = NULL;
+ }
+}
+
+/*!
+ * \internal
+ * \brief Setup the serializers in the distributor pool.
+ * \since 13.10.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int distributor_pool_setup(void)
+{
+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+ int idx;
+
+ for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
+ /* Create name with seq number appended. */
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/distributor");
+
+ distributor_pool[idx] = ast_sip_create_serializer_named(tps_name);
+ if (!distributor_pool[idx]) {
+ return -1;
+ }
+ }
+ return 0;
+}
int ast_sip_initialize_distributor(void)
{
@@ -895,6 +1034,11 @@ int ast_sip_initialize_distributor(void)
return -1;
}
+ if (distributor_pool_setup()) {
+ ast_sip_destroy_distributor();
+ return -1;
+ }
+
prune_context = ast_sched_context_create();
if (!prune_context) {
ast_sip_destroy_distributor();
@@ -927,8 +1071,10 @@ int ast_sip_initialize_distributor(void)
return -1;
}
- unid_formatter = ao2_alloc(sizeof(struct ast_sip_cli_formatter_entry), NULL);
+ unid_formatter = ao2_alloc_options(sizeof(struct ast_sip_cli_formatter_entry), NULL,
+ AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!unid_formatter) {
+ ast_sip_destroy_distributor();
ast_log(LOG_ERROR, "Unable to allocate memory for unid_formatter\n");
return -1;
}
@@ -940,6 +1086,7 @@ int ast_sip_initialize_distributor(void)
unid_formatter->get_id = cli_unid_get_id;
unid_formatter->retrieve_by_id = cli_unid_retrieve_by_id;
ast_sip_register_cli_formatter(unid_formatter);
+
ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
return 0;
@@ -950,17 +1097,20 @@ void ast_sip_destroy_distributor(void)
ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
ast_sip_unregister_cli_formatter(unid_formatter);
- internal_sip_unregister_service(&distributor_mod);
- internal_sip_unregister_service(&endpoint_mod);
internal_sip_unregister_service(&auth_mod);
+ internal_sip_unregister_service(&endpoint_mod);
+ internal_sip_unregister_service(&distributor_mod);
ao2_cleanup(artificial_auth);
ao2_cleanup(artificial_endpoint);
- ao2_cleanup(unidentified_requests);
ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
if (prune_context) {
ast_sched_context_destroy(prune_context);
}
+
+ distributor_pool_shutdown();
+
+ ao2_cleanup(unidentified_requests);
}