summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Colp <jcolp@digium.com>2015-11-12 10:56:04 -0600
committerGerrit Code Review <gerrit2@gerrit.digium.api>2015-11-12 10:56:05 -0600
commitcd51b0aeacd6ca41da31039316e898f673686c1e (patch)
tree53aaeadc78d446fe607a9f05733cd6d98e8167bf
parentdb93c357ce2c3ddbf5f25f7b2a63755c39628e86 (diff)
parent2f9cb7d62bf1ee2d3f7d878607d2d1eb9995dd03 (diff)
Merge "res_pjsip: Deny requests when threadpool queue is backed up." into 13
-rw-r--r--include/asterisk/res_pjsip.h6
-rw-r--r--include/asterisk/taskprocessor.h6
-rw-r--r--include/asterisk/threadpool.h6
-rw-r--r--main/taskprocessor.c9
-rw-r--r--main/threadpool.c5
-rw-r--r--res/res_pjsip.c5
-rw-r--r--res/res_pjsip/pjsip_distributor.c14
7 files changed, 44 insertions, 7 deletions
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index 459082901..37b766211 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -2116,4 +2116,10 @@ int ast_sip_get_host_ip(int af, pj_sockaddr *addr);
*/
const char *ast_sip_get_host_ip_string(int af);
+/*!
+ * \brief Return the size of the SIP threadpool's task queue
+ * \since 13.7.0
+ */
+long ast_sip_threadpool_queue_size(void);
+
#endif /* _RES_PJSIP_H */
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index f16f144cb..06368867a 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -262,4 +262,10 @@ int ast_taskprocessor_is_task(struct ast_taskprocessor *tps);
*/
const char *ast_taskprocessor_name(struct ast_taskprocessor *tps);
+/*!
+ * \brief Return the current size of the taskprocessor queue
+ * \since 13.7.0
+ */
+long ast_taskprocessor_size(struct ast_taskprocessor *tps);
+
#endif /* __AST_TASKPROCESSOR_H__ */
diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h
index 75ce0e4e4..0f360c7a4 100644
--- a/include/asterisk/threadpool.h
+++ b/include/asterisk/threadpool.h
@@ -292,4 +292,10 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast
struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group);
+/*!
+ * \brief Return the size of the threadpool's task queue
+ * \since 13.7.0
+ */
+long ast_threadpool_queue_size(struct ast_threadpool *pool);
+
#endif /* ASTERISK_THREADPOOL_H */
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index f382814af..eefa85f61 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -130,9 +130,6 @@ static int tps_ping_handler(void *datap);
/*! \brief Remove the front task off the taskprocessor queue */
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
-/*! \brief Return the size of the taskprocessor queue */
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
-
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -508,7 +505,7 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
return task;
}
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
+long ast_taskprocessor_size(struct ast_taskprocessor *tps)
{
return (tps) ? tps->tps_queue_size : -1;
}
@@ -765,7 +762,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
{
struct ast_taskprocessor_local local;
struct tps_task *t;
- int size;
+ long size;
ao2_lock(tps);
t = tps_taskprocessor_pop(tps);
@@ -797,7 +794,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
* after we pop an empty stack.
*/
tps->executing = 0;
- size = tps_taskprocessor_depth(tps);
+ size = ast_taskprocessor_size(tps);
/* If we executed a task, bump the stats */
if (tps->stats) {
tps->stats->_tasks_processed_count++;
diff --git a/main/threadpool.c b/main/threadpool.c
index 46de9b7f8..60e1e9a3b 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -1397,3 +1397,8 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast
{
return ast_threadpool_serializer_group(name, pool, NULL);
}
+
+long ast_threadpool_queue_size(struct ast_threadpool *pool)
+{
+ return ast_taskprocessor_size(pool->tps);
+}
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index d2b393fcc..babbe7aaa 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -3755,6 +3755,11 @@ static void remove_request_headers(pjsip_endpoint *endpt)
}
}
+long ast_sip_threadpool_queue_size(void)
+{
+ return ast_threadpool_queue_size(sip_threadpool);
+}
+
AST_TEST_DEFINE(xml_sanitization_end_null)
{
char sanitized[8];
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index 9b052603a..1d39e0fd2 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -246,6 +246,8 @@ static pjsip_module endpoint_mod = {
.on_rx_request = endpoint_lookup,
};
+#define SIP_MAX_QUEUE 500L
+
static pj_bool_t distributor(pjsip_rx_data *rdata)
{
pjsip_dialog *dlg = find_dialog(rdata);
@@ -280,7 +282,17 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
}
- ast_sip_push_task(serializer, distribute, clone);
+ if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) {
+ /* When the threadpool is backed up this much, there is a good chance that we have encountered
+ * some sort of terrible condition and don't need to be adding more work to the threadpool.
+ * It's in our best interest to send back a 503 response and be done with it.
+ */
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
+ ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
+ pjsip_rx_data_free_cloned(clone);
+ } else {
+ ast_sip_push_task(serializer, distribute, clone);
+ }
end:
if (dlg) {