From 5c0c3837d64bd44b281be12de31627c8c3b305e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Thu, 29 Feb 2024 14:36:27 +0100 Subject: [PATCH] Add support for RTCP Receiver Reports (#77) --- lib/ex_webrtc/rtp_receiver/report_recorder.ex | 201 +++++++++++++++++ .../rtp_receiver/report_recorder_test.exs | 212 ++++++++++++++++++ 2 files changed, 413 insertions(+) create mode 100644 lib/ex_webrtc/rtp_receiver/report_recorder.ex create mode 100644 test/ex_webrtc/rtp_receiver/report_recorder_test.exs diff --git a/lib/ex_webrtc/rtp_receiver/report_recorder.ex b/lib/ex_webrtc/rtp_receiver/report_recorder.ex new file mode 100644 index 00000000..22ef0154 --- /dev/null +++ b/lib/ex_webrtc/rtp_receiver/report_recorder.ex @@ -0,0 +1,201 @@ +defmodule ExWebRTC.RTPReceiver.ReportRecorder do + @moduledoc false + # based on https://datatracker.ietf.org/doc/html/rfc3550#section-6.4.1 + + import Bitwise + + alias ExRTCP.Packet.{ReceiverReport, ReceptionReport} + + @max_u32 0xFFFFFFFF + @max_u24 0xFFFFFF + @max_seq_no 0xFFFF + @breakpoint 0x7FFF + + @type t() :: %__MODULE__{ + sender_ssrc: non_neg_integer(), + media_ssrc: non_neg_integer(), + clock_rate: non_neg_integer(), + lost_packets: MapSet.t(), + last_seq_no: {non_neg_integer(), ExRTP.Packet.uint16()}, + last_report_seq_no: {non_neg_integer(), ExRTP.Packet.uint16()}, + last_rtp_timestamp: ExRTP.Packet.uint32(), + last_timestamp: integer(), + last_sr_ntp_timestamp: ExRTP.Packet.uint32(), + last_sr_timestamp: integer(), + jitter: float(), + total_lost: non_neg_integer() + } + + @enforce_keys [:sender_ssrc, :media_ssrc, :clock_rate] + defstruct [ + lost_packets: MapSet.new(), + last_seq_no: nil, + last_report_seq_no: nil, + last_rtp_timestamp: nil, + last_timestamp: nil, + last_sr_ntp_timestamp: 0, + last_sr_timestamp: nil, + jitter: 0, + total_lost: 0 + ] ++ @enforce_keys + + @doc """ + Records incoming RTP Packet. + `time` parameter accepts output of `System.monotonic_time(:native)` as a value. + """ + @spec record_packet(t(), ExRTP.Packet.t(), integer()) :: t() + def record_packet(%{last_seq_no: nil} = recorder, packet, time) do + # seq_no == {cycle_no, seq_no as in RTP packet} + %__MODULE__{ + recorder + | last_seq_no: {0, packet.sequence_number}, + last_report_seq_no: {0, packet.sequence_number - 1}, + last_rtp_timestamp: packet.timestamp, + last_timestamp: time + } + end + + def record_packet(recorder, packet, time) do + recorder + |> record_seq_no(packet.sequence_number) + |> record_jitter(packet.timestamp, time) + end + + @doc """ + Records incoming RTCP Sender Report. + `time` parameter accepts output of `System.monotonic_time(:native)` as a value. + """ + @spec record_report(t(), ExRTCP.Packet.SenderReport.t(), integer()) :: t() + def record_report(recorder, sender_report, time) do + # we take the middle 32 bits of the NTP timestamp + ntp_ts = sender_report.ntp_timestamp >>> 16 &&& @max_u32 + + %__MODULE__{recorder | last_sr_ntp_timestamp: ntp_ts, last_sr_timestamp: time} + end + + @doc """ + Creates an RTCP Receiver Report. + `time` parameter accepts output of `System.monotonic_time(:native)` as a value. + """ + @spec get_report(t(), integer()) :: {ReceiverReport.t(), t()} + def get_report(recorder, time) do + received = + recorder.last_seq_no + |> seq_no_diff(recorder.last_report_seq_no) + |> min(@max_u24) + + lost = + recorder.lost_packets + |> MapSet.size() + |> min(@max_u24) + + total_lost = min(recorder.total_lost + lost, @max_u24) + + {cycle, seq_no} = recorder.last_seq_no + + report = %ReceiverReport{ + ssrc: recorder.sender_ssrc, + reports: [ + %ReceptionReport{ + ssrc: recorder.media_ssrc, + delay: round(delay_since(time, recorder.last_sr_timestamp) * 65_536), + last_sr: recorder.last_sr_ntp_timestamp, + last_sequence_number: (cycle <<< 16 &&& @max_u32) ||| seq_no, + fraction_lost: round(lost * 256 / received), + total_lost: total_lost, + jitter: round(recorder.jitter) + } + ] + } + + recorder = %__MODULE__{ + recorder + | lost_packets: MapSet.new(), + last_report_seq_no: recorder.last_seq_no, + total_lost: total_lost + } + + {report, recorder} + end + + defp record_seq_no(recorder, rtp_seq_no) do + %__MODULE__{ + lost_packets: lost_packets, + last_seq_no: {last_cycle, last_rtp_seq_no} = last_seq_no + } = recorder + + delta = rtp_seq_no - last_rtp_seq_no + + cycle = + cond do + delta in -@breakpoint..@breakpoint -> last_cycle + delta < -@breakpoint -> last_cycle + 1 + delta > @breakpoint -> last_cycle - 1 + end + + # NOTICE: cycle might be -1 in very specific cases (e.g. the very first packet is 2^16 - 1, + # second packet is 0, but we received the second packet first). + # We just ignore these packets. Similarly, we ignore packets that arrived late + # (counted as lost in previous report) instead of changing the last_report_seq_no + # to lower value to include them. + seq_no = {cycle, rtp_seq_no} + + {last_seq_no, lost_packets} = + if seq_no <= last_seq_no do + lost_packets = MapSet.delete(lost_packets, seq_no) + {last_seq_no, lost_packets} + else + lost_packets = set_lost_packets(next_seq_no(last_seq_no), seq_no, lost_packets) + {seq_no, lost_packets} + end + + %__MODULE__{recorder | last_seq_no: last_seq_no, lost_packets: lost_packets} + end + + defp set_lost_packets(start_seq_no, end_seq_no, lost_packets) + when start_seq_no == end_seq_no, + do: lost_packets + + defp set_lost_packets(start_seq_no, end_seq_no, lost_packets) do + lost_packets = MapSet.put(lost_packets, start_seq_no) + set_lost_packets(next_seq_no(start_seq_no), end_seq_no, lost_packets) + end + + defp next_seq_no({cycle, @max_seq_no}), do: {cycle + 1, 0} + defp next_seq_no({cycle, seq_no}), do: {cycle, seq_no + 1} + + defp record_jitter(recorder, rtp_ts, cur_ts) do + %__MODULE__{ + last_rtp_timestamp: last_rtp_ts, + last_timestamp: last_ts, + jitter: jitter, + clock_rate: clock_rate + } = recorder + + wlc_diff = native_to_sec(cur_ts - last_ts) + rtp_diff = rtp_ts - last_rtp_ts + diff = wlc_diff * clock_rate - rtp_diff + jitter = jitter + (abs(diff) - jitter) / 16 + + %__MODULE__{ + recorder + | last_rtp_timestamp: rtp_ts, + last_timestamp: cur_ts, + jitter: jitter + } + end + + defp native_to_sec(time) do + native_in_sec = System.convert_time_unit(1, :second, :native) + time / native_in_sec + end + + defp seq_no_diff({cycle_a, seq_no_a}, {cycle_b, seq_no_b}) do + cycle_diff = cycle_a - cycle_b + seq_no_diff = seq_no_a - seq_no_b + cycle_diff * (@max_seq_no + 1) + seq_no_diff + end + + defp delay_since(_cur_ts, nil), do: 0 + defp delay_since(cur_ts, last_ts), do: native_to_sec(cur_ts - last_ts) +end diff --git a/test/ex_webrtc/rtp_receiver/report_recorder_test.exs b/test/ex_webrtc/rtp_receiver/report_recorder_test.exs new file mode 100644 index 00000000..f25b7cdf --- /dev/null +++ b/test/ex_webrtc/rtp_receiver/report_recorder_test.exs @@ -0,0 +1,212 @@ +defmodule ExWebRTC.RTPReceiver.ReportRecorderTest do + use ExUnit.Case, async: true + + import Bitwise + + alias ExRTCP.Packet.{SenderReport, ReceiverReport, ReceptionReport} + alias ExRTP.Packet + alias ExWebRTC.RTPReceiver.ReportRecorder + + @rand_ts System.monotonic_time() + @seq_no 11_534 + @rtp_ts 234_444 + @packet Packet.new(<<>>, sequence_number: @seq_no, timestamp: @rtp_ts) + @clock_rate 90_000 + @sender_report %SenderReport{ + ssrc: 0, + ntp_timestamp: 0xFFFF11111111FFFF, + rtp_timestamp: 0, + packet_count: 0, + octet_count: 0 + } + @recorder %ReportRecorder{ + media_ssrc: 123_456, + sender_ssrc: 654_321, + clock_rate: @clock_rate + } + + test "record_report/3" do + ts = System.monotonic_time() + recorder = ReportRecorder.record_report(@recorder, @sender_report, ts) + + assert %ReportRecorder{ + last_sr_ntp_timestamp: 0x11111111, + last_sr_timestamp: ^ts + } = recorder + end + + describe "record_packet/3" do + test "initial packet" do + recorder = ReportRecorder.record_packet(@recorder, @packet, @rand_ts) + + last_report_seq_no = @seq_no - 1 + + assert %ReportRecorder{ + last_seq_no: {0, @seq_no}, + last_report_seq_no: {0, ^last_report_seq_no}, + last_rtp_timestamp: @rtp_ts, + last_timestamp: @rand_ts + } = recorder + end + + test "subsequent packets" do + packet1 = %Packet{@packet | sequence_number: @seq_no - 3} + packet2 = %Packet{@packet | sequence_number: @seq_no - 2} + packet3 = %Packet{@packet | sequence_number: @seq_no - 1} + + recorder = + @recorder + |> ReportRecorder.record_packet(packet1, @rand_ts) + |> ReportRecorder.record_packet(packet3, @rand_ts) + |> ReportRecorder.record_packet(@packet, @rand_ts) + |> ReportRecorder.record_packet(packet2, @rand_ts) + + last_report_seq_no = @seq_no - 4 + + assert %ReportRecorder{ + lost_packets: lost_packets, + last_seq_no: {0, @seq_no}, + last_report_seq_no: {0, ^last_report_seq_no} + } = recorder + + assert MapSet.size(lost_packets) == 0 + end + + test "missing packets" do + packet0 = %Packet{@packet | sequence_number: @seq_no - 10} + packet1 = %Packet{@packet | sequence_number: @seq_no - 6} + packet2 = %Packet{@packet | sequence_number: @seq_no - 3} + packet3 = %Packet{@packet | sequence_number: @seq_no - 1} + + recorder = + @recorder + |> ReportRecorder.record_packet(packet1, @rand_ts) + # packet 0 will be ignored, + # see comment in rtp_receiver/report_recorder.ex + |> ReportRecorder.record_packet(packet0, @rand_ts) + |> ReportRecorder.record_packet(packet3, @rand_ts) + |> ReportRecorder.record_packet(@packet, @rand_ts) + |> ReportRecorder.record_packet(packet2, @rand_ts) + + last_report_seq_no = @seq_no - 7 + + assert %ReportRecorder{ + lost_packets: lost_packets, + last_seq_no: {0, @seq_no}, + last_report_seq_no: {0, ^last_report_seq_no} + } = recorder + + actually_lost = + [@seq_no - 5, @seq_no - 4, @seq_no - 2] + |> Enum.map(&{0, &1}) + |> MapSet.new() + + assert actually_lost == lost_packets + end + + test "packets with wrapping sequence numbers" do + max_seq_no = 65_535 + packet1 = %Packet{@packet | sequence_number: max_seq_no - 1} + packet2 = %Packet{@packet | sequence_number: 1} + packet3 = %Packet{@packet | sequence_number: max_seq_no} + packet4 = %Packet{@packet | sequence_number: 0} + + recorder = + @recorder + |> ReportRecorder.record_packet(packet1, @rand_ts) + |> ReportRecorder.record_packet(packet2, @rand_ts) + |> ReportRecorder.record_packet(packet3, @rand_ts) + |> ReportRecorder.record_packet(packet4, @rand_ts) + + sr_seq_no = {0, max_seq_no - 2} + seq_no = {1, 1} + + assert %ReportRecorder{ + lost_packets: lost_packets, + last_seq_no: ^seq_no, + last_report_seq_no: ^sr_seq_no + } = recorder + + assert MapSet.size(lost_packets) == 0 + end + + test "properly calculates jitter" do + # 20 ms = clock_rate * (20/1000) in RTP timestamp units + ts_diff = 20 + rtp_ts_diff = @clock_rate * (ts_diff / 1000) + arrival_ts_diff = System.convert_time_unit(ts_diff, :millisecond, :native) + + packet = %Packet{@packet | timestamp: @rtp_ts + rtp_ts_diff} + arrival_ts = @rand_ts + arrival_ts_diff + System.convert_time_unit(1, :millisecond, :native) + + recorder = + @recorder + |> ReportRecorder.record_packet(@packet, @rand_ts) + |> ReportRecorder.record_packet(packet, arrival_ts) + + # second packet arrived 1 millisecond late + # thus, jitter should be roughly equal to 1 millisecond / 16 in RTP ts units + assert_in_delta recorder.jitter, @clock_rate / 1000 / 16, 0.5 + + # remaining packets arrived perfectly on time + # so the jitter should slowly converge to 0 + recorder = + Enum.reduce(2..100, recorder, fn i, recorder -> + packet = %Packet{@packet | timestamp: @rtp_ts + i * rtp_ts_diff} + arrival_ts = @rand_ts + i * arrival_ts_diff + ReportRecorder.record_packet(recorder, packet, arrival_ts) + end) + + assert_in_delta recorder.jitter, 0, 0.5 + end + end + + test "get_report/2" do + end_seq_no = 65_532 + + recorder = + 65_500..end_seq_no + |> Enum.reduce(@recorder, fn i, recorder -> + packet = %Packet{@packet | sequence_number: i} + ReportRecorder.record_packet(recorder, packet, @rand_ts) + end) + |> ReportRecorder.record_report(@sender_report, @rand_ts) + + one_second = System.convert_time_unit(1, :second, :native) + assert {report, recorder} = ReportRecorder.get_report(recorder, @rand_ts + one_second) + assert %ReceiverReport{reports: [rec_report]} = report + + assert %ReceptionReport{ + delay: 65_536, + last_sr: 0x11111111, + fraction_lost: 0, + last_sequence_number: ^end_seq_no, + total_lost: 0 + } = rec_report + + # now let's drop some and wrap around seq_no boundary + end_seq_no = 30 + + recorder = + Enum.reduce(0..end_seq_no//3, recorder, fn i, recorder -> + packet = %Packet{@packet | sequence_number: i} + ReportRecorder.record_packet(recorder, packet, @rand_ts) + end) + + assert {report, _recorder} = ReportRecorder.get_report(recorder, @rand_ts + 2 * one_second) + assert %ReceiverReport{reports: [rec_report]} = report + + lost = 23 + + assert %ReceptionReport{ + delay: 131_072, + last_sr: 0x11111111, + fraction_lost: fraction_lost, + last_sequence_number: last_sequence_number, + total_lost: ^lost + } = rec_report + + assert last_sequence_number == (1 <<< 16 ||| end_seq_no) + assert fraction_lost == round(lost * 256 / 34) + end +end