From 065b05c51593b986cb06af1fab2e87342a0a9c6a Mon Sep 17 00:00:00 2001
From: Jefffrey <22608443+Jefffrey@users.noreply.github.com>
Date: Sat, 28 Oct 2023 18:33:31 +1100
Subject: [PATCH 1/3] Read only enough bytes to infer Arrow IPC file schema via stream

---
 .../core/src/datasource/file_format/arrow.rs | 158 ++++++++++++++++--
 1 file changed, 147 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 16ae4411d1bf..0c54c158425d 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -20,7 +20,7 @@
 //! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
 
 use std::any::Any;
-use std::io::{Read, Seek};
+use std::borrow::Cow;
 use std::sync::Arc;
 
 use crate::datasource::file_format::FileFormat;
@@ -29,13 +29,18 @@ use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::ExecutionPlan;
 
+use arrow::ipc::convert::fb_to_schema;
 use arrow::ipc::reader::FileReader;
-use arrow_schema::{Schema, SchemaRef};
+use arrow::ipc::root_as_message;
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use bytes::Bytes;
 use datafusion_common::{FileType, Statistics};
 use datafusion_physical_expr::PhysicalExpr;
 
 use async_trait::async_trait;
+use futures::stream::BoxStream;
+use futures::StreamExt;
 use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
 
 /// Arrow `FileFormat` implementation.
@@ -59,13 +64,11 @@ impl FileFormat for ArrowFormat {
             let r = store.as_ref().get(&object.location).await?;
             let schema = match r.payload {
                 GetResultPayload::File(mut file, _) => {
-                    read_arrow_schema_from_reader(&mut file)?
+                    let reader = FileReader::try_new(&mut file, None)?;
+                    reader.schema()
                 }
-                GetResultPayload::Stream(_) => {
-                    // TODO: Fetching entire file to get schema is potentially wasteful
-                    let data = r.bytes().await?;
-                    let mut cursor = std::io::Cursor::new(&data);
-                    read_arrow_schema_from_reader(&mut cursor)?
+                GetResultPayload::Stream(stream) => {
+                    infer_schema_from_file_stream(stream).await?
                 }
             };
             schemas.push(schema.as_ref().clone());
@@ -99,7 +102,140 @@ impl FileFormat for ArrowFormat {
     }
 }
 
-fn read_arrow_schema_from_reader<R: Read + Seek>(reader: R) -> Result<SchemaRef> {
-    let reader = FileReader::try_new(reader, None)?;
-    Ok(reader.schema())
+const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
+const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
+
+async fn infer_schema_from_file_stream(
+    mut stream: BoxStream<'static, object_store::Result<Bytes>>,
+) -> Result<SchemaRef> {
+    // Expected format:
+    // <magic number "ARROW1"> - 6 bytes
+    // <empty padding bytes [to 8 byte boundary]> - 2 bytes
+    // <continuation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
+    // <metadata_size: int32> - 4 bytes
+    // <metadata_flatbuffer: bytes>
+    // <rest of file bytes>
+
+    // So in first read we need at least all known sized sections,
+    // which is 6 + 2 + 4 + 4 = 16 bytes.
+    let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;
+    if bytes.len() < 16 {
+        return Err(ArrowError::ParseError(
+            "Arrow IPC file stream shorter than expected".to_string(),
+        ))?;
+    }
+
+    // Files should start with these magic bytes
+    if bytes[0..6] != ARROW_MAGIC {
+        return Err(ArrowError::ParseError(
+            "Arrow file does not contain correct header".to_string(),
+        ))?;
+    }
+
+    // The continuation marker bytes were added in later format versions
+    let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] == CONTINUATION_MARKER {
+        (&bytes[12..16], 16)
+    } else {
+        (&bytes[8..12], 12)
+    };
+
+    let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
+    let meta_len = i32::from_le_bytes(meta_len);
+
+    // Read bytes for Schema message
+    let block_data = if bytes[rest_of_bytes_start_index..].len() < meta_len as usize {
+        // Need to read more bytes to decode Message
+        let mut block_data = Vec::with_capacity(meta_len as usize);
+        // In case we had some spare bytes in our initial read chunk
+        block_data.extend_from_slice(&bytes[rest_of_bytes_start_index..]);
+        let size_to_read = meta_len as usize - block_data.len();
+        let block_data =
+            collect_at_least_n_bytes(&mut stream, size_to_read, Some(block_data)).await?;
+        Cow::Owned(block_data)
+    } else {
+        // Already have the bytes we need
+        let end_index = meta_len as usize + rest_of_bytes_start_index;
+        let block_data = &bytes[rest_of_bytes_start_index..end_index];
+        Cow::Borrowed(block_data)
+    };
+
+    // Decode Schema message
+    let message = root_as_message(&block_data).map_err(|err| {
+        ArrowError::ParseError(format!("Unable to read IPC message as metadata: {err:?}"))
+    })?;
+    let ipc_schema = message.header_as_schema().ok_or_else(|| {
+        ArrowError::IpcError("Unable to read IPC message as schema".to_string())
+    })?;
+    let schema = fb_to_schema(ipc_schema);
+
+    Ok(Arc::new(schema))
+}
+
+async fn collect_at_least_n_bytes(
+    stream: &mut BoxStream<'static, object_store::Result<Bytes>>,
+    n: usize,
+    extend_from: Option<Vec<u8>>,
+) -> Result<Vec<u8>> {
+    let mut buf = extend_from.unwrap_or_else(|| Vec::with_capacity(n));
+    // If extending existing buffer then ensure we read n additional bytes
+    let n = n + buf.len();
+    while let Some(bytes) = stream.next().await.transpose()? {
+        buf.extend_from_slice(&bytes);
+        if buf.len() >= n {
+            break;
+        }
+    }
+    Ok(buf)
+}
+
+#[cfg(test)]
+mod tests {
+    use chrono::DateTime;
+    use object_store::{chunked::ChunkedStore, memory::InMemory, path::Path};
+
+    use crate::execution::context::SessionContext;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_infer_schema_stream() -> Result<()> {
+        let mut bytes = std::fs::read("tests/data/example.arrow")?;
+        bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file
+        let location = Path::parse("example.arrow")?;
+        let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        in_memory_store.put(&location, bytes.into()).await?;
+
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let object_meta = ObjectMeta {
+            location,
+            last_modified: DateTime::default(),
+            size: usize::MAX,
+            e_tag: None,
+        };
+
+        let arrow_format = ArrowFormat {};
+        let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"];
+
+        // Test chunk sizes that are too small, so we keep having to read more bytes,
+        // and large enough that the first read contains all we need
+        for chunk_size in [7, 3000] {
+            let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size));
+            let inferred_schema = arrow_format
+                .infer_schema(
+                    &state,
+                    &(store.clone() as Arc<dyn ObjectStore>),
+                    &[object_meta.clone()],
+                )
+                .await?;
+            let actual_fields = inferred_schema
+                .fields()
+                .iter()
+                .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+                .collect::<Vec<_>>();
+            assert_eq!(expected, actual_fields);
+        }
+
+        Ok(())
+    }
 }
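
The prefix handling in PATCH 1/3 follows the Arrow IPC file layout: a 6-byte
"ARROW1" magic, 2 padding bytes to reach the 8-byte boundary, an optional
0xFFFFFFFF continuation marker (format v0.15.0 and later), then a little-endian
i32 holding the length of the schema flatbuffer. A standalone sketch of just
that prefix decoding, separate from the patch (the function name and the use of
String as the error type are illustrative only):

    /// Decode the fixed-size prefix of an Arrow IPC file, returning the
    /// length of the schema metadata flatbuffer and the offset where it starts.
    fn parse_ipc_file_prefix(bytes: &[u8]) -> Result<(usize, usize), String> {
        const ARROW_MAGIC: [u8; 6] = *b"ARROW1";
        const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
        if bytes.len() < 16 {
            return Err("need at least 16 bytes of prefix".to_string());
        }
        if bytes[0..6] != ARROW_MAGIC {
            return Err("not an Arrow IPC file".to_string());
        }
        // Bytes 6..8 pad the magic to an 8-byte boundary; newer files then
        // carry the continuation marker before the metadata length.
        let (len_bytes, meta_start) = if bytes[8..12] == CONTINUATION_MARKER {
            (&bytes[12..16], 16)
        } else {
            (&bytes[8..12], 12)
        };
        let meta_len = i32::from_le_bytes(len_bytes.try_into().expect("4 bytes"));
        Ok((meta_len as usize, meta_start))
    }

Returning the length together with the start offset is what lets the patch
support both pre- and post-v0.15.0 files from a single 16-byte initial read.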

From 4e0881bea6c9301ad58831c3cc7b0f144a39b241 Mon Sep 17 00:00:00 2001
From: Jefffrey <22608443+Jefffrey@users.noreply.github.com>
Date: Sun, 29 Oct 2023 07:05:45 +1100
Subject: [PATCH 2/3] Error checking for collect bytes func

---
 .../core/src/datasource/file_format/arrow.rs | 47 +++++++++++++++++--
 1 file changed, 42 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 0c54c158425d..dd7138e4cf45 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -119,11 +119,6 @@ async fn infer_schema_from_file_stream(
     // So in first read we need at least all known sized sections,
     // which is 6 + 2 + 4 + 4 = 16 bytes.
     let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;
-    if bytes.len() < 16 {
-        return Err(ArrowError::ParseError(
-            "Arrow IPC file stream shorter than expected".to_string(),
-        ))?;
-    }
 
     // Files should start with these magic bytes
     if bytes[0..6] != ARROW_MAGIC {
@@ -185,6 +180,11 @@ async fn collect_at_least_n_bytes(
             break;
         }
     }
+    if buf.len() < n {
+        return Err(ArrowError::ParseError(
+            "Unexpected end of byte stream for Arrow IPC file".to_string(),
+        ))?;
+    }
     Ok(buf)
 }
 
@@ -238,4 +238,41 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_infer_schema_short_stream() -> Result<()> {
+        let mut bytes = std::fs::read("tests/data/example.arrow")?;
+        bytes.truncate(20); // should cause error that file shorter than expected
+        let location = Path::parse("example.arrow")?;
+        let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+        in_memory_store.put(&location, bytes.into()).await?;
+
+        let session_ctx = SessionContext::new();
+        let state = session_ctx.state();
+        let object_meta = ObjectMeta {
+            location,
+            last_modified: DateTime::default(),
+            size: usize::MAX,
+            e_tag: None,
+        };
+
+        let arrow_format = ArrowFormat {};
+
+        let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7));
+        let err = arrow_format
+            .infer_schema(
+                &state,
+                &(store.clone() as Arc<dyn ObjectStore>),
+                &[object_meta.clone()],
+            )
+            .await;
+
+        assert!(err.is_err());
+        assert_eq!(
+            "Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file",
+            err.unwrap_err().to_string()
+        );
+
+        Ok(())
+    }
 }

From 4ac4802551f60cd2201e545647ebc2958e3fd39c Mon Sep 17 00:00:00 2001
From: Jeffrey <22608443+Jefffrey@users.noreply.github.com>
Date: Thu, 2 Nov 2023 21:27:59 +1100
Subject: [PATCH 3/3] Update datafusion/core/src/datasource/file_format/arrow.rs

Co-authored-by: Andrew Lamb
---
 datafusion/core/src/datasource/file_format/arrow.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index dd7138e4cf45..2777805078c7 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -105,6 +105,8 @@ impl FileFormat for ArrowFormat {
 const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
 const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
 
+/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
+/// See https://github.com/apache/arrow-rs/issues/5021
 async fn infer_schema_from_file_stream(
     mut stream: BoxStream<'static, object_store::Result<Bytes>>,
 ) -> Result<SchemaRef> {
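
End to end, the series means that registering an Arrow IPC file backed by a
streaming object store (e.g. S3) fetches only the leading bytes containing the
schema message, rather than the whole file. A rough usage sketch, assuming
DataFusion's register_arrow/ArrowReadOptions API matches its other file-format
helpers at the time of this change (table name and path are placeholders):

    use datafusion::error::Result;
    use datafusion::execution::options::ArrowReadOptions;
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Schema inference runs at registration time; over a stream it now
        // stops reading once the schema message has been decoded.
        ctx.register_arrow("example", "data/example.arrow", ArrowReadOptions::default())
            .await?;
        ctx.sql("SELECT f0, f1, f2 FROM example LIMIT 5")
            .await?
            .show()
            .await?;
        Ok(())
    }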