diff options
-rw-r--r-- | CHANGES | 7 | ||||
-rw-r--r-- | apps/app_confbridge.c | 18 | ||||
-rw-r--r-- | apps/app_queue.c | 1 | ||||
-rw-r--r-- | apps/confbridge/conf_config_parser.c | 4 | ||||
-rw-r--r-- | apps/confbridge/include/confbridge.h | 2 | ||||
-rw-r--r-- | channels/chan_iax2.c | 6 | ||||
-rw-r--r-- | channels/chan_sip.c | 18 | ||||
-rw-r--r-- | configs/samples/confbridge.conf.sample | 6 | ||||
-rw-r--r-- | include/asterisk/res_pjsip.h | 6 | ||||
-rw-r--r-- | include/asterisk/taskprocessor.h | 8 | ||||
-rw-r--r-- | include/asterisk/threadpool.h | 6 | ||||
-rw-r--r-- | main/format_cap.c | 8 | ||||
-rw-r--r-- | main/hashtab.c | 2 | ||||
-rw-r--r-- | main/taskprocessor.c | 13 | ||||
-rw-r--r-- | main/threadpool.c | 72 | ||||
-rw-r--r-- | res/res_pjsip.c | 11 | ||||
-rw-r--r-- | res/res_pjsip/pjsip_distributor.c | 16 | ||||
-rw-r--r-- | res/res_rtp_asterisk.c | 4 | ||||
-rw-r--r-- | tests/test_format_cap.c | 2 | ||||
-rw-r--r-- | tests/test_threadpool.c | 83 |
20 files changed, 260 insertions, 33 deletions
@@ -12,6 +12,13 @@ --- Functionality changes from Asterisk 13.6.0 to Asterisk 13.7.0 ------------ ------------------------------------------------------------------------------ +ConfBridge +------------------ + * A new "timeout" user profile option has been added. This configures the number + of seconds that a participant may stay in the ConfBridge after joining. When + the time expires, the user is ejected from the conference and CONFBRIDGE_RESULT + is set to "TIMEOUT" on the channel. + chan_sip ------------------ * The websockets_enabled option has been added to the general section of diff --git a/apps/app_confbridge.c b/apps/app_confbridge.c index 8045680e4..5976d39db 100644 --- a/apps/app_confbridge.c +++ b/apps/app_confbridge.c @@ -118,6 +118,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") <value name="KICKED">The channel was kicked from the conference.</value> <value name="ENDMARKED">The channel left the conference as a result of the last marked user leaving.</value> <value name="DTMF">The channel pressed a DTMF sequence to exit the conference.</value> + <value name="TIMEOUT">The channel reached its configured timeout.</value> </variable> </variablelist> </description> @@ -1542,6 +1543,13 @@ static int conf_get_pin(struct ast_channel *chan, struct confbridge_user *user) return -1; } +static int user_timeout(struct ast_bridge_channel *bridge_channel, void *ignore) +{ + ast_bridge_channel_leave_bridge(bridge_channel, BRIDGE_CHANNEL_STATE_END, 0); + pbx_builtin_setvar_helper(bridge_channel->chan, "CONFBRIDGE_RESULT", "TIMEOUT"); + return -1; +} + static int conf_rec_name(struct confbridge_user *user, const char *conf_name) { char destdir[PATH_MAX]; @@ -1776,6 +1784,16 @@ static int confbridge_exec(struct ast_channel *chan, const char *data) ast_autoservice_stop(chan); } + if (user.u_profile.timeout) { + ast_bridge_interval_hook(&user.features, + 0, + user.u_profile.timeout * 1000, + user_timeout, + NULL, + NULL, + AST_BRIDGE_HOOK_REMOVE_ON_PULL); + } + /* See if we need to automatically set this user as a video source or not */ handle_video_on_join(conference, user.chan, ast_test_flag(&user.u_profile, USER_OPT_MARKEDUSER)); diff --git a/apps/app_queue.c b/apps/app_queue.c index 6dfb14356..5a8dcd246 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -6500,7 +6500,6 @@ static int try_calling(struct queue_ent *qe, struct ast_flags opts, char **opt_a ast_test_flag(&(bridge_config.features_caller), AST_FEATURE_DISCONNECT), forwardsallowed, ringing); - ast_channel_unlock(qe->chan); ao2_lock(qe->parent); if (qe->parent->strategy == QUEUE_STRATEGY_RRMEMORY || qe->parent->strategy == QUEUE_STRATEGY_RRORDERED) { store_next_rr(qe, outgoing); diff --git a/apps/confbridge/conf_config_parser.c b/apps/confbridge/conf_config_parser.c index 6d6f7ab34..b8b1e2a9c 100644 --- a/apps/confbridge/conf_config_parser.c +++ b/apps/confbridge/conf_config_parser.c @@ -227,6 +227,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") <configOption name="template"> <synopsis>When using the CONFBRIDGE dialplan function, use a user profile as a template for creating a new temporary profile</synopsis> </configOption> + <configOption name="timeout"> + <synopsis>Kick the user out of the conference after this many seconds. 0 means there is no timeout for the user.</synopsis> + </configOption> </configObject> <configObject name="bridge_profile"> <synopsis>A named profile to apply to specific bridges.</synopsis> @@ -2109,6 +2112,7 @@ int conf_load_config(void) aco_option_register(&cfg_info, "dsp_silence_threshold", ACO_EXACT, user_types, __stringify(DEFAULT_SILENCE_THRESHOLD), OPT_UINT_T, 0, FLDSET(struct user_profile, silence_threshold)); aco_option_register(&cfg_info, "dsp_talking_threshold", ACO_EXACT, user_types, __stringify(DEFAULT_TALKING_THRESHOLD), OPT_UINT_T, 0, FLDSET(struct user_profile, talking_threshold)); aco_option_register(&cfg_info, "jitterbuffer", ACO_EXACT, user_types, "no", OPT_BOOLFLAG_T, 1, FLDSET(struct user_profile, flags), USER_OPT_JITTERBUFFER); + aco_option_register(&cfg_info, "timeout", ACO_EXACT, user_types, "0", OPT_UINT_T, 0, FLDSET(struct user_profile, timeout)); /* This option should only be used with the CONFBRIDGE dialplan function */ aco_option_register_custom(&cfg_info, "template", ACO_EXACT, user_types, NULL, user_template_handler, 0); diff --git a/apps/confbridge/include/confbridge.h b/apps/confbridge/include/confbridge.h index 6f8439a9c..8d2dffb1c 100644 --- a/apps/confbridge/include/confbridge.h +++ b/apps/confbridge/include/confbridge.h @@ -140,6 +140,8 @@ struct user_profile { unsigned int talking_threshold; /*! The time in ms of silence before a user is considered to be silent by the dsp. */ unsigned int silence_threshold; + /*! The time in ms the user may stay in the confbridge */ + unsigned int timeout; }; enum conf_sounds { diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index 5af0bfe1d..42a538fd4 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -7114,7 +7114,7 @@ static char *handle_cli_iax2_unregister(struct ast_cli_entry *e, int cmd, struct p = find_peer(a->argv[2], 1); if (p) { - if (p->expire > 0) { + if (p->expire > -1) { struct iax2_peer *peer; peer = ao2_find(peers, a->argv[2], OBJ_KEY); @@ -7146,8 +7146,8 @@ static char *complete_iax2_unregister(const char *line, const char *word, int po if (pos == 2) { struct ao2_iterator i = ao2_iterator_init(peers, 0); while ((p = ao2_iterator_next(&i))) { - if (!strncasecmp(p->name, word, wordlen) && - ++which > state && p->expire > 0) { + if (!strncasecmp(p->name, word, wordlen) && + ++which > state && p->expire > -1) { res = ast_strdup(p->name); peer_unref(p); break; diff --git a/channels/chan_sip.c b/channels/chan_sip.c index cda437024..c4d26d56b 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -20829,7 +20829,7 @@ static char *sip_unregister(struct ast_cli_entry *e, int cmd, struct ast_cli_arg return CLI_SHOWUSAGE; if ((peer = sip_find_peer(a->argv[2], NULL, load_realtime, FINDPEERS, TRUE, 0))) { - if (peer->expire > 0) { + if (peer->expire > -1) { AST_SCHED_DEL_UNREF(sched, peer->expire, sip_unref_peer(peer, "remove register expire ref")); expire_register(sip_ref_peer(peer, "ref for expire_register")); @@ -21408,7 +21408,7 @@ static char *complete_sip_registered_peer(const char *word, int state, int flags while ((peer = ao2_t_iterator_next(&i, "iterate thru peers table"))) { if (!strncasecmp(word, peer->name, wordlen) && (!flags2 || ast_test_flag(&peer->flags[1], flags2)) && - ++which > state && peer->expire > 0) + ++which > state && peer->expire > -1) result = ast_strdup(peer->name); if (result) { sip_unref_peer(peer, "toss iterator peer ptr before break"); @@ -30264,13 +30264,11 @@ static struct ast_variable *add_var(const char *buf, struct ast_variable *list) /*! \brief Set peer defaults before configuring specific configurations */ static void set_peer_defaults(struct sip_peer *peer) { - if (peer->expire == 0) { + if (peer->expire < 0) { /* Don't reset expire or port time during reload if we have an active registration */ - peer->expire = -1; - peer->pokeexpire = -1; - peer->keepalivesend = -1; + peer_sched_cleanup(peer); set_socket_transport(&peer->socket, AST_TRANSPORT_UDP); } peer->type = SIP_TYPE_PEER; @@ -30355,6 +30353,10 @@ static struct sip_peer *temp_peer(const char *name) } ast_atomic_fetchadd_int(&apeerobjs, 1); + peer->expire = -1; + peer->pokeexpire = -1; + peer->keepalivesend = -1; + set_peer_defaults(peer); ast_copy_string(peer->name, name, sizeof(peer->name)); @@ -30475,6 +30477,10 @@ static struct sip_peer *build_peer(const char *name, struct ast_variable *v, str ast_debug(3, "-REALTIME- peer built. Name: %s. Peer objects: %d\n", name, rpeerobjs); } else ast_atomic_fetchadd_int(&speerobjs, 1); + + peer->expire = -1; + peer->pokeexpire = -1; + peer->keepalivesend = -1; } /* Note that our peer HAS had its reference count increased */ diff --git a/configs/samples/confbridge.conf.sample b/configs/samples/confbridge.conf.sample index 860f1cb87..0419001eb 100644 --- a/configs/samples/confbridge.conf.sample +++ b/configs/samples/confbridge.conf.sample @@ -137,6 +137,12 @@ type=user ; This option is off by default. ;announcement=</path/to/file> ; Play a sound file to the user when they join the conference. +;timeout=3600 ; When set non-zero, this specifies the number of seconds that the participant + ; may stay in the conference before being automatically ejected. When the user + ; is ejected from the conference, the user's channel will have the CONFBRIDGE_RESULT + ; variable set to "TIMEOUT". A value of 0 indicates that there is no timeout. + ; Default: 0 + ; --- ConfBridge Bridge Profile Options --- [default_bridge] type=bridge diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 459082901..37b766211 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -2116,4 +2116,10 @@ int ast_sip_get_host_ip(int af, pj_sockaddr *addr); */ const char *ast_sip_get_host_ip_string(int af); +/*! + * \brief Return the size of the SIP threadpool's task queue + * \since 13.7.0 + */ +long ast_sip_threadpool_queue_size(void); + #endif /* _RES_PJSIP_H */ diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index f16f144cb..6ebf0729c 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -56,6 +56,8 @@ struct ast_taskprocessor; +#define AST_TASKPROCESSOR_HIGH_WATER_LEVEL 500 + /*! * \brief ast_tps_options for specification of taskprocessor options * @@ -262,4 +264,10 @@ int ast_taskprocessor_is_task(struct ast_taskprocessor *tps); */ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps); +/*! + * \brief Return the current size of the taskprocessor queue + * \since 13.7.0 + */ +long ast_taskprocessor_size(struct ast_taskprocessor *tps); + #endif /* __AST_TASKPROCESSOR_H__ */ diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index 75ce0e4e4..0f360c7a4 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -292,4 +292,10 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group); +/*! + * \brief Return the size of the threadpool's task queue + * \since 13.7.0 + */ +long ast_threadpool_queue_size(struct ast_threadpool *pool); + #endif /* ASTERISK_THREADPOOL_H */ diff --git a/main/format_cap.c b/main/format_cap.c index 4739efa0a..8d6046ab2 100644 --- a/main/format_cap.c +++ b/main/format_cap.c @@ -265,7 +265,15 @@ int ast_format_cap_append_by_type(struct ast_format_cap *cap, enum ast_media_typ } format = ast_format_cache_get(codec->name); + + if (format == ast_format_none) { + ao2_ref(format, -1); + ao2_ref(codec, -1); + continue; + } + if (!format || (codec != ast_format_get_codec(format))) { + ao2_cleanup(format); format = ast_format_create(codec); } ao2_ref(codec, -1); diff --git a/main/hashtab.c b/main/hashtab.c index 4b765979f..9b334d4a5 100644 --- a/main/hashtab.c +++ b/main/hashtab.c @@ -745,6 +745,8 @@ struct ast_hashtab_iter *ast_hashtab_start_write_traversal(struct ast_hashtab *t void ast_hashtab_end_traversal(struct ast_hashtab_iter *it) { + if (!it) + return; if (it->tab->do_locking) ast_rwlock_unlock(&it->tab->lock); free(it); diff --git a/main/taskprocessor.c b/main/taskprocessor.c index f382814af..7c50089f2 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -130,9 +130,6 @@ static int tps_ping_handler(void *datap); /*! \brief Remove the front task off the taskprocessor queue */ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps); -/*! \brief Return the size of the taskprocessor queue */ -static int tps_taskprocessor_depth(struct ast_taskprocessor *tps); - static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); @@ -508,7 +505,7 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps) return task; } -static int tps_taskprocessor_depth(struct ast_taskprocessor *tps) +long ast_taskprocessor_size(struct ast_taskprocessor *tps) { return (tps) ? tps->tps_queue_size : -1; } @@ -716,8 +713,6 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps) return NULL; } -#define HIGH_WATER_LEVEL 100 - /* push the task into the taskprocessor queue */ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t) { @@ -738,7 +733,7 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t) AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list); previous_size = tps->tps_queue_size++; - if (previous_size >= HIGH_WATER_LEVEL && !tps->high_water_warned) { + if (previous_size >= AST_TASKPROCESSOR_HIGH_WATER_LEVEL && !tps->high_water_warned) { ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n", tps->name, previous_size); tps->high_water_warned = 1; @@ -765,7 +760,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) { struct ast_taskprocessor_local local; struct tps_task *t; - int size; + long size; ao2_lock(tps); t = tps_taskprocessor_pop(tps); @@ -797,7 +792,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) * after we pop an empty stack. */ tps->executing = 0; - size = tps_taskprocessor_depth(tps); + size = ast_taskprocessor_size(tps); /* If we executed a task, bump the stats */ if (tps->stats) { tps->stats->_tasks_processed_count++; diff --git a/main/threadpool.c b/main/threadpool.c index d97a7adb8..60e1e9a3b 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -168,7 +168,7 @@ static void *worker_start(void *arg); static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool); static int worker_thread_start(struct worker_thread *worker); static int worker_idle(struct worker_thread *worker); -static void worker_set_state(struct worker_thread *worker, enum worker_state state); +static int worker_set_state(struct worker_thread *worker, enum worker_state state); static void worker_shutdown(struct worker_thread *worker); /*! @@ -482,7 +482,16 @@ static int activate_thread(void *obj, void *arg, int flags) worker->id); return 0; } - worker_set_state(worker, ALIVE); + + if (worker_set_state(worker, ALIVE)) { + ast_debug(1, "Failed to activate thread %d. It is dead\n", + worker->id); + /* The worker thread will no longer exist in the active threads or + * idle threads container after this. + */ + ao2_unlink(pool->active_threads, worker); + } + return CMP_MATCH; } @@ -538,20 +547,33 @@ static int queued_task_pushed(void *data) struct task_pushed_data *tpd = data; struct ast_threadpool *pool = tpd->pool; int was_empty = tpd->was_empty; + unsigned int existing_active; if (pool->listener && pool->listener->callbacks->task_pushed) { pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty); } - if (ao2_container_count(pool->idle_threads) == 0) { + + existing_active = ao2_container_count(pool->active_threads); + + /* The first pass transitions any existing idle threads to be active, and + * will also remove any worker threads that have recently entered the dead + * state. + */ + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, + activate_thread, pool); + + /* If no idle threads could be transitioned to active grow the pool as permitted. */ + if (ao2_container_count(pool->active_threads) == existing_active) { if (!pool->options.auto_increment) { + ao2_ref(tpd, -1); return 0; } grow(pool, pool->options.auto_increment); + /* An optional second pass transitions any newly added threads. */ + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, + activate_thread, pool); } - ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, - activate_thread, pool); - threadpool_send_state_changed(pool); ao2_ref(tpd, -1); return 0; @@ -797,7 +819,7 @@ static int queued_set_size(void *data) /* We don't count zombie threads as being "live" when potentially resizing */ unsigned int current_size = ao2_container_count(pool->active_threads) + - ao2_container_count(pool->idle_threads); + ao2_container_count(pool->idle_threads); if (current_size == num_threads) { ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n", @@ -806,6 +828,12 @@ static int queued_set_size(void *data) } if (current_size < num_threads) { + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, + activate_thread, pool); + + /* As the above may have altered the number of current threads update it */ + current_size = ao2_container_count(pool->active_threads) + + ao2_container_count(pool->idle_threads); grow(pool, num_threads - current_size); ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, activate_thread, pool); @@ -1117,13 +1145,36 @@ static int worker_idle(struct worker_thread *worker) * * The threadpool calls into this function in order to let a worker know * how it should proceed. + * + * \retval -1 failure (state transition not permitted) + * \retval 0 success */ -static void worker_set_state(struct worker_thread *worker, enum worker_state state) +static int worker_set_state(struct worker_thread *worker, enum worker_state state) { SCOPED_MUTEX(lock, &worker->lock); + + switch (state) { + case ALIVE: + /* This can occur due to a race condition between being told to go active + * and an idle timeout happening. + */ + if (worker->state == DEAD) { + return -1; + } + ast_assert(worker->state != ZOMBIE); + break; + case DEAD: + break; + case ZOMBIE: + ast_assert(worker->state != DEAD); + break; + } + worker->state = state; worker->wake_up = 1; ast_cond_signal(&worker->cond); + + return 0; } /*! Serializer group shutdown control object. */ @@ -1346,3 +1397,8 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast { return ast_threadpool_serializer_group(name, pool, NULL); } + +long ast_threadpool_queue_size(struct ast_threadpool *pool) +{ + return ast_taskprocessor_size(pool->tps); +} diff --git a/res/res_pjsip.c b/res/res_pjsip.c index d2b393fcc..8e99c55d4 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -3155,9 +3155,11 @@ static pj_status_t endpt_send_request(struct ast_sip_endpoint *endpoint, char errmsg[PJ_ERR_MSG_SIZE]; if (timeout > 0) { - pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(endpt), + int timers_cancelled = pj_timer_heap_cancel_if_active(pjsip_endpt_get_timer_heap(endpt), req_wrapper->timeout_timer, TIMER_INACTIVE); - ao2_ref(req_wrapper, -1); + if (timers_cancelled > 0) { + ao2_ref(req_wrapper, -1); + } } /* Complain of failure to send the request. */ @@ -3755,6 +3757,11 @@ static void remove_request_headers(pjsip_endpoint *endpt) } } +long ast_sip_threadpool_queue_size(void) +{ + return ast_threadpool_queue_size(sip_threadpool); +} + AST_TEST_DEFINE(xml_sanitization_end_null) { char sanitized[8]; diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c index 9b052603a..0e0e90f4e 100644 --- a/res/res_pjsip/pjsip_distributor.c +++ b/res/res_pjsip/pjsip_distributor.c @@ -246,6 +246,8 @@ static pjsip_module endpoint_mod = { .on_rx_request = endpoint_lookup, }; +#define SIP_MAX_QUEUE (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 3) + static pj_bool_t distributor(pjsip_rx_data *rdata) { pjsip_dialog *dlg = find_dialog(rdata); @@ -280,7 +282,19 @@ static pj_bool_t distributor(pjsip_rx_data *rdata) clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint); } - ast_sip_push_task(serializer, distribute, clone); + if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) { + /* When the threadpool is backed up this much, there is a good chance that we have encountered + * some sort of terrible condition and don't need to be adding more work to the threadpool. + * It's in our best interest to send back a 503 response and be done with it. + */ + if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL); + } + ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]); + pjsip_rx_data_free_cloned(clone); + } else { + ast_sip_push_task(serializer, distribute, clone); + } end: if (dlg) { diff --git a/res/res_rtp_asterisk.c b/res/res_rtp_asterisk.c index e2062416e..f6bf34211 100644 --- a/res/res_rtp_asterisk.c +++ b/res/res_rtp_asterisk.c @@ -4780,7 +4780,7 @@ static void ast_rtp_prop_set(struct ast_rtp_instance *instance, enum ast_rtp_pro return; } else { if (rtp->rtcp) { - if (rtp->rtcp->schedid > 0) { + if (rtp->rtcp->schedid > -1) { if (!ast_sched_del(rtp->sched, rtp->rtcp->schedid)) { /* Successfully cancelled scheduler entry. */ ao2_ref(instance, -1); @@ -4997,7 +4997,7 @@ static void ast_rtp_stop(struct ast_rtp_instance *instance) } #endif - if (rtp->rtcp && rtp->rtcp->schedid > 0) { + if (rtp->rtcp && rtp->rtcp->schedid > -1) { if (!ast_sched_del(rtp->sched, rtp->rtcp->schedid)) { /* successfully cancelled scheduler entry. */ ao2_ref(instance, -1); diff --git a/tests/test_format_cap.c b/tests/test_format_cap.c index e1ae553c7..ed5427fa2 100644 --- a/tests/test_format_cap.c +++ b/tests/test_format_cap.c @@ -245,7 +245,7 @@ AST_TEST_DEFINE(format_cap_append_all_unknown) } else if (!ast_format_cap_has_type(caps, AST_MEDIA_TYPE_VIDEO)) { ast_test_status_update(test, "Added all media formats but no video formats exist when they should\n"); return AST_TEST_FAIL; - } else if ((ast_format_cap_count(caps) + 1) != ast_codec_get_max()) { + } else if ((ast_format_cap_count(caps) + 1) != (ast_codec_get_max() - 1)) { ast_test_status_update(test, "The number of formats in the capabilities structure does not match known number\n"); return AST_TEST_FAIL; } diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index f5b1073de..42181a25c 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -571,6 +571,87 @@ end: return res; } +AST_TEST_DEFINE(threadpool_thread_timeout_thrash) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 1, + .auto_increment = 1, + .initial_size = 0, + .max_size = 1, + }; + int iteration; + + switch (cmd) { + case TEST_INIT: + info->name = "thread_timeout_thrash"; + info->category = "/main/threadpool/"; + info->summary = "Thrash threadpool thread timeout"; + info->description = + "Repeatedly queue a task when a threadpool thread should timeout."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + ast_threadpool_set_size(pool, 1); + + for (iteration = 0; iteration < 30; ++iteration) { + struct simple_task_data *std = NULL; + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + options.idle_timeout, + .tv_nsec = start.tv_usec * 1000 + }; + + std = simple_task_data_alloc(); + if (!std) { + goto end; + } + + /* Wait until the threadpool thread should timeout due to being idle */ + ast_mutex_lock(&tld->lock); + while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) { + /* This purposely left empty as we want to loop waiting for a time out */ + } + ast_mutex_unlock(&tld->lock); + + ast_threadpool_push(pool, simple_task, std); + } + + res = wait_until_thread_state(test, tld, 0, 0); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 1, 30, 0, 0, 1); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(tld); + return res; +} + AST_TEST_DEFINE(threadpool_one_task_one_thread) { struct ast_threadpool *pool = NULL; @@ -1610,6 +1691,7 @@ static int unload_module(void) ast_test_unregister(threadpool_thread_creation); ast_test_unregister(threadpool_thread_destruction); ast_test_unregister(threadpool_thread_timeout); + ast_test_unregister(threadpool_thread_timeout_thrash); ast_test_unregister(threadpool_one_task_one_thread); ast_test_unregister(threadpool_one_thread_one_task); ast_test_unregister(threadpool_one_thread_multiple_tasks); @@ -1630,6 +1712,7 @@ static int load_module(void) ast_test_register(threadpool_thread_creation); ast_test_register(threadpool_thread_destruction); ast_test_register(threadpool_thread_timeout); + ast_test_register(threadpool_thread_timeout_thrash); ast_test_register(threadpool_one_task_one_thread); ast_test_register(threadpool_one_thread_one_task); ast_test_register(threadpool_one_thread_multiple_tasks); |