summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Michelson <mmichelson@digium.com>2015-11-11 17:11:53 -0600
committerMark Michelson <mmichelson@digium.com>2015-11-12 11:39:41 -0500
commit264c74aa22aca7dffdf054ca1512a3638f3fc6dc (patch)
tree751ee9be5d225c76a5bbc341ad3ea9393929c149
parent469ad02404abeadd6f9594113e811a8840312f47 (diff)
res_pjsip: Deny requests when threadpool queue is backed up.
We have observed situations where the SIP threadpool may become deadlocked. However, because incoming traffic is still arriving, the SIP threadpool's queue can continue to grow, eventually running the system out of memory. This change makes it so that incoming traffic gets rejected with a 503 response if the queue is backed up too much. Change-Id: I4e736d48a2ba79fd1f8056c0dcd330e38e6a3816
-rw-r--r--include/asterisk/res_pjsip.h6
-rw-r--r--include/asterisk/taskprocessor.h6
-rw-r--r--include/asterisk/threadpool.h6
-rw-r--r--main/taskprocessor.c9
-rw-r--r--main/threadpool.c5
-rw-r--r--res/res_pjsip.c5
-rw-r--r--res/res_pjsip/pjsip_distributor.c14
7 files changed, 44 insertions, 7 deletions
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index f159a572e..95b8f23eb 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -2104,4 +2104,10 @@ int ast_sip_get_host_ip(int af, pj_sockaddr *addr);
*/
const char *ast_sip_get_host_ip_string(int af);
+/*!
+ * \brief Return the size of the SIP threadpool's task queue
+ * \since 13.7.0
+ */
+long ast_sip_threadpool_queue_size(void);
+
#endif /* _RES_PJSIP_H */
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index f16f144cb..06368867a 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -262,4 +262,10 @@ int ast_taskprocessor_is_task(struct ast_taskprocessor *tps);
*/
const char *ast_taskprocessor_name(struct ast_taskprocessor *tps);
+/*!
+ * \brief Return the current size of the taskprocessor queue
+ * \since 13.7.0
+ */
+long ast_taskprocessor_size(struct ast_taskprocessor *tps);
+
#endif /* __AST_TASKPROCESSOR_H__ */
diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h
index 75ce0e4e4..0f360c7a4 100644
--- a/include/asterisk/threadpool.h
+++ b/include/asterisk/threadpool.h
@@ -292,4 +292,10 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast
struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group);
+/*!
+ * \brief Return the size of the threadpool's task queue
+ * \since 13.7.0
+ */
+long ast_threadpool_queue_size(struct ast_threadpool *pool);
+
#endif /* ASTERISK_THREADPOOL_H */
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index 0719deec2..91125ad2a 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -130,9 +130,6 @@ static int tps_ping_handler(void *datap);
/*! \brief Remove the front task off the taskprocessor queue */
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
-/*! \brief Return the size of the taskprocessor queue */
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
-
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -508,7 +505,7 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
return task;
}
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
+long ast_taskprocessor_size(struct ast_taskprocessor *tps)
{
return (tps) ? tps->tps_queue_size : -1;
}
@@ -765,7 +762,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
{
struct ast_taskprocessor_local local;
struct tps_task *t;
- int size;
+ long size;
ao2_lock(tps);
t = tps_taskprocessor_pop(tps);
@@ -797,7 +794,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
* after we pop an empty stack.
*/
tps->executing = 0;
- size = tps_taskprocessor_depth(tps);
+ size = ast_taskprocessor_size(tps);
/* If we executed a task, bump the stats */
if (tps->stats) {
tps->stats->_tasks_processed_count++;
diff --git a/main/threadpool.c b/main/threadpool.c
index 46de9b7f8..60e1e9a3b 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -1397,3 +1397,8 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast
{
return ast_threadpool_serializer_group(name, pool, NULL);
}
+
+long ast_threadpool_queue_size(struct ast_threadpool *pool)
+{
+ return ast_taskprocessor_size(pool->tps);
+}
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index cdaed4ee7..9d0540d42 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -3809,6 +3809,11 @@ static void remove_request_headers(pjsip_endpoint *endpt)
}
}
+long ast_sip_threadpool_queue_size(void)
+{
+ return ast_threadpool_queue_size(sip_threadpool);
+}
+
AST_TEST_DEFINE(xml_sanitization_end_null)
{
char sanitized[8];
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index 9b052603a..1d39e0fd2 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -246,6 +246,8 @@ static pjsip_module endpoint_mod = {
.on_rx_request = endpoint_lookup,
};
+#define SIP_MAX_QUEUE 500L
+
static pj_bool_t distributor(pjsip_rx_data *rdata)
{
pjsip_dialog *dlg = find_dialog(rdata);
@@ -280,7 +282,17 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
}
- ast_sip_push_task(serializer, distribute, clone);
+ if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) {
+ /* When the threadpool is backed up this much, there is a good chance that we have encountered
+ * some sort of terrible condition and don't need to be adding more work to the threadpool.
+ * It's in our best interest to send back a 503 response and be done with it.
+ */
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
+ ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
+ pjsip_rx_data_free_cloned(clone);
+ } else {
+ ast_sip_push_task(serializer, distribute, clone);
+ }
end:
if (dlg) {