Read only enough bytes to infer Arrow IPC file schema via stream #7962

Merged 3 commits on Nov 2, 2023

Changes from all commits
datafusion/core/src/datasource/file_format/arrow.rs (197 changes: 186 additions & 11 deletions)

@@ -20,7 +20,7 @@
//! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)

use std::any::Any;
-use std::io::{Read, Seek};
use std::borrow::Cow;
use std::sync::Arc;

use crate::datasource::file_format::FileFormat;
@@ -29,13 +29,18 @@ use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::ExecutionPlan;

use arrow::ipc::convert::fb_to_schema;
use arrow::ipc::reader::FileReader;
-use arrow_schema::{Schema, SchemaRef};
use arrow::ipc::root_as_message;
use arrow_schema::{ArrowError, Schema, SchemaRef};

use bytes::Bytes;
use datafusion_common::{FileType, Statistics};
use datafusion_physical_expr::PhysicalExpr;

use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};

/// Arrow `FileFormat` implementation.
@@ -59,13 +64,11 @@ impl FileFormat for ArrowFormat {
        let r = store.as_ref().get(&object.location).await?;
        let schema = match r.payload {
            GetResultPayload::File(mut file, _) => {
-                read_arrow_schema_from_reader(&mut file)?
                let reader = FileReader::try_new(&mut file, None)?;
                reader.schema()
            }
-            GetResultPayload::Stream(_) => {
-                // TODO: Fetching entire file to get schema is potentially wasteful
-                let data = r.bytes().await?;
-                let mut cursor = std::io::Cursor::new(&data);
-                read_arrow_schema_from_reader(&mut cursor)?
            GetResultPayload::Stream(stream) => {
                infer_schema_from_file_stream(stream).await?
            }
        };
        schemas.push(schema.as_ref().clone());
@@ -99,7 +102,179 @@
    }
}

-fn read_arrow_schema_from_reader<R: Read + Seek>(reader: R) -> Result<SchemaRef> {
-    let reader = FileReader::try_new(reader, None)?;
-    Ok(reader.schema())
-}

const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
Comment thread on `ARROW_MAGIC`:

Contributor: What do you think about moving this logic upstream into the arrow-rs reader?
https://github.com/apache/arrow-rs/blob/78735002d99eb0212166924948f95554c4ac2866/arrow-ipc/src/reader.rs#L560
If you agree, I can file an upstream ticket to do so.

Contributor: I think I would rather the approach described on apache/arrow-rs#5021; reading the footer is more generally useful, providing information beyond just the schema.

Contributor Author:

> What do you think about moving this logic upstream into the arrow-rs reader?

Yes, this logic was essentially ripped from the StreamReader and FileReader of arrow-ipc, but adjusted to be compatible with an async stream of bytes. We could move this logic to arrow-ipc, but we need to keep in mind that though we are getting a stream of bytes, it is a stream of bytes in the IPC file format, not the IPC streaming format. So an AsyncStreamReader might not exactly fit our use, whereas an AsyncFileReader could, but it might be limited if we don't read its footer when attempting to decode the rest of the data.

> I think I would rather the approach described on apache/arrow-rs#5021; reading the footer is more generally useful, providing information beyond just the schema.

It seems that ticket could be appropriate for this. Just to note: we can't exactly read the footer from a stream without reverting to the old method of reading the entire stream just to decode the schema.

Contributor (@tustvold, Nov 1, 2023):

> reverting to the old method of reading the entire stream just to decode the schema.

The idea would be to do something similar to what we do to read the parquet footer; I provided a few more details on the linked ticket. The trick is to perform ranged reads.

Contributor Author: Ah, apologies. I took a look at the parquet code and I see what you mean now. That would indeed be a better approach, in line with existing behaviour for FileReader 👍
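For reference, a minimal sketch of the ranged-read footer approach discussed above (a hypothetical helper, not part of this PR; it assumes the `object_store` `get_range` API of this era and the IPC trailer layout of footer flatbuffer, 4-byte little-endian footer length, then the 6-byte magic):

```rust
use bytes::Bytes;
use object_store::{path::Path, ObjectStore};

// Hypothetical sketch of the footer-based approach, modelled on how the
// parquet reader fetches its footer with ranged reads.
async fn fetch_ipc_footer(
    store: &dyn ObjectStore,
    location: &Path,
    file_size: usize,
) -> object_store::Result<Bytes> {
    // An IPC file ends with: <footer flatbuffer> <footer_len: i32 LE> <magic "ARROW1">,
    // so the fixed-size trailer is the last 10 bytes.
    let trailer = store.get_range(location, file_size - 10..file_size).await?;
    assert_eq!(&trailer[4..10], b"ARROW1"); // sketch only; real code would return an error
    let footer_len = i32::from_le_bytes(trailer[0..4].try_into().unwrap()) as usize;
    // A second ranged read fetches just the footer flatbuffer
    store
        .get_range(location, file_size - 10 - footer_len..file_size - 10)
        .await
}
```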

const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
/// See https://github.com/apache/arrow-rs/issues/5021
async fn infer_schema_from_file_stream(
    mut stream: BoxStream<'static, object_store::Result<Bytes>>,
) -> Result<SchemaRef> {
    // Expected format:
    // <magic number "ARROW1"> - 6 bytes
    // <empty padding bytes [to 8 byte boundary]> - 2 bytes
    // <continuation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
    // <metadata_size: int32> - 4 bytes
    // <metadata_flatbuffer: bytes>
    // <rest of file bytes>

    // So in the first read we need at least all the known-size sections,
    // which is 6 + 2 + 4 + 4 = 16 bytes.
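    // As an illustration, the first 16 bytes of a post-v0.15.0 file could look
    // like this (the metadata_size value here is hypothetical, not from a real file):
    //   41 52 52 4f 57 31 00 00   "ARROW1" magic + 2 padding bytes
    //   ff ff ff ff 10 05 00 00   continuation marker + metadata_size (0x0510 = 1296, LE)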
    let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;

    // Files should start with these magic bytes
    if bytes[0..6] != ARROW_MAGIC {
        return Err(ArrowError::ParseError(
            "Arrow file does not contain correct header".to_string(),
        ))?;
    }

    // The continuation marker was only added in later format versions
    let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] == CONTINUATION_MARKER {
        (&bytes[12..16], 16)
    } else {
        (&bytes[8..12], 12)
    };

    let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
    let meta_len = i32::from_le_bytes(meta_len);

    // Read bytes for Schema message
    let block_data = if bytes[rest_of_bytes_start_index..].len() < meta_len as usize {
        // Need to read more bytes to decode Message
        let mut block_data = Vec::with_capacity(meta_len as usize);
        // In case we had some spare bytes in our initial read chunk
        block_data.extend_from_slice(&bytes[rest_of_bytes_start_index..]);
        let size_to_read = meta_len as usize - block_data.len();
        let block_data =
            collect_at_least_n_bytes(&mut stream, size_to_read, Some(block_data)).await?;

Comment thread on this read:

Member: Related to my previous comment, there is currently no check here that we actually did read at least n bytes.

Contributor Author: collect_at_least_n_bytes() should now do the length checking.

        Cow::Owned(block_data)
    } else {
        // Already have the bytes we need
        let end_index = meta_len as usize + rest_of_bytes_start_index;
        let block_data = &bytes[rest_of_bytes_start_index..end_index];
        Cow::Borrowed(block_data)
    };

    // Decode Schema message
    let message = root_as_message(&block_data).map_err(|err| {
        ArrowError::ParseError(format!("Unable to read IPC message as metadata: {err:?}"))
    })?;
    let ipc_schema = message.header_as_schema().ok_or_else(|| {
        ArrowError::IpcError("Unable to read IPC message as schema".to_string())
    })?;
    let schema = fb_to_schema(ipc_schema);

    Ok(Arc::new(schema))
}
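As a usage illustration, a caller holding an object store handle might drive this helper as in the following sketch (`store` and `path` are assumed to exist and point at an IPC file; `into_stream` turns the get result into the byte stream the function expects):

```rust
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion_common::Result;
use object_store::{path::Path, ObjectStore};

// Sketch: infer the schema of an Arrow IPC file in an object store without
// downloading the whole file, using `infer_schema_from_file_stream` above.
async fn schema_of(store: Arc<dyn ObjectStore>, path: Path) -> Result<SchemaRef> {
    let stream = store.get(&path).await?.into_stream();
    infer_schema_from_file_stream(stream).await
}
```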

async fn collect_at_least_n_bytes(
    stream: &mut BoxStream<'static, object_store::Result<Bytes>>,
    n: usize,
    extend_from: Option<Vec<u8>>,
) -> Result<Vec<u8>> {
    let mut buf = extend_from.unwrap_or_else(|| Vec::with_capacity(n));
    // If extending an existing buffer, ensure we read n additional bytes
    let n = n + buf.len();
    while let Some(bytes) = stream.next().await.transpose()? {
        buf.extend_from_slice(&bytes);
        if buf.len() >= n {
            break;
        }
    }
    if buf.len() < n {
        return Err(ArrowError::ParseError(
            "Unexpected end of byte stream for Arrow IPC file".to_string(),
        ))?;
    }
    Ok(buf)
}
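One subtlety worth calling out: when `extend_from` is supplied, `n` counts additional bytes on top of what the buffer already holds. A hypothetical illustration (the stream is assumed to be available from context):

```rust
use bytes::Bytes;
use futures::stream::BoxStream;

// Hypothetical: reuse 4 leftover bytes and request 12 more, so the
// returned buffer holds at least 16 bytes in total.
async fn extend_example(
    mut stream: BoxStream<'static, object_store::Result<Bytes>>,
) -> Result<()> {
    let leftover = vec![0u8; 4]; // pretend these were spare bytes from a prior read
    let buf = collect_at_least_n_bytes(&mut stream, 12, Some(leftover)).await?;
    assert!(buf.len() >= 16);
    Ok(())
}
```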

#[cfg(test)]
mod tests {
    use chrono::DateTime;
    use object_store::{chunked::ChunkedStore, memory::InMemory, path::Path};

    use crate::execution::context::SessionContext;

    use super::*;

    #[tokio::test]
    async fn test_infer_schema_stream() -> Result<()> {
        let mut bytes = std::fs::read("tests/data/example.arrow")?;
        bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read the whole file
        let location = Path::parse("example.arrow")?;
        let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        in_memory_store.put(&location, bytes.into()).await?;

        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let object_meta = ObjectMeta {
            location,
            last_modified: DateTime::default(),
            size: usize::MAX,
            e_tag: None,
        };

        let arrow_format = ArrowFormat {};
        let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"];

        // Test a chunk size that is too small, so we keep having to read more bytes,
        // and one that is large enough for the first read to contain all we need
        for chunk_size in [7, 3000] {
            let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size));
            let inferred_schema = arrow_format
                .infer_schema(
                    &state,
                    &(store.clone() as Arc<dyn ObjectStore>),
                    &[object_meta.clone()],
                )
                .await?;
            let actual_fields = inferred_schema
                .fields()
                .iter()
                .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
                .collect::<Vec<_>>();
            assert_eq!(expected, actual_fields);
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_infer_schema_short_stream() -> Result<()> {
        let mut bytes = std::fs::read("tests/data/example.arrow")?;
        bytes.truncate(20); // should cause an error, as the file is shorter than expected
        let location = Path::parse("example.arrow")?;
        let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        in_memory_store.put(&location, bytes.into()).await?;

        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let object_meta = ObjectMeta {
            location,
            last_modified: DateTime::default(),
            size: usize::MAX,
            e_tag: None,
        };

        let arrow_format = ArrowFormat {};

        let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7));
        let err = arrow_format
            .infer_schema(
                &state,
                &(store.clone() as Arc<dyn ObjectStore>),
                &[object_meta.clone()],
            )
            .await;

        assert!(err.is_err());
        assert_eq!(
            "Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file",
            err.unwrap_err().to_string()
        );

        Ok(())
    }
}