From 363e55deeebd8e32b02f0025f9de2a6a35a7a1ea Mon Sep 17 00:00:00 2001 From: Yue Ni Date: Sat, 28 Sep 2024 22:31:29 +0800 Subject: [PATCH] Clear field meta data during reading so that it can be correctly processed later. --- rust/lance-table/src/format/manifest.rs | 5 +- rust/lance/src/dataset.rs | 77 ++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 4 deletions(-) diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index 9fd92c3703..8d9acaafe9 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -420,10 +420,13 @@ impl TryFrom for Manifest { .collect::>>()?, ); let fragment_offsets = compute_fragment_offsets(fragments.as_slice()); - let fields_with_meta = FieldsWithMeta { + let mut fields_with_meta = FieldsWithMeta { fields: Fields(p.fields), metadata: p.metadata, }; + for f in fields_with_meta.fields.0.iter_mut() { + f.metadata.clear(); + } if FLAG_MOVE_STABLE_ROW_IDS & p.reader_feature_flags != 0 && !fragments.iter().all(|frag| frag.row_id_meta.is_some()) diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index e593228c03..7c76ad1532 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1595,20 +1595,23 @@ mod tests { use arrow::array::as_struct_array; use arrow::compute::concat_batches; + use arrow_array::builder::ArrayBuilder; use arrow_array::{ builder::StringDictionaryBuilder, cast::as_string_array, types::{Float32Type, Int32Type}, - ArrayRef, DictionaryArray, Float32Array, Int32Array, Int64Array, Int8Array, - Int8DictionaryArray, RecordBatchIterator, StringArray, UInt16Array, UInt32Array, + ArrayRef, DictionaryArray, Float32Array, GenericListArray, Int32Array, Int64Array, + Int8Array, Int8DictionaryArray, RecordBatchIterator, RecordBatchOptions, StringArray, + UInt16Array, UInt32Array, }; use arrow_array::{FixedSizeListArray, Int16Array, Int16DictionaryArray, StructArray}; use arrow_ord::sort::sort_to_indices; use arrow_schema::{ - DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema, + DataType, Field as ArrowField, Field, Fields as ArrowFields, Schema as ArrowSchema, }; use lance_arrow::bfloat16::{self, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME}; use lance_datagen::{array, gen, BatchCount, Dimension, RowCount}; + use lance_encoding::encoder::COMPRESSION_META_KEY; use lance_file::version::LanceFileVersion; use lance_index::{scalar::ScalarIndexParams, vector::DIST_COL, DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; @@ -4123,4 +4126,72 @@ mod tests { dataset.latest_version_id().await.unwrap() ); } + + #[tokio::test] + async fn test_nested_field_meta_roundtrips() { + fn item_field_with_meta() -> Field { + let mut item_field = Field::new("item", DataType::Utf8, true); + item_field.set_metadata(HashMap::from([( + COMPRESSION_META_KEY.to_string(), + "zstd".to_string(), + )])); + item_field + } + + // generate a list array with metadata + fn generate_list_string(batch_size: usize) -> GenericListArray { + let mut item_field = item_field_with_meta(); + + let mut builder = arrow::array::ListBuilder::new(arrow::array::StringBuilder::new()) + .with_field(item_field); + while builder.len() < batch_size { + // foo_0, foo_1, foo_2, ... + let names: Vec> = + (0..20).map(|i| Some(format!("foo_{}", i))).collect(); + builder.append_value(names); + } + let names_array = builder.finish(); + names_array + } + + // write a dataset with a nested field with metadata + let item_field = item_field_with_meta(); + let names_field = Field::new("names", DataType::List(Arc::new(item_field)), false); + let schema = Arc::new(arrow::datatypes::Schema::new(vec![names_field])); + + let names = generate_list_string(10); + let schema_clone = schema.clone(); + let mut options = RecordBatchOptions::default(); + // options.match_field_names = false; + let batch = + RecordBatch::try_new_with_options(schema, vec![Arc::new(names)], &options).unwrap(); + + let batches = RecordBatchIterator::new([Ok(batch)], schema_clone); + + let write_params = WriteParams { + mode: Overwrite, + data_storage_version: Option::from(LanceFileVersion::V2_1), + ..Default::default() + }; + + let test_dir = tempdir().unwrap(); + let data_path = test_dir.path().to_str().unwrap(); + + Dataset::write(batches, data_path, Some(write_params)) + .await + .unwrap(); + + // read the dataset + let dataset = Dataset::open(data_path).await.unwrap(); + + let scanner = dataset.scan(); + let result = scanner.try_into_stream().await.unwrap(); + let mut batch_stream = result.map(|b| b); + let mut batches = Vec::new(); + while let Some(batch) = batch_stream.next().await { + assert!(batch.is_ok()); + batches.push(batch.unwrap()); + } + assert!(!batches.is_empty()); + } }