diff --git a/CHANGELOG.md b/CHANGELOG.md index cebc29b..cb4c165 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,11 @@ ## `v0.3.0` (unreleased) -* [#30](https://github.com/scottlamb/retina/issues/30): experimental UDP - support. +* BREAKING CHANGE: [#30](https://github.com/scottlamb/retina/issues/30): + experimental UDP support. Several `RtspMessageContext` fields have been + replaced with `PacketContext`. +* BREAKING CHANGE: remove `retina::client::SessionOptions::ignore_spurious_data`. This + was an attempted workaround for old live555 servers + ([#17](https://github.com/scottlamb/retina/issues/17)) that was ineffective. * [#22](https://github.com/scottlamb/retina/issues/22): fix handling of 44.1 kHz AAC audio. diff --git a/examples/client/mp4.rs b/examples/client/mp4.rs index 86b2790..6f844cc 100644 --- a/examples/client/mp4.rs +++ b/examples/client/mp4.rs @@ -54,11 +54,6 @@ pub struct Opts { #[structopt(long)] allow_loss: bool, - /// Works around an old live555 server bug which sends data packets meant - /// for a closed RTP connection to one opened afterward. - #[structopt(long)] - ignore_spurious_data: bool, - /// Duration after which to exit automatically, in seconds. #[structopt(long, name = "secs")] duration: Option, @@ -584,8 +579,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> { retina::client::SessionOptions::default() .creds(creds) .user_agent("Retina mp4 example".to_owned()) - .transport(opts.transport) - .ignore_spurious_data(opts.ignore_spurious_data), + .transport(opts.transport), ) .await?; let video_stream = if !opts.no_video { diff --git a/src/client/mod.rs b/src/client/mod.rs index 2bbb2e8..18a4e64 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -95,7 +95,6 @@ impl std::str::FromStr for InitialTimestampPolicy { pub struct SessionOptions { creds: Option, user_agent: Option>, - ignore_spurious_data: bool, transport: Transport, } @@ -145,34 +144,6 @@ impl std::str::FromStr for Transport { } impl SessionOptions { - /// Ignores RTSP interleaved data packets for channels that aren't assigned, - /// aren't in PLAY state, or already have a different SSRC in use. - /// - /// This still assumes that for assigned channels, the packet's protocol - /// (RTP or RTCP) matches the assignment. All known RTSP implementations - /// only use RTP on even channels and RTCP on odd channels, so this seems - /// reasonably safe. - /// - /// ``ignore_spurious_data` is necessary with some Reolink cameras for at - /// least two reasons: - /// * Reolink RLC-410 (IPC_3816M) firmware version v2.0.0.1441_19032101: - /// the camera sent interleaved data that apparently belonged to a - /// previous RTSP session. This happened immediately on connection - /// establishment and continued for some time after the following PLAY - /// request. - /// * Reolink RLC-822A (IPC_523128M8MP) firmware v3.0.0.177_21012101): - /// the camera sent RTCP SR packets immediately *before* its PLAY - /// response rather than afterward. It's easiest to treat them similarly - /// to the above case and discard them. (An alternative workaround would - /// be to buffer them until after Retina has examined the server's - /// PLAY response.) - /// - /// Currently each packet is logged at debug priority. This may change. - pub fn ignore_spurious_data(mut self, ignore_spurious_data: bool) -> Self { - self.ignore_spurious_data = ignore_spurious_data; - self - } - /// Use the given credentials when/if the server requests digest authentication. pub fn creds(mut self, creds: Option) -> Self { self.creds = creds; @@ -395,6 +366,13 @@ enum ResponseMode { /// Anything but the response to this request is an error. Normal, + /// Silently discard data messages on assigned channels. + /// This is a workaround for recent Reolink cameras which appear to send + /// RTCP sender reports immediately *before* the `PLAY` response when + /// using interleaved data. It's simplest to discard them rather than + /// attempt to interpret them before having `RTP-Info`. + Play, + /// Silently discard data messages and responses to the given keepalive /// while awaiting the response to this request. Teardown { keepalive_cseq: Option }, @@ -497,11 +475,12 @@ impl RtspConnection { ), }) })?; - match msg.msg { + let msg_ctx = msg.ctx; + let description = match msg.msg { rtsp_types::Message::Response(r) => { if let Some(response_cseq) = parse::get_cseq(&r) { if response_cseq == cseq { - break (r, msg.ctx); + break (r, msg_ctx); } if let ResponseMode::Teardown { keepalive_cseq: Some(k), @@ -512,33 +491,45 @@ impl RtspConnection { continue; } } + format!("{} response with CSeq {}", r.reason_phrase(), response_cseq) + } else { + format!("{} response with no/unparseable cseq", r.reason_phrase()) } } - rtsp_types::Message::Data(_) - if matches!(mode, ResponseMode::Teardown { .. }) => - { - debug!("ignoring RTSP data during TEARDOWN"); - continue; - } - rtsp_types::Message::Data(d) if options.ignore_spurious_data => { - debug!( - "ignoring interleaved data message on channel {} while waiting \ - for response to {} CSeq {}", - d.channel_id(), - method, - cseq - ); - continue; + rtsp_types::Message::Data(d) => { + if matches!(mode, ResponseMode::Teardown { .. }) { + debug!("ignoring RTSP data during TEARDOWN"); + continue; + } else if let (ResponseMode::Play, Some(m)) = + (&mode, self.channels.lookup(d.channel_id())) + { + if m.channel_type == ChannelType::Rtcp { + debug!( + "ignoring interleaved data message on RTCP channel {} while \ + waiting for response to {} CSeq {}", + d.channel_id(), + method, + cseq + ); + continue; + } + } + format!( + "{}-byte interleaved data message on channel {}", + d.len(), + d.channel_id() + ) } - o => bail!(ErrorInt::RtspFramingError { - conn_ctx: *self.inner.ctx(), - msg_ctx: msg.ctx, - description: format!( - "Expected response to {} CSeq {}, got {:?}", - method, cseq, o, - ), - }), + rtsp_types::Message::Request(r) => format!("{:?} request", r.method()), }; + bail!(ErrorInt::RtspFramingError { + conn_ctx: *self.inner.ctx(), + msg_ctx, + description: format!( + "Expected response to {} CSeq {}, got {}", + method, cseq, description, + ), + }); }; if resp.status() == rtsp_types::StatusCode::Unauthorized { if requested_auth.is_some() { @@ -921,7 +912,7 @@ impl Session { let (msg_ctx, cseq, response) = inner .conn .send( - ResponseMode::Normal, + ResponseMode::Play, &inner.options, inner.requested_auth, &mut rtsp_types::Request::builder(Method::Play, rtsp_types::Version::V1_0) @@ -1248,13 +1239,6 @@ impl Session { }); let m = match inner.conn.channels.lookup(channel_id) { Some(m) => m, - None if inner.options.ignore_spurious_data => { - log::debug!( - "Ignoring interleaved data on unassigned channel id {}", - channel_id - ); - return Ok(None); - } None => bail!(ErrorInt::RtspUnassignedChannelError { conn_ctx: *inner.conn.inner.ctx(), msg_ctx: *msg_ctx, @@ -1281,21 +1265,17 @@ impl Session { m.stream_i, data.into_body(), )?), - ChannelType::Rtcp => match rtp_handler.rtcp( - &inner.options, - &pkt_ctx, - &mut timeline, - m.stream_i, - data.into_body(), - ) { - Ok(p) => Ok(p), - Err(description) => Err(wrap!(ErrorInt::PacketError { - conn_ctx: *inner.conn.inner.ctx(), - pkt_ctx: pkt_ctx, - stream_id: m.stream_i, - description, - })), - }, + ChannelType::Rtcp => { + match rtp_handler.rtcp(&pkt_ctx, &mut timeline, m.stream_i, data.into_body()) { + Ok(p) => Ok(p), + Err(description) => Err(wrap!(ErrorInt::PacketError { + conn_ctx: *inner.conn.inner.ctx(), + pkt_ctx: pkt_ctx, + stream_id: m.stream_i, + description, + })), + } + } } } @@ -1329,7 +1309,7 @@ impl Session { match r { Ok(()) => { let msg = Bytes::copy_from_slice(buf.filled()); - match rtp_handler.rtcp(&inner.options, &pkt_ctx, &mut timeline, i, msg) { + match rtp_handler.rtcp(&pkt_ctx, &mut timeline, i, msg) { Ok(Some(p)) => return Poll::Ready(Some(Ok(p))), Ok(None) => buf.clear(), Err(description) => { @@ -1617,7 +1597,7 @@ mod tests { .unwrap(); } - /// Test the happy path of session initialization. + /// Tests the happy path of session initialization. #[tokio::test] async fn simple() { let (conn, mut server) = connect_to_mock().await; @@ -1694,29 +1674,28 @@ mod tests { ); } - /// Tests the `ignore_spurious_data` feature: - /// * ignore a data packet while waiting for a RTSP response early on. - /// * ignore a data packet with the wrong ssrc after play. + /// Tests ignoring a bogus RTCP message while waiting for PLAY response. #[tokio::test] - async fn ignore_spurious_data() { + async fn ignore_early_rtcp() { let (conn, mut server) = connect_to_mock().await; let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap(); let bogus_pkt = rtsp_types::Message::Data(rtsp_types::Data::new( - 0, - Bytes::from_static(b"\x80\x60\xaa\xaa\x00\x00\x00\x00\xbb\xbb\xbb\xbbbogus pkt"), + 1, // RTCP channel + Bytes::from_static(b"bogus pkt"), // the real packet parses but this is fine. )); // DESCRIBE. - let options = SessionOptions::default().ignore_spurious_data(true); - let (session, _) = tokio::join!(Session::describe_with_conn(conn, options, url), async { - server.send(bogus_pkt.clone()).await.unwrap(); - req_response( - &mut server, - rtsp_types::Method::Describe, - response(include_bytes!("testdata/reolink_describe.txt")), - ) - .await; - },); + let (session, _) = tokio::join!( + Session::describe_with_conn(conn, SessionOptions::default(), url), + async { + req_response( + &mut server, + rtsp_types::Method::Describe, + response(include_bytes!("testdata/reolink_describe.txt")), + ) + .await; + }, + ); let mut session = session.unwrap(); assert_eq!(session.streams().len(), 2); @@ -1733,51 +1712,16 @@ mod tests { ); // PLAY. - let (session, _) = tokio::join!( - session.play(PlayOptions::default()), + let (session, _) = tokio::join!(session.play(PlayOptions::default()), async move { + server.send(bogus_pkt).await.unwrap(); req_response( &mut server, rtsp_types::Method::Play, - response(include_bytes!("testdata/reolink_play.txt")) - ), - ); - let session = session.unwrap(); - tokio::pin!(session); - - // Packet. - tokio::join!( - async { - match session.next().await { - Some(Ok(PacketItem::RtpPacket(p))) => { - assert_eq!(p.ssrc, 0xdcc4a0d8); - assert_eq!(p.sequence_number, 0x41d4); - assert_eq!(&p.payload[..], b"hello world"); - } - o => panic!("unexpected item: {:#?}", o), - } - }, - async { - server.send(bogus_pkt).await.unwrap(); - let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world"; - server - .send(rtsp_types::Message::Data(rtsp_types::Data::new( - 0, - Bytes::from_static(pkt), - ))) - .await - .unwrap(); - }, - ); - - // End of stream. - tokio::join!( - async { - assert!(session.next().await.is_none()); - }, - async { - server.close().await.unwrap(); - }, - ); + response(include_bytes!("testdata/reolink_play.txt")), + ) + .await + },); + let _session = session.unwrap(); } // See with: cargo test -- --nocapture client::tests::print_sizes diff --git a/src/client/rtp.rs b/src/client/rtp.rs index c1f2007..d3464da 100644 --- a/src/client/rtp.rs +++ b/src/client/rtp.rs @@ -125,29 +125,17 @@ impl InorderParser { let ssrc = reader.ssrc(); let loss = sequence_number.wrapping_sub(self.next_seq.unwrap_or(sequence_number)); if matches!(self.ssrc, Some(s) if s != ssrc) { - if session_options.ignore_spurious_data { - log::debug!( - "Ignoring spurious RTP data with ssrc={:08x} seq={:04x} while expecting \ - ssrc={:08x?} seq={:04x?}", - ssrc, - sequence_number, - self.ssrc, - self.next_seq - ); - return Ok(None); - } else { - bail!(ErrorInt::RtpPacketError { - conn_ctx: *conn_ctx, - pkt_ctx: *pkt_ctx, - stream_id, - ssrc, - sequence_number, - description: format!( - "Wrong ssrc; expecting ssrc={:08x?} seq={:04x?}", - self.ssrc, self.next_seq - ), - }); - } + bail!(ErrorInt::RtpPacketError { + conn_ctx: *conn_ctx, + pkt_ctx: *pkt_ctx, + stream_id, + ssrc, + sequence_number, + description: format!( + "Wrong ssrc; expecting ssrc={:08x?} seq={:04x?}", + self.ssrc, self.next_seq + ), + }); } if loss > 0x80_00 { if matches!(session_options.transport, super::Transport::Tcp) { @@ -211,7 +199,6 @@ impl InorderParser { pub fn rtcp( &mut self, - session_options: &super::SessionOptions, pkt_ctx: &crate::PacketContext, timeline: &mut super::Timeline, stream_id: usize, @@ -238,20 +225,10 @@ impl InorderParser { let ssrc = pkt.ssrc(); if matches!(self.ssrc, Some(s) if s != ssrc) { - if session_options.ignore_spurious_data { - log::debug!( - "Ignoring spurious RTCP data with ssrc={:08x} while \ - expecting ssrc={:08x?}", - ssrc, - self.ssrc - ); - return Ok(None); - } else { - return Err(format!( - "Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}", - self.ssrc, ssrc - )); - } + return Err(format!( + "Expected ssrc={:08x?}, got RTCP SR ssrc={:08x}", + self.ssrc, ssrc + )); } self.ssrc = Some(ssrc);