Skip to content

Commit

Permalink
get rid of ignore_spurious_data
Browse files Browse the repository at this point in the history
This was an attempt at #17 that didn't work out. Besides additionally
hitting the framing error, my Reolink test camera never timed out
the old session while the new one was in progress. This behavior is
different than the vanilla 2013.04.06 live555 server, so apparently
Reolink sprinkled in their own brokenness on top. In any case,
the other solutions described in that bug are more likely to work.

There's one narrow case I want to keep working: ignoring RTCP messages
immediately prior to the PLAY response. More recent Reolink cameras
do this. I just have this behavior on all the time now.
  • Loading branch information
scottlamb committed Aug 31, 2021
1 parent 9e9366f commit 60a2530
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 184 deletions.
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
## `v0.3.0` (unreleased)

* [#30](https://github.com/scottlamb/retina/issues/30): experimental UDP
support.
* BREAKING CHANGE: [#30](https://github.com/scottlamb/retina/issues/30):
experimental UDP support. Several `RtspMessageContext` fields have been
replaced with `PacketContext`.
* BREAKING CHANGE: remove `retina::client::SessionOptions::ignore_spurious_data`. This
was an attempted workaround for old live555 servers
([#17](https://github.com/scottlamb/retina/issues/17)) that was ineffective.
* [#22](https://github.com/scottlamb/retina/issues/22): fix handling of
44.1 kHz AAC audio.

Expand Down
8 changes: 1 addition & 7 deletions examples/client/mp4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ pub struct Opts {
#[structopt(long)]
allow_loss: bool,

/// Works around an old live555 server bug which sends data packets meant
/// for a closed RTP connection to one opened afterward.
#[structopt(long)]
ignore_spurious_data: bool,

/// Duration after which to exit automatically, in seconds.
#[structopt(long, name = "secs")]
duration: Option<u64>,
Expand Down Expand Up @@ -584,8 +579,7 @@ pub async fn run(opts: Opts) -> Result<(), Error> {
retina::client::SessionOptions::default()
.creds(creds)
.user_agent("Retina mp4 example".to_owned())
.transport(opts.transport)
.ignore_spurious_data(opts.ignore_spurious_data),
.transport(opts.transport),
)
.await?;
let video_stream = if !opts.no_video {
Expand Down
218 changes: 81 additions & 137 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ impl std::str::FromStr for InitialTimestampPolicy {
pub struct SessionOptions {
creds: Option<Credentials>,
user_agent: Option<Box<str>>,
ignore_spurious_data: bool,
transport: Transport,
}

Expand Down Expand Up @@ -145,34 +144,6 @@ impl std::str::FromStr for Transport {
}

impl SessionOptions {
/// Ignores RTSP interleaved data packets for channels that aren't assigned,
/// aren't in PLAY state, or already have a different SSRC in use.
///
/// This still assumes that for assigned channels, the packet's protocol
/// (RTP or RTCP) matches the assignment. All known RTSP implementations
/// only use RTP on even channels and RTCP on odd channels, so this seems
/// reasonably safe.
///
/// ``ignore_spurious_data` is necessary with some Reolink cameras for at
/// least two reasons:
/// * Reolink RLC-410 (IPC_3816M) firmware version v2.0.0.1441_19032101:
/// the camera sent interleaved data that apparently belonged to a
/// previous RTSP session. This happened immediately on connection
/// establishment and continued for some time after the following PLAY
/// request.
/// * Reolink RLC-822A (IPC_523128M8MP) firmware v3.0.0.177_21012101):
/// the camera sent RTCP SR packets immediately *before* its PLAY
/// response rather than afterward. It's easiest to treat them similarly
/// to the above case and discard them. (An alternative workaround would
/// be to buffer them until after Retina has examined the server's
/// PLAY response.)
///
/// Currently each packet is logged at debug priority. This may change.
pub fn ignore_spurious_data(mut self, ignore_spurious_data: bool) -> Self {
self.ignore_spurious_data = ignore_spurious_data;
self
}

/// Use the given credentials when/if the server requests digest authentication.
pub fn creds(mut self, creds: Option<Credentials>) -> Self {
self.creds = creds;
Expand Down Expand Up @@ -395,6 +366,13 @@ enum ResponseMode {
/// Anything but the response to this request is an error.
Normal,

/// Silently discard data messages on assigned channels.
/// This is a workaround for recent Reolink cameras which appear to send
/// RTCP sender reports immediately *before* the `PLAY` response when
/// using interleaved data. It's simplest to discard them rather than
/// attempt to interpret them before having `RTP-Info`.
Play,

/// Silently discard data messages and responses to the given keepalive
/// while awaiting the response to this request.
Teardown { keepalive_cseq: Option<u32> },
Expand Down Expand Up @@ -497,11 +475,12 @@ impl RtspConnection {
),
})
})?;
match msg.msg {
let msg_ctx = msg.ctx;
let description = match msg.msg {
rtsp_types::Message::Response(r) => {
if let Some(response_cseq) = parse::get_cseq(&r) {
if response_cseq == cseq {
break (r, msg.ctx);
break (r, msg_ctx);
}
if let ResponseMode::Teardown {
keepalive_cseq: Some(k),
Expand All @@ -512,33 +491,45 @@ impl RtspConnection {
continue;
}
}
format!("{} response with CSeq {}", r.reason_phrase(), response_cseq)
} else {
format!("{} response with no/unparseable cseq", r.reason_phrase())
}
}
rtsp_types::Message::Data(_)
if matches!(mode, ResponseMode::Teardown { .. }) =>
{
debug!("ignoring RTSP data during TEARDOWN");
continue;
}
rtsp_types::Message::Data(d) if options.ignore_spurious_data => {
debug!(
"ignoring interleaved data message on channel {} while waiting \
for response to {} CSeq {}",
d.channel_id(),
method,
cseq
);
continue;
rtsp_types::Message::Data(d) => {
if matches!(mode, ResponseMode::Teardown { .. }) {
debug!("ignoring RTSP data during TEARDOWN");
continue;
} else if let (ResponseMode::Play, Some(m)) =
(&mode, self.channels.lookup(d.channel_id()))
{
if m.channel_type == ChannelType::Rtcp {
debug!(
"ignoring interleaved data message on RTCP channel {} while \
waiting for response to {} CSeq {}",
d.channel_id(),
method,
cseq
);
continue;
}
}
format!(
"{}-byte interleaved data message on channel {}",
d.len(),
d.channel_id()
)
}
o => bail!(ErrorInt::RtspFramingError {
conn_ctx: *self.inner.ctx(),
msg_ctx: msg.ctx,
description: format!(
"Expected response to {} CSeq {}, got {:?}",
method, cseq, o,
),
}),
rtsp_types::Message::Request(r) => format!("{:?} request", r.method()),
};
bail!(ErrorInt::RtspFramingError {
conn_ctx: *self.inner.ctx(),
msg_ctx,
description: format!(
"Expected response to {} CSeq {}, got {}",
method, cseq, description,
),
});
};
if resp.status() == rtsp_types::StatusCode::Unauthorized {
if requested_auth.is_some() {
Expand Down Expand Up @@ -921,7 +912,7 @@ impl Session<Described> {
let (msg_ctx, cseq, response) = inner
.conn
.send(
ResponseMode::Normal,
ResponseMode::Play,
&inner.options,
inner.requested_auth,
&mut rtsp_types::Request::builder(Method::Play, rtsp_types::Version::V1_0)
Expand Down Expand Up @@ -1248,13 +1239,6 @@ impl Session<Playing> {
});
let m = match inner.conn.channels.lookup(channel_id) {
Some(m) => m,
None if inner.options.ignore_spurious_data => {
log::debug!(
"Ignoring interleaved data on unassigned channel id {}",
channel_id
);
return Ok(None);
}
None => bail!(ErrorInt::RtspUnassignedChannelError {
conn_ctx: *inner.conn.inner.ctx(),
msg_ctx: *msg_ctx,
Expand All @@ -1281,21 +1265,17 @@ impl Session<Playing> {
m.stream_i,
data.into_body(),
)?),
ChannelType::Rtcp => match rtp_handler.rtcp(
&inner.options,
&pkt_ctx,
&mut timeline,
m.stream_i,
data.into_body(),
) {
Ok(p) => Ok(p),
Err(description) => Err(wrap!(ErrorInt::PacketError {
conn_ctx: *inner.conn.inner.ctx(),
pkt_ctx: pkt_ctx,
stream_id: m.stream_i,
description,
})),
},
ChannelType::Rtcp => {
match rtp_handler.rtcp(&pkt_ctx, &mut timeline, m.stream_i, data.into_body()) {
Ok(p) => Ok(p),
Err(description) => Err(wrap!(ErrorInt::PacketError {
conn_ctx: *inner.conn.inner.ctx(),
pkt_ctx: pkt_ctx,
stream_id: m.stream_i,
description,
})),
}
}
}
}

Expand Down Expand Up @@ -1329,7 +1309,7 @@ impl Session<Playing> {
match r {
Ok(()) => {
let msg = Bytes::copy_from_slice(buf.filled());
match rtp_handler.rtcp(&inner.options, &pkt_ctx, &mut timeline, i, msg) {
match rtp_handler.rtcp(&pkt_ctx, &mut timeline, i, msg) {
Ok(Some(p)) => return Poll::Ready(Some(Ok(p))),
Ok(None) => buf.clear(),
Err(description) => {
Expand Down Expand Up @@ -1617,7 +1597,7 @@ mod tests {
.unwrap();
}

/// Test the happy path of session initialization.
/// Tests the happy path of session initialization.
#[tokio::test]
async fn simple() {
let (conn, mut server) = connect_to_mock().await;
Expand Down Expand Up @@ -1694,29 +1674,28 @@ mod tests {
);
}

/// Tests the `ignore_spurious_data` feature:
/// * ignore a data packet while waiting for a RTSP response early on.
/// * ignore a data packet with the wrong ssrc after play.
/// Tests ignoring a bogus RTCP message while waiting for PLAY response.
#[tokio::test]
async fn ignore_spurious_data() {
async fn ignore_early_rtcp() {
let (conn, mut server) = connect_to_mock().await;
let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap();
let bogus_pkt = rtsp_types::Message::Data(rtsp_types::Data::new(
0,
Bytes::from_static(b"\x80\x60\xaa\xaa\x00\x00\x00\x00\xbb\xbb\xbb\xbbbogus pkt"),
1, // RTCP channel
Bytes::from_static(b"bogus pkt"), // the real packet parses but this is fine.
));

// DESCRIBE.
let options = SessionOptions::default().ignore_spurious_data(true);
let (session, _) = tokio::join!(Session::describe_with_conn(conn, options, url), async {
server.send(bogus_pkt.clone()).await.unwrap();
req_response(
&mut server,
rtsp_types::Method::Describe,
response(include_bytes!("testdata/reolink_describe.txt")),
)
.await;
},);
let (session, _) = tokio::join!(
Session::describe_with_conn(conn, SessionOptions::default(), url),
async {
req_response(
&mut server,
rtsp_types::Method::Describe,
response(include_bytes!("testdata/reolink_describe.txt")),
)
.await;
},
);
let mut session = session.unwrap();
assert_eq!(session.streams().len(), 2);

Expand All @@ -1733,51 +1712,16 @@ mod tests {
);

// PLAY.
let (session, _) = tokio::join!(
session.play(PlayOptions::default()),
let (session, _) = tokio::join!(session.play(PlayOptions::default()), async move {
server.send(bogus_pkt).await.unwrap();
req_response(
&mut server,
rtsp_types::Method::Play,
response(include_bytes!("testdata/reolink_play.txt"))
),
);
let session = session.unwrap();
tokio::pin!(session);

// Packet.
tokio::join!(
async {
match session.next().await {
Some(Ok(PacketItem::RtpPacket(p))) => {
assert_eq!(p.ssrc, 0xdcc4a0d8);
assert_eq!(p.sequence_number, 0x41d4);
assert_eq!(&p.payload[..], b"hello world");
}
o => panic!("unexpected item: {:#?}", o),
}
},
async {
server.send(bogus_pkt).await.unwrap();
let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world";
server
.send(rtsp_types::Message::Data(rtsp_types::Data::new(
0,
Bytes::from_static(pkt),
)))
.await
.unwrap();
},
);

// End of stream.
tokio::join!(
async {
assert!(session.next().await.is_none());
},
async {
server.close().await.unwrap();
},
);
response(include_bytes!("testdata/reolink_play.txt")),
)
.await
},);
let _session = session.unwrap();
}

// See with: cargo test -- --nocapture client::tests::print_sizes
Expand Down
Loading

0 comments on commit 60a2530

Please sign in to comment.