Skip to content

Commit

Permalink
fix(replay): Limit allocations for replay decompression (#1691)
Browse files Browse the repository at this point in the history
The Replay recording processor reads a zlib compressed payload and
decompresses it in memory. This can lead to near unbounded allocations
and exhaust Relay's memory. This PR introduces an upper bound for these
payloads.
  • Loading branch information
jan-auer authored Dec 13, 2022
1 parent f884122 commit af4a544
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 95 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- Relay no longer accepts transaction events older than 5 days. Previously the event was accepted and stored, but since metrics for such old transactions are not supported it did not show up in parts of Sentry such as the Performance landing page. ([#1663](https://github.com/getsentry/relay/pull/1663))
- Apply dynamic sampling to transactions from older SDKs and even in case Relay cannot load project information. This avoids accidentally storing 100% of transactions. ([#1667](https://github.com/getsentry/relay/pull/1667))
- Replay recording parser now uses the entire body rather than a subset. ([#1682](https://github.com/getsentry/relay/pull/1682))
- Fix a potential OOM in the Replay recording parser. ([#1691](https://github.com/getsentry/relay/pull/1691))

**Internal**:

Expand Down
162 changes: 68 additions & 94 deletions relay-replays/src/recording.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,51 +9,49 @@ use relay_general::processor::{
};
use relay_general::types::{Meta, ProcessingAction};

use flate2::read::ZlibDecoder;
use flate2::write::ZlibEncoder;
use flate2::Compression;
use serde::de::Error as DError;
use serde::{Deserialize, Serialize};
use serde_json::{Error, Value};

pub fn process_recording(bytes: &[u8]) -> Result<Vec<u8>, RecordingParseError> {
use flate2::{read::ZlibDecoder, write::ZlibEncoder, Compression};
use serde::{de::Error as DError, Deserialize, Serialize};
use serde_json::Value;

/// Parses compressed replay recording payloads and applies data scrubbers.
///
/// `limit` controls the maximum size in bytes during decompression. This function returns an `Err`
/// if decompressed contents exceed the limit.
pub fn process_recording(bytes: &[u8], limit: usize) -> Result<Vec<u8>, RecordingParseError> {
// Check for null byte condition.
if bytes.is_empty() {
return Err(RecordingParseError::Message("no data found.".to_string()));
return Err(RecordingParseError::Message("no data found"));
}

// Find the header value.
let header = bytes
.split(|b| b == &b'\n')
let mut split = bytes.splitn(2, |b| b == &b'\n');
let header = split
.next()
.ok_or_else(|| RecordingParseError::Message("no headers found.".to_string()))?;
.ok_or(RecordingParseError::Message("no headers found"))?;

// Find the body value.
if bytes.len() <= header.len() + 1 {
return Err(RecordingParseError::Message("no body found.".to_string()));
}
let body = match split.next() {
Some(b"") | None => return Err(RecordingParseError::Message("no body found")),
Some(body) => body,
};

// Deserialization.
let mut events = loads(&bytes[header.len() + 1..])?;

// Processing.
let mut events = deserialize_compressed(body, limit)?;
strip_pii(&mut events).map_err(RecordingParseError::ProcessingAction)?;

// Serialization.
let out_bytes = dumps(events)?;
let out_bytes = serialize_compressed(events)?;
Ok([header.into(), vec![b'\n'], out_bytes].concat())
}

fn loads(zipped_input: &[u8]) -> Result<Vec<Event>, RecordingParseError> {
let mut decoder = ZlibDecoder::new(zipped_input);
let mut buffer = String::new();
decoder.read_to_string(&mut buffer)?;
fn deserialize_compressed(
zipped_input: &[u8],
limit: usize,
) -> Result<Vec<Event>, RecordingParseError> {
let decoder = ZlibDecoder::new(zipped_input);

let events: Vec<Event> = serde_json::from_str(&buffer)?;
Ok(events)
let mut buffer = Vec::new();
decoder.take(limit as u64).read_to_end(&mut buffer)?;

Ok(serde_json::from_slice(&buffer)?)
}

fn dumps(rrweb: Vec<Event>) -> Result<Vec<u8>, RecordingParseError> {
fn serialize_compressed(rrweb: Vec<Event>) -> Result<Vec<u8>, RecordingParseError> {
let buffer = serde_json::to_vec(&rrweb)?;

let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
Expand All @@ -78,17 +76,17 @@ fn strip_pii(events: &mut Vec<Event>) -> Result<(), ProcessingAction> {

#[derive(Debug)]
pub enum RecordingParseError {
SerdeError(Error),
IoError(std::io::Error),
Message(String),
Json(serde_json::Error),
Compression(std::io::Error),
Message(&'static str),
ProcessingAction(ProcessingAction),
}

impl Display for RecordingParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RecordingParseError::SerdeError(serde_error) => write!(f, "{}", serde_error),
RecordingParseError::IoError(io_error) => write!(f, "{}", io_error),
RecordingParseError::Json(serde_error) => write!(f, "{}", serde_error),
RecordingParseError::Compression(io_error) => write!(f, "{}", io_error),
RecordingParseError::Message(message) => write!(f, "{}", message),
RecordingParseError::ProcessingAction(action) => write!(f, "{}", action),
}
Expand All @@ -97,15 +95,15 @@ impl Display for RecordingParseError {

impl std::error::Error for RecordingParseError {}

impl From<Error> for RecordingParseError {
fn from(err: Error) -> Self {
RecordingParseError::SerdeError(err)
impl From<serde_json::Error> for RecordingParseError {
fn from(err: serde_json::Error) -> Self {
RecordingParseError::Json(err)
}
}

impl From<std::io::Error> for RecordingParseError {
fn from(err: std::io::Error) -> Self {
RecordingParseError::IoError(err)
RecordingParseError::Compression(err)
}
}

Expand Down Expand Up @@ -577,7 +575,7 @@ mod tests {
fn test_process_recording_end_to_end() {
// Valid compressed rrweb payload. Contains a 16 byte header followed by a new line
// character and concludes with a gzipped rrweb payload.
let payload: [u8; 241] = [
let payload: &[u8] = &[
123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10, 120,
156, 149, 144, 91, 106, 196, 32, 20, 64, 247, 114, 191, 237, 160, 241, 145, 234, 38,
102, 1, 195, 124, 152, 104, 6, 33, 169, 193, 40, 52, 4, 247, 94, 91, 103, 40, 20, 108,
Expand All @@ -593,85 +591,62 @@ mod tests {
146, 59, 13, 115, 10, 144, 115, 190, 126, 0, 2, 68, 180, 16,
];

let result = recording::process_recording(&payload);
match result {
Ok(v) => assert!(!v.is_empty()),
Err(_) => unreachable!(),
}
let result = recording::process_recording(payload, 1000);
assert!(!result.unwrap().is_empty());
}

#[test]
fn test_process_recording_no_body_data() {
// Empty bodies can not be decompressed and fail.
let payload: [u8; 17] = [
let payload: &[u8] = &[
123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10,
];

let result = recording::process_recording(&payload);
match result {
Ok(_) => unreachable!(),
Err(e) => match e {
recording::RecordingParseError::Message(er) => {
assert_eq!(er, "no body found.".to_string())
}
_ => unreachable!(),
},
}
let result = recording::process_recording(payload, 1000);
assert!(matches!(
result.unwrap_err(),
recording::RecordingParseError::Message("no body found"),
));
}

#[test]
fn test_process_recording_bad_body_data() {
// Invalid gzip body contents. Can not deflate.
let payload: [u8; 18] = [
let payload: &[u8] = &[
123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125, 10, 22,
];

let result = recording::process_recording(&payload);
match result {
Ok(_) => unreachable!(),
Err(e) => match e {
recording::RecordingParseError::IoError(er) => {
assert_eq!(er.to_string(), "corrupt deflate stream".to_string())
}
_ => unreachable!(),
},
}
let result = recording::process_recording(payload, 1000);
assert!(matches!(
result.unwrap_err(),
recording::RecordingParseError::Compression(_),
));
}

#[test]
fn test_process_recording_no_headers() {
// No header delimiter. Entire payload is consumed as headers. The empty body fails.
let payload: [u8; 16] = [
let payload: &[u8] = &[
123, 34, 115, 101, 103, 109, 101, 110, 116, 95, 105, 100, 34, 58, 51, 125,
];

let result = recording::process_recording(&payload);
match result {
Ok(_) => unreachable!(),
Err(e) => match e {
recording::RecordingParseError::Message(er) => {
assert_eq!(er, "no body found.".to_string())
}
_ => unreachable!(),
},
}
let result = recording::process_recording(payload, 1000);
assert!(matches!(
result.unwrap_err(),
recording::RecordingParseError::Message("no body found"),
));
}

#[test]
fn test_process_recording_no_contents() {
// Empty payload can not be decompressed. Header check never fails.
let payload: [u8; 0] = [];

let result = recording::process_recording(&payload);
match result {
Ok(_) => unreachable!(),
Err(e) => match e {
recording::RecordingParseError::Message(er) => {
assert_eq!(er, "no data found.".to_string())
}
_ => unreachable!(),
},
}
let payload: &[u8] = &[];

let result = recording::process_recording(payload, 1000);
assert!(matches!(
result.unwrap_err(),
recording::RecordingParseError::Message("no data found"),
));
}

// RRWeb Payload Coverage
Expand All @@ -690,7 +665,7 @@ mod tests {
if let recording::NodeVariant::T2(mut ee) = dd.node.variant {
let ff = ee.child_nodes.pop().unwrap();
if let recording::NodeVariant::Rest(gg) = ff.variant {
assert!(gg.text_content.as_str() == "[creditcard]");
assert_eq!(gg.text_content, "[creditcard]");
return;
}
}
Expand All @@ -713,8 +688,7 @@ mod tests {
if let recording::NodeVariant::T2(mut ee) = dd.node.variant {
let ff = ee.child_nodes.pop().unwrap();
if let recording::NodeVariant::Rest(gg) = ff.variant {
println!("{}", gg.text_content.as_str());
assert!(gg.text_content.as_str() == "[ip]");
assert_eq!(gg.text_content, "[ip]");
return;
}
}
Expand Down
7 changes: 6 additions & 1 deletion relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1021,8 +1021,13 @@ impl EnvelopeProcessorService {
// XXX: Temporarily, only the Sentry org will be allowed to parse replays while
// we measure the impact of this change.
if replays_enabled && state.project_state.organization_id == Some(1) {
// Limit expansion of recordings to the envelope size. The payload is
// decompressed temporarily and then immediately re-compressed. However, to
// limit memory pressure, we use the envelope limit as a good overall limit for
// allocations.
let limit = self.config.max_envelope_size();
let parsed_recording =
relay_replays::recording::process_recording(&item.payload());
relay_replays::recording::process_recording(&item.payload(), limit);

match parsed_recording {
Ok(recording) => {
Expand Down

0 comments on commit af4a544

Please sign in to comment.