diff options
-rw-r--r-- | channels/chan_pjsip.c | 6 | ||||
-rw-r--r-- | channels/pjsip/dialplan_functions.c | 8 | ||||
-rw-r--r-- | include/asterisk/res_pjsip.h | 109 | ||||
-rw-r--r-- | include/asterisk/rtp_engine.h | 6 | ||||
-rw-r--r-- | res/res_pjsip.c | 90 | ||||
-rw-r--r-- | res/res_pjsip/config_system.c | 2 | ||||
-rw-r--r-- | res/res_pjsip/config_transport.c | 2 | ||||
-rw-r--r-- | res/res_pjsip/pjsip_scheduler.c | 178 | ||||
-rw-r--r-- | res/res_pjsip_header_funcs.c | 22 | ||||
-rw-r--r-- | res/res_pjsip_history.c | 2 | ||||
-rw-r--r-- | res/res_pjsip_outbound_publish.c | 6 | ||||
-rw-r--r-- | res/res_pjsip_outbound_registration.c | 5 | ||||
-rw-r--r-- | res/res_pjsip_pubsub.c | 6 | ||||
-rw-r--r-- | res/res_pjsip_refer.c | 13 | ||||
-rw-r--r-- | res/res_pjsip_sdp_rtp.c | 3 | ||||
-rw-r--r-- | res/res_pjsip_transport_websocket.c | 6 | ||||
-rw-r--r-- | res/res_rtp_asterisk.c | 171 |
17 files changed, 506 insertions, 129 deletions
diff --git a/channels/chan_pjsip.c b/channels/chan_pjsip.c index 6b2664819..dde7416c3 100644 --- a/channels/chan_pjsip.c +++ b/channels/chan_pjsip.c @@ -718,7 +718,7 @@ static int chan_pjsip_answer(struct ast_channel *ast) can occur between this thread and bridging (specifically when native bridging attempts to do direct media) */ ast_channel_unlock(ast); - res = ast_sip_push_task_synchronous(session->serializer, answer, session); + res = ast_sip_push_task_wait_serializer(session->serializer, answer, session); if (res) { if (res == -1) { ast_log(LOG_ERROR,"Cannot answer '%s': Unable to push answer task to the threadpool.\n", @@ -2502,10 +2502,10 @@ static struct ast_channel *chan_pjsip_request_with_stream_topology(const char *t req_data.topology = topology; req_data.dest = data; - /* Default failure value in case ast_sip_push_task_synchronous() itself fails. */ + /* Default failure value in case ast_sip_push_task_wait_servant() itself fails. */ req_data.cause = AST_CAUSE_FAILURE; - if (ast_sip_push_task_synchronous(NULL, request, &req_data)) { + if (ast_sip_push_task_wait_servant(NULL, request, &req_data)) { *cause = req_data.cause; return NULL; } diff --git a/channels/pjsip/dialplan_functions.c b/channels/pjsip/dialplan_functions.c index aa376f892..ce347dcd9 100644 --- a/channels/pjsip/dialplan_functions.c +++ b/channels/pjsip/dialplan_functions.c @@ -897,7 +897,7 @@ int pjsip_acf_channel_read(struct ast_channel *chan, const char *cmd, char *data func_args.field = args.field; func_args.buf = buf; func_args.len = len; - if (ast_sip_push_task_synchronous(func_args.session->serializer, read_pjsip, &func_args)) { + if (ast_sip_push_task_wait_serializer(func_args.session->serializer, read_pjsip, &func_args)) { ast_log(LOG_WARNING, "Unable to read properties of channel %s: failed to push task\n", ast_channel_name(chan)); ao2_ref(func_args.session, -1); return -1; @@ -1219,7 +1219,7 @@ int pjsip_acf_media_offer_write(struct ast_channel *chan, const char *cmd, char mdata.media_type = AST_MEDIA_TYPE_VIDEO; } - return ast_sip_push_task_synchronous(channel->session->serializer, media_offer_write_av, &mdata); + return ast_sip_push_task_wait_serializer(channel->session->serializer, media_offer_write_av, &mdata); } int pjsip_acf_dtmf_mode_read(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len) @@ -1390,7 +1390,7 @@ int pjsip_acf_dtmf_mode_write(struct ast_channel *chan, const char *cmd, char *d ast_channel_unlock(chan); - return ast_sip_push_task_synchronous(channel->session->serializer, dtmf_mode_refresh_cb, &rdata); + return ast_sip_push_task_wait_serializer(channel->session->serializer, dtmf_mode_refresh_cb, &rdata); } static int refresh_write_cb(void *obj) @@ -1438,5 +1438,5 @@ int pjsip_acf_session_refresh_write(struct ast_channel *chan, const char *cmd, c rdata.method = AST_SIP_SESSION_REFRESH_METHOD_UPDATE; } - return ast_sip_push_task_synchronous(channel->session->serializer, refresh_write_cb, &rdata); + return ast_sip_push_task_wait_serializer(channel->session->serializer, refresh_write_cb, &rdata); } diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index d3849ad34..092bb8420 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -1407,7 +1407,7 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void); * the next item on the SIP socket(s) can be serviced. On incoming messages, * Asterisk automatically will push the request to a servant thread. When your * module callback is called, processing will already be in a servant. However, - * for other PSJIP events, such as transaction state changes due to timer + * for other PJSIP events, such as transaction state changes due to timer * expirations, your module will be called into from a PJSIP thread. If you * are called into from a PJSIP thread, then you should push whatever processing * is needed to a servant as soon as possible. You can discern if you are currently @@ -1543,28 +1543,92 @@ struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg); int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); /*! - * \brief Push a task to SIP servants and wait for it to complete + * \brief Push a task to SIP servants and wait for it to complete. * - * Like \ref ast_sip_push_task except that it blocks until the task completes. + * Like \ref ast_sip_push_task except that it blocks until the task + * completes. If the current thread is a SIP servant thread then the + * task executes immediately. Otherwise, the specified serializer + * executes the task and the current thread waits for it to complete. * - * \warning \b Never use this function in a SIP servant thread. This can potentially - * cause a deadlock. If you are in a SIP servant thread, just call your function - * in-line. + * \note PJPROJECT callbacks tend to have locks already held when + * called. * - * \warning \b Never hold locks that may be acquired by a SIP servant thread when - * calling this function. Doing so may cause a deadlock if all SIP servant threads - * are blocked waiting to acquire the lock while the thread holding the lock is - * waiting for a free SIP servant thread. + * \warning \b Never hold locks that may be acquired by a SIP servant + * thread when calling this function. Doing so may cause a deadlock + * if all SIP servant threads are blocked waiting to acquire the lock + * while the thread holding the lock is waiting for a free SIP servant + * thread. * - * \param serializer The SIP serializer to which the task belongs. May be NULL. + * \warning \b Use of this function in an ao2 destructor callback is a + * bad idea. You don't have control over which thread executes the + * destructor. Attempting to shift execution to another thread with + * this function is likely to cause deadlock. + * + * \param serializer The SIP serializer to execute the task if the + * current thread is not a SIP servant. NULL if any of the default + * serializers can be used. * \param sip_task The task to execute * \param task_data The parameter to pass to the task when it executes - * \retval 0 Success - * \retval -1 Failure + * + * \note The sip_task() return value may need to be distinguished from + * the failure to push the task. + * + * \return sip_task() return value on success. + * \retval -1 Failure to push the task. + */ +int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); + +/*! + * \brief Push a task to SIP servants and wait for it to complete. + * \deprecated Replaced with ast_sip_push_task_wait_servant(). */ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); /*! + * \brief Push a task to the serializer and wait for it to complete. + * + * Like \ref ast_sip_push_task except that it blocks until the task is + * completed by the specified serializer. If the specified serializer + * is the current thread then the task executes immediately. + * + * \note PJPROJECT callbacks tend to have locks already held when + * called. + * + * \warning \b Never hold locks that may be acquired by a SIP servant + * thread when calling this function. Doing so may cause a deadlock + * if all SIP servant threads are blocked waiting to acquire the lock + * while the thread holding the lock is waiting for a free SIP servant + * thread for the serializer to execute in. + * + * \warning \b Never hold locks that may be acquired by the serializer + * when calling this function. Doing so will cause a deadlock. + * + * \warning \b Never use this function in the pjsip monitor thread (It + * is a SIP servant thread). This is likely to cause a deadlock. + * + * \warning \b Use of this function in an ao2 destructor callback is a + * bad idea. You don't have control over which thread executes the + * destructor. Attempting to shift execution to another thread with + * this function is likely to cause deadlock. + * + * \param serializer The SIP serializer to execute the task. NULL if + * any of the default serializers can be used. + * \param sip_task The task to execute + * \param task_data The parameter to pass to the task when it executes + * + * \note It is generally better to call + * ast_sip_push_task_wait_servant() if you pass NULL for the + * serializer parameter. + * + * \note The sip_task() return value may need to be distinguished from + * the failure to push the task. + * + * \return sip_task() return value on success. + * \retval -1 Failure to push the task. + */ +int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); + +/*! * \brief Determine if the current thread is a SIP servant thread * * \retval 0 This is not a SIP servant thread @@ -1588,13 +1652,13 @@ enum ast_sip_scheduler_task_flags { /*! * Run at a fixed interval. - * Stop scheduling if the callback returns 0. + * Stop scheduling if the callback returns <= 0. * Any other value is ignored. */ AST_SIP_SCHED_TASK_FIXED = (0 << 0), /*! * Run at a variable interval. - * Stop scheduling if the callback returns 0. + * Stop scheduling if the callback returns <= 0. * Any other return value is used as the new interval. */ AST_SIP_SCHED_TASK_VARIABLE = (1 << 0), @@ -1620,16 +1684,23 @@ enum ast_sip_scheduler_task_flags { */ AST_SIP_SCHED_TASK_DATA_FREE = ( 1 << 3 ), - /*! \brief AST_SIP_SCHED_TASK_PERIODIC - * The task is scheduled at multiples of interval + /*! + * \brief The task is scheduled at multiples of interval * \see Interval */ AST_SIP_SCHED_TASK_PERIODIC = (0 << 4), - /*! \brief AST_SIP_SCHED_TASK_DELAY - * The next invocation of the task is at last finish + interval + /*! + * \brief The next invocation of the task is at last finish + interval * \see Interval */ AST_SIP_SCHED_TASK_DELAY = (1 << 4), + /*! + * \brief The scheduled task's events are tracked in the debug log. + * \details + * Schedule events such as scheduling, running, rescheduling, canceling, + * and destroying are logged about the task. + */ + AST_SIP_SCHED_TASK_TRACK = (1 << 5), }; /*! diff --git a/include/asterisk/rtp_engine.h b/include/asterisk/rtp_engine.h index 8f044ce17..3426b2a1e 100644 --- a/include/asterisk/rtp_engine.h +++ b/include/asterisk/rtp_engine.h @@ -292,10 +292,14 @@ struct ast_rtp_payload_type { #define AST_RTP_RTCP_SR 200 /*! Receiver Report */ #define AST_RTP_RTCP_RR 201 +/*! Transport Layer Feed Back (From RFC4585 also RFC5104) */ +#define AST_RTP_RTCP_RTPFB 205 /*! Payload Specific Feed Back (From RFC4585 also RFC5104) */ -#define AST_RTP_RTCP_PSFB 206 +#define AST_RTP_RTCP_PSFB 206 /* Common RTCP feedback message types */ +/*! Generic NACK (From RFC4585 also RFC5104) */ +#define AST_RTP_RTCP_FMT_NACK 1 /*! Picture loss indication (From RFC4585) */ #define AST_RTP_RTCP_FMT_PLI 1 /*! Full INTRA-frame Request (From RFC5104) */ diff --git a/res/res_pjsip.c b/res/res_pjsip.c index 3241d777d..19e6e1d13 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -2743,7 +2743,7 @@ static int register_service(void *data) int ast_sip_register_service(pjsip_module *module) { - return ast_sip_push_task_synchronous(NULL, register_service, &module); + return ast_sip_push_task_wait_servant(NULL, register_service, &module); } static int unregister_service(void *data) @@ -2759,7 +2759,7 @@ static int unregister_service(void *data) void ast_sip_unregister_service(pjsip_module *module) { - ast_sip_push_task_synchronous(NULL, unregister_service, &module); + ast_sip_push_task_wait_servant(NULL, unregister_service, &module); } static struct ast_sip_authenticator *registered_authenticator; @@ -3009,7 +3009,7 @@ static char *cli_dump_endpt(struct ast_cli_entry *e, int cmd, struct ast_cli_arg return CLI_SHOWUSAGE; } - ast_sip_push_task_synchronous(NULL, do_cli_dump_endpt, a); + ast_sip_push_task_wait_servant(NULL, do_cli_dump_endpt, a); return CLI_SUCCESS; } @@ -4484,21 +4484,30 @@ static int serializer_pool_setup(void) return 0; } +static struct ast_taskprocessor *serializer_pool_pick(void) +{ + struct ast_taskprocessor *serializer; + + unsigned int pos; + + /* + * Pick a serializer to use from the pool. + * + * Note: We don't care about any reentrancy behavior + * when incrementing serializer_pool_pos. If it gets + * incorrectly incremented it doesn't matter. + */ + pos = serializer_pool_pos++; + pos %= SERIALIZER_POOL_SIZE; + serializer = serializer_pool[pos]; + + return serializer; +} + int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { if (!serializer) { - unsigned int pos; - - /* - * Pick a serializer to use from the pool. - * - * Note: We don't care about any reentrancy behavior - * when incrementing serializer_pool_pos. If it gets - * incorrectly incremented it doesn't matter. - */ - pos = serializer_pool_pos++; - pos %= SERIALIZER_POOL_SIZE; - serializer = serializer_pool[pos]; + serializer = serializer_pool_pick(); } return ast_taskprocessor_push(serializer, sip_task, task_data); @@ -4522,9 +4531,8 @@ static int sync_task(void *data) /* * Once we unlock std->lock after signaling, we cannot access - * std again. The thread waiting within - * ast_sip_push_task_synchronous() is free to continue and - * release its local variable (std). + * std again. The thread waiting within ast_sip_push_task_wait() + * is free to continue and release its local variable (std). */ ast_mutex_lock(&std->lock); std->complete = 1; @@ -4534,15 +4542,11 @@ static int sync_task(void *data) return ret; } -int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { /* This method is an onion */ struct sync_task_data std; - if (ast_sip_thread_is_servant()) { - return sip_task(task_data); - } - memset(&std, 0, sizeof(std)); ast_mutex_init(&std.lock); ast_cond_init(&std.cond, NULL); @@ -4566,6 +4570,42 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si return std.fail; } +int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +{ + if (ast_sip_thread_is_servant()) { + return sip_task(task_data); + } + + return ast_sip_push_task_wait(serializer, sip_task, task_data); +} + +int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +{ + return ast_sip_push_task_wait_servant(serializer, sip_task, task_data); +} + +int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +{ + if (!serializer) { + /* Caller doesn't care which PJSIP serializer the task executes under. */ + serializer = serializer_pool_pick(); + if (!serializer) { + /* No serializer picked to execute the task */ + return -1; + } + } + if (ast_taskprocessor_is_task(serializer)) { + /* + * We are the requested serializer so we must execute + * the task now or deadlock waiting on ourself to + * execute it. + */ + return sip_task(task_data); + } + + return ast_sip_push_task_wait(serializer, sip_task, task_data); +} + void ast_copy_pj_str(char *dest, const pj_str_t *src, size_t size) { size_t chars_to_copy = MIN(size - 1, pj_strlen(src)); @@ -5191,7 +5231,7 @@ static int reload_module(void) * We must wait for the reload to complete so multiple * reloads cannot happen at the same time. */ - if (ast_sip_push_task_synchronous(NULL, reload_configuration_task, NULL)) { + if (ast_sip_push_task_wait_servant(NULL, reload_configuration_task, NULL)) { ast_log(LOG_WARNING, "Failed to reload PJSIP\n"); return -1; } @@ -5208,7 +5248,7 @@ static int unload_module(void) /* The thread this is called from cannot call PJSIP/PJLIB functions, * so we have to push the work to the threadpool to handle */ - ast_sip_push_task_synchronous(NULL, unload_pjsip, NULL); + ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL); ast_sip_destroy_scheduler(); serializer_pool_shutdown(); ast_threadpool_shutdown(sip_threadpool); diff --git a/res/res_pjsip/config_system.c b/res/res_pjsip/config_system.c index dfd92404b..ed2b5d232 100644 --- a/res/res_pjsip/config_system.c +++ b/res/res_pjsip/config_system.c @@ -282,5 +282,5 @@ static int system_create_resolver_and_set_nameservers(void *data) void ast_sip_initialize_dns(void) { - ast_sip_push_task_synchronous(NULL, system_create_resolver_and_set_nameservers, NULL); + ast_sip_push_task_wait_servant(NULL, system_create_resolver_and_set_nameservers, NULL); } diff --git a/res/res_pjsip/config_transport.c b/res/res_pjsip/config_transport.c index 15c03769b..dd7c7049d 100644 --- a/res/res_pjsip/config_transport.c +++ b/res/res_pjsip/config_transport.c @@ -267,7 +267,7 @@ static void sip_transport_state_destroy(void *obj) { struct ast_sip_transport_state *state = obj; - ast_sip_push_task_synchronous(NULL, destroy_sip_transport_state, state); + ast_sip_push_task_wait_servant(NULL, destroy_sip_transport_state, state); } /*! \brief Destructor for ast_sip_transport state information */ diff --git a/res/res_pjsip/pjsip_scheduler.c b/res/res_pjsip/pjsip_scheduler.c index 75945be77..bbf666fd7 100644 --- a/res/res_pjsip/pjsip_scheduler.c +++ b/res/res_pjsip/pjsip_scheduler.c @@ -28,6 +28,7 @@ #include "asterisk/res_pjsip.h" #include "include/res_pjsip_private.h" #include "asterisk/res_pjsip_cli.h" +#include "asterisk/taskprocessor.h" #define TASK_BUCKETS 53 @@ -36,30 +37,30 @@ static struct ao2_container *tasks; static int task_count; struct ast_sip_sched_task { - /*! ast_sip_sched task id */ - uint32_t task_id; - /*! ast_sched scheudler id */ - int current_scheduler_id; - /*! task is currently running */ - int is_running; - /*! task */ - ast_sip_task task; + /*! The serializer to be used (if any) (Holds a ref) */ + struct ast_taskprocessor *serializer; /*! task data */ void *task_data; - /*! reschedule interval in milliseconds */ - int interval; - /*! the time the task was queued */ + /*! task function */ + ast_sip_task task; + /*! the time the task was originally scheduled/queued */ struct timeval when_queued; /*! the last time the task was started */ struct timeval last_start; /*! the last time the task was ended */ struct timeval last_end; + /*! When the periodic task is next expected to run */ + struct timeval next_periodic; + /*! reschedule interval in milliseconds */ + int interval; + /*! ast_sched scheudler id */ + int current_scheduler_id; + /*! task is currently running */ + int is_running; /*! times run */ int run_count; /*! the task reschedule, cleanup and policy flags */ enum ast_sip_scheduler_task_flags flags; - /*! the serializer to be used (if any) */ - struct ast_taskprocessor *serializer; /*! A name to be associated with the task */ char name[0]; }; @@ -76,14 +77,22 @@ static int push_to_serializer(const void *data); */ static int run_task(void *data) { - RAII_VAR(struct ast_sip_sched_task *, schtd, ao2_bump(data), ao2_cleanup); + RAII_VAR(struct ast_sip_sched_task *, schtd, data, ao2_cleanup); int res; int delay; + if (!schtd->interval) { + /* Task was cancelled while waiting to be executed by the serializer */ + return -1; + } + + if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) { + ast_log(LOG_DEBUG, "Sched %p: Running %s\n", schtd, schtd->name); + } ao2_lock(schtd); schtd->last_start = ast_tvnow(); schtd->is_running = 1; - schtd->run_count++; + ++schtd->run_count; ao2_unlock(schtd); res = schtd->task(schtd->task_data); @@ -93,10 +102,10 @@ static int run_task(void *data) schtd->last_end = ast_tvnow(); /* - * Don't restart if the task returned 0 or if the interval + * Don't restart if the task returned <= 0 or if the interval * was set to 0 while the task was running */ - if (!res || !schtd->interval) { + if (res <= 0 || !schtd->interval) { schtd->interval = 0; ao2_unlock(schtd); ao2_unlink(tasks, schtd); @@ -110,18 +119,31 @@ static int run_task(void *data) if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) { delay = schtd->interval; } else { - delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval); + int64_t diff; + + /* Determine next periodic interval we need to expire. */ + do { + schtd->next_periodic = ast_tvadd(schtd->next_periodic, + ast_samp2tv(schtd->interval, 1000)); + diff = ast_tvdiff_ms(schtd->next_periodic, schtd->last_end); + } while (diff <= 0); + delay = diff; } schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, schtd); if (schtd->current_scheduler_id < 0) { schtd->interval = 0; ao2_unlock(schtd); + ast_log(LOG_ERROR, "Sched %p: Failed to reschedule task %s\n", schtd, schtd->name); ao2_unlink(tasks, schtd); return -1; } ao2_unlock(schtd); + if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) { + ast_log(LOG_DEBUG, "Sched %p: Rescheduled %s for %d ms\n", schtd, schtd->name, + delay); + } return 0; } @@ -133,9 +155,32 @@ static int run_task(void *data) static int push_to_serializer(const void *data) { struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data; + int sched_id; + + ao2_lock(schtd); + sched_id = schtd->current_scheduler_id; + schtd->current_scheduler_id = -1; + ao2_unlock(schtd); + if (sched_id < 0) { + /* Task was cancelled while waiting on the lock */ + return 0; + } + if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) { + ast_log(LOG_DEBUG, "Sched %p: Ready to run %s\n", schtd, schtd->name); + } + ao2_t_ref(schtd, +1, "Give ref to run_task()"); if (ast_sip_push_task(schtd->serializer, run_task, schtd)) { - ao2_ref(schtd, -1); + /* + * Oh my. Have to cancel the scheduled item because we + * unexpectedly cannot run it anymore. + */ + ao2_unlink(tasks, schtd); + ao2_lock(schtd); + schtd->interval = 0; + ao2_unlock(schtd); + + ao2_t_ref(schtd, -1, "Failed so release run_task() ref"); } return 0; @@ -144,20 +189,26 @@ static int push_to_serializer(const void *data) int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd) { int res; + int sched_id; - if (!ao2_ref_and_lock(schtd)) { - return -1; - } - - if (schtd->current_scheduler_id < 0 || schtd->interval <= 0) { - ao2_unlock_and_unref(schtd); - return 0; + if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) { + ast_log(LOG_DEBUG, "Sched %p: Canceling %s\n", schtd, schtd->name); } + /* + * Prevent any tasks in the serializer queue from + * running and restarting the scheduled item on us + * first. + */ + ao2_lock(schtd); schtd->interval = 0; - ao2_unlock_and_unref(schtd); + + sched_id = schtd->current_scheduler_id; + schtd->current_scheduler_id = -1; + ao2_unlock(schtd); + res = ast_sched_del(scheduler_context, sched_id); + ao2_unlink(tasks, schtd); - res = ast_sched_del(scheduler_context, schtd->current_scheduler_id); return res; } @@ -306,16 +357,20 @@ int ast_sip_sched_is_task_running_by_name(const char *name) return is_running; } -static void schtd_destructor(void *data) +static void schtd_dtor(void *data) { struct ast_sip_sched_task *schtd = data; + if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) { + ast_log(LOG_DEBUG, "Sched %p: Destructor %s\n", schtd, schtd->name); + } if (schtd->flags & AST_SIP_SCHED_TASK_DATA_AO2) { /* release our own ref, then release the callers if asked to do so */ ao2_ref(schtd->task_data, (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE) ? -2 : -1); } else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) { ast_free(schtd->task_data); } + ast_taskprocessor_unreference(schtd->serializer); } struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer, @@ -326,38 +381,64 @@ struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *seria struct ast_sip_sched_task *schtd; int res; - if (interval < 0) { + if (interval <= 0) { return NULL; } - schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), schtd_destructor); + schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), + schtd_dtor); if (!schtd) { return NULL; } - schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1); - schtd->serializer = serializer; + schtd->serializer = ao2_bump(serializer); + schtd->task_data = task_data; schtd->task = sip_task; + schtd->interval = interval; + schtd->flags = flags; if (!ast_strlen_zero(name)) { strcpy(schtd->name, name); /* Safe */ } else { - sprintf(schtd->name, "task_%08x", schtd->task_id); + uint32_t task_id; + + task_id = ast_atomic_fetchadd_int(&task_count, 1); + sprintf(schtd->name, "task_%08x", task_id); + } + if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) { + ast_log(LOG_DEBUG, "Sched %p: Scheduling %s for %d ms\n", schtd, schtd->name, + interval); } - schtd->task_data = task_data; - schtd->flags = flags; - schtd->interval = interval; schtd->when_queued = ast_tvnow(); + if (!(schtd->flags & AST_SIP_SCHED_TASK_DELAY)) { + schtd->next_periodic = ast_tvadd(schtd->when_queued, + ast_samp2tv(schtd->interval, 1000)); + } if (flags & AST_SIP_SCHED_TASK_DATA_AO2) { ao2_ref(task_data, +1); } + + /* + * We must put it in the 'tasks' container before scheduling + * the task because we don't want the push_to_serializer() + * sched task to "remove" it on failure before we even put + * it in. If this happens then nothing would remove it from + * the 'tasks' container. + */ + ao2_link(tasks, schtd); + + /* + * Lock so we are guaranteed to get the sched id set before + * the push_to_serializer() sched task can clear it. + */ + ao2_lock(schtd); res = ast_sched_add(scheduler_context, interval, push_to_serializer, schtd); + schtd->current_scheduler_id = res; + ao2_unlock(schtd); if (res < 0) { + ao2_unlink(tasks, schtd); ao2_ref(schtd, -1); return NULL; - } else { - schtd->current_scheduler_id = res; - ao2_link(tasks, schtd); } return schtd; @@ -470,7 +551,8 @@ static struct ast_cli_entry cli_commands[] = { int ast_sip_initialize_scheduler(void) { - if (!(scheduler_context = ast_sched_context_create())) { + scheduler_context = ast_sched_context_create(); + if (!scheduler_context) { ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n"); return -1; } @@ -500,7 +582,21 @@ int ast_sip_destroy_scheduler(void) ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands)); if (scheduler_context) { + if (tasks) { + struct ao2_iterator iter; + struct ast_sip_sched_task *schtd; + + /* Cancel all scheduled tasks */ + iter = ao2_iterator_init(tasks, 0); + while ((schtd = ao2_iterator_next(&iter))) { + ast_sip_sched_task_cancel(schtd); + ao2_ref(schtd, -1); + } + ao2_iterator_destroy(&iter); + } + ast_sched_context_destroy(scheduler_context); + scheduler_context = NULL; } ao2_cleanup(tasks); diff --git a/res/res_pjsip_header_funcs.c b/res/res_pjsip_header_funcs.c index 6c0f9151d..798a1cde6 100644 --- a/res/res_pjsip_header_funcs.c +++ b/res/res_pjsip_header_funcs.c @@ -153,7 +153,7 @@ static const struct ast_datastore_info header_datastore = { .type = "header_datastore", }; -/*! \brief Data structure used for ast_sip_push_task_synchronous */ +/*! \brief Data structure used for ast_sip_push_task_wait_serializer */ struct header_data { struct ast_sip_channel_pvt *channel; char *header_name; @@ -480,11 +480,11 @@ static int func_read_header(struct ast_channel *chan, const char *function, char header_data.len = len; if (!strcasecmp(args.action, "read")) { - return ast_sip_push_task_synchronous(channel->session->serializer, read_header, - &header_data); + return ast_sip_push_task_wait_serializer(channel->session->serializer, + read_header, &header_data); } else if (!strcasecmp(args.action, "remove")) { - return ast_sip_push_task_synchronous(channel->session->serializer, remove_header, - &header_data); + return ast_sip_push_task_wait_serializer(channel->session->serializer, + remove_header, &header_data); } else { ast_log(AST_LOG_ERROR, "Unknown action '%s' is not valid, must be 'read' or 'remove'.\n", @@ -539,14 +539,14 @@ static int func_write_header(struct ast_channel *chan, const char *cmd, char *da header_data.len = 0; if (!strcasecmp(args.action, "add")) { - return ast_sip_push_task_synchronous(channel->session->serializer, add_header, - &header_data); + return ast_sip_push_task_wait_serializer(channel->session->serializer, + add_header, &header_data); } else if (!strcasecmp(args.action, "update")) { - return ast_sip_push_task_synchronous(channel->session->serializer, update_header, - &header_data); + return ast_sip_push_task_wait_serializer(channel->session->serializer, + update_header, &header_data); } else if (!strcasecmp(args.action, "remove")) { - return ast_sip_push_task_synchronous(channel->session->serializer, remove_header, - &header_data); + return ast_sip_push_task_wait_serializer(channel->session->serializer, + remove_header, &header_data); } else { ast_log(AST_LOG_ERROR, "Unknown action '%s' is not valid, must be 'add', 'update', or 'remove'.\n", diff --git a/res/res_pjsip_history.c b/res/res_pjsip_history.c index ab035a296..eed06eed8 100644 --- a/res/res_pjsip_history.c +++ b/res/res_pjsip_history.c @@ -1385,7 +1385,7 @@ static int unload_module(void) ast_cli_unregister_multiple(cli_pjsip, ARRAY_LEN(cli_pjsip)); ast_sip_unregister_service(&logging_module); - ast_sip_push_task_synchronous(NULL, clear_history_entries, NULL); + ast_sip_push_task_wait_servant(NULL, clear_history_entries, NULL); AST_VECTOR_FREE(&vector_history); ast_pjproject_caching_pool_destroy(&cachingpool); diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c index 8befbc1e8..4894e55d1 100644 --- a/res/res_pjsip_outbound_publish.c +++ b/res/res_pjsip_outbound_publish.c @@ -1070,7 +1070,7 @@ static struct sip_outbound_publisher *sip_outbound_publisher_alloc( return NULL; } - if (ast_sip_push_task_synchronous(NULL, sip_outbound_publisher_init, publisher)) { + if (ast_sip_push_task_wait_servant(NULL, sip_outbound_publisher_init, publisher)) { ast_log(LOG_ERROR, "Unable to create publisher for outbound publish '%s'\n", ast_sorcery_object_get_id(client->publish)); ao2_ref(publisher, -1); @@ -1514,8 +1514,8 @@ static int current_state_reusable(struct ast_sip_outbound_publish *publish, */ old_publish = current_state->client->publish; current_state->client->publish = publish; - if (ast_sip_push_task_synchronous( - NULL, sip_outbound_publisher_reinit_all, current_state->client->publishers)) { + if (ast_sip_push_task_wait_servant(NULL, sip_outbound_publisher_reinit_all, + current_state->client->publishers)) { /* * If the state object fails to re-initialize then swap * the old publish info back in. diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c index 2839ecbab..8a90849c0 100644 --- a/res/res_pjsip_outbound_registration.c +++ b/res/res_pjsip_outbound_registration.c @@ -1480,7 +1480,7 @@ static int sip_outbound_registration_apply(const struct ast_sorcery *sorcery, vo return -1; } - if (ast_sip_push_task_synchronous(new_state->client_state->serializer, + if (ast_sip_push_task_wait_serializer(new_state->client_state->serializer, sip_outbound_registration_regc_alloc, new_state)) { return -1; } @@ -1850,8 +1850,7 @@ static int ami_outbound_registration_detail(void *obj, void *arg, int flags) struct sip_ami_outbound *ami = arg; ami->registration = obj; - return ast_sip_push_task_synchronous( - NULL, ami_outbound_registration_task, ami); + return ast_sip_push_task_wait_servant(NULL, ami_outbound_registration_task, ami); } static int ami_show_outbound_registrations(struct mansession *s, diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index 69c256dab..9e0718f51 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -1318,7 +1318,8 @@ static void subscription_tree_destructor(void *obj) destroy_subscriptions(sub_tree->root); if (sub_tree->dlg) { - ast_sip_push_task_synchronous(sub_tree->serializer, subscription_unreference_dialog, sub_tree); + ast_sip_push_task_wait_servant(sub_tree->serializer, + subscription_unreference_dialog, sub_tree); } ao2_cleanup(sub_tree->endpoint); @@ -1665,7 +1666,8 @@ static int subscription_persistence_recreate(void *obj, void *arg, int flags) } recreate_data.persistence = persistence; recreate_data.rdata = &rdata; - if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) { + if (ast_sip_push_task_wait_serializer(serializer, sub_persistence_recreate, + &recreate_data)) { ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n", persistence->endpoint); ast_sorcery_delete(ast_sip_get_sorcery(), persistence); diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c index 120203c95..1e6ca7f46 100644 --- a/res/res_pjsip_refer.c +++ b/res/res_pjsip_refer.c @@ -316,7 +316,15 @@ static void refer_progress_on_evsub_state(pjsip_evsub *sub, pjsip_event *event) /* It's possible that a task is waiting to remove us already, so bump the refcount of progress so it doesn't get destroyed */ ao2_ref(progress, +1); pjsip_dlg_dec_lock(progress->dlg); - ast_sip_push_task_synchronous(progress->serializer, refer_progress_terminate, progress); + /* + * XXX We are always going to execute this inline rather than + * in the serializer because this function is a PJPROJECT + * callback and thus has to be a SIP servant thread. + * + * The likely remedy is to push most of this function into + * refer_progress_terminate() with ast_sip_push_task(). + */ + ast_sip_push_task_wait_servant(progress->serializer, refer_progress_terminate, progress); pjsip_dlg_inc_lock(progress->dlg); ao2_ref(progress, -1); @@ -960,7 +968,8 @@ static int refer_incoming_invite_request(struct ast_sip_session *session, struct invite.session = other_session; - if (ast_sip_push_task_synchronous(other_session->serializer, invite_replaces, &invite)) { + if (ast_sip_push_task_wait_serializer(other_session->serializer, invite_replaces, + &invite)) { response = 481; goto inv_replace_failed; } diff --git a/res/res_pjsip_sdp_rtp.c b/res/res_pjsip_sdp_rtp.c index 03d37652f..14ed3b186 100644 --- a/res/res_pjsip_sdp_rtp.c +++ b/res/res_pjsip_sdp_rtp.c @@ -1096,6 +1096,9 @@ static void add_rtcp_fb_to_stream(struct ast_sip_session *session, attr = pjmedia_sdp_attr_create(pool, "rtcp-fb", pj_cstr(&stmp, "* goog-remb")); pjmedia_sdp_attr_add(&media->attr_count, media->attr, attr); + + attr = pjmedia_sdp_attr_create(pool, "rtcp-fb", pj_cstr(&stmp, "* nack")); + pjmedia_sdp_attr_add(&media->attr_count, media->attr, attr); } /*! \brief Function which negotiates an incoming media stream */ diff --git a/res/res_pjsip_transport_websocket.c b/res/res_pjsip_transport_websocket.c index 974b15087..633594359 100644 --- a/res/res_pjsip_transport_websocket.c +++ b/res/res_pjsip_transport_websocket.c @@ -377,7 +377,7 @@ static void websocket_cb(struct ast_websocket *session, struct ast_variable *par create_data.ws_session = session; - if (ast_sip_push_task_synchronous(serializer, transport_create, &create_data)) { + if (ast_sip_push_task_wait_serializer(serializer, transport_create, &create_data)) { ast_log(LOG_ERROR, "Could not create WebSocket transport.\n"); ast_taskprocessor_unreference(serializer); ast_websocket_unref(session); @@ -396,13 +396,13 @@ static void websocket_cb(struct ast_websocket *session, struct ast_variable *par } if (opcode == AST_WEBSOCKET_OPCODE_TEXT || opcode == AST_WEBSOCKET_OPCODE_BINARY) { - ast_sip_push_task_synchronous(serializer, transport_read, &read_data); + ast_sip_push_task_wait_serializer(serializer, transport_read, &read_data); } else if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) { break; } } - ast_sip_push_task_synchronous(serializer, transport_shutdown, transport); + ast_sip_push_task_wait_serializer(serializer, transport_shutdown, transport); ast_taskprocessor_unreference(serializer); ast_websocket_unref(session); diff --git a/res/res_rtp_asterisk.c b/res/res_rtp_asterisk.c index c88903777..4ac20d551 100644 --- a/res/res_rtp_asterisk.c +++ b/res/res_rtp_asterisk.c @@ -71,6 +71,7 @@ #include "asterisk/smoother.h" #include "asterisk/uuid.h" #include "asterisk/test.h" +#include "asterisk/data_buffer.h" #ifdef HAVE_PJPROJECT #include "asterisk/res_pjproject.h" #endif @@ -92,6 +93,8 @@ #define TURN_STATE_WAIT_TIME 2000 +#define DEFAULT_RTP_BUFFER_SIZE 250 + /*! Full INTRA-frame Request / Fast Update Request (From RFC2032) */ #define RTCP_PT_FUR 192 /*! Sender Report (From RFC3550) */ @@ -373,6 +376,8 @@ struct ast_rtp { struct rtp_red *red; + struct ast_data_buffer *send_buffer; /*!< Buffer for storing sent packets for retransmission */ + #ifdef HAVE_PJPROJECT ast_cond_t cond; /*!< ICE/TURN condition for signaling */ @@ -509,6 +514,12 @@ struct rtp_red { long int prev_ts; }; +/*! \brief Structure for storing RTP packets for retransmission */ +struct ast_rtp_rtcp_nack_payload { + size_t size; /*!< The size of the payload */ + unsigned char buf[0]; /*!< The payload data */ +}; + AST_LIST_HEAD_NOLOCK(frame_list, ast_frame); /* Forward Declarations */ @@ -3675,6 +3686,11 @@ static int ast_rtp_destroy(struct ast_rtp_instance *instance) rtp->red = NULL; } + /* Destroy the send buffer if it was being used */ + if (rtp->send_buffer) { + ast_data_buffer_free(rtp->send_buffer); + } + ao2_cleanup(rtp->lasttxformat); ao2_cleanup(rtp->lastrxformat); ao2_cleanup(rtp->f.subclass.format); @@ -4369,7 +4385,7 @@ static int rtp_raw_write(struct ast_rtp_instance *instance, struct ast_frame *fr } else { /* This is the first frame with sequence number we've seen, so start keeping track */ rtp->expectedseqno = frame->seqno + 1; - } + } } else { rtp->expectedseqno = -1; } @@ -4383,13 +4399,27 @@ static int rtp_raw_write(struct ast_rtp_instance *instance, struct ast_frame *fr /* If we know the remote address construct a packet and send it out */ if (!ast_sockaddr_isnull(&remote_address)) { int hdrlen = 12, res, ice; + int packet_len = frame->datalen + hdrlen; unsigned char *rtpheader = (unsigned char *)(frame->data.ptr - hdrlen); put_unaligned_uint32(rtpheader, htonl((2 << 30) | (codec << 16) | (seqno) | (mark << 23))); put_unaligned_uint32(rtpheader + 4, htonl(rtp->lastts)); put_unaligned_uint32(rtpheader + 8, htonl(rtp->ssrc)); - if ((res = rtp_sendto(instance, (void *)rtpheader, frame->datalen + hdrlen, 0, &remote_address, &ice)) < 0) { + /* If retransmissions are enabled, we need to store this packet for future use */ + if (rtp->send_buffer) { + struct ast_rtp_rtcp_nack_payload *payload; + + payload = ast_malloc(sizeof(*payload) + packet_len); + if (payload) { + payload->size = packet_len; + memcpy(payload->buf, rtpheader, packet_len); + ast_data_buffer_put(rtp->send_buffer, rtp->seqno, payload); + } + } + + res = rtp_sendto(instance, (void *)rtpheader, packet_len, 0, &remote_address, &ice); + if (res < 0) { if (!ast_rtp_instance_get_prop(instance, AST_RTP_PROPERTY_NAT) || (ast_rtp_instance_get_prop(instance, AST_RTP_PROPERTY_NAT) && (ast_test_flag(rtp, FLAG_NAT_ACTIVE) == FLAG_NAT_ACTIVE))) { ast_debug(1, "RTP Transmission error of packet %d to %s: %s\n", rtp->seqno, @@ -5145,8 +5175,8 @@ static void update_lost_stats(struct ast_rtp *rtp, unsigned int lost_packets) } /*! \pre instance is locked */ -static struct ast_rtp_instance *rtp_find_instance_by_ssrc(struct ast_rtp_instance *instance, - struct ast_rtp *rtp, unsigned int ssrc) +static struct ast_rtp_instance *__rtp_find_instance_by_ssrc(struct ast_rtp_instance *instance, + struct ast_rtp *rtp, unsigned int ssrc, int source) { int index; @@ -5158,8 +5188,9 @@ static struct ast_rtp_instance *rtp_find_instance_by_ssrc(struct ast_rtp_instanc /* Find the bundled child instance */ for (index = 0; index < AST_VECTOR_SIZE(&rtp->ssrc_mapping); ++index) { struct rtp_ssrc_mapping *mapping = AST_VECTOR_GET_ADDR(&rtp->ssrc_mapping, index); + unsigned int mapping_ssrc = source ? ast_rtp_get_ssrc(mapping->instance) : mapping->ssrc; - if (mapping->ssrc_valid && mapping->ssrc == ssrc) { + if (mapping->ssrc_valid && mapping_ssrc == ssrc) { return mapping->instance; } } @@ -5171,6 +5202,20 @@ static struct ast_rtp_instance *rtp_find_instance_by_ssrc(struct ast_rtp_instanc return NULL; } +/*! \pre instance is locked */ +static struct ast_rtp_instance *rtp_find_instance_by_packet_source_ssrc(struct ast_rtp_instance *instance, + struct ast_rtp *rtp, unsigned int ssrc) +{ + return __rtp_find_instance_by_ssrc(instance, rtp, ssrc, 0); +} + +/*! \pre instance is locked */ +static struct ast_rtp_instance *rtp_find_instance_by_media_source_ssrc(struct ast_rtp_instance *instance, + struct ast_rtp *rtp, unsigned int ssrc) +{ + return __rtp_find_instance_by_ssrc(instance, rtp, ssrc, 1); +} + static const char *rtcp_payload_type2str(unsigned int pt) { const char *str; @@ -5203,6 +5248,69 @@ static const char *rtcp_payload_type2str(unsigned int pt) return str; } +/*! \pre instance is locked */ +static int ast_rtp_rtcp_handle_nack(struct ast_rtp_instance *instance, unsigned int *nackdata, unsigned int position, + unsigned int length) +{ + struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + int res = 0; + int blp_index; + int packet_index; + int ice; + struct ast_rtp_rtcp_nack_payload *payload; + unsigned int current_word; + unsigned int pid; /* Packet ID which refers to seqno of lost packet */ + unsigned int blp; /* Bitmask of following lost packets */ + struct ast_sockaddr remote_address = { {0,} }; + + if (!rtp->send_buffer) { + ast_debug(1, "Tried to handle NACK request, but we don't have a RTP packet storage!\n"); + return res; + } + + ast_rtp_instance_get_remote_address(instance, &remote_address); + + /* + * We use index 3 because with feedback messages, the FCI (Feedback Control Information) + * does not begin until after the version, packet SSRC, and media SSRC words. + */ + for (packet_index = 3; packet_index < length; packet_index++) { + current_word = ntohl(nackdata[position + packet_index]); + pid = current_word >> 16; + /* We know the remote end is missing this packet. Go ahead and send it if we still have it. */ + payload = (struct ast_rtp_rtcp_nack_payload *)ast_data_buffer_get(rtp->send_buffer, pid); + if (payload) { + res += rtp_sendto(instance, payload->buf, payload->size, 0, &remote_address, &ice); + } else { + ast_debug(1, "Received NACK request for RTP packet with seqno %d, but we don't have it\n", pid); + } + /* + * The bitmask. Denoting the least significant bit as 1 and its most significant bit + * as 16, then bit i of the bitmask is set to 1 if the receiver has not received RTP + * packet (pid+i)(modulo 2^16). Otherwise, it is set to 0. We cannot assume bits set + * to 0 after a bit set to 1 have actually been received. + */ + blp = current_word & 0xFF; + blp_index = 1; + while (blp) { + if (blp & 1) { + /* Packet (pid + i)(modulo 2^16) is missing too. */ + unsigned int seqno = (pid + blp_index) % 65536; + payload = (struct ast_rtp_rtcp_nack_payload *)ast_data_buffer_get(rtp->send_buffer, seqno); + if (payload) { + res += rtp_sendto(instance, payload->buf, payload->size, 0, &remote_address, &ice); + } else { + ast_debug(1, "Remote end also requested RTP packet with seqno %d, but we don't have it\n", seqno); + } + } + blp >>= 1; + blp_index++; + } + } + + return res; +} + /* * Unshifted RTCP header bit field masks */ @@ -5243,6 +5351,7 @@ static const char *rtcp_payload_type2str(unsigned int pt) #define RTCP_RR_BLOCK_WORD_LENGTH 6 #define RTCP_HEADER_SSRC_LENGTH 2 #define RTCP_FB_REMB_BLOCK_WORD_LENGTH 4 +#define RTCP_FB_NACK_BLOCK_WORD_LENGTH 2 static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, const unsigned char *rtcpdata, size_t size, struct ast_sockaddr *addr) { @@ -5319,6 +5428,8 @@ static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, c unsigned int ssrc_valid; unsigned int length; unsigned int min_length; + /*! Always use packet source SSRC to find the rtp instance unless explicitly told not to. */ + unsigned int use_packet_source = 1; struct ast_json *message_blob; RAII_VAR(struct ast_rtp_rtcp_report *, rtcp_report, NULL, ao2_cleanup); @@ -5341,9 +5452,20 @@ static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, c /* fall through */ case RTCP_PT_RR: min_length += (rc * RTCP_RR_BLOCK_WORD_LENGTH); + use_packet_source = 0; break; case RTCP_PT_FUR: break; + case AST_RTP_RTCP_RTPFB: + switch (rc) { + case AST_RTP_RTCP_FMT_NACK: + min_length += RTCP_FB_NACK_BLOCK_WORD_LENGTH; + break; + default: + break; + } + use_packet_source = 0; + break; case RTCP_PT_PSFB: switch (rc) { case AST_RTP_RTCP_FMT_REMB: @@ -5392,13 +5514,16 @@ static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, c } rtcp_report->reception_report_count = rc; - ssrc = ntohl(rtcpheader[i + 1]); + ssrc = ntohl(rtcpheader[i + 2]); rtcp_report->ssrc = ssrc; break; case RTCP_PT_FUR: case RTCP_PT_PSFB: ssrc = ntohl(rtcpheader[i + 1]); break; + case AST_RTP_RTCP_RTPFB: + ssrc = ntohl(rtcpheader[i + 2]); + break; case RTCP_PT_SDES: case RTCP_PT_BYE: default: @@ -5417,7 +5542,15 @@ static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, c /* Determine the appropriate instance for this */ if (ssrc_valid) { - child = rtp_find_instance_by_ssrc(transport, transport_rtp, ssrc); + /* + * Depending on the payload type, either the packet source or media source + * SSRC is used. + */ + if (use_packet_source) { + child = rtp_find_instance_by_packet_source_ssrc(transport, transport_rtp, ssrc); + } else { + child = rtp_find_instance_by_media_source_ssrc(transport, transport_rtp, ssrc); + } if (child && child != transport) { /* * It is safe to hold the child lock while holding the parent lock. @@ -5438,7 +5571,7 @@ static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, c } if (ssrc_valid && rtp->themssrc_valid) { - if (ssrc != rtp->themssrc) { + if (ssrc != rtp->themssrc && use_packet_source) { /* * Skip over this RTCP record as it does not contain the * correct SSRC. We should not act upon RTCP records @@ -5581,6 +5714,24 @@ static struct ast_frame *ast_rtcp_interpret(struct ast_rtp_instance *instance, c transport_rtp->f.src = "RTP"; f = &transport_rtp->f; break; + case AST_RTP_RTCP_RTPFB: + switch (rc) { + case AST_RTP_RTCP_FMT_NACK: + /* If retransmissions are not enabled ignore this message */ + if (!rtp->send_buffer) { + break; + } + + if (rtcp_debug_test_addr(addr)) { + ast_verbose("Received generic RTCP NACK message\n"); + } + + ast_rtp_rtcp_handle_nack(instance, rtcpheader, position, length); + break; + default: + break; + } + break; case RTCP_PT_FUR: /* Handle RTCP FUR as FIR by setting the format to 4 */ rc = AST_RTP_RTCP_FMT_FIR; @@ -5981,7 +6132,7 @@ static struct ast_frame *ast_rtp_read(struct ast_rtp_instance *instance, int rtc ssrc = ntohl(rtpheader[2]); /* Determine the appropriate instance for this */ - child = rtp_find_instance_by_ssrc(instance, rtp, ssrc); + child = rtp_find_instance_by_packet_source_ssrc(instance, rtp, ssrc); if (!child) { /* Neither the bundled parent nor any child has this SSRC */ return &ast_null_frame; @@ -6614,6 +6765,8 @@ static void ast_rtp_prop_set(struct ast_rtp_instance *instance, enum ast_rtp_pro } } else if (property == AST_RTP_PROPERTY_ASYMMETRIC_CODEC) { rtp->asymmetric_codec = value; + } else if (property == AST_RTP_PROPERTY_RETRANS_SEND) { + rtp->send_buffer = ast_data_buffer_alloc(ast_free_ptr, DEFAULT_RTP_BUFFER_SIZE); } } |