Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

feat: better error message when reader feather v1 #1528

Merged
merged 3 commits into from
Aug 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/ffi/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@
let ptr = *buffers.add(index);
if ptr.is_null() {
return Err(Error::oos(format!(
"An array of type {data_type:?}
"An array of type {data_type:?}

Check warning on line 212 in src/ffi/array.rs

View check run for this annotation

Codecov / codecov/patch

src/ffi/array.rs#L212

Added line #L212 was not covered by tests
must have a non-null buffer {index}"
)));
}
Expand All @@ -235,9 +235,14 @@
owner: InternalArrowArray,
index: usize,
) -> Result<Buffer<T>> {
let len = buffer_len(array, data_type, index)?;

if len == 0 {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c++ arrow can create null pointers when arrays are empty. This ensures we don't have to deal with that.

return Ok(Buffer::new());
}

let ptr = get_buffer_ptr(array, data_type, index)?;

let len = buffer_len(array, data_type, index)?;
let offset = buffer_offset(array, data_type, index);
let bytes = Bytes::from_foreign(ptr, len, BytesAllocator::InternalArrowArray(owner));

Expand All @@ -258,9 +263,12 @@
// we can use the null count directly
is_validity: bool,
) -> Result<Bitmap> {
let len: usize = array.length.try_into().expect("length to fit in `usize`");
if len == 0 {
return Ok(Bitmap::new());

Check warning on line 268 in src/ffi/array.rs

View check run for this annotation

Codecov / codecov/patch

src/ffi/array.rs#L268

Added line #L268 was not covered by tests
}
let ptr = get_buffer_ptr(array, data_type, index)?;

let len: usize = array.length.try_into().expect("length to fit in `usize`");
let offset: usize = array.offset.try_into().expect("offset to fit in `usize`");
let bytes_len = bytes_for(offset + len);
let bytes = Bytes::from_foreign(ptr, bytes_len, BytesAllocator::InternalArrowArray(owner));
Expand Down
3 changes: 2 additions & 1 deletion src/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ pub mod append;
pub mod read;
pub mod write;

const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const ARROW_MAGIC_V1: [u8; 4] = [b'F', b'E', b'A', b'1'];
const ARROW_MAGIC_V2: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
pub(crate) const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

/// Struct containing `dictionary_id` and nested `IpcField`, allowing users
Expand Down
9 changes: 6 additions & 3 deletions src/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use crate::error::{Error, Result};
use crate::io::ipc::IpcSchema;

use super::super::{ARROW_MAGIC, CONTINUATION_MARKER};
use super::super::{ARROW_MAGIC_V1, ARROW_MAGIC_V2, CONTINUATION_MARKER};
use super::common::*;
use super::schema::fb_to_schema;
use super::Dictionaries;
Expand Down Expand Up @@ -151,7 +151,7 @@
reader.read_exact(&mut footer)?;
let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());

if footer[4..] != ARROW_MAGIC {
if footer[4..] != ARROW_MAGIC_V2 {
return Err(Error::from(OutOfSpecKind::InvalidFooter));
}
let footer_len = footer_len
Expand Down Expand Up @@ -215,7 +215,10 @@
let mut magic_buffer: [u8; 6] = [0; 6];
let start = reader.stream_position()?;
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != ARROW_MAGIC {
if magic_buffer != ARROW_MAGIC_V2 {
if &magic_buffer[..4] == ARROW_MAGIC_V1 {
return Err(Error::NotYetImplemented("feather v1 not supported".into()));
}

Check warning on line 221 in src/io/ipc/read/file.rs

View check run for this annotation

Codecov / codecov/patch

src/io/ipc/read/file.rs#L219-L221

Added lines #L219 - L221 were not covered by tests
return Err(Error::from(OutOfSpecKind::InvalidHeader));
}

Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::array::*;
use crate::chunk::Chunk;
use crate::datatypes::{Field, Schema};
use crate::error::{Error, Result};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC_V2, CONTINUATION_MARKER};

use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch};
use super::file::{deserialize_footer, get_record_batch};
Expand Down Expand Up @@ -135,7 +135,7 @@ async fn read_footer_len<R: AsyncRead + AsyncSeek + Unpin>(reader: &mut R) -> Re
reader.read_exact(&mut footer).await?;
let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());

if footer[4..] != ARROW_MAGIC {
if footer[4..] != ARROW_MAGIC_V2 {
return Err(Error::from(OutOfSpecKind::InvalidFooter));
}
footer_len
Expand Down
6 changes: 3 additions & 3 deletions src/io/ipc/write/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::schema::serialize_schema;
use super::{default_ipc_fields, schema_to_bytes, Record};
use crate::datatypes::*;
use crate::error::{Error, Result};
use crate::io::ipc::{IpcField, ARROW_MAGIC};
use crate::io::ipc::{IpcField, ARROW_MAGIC_V2};

type WriteOutput<W> = (usize, Option<Block>, Vec<Block>, Option<W>);

Expand Down Expand Up @@ -105,7 +105,7 @@ where
}

async fn start(mut writer: W, encoded: EncodedData) -> Result<WriteOutput<W>> {
writer.write_all(&ARROW_MAGIC[..]).await?;
writer.write_all(&ARROW_MAGIC_V2[..]).await?;
writer.write_all(&[0, 0]).await?;
let (meta, data) = write_message(&mut writer, encoded).await?;

Expand Down Expand Up @@ -149,7 +149,7 @@ where
writer
.write_all(&(footer.len() as i32).to_le_bytes())
.await?;
writer.write_all(&ARROW_MAGIC).await?;
writer.write_all(&ARROW_MAGIC_V2).await?;
writer.close().await?;

Ok((0, None, vec![], None))
Expand Down
6 changes: 3 additions & 3 deletions src/io/ipc/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow_format::ipc::planus::Builder;

use super::{
super::IpcField,
super::ARROW_MAGIC,
super::ARROW_MAGIC_V2,
common::{DictionaryTracker, EncodedData, WriteOptions},
common_sync::{write_continuation, write_message},
default_ipc_fields, schema, schema_to_bytes,
Expand Down Expand Up @@ -114,7 +114,7 @@ impl<W: Write> FileWriter<W> {
return Err(Error::oos("The IPC file can only be started once"));
}
// write magic to header
self.writer.write_all(&ARROW_MAGIC[..])?;
self.writer.write_all(&ARROW_MAGIC_V2[..])?;
// create an 8-byte boundary after the header
self.writer.write_all(&[0, 0])?;
// write the schema, set the written bytes to the schema
Expand Down Expand Up @@ -205,7 +205,7 @@ impl<W: Write> FileWriter<W> {
self.writer.write_all(footer_data)?;
self.writer
.write_all(&(footer_data.len() as i32).to_le_bytes())?;
self.writer.write_all(&ARROW_MAGIC)?;
self.writer.write_all(&ARROW_MAGIC_V2)?;
self.writer.flush()?;
self.state = State::Finished;

Expand Down
Loading