From f2985e310663b67ccc948515efeae500bdf94a0c Mon Sep 17 00:00:00 2001 From: Joshua Colp Date: Sat, 16 Sep 2017 11:19:59 -0300 Subject: bridge: Change participant SFU streams when source streams change. Some endpoints do not like a stream being reused for a new media stream. The frame/jitterbuffer can rely on underlying attributes of the media stream in order to order the packets. When a new stream takes its place without any notice the buffer can get confused and the media ends up getting dropped. This change uses the SSRC change to determine that a new source is reusing an existing stream and then bridge_softmix renegotiates each participant such that they see a new media stream. This causes the frame/jitterbuffer to start fresh and work as expected. ASTERISK-27277 Change-Id: I30ccbdba16ca073d7f31e0e59ab778c153afae07 --- bridges/bridge_softmix.c | 135 ++++++++++++++++++++++++++++++++--- channels/chan_iax2.c | 1 + channels/chan_pjsip.c | 2 + funcs/func_frame_trace.c | 3 + include/asterisk/frame.h | 1 + include/asterisk/res_pjsip_session.h | 2 + main/channel.c | 2 + res/res_pjsip_sdp_rtp.c | 13 ++++ res/res_pjsip_session.c | 12 +++- 9 files changed, 162 insertions(+), 9 deletions(-) diff --git a/bridges/bridge_softmix.c b/bridges/bridge_softmix.c index 59b16b78e..5e0a4856f 100644 --- a/bridges/bridge_softmix.c +++ b/bridges/bridge_softmix.c @@ -79,7 +79,7 @@ struct softmix_stats { struct softmix_translate_helper_entry { int num_times_requested; /*!< Once this entry is no longer requested, free the trans_pvt - and re-init if it was usable. */ + and re-init if it was usable. */ struct ast_format *dst_format; /*!< The destination format for this helper */ struct ast_trans_pvt *trans_pvt; /*!< the translator for this slot. */ struct ast_frame *out_frame; /*!< The output frame from the last translation */ @@ -493,21 +493,21 @@ static int append_source_streams(struct ast_stream_topology *dest, for (i = 0; i < ast_stream_topology_get_count(source); ++i) { struct ast_stream *stream; struct ast_stream *stream_clone; - char *stream_clone_name; - size_t stream_clone_name_len; + char *stream_clone_name = NULL; stream = ast_stream_topology_get_stream(source, i); if (!is_video_source(stream)) { continue; } - /* The +3 is for the two underscore separators and null terminator */ - stream_clone_name_len = SOFTBRIDGE_VIDEO_DEST_LEN + strlen(channel_name) + strlen(ast_stream_get_name(stream)) + 3; - stream_clone_name = ast_alloca(stream_clone_name_len); - snprintf(stream_clone_name, stream_clone_name_len, "%s_%s_%s", SOFTBRIDGE_VIDEO_DEST_PREFIX, - channel_name, ast_stream_get_name(stream)); + if (ast_asprintf(&stream_clone_name, "%s_%s_%s", SOFTBRIDGE_VIDEO_DEST_PREFIX, + channel_name, ast_stream_get_name(stream)) < 0) { + ast_free(stream_clone_name); + return -1; + } stream_clone = ast_stream_clone(stream, stream_clone_name); + ast_free(stream_clone_name); if (!stream_clone) { return -1; } @@ -987,6 +987,120 @@ static void softmix_bridge_write_voice(struct ast_bridge *bridge, struct ast_bri } } +static int remove_all_original_streams(struct ast_stream_topology *dest, + const struct ast_stream_topology *source, + const struct ast_stream_topology *original) +{ + int i; + + for (i = 0; i < ast_stream_topology_get_count(source); ++i) { + struct ast_stream *stream; + int original_index; + + stream = ast_stream_topology_get_stream(source, i); + + /* Mark the existing stream as removed so we get a new one, this will get + * reused on a subsequent renegotiation. + */ + for (original_index = 0; original_index < ast_stream_topology_get_count(original); ++original_index) { + struct ast_stream *original_stream = ast_stream_topology_get_stream(original, original_index); + + if (!strcmp(ast_stream_get_name(stream), ast_stream_get_name(original_stream))) { + struct ast_stream *removed; + + /* Since the participant is still going to be in the bridge we + * change the name so that routing does not attempt to route video + * to this stream. + */ + removed = ast_stream_clone(stream, "removed"); + if (!removed) { + return -1; + } + + ast_stream_set_state(removed, AST_STREAM_STATE_REMOVED); + + /* The destination topology can only ever contain the same, or more, + * streams than the original so this is safe. + */ + if (ast_stream_topology_set_stream(dest, original_index, removed)) { + ast_stream_free(removed); + return -1; + } + + break; + } + } + } + + return 0; +} + +static void sfu_topologies_on_source_change(struct ast_bridge_channel *source, struct ast_bridge_channels_list *participants) +{ + struct ast_stream_topology *source_video = NULL; + struct ast_bridge_channel *participant; + int res; + + source_video = ast_stream_topology_alloc(); + if (!source_video) { + return; + } + + ast_channel_lock(source->chan); + res = append_source_streams(source_video, ast_channel_name(source->chan), ast_channel_get_stream_topology(source->chan)); + ast_channel_unlock(source->chan); + if (res) { + goto cleanup; + } + + AST_LIST_TRAVERSE(participants, participant, entry) { + struct ast_stream_topology *original_topology; + struct ast_stream_topology *participant_topology; + + if (participant == source) { + continue; + } + + ast_channel_lock(participant->chan); + original_topology = ast_stream_topology_clone(ast_channel_get_stream_topology(participant->chan)); + ast_channel_unlock(participant->chan); + if (!original_topology) { + goto cleanup; + } + + participant_topology = ast_stream_topology_clone(original_topology); + if (!participant_topology) { + ast_stream_topology_free(original_topology); + goto cleanup; + } + + /* We add all the source streams back in, if any removed streams are already present they will + * get used first followed by appending new ones. + */ + if (append_all_streams(participant_topology, source_video)) { + ast_stream_topology_free(participant_topology); + ast_stream_topology_free(original_topology); + goto cleanup; + } + + /* And the original existing streams get marked as removed. This causes the remote side to see + * a new stream for the source streams. + */ + if (remove_all_original_streams(participant_topology, source_video, original_topology)) { + ast_stream_topology_free(participant_topology); + ast_stream_topology_free(original_topology); + goto cleanup; + } + + ast_channel_request_stream_topology_change(participant->chan, participant_topology, NULL); + ast_stream_topology_free(participant_topology); + ast_stream_topology_free(original_topology); + } + +cleanup: + ast_stream_topology_free(source_video); +} + /*! * \internal * \brief Determine what to do with a control frame. @@ -1016,6 +1130,11 @@ static int softmix_bridge_write_control(struct ast_bridge *bridge, struct ast_br softmix_data->last_video_update = ast_tvnow(); } break; + case AST_CONTROL_STREAM_TOPOLOGY_SOURCE_CHANGED: + if (bridge->softmix.video_mode.mode == AST_BRIDGE_VIDEO_MODE_SFU) { + sfu_topologies_on_source_change(bridge_channel, &bridge->channels); + } + break; default: break; } diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index 490c4cea5..04aa228bf 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -1433,6 +1433,7 @@ static int iax2_is_control_frame_allowed(int subtype) /* Intended only for internal stream topology manipulation. */ case AST_CONTROL_STREAM_TOPOLOGY_CHANGED: /* Intended only for internal stream topology change notification. */ + case AST_CONTROL_STREAM_TOPOLOGY_SOURCE_CHANGED: case AST_CONTROL_STREAM_STOP: case AST_CONTROL_STREAM_SUSPEND: case AST_CONTROL_STREAM_RESTART: diff --git a/channels/chan_pjsip.c b/channels/chan_pjsip.c index 84b508b61..7520c2b0e 100644 --- a/channels/chan_pjsip.c +++ b/channels/chan_pjsip.c @@ -1740,6 +1740,8 @@ static int chan_pjsip_indicate(struct ast_channel *ast, int condition, const voi break; case AST_CONTROL_STREAM_TOPOLOGY_CHANGED: break; + case AST_CONTROL_STREAM_TOPOLOGY_SOURCE_CHANGED: + break; case -1: res = -1; break; diff --git a/funcs/func_frame_trace.c b/funcs/func_frame_trace.c index 49abfdf14..e88cafaf6 100644 --- a/funcs/func_frame_trace.c +++ b/funcs/func_frame_trace.c @@ -342,6 +342,9 @@ static void print_frame(struct ast_frame *frame) case AST_CONTROL_STREAM_TOPOLOGY_CHANGED: ast_verbose("SubClass: STREAM_TOPOLOGY_CHANGED\n"); break; + case AST_CONTROL_STREAM_TOPOLOGY_SOURCE_CHANGED: + ast_verbose("SubClass: STREAM_TOPOLOGY_SOURCE_CHANGED\n"); + break; case AST_CONTROL_STREAM_STOP: ast_verbose("SubClass: STREAM_STOP\n"); break; diff --git a/include/asterisk/frame.h b/include/asterisk/frame.h index 8f0daccb7..eb6a6479a 100644 --- a/include/asterisk/frame.h +++ b/include/asterisk/frame.h @@ -301,6 +301,7 @@ enum ast_control_frame_type { AST_CONTROL_MASQUERADE_NOTIFY = 34, /*!< A masquerade is about to begin/end. (Never sent as a frame but directly with ast_indicate_data().) */ AST_CONTROL_STREAM_TOPOLOGY_REQUEST_CHANGE = 35, /*!< Channel indication that a stream topology change has been requested */ AST_CONTROL_STREAM_TOPOLOGY_CHANGED = 36, /*!< Channel indication that a stream topology change has occurred */ + AST_CONTROL_STREAM_TOPOLOGY_SOURCE_CHANGED = 37, /*!< Channel indication that one of the source streams has changed its source */ /* * WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING diff --git a/include/asterisk/res_pjsip_session.h b/include/asterisk/res_pjsip_session.h index d5b6fa194..fcb14b79d 100644 --- a/include/asterisk/res_pjsip_session.h +++ b/include/asterisk/res_pjsip_session.h @@ -109,6 +109,8 @@ struct ast_sip_session_media { char mslabel[AST_UUID_STR_LEN]; /*! \brief Track label */ char label[AST_UUID_STR_LEN]; + /*! \brief The underlying session has been changed in some fashion */ + unsigned int changed; }; /*! diff --git a/main/channel.c b/main/channel.c index 74de9caac..ecc771c1d 100644 --- a/main/channel.c +++ b/main/channel.c @@ -4228,6 +4228,7 @@ static int attribute_const is_visible_indication(enum ast_control_frame_type con case AST_CONTROL_MASQUERADE_NOTIFY: case AST_CONTROL_STREAM_TOPOLOGY_REQUEST_CHANGE: case AST_CONTROL_STREAM_TOPOLOGY_CHANGED: + case AST_CONTROL_STREAM_TOPOLOGY_SOURCE_CHANGED: case AST_CONTROL_STREAM_STOP: case AST_CONTROL_STREAM_SUSPEND: case AST_CONTROL_STREAM_REVERSE: @@ -4528,6 +4529,7 @@ static int indicate_data_internal(struct ast_channel *chan, int _condition, cons case AST_CONTROL_UPDATE_RTP_PEER: case AST_CONTROL_STREAM_TOPOLOGY_REQUEST_CHANGE: case AST_CONTROL_STREAM_TOPOLOGY_CHANGED: + case AST_CONTROL_STREAM_TOPOLOGY_SOURCE_CHANGED: case AST_CONTROL_STREAM_STOP: case AST_CONTROL_STREAM_SUSPEND: case AST_CONTROL_STREAM_REVERSE: diff --git a/res/res_pjsip_sdp_rtp.c b/res/res_pjsip_sdp_rtp.c index e095f0660..88b94ee43 100644 --- a/res/res_pjsip_sdp_rtp.c +++ b/res/res_pjsip_sdp_rtp.c @@ -1022,6 +1022,19 @@ static void process_ssrc_attributes(struct ast_sip_session *session, struct ast_ continue; } + /* If we are currently negotiating as a result of the remote side renegotiating then + * determine if the source for this stream has changed. + */ + if (pjmedia_sdp_neg_get_state(session->inv_session->neg) == PJMEDIA_SDP_NEG_STATE_REMOTE_OFFER && + session->active_media_state) { + struct ast_rtp_instance_stats stats = { 0, }; + + if (!ast_rtp_instance_get_stats(session_media->rtp, &stats, AST_RTP_INSTANCE_STAT_REMOTE_SSRC) && + stats.remote_ssrc != ssrc) { + session_media->changed = 1; + } + } + ast_rtp_instance_set_remote_ssrc(session_media->rtp, ssrc); } } diff --git a/res/res_pjsip_session.c b/res/res_pjsip_session.c index 64416a063..4b3bdb812 100644 --- a/res/res_pjsip_session.c +++ b/res/res_pjsip_session.c @@ -765,6 +765,7 @@ static int handle_negotiated_sdp(struct ast_sip_session *session, const pjmedia_ { int i; struct ast_stream_topology *topology; + unsigned int changed = 0; for (i = 0; i < local->media_count; ++i) { struct ast_sip_session_media *session_media; @@ -802,6 +803,9 @@ static int handle_negotiated_sdp(struct ast_sip_session *session, const pjmedia_ if (handle_negotiated_sdp_session_media(session_media, session, local, remote, i, stream)) { return -1; } + + changed |= session_media->changed; + session_media->changed = 0; } /* Apply the pending media state to the channel and make it active */ @@ -858,7 +862,13 @@ static int handle_negotiated_sdp(struct ast_sip_session *session, const pjmedia_ ast_channel_unlock(session->channel); - ast_queue_frame(session->channel, &ast_null_frame); + if (changed) { + struct ast_frame f = { AST_FRAME_CONTROL, .subclass.integer = AST_CONTROL_STREAM_TOPOLOGY_SOURCE_CHANGED }; + + ast_queue_frame(session->channel, &f); + } else { + ast_queue_frame(session->channel, &ast_null_frame); + } return 0; } -- cgit v1.2.3