Skip to content

Commit

Permalink
arrow-ipc: Always set dict ID in IPC from dictionary tracker
Browse files Browse the repository at this point in the history
This further decouples the dictionary IDs that end up in the IPC data from the
schema: the dictionary tracker now always gathers the dict ID for each field
first, whether that ID is pre-defined and preserved or not.

Then, when actually writing the IPC bytes, the dictionary ID is always
taken from the dictionary tracker instead of falling back to the
`Field` of the `Schema`.
  • Loading branch information
brancz committed Sep 24, 2024
1 parent 4edafeb commit e6a93fb
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 39 deletions.
4 changes: 3 additions & 1 deletion arrow-flight/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ pub struct IpcMessage(pub Bytes);

/// Encodes `arrow_schema` as an IPC schema message using `options`.
///
/// A fresh `DictionaryTracker` is created for the call so that the dictionary
/// IDs written into the schema message are assigned by the tracker, honouring
/// `options.preserve_dict_id()`.
fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
    let data_gen = writer::IpcDataGenerator::default();
    // The first argument is `false`, the same value `StreamWriter` passes when
    // it builds its tracker (`FileWriter` passes `true`).
    let mut dict_tracker =
        writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
    // `schema_to_bytes` is the deprecated two-argument form; the variant that
    // accepts a tracker is `schema_to_bytes_with_dictionary_tracker`.
    data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
}

fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
Expand Down
121 changes: 97 additions & 24 deletions arrow-ipc/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,63 @@ use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use crate::writer::DictionaryTracker;
use crate::{size_prefixed_root_as_message, KeyValue, Message, CONTINUATION_MARKER};
use DataType::*;

/// Converts an Arrow `Schema` into its IPC flatbuffer representation.
///
/// Every field is built with the borrowed `DictionaryTracker` passed along, so
/// dictionary-typed fields take their dictionary ID from
/// `DictionaryTracker::set_dict_id` instead of reading it from the `Field`.
pub struct IpcSchemaConverter<'a> {
    dictionary_tracker: &'a mut DictionaryTracker,
}

impl IpcSchemaConverter<'_> {
    /// Creates a converter that mutably borrows `dictionary_tracker` for as
    /// long as the converter is alive.
    pub fn new(dictionary_tracker: &mut DictionaryTracker) -> IpcSchemaConverter<'_> {
        IpcSchemaConverter { dictionary_tracker }
    }

    /// Serialize a schema in IPC format
    ///
    /// Returns a finished `FlatBufferBuilder` whose root is the schema table.
    pub fn schema_to_fb<'a>(&mut self, schema: &'a Schema) -> FlatBufferBuilder<'a> {
        let mut builder = FlatBufferBuilder::new();
        let schema_offset = self.schema_to_fb_offset(&mut builder, schema);
        builder.finish(schema_offset, None);
        builder
    }

    /// Writes `schema` into `fbb` and returns the offset of the resulting
    /// schema table without finishing the buffer, so the caller can embed the
    /// schema in a larger message.
    pub fn schema_to_fb_offset<'a>(
        &mut self,
        fbb: &mut FlatBufferBuilder<'a>,
        schema: &Schema,
    ) -> WIPOffset<crate::Schema<'a>> {
        // Fields are serialized first, in schema order, each one seeing the
        // same dictionary tracker.
        let mut field_offsets = Vec::new();
        for field in schema.fields().iter() {
            let mut tracker = Some(&mut *self.dictionary_tracker);
            field_offsets.push(build_field(fbb, &mut tracker, field));
        }
        let fb_field_list = fbb.create_vector(&field_offsets);

        // Custom metadata is only emitted when the schema actually carries some.
        let fb_metadata_list = if schema.metadata().is_empty() {
            None
        } else {
            Some(metadata_to_fb(fbb, schema.metadata()))
        };

        let mut schema_builder = crate::SchemaBuilder::new(fbb);
        schema_builder.add_fields(fb_field_list);
        if let Some(metadata) = fb_metadata_list {
            schema_builder.add_custom_metadata(metadata);
        }
        schema_builder.finish()
    }
}

/// Serialize a schema in IPC format
pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
#[deprecated(
since = "54.0.0",
note = "Use `IpcSchemaConverter` instead. This function will be removed in the next release."
)]
pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder<'_> {
let mut fbb = FlatBufferBuilder::new();

