diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index 9575874c41d2..d2436f0c15de 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -33,6 +33,11 @@ name = "arrow_avro" path = "src/lib.rs" bench = false +[features] +default = ["deflate", "snappy", "zstd"] +deflate = ["flate2"] +snappy = ["snap", "crc"] + [dependencies] arrow-array = { workspace = true } arrow-buffer = { workspace = true } @@ -41,6 +46,11 @@ arrow-data = { workspace = true } arrow-schema = { workspace = true } serde_json = { version = "1.0", default-features = false, features = ["std"] } serde = { version = "1.0.188", features = ["derive"] } +flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true } +snap = { version = "1.0", default-features = false, optional = true } +zstd = { version = "0.13", default-features = false, optional = true } +crc = { version = "3.0", optional = true } + [dev-dependencies] diff --git a/arrow-avro/src/compression.rs b/arrow-avro/src/compression.rs index a1a44fc22b68..c5c7a6dabc33 100644 --- a/arrow-avro/src/compression.rs +++ b/arrow-avro/src/compression.rs @@ -15,18 +15,69 @@ // specific language governing permissions and limitations // under the License. 
-use serde::{Deserialize, Serialize}; +use arrow_schema::ArrowError; +use flate2::read; +use std::io; +use std::io::Read; /// The metadata key used for storing the JSON encoded [`CompressionCodec`] pub const CODEC_METADATA_KEY: &str = "avro.codec"; -#[derive(Debug, Copy, Clone, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] +#[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum CompressionCodec { - Null, Deflate, - BZip2, Snappy, - XZ, ZStandard, } + +impl CompressionCodec { + pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, ArrowError> { + match self { + #[cfg(feature = "deflate")] + CompressionCodec::Deflate => { + let mut decoder = read::DeflateDecoder::new(block); + let mut out = Vec::new(); + decoder.read_to_end(&mut out)?; + Ok(out) + } + #[cfg(not(feature = "deflate"))] + CompressionCodec::Deflate => Err(ArrowError::ParseError( + "Deflate codec requires deflate feature".to_string(), + )), + #[cfg(feature = "snappy")] + CompressionCodec::Snappy => { + // Each compressed block is followed by the 4-byte, big-endian CRC32 + // checksum of the uncompressed data in the block. 
+ let crc = &block[block.len() - 4..]; + let block = &block[..block.len() - 4]; + + let mut decoder = snap::raw::Decoder::new(); + let decoded = decoder + .decompress_vec(block) + .map_err(|e| ArrowError::ExternalError(Box::new(e)))?; + + let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded); + if checksum != u32::from_be_bytes(crc.try_into().unwrap()) { + return Err(ArrowError::ParseError("Snappy CRC mismatch".to_string())); + } + Ok(decoded) + } + #[cfg(not(feature = "snappy"))] + CompressionCodec::Snappy => Err(ArrowError::ParseError( + "Snappy codec requires snappy feature".to_string(), + )), + + #[cfg(feature = "zstd")] + CompressionCodec::ZStandard => { + let mut decoder = zstd::Decoder::new(block)?; + let mut out = Vec::new(); + decoder.read_to_end(&mut out)?; + Ok(out) + } + #[cfg(not(feature = "zstd"))] + CompressionCodec::ZStandard => Err(ArrowError::ParseError( + "ZStandard codec requires zstd feature".to_string(), + )), + } + } +} diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs index 97f5d3b8b112..19d48d1f89a1 100644 --- a/arrow-avro/src/reader/header.rs +++ b/arrow-avro/src/reader/header.rs @@ -17,6 +17,7 @@ //! 
Decoder for [`Header`] +use crate::compression::{CompressionCodec, CODEC_METADATA_KEY}; use crate::reader::vlq::VLQDecoder; use crate::schema::Schema; use arrow_schema::ArrowError; @@ -55,7 +56,7 @@ impl Header { /// Returns an iterator over the meta keys in this header pub fn metadata(&self) -> impl Iterator<Item = (&[u8], &[u8])> + '_ { let mut last = 0; - self.meta_offsets.windows(2).map(move |w| { + self.meta_offsets.chunks_exact(2).map(move |w| { let start = last; last = w[1]; (&self.meta_buf[start..w[0]], &self.meta_buf[w[0]..w[1]]) @@ -72,6 +73,22 @@ impl Header { pub fn sync(&self) -> [u8; 16] { self.sync } + + /// Returns the [`CompressionCodec`] if any + pub fn compression(&self) -> Result<Option<CompressionCodec>, ArrowError> { + let v = self.get(CODEC_METADATA_KEY); + + match v { + None | Some(b"null") => Ok(None), + Some(b"deflate") => Ok(Some(CompressionCodec::Deflate)), + Some(b"snappy") => Ok(Some(CompressionCodec::Snappy)), + Some(b"zstandard") => Ok(Some(CompressionCodec::ZStandard)), + Some(v) => Err(ArrowError::ParseError(format!( + "Unrecognized compression codec \'{}\'", + String::from_utf8_lossy(v) + ))), + } + } } /// A decoder for [`Header`] @@ -305,6 +322,17 @@ mod test { ); let header = decode_file(&arrow_test_data("avro/fixed_length_decimal.avro")); + + let meta: Vec<_> = header + .metadata() + .map(|(k, _)| std::str::from_utf8(k).unwrap()) + .collect(); + + assert_eq!( + meta, + &["avro.schema", "org.apache.spark.version", "avro.codec"] + ); + let schema_json = header.get(SCHEMA_METADATA_KEY).unwrap(); let expected = br#"{"type":"record","name":"topLevelRecord","fields":[{"name":"value","type":[{"type":"fixed","name":"fixed","namespace":"topLevelRecord.value","size":11,"logicalType":"decimal","precision":25,"scale":2},"null"]}]}"#; assert_eq!(schema_json, expected); diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index 7769bbbc4998..0151db7f855a 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -73,6 +73,7 @@ fn read_blocks(mut 
reader: R) -> impl Iterator