Skip to content

Commit

Permalink
review fixes vol2 pt1
Browse files Browse the repository at this point in the history
  • Loading branch information
sgfn committed Sep 16, 2024
1 parent c0ef08d commit bdee2d0
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 173 deletions.
42 changes: 20 additions & 22 deletions lib/ex_webrtc/rtp/jitter_buffer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule ExWebRTC.RTP.JitterBuffer do
@type options :: [latency: non_neg_integer()]

@typedoc """
Time (in milliseconds) after which `handle_timer/1` should be called.
Time (in milliseconds) after which `handle_timeout/1` should be called.
Can be `nil`, in which case no timer needs to be set.
"""
@type timer :: non_neg_integer() | nil
Expand Down Expand Up @@ -63,19 +63,19 @@ defmodule ExWebRTC.RTP.JitterBuffer do
Places a packet in the JitterBuffer.
Note: The initial latency timer will be set after the first packet is inserted into the buffer.
If you want to start it at your own discretion, schedule a `handle_timer/1` call prior to that.
If you want to start it at your own discretion, schedule a `handle_timeout/1` call prior to that.
"""
@spec place_packet(t(), Packet.t()) :: result()
def place_packet(buffer, packet)
@spec insert(t(), Packet.t()) :: result()
def insert(buffer, packet)

def place_packet(%{state: :initial_wait} = buffer, packet) do
def insert(%{state: :initial_wait} = buffer, packet) do
{buffer, timer} = maybe_set_timer(buffer)
{_result, buffer} = try_insert_packet(buffer, packet)

{[], timer, buffer}
end

def place_packet(buffer, packet) do
def insert(buffer, packet) do
case try_insert_packet(buffer, packet) do
{:ok, buffer} -> send_packets(buffer)
{:error, buffer} -> {[], nil, buffer}
Expand All @@ -92,22 +92,22 @@ defmodule ExWebRTC.RTP.JitterBuffer do
packets =
buffer.store
|> PacketStore.dump()
|> records_to_packets()
|> handle_missing_packets()

{packets, nil, %__MODULE__{latency: buffer.latency}}
end

@doc """
Handles the end of a previously set timer.
"""
@spec handle_timer(t()) :: result()
def handle_timer(buffer) do
@spec handle_timeout(t()) :: result()
def handle_timeout(buffer) do
%__MODULE__{buffer | state: :timer_not_set} |> send_packets()
end

@spec try_insert_packet(t(), Packet.t()) :: {:ok | :error, t()}
defp try_insert_packet(buffer, packet) do
case PacketStore.insert_packet(buffer.store, packet) do
case PacketStore.insert(buffer.store, packet) do
{:ok, store} -> {:ok, %__MODULE__{buffer | store: store}}
{:error, :late_packet} -> {:error, buffer}
end
Expand All @@ -116,41 +116,39 @@ defmodule ExWebRTC.RTP.JitterBuffer do
@spec send_packets(t()) :: result()
defp send_packets(%{store: store} = buffer) do
# Flush packets that stayed in queue longer than latency and any gaps before them
{too_old_records, store} = PacketStore.flush_older_than(store, buffer.latency)
{too_old_packets, store} = PacketStore.flush_older_than(store, buffer.latency)
# Additionally, flush packets as long as there are no gaps
{gapless_records, store} = PacketStore.flush_ordered(store)
{gapless_packets, store} = PacketStore.flush_ordered(store)

packets =
too_old_records
|> Stream.concat(gapless_records)
|> records_to_packets()
too_old_packets
|> Stream.concat(gapless_packets)
|> handle_missing_packets()

{buffer, timer} = maybe_set_timer(%__MODULE__{buffer | store: store})

{packets, timer, buffer}
end

@spec records_to_packets(Enumerable.t(PacketStore.Record.t())) :: [Packet.t()]
defp records_to_packets(records) do
records
@spec handle_missing_packets(Enumerable.t(Packet.t() | nil)) :: [Packet.t()]
defp handle_missing_packets(packets) do
# TODO: nil -- missing packet (maybe owner should be notified about that)
|> Stream.reject(&is_nil/1)
|> Enum.map(& &1.packet)
Enum.reject(packets, &is_nil/1)
end

@spec maybe_set_timer(t()) :: {t(), timer()}
defp maybe_set_timer(buffer)

defp maybe_set_timer(%{state: :initial_wait} = buffer) do
case PacketStore.first_record_timestamp(buffer.store) do
case PacketStore.first_entry_timestamp(buffer.store) do
# If we're inserting the very first packet, set the initial latency timer
nil -> {buffer, buffer.latency}
_ts -> {buffer, nil}
end
end

defp maybe_set_timer(%{state: :timer_not_set} = buffer) do
case PacketStore.first_record_timestamp(buffer.store) do
case PacketStore.first_entry_timestamp(buffer.store) do
nil ->
{buffer, nil}

Expand Down
139 changes: 71 additions & 68 deletions lib/ex_webrtc/rtp/jitter_buffer/packet_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule ExWebRTC.RTP.JitterBuffer.PacketStore do

import Bitwise

defmodule Record do
defmodule Entry do
@moduledoc false
# Describes a structure that is stored in the PacketStore.

Expand All @@ -29,9 +29,9 @@ defmodule ExWebRTC.RTP.JitterBuffer.PacketStore do
end

@doc """
Compares two records.
Compares two entries.
Returns true if the first record is older than the second one.
Returns true if the first entry is older than the second one.
"""
@spec comparator(t(), t()) :: boolean()
# Designed to be used with Heap: https://gitlab.com/jimsy/heap/blob/master/lib/heap.ex#L71
Expand All @@ -43,7 +43,7 @@ defmodule ExWebRTC.RTP.JitterBuffer.PacketStore do

defstruct flush_index: nil,
highest_incoming_index: nil,
heap: Heap.new(&Record.comparator/2),
heap: Heap.new(&Entry.comparator/2),
set: MapSet.new(),
rollover_count: 0

Expand All @@ -56,7 +56,7 @@ defmodule ExWebRTC.RTP.JitterBuffer.PacketStore do
- `highest_incoming_index` - the highest index in the buffer so far, mapping to the most recently produced
RTP packet placed in JitterBuffer
- `rollover_count` - count of all performed rollovers (cycles of sequence number)
- `heap` - contains records containing packets
- `heap` - contains entries containing packets
- `set` - helper structure for faster read operations; content is the same as in `heap`
"""
@type t :: %__MODULE__{
Expand All @@ -73,18 +73,18 @@ defmodule ExWebRTC.RTP.JitterBuffer.PacketStore do
Each subsequent packet must have sequence number greater than the previously returned
one or be part of a rollover.
"""
@spec insert_packet(t(), ExRTP.Packet.t()) :: {:ok, t()} | {:error, :late_packet}
def insert_packet(store, %{sequence_number: seq_num} = packet) do
@spec insert(t(), ExRTP.Packet.t()) :: {:ok, t()} | {:error, :late_packet}
def insert(store, %{sequence_number: seq_num} = packet) do
do_insert_packet(store, packet, seq_num)
end

defp do_insert_packet(%__MODULE__{flush_index: nil} = store, packet, 0) do
store = add_record(store, Record.new(packet, @seq_number_limit), :next)
store = add_entry(store, Entry.new(packet, @seq_number_limit), :next)
{:ok, %__MODULE__{store | flush_index: @seq_number_limit - 1}}
end

defp do_insert_packet(%__MODULE__{flush_index: nil} = store, packet, seq_num) do
store = add_record(store, Record.new(packet, seq_num), :current)
store = add_entry(store, Entry.new(packet, seq_num), :current)
{:ok, %__MODULE__{store | flush_index: seq_num - 1}}
end

Expand All @@ -106,86 +106,52 @@ defmodule ExWebRTC.RTP.JitterBuffer.PacketStore do
:next -> {:next, seq_num + (roc + 1) * @seq_number_limit}
end

if fresh_packet?(flush_index, index) do
record = Record.new(packet, index)
{:ok, add_record(store, record, rollover)}
if index > flush_index do
entry = Entry.new(packet, index)
{:ok, add_entry(store, entry, rollover)}
else
{:error, :late_packet}
end
end

@doc """
Flushes the store to the packet with the next sequence number.
If this packet is present, it will be returned.
Otherwise it will be treated as late and rejected on attempt to insert into the store.
Flushes the store until the first gap in sequence numbers of entries
"""
@spec flush_one(t()) :: {Record.t() | nil, t()}
def flush_one(store)

def flush_one(%__MODULE__{flush_index: nil} = store) do
{nil, store}
end

def flush_one(%__MODULE__{flush_index: flush_index, heap: heap, set: set} = store) do
record = Heap.root(heap)

expected_next_index = flush_index + 1

{result, store} =
if record != nil and record.index == expected_next_index do
updated_heap = Heap.pop(heap)
updated_set = MapSet.delete(set, record.index)

updated_store = %__MODULE__{store | heap: updated_heap, set: updated_set}

{record, updated_store}
else
# TODO: instead of nil use expected_next_index to notify owner about missing packet
{nil, store}
end

{result, %__MODULE__{store | flush_index: expected_next_index}}
end

@doc """
Flushes the store until the first gap in sequence numbers of records
"""
@spec flush_ordered(t()) :: {[Record.t() | nil], t()}
@spec flush_ordered(t()) :: {[ExRTP.Packet.t() | nil], t()}
def flush_ordered(store) do
flush_while(store, fn %__MODULE__{flush_index: flush_index}, %Record{index: index} ->
flush_while(store, fn %__MODULE__{flush_index: flush_index}, %Entry{index: index} ->
index == flush_index + 1
end)
end

@doc """
Flushes the store as long as it contains a packet with the timestamp older than provided duration
"""
@spec flush_older_than(t(), non_neg_integer()) :: {[Record.t() | nil], t()}
@spec flush_older_than(t(), non_neg_integer()) :: {[ExRTP.Packet.t() | nil], t()}
def flush_older_than(store, max_age_ms) do
max_age_timestamp = System.monotonic_time(:millisecond) - max_age_ms

flush_while(store, fn _store, %Record{timestamp_ms: timestamp} ->
flush_while(store, fn _store, %Entry{timestamp_ms: timestamp} ->
timestamp <= max_age_timestamp
end)
end

@doc """
Returns all packets that are stored in the `PacketStore`.
"""
@spec dump(t()) :: [Record.t() | nil]
@spec dump(t()) :: [ExRTP.Packet.t() | nil]
def dump(%__MODULE__{} = store) do
{records, _store} = flush_while(store, fn _store, _record -> true end)
records
{packets, _store} = flush_while(store, fn _store, _entry -> true end)
packets
end

@doc """
Returns timestamp (time of insertion) of the packet with the lowest index
"""
@spec first_record_timestamp(t()) :: integer() | nil
def first_record_timestamp(%__MODULE__{heap: heap}) do
@spec first_entry_timestamp(t()) :: integer() | nil
def first_entry_timestamp(%__MODULE__{heap: heap}) do
case Heap.root(heap) do
%Record{timestamp_ms: time} -> time
%Entry{timestamp_ms: time} -> time
nil -> nil
end
end
Expand All @@ -212,7 +178,40 @@ defmodule ExWebRTC.RTP.JitterBuffer.PacketStore do
|> then(fn {result, _value} -> result end)
end

defp fresh_packet?(flush_index, index), do: index > flush_index
@doc false
@spec flush_one(t()) :: {Entry.t() | nil, t()}
# Flushes the store to the packet with the next sequence number.
#
# If this packet is present, it will be returned.
# Otherwise it will be treated as late and rejected on attempt to insert into the store.
#
# Should be called directly only when testing this module
def flush_one(store)

def flush_one(%__MODULE__{flush_index: nil} = store) do
{nil, store}
end

def flush_one(%__MODULE__{flush_index: flush_index, heap: heap, set: set} = store) do
record = Heap.root(heap)

expected_next_index = flush_index + 1

{result, store} =
if record != nil and record.index == expected_next_index do
updated_heap = Heap.pop(heap)
updated_set = MapSet.delete(set, record.index)

updated_store = %__MODULE__{store | heap: updated_heap, set: updated_set}

{record, updated_store}
else
# TODO: instead of nil use expected_next_index to notify owner about missing packet
{nil, store}
end

{result, %__MODULE__{store | flush_index: expected_next_index}}
end

defp flush_while(%__MODULE__{heap: heap} = store, fun, acc \\ []) do
heap
Expand All @@ -221,23 +220,24 @@ defmodule ExWebRTC.RTP.JitterBuffer.PacketStore do
nil ->
{Enum.reverse(acc), store}

record ->
if fun.(store, record) do
{record, store} = flush_one(store)
flush_while(store, fun, [record | acc])
entry ->
if fun.(store, entry) do
{entry, store} = flush_one(store)
packet = get_packet(entry)
flush_while(store, fun, [packet | acc])
else
{Enum.reverse(acc), store}
end
end
end

defp add_record(%__MODULE__{heap: heap, set: set} = store, %Record{} = record, record_rollover) do
if set |> MapSet.member?(record.index) do
defp add_entry(%__MODULE__{heap: heap, set: set} = store, %Entry{} = entry, entry_rollover) do
if set |> MapSet.member?(entry.index) do
store
else
%__MODULE__{store | heap: Heap.push(heap, record), set: MapSet.put(set, record.index)}
|> update_highest_incoming_index(record.index)
|> update_roc(record_rollover)
%__MODULE__{store | heap: Heap.push(heap, entry), set: MapSet.put(set, entry.index)}
|> update_highest_incoming_index(entry.index)
|> update_roc(entry_rollover)
end
end

Expand All @@ -258,5 +258,8 @@ defmodule ExWebRTC.RTP.JitterBuffer.PacketStore do
defp update_roc(%{rollover_count: roc} = store, :next),
do: %__MODULE__{store | rollover_count: roc + 1}

defp update_roc(store, _record_rollover), do: store
defp update_roc(store, _entry_rollover), do: store

defp get_packet(nil), do: nil
defp get_packet(entry), do: entry.packet
end
Loading

0 comments on commit bdee2d0

Please sign in to comment.