#[allow(deprecated)]
let root = schema_to_fb_offset(&mut fbb, schema);

fbb.finish(root, None);
Expand All @@ -60,14 +110,18 @@ pub fn metadata_to_fb<'a>(
fbb.create_vector(&custom_metadata)
}

#[deprecated(
since = "54.0.0",
note = "Use `IpcSchemaConverter` instead. This function will be removed in the next release."
)]
pub fn schema_to_fb_offset<'a>(
fbb: &mut FlatBufferBuilder<'a>,
schema: &Schema,
) -> WIPOffset<crate::Schema<'a>> {
let fields = schema
.fields()
.iter()
.map(|field| build_field(fbb, field))
.map(|field| build_field(fbb, &mut None, field))
.collect::<Vec<_>>();
let fb_field_list = fbb.create_vector(&fields);

Expand Down Expand Up @@ -424,6 +478,7 @@ pub(crate) struct FBFieldType<'b> {
/// Create an IPC Field from an Arrow Field
pub(crate) fn build_field<'a>(
fbb: &mut FlatBufferBuilder<'a>,
dictionary_tracker: &mut Option<&mut DictionaryTracker>,
field: &Field,
) -> WIPOffset<crate::Field<'a>> {
// Optional custom metadata.
Expand All @@ -433,19 +488,29 @@ pub(crate) fn build_field<'a>(
};

let fb_field_name = fbb.create_string(field.name().as_str());
let field_type = get_fb_field_type(field.data_type(), fbb);
let field_type = get_fb_field_type(field.data_type(), dictionary_tracker, fbb);

