diff --git a/examples/echo/lib/echo/peer_handler.ex b/examples/echo/lib/echo/peer_handler.ex index 0050c6b..ab452e3 100644 --- a/examples/echo/lib/echo/peer_handler.ex +++ b/examples/echo/lib/echo/peer_handler.ex @@ -131,7 +131,7 @@ defmodule Echo.PeerHandler do defp handle_webrtc_msg({:rtcp, packets}, state) do for packet <- packets do case packet do - %ExRTCP.Packet.PayloadFeedback.PLI{} when state.in_video_track_id != nil -> + {track_id, %ExRTCP.Packet.PayloadFeedback.PLI{}} when state.in_video_track_id != nil -> Logger.info("Received keyframe request. Sending PLI.") :ok = PeerConnection.send_pli(state.peer_connection, state.in_video_track_id, "h") diff --git a/examples/whip_whep/lib/whip_whep/forwarder.ex b/examples/whip_whep/lib/whip_whep/forwarder.ex index 1281004..00c591b 100644 --- a/examples/whip_whep/lib/whip_whep/forwarder.ex +++ b/examples/whip_whep/lib/whip_whep/forwarder.ex @@ -119,7 +119,7 @@ defmodule WhipWhep.Forwarder do def handle_info({:ex_webrtc, _pc, {:rtcp, packets}}, state) do for packet <- packets do case packet do - %ExRTCP.Packet.PayloadFeedback.PLI{} when state.input_pc != nil -> + {_track_id, %ExRTCP.Packet.PayloadFeedback.PLI{}} when state.input_pc != nil -> :ok = PeerConnection.send_pli(state.input_pc, state.video_input) _other -> diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index b2b92d8..884e3b8 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -65,6 +65,9 @@ defmodule ExWebRTC.PeerConnection do * `:track_muted`, `:track_ended` - these match the [MediaStreamTrack events](https://developer.mozilla.org/en-US/docs/Web/API/MediaStreamTrack#events). * `:rtp` and `:rtcp` - these contain packets received by the PeerConnection. The third element of `:rtp` tuple is a simulcast RID and is set to `nil` if simulcast is not used. + * each of the packets in `:rtcp` message is in the form of `{track_id, packet}` tuple, where `track_id` is the id of the corrsponding track. + In case of PLI and NACK, this is the id of an outgoing (sender's) track id, in case of Sender and Receiver Reports - incoming (receiver's) track id. + If matching to a track was not possible (like in the case of TWCC packedts), `track_id` is set to `nil`. """ @type message() :: {:ex_webrtc, pid(), @@ -78,7 +81,7 @@ defmodule ExWebRTC.PeerConnection do | {:track_muted, MediaStreamTrack.id()} | {:track_ended, MediaStreamTrack.id()} | {:rtp, MediaStreamTrack.id(), String.t() | nil, ExRTP.Packet.t()}} - | {:rtcp, [ExRTCP.Packet.packet()]} + | {:rtcp, [{MediaStreamTrack.id() | nil, ExRTCP.Packet.packet()}]} #### NON-MDN-API #### @@ -1134,9 +1137,10 @@ defmodule ExWebRTC.PeerConnection do def handle_info({:dtls_transport, _pid, {:rtcp, data}}, state) do case ExRTCP.CompoundPacket.decode(data) do {:ok, packets} -> - state = - Enum.reduce(packets, state, fn packet, state -> - handle_rtcp_packet(state, packet) + {packets, state} = + Enum.map_reduce(packets, state, fn packet, state -> + {track_id, state} = handle_rtcp_packet(state, packet) + {{track_id, packet}, state} end) notify(state.owner, {:rtcp, packets}) @@ -1796,20 +1800,31 @@ defmodule ExWebRTC.PeerConnection do end end + defp handle_rtcp_packet(state, %ExRTCP.Packet.ReceiverReport{} = report) do + with true <- :rtcp_reports in state.config.features, + {:ok, mid} <- Demuxer.demux_ssrc(state.demuxer, report.ssrc), + {_idx, transceiver} <- find_transceiver(state.transceivers, mid) do + {transceiver.receiver.track.id, state} + else + _other -> + {nil, state} + end + end + defp handle_rtcp_packet(state, %ExRTCP.Packet.SenderReport{} = report) do with true <- :rtcp_reports in state.config.features, {:ok, mid} <- Demuxer.demux_ssrc(state.demuxer, report.ssrc), {idx, transceiver} <- find_transceiver(state.transceivers, mid) do transceiver = RTPTransceiver.receive_report(transceiver, report) transceivers = List.replace_at(state.transceivers, idx, transceiver) - %{state | transceivers: transceivers} + {transceiver.receiver.track.id, %{state | transceivers: transceivers}} else false -> - state + {nil, state} _other -> Logger.warning("Unable to handle RTCP Sender Report, packet: #{inspect(report)}") - state + {nil, state} end end @@ -1820,19 +1835,21 @@ defmodule ExWebRTC.PeerConnection do |> Enum.find(fn {tr, _idx} -> tr.sender.ssrc == nack.media_ssrc end) |> case do nil -> - state + {nil, state} # in case NACK was received, but RTX was not negotiated # as NACK and RTX are negotiated independently - {%{sender: %{rtx_pt: nil}}, _idx} -> - state + {%{sender: %{rtx_pt: nil}} = tr, _idx} -> + {tr.sender.track.id, state} {tr, idx} -> {packets, tr} = RTPTransceiver.receive_nack(tr, nack) for packet <- packets, do: send_rtp(self(), tr.sender.track.id, packet, rtx?: true) transceivers = List.replace_at(state.transceivers, idx, tr) - %{state | transceivers: transceivers} + {tr.sender.track.id, %{state | transceivers: transceivers}} end + else + {nil, state} end end @@ -1842,16 +1859,16 @@ defmodule ExWebRTC.PeerConnection do |> Enum.find(fn {tr, _idx} -> tr.sender.ssrc == pli.media_ssrc end) |> case do nil -> - state + {nil, state} {tr, idx} -> tr = RTPTransceiver.receive_pli(tr, pli) transceivers = List.replace_at(state.transceivers, idx, tr) - %{state | transceivers: transceivers} + {tr.sender.track.id, %{state | transceivers: transceivers}} end end - defp handle_rtcp_packet(state, _packet), do: state + defp handle_rtcp_packet(state, _packet), do: {nil, state} defp do_get_description(nil, _candidates), do: nil diff --git a/test/ex_webrtc/peer_connection_test.exs b/test/ex_webrtc/peer_connection_test.exs index 86d7489..c376fb0 100644 --- a/test/ex_webrtc/peer_connection_test.exs +++ b/test/ex_webrtc/peer_connection_test.exs @@ -998,6 +998,70 @@ defmodule ExWebRTC.PeerConnectionTest do # MISC TESTS + describe "RTCP" do + test "sends and handles PLI" do + # turn off features in order to not receive unwanted RTCP packets + {:ok, pc1} = PeerConnection.start_link(features: []) + {:ok, pc2} = PeerConnection.start_link(features: []) + + %MediaStreamTrack{id: id1} = track = MediaStreamTrack.new(:video) + {:ok, _sender} = PeerConnection.add_track(pc1, track) + + :ok = negotiate(pc1, pc2) + :ok = connect(pc1, pc2) + + assert_receive {:ex_webrtc, ^pc2, {:track, %MediaStreamTrack{kind: :video, id: id2}}} + + # we have to send something on the track, otherwise pc2 won't know the ssrc of its track + :ok = PeerConnection.send_rtp(pc1, track.id, ExRTP.Packet.new(<<3, 2, 5>>)) + assert_receive {:ex_webrtc, ^pc2, {:rtp, ^id2, _rid, _packet}} + + assert :ok = PeerConnection.send_pli(pc2, id2) + + assert_receive {:ex_webrtc, ^pc1, {:rtcp, [{^id1, pli}]}} + assert %ExRTCP.Packet.PayloadFeedback.PLI{} = pli + end + + test "sends NACK" do + # turn off features in order to not receive unwanted RTCP packets + {:ok, pc1} = + PeerConnection.start_link(features: [:inbound_rtx, :outbound_rtx], rtcp_feedbacks: []) + + {:ok, pc2} = + PeerConnection.start_link(features: [:inbound_rtx, :outbound_rtx], rtcp_feedbacks: []) + + %MediaStreamTrack{id: id1} = track = MediaStreamTrack.new(:video) + {:ok, _sender} = PeerConnection.add_track(pc1, track) + + :ok = negotiate(pc1, pc2) + :ok = connect(pc1, pc2) + + assert_receive {:ex_webrtc, ^pc2, {:track, %MediaStreamTrack{kind: :video, id: id2}}} + + # we have to send something on the track, otherwise pc2 won't know the ssrc of its track + :ok = + PeerConnection.send_rtp( + pc1, + track.id, + ExRTP.Packet.new(<<3, 2, 5>>, sequence_number: 100) + ) + + assert_receive {:ex_webrtc, ^pc2, {:rtp, ^id2, _rid, _packet}} + + :ok = + PeerConnection.send_rtp( + pc1, + track.id, + ExRTP.Packet.new(<<3, 2, 5>>, sequence_number: 102) + ) + + assert_receive {:ex_webrtc, ^pc2, {:rtp, ^id2, _rid, _packet}} + + assert_receive {:ex_webrtc, ^pc1, {:rtcp, [{^id1, nack}]}} + assert %ExRTCP.Packet.TransportFeedback.NACK{nacks: [%{pid: 101}]} = nack + end + end + describe "send data in both directions on a single transceiver" do test "using one negotiation" do {:ok, pc1} = PeerConnection.start_link()