summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES7
-rw-r--r--apps/app_confbridge.c18
-rw-r--r--apps/app_queue.c1
-rw-r--r--apps/confbridge/conf_config_parser.c4
-rw-r--r--apps/confbridge/include/confbridge.h2
-rw-r--r--channels/chan_iax2.c6
-rw-r--r--channels/chan_sip.c18
-rw-r--r--configs/samples/confbridge.conf.sample6
-rw-r--r--include/asterisk/res_pjsip.h6
-rw-r--r--include/asterisk/taskprocessor.h8
-rw-r--r--include/asterisk/threadpool.h6
-rw-r--r--main/format_cap.c8
-rw-r--r--main/hashtab.c2
-rw-r--r--main/taskprocessor.c13
-rw-r--r--main/threadpool.c72
-rw-r--r--res/res_pjsip.c11
-rw-r--r--res/res_pjsip/pjsip_distributor.c16
-rw-r--r--res/res_rtp_asterisk.c4
-rw-r--r--tests/test_format_cap.c2
-rw-r--r--tests/test_threadpool.c83
20 files changed, 260 insertions, 33 deletions
diff --git a/CHANGES b/CHANGES
index 88fe060bb..4a55a06ff 100644
--- a/CHANGES
+++ b/CHANGES
@@ -12,6 +12,13 @@
--- Functionality changes from Asterisk 13.6.0 to Asterisk 13.7.0 ------------
------------------------------------------------------------------------------
+ConfBridge
+------------------
+ * A new "timeout" user profile option has been added. This configures the number
+ of seconds that a participant may stay in the ConfBridge after joining. When
+ the time expires, the user is ejected from the conference and CONFBRIDGE_RESULT
+ is set to "TIMEOUT" on the channel.
+
chan_sip
------------------
* The websockets_enabled option has been added to the general section of
diff --git a/apps/app_confbridge.c b/apps/app_confbridge.c
index 8045680e4..5976d39db 100644
--- a/apps/app_confbridge.c
+++ b/apps/app_confbridge.c
@@ -118,6 +118,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
<value name="KICKED">The channel was kicked from the conference.</value>
<value name="ENDMARKED">The channel left the conference as a result of the last marked user leaving.</value>
<value name="DTMF">The channel pressed a DTMF sequence to exit the conference.</value>
+ <value name="TIMEOUT">The channel reached its configured timeout.</value>
</variable>
</variablelist>
</description>
@@ -1542,6 +1543,13 @@ static int conf_get_pin(struct ast_channel *chan, struct confbridge_user *user)
return -1;
}
+static int user_timeout(struct ast_bridge_channel *bridge_channel, void *ignore)
+{
+ ast_bridge_channel_leave_bridge(bridge_channel, BRIDGE_CHANNEL_STATE_END, 0);
+ pbx_builtin_setvar_helper(bridge_channel->chan, "CONFBRIDGE_RESULT", "TIMEOUT");
+ return -1;
+}
+
static int conf_rec_name(struct confbridge_user *user, const char *conf_name)
{
char destdir[PATH_MAX];
@@ -1776,6 +1784,16 @@ static int confbridge_exec(struct ast_channel *chan, const char *data)
ast_autoservice_stop(chan);
}
+ if (user.u_profile.timeout) {
+ ast_bridge_interval_hook(&user.features,
+ 0,
+ user.u_profile.timeout * 1000,
+ user_timeout,
+ NULL,
+ NULL,
+ AST_BRIDGE_HOOK_REMOVE_ON_PULL);
+ }
+
/* See if we need to automatically set this user as a video source or not */
handle_video_on_join(conference, user.chan, ast_test_flag(&user.u_profile, USER_OPT_MARKEDUSER));
diff --git a/apps/app_queue.c b/apps/app_queue.c
index 6dfb14356..5a8dcd246 100644
--- a/apps/app_queue.c
+++ b/apps/app_queue.c
@@ -6500,7 +6500,6 @@ static int try_calling(struct queue_ent *qe, struct ast_flags opts, char **opt_a
ast_test_flag(&(bridge_config.features_caller), AST_FEATURE_DISCONNECT),
forwardsallowed, ringing);
- ast_channel_unlock(qe->chan);
ao2_lock(qe->parent);
if (qe->parent->strategy == QUEUE_STRATEGY_RRMEMORY || qe->parent->strategy == QUEUE_STRATEGY_RRORDERED) {
store_next_rr(qe, outgoing);
diff --git a/apps/confbridge/conf_config_parser.c b/apps/confbridge/conf_config_parser.c
index 6d6f7ab34..b8b1e2a9c 100644
--- a/apps/confbridge/conf_config_parser.c
+++ b/apps/confbridge/conf_config_parser.c
@@ -227,6 +227,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
<configOption name="template">
<synopsis>When using the CONFBRIDGE dialplan function, use a user profile as a template for creating a new temporary profile</synopsis>
</configOption>
+ <configOption name="timeout">
+ <synopsis>Kick the user out of the conference after this many seconds. 0 means there is no timeout for the user.</synopsis>
+ </configOption>
</configObject>
<configObject name="bridge_profile">
<synopsis>A named profile to apply to specific bridges.</synopsis>
@@ -2109,6 +2112,7 @@ int conf_load_config(void)
aco_option_register(&cfg_info, "dsp_silence_threshold", ACO_EXACT, user_types, __stringify(DEFAULT_SILENCE_THRESHOLD), OPT_UINT_T, 0, FLDSET(struct user_profile, silence_threshold));
aco_option_register(&cfg_info, "dsp_talking_threshold", ACO_EXACT, user_types, __stringify(DEFAULT_TALKING_THRESHOLD), OPT_UINT_T, 0, FLDSET(struct user_profile, talking_threshold));
aco_option_register(&cfg_info, "jitterbuffer", ACO_EXACT, user_types, "no", OPT_BOOLFLAG_T, 1, FLDSET(struct user_profile, flags), USER_OPT_JITTERBUFFER);
+ aco_option_register(&cfg_info, "timeout", ACO_EXACT, user_types, "0", OPT_UINT_T, 0, FLDSET(struct user_profile, timeout));
/* This option should only be used with the CONFBRIDGE dialplan function */
aco_option_register_custom(&cfg_info, "template", ACO_EXACT, user_types, NULL, user_template_handler, 0);
diff --git a/apps/confbridge/include/confbridge.h b/apps/confbridge/include/confbridge.h
index 6f8439a9c..8d2dffb1c 100644
--- a/apps/confbridge/include/confbridge.h
+++ b/apps/confbridge/include/confbridge.h
@@ -140,6 +140,8 @@ struct user_profile {
unsigned int talking_threshold;
/*! The time in ms of silence before a user is considered to be silent by the dsp. */
unsigned int silence_threshold;
+ /*! The time in ms the user may stay in the confbridge */
+ unsigned int timeout;
};
enum conf_sounds {
diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c
index 5af0bfe1d..42a538fd4 100644
--- a/channels/chan_iax2.c
+++ b/channels/chan_iax2.c
@@ -7114,7 +7114,7 @@ static char *handle_cli_iax2_unregister(struct ast_cli_entry *e, int cmd, struct
p = find_peer(a->argv[2], 1);
if (p) {
- if (p->expire > 0) {
+ if (p->expire > -1) {
struct iax2_peer *peer;
peer = ao2_find(peers, a->argv[2], OBJ_KEY);
@@ -7146,8 +7146,8 @@ static char *complete_iax2_unregister(const char *line, const char *word, int po
if (pos == 2) {
struct ao2_iterator i = ao2_iterator_init(peers, 0);
while ((p = ao2_iterator_next(&i))) {
- if (!strncasecmp(p->name, word, wordlen) &&
- ++which > state && p->expire > 0) {
+ if (!strncasecmp(p->name, word, wordlen) &&
+ ++which > state && p->expire > -1) {
res = ast_strdup(p->name);
peer_unref(p);
break;
diff --git a/channels/chan_sip.c b/channels/chan_sip.c
index cda437024..c4d26d56b 100644
--- a/channels/chan_sip.c
+++ b/channels/chan_sip.c
@@ -20829,7 +20829,7 @@ static char *sip_unregister(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
return CLI_SHOWUSAGE;
if ((peer = sip_find_peer(a->argv[2], NULL, load_realtime, FINDPEERS, TRUE, 0))) {
- if (peer->expire > 0) {
+ if (peer->expire > -1) {
AST_SCHED_DEL_UNREF(sched, peer->expire,
sip_unref_peer(peer, "remove register expire ref"));
expire_register(sip_ref_peer(peer, "ref for expire_register"));
@@ -21408,7 +21408,7 @@ static char *complete_sip_registered_peer(const char *word, int state, int flags
while ((peer = ao2_t_iterator_next(&i, "iterate thru peers table"))) {
if (!strncasecmp(word, peer->name, wordlen) &&
(!flags2 || ast_test_flag(&peer->flags[1], flags2)) &&
- ++which > state && peer->expire > 0)
+ ++which > state && peer->expire > -1)
result = ast_strdup(peer->name);
if (result) {
sip_unref_peer(peer, "toss iterator peer ptr before break");
@@ -30264,13 +30264,11 @@ static struct ast_variable *add_var(const char *buf, struct ast_variable *list)
/*! \brief Set peer defaults before configuring specific configurations */
static void set_peer_defaults(struct sip_peer *peer)
{
- if (peer->expire == 0) {
+ if (peer->expire < 0) {
/* Don't reset expire or port time during reload
if we have an active registration
*/
- peer->expire = -1;
- peer->pokeexpire = -1;
- peer->keepalivesend = -1;
+ peer_sched_cleanup(peer);
set_socket_transport(&peer->socket, AST_TRANSPORT_UDP);
}
peer->type = SIP_TYPE_PEER;
@@ -30355,6 +30353,10 @@ static struct sip_peer *temp_peer(const char *name)
}
ast_atomic_fetchadd_int(&apeerobjs, 1);
+ peer->expire = -1;
+ peer->pokeexpire = -1;
+ peer->keepalivesend = -1;
+
set_peer_defaults(peer);
ast_copy_string(peer->name, name, sizeof(peer->name));
@@ -30475,6 +30477,10 @@ static struct sip_peer *build_peer(const char *name, struct ast_variable *v, str
ast_debug(3, "-REALTIME- peer built. Name: %s. Peer objects: %d\n", name, rpeerobjs);
} else
ast_atomic_fetchadd_int(&speerobjs, 1);
+
+ peer->expire = -1;
+ peer->pokeexpire = -1;
+ peer->keepalivesend = -1;
}
/* Note that our peer HAS had its reference count increased */
diff --git a/configs/samples/confbridge.conf.sample b/configs/samples/confbridge.conf.sample
index 860f1cb87..0419001eb 100644
--- a/configs/samples/confbridge.conf.sample
+++ b/configs/samples/confbridge.conf.sample
@@ -137,6 +137,12 @@ type=user
; This option is off by default.
;announcement=</path/to/file> ; Play a sound file to the user when they join the conference.
+;timeout=3600 ; When set non-zero, this specifies the number of seconds that the participant
+ ; may stay in the conference before being automatically ejected. When the user
+ ; is ejected from the conference, the user's channel will have the CONFBRIDGE_RESULT
+ ; variable set to "TIMEOUT". A value of 0 indicates that there is no timeout.
+ ; Default: 0
+
; --- ConfBridge Bridge Profile Options ---
[default_bridge]
type=bridge
diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h
index 459082901..37b766211 100644
--- a/include/asterisk/res_pjsip.h
+++ b/include/asterisk/res_pjsip.h
@@ -2116,4 +2116,10 @@ int ast_sip_get_host_ip(int af, pj_sockaddr *addr);
*/
const char *ast_sip_get_host_ip_string(int af);
+/*!
+ * \brief Return the size of the SIP threadpool's task queue
+ * \since 13.7.0
+ */
+long ast_sip_threadpool_queue_size(void);
+
#endif /* _RES_PJSIP_H */
diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index f16f144cb..6ebf0729c 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -56,6 +56,8 @@
struct ast_taskprocessor;
+#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL 500
+
/*!
* \brief ast_tps_options for specification of taskprocessor options
*
@@ -262,4 +264,10 @@ int ast_taskprocessor_is_task(struct ast_taskprocessor *tps);
*/
const char *ast_taskprocessor_name(struct ast_taskprocessor *tps);
+/*!
+ * \brief Return the current size of the taskprocessor queue
+ * \since 13.7.0
+ */
+long ast_taskprocessor_size(struct ast_taskprocessor *tps);
+
#endif /* __AST_TASKPROCESSOR_H__ */
diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h
index 75ce0e4e4..0f360c7a4 100644
--- a/include/asterisk/threadpool.h
+++ b/include/asterisk/threadpool.h
@@ -292,4 +292,10 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast
struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group);
+/*!
+ * \brief Return the size of the threadpool's task queue
+ * \since 13.7.0
+ */
+long ast_threadpool_queue_size(struct ast_threadpool *pool);
+
#endif /* ASTERISK_THREADPOOL_H */
diff --git a/main/format_cap.c b/main/format_cap.c
index 4739efa0a..8d6046ab2 100644
--- a/main/format_cap.c
+++ b/main/format_cap.c
@@ -265,7 +265,15 @@ int ast_format_cap_append_by_type(struct ast_format_cap *cap, enum ast_media_typ
}
format = ast_format_cache_get(codec->name);
+
+ if (format == ast_format_none) {
+ ao2_ref(format, -1);
+ ao2_ref(codec, -1);
+ continue;
+ }
+
if (!format || (codec != ast_format_get_codec(format))) {
+ ao2_cleanup(format);
format = ast_format_create(codec);
}
ao2_ref(codec, -1);
diff --git a/main/hashtab.c b/main/hashtab.c
index 4b765979f..9b334d4a5 100644
--- a/main/hashtab.c
+++ b/main/hashtab.c
@@ -745,6 +745,8 @@ struct ast_hashtab_iter *ast_hashtab_start_write_traversal(struct ast_hashtab *t
void ast_hashtab_end_traversal(struct ast_hashtab_iter *it)
{
+ if (!it)
+ return;
if (it->tab->do_locking)
ast_rwlock_unlock(&it->tab->lock);
free(it);
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index f382814af..7c50089f2 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -130,9 +130,6 @@ static int tps_ping_handler(void *datap);
/*! \brief Remove the front task off the taskprocessor queue */
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
-/*! \brief Return the size of the taskprocessor queue */
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
-
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
@@ -508,7 +505,7 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
return task;
}
-static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
+long ast_taskprocessor_size(struct ast_taskprocessor *tps)
{
return (tps) ? tps->tps_queue_size : -1;
}
@@ -716,8 +713,6 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
return NULL;
}
-#define HIGH_WATER_LEVEL 100
-
/* push the task into the taskprocessor queue */
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
{
@@ -738,7 +733,7 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
- if (previous_size >= HIGH_WATER_LEVEL && !tps->high_water_warned) {
+ if (previous_size >= AST_TASKPROCESSOR_HIGH_WATER_LEVEL && !tps->high_water_warned) {
ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
tps->name, previous_size);
tps->high_water_warned = 1;
@@ -765,7 +760,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
{
struct ast_taskprocessor_local local;
struct tps_task *t;
- int size;
+ long size;
ao2_lock(tps);
t = tps_taskprocessor_pop(tps);
@@ -797,7 +792,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
* after we pop an empty stack.
*/
tps->executing = 0;
- size = tps_taskprocessor_depth(tps);
+ size = ast_taskprocessor_size(tps);
/* If we executed a task, bump the stats */
if (tps->stats) {
tps->stats->_tasks_processed_count++;
diff --git a/main/threadpool.c b/main/threadpool.c
index d97a7adb8..60e1e9a3b 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -168,7 +168,7 @@ static void *worker_start(void *arg);
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static int worker_thread_start(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);
-static void worker_set_state(struct worker_thread *worker, enum worker_state state);
+static int worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);
/*!
@@ -482,7 +482,16 @@ static int activate_thread(void *obj, void *arg, int flags)
worker->id);
return 0;
}
- worker_set_state(worker, ALIVE);
+
+ if (worker_set_state(worker, ALIVE)) {
+ ast_debug(1, "Failed to activate thread %d. It is dead\n",
+ worker->id);
+ /* The worker thread will no longer exist in the active threads or
+ * idle threads container after this.
+ */
+ ao2_unlink(pool->active_threads, worker);
+ }
+
return CMP_MATCH;
}
@@ -538,20 +547,33 @@ static int queued_task_pushed(void *data)
struct task_pushed_data *tpd = data;
struct ast_threadpool *pool = tpd->pool;
int was_empty = tpd->was_empty;
+ unsigned int existing_active;
if (pool->listener && pool->listener->callbacks->task_pushed) {
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
}
- if (ao2_container_count(pool->idle_threads) == 0) {
+
+ existing_active = ao2_container_count(pool->active_threads);
+
+ /* The first pass transitions any existing idle threads to be active, and
+ * will also remove any worker threads that have recently entered the dead
+ * state.
+ */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
+
+ /* If no idle threads could be transitioned to active grow the pool as permitted. */
+ if (ao2_container_count(pool->active_threads) == existing_active) {
if (!pool->options.auto_increment) {
+ ao2_ref(tpd, -1);
return 0;
}
grow(pool, pool->options.auto_increment);
+ /* An optional second pass transitions any newly added threads. */
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+ activate_thread, pool);
}
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
- activate_thread, pool);
-
threadpool_send_state_changed(pool);
ao2_ref(tpd, -1);
return 0;
@@ -797,7 +819,7 @@ static int queued_set_size(void *data)
/* We don't count zombie threads as being "live" when potentially resizing */
unsigned int current_size = ao2_container_count(pool->active_threads) +
- ao2_container_count(pool->idle_threads);
+ ao2_container_count(pool->idle_threads);
if (current_size == num_threads) {
ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
@@ -806,6 +828,12 @@ static int queued_set_size(void *data)
}
if (current_size < num_threads) {
+ ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+ activate_thread, pool);
+
+ /* As the above may have altered the number of current threads update it */
+ current_size = ao2_container_count(pool->active_threads) +
+ ao2_container_count(pool->idle_threads);
grow(pool, num_threads - current_size);
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
activate_thread, pool);
@@ -1117,13 +1145,36 @@ static int worker_idle(struct worker_thread *worker)
*
* The threadpool calls into this function in order to let a worker know
* how it should proceed.
+ *
+ * \retval -1 failure (state transition not permitted)
+ * \retval 0 success
*/
-static void worker_set_state(struct worker_thread *worker, enum worker_state state)
+static int worker_set_state(struct worker_thread *worker, enum worker_state state)
{
SCOPED_MUTEX(lock, &worker->lock);
+
+ switch (state) {
+ case ALIVE:
+ /* This can occur due to a race condition between being told to go active
+ * and an idle timeout happening.
+ */
+ if (worker->state == DEAD) {
+ return -1;
+ }
+ ast_assert(worker->state != ZOMBIE);
+ break;
+ case DEAD:
+ break;
+ case ZOMBIE:
+ ast_assert(worker->state != DEAD);
+ break;
+ }
+
worker->state = state;
worker->wake_up = 1;
ast_cond_signal(&worker->cond);
+
+ return 0;
}
/*! Serializer group shutdown control object. */
@@ -1346,3 +1397,8 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast
{
return ast_threadpool_serializer_group(name, pool, NULL);
}
+
+long ast_threadpool_queue_size(struct ast_threadpool *pool)
+{
+ return ast_taskprocessor_size(pool->tps);
+}
diff --git a/res/res_pjsip.c b/res/res_pjsip.c
index d2b393fcc..8e99c55d4 100644
--- a/res/res_pjsip.c
+++ b/res/res_pjsip.c
@@ -3155,9 +3155,11 @@ static pj_status_t endpt_send_request(struct ast_sip_endpoint *endpoint,
char errmsg[PJ_ERR_MSG_SIZE];
if (timeout > 0) {
- pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(endpt),
+ int timers_cancelled = pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(endpt),
req_wrapper->timeout_timer, TIMER_INACTIVE);
- ao2_ref(req_wrapper, -1);
+ if (timers_cancelled > 0) {
+ ao2_ref(req_wrapper, -1);
+ }
}
/* Complain of failure to send the request. */
@@ -3755,6 +3757,11 @@ static void remove_request_headers(pjsip_endpoint *endpt)
}
}
+long ast_sip_threadpool_queue_size(void)
+{
+ return ast_threadpool_queue_size(sip_threadpool);
+}
+
AST_TEST_DEFINE(xml_sanitization_end_null)
{
char sanitized[8];
diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c
index 9b052603a..0e0e90f4e 100644
--- a/res/res_pjsip/pjsip_distributor.c
+++ b/res/res_pjsip/pjsip_distributor.c
@@ -246,6 +246,8 @@ static pjsip_module endpoint_mod = {
.on_rx_request = endpoint_lookup,
};
+#define SIP_MAX_QUEUE (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 3)
+
static pj_bool_t distributor(pjsip_rx_data *rdata)
{
pjsip_dialog *dlg = find_dialog(rdata);
@@ -280,7 +282,19 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
}
- ast_sip_push_task(serializer, distribute, clone);
+ if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) {
+ /* When the threadpool is backed up this much, there is a good chance that we have encountered
+ * some sort of terrible condition and don't need to be adding more work to the threadpool.
+ * It's in our best interest to send back a 503 response and be done with it.
+ */
+ if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
+ pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
+ }
+ ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
+ pjsip_rx_data_free_cloned(clone);
+ } else {
+ ast_sip_push_task(serializer, distribute, clone);
+ }
end:
if (dlg) {
diff --git a/res/res_rtp_asterisk.c b/res/res_rtp_asterisk.c
index e2062416e..f6bf34211 100644
--- a/res/res_rtp_asterisk.c
+++ b/res/res_rtp_asterisk.c
@@ -4780,7 +4780,7 @@ static void ast_rtp_prop_set(struct ast_rtp_instance *instance, enum ast_rtp_pro
return;
} else {
if (rtp->rtcp) {
- if (rtp->rtcp->schedid > 0) {
+ if (rtp->rtcp->schedid > -1) {
if (!ast_sched_del(rtp->sched, rtp->rtcp->schedid)) {
/* Successfully cancelled scheduler entry. */
ao2_ref(instance, -1);
@@ -4997,7 +4997,7 @@ static void ast_rtp_stop(struct ast_rtp_instance *instance)
}
#endif
- if (rtp->rtcp && rtp->rtcp->schedid > 0) {
+ if (rtp->rtcp && rtp->rtcp->schedid > -1) {
if (!ast_sched_del(rtp->sched, rtp->rtcp->schedid)) {
/* successfully cancelled scheduler entry. */
ao2_ref(instance, -1);
diff --git a/tests/test_format_cap.c b/tests/test_format_cap.c
index e1ae553c7..ed5427fa2 100644
--- a/tests/test_format_cap.c
+++ b/tests/test_format_cap.c
@@ -245,7 +245,7 @@ AST_TEST_DEFINE(format_cap_append_all_unknown)
} else if (!ast_format_cap_has_type(caps, AST_MEDIA_TYPE_VIDEO)) {
ast_test_status_update(test, "Added all media formats but no video formats exist when they should\n");
return AST_TEST_FAIL;
- } else if ((ast_format_cap_count(caps) + 1) != ast_codec_get_max()) {
+ } else if ((ast_format_cap_count(caps) + 1) != (ast_codec_get_max() - 1)) {
ast_test_status_update(test, "The number of formats in the capabilities structure does not match known number\n");
return AST_TEST_FAIL;
}
diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c
index f5b1073de..42181a25c 100644
--- a/tests/test_threadpool.c
+++ b/tests/test_threadpool.c
@@ -571,6 +571,87 @@ end:
return res;
}
+AST_TEST_DEFINE(threadpool_thread_timeout_thrash)
+{
+ struct ast_threadpool *pool = NULL;
+ struct ast_threadpool_listener *listener = NULL;
+ enum ast_test_result_state res = AST_TEST_FAIL;
+ struct test_listener_data *tld = NULL;
+ struct ast_threadpool_options options = {
+ .version = AST_THREADPOOL_OPTIONS_VERSION,
+ .idle_timeout = 1,
+ .auto_increment = 1,
+ .initial_size = 0,
+ .max_size = 1,
+ };
+ int iteration;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = "thread_timeout_thrash";
+ info->category = "/main/threadpool/";
+ info->summary = "Thrash threadpool thread timeout";
+ info->description =
+ "Repeatedly queue a task when a threadpool thread should timeout.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+ tld = test_alloc();
+ if (!tld) {
+ return AST_TEST_FAIL;
+ }
+
+ listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
+ if (!listener) {
+ goto end;
+ }
+
+ pool = ast_threadpool_create(info->name, listener, &options);
+ if (!pool) {
+ goto end;
+ }
+
+ ast_threadpool_set_size(pool, 1);
+
+ for (iteration = 0; iteration < 30; ++iteration) {
+ struct simple_task_data *std = NULL;
+ struct timeval start = ast_tvnow();
+ struct timespec end = {
+ .tv_sec = start.tv_sec + options.idle_timeout,
+ .tv_nsec = start.tv_usec * 1000
+ };
+
+ std = simple_task_data_alloc();
+ if (!std) {
+ goto end;
+ }
+
+ /* Wait until the threadpool thread should timeout due to being idle */
+ ast_mutex_lock(&tld->lock);
+ while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
+ /* This purposely left empty as we want to loop waiting for a time out */
+ }
+ ast_mutex_unlock(&tld->lock);
+
+ ast_threadpool_push(pool, simple_task, std);
+ }
+
+ res = wait_until_thread_state(test, tld, 0, 0);
+ if (res == AST_TEST_FAIL) {
+ goto end;
+ }
+
+ res = listener_check(test, listener, 1, 1, 30, 0, 0, 1);
+
+end:
+ ast_threadpool_shutdown(pool);
+ ao2_cleanup(listener);
+ ast_free(tld);
+ return res;
+}
+
AST_TEST_DEFINE(threadpool_one_task_one_thread)
{
struct ast_threadpool *pool = NULL;
@@ -1610,6 +1691,7 @@ static int unload_module(void)
ast_test_unregister(threadpool_thread_creation);
ast_test_unregister(threadpool_thread_destruction);
ast_test_unregister(threadpool_thread_timeout);
+ ast_test_unregister(threadpool_thread_timeout_thrash);
ast_test_unregister(threadpool_one_task_one_thread);
ast_test_unregister(threadpool_one_thread_one_task);
ast_test_unregister(threadpool_one_thread_multiple_tasks);
@@ -1630,6 +1712,7 @@ static int load_module(void)
ast_test_register(threadpool_thread_creation);
ast_test_register(threadpool_thread_destruction);
ast_test_register(threadpool_thread_timeout);
+ ast_test_register(threadpool_thread_timeout_thrash);
ast_test_register(threadpool_one_task_one_thread);
ast_test_register(threadpool_one_thread_one_task);
ast_test_register(threadpool_one_thread_multiple_tasks);