let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() {
Some(get_fb_dictionary(
index_type,
field
.dict_id()
.expect("All Dictionary types have `dict_id`"),
field
.dict_is_ordered()
.expect("All Dictionary types have `dict_is_ordered`"),
fbb,
))
match dictionary_tracker {
Some(tracker) => Some(get_fb_dictionary(
index_type,
tracker.set_dict_id(field),
field
.dict_is_ordered()
.expect("All Dictionary types have `dict_is_ordered`"),
fbb,
)),
None => Some(get_fb_dictionary(
index_type,
field
.dict_id()
.expect("Dictionary type must have a dictionary id"),
field
.dict_is_ordered()
.expect("All Dictionary types have `dict_is_ordered`"),
fbb,
)),
}
} else {
None
};
Expand Down Expand Up @@ -473,6 +538,7 @@ pub(crate) fn build_field<'a>(
/// Get the IPC type of a data type
pub(crate) fn get_fb_field_type<'a>(
data_type: &DataType,
dictionary_tracker: &mut Option<&mut DictionaryTracker>,
fbb: &mut FlatBufferBuilder<'a>,
) -> FBFieldType<'a> {
// some IPC implementations expect an empty list for child data, instead of a null value.
Expand Down Expand Up @@ -673,7 +739,7 @@ pub(crate) fn get_fb_field_type<'a>(
}
}
List(ref list_type) => {
let child = build_field(fbb, list_type);
let child = build_field(fbb, dictionary_tracker, list_type);
FBFieldType {
type_type: crate::Type::List,
type_: crate::ListBuilder::new(fbb).finish().as_union_value(),
Expand All @@ -682,15 +748,15 @@ pub(crate) fn get_fb_field_type<'a>(
}
ListView(_) | LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"),
LargeList(ref list_type) => {
let child = build_field(fbb, list_type);
let child = build_field(fbb, dictionary_tracker, list_type);
FBFieldType {
type_type: crate::Type::LargeList,
type_: crate::LargeListBuilder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&[child])),
}
}
FixedSizeList(ref list_type, len) => {
let child = build_field(fbb, list_type);
let child = build_field(fbb, dictionary_tracker, list_type);
let mut builder = crate::FixedSizeListBuilder::new(fbb);
builder.add_listSize(*len);
FBFieldType {
Expand All @@ -703,7 +769,7 @@ pub(crate) fn get_fb_field_type<'a>(
// struct's fields are children
let mut children = vec![];
for field in fields {
children.push(build_field(fbb, field));
children.push(build_field(fbb, dictionary_tracker, field));
}
FBFieldType {
type_type: crate::Type::Struct_,
Expand All @@ -712,8 +778,8 @@ pub(crate) fn get_fb_field_type<'a>(
}
}
RunEndEncoded(run_ends, values) => {
let run_ends_field = build_field(fbb, run_ends);
let values_field = build_field(fbb, values);
let run_ends_field = build_field(fbb, dictionary_tracker, run_ends);
let values_field = build_field(fbb, dictionary_tracker, values);
let children = [run_ends_field, values_field];
FBFieldType {
type_type: crate::Type::RunEndEncoded,
Expand All @@ -724,7 +790,7 @@ pub(crate) fn get_fb_field_type<'a>(
}
}
Map(map_field, keys_sorted) => {
let child = build_field(fbb, map_field);
let child = build_field(fbb, dictionary_tracker, map_field);
let mut field_type = crate::MapBuilder::new(fbb);
field_type.add_keysSorted(*keys_sorted);
FBFieldType {
Expand All @@ -737,7 +803,7 @@ pub(crate) fn get_fb_field_type<'a>(
// In this library, the dictionary "type" is a logical construct. Here we
// pass through to the value type, as we've already captured the index
// type in the DictionaryEncoding metadata in the parent field
get_fb_field_type(value_type, fbb)
get_fb_field_type(value_type, dictionary_tracker, fbb)
}
Decimal128(precision, scale) => {
let mut builder = crate::DecimalBuilder::new(fbb);
Expand All @@ -764,7 +830,7 @@ pub(crate) fn get_fb_field_type<'a>(
Union(fields, mode) => {
let mut children = vec![];
for (_, field) in fields.iter() {
children.push(build_field(fbb, field));
children.push(build_field(fbb, dictionary_tracker, field));
}

let union_mode = match mode {
Expand Down Expand Up @@ -1067,7 +1133,9 @@ mod tests {
md,
);

let fb = schema_to_fb(&schema);
let mut dictionary_tracker = DictionaryTracker::new(true);
let mut converter = IpcSchemaConverter::new(&mut dictionary_tracker);
let fb = converter.schema_to_fb(&schema);

// read back fields
let ipc = crate::root_as_schema(fb.finished_data()).unwrap();
Expand Down Expand Up @@ -1098,9 +1166,14 @@ mod tests {

// generate same message with Rust
let data_gen = crate::writer::IpcDataGenerator::default();
let mut dictionary_tracker = DictionaryTracker::new(true);
let arrow_schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
let bytes = data_gen
.schema_to_bytes(&arrow_schema, &crate::writer::IpcWriteOptions::default())
.schema_to_bytes_with_dictionary_tracker(
&arrow_schema,
&mut dictionary_tracker,
&crate::writer::IpcWriteOptions::default(),
)
.ipc_message;

let ipc2 = crate::root_as_message(&bytes).unwrap();
Expand Down
71 changes: 58 additions & 13 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec};
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::convert::IpcSchemaConverter;
use crate::CONTINUATION_MARKER;

/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
Expand Down Expand Up @@ -199,9 +200,44 @@ impl Default for IpcWriteOptions {
pub struct IpcDataGenerator {}

impl IpcDataGenerator {
/// Encodes `schema` as a standalone IPC schema message.
///
/// The schema is converted through an `IpcSchemaConverter` built on
/// `dictionary_tracker`, then wrapped in a `Message` that carries
/// `write_options.metadata_version` and a body length of zero. The returned
/// `EncodedData` holds the message bytes in `ipc_message` and an empty
/// `arrow_data`.
pub fn schema_to_bytes_with_dictionary_tracker(
    &self,
    schema: &Schema,
    dictionary_tracker: &mut DictionaryTracker,
    write_options: &IpcWriteOptions,
) -> EncodedData {
    let mut fbb = FlatBufferBuilder::new();

    // Serialize the schema table first; the message header refers to it.
    let header = IpcSchemaConverter::new(dictionary_tracker)
        .schema_to_fb_offset(&mut fbb, schema)
        .as_union_value();

    let mut builder = crate::MessageBuilder::new(&mut fbb);
    builder.add_version(write_options.metadata_version);
    builder.add_header_type(crate::MessageHeader::Schema);
    builder.add_bodyLength(0);
    builder.add_header(header);
    // TODO: custom metadata
    let message = builder.finish();
    fbb.finish(message, None);

    EncodedData {
        ipc_message: fbb.finished_data().to_vec(),
        arrow_data: Vec::new(),
    }
}

#[deprecated(
since = "54.0.0",
note = "Use `schema_to_bytes_with_dictionary_tracker` instead. This function will adopt the signature of `schema_to_bytes_with_dictionary_tracker` in the next release."
)]
pub fn schema_to_bytes(&self, schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData {
let mut fbb = FlatBufferBuilder::new();
let schema = {
#[allow(deprecated)]
// This will be replaced with the IpcSchemaConverter in the next release.
let fb = crate::convert::schema_to_fb_offset(&mut fbb, schema);
fb.as_union_value()
};
Expand Down Expand Up @@ -714,6 +750,7 @@ fn into_zero_offset_run_array<R: RunEndIndexType>(
///
/// Can optionally error if an update to an existing dictionary is attempted, which
/// isn't allowed in the `FileWriter`.
#[derive(Debug)]
pub struct DictionaryTracker {
written: HashMap<i64, ArrayData>,
dict_ids: Vec<i64>,
Expand Down Expand Up @@ -887,9 +924,15 @@ impl<W: Write> FileWriter<W> {
writer.write_all(&super::ARROW_MAGIC)?;
writer.write_all(&PADDING[..pad_len])?;
// write the schema, set the written bytes to the schema + header
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
let preserve_dict_id = write_options.preserve_dict_id;
let mut dictionary_tracker =
DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
schema,
&mut dictionary_tracker,
&write_options,
);
let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
Ok(Self {
writer,
write_options,
Expand All @@ -898,10 +941,7 @@ impl<W: Write> FileWriter<W> {
dictionary_blocks: vec![],
record_blocks: vec![],
finished: false,
dictionary_tracker: DictionaryTracker::new_with_preserve_dict_id(
true,
preserve_dict_id,
),
dictionary_tracker,
custom_metadata: HashMap::new(),
data_gen,
})
Expand Down Expand Up @@ -960,7 +1000,8 @@ impl<W: Write> FileWriter<W> {
let mut fbb = FlatBufferBuilder::new();
let dictionaries = fbb.create_vector(&self.dictionary_blocks);
let record_batches = fbb.create_vector(&self.record_blocks);
let schema = crate::convert::schema_to_fb_offset(&mut fbb, &self.schema);
let mut converter = IpcSchemaConverter::new(&mut self.dictionary_tracker);
let schema = converter.schema_to_fb_offset(&mut fbb, &self.schema);
let fb_custom_metadata = (!self.custom_metadata.is_empty())
.then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));

Expand Down Expand Up @@ -1086,18 +1127,22 @@ impl<W: Write> StreamWriter<W> {
write_options: IpcWriteOptions,
) -> Result<Self, ArrowError> {
let data_gen = IpcDataGenerator::default();
let preserve_dict_id = write_options.preserve_dict_id;
let mut dictionary_tracker =
DictionaryTracker::new_with_preserve_dict_id(false, preserve_dict_id);

// write the schema, set the written bytes to the schema
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker(
schema,
&mut dictionary_tracker,
&write_options,
);
write_message(&mut writer, encoded_message, &write_options)?;
let preserve_dict_id = write_options.preserve_dict_id;
Ok(Self {
writer,
write_options,
finished: false,
dictionary_tracker: DictionaryTracker::new_with_preserve_dict_id(
false,
preserve_dict_id,
),
dictionary_tracker,
data_gen,
})
}
Expand Down
4 changes: 3 additions & 1 deletion parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,10 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result<Schema> {
/// Encodes the Arrow schema into the IPC format, and base64 encodes it
fn encode_arrow_schema(schema: &Schema) -> String {
let options = writer::IpcWriteOptions::default();
let mut dictionary_tracker =
writer::DictionaryTracker::new_with_preserve_dict_id(true, options.preserve_dict_id());
let data_gen = writer::IpcDataGenerator::default();
let mut serialized_schema = data_gen.schema_to_bytes(schema, &options);
let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options);

// manually prepending the length to the schema as arrow uses the legacy IPC format
// TODO: change after addressing ARROW-9777
Expand Down

0 comments on commit e6a93fb

Please sign in to comment.