Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix missing decoder draining: added drain_raw, drain and decoder auto-drain logic #51

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 121 additions & 34 deletions src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl<'a> DecoderBuilder<'a> {
)?,
reader,
reader_stream_index,
draining: false,
})
}
}
Expand All @@ -111,6 +112,7 @@ pub struct Decoder {
decoder: DecoderSplit,
reader: Reader,
reader_stream_index: usize,
draining: bool,
}

impl Decoder {
Expand Down Expand Up @@ -192,9 +194,20 @@ impl Decoder {
#[cfg(feature = "ndarray")]
pub fn decode(&mut self) -> Result<(Time, Frame)> {
Ok(loop {
let packet = self.reader.read(self.reader_stream_index)?;
if let Some(frame) = self.decoder.decode(packet)? {
if !self.draining {
let packet_result = self.reader.read(self.reader_stream_index);
if matches!(packet_result, Err(Error::ReadExhausted)) {
self.draining = true;
continue;
}
let packet = packet_result?;
if let Some(frame) = self.decoder.decode(packet)? {
break frame;
}
} else if let Some(frame) = self.decoder.drain()? {
break frame;
} else {
return Err(Error::DecodeExhausted);
}
})
}
Expand All @@ -212,9 +225,20 @@ impl Decoder {
/// The decoded raw frame as [`RawFrame`].
pub fn decode_raw(&mut self) -> Result<RawFrame> {
Ok(loop {
let packet = self.reader.read(self.reader_stream_index)?;
if let Some(frame) = self.decoder.decode_raw(packet)? {
if !self.draining {
let packet_result = self.reader.read(self.reader_stream_index);
if matches!(packet_result, Err(Error::ReadExhausted)) {
self.draining = true;
continue;
}
let packet = packet_result?;
if let Some(frame) = self.decoder.decode_raw(packet)? {
break frame;
}
} else if let Some(frame) = self.decoder.drain_raw()? {
break frame;
} else {
return Err(Error::DecodeExhausted);
}
})
}
Expand Down Expand Up @@ -296,13 +320,17 @@ impl Decoder {
}

/// Decoder part of a split [`Decoder`] and [`Reader`].
///
/// Important note: Do not forget to drain the decoder after the reader is exhausted. It may still
/// contain frames. Run `drain_raw()` or `drain()` in a loop until no more frames are produced.
pub struct DecoderSplit {
decoder: AvDecoder,
decoder_time_base: AvRational,
hwaccel_context: Option<HardwareAccelerationContext>,
scaler: Option<AvScaler>,
size: (u32, u32),
size_out: (u32, u32),
draining: bool,
}

impl DecoderSplit {
Expand Down Expand Up @@ -382,6 +410,7 @@ impl DecoderSplit {
scaler,
size,
size_out,
draining: false,
})
}

Expand All @@ -396,22 +425,18 @@ impl DecoderSplit {
/// Feeds the packet to the decoder and returns a frame if there is one available. The caller
/// should keep feeding packets until the decoder returns a frame.
///
/// # Panics
///
/// Panics if in draining mode.
///
/// # Return value
///
/// A tuple of the [`Frame`] and timestamp (relative to the stream) and the frame itself if the
/// decoder has a frame available, [`None`] if not.
#[cfg(feature = "ndarray")]
pub fn decode(&mut self, packet: Packet) -> Result<Option<(Time, Frame)>> {
match self.decode_raw(packet)? {
Some(mut frame) => {
// We use the packet DTS here (which is `frame->pkt_dts`) because that is what the
// encoder will use when encoding for the `PTS` field.
let timestamp = Time::new(Some(frame.packet().dts), self.decoder_time_base);
let frame =
ffi::convert_frame_to_ndarray_rgb24(&mut frame).map_err(Error::BackendError)?;

Ok(Some((timestamp, frame)))
}
Some(mut frame) => Ok(Some(self.raw_frame_to_time_and_frame(&mut frame)?)),
None => Ok(None),
}
}
Expand All @@ -421,17 +446,82 @@ impl DecoderSplit {
/// Feeds the packet to the decoder and returns a frame if there is one available. The caller
/// should keep feeding packets until the decoder returns a frame.
///
/// # Panics
///
/// Panics if in draining mode.
///
/// # Return value
///
/// The decoded raw frame as [`RawFrame`] if the decoder has a frame available, [`None`] if not.
pub fn decode_raw(&mut self, packet: Packet) -> Result<Option<RawFrame>> {
assert!(!self.draining);
self.send_packet_to_decoder(packet)?;
self.receive_frame_from_decoder()
}

/// Drain one frame from the decoder.
///
/// After calling drain once the decoder is in draining mode and the caller may not use normal
/// decode anymore or it will panic.
///
/// # Return value
///
///
/// # Return value
///
/// A tuple of the [`Frame`] and timestamp (relative to the stream) and the frame itself if the
/// decoder has a frame available, [`None`] if not.
#[cfg(feature = "ndarray")]
pub fn drain(&mut self) -> Result<Option<(Time, Frame)>> {
match self.drain_raw()? {
Some(mut frame) => Ok(Some(self.raw_frame_to_time_and_frame(&mut frame)?)),
None => Ok(None),
}
}

/// Drain one frame from the decoder.
///
/// After calling drain once the decoder is in draining mode and the caller may not use normal
/// decode anymore or it will panic.
///
/// # Return value
///
/// The decoded raw frame as [`RawFrame`] if the decoder has a frame available, [`None`] if not.
pub fn drain_raw(&mut self) -> Result<Option<RawFrame>> {
if !self.draining {
self.decoder.send_eof().map_err(Error::BackendError)?;
self.draining = true;
}
self.receive_frame_from_decoder()
}

/// Get the decoders input size (resolution dimensions): width and height.
#[inline(always)]
pub fn size(&self) -> (u32, u32) {
self.size
}

/// Get the decoders output size after resizing is applied (resolution dimensions): width and
/// height.
#[inline(always)]
pub fn size_out(&self) -> (u32, u32) {
self.size_out
}

/// Send packet to decoder. Includes rescaling timestamps accordingly.
fn send_packet_to_decoder(&mut self, packet: Packet) -> Result<()> {
let (mut packet, packet_time_base) = packet.into_inner_parts();
packet.rescale_ts(packet_time_base, self.decoder_time_base);

self.decoder
.send_packet(&packet)
.map_err(Error::BackendError)?;

Ok(())
}

/// Receive packet from decoder. Will handle hwaccel conversions and scaling as well.
fn receive_frame_from_decoder(&mut self) -> Result<Option<RawFrame>> {
match self.decoder_receive_frame()? {
Some(frame) => {
let frame = match self.hwaccel_context.as_ref() {
Expand All @@ -452,17 +542,16 @@ impl DecoderSplit {
}
}

/// Get the decoders input size (resolution dimensions): width and height.
#[inline(always)]
pub fn size(&self) -> (u32, u32) {
self.size
}

/// Get the decoders output size after resizing is applied (resolution dimensions): width and
/// height.
#[inline(always)]
pub fn size_out(&self) -> (u32, u32) {
self.size_out
/// Pull a decoded frame from the decoder. This function also implements retry mechanism in case
/// the decoder signals `EAGAIN`.
fn decoder_receive_frame(&mut self) -> Result<Option<RawFrame>> {
let mut frame = RawFrame::empty();
let decode_result = self.decoder.receive_frame(&mut frame);
match decode_result {
Ok(()) => Ok(Some(frame)),
Err(AvError::Other { errno }) if errno == EAGAIN => Ok(None),
Err(err) => Err(err.into()),
}
}

/// Download frame from foreign hardware acceleration device.
Expand All @@ -484,16 +573,14 @@ impl DecoderSplit {
Ok(frame_scaled)
}

/// Pull a decoded frame from the decoder. This function also implements retry mechanism in case
/// the decoder signals `EAGAIN`.
fn decoder_receive_frame(&mut self) -> Result<Option<RawFrame>> {
let mut frame = RawFrame::empty();
let decode_result = self.decoder.receive_frame(&mut frame);
match decode_result {
Ok(()) => Ok(Some(frame)),
Err(AvError::Other { errno }) if errno == EAGAIN => Ok(None),
Err(err) => Err(err.into()),
}
#[cfg(feature = "ndarray")]
fn raw_frame_to_time_and_frame(&self, frame: &mut RawFrame) -> Result<(Time, Frame)> {
// We use the packet DTS here (which is `frame->pkt_dts`) because that is what the
// encoder will use when encoding for the `PTS` field.
let timestamp = Time::new(Some(frame.packet().dts), self.decoder_time_base);
let frame = ffi::convert_frame_to_ndarray_rgb24(frame).map_err(Error::BackendError)?;

Ok((timestamp, frame))
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ffmpeg::Error as FfmpegError;
#[derive(Debug, Clone)]
pub enum Error {
ReadExhausted,
DecodeExhausted,
WriteRetryLimitReached,
InvalidFrameFormat,
InvalidExtraData,
Expand All @@ -22,6 +23,7 @@ impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match *self {
Error::ReadExhausted => None,
Error::DecodeExhausted => None,
Error::WriteRetryLimitReached => None,
Error::InvalidFrameFormat => None,
Error::InvalidExtraData => None,
Expand All @@ -39,6 +41,7 @@ impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match *self {
Error::ReadExhausted => write!(f, "stream exhausted"),
Error::DecodeExhausted => write!(f, "stream exhausted"),
Error::WriteRetryLimitReached => {
write!(f, "cannot write to video stream, even after multiple tries")
}
Expand Down
Loading