diff options
Diffstat (limited to 'main/rtp_engine.c')
-rw-r--r-- | main/rtp_engine.c | 391 |
1 files changed, 384 insertions, 7 deletions
diff --git a/main/rtp_engine.c b/main/rtp_engine.c index 8e58f658d..4dd4d46de 100644 --- a/main/rtp_engine.c +++ b/main/rtp_engine.c @@ -25,6 +25,114 @@ /*** MODULEINFO <support_level>core</support_level> +***/ + +/*** DOCUMENTATION + <managerEvent language="en_US" name="RTCPSent"> + <managerEventInstance class="EVENT_FLAG_REPORTING"> + <synopsis>Raised when an RTCP packet is sent.</synopsis> + <syntax> + <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" /> + <parameter name="SSRC"> + <para>The SSRC identifier for our stream</para> + </parameter> + <parameter name="PT"> + <para>The type of packet for this RTCP report.</para> + <enumlist> + <enum name="200(SR)"/> + <enum name="201(RR)"/> + </enumlist> + </parameter> + <parameter name="To"> + <para>The address the report is sent to.</para> + </parameter> + <parameter name="ReportCount"> + <para>The number of reports that were sent.</para> + <para>The report count determines the number of ReportX headers in + the message. The X for each set of report headers will range from 0 to + <literal>ReportCount - 1</literal>.</para> + </parameter> + <parameter name="SentNTP" required="false"> + <para>The time the sender generated the report. Only valid when + PT is <literal>200(SR)</literal>.</para> + </parameter> + <parameter name="SentRTP" required="false"> + <para>The sender's last RTP timestamp. Only valid when PT is + <literal>200(SR)</literal>.</para> + </parameter> + <parameter name="SentPackets" required="false"> + <para>The number of packets the sender has sent. Only valid when PT + is <literal>200(SR)</literal>.</para> + </parameter> + <parameter name="SentOctets" required="false"> + <para>The number of bytes the sender has sent. Only valid when PT is + <literal>200(SR)</literal>.</para> + </parameter> + <parameter name="ReportXSourceSSRC"> + <para>The SSRC for the source of this report block.</para> + </parameter> + <parameter name="ReportXFractionLost"> + <para>The fraction of RTP data packets from <literal>ReportXSourceSSRC</literal> + lost since the previous SR or RR report was sent.</para> + </parameter> + <parameter name="ReportXCumulativeLost"> + <para>The total number of RTP data packets from <literal>ReportXSourceSSRC</literal> + lost since the beginning of reception.</para> + </parameter> + <parameter name="ReportXHighestSequence"> + <para>The highest sequence number received in an RTP data packet from + <literal>ReportXSourceSSRC</literal>.</para> + </parameter> + <parameter name="ReportXSequenceNumberCycles"> + <para>The number of sequence number cycles seen for the RTP data + received from <literal>ReportXSourceSSRC</literal>.</para> + </parameter> + <parameter name="ReportXIAJitter"> + <para>An estimate of the statistical variance of the RTP data packet + interarrival time, measured in timestamp units.</para> + </parameter> + <parameter name="ReportXLSR"> + <para>The last SR timestamp received from <literal>ReportXSourceSSRC</literal>. + If no SR has been received from <literal>ReportXSourceSSRC</literal>, + then 0.</para> + </parameter> + <parameter name="ReportXDLSR"> + <para>The delay, expressed in units of 1/65536 seconds, between + receiving the last SR packet from <literal>ReportXSourceSSRC</literal> + and sending this report.</para> + </parameter> + </syntax> + </managerEventInstance> + </managerEvent> + <managerEvent language="en_US" name="RTCPReceived"> + <managerEventInstance class="EVENT_FLAG_REPORTING"> + <synopsis>Raised when an RTCP packet is received.</synopsis> + <syntax> + <xi:include xpointer="xpointer(/docs/managerEvent[@name='Newchannel']/managerEventInstance/syntax/parameter)" /> + <parameter name="SSRC"> + <para>The SSRC identifier for the remote system</para> + </parameter> + <xi:include xpointer="xpointer(/docs/managerEvent[@name='RTCPSent']/managerEventInstance/syntax/parameter[@name='PT'])" /> + <parameter name="From"> + <para>The address the report was received from.</para> + </parameter> + <parameter name="RTT"> + <para>Calculated Round-Trip Time in seconds</para> + </parameter> + <parameter name="ReportCount"> + <para>The number of reports that were received.</para> + <para>The report count determines the number of ReportX headers in + the message. The X for each set of report headers will range from 0 to + <literal>ReportCount - 1</literal>.</para> + </parameter> + <xi:include xpointer="xpointer(/docs/managerEvent[@name='RTCPSent']/managerEventInstance/syntax/parameter[@name='SentNTP'])" /> + <xi:include xpointer="xpointer(/docs/managerEvent[@name='RTCPSent']/managerEventInstance/syntax/parameter[@name='SentRTP'])" /> + <xi:include xpointer="xpointer(/docs/managerEvent[@name='RTCPSent']/managerEventInstance/syntax/parameter[@name='SentPackets'])" /> + <xi:include xpointer="xpointer(/docs/managerEvent[@name='RTCPSent']/managerEventInstance/syntax/parameter[@name='SentOctets'])" /> + <xi:include xpointer="xpointer(/docs/managerEvent[@name='RTCPSent']/managerEventInstance/syntax/parameter[contains(@name, 'ReportX')])" /> + </syntax> + </managerEventInstance> + </managerEvent> ***/ #include "asterisk.h" @@ -45,6 +153,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/netsock2.h" #include "asterisk/_private.h" #include "asterisk/framehook.h" +#include "asterisk/stasis.h" +#include "asterisk/json.h" +#include "asterisk/stasis_channels.h" struct ast_srtp_res *res_srtp = NULL; struct ast_srtp_policy_res *res_srtp_policy = NULL; @@ -73,10 +184,10 @@ struct ast_rtp_instance { int keepalive; /*! Glue currently in use */ struct ast_rtp_glue *glue; - /*! Channel associated with the instance */ - struct ast_channel *chan; /*! SRTP info associated with the instance */ struct ast_srtp *srtp; + /*! Channel unique ID */ + char channel_uniqueid[AST_MAX_UNIQUEID]; }; /*! List of RTP engines that are currently registered */ @@ -109,6 +220,9 @@ static int mime_types_len = 0; static struct ast_rtp_payload_type static_RTP_PT[AST_RTP_MAX_PT]; static ast_rwlock_t static_RTP_PT_lock; +/*! \brief \ref stasis topic for RTP related messages */ +static struct stasis_topic *rtp_topic; + int ast_rtp_engine_register2(struct ast_rtp_engine *engine, struct ast_module *module) { struct ast_rtp_engine *current_engine; @@ -292,6 +406,16 @@ struct ast_rtp_instance *ast_rtp_instance_new(const char *engine_name, return instance; } +const char *ast_rtp_instance_get_channel_id(struct ast_rtp_instance *instance) +{ + return instance->channel_uniqueid; +} + +void ast_rtp_instance_set_channel_id(struct ast_rtp_instance *instance, const char *uniqueid) +{ + ast_copy_string(instance->channel_uniqueid, uniqueid, sizeof(instance->channel_uniqueid)); +} + void ast_rtp_instance_set_data(struct ast_rtp_instance *instance, void *data) { instance->data = data; @@ -1317,11 +1441,6 @@ struct ast_rtp_glue *ast_rtp_instance_get_active_glue(struct ast_rtp_instance *i return instance->glue; } -struct ast_channel *ast_rtp_instance_get_chan(struct ast_rtp_instance *instance) -{ - return instance->chan; -} - int ast_rtp_engine_register_srtp(struct ast_srtp_res *srtp_res, struct ast_srtp_policy_res *policy_res) { if (res_srtp || res_srtp_policy) { @@ -1552,6 +1671,256 @@ int ast_rtp_engine_unload_format(const struct ast_format *format) return 0; } +/*! \internal \brief \ref stasis message payload for RTCP messages */ +struct rtcp_message_payload { + struct ast_channel_snapshot *snapshot; /*< The channel snapshot, if available */ + struct ast_rtp_rtcp_report *report; /*< The RTCP report */ + struct ast_json *blob; /*< Extra JSON data to publish */ +}; + +static void rtcp_message_payload_dtor(void *obj) +{ + struct rtcp_message_payload *payload = obj; + + ao2_cleanup(payload->report); + ao2_cleanup(payload->snapshot); + ast_json_unref(payload->blob); +} + +static struct ast_manager_event_blob *rtcp_report_to_ami(struct stasis_message *msg) +{ + struct rtcp_message_payload *payload = stasis_message_data(msg); + RAII_VAR(struct ast_str *, channel_string, NULL, ast_free); + RAII_VAR(struct ast_str *, packet_string, ast_str_create(512), ast_free); + unsigned int ssrc = payload->report->ssrc; + unsigned int type = payload->report->type; + unsigned int report_count = payload->report->reception_report_count; + int i; + + if (!packet_string) { + return NULL; + } + + if (payload->snapshot) { + channel_string = ast_manager_build_channel_state_string(payload->snapshot); + if (!channel_string) { + return NULL; + } + } + + if (payload->blob) { + /* Optional data */ + struct ast_json *to = ast_json_object_get(payload->blob, "to"); + struct ast_json *from = ast_json_object_get(payload->blob, "from"); + struct ast_json *rtt = ast_json_object_get(payload->blob, "rtt"); + if (to) { + ast_str_append(&packet_string, 0, "To: %s\r\n", ast_json_string_get(to)); + } + if (from) { + ast_str_append(&packet_string, 0, "From: %s\r\n", ast_json_string_get(from)); + } + if (rtt) { + ast_str_append(&packet_string, 0, "RTT: %4.4f\r\n", ast_json_real_get(rtt)); + } + } + + ast_str_append(&packet_string, 0, "SSRC: 0x%.8x\r\n", ssrc); + ast_str_append(&packet_string, 0, "PT: %u(%s)\r\n", type, type== AST_RTP_RTCP_SR ? "SR" : "RR"); + ast_str_append(&packet_string, 0, "ReportCount: %u\r\n", report_count); + if (type == AST_RTP_RTCP_SR) { + ast_str_append(&packet_string, 0, "SentNTP: %lu.%06lu\r\n", + (unsigned long)payload->report->sender_information.ntp_timestamp.tv_sec, + (unsigned long)payload->report->sender_information.ntp_timestamp.tv_usec * 4096); + ast_str_append(&packet_string, 0, "SentRTP: %u\r\n", + payload->report->sender_information.rtp_timestamp); + ast_str_append(&packet_string, 0, "SentPackets: %u\r\n", + payload->report->sender_information.packet_count); + ast_str_append(&packet_string, 0, "SentOctets: %u\r\n", + payload->report->sender_information.octet_count); + } + + for (i = 0; i < report_count; i++) { + RAII_VAR(struct ast_str *, report_string, NULL, ast_free); + + if (!payload->report->report_block[i]) { + break; + } + + report_string = ast_str_create(256); + if (!report_string) { + return NULL; + } + + ast_str_append(&report_string, 0, "Report%dSourceSSRC: 0x%.8x\r\n", + i, payload->report->report_block[i]->source_ssrc); + ast_str_append(&report_string, 0, "Report%dFractionLost: %u\r\n", + i, payload->report->report_block[i]->lost_count.fraction); + ast_str_append(&report_string, 0, "Report%dCumulativeLost: %u\r\n", + i, payload->report->report_block[i]->lost_count.packets); + ast_str_append(&report_string, 0, "Report%dHighestSequence: %u\r\n", + i, payload->report->report_block[i]->highest_seq_no & 0xffff); + ast_str_append(&report_string, 0, "Report%dSequenceNumberCycles: %u\r\n", + i, payload->report->report_block[i]->highest_seq_no >> 16); + ast_str_append(&report_string, 0, "Report%dIAJitter: %u\r\n", + i, payload->report->report_block[i]->ia_jitter); + ast_str_append(&report_string, 0, "Report%dLSR: %u\r\n", + i, payload->report->report_block[i]->lsr); + ast_str_append(&report_string, 0, "Report%dDLSR: %4.4f\r\n", + i, ((double)payload->report->report_block[i]->dlsr) / 65536); + ast_str_append(&packet_string, 0, "%s", ast_str_buffer(report_string)); + } + + return ast_manager_event_blob_create(EVENT_FLAG_REPORTING, + stasis_message_type(msg) == ast_rtp_rtcp_received_type() ? "RTCPReceived" : "RTCPSent", + "%s%s", + AS_OR(channel_string, ""), + ast_str_buffer(packet_string)); +} + +static struct ast_json *rtcp_report_to_json(struct stasis_message *msg) +{ + struct rtcp_message_payload *payload = stasis_message_data(msg); + RAII_VAR(struct ast_json *, json_rtcp_report, NULL, ast_json_unref); + RAII_VAR(struct ast_json *, json_rtcp_report_blocks, NULL, ast_json_unref); + RAII_VAR(struct ast_json *, json_rtcp_sender_info, NULL, ast_json_unref); + struct ast_json * json_payload; + int i; + + json_rtcp_report_blocks = ast_json_array_create(); + if (!json_rtcp_report_blocks) { + return NULL; + } + + for (i = 0; i < payload->report->reception_report_count; i++) { + struct ast_json *json_report_block; + json_report_block = ast_json_pack("{s: i, s: i, s: i, s: i, s: i, s: i, s: i}", + "source_ssrc", payload->report->report_block[i]->source_ssrc, + "fraction_lost", payload->report->report_block[i]->lost_count.fraction, + "packets_lost", payload->report->report_block[i]->lost_count.packets, + "highest_seq_no", payload->report->report_block[i]->highest_seq_no, + "ia_jitter", payload->report->report_block[i]->ia_jitter, + "lsr", payload->report->report_block[i]->lsr, + "dlsr", payload->report->report_block[i]->dlsr); + if (!json_report_block) { + return NULL; + } + + if (ast_json_array_append(json_rtcp_report_blocks, json_report_block)) { + return NULL; + } + } + + if (payload->report->type == AST_RTP_RTCP_SR) { + json_rtcp_sender_info = ast_json_pack("{s: i, s: i, s: i, s: i, s: i}", + "ntp_timestamp_sec", payload->report->sender_information.ntp_timestamp.tv_sec, + "ntp_timestamp_usec", payload->report->sender_information.ntp_timestamp.tv_usec, + "rtp_timestamp", payload->report->sender_information.rtp_timestamp, + "packets", payload->report->sender_information.packet_count, + "octets", payload->report->sender_information.octet_count); + if (!json_rtcp_sender_info) { + return NULL; + } + } + + json_rtcp_report = ast_json_pack("{s: i, s: i, s: i, s: O, s: O}", + "ssrc", payload->report->ssrc, + "type", payload->report->type, + "report_count", payload->report->reception_report_count, + "sender_information", json_rtcp_sender_info ? json_rtcp_sender_info : ast_json_null(), + "report_blocks", json_rtcp_report_blocks); + if (!json_rtcp_report) { + return NULL; + } + + json_payload = ast_json_pack("{s: O, s: O, s: O}", + "channel", payload->snapshot ? ast_channel_snapshot_to_json(payload->snapshot) : ast_json_null(), + "rtcp_report", json_rtcp_report, + "blob", payload->blob); + return json_payload; +} + +static void rtp_rtcp_report_dtor(void *obj) +{ + int i; + struct ast_rtp_rtcp_report *rtcp_report = obj; + + for (i = 0; i < rtcp_report->reception_report_count; i++) { + ast_free(rtcp_report->report_block[i]); + } +} + +struct ast_rtp_rtcp_report *ast_rtp_rtcp_report_alloc(unsigned int report_blocks) +{ + struct ast_rtp_rtcp_report *rtcp_report; + + /* Size of object is sizeof the report + the number of report_blocks * sizeof pointer */ + rtcp_report = ao2_alloc((sizeof(*rtcp_report) + report_blocks * sizeof(struct ast_rtp_rtcp_report_block *)), + rtp_rtcp_report_dtor); + + return rtcp_report; +} + +void ast_rtp_publish_rtcp_message(struct ast_rtp_instance *rtp, + struct stasis_message_type *message_type, + struct ast_rtp_rtcp_report *report, + struct ast_json *blob) +{ + RAII_VAR(struct rtcp_message_payload *, payload, + ao2_alloc(sizeof(*payload), rtcp_message_payload_dtor), ao2_cleanup); + RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + + if (!payload || !report) { + return; + } + + if (!ast_strlen_zero(rtp->channel_uniqueid)) { + snapshot = ast_channel_snapshot_get_latest(rtp->channel_uniqueid); + if (snapshot) { + ao2_ref(snapshot, +1); + } + } + + if (blob) { + ast_json_ref(blob); + } + ao2_ref(report, 1); + payload->snapshot = snapshot; + payload->blob = blob; + payload->report = report; + + message = stasis_message_create(message_type, payload); + if (!message) { + return; + } + + stasis_publish(ast_rtp_topic(), message); +} + +/*! + * @{ \brief Define RTCP/RTP message types. + */ +STASIS_MESSAGE_TYPE_DEFN(ast_rtp_rtcp_sent_type, + .to_ami = rtcp_report_to_ami, + .to_json = rtcp_report_to_json,); +STASIS_MESSAGE_TYPE_DEFN(ast_rtp_rtcp_received_type, + .to_ami = rtcp_report_to_ami, + .to_json = rtcp_report_to_json,); +/*! @} */ + +struct stasis_topic *ast_rtp_topic(void) +{ + return rtp_topic; +} + +static void rtp_engine_shutdown(void) +{ + ao2_cleanup(rtp_topic); + rtp_topic = NULL; + STASIS_MESSAGE_TYPE_CLEANUP(ast_rtp_rtcp_received_type); + STASIS_MESSAGE_TYPE_CLEANUP(ast_rtp_rtcp_sent_type); +} + int ast_rtp_engine_init() { struct ast_format tmpfmt; @@ -1559,6 +1928,14 @@ int ast_rtp_engine_init() ast_rwlock_init(&mime_types_lock); ast_rwlock_init(&static_RTP_PT_lock); + rtp_topic = stasis_topic_create("rtp_topic"); + if (!rtp_topic) { + return -1; + } + STASIS_MESSAGE_TYPE_INIT(ast_rtp_rtcp_sent_type); + STASIS_MESSAGE_TYPE_INIT(ast_rtp_rtcp_received_type); + ast_register_atexit(rtp_engine_shutdown); + /* Define all the RTP mime types available */ set_next_mime_type(ast_format_set(&tmpfmt, AST_FORMAT_G723_1, 0), 0, "audio", "G723", 8000); set_next_mime_type(ast_format_set(&tmpfmt, AST_FORMAT_GSM, 0), 0, "audio", "GSM", 8000); |