From 4edafeb9a9920475119c533be46b41f4b32251ce Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Mon, 23 Sep 2024 17:56:24 +0200 Subject: [PATCH 1/6] arrow-ipc: Add test for non preserving dict ID behavior with same ID --- arrow-ipc/src/reader.rs | 55 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 3e07c95afb23..0820e3590827 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -2283,4 +2283,59 @@ mod tests { let err = reader.next().unwrap().unwrap_err(); assert!(matches!(err, ArrowError::InvalidArgumentError(_))); } + + #[test] + fn test_same_dict_id_without_preserve() { + let batch = RecordBatch::try_new( + Arc::new(Schema::new( + ["a", "b"] + .iter() + .map(|name| { + Field::new_dict( + name.to_string(), + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Utf8), + ), + true, + 0, + false, + ) + }) + .collect::>(), + )), + vec![ + Arc::new( + vec![Some("c"), Some("d")] + .into_iter() + .collect::>(), + ) as ArrayRef, + Arc::new( + vec![Some("e"), Some("f")] + .into_iter() + .collect::>(), + ) as ArrayRef, + ], + ) + .expect("Failed to create RecordBatch"); + + // serialize the record batch as an IPC stream + let mut buf = vec![]; + { + let mut writer = crate::writer::StreamWriter::try_new_with_options( + &mut buf, + batch.schema().as_ref(), + crate::writer::IpcWriteOptions::default().with_preserve_dict_id(false), + ) + .expect("Failed to create StreamWriter"); + writer.write(&batch).expect("Failed to write RecordBatch"); + writer.finish().expect("Failed to finish StreamWriter"); + } + + StreamReader::try_new(std::io::Cursor::new(buf), None) + .expect("Failed to create StreamReader") + .for_each(|decoded_batch| { + assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch); + }); + } } From d21569eb3d242c44ca7c531a679ef2093cdf1cc9 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Mon, 23 Sep 2024 17:58:05 +0200 
Subject: [PATCH 2/6] arrow-ipc: Always set dict ID in IPC from dictionary tracker This decouples dictionary IDs that end up in IPC from the schema further because the dictionary tracker always first gathers the dict ID for each field whether it is pre-defined and preserved or not. Then when actually writing the IPC bytes the dictionary ID is always taken from the dictionary tracker as opposed to falling back to the `Field` of the `Schema`. --- arrow-flight/src/lib.rs | 4 +- arrow-ipc/src/convert.rs | 121 +++++++++++++++++++++++++------- arrow-ipc/src/writer.rs | 71 +++++++++++++++---- parquet/src/arrow/schema/mod.rs | 5 +- 4 files changed, 162 insertions(+), 39 deletions(-) diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs index ff9e387dab0b..64e3ba01c5bd 100644 --- a/arrow-flight/src/lib.rs +++ b/arrow-flight/src/lib.rs @@ -137,7 +137,9 @@ pub struct IpcMessage(pub Bytes); fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData { let data_gen = writer::IpcDataGenerator::default(); - data_gen.schema_to_bytes(arrow_schema, options) + let mut dict_tracker = + writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id()); + data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options) } fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage { diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs index d8889a361dfc..7403c9d5d090 100644 --- a/arrow-ipc/src/convert.rs +++ b/arrow-ipc/src/convert.rs @@ -27,13 +27,63 @@ use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::sync::Arc; +use crate::writer::DictionaryTracker; use crate::{size_prefixed_root_as_message, KeyValue, Message, CONTINUATION_MARKER}; use DataType::*; +pub struct IpcSchemaConverter<'a> { + dictionary_tracker: &'a mut DictionaryTracker, +} + +impl IpcSchemaConverter<'_> { + pub fn new(dictionary_tracker: &mut DictionaryTracker) -> 
IpcSchemaConverter<'_> { + IpcSchemaConverter { dictionary_tracker } + } + + /// Serialize a schema in IPC format + pub fn schema_to_fb<'a>(&mut self, schema: &'a Schema) -> FlatBufferBuilder<'a> { + let mut fbb = FlatBufferBuilder::new(); + + let root = self.schema_to_fb_offset(&mut fbb, schema); + + fbb.finish(root, None); + + fbb + } + + pub fn schema_to_fb_offset<'a>( + &mut self, + fbb: &mut FlatBufferBuilder<'a>, + schema: &Schema, + ) -> WIPOffset> { + let fields = schema + .fields() + .iter() + .map(|field| build_field(fbb, &mut Some(self.dictionary_tracker), field)) + .collect::>(); + let fb_field_list = fbb.create_vector(&fields); + + let fb_metadata_list = + (!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata())); + + let mut builder = crate::SchemaBuilder::new(fbb); + builder.add_fields(fb_field_list); + if let Some(fb_metadata_list) = fb_metadata_list { + builder.add_custom_metadata(fb_metadata_list); + } + builder.finish() + } +} + /// Serialize a schema in IPC format -pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { +#[deprecated( + since = "54.0.0", + note = "Use `IpcSchemaConverter` instead. This function will be removed in the next release." +)] +pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder<'_> { let mut fbb = FlatBufferBuilder::new(); + #[allow(deprecated)] let root = schema_to_fb_offset(&mut fbb, schema); fbb.finish(root, None); @@ -60,6 +110,10 @@ pub fn metadata_to_fb<'a>( fbb.create_vector(&custom_metadata) } +#[deprecated( + since = "54.0.0", + note = "Use `IpcSchemaConverter` instead. This function will be removed in the next release." 
+)] pub fn schema_to_fb_offset<'a>( fbb: &mut FlatBufferBuilder<'a>, schema: &Schema, @@ -67,7 +121,7 @@ pub fn schema_to_fb_offset<'a>( let fields = schema .fields() .iter() - .map(|field| build_field(fbb, field)) + .map(|field| build_field(fbb, &mut None, field)) .collect::>(); let fb_field_list = fbb.create_vector(&fields); @@ -424,6 +478,7 @@ pub(crate) struct FBFieldType<'b> { /// Create an IPC Field from an Arrow Field pub(crate) fn build_field<'a>( fbb: &mut FlatBufferBuilder<'a>, + dictionary_tracker: &mut Option<&mut DictionaryTracker>, field: &Field, ) -> WIPOffset> { // Optional custom metadata. @@ -433,19 +488,29 @@ pub(crate) fn build_field<'a>( }; let fb_field_name = fbb.create_string(field.name().as_str()); - let field_type = get_fb_field_type(field.data_type(), fbb); + let field_type = get_fb_field_type(field.data_type(), dictionary_tracker, fbb); let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() { - Some(get_fb_dictionary( - index_type, - field - .dict_id() - .expect("All Dictionary types have `dict_id`"), - field - .dict_is_ordered() - .expect("All Dictionary types have `dict_is_ordered`"), - fbb, - )) + match dictionary_tracker { + Some(tracker) => Some(get_fb_dictionary( + index_type, + tracker.set_dict_id(field), + field + .dict_is_ordered() + .expect("All Dictionary types have `dict_is_ordered`"), + fbb, + )), + None => Some(get_fb_dictionary( + index_type, + field + .dict_id() + .expect("Dictionary type must have a dictionary id"), + field + .dict_is_ordered() + .expect("All Dictionary types have `dict_is_ordered`"), + fbb, + )), + } } else { None }; @@ -473,6 +538,7 @@ pub(crate) fn build_field<'a>( /// Get the IPC type of a data type pub(crate) fn get_fb_field_type<'a>( data_type: &DataType, + dictionary_tracker: &mut Option<&mut DictionaryTracker>, fbb: &mut FlatBufferBuilder<'a>, ) -> FBFieldType<'a> { // some IPC implementations expect an empty list for child data, instead of a null value. 
@@ -673,7 +739,7 @@ pub(crate) fn get_fb_field_type<'a>( } } List(ref list_type) => { - let child = build_field(fbb, list_type); + let child = build_field(fbb, dictionary_tracker, list_type); FBFieldType { type_type: crate::Type::List, type_: crate::ListBuilder::new(fbb).finish().as_union_value(), @@ -682,7 +748,7 @@ pub(crate) fn get_fb_field_type<'a>( } ListView(_) | LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"), LargeList(ref list_type) => { - let child = build_field(fbb, list_type); + let child = build_field(fbb, dictionary_tracker, list_type); FBFieldType { type_type: crate::Type::LargeList, type_: crate::LargeListBuilder::new(fbb).finish().as_union_value(), @@ -690,7 +756,7 @@ pub(crate) fn get_fb_field_type<'a>( } } FixedSizeList(ref list_type, len) => { - let child = build_field(fbb, list_type); + let child = build_field(fbb, dictionary_tracker, list_type); let mut builder = crate::FixedSizeListBuilder::new(fbb); builder.add_listSize(*len); FBFieldType { @@ -703,7 +769,7 @@ pub(crate) fn get_fb_field_type<'a>( // struct's fields are children let mut children = vec![]; for field in fields { - children.push(build_field(fbb, field)); + children.push(build_field(fbb, dictionary_tracker, field)); } FBFieldType { type_type: crate::Type::Struct_, @@ -712,8 +778,8 @@ pub(crate) fn get_fb_field_type<'a>( } } RunEndEncoded(run_ends, values) => { - let run_ends_field = build_field(fbb, run_ends); - let values_field = build_field(fbb, values); + let run_ends_field = build_field(fbb, dictionary_tracker, run_ends); + let values_field = build_field(fbb, dictionary_tracker, values); let children = [run_ends_field, values_field]; FBFieldType { type_type: crate::Type::RunEndEncoded, @@ -724,7 +790,7 @@ pub(crate) fn get_fb_field_type<'a>( } } Map(map_field, keys_sorted) => { - let child = build_field(fbb, map_field); + let child = build_field(fbb, dictionary_tracker, map_field); let mut field_type = crate::MapBuilder::new(fbb); 
field_type.add_keysSorted(*keys_sorted); FBFieldType { @@ -737,7 +803,7 @@ pub(crate) fn get_fb_field_type<'a>( // In this library, the dictionary "type" is a logical construct. Here we // pass through to the value type, as we've already captured the index // type in the DictionaryEncoding metadata in the parent field - get_fb_field_type(value_type, fbb) + get_fb_field_type(value_type, dictionary_tracker, fbb) } Decimal128(precision, scale) => { let mut builder = crate::DecimalBuilder::new(fbb); @@ -764,7 +830,7 @@ pub(crate) fn get_fb_field_type<'a>( Union(fields, mode) => { let mut children = vec![]; for (_, field) in fields.iter() { - children.push(build_field(fbb, field)); + children.push(build_field(fbb, dictionary_tracker, field)); } let union_mode = match mode { @@ -1067,7 +1133,9 @@ mod tests { md, ); - let fb = schema_to_fb(&schema); + let mut dictionary_tracker = DictionaryTracker::new(true); + let mut converter = IpcSchemaConverter::new(&mut dictionary_tracker); + let fb = converter.schema_to_fb(&schema); // read back fields let ipc = crate::root_as_schema(fb.finished_data()).unwrap(); @@ -1098,9 +1166,14 @@ mod tests { // generate same message with Rust let data_gen = crate::writer::IpcDataGenerator::default(); + let mut dictionary_tracker = DictionaryTracker::new(true); let arrow_schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]); let bytes = data_gen - .schema_to_bytes(&arrow_schema, &crate::writer::IpcWriteOptions::default()) + .schema_to_bytes_with_dictionary_tracker( + &arrow_schema, + &mut dictionary_tracker, + &crate::writer::IpcWriteOptions::default(), + ) .ipc_message; let ipc2 = crate::root_as_message(&bytes).unwrap(); diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 6ef70cdeaa2c..270a33956ad2 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -37,6 +37,7 @@ use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec}; use arrow_schema::*; use 
crate::compression::CompressionCodec; +use crate::convert::IpcSchemaConverter; use crate::CONTINUATION_MARKER; /// IPC write options used to control the behaviour of the [`IpcDataGenerator`] @@ -199,9 +200,44 @@ impl Default for IpcWriteOptions { pub struct IpcDataGenerator {} impl IpcDataGenerator { + pub fn schema_to_bytes_with_dictionary_tracker( + &self, + schema: &Schema, + dictionary_tracker: &mut DictionaryTracker, + write_options: &IpcWriteOptions, + ) -> EncodedData { + let mut fbb = FlatBufferBuilder::new(); + let schema = { + let mut converter = IpcSchemaConverter::new(dictionary_tracker); + let fb = converter.schema_to_fb_offset(&mut fbb, schema); + fb.as_union_value() + }; + + let mut message = crate::MessageBuilder::new(&mut fbb); + message.add_version(write_options.metadata_version); + message.add_header_type(crate::MessageHeader::Schema); + message.add_bodyLength(0); + message.add_header(schema); + // TODO: custom metadata + let data = message.finish(); + fbb.finish(data, None); + + let data = fbb.finished_data(); + EncodedData { + ipc_message: data.to_vec(), + arrow_data: vec![], + } + } + + #[deprecated( + since = "54.0.0", + note = "Use `schema_to_bytes_with_dictionary_tracker` instead. This function will be removed in the next release." + )] pub fn schema_to_bytes(&self, schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData { let mut fbb = FlatBufferBuilder::new(); let schema = { + #[allow(deprecated)] + // This will be replaced with the IpcSchemaConverter in the next release. let fb = crate::convert::schema_to_fb_offset(&mut fbb, schema); fb.as_union_value() }; @@ -714,6 +750,7 @@ fn into_zero_offset_run_array( /// /// Can optionally error if an update to an existing dictionary is attempted, which /// isn't allowed in the `FileWriter`. 
+#[derive(Debug)] pub struct DictionaryTracker { written: HashMap, dict_ids: Vec, @@ -887,9 +924,15 @@ impl FileWriter { writer.write_all(&super::ARROW_MAGIC)?; writer.write_all(&PADDING[..pad_len])?; // write the schema, set the written bytes to the schema + header - let encoded_message = data_gen.schema_to_bytes(schema, &write_options); - let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?; let preserve_dict_id = write_options.preserve_dict_id; + let mut dictionary_tracker = + DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id); + let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker( + schema, + &mut dictionary_tracker, + &write_options, + ); + let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?; Ok(Self { writer, write_options, @@ -898,10 +941,7 @@ impl FileWriter { dictionary_blocks: vec![], record_blocks: vec![], finished: false, - dictionary_tracker: DictionaryTracker::new_with_preserve_dict_id( - true, - preserve_dict_id, - ), + dictionary_tracker, custom_metadata: HashMap::new(), data_gen, }) @@ -960,7 +1000,8 @@ impl FileWriter { let mut fbb = FlatBufferBuilder::new(); let dictionaries = fbb.create_vector(&self.dictionary_blocks); let record_batches = fbb.create_vector(&self.record_blocks); - let schema = crate::convert::schema_to_fb_offset(&mut fbb, &self.schema); + let mut converter = IpcSchemaConverter::new(&mut self.dictionary_tracker); + let schema = converter.schema_to_fb_offset(&mut fbb, &self.schema); let fb_custom_metadata = (!self.custom_metadata.is_empty()) .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata)); @@ -1086,18 +1127,22 @@ impl StreamWriter { write_options: IpcWriteOptions, ) -> Result { let data_gen = IpcDataGenerator::default(); + let preserve_dict_id = write_options.preserve_dict_id; + let mut dictionary_tracker = + DictionaryTracker::new_with_preserve_dict_id(false, preserve_dict_id); + // write the schema, set the 
written bytes to the schema - let encoded_message = data_gen.schema_to_bytes(schema, &write_options); + let encoded_message = data_gen.schema_to_bytes_with_dictionary_tracker( + schema, + &mut dictionary_tracker, + &write_options, + ); write_message(&mut writer, encoded_message, &write_options)?; - let preserve_dict_id = write_options.preserve_dict_id; Ok(Self { writer, write_options, finished: false, - dictionary_tracker: DictionaryTracker::new_with_preserve_dict_id( - false, - preserve_dict_id, - ), + dictionary_tracker, data_gen, }) } diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index fab8966952b2..3ed3bd24e0a8 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -178,8 +178,11 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Result { /// Encodes the Arrow schema into the IPC format, and base64 encodes it fn encode_arrow_schema(schema: &Schema) -> String { let options = writer::IpcWriteOptions::default(); + let mut dictionary_tracker = + writer::DictionaryTracker::new_with_preserve_dict_id(true, options.preserve_dict_id()); let data_gen = writer::IpcDataGenerator::default(); - let mut serialized_schema = data_gen.schema_to_bytes(schema, &options); + let mut serialized_schema = + data_gen.schema_to_bytes_with_dictionary_tracker(schema, &mut dictionary_tracker, &options); // manually prepending the length to the schema as arrow uses the legacy IPC format // TODO: change after addressing ARROW-9777 From 200d1f5ace7901c57f393ad5d1a2e16114924d49 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Mon, 23 Sep 2024 18:01:44 +0200 Subject: [PATCH 3/6] arrow-ipc: Read dictionary IDs from dictionary tracker in correct order When dictionary IDs are not preserved, then they are assigned depth first, however, when reading them from the dictionary tracker to write the IPC bytes, they were previously read from the dictionary tracker in the order that the schema is traversed (first come first 
serve), which caused an incorrect order of dictionaries serialized in IPC. --- arrow-ipc/src/writer.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 270a33956ad2..135d3e9028d4 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -395,13 +395,6 @@ impl IpcDataGenerator { ) -> Result<(), ArrowError> { match column.data_type() { DataType::Dictionary(_key_type, _value_type) => { - let dict_id = dict_id_seq - .next() - .or_else(|| field.dict_id()) - .ok_or_else(|| { - ArrowError::IpcError(format!("no dict id for field {}", field.name())) - })?; - let dict_data = column.to_data(); let dict_values = &dict_data.child_data()[0]; @@ -415,6 +408,16 @@ impl IpcDataGenerator { dict_id_seq, )?; + // It's important to only take the dict_id at this point, because the dict ID + // sequence is assigned depth-first, so we need to first encode children and have + // them take their assigned dict IDs before we take the dict ID for this field. 
+ let dict_id = dict_id_seq + .next() + .or_else(|| field.dict_id()) + .ok_or_else(|| { + ArrowError::IpcError(format!("no dict id for field {}", field.name())) + })?; + let emit = dictionary_tracker.insert(dict_id, column)?; if emit { From 9f2380c852f5329afefd56303d0332f45f7b5ec0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 24 Sep 2024 16:05:04 -0400 Subject: [PATCH 4/6] Refine IpcSchemaEncoder API and docs --- arrow-ipc/src/convert.rs | 75 ++++++++++++++++++++++++++++++++-------- arrow-ipc/src/writer.rs | 12 ++++--- 2 files changed, 68 insertions(+), 19 deletions(-) diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs index 7403c9d5d090..33674e4acb82 100644 --- a/arrow-ipc/src/convert.rs +++ b/arrow-ipc/src/convert.rs @@ -31,17 +31,62 @@ use crate::writer::DictionaryTracker; use crate::{size_prefixed_root_as_message, KeyValue, Message, CONTINUATION_MARKER}; use DataType::*; -pub struct IpcSchemaConverter<'a> { - dictionary_tracker: &'a mut DictionaryTracker, +/// Low level Arrow [Schema] to IPC bytes converter +/// +/// See also [`fb_to_schema`] for the reverse operation +/// +/// # Example +/// ``` +/// # use arrow_ipc::convert::{fb_to_schema, IpcSchemaEncoder}; +/// # use arrow_ipc::root_as_schema; +/// # use arrow_ipc::writer::DictionaryTracker; +/// # use arrow_schema::{DataType, Field, Schema}; +/// // given an arrow schema to serialize +/// let schema = Schema::new(vec![ +/// Field::new("a", DataType::Int32, false), +/// ]); +/// +/// // Use a dictionary tracker to track dictionary id if needed +/// let mut dictionary_tracker = DictionaryTracker::new(true); +/// // create a FlatBuffersBuilder that contains the encoded bytes +/// let fb = IpcSchemaEncoder::new() +/// .with_dictionary_tracker(&mut dictionary_tracker) +/// .schema_to_fb(&schema); +/// +/// // the bytes are in `fb.finished_data()` +/// let ipc_bytes = fb.finished_data(); +/// +/// // convert the IPC bytes back to an Arrow schema +/// let ipc_schema = 
root_as_schema(ipc_bytes).unwrap(); +/// let schema2 = fb_to_schema(ipc_schema); +/// assert_eq!(schema, schema2); +/// ``` +#[derive(Debug)] +pub struct IpcSchemaEncoder<'a> { + dictionary_tracker: Option<&'a mut DictionaryTracker>, } -impl IpcSchemaConverter<'_> { - pub fn new(dictionary_tracker: &mut DictionaryTracker) -> IpcSchemaConverter<'_> { - IpcSchemaConverter { dictionary_tracker } +impl<'a> IpcSchemaEncoder<'a> { + /// Create a new schema encoder + pub fn new() -> IpcSchemaEncoder<'a> { + IpcSchemaEncoder { + dictionary_tracker: None, + } + } + + /// Specify a dictionary tracker to use + pub fn with_dictionary_tracker( + mut self, + dictionary_tracker: &'a mut DictionaryTracker, + ) -> Self { + self.dictionary_tracker = Some(dictionary_tracker); + self } - /// Serialize a schema in IPC format - pub fn schema_to_fb<'a>(&mut self, schema: &'a Schema) -> FlatBufferBuilder<'a> { + /// Serialize a schema in IPC format, returning a completed [`FlatBufferBuilder`] + /// + /// Note: Call [`FlatBufferBuilder::finished_data`] to get the serialized bytes + pub fn schema_to_fb<'b>(&mut self, schema: &Schema) -> FlatBufferBuilder<'b> { let mut fbb = FlatBufferBuilder::new(); let root = self.schema_to_fb_offset(&mut fbb, schema); @@ -51,15 +96,16 @@ impl IpcSchemaConverter<'_> { fbb } - pub fn schema_to_fb_offset<'a>( + /// Serialize a schema to an in progress [`FlatBufferBuilder`], returning the in progress offset. 
+ pub fn schema_to_fb_offset<'b>( &mut self, - fbb: &mut FlatBufferBuilder<'a>, + fbb: &mut FlatBufferBuilder<'b>, schema: &Schema, - ) -> WIPOffset> { + ) -> WIPOffset> { let fields = schema .fields() .iter() - .map(|field| build_field(fbb, &mut Some(self.dictionary_tracker), field)) + .map(|field| build_field(fbb, &mut self.dictionary_tracker, field)) .collect::>(); let fb_field_list = fbb.create_vector(&fields); @@ -168,7 +214,7 @@ impl<'a> From> for Field { } } -/// Deserialize a Schema table from flat buffer format to Schema data type +/// Deserialize an ipc [1crate::Schema`] from flat buffers to an arrow [Schema]. pub fn fb_to_schema(fb: crate::Schema) -> Schema { let mut fields: Vec = vec![]; let c_fields = fb.fields().unwrap(); @@ -1134,8 +1180,9 @@ mod tests { ); let mut dictionary_tracker = DictionaryTracker::new(true); - let mut converter = IpcSchemaConverter::new(&mut dictionary_tracker); - let fb = converter.schema_to_fb(&schema); + let fb = IpcSchemaEncoder::new() + .with_dictionary_tracker(&mut dictionary_tracker) + .schema_to_fb(&schema); // read back fields let ipc = crate::root_as_schema(fb.finished_data()).unwrap(); diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 135d3e9028d4..b5cf20ef337f 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -37,7 +37,7 @@ use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec}; use arrow_schema::*; use crate::compression::CompressionCodec; -use crate::convert::IpcSchemaConverter; +use crate::convert::IpcSchemaEncoder; use crate::CONTINUATION_MARKER; /// IPC write options used to control the behaviour of the [`IpcDataGenerator`] @@ -208,8 +208,9 @@ impl IpcDataGenerator { ) -> EncodedData { let mut fbb = FlatBufferBuilder::new(); let schema = { - let mut converter = IpcSchemaConverter::new(dictionary_tracker); - let fb = converter.schema_to_fb_offset(&mut fbb, schema); + let fb = IpcSchemaEncoder::new() + .with_dictionary_tracker(dictionary_tracker) + 
.schema_to_fb_offset(&mut fbb, schema); fb.as_union_value() }; @@ -1003,8 +1004,9 @@ impl FileWriter { let mut fbb = FlatBufferBuilder::new(); let dictionaries = fbb.create_vector(&self.dictionary_blocks); let record_batches = fbb.create_vector(&self.record_blocks); - let mut converter = IpcSchemaConverter::new(&mut self.dictionary_tracker); - let schema = converter.schema_to_fb_offset(&mut fbb, &self.schema); + let schema = IpcSchemaEncoder::new() + .with_dictionary_tracker(&mut self.dictionary_tracker) + .schema_to_fb_offset(&mut fbb, &self.schema); let fb_custom_metadata = (!self.custom_metadata.is_empty()) .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata)); From 6bc335e54400a3354077138daafd126e2b9963b3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 24 Sep 2024 16:23:56 -0400 Subject: [PATCH 5/6] reduce repeated code --- arrow-ipc/src/convert.rs | 36 ++++-------------------------------- 1 file changed, 4 insertions(+), 32 deletions(-) diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs index 33674e4acb82..51d37602a960 100644 --- a/arrow-ipc/src/convert.rs +++ b/arrow-ipc/src/convert.rs @@ -122,19 +122,9 @@ impl<'a> IpcSchemaEncoder<'a> { } /// Serialize a schema in IPC format -#[deprecated( - since = "54.0.0", - note = "Use `IpcSchemaConverter` instead. This function will be removed in the next release." -)] +#[deprecated(since = "54.0.0", note = "Use `IpcSchemaEncoder`.")] pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder<'_> { - let mut fbb = FlatBufferBuilder::new(); - - #[allow(deprecated)] - let root = schema_to_fb_offset(&mut fbb, schema); - - fbb.finish(root, None); - - fbb + IpcSchemaEncoder::new().schema_to_fb(schema) } pub fn metadata_to_fb<'a>( @@ -156,30 +146,12 @@ impl<'a> IpcSchemaEncoder<'a> { fbb.create_vector(&custom_metadata) } -#[deprecated( - since = "54.0.0", - note = "Use `IpcSchemaConverter` instead. This function will be removed in the next release." 
-)] +#[deprecated(since = "54.0.0", note = "Use `IpcSchemaEncoder`.")] pub fn schema_to_fb_offset<'a>( fbb: &mut FlatBufferBuilder<'a>, schema: &Schema, ) -> WIPOffset> { - let fields = schema - .fields() - .iter() - .map(|field| build_field(fbb, &mut None, field)) - .collect::>(); - let fb_field_list = fbb.create_vector(&fields); - - let fb_metadata_list = - (!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata())); - - let mut builder = crate::SchemaBuilder::new(fbb); - builder.add_fields(fb_field_list); - if let Some(fb_metadata_list) = fb_metadata_list { - builder.add_custom_metadata(fb_metadata_list); - } - builder.finish() + IpcSchemaEncoder::new().schema_to_fb_offset(fbb, schema) } /// Convert an IPC Field to Arrow Field From 41ca359e47e0faed58894425581fffcce9be44fd Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Wed, 25 Sep 2024 11:58:06 +0200 Subject: [PATCH 6/6] Fix lints --- arrow-ipc/src/convert.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs index 51d37602a960..52c6a0d614d0 100644 --- a/arrow-ipc/src/convert.rs +++ b/arrow-ipc/src/convert.rs @@ -66,6 +66,12 @@ pub struct IpcSchemaEncoder<'a> { dictionary_tracker: Option<&'a mut DictionaryTracker>, } +impl<'a> Default for IpcSchemaEncoder<'a> { + fn default() -> Self { + Self::new() + } +} + impl<'a> IpcSchemaEncoder<'a> { /// Create a new schema encoder pub fn new() -> IpcSchemaEncoder<'a> { @@ -186,7 +192,7 @@ impl<'a> From> for Field { } } -/// Deserialize an ipc [1crate::Schema`] from flat buffers to an arrow [Schema]. +/// Deserialize an ipc [`crate::Schema`] from flat buffers to an arrow [Schema]. pub fn fb_to_schema(fb: crate::Schema) -> Schema { let mut fields: Vec = vec![]; let c_fields = fb.fields().unwrap();