Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear field meta data during reading so that it can be processed #2949

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,10 +420,13 @@ impl TryFrom<pb::Manifest> for Manifest {
.collect::<Result<Vec<_>>>()?,
);
let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
let fields_with_meta = FieldsWithMeta {
let mut fields_with_meta = FieldsWithMeta {
fields: Fields(p.fields),
metadata: p.metadata,
};
for f in fields_with_meta.fields.0.iter_mut() {
f.metadata.clear();
}

if FLAG_MOVE_STABLE_ROW_IDS & p.reader_feature_flags != 0
&& !fragments.iter().all(|frag| frag.row_id_meta.is_some())
Expand Down
77 changes: 74 additions & 3 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1595,20 +1595,23 @@

use arrow::array::as_struct_array;
use arrow::compute::concat_batches;
use arrow_array::builder::ArrayBuilder;
use arrow_array::{
builder::StringDictionaryBuilder,
cast::as_string_array,
types::{Float32Type, Int32Type},
ArrayRef, DictionaryArray, Float32Array, Int32Array, Int64Array, Int8Array,
Int8DictionaryArray, RecordBatchIterator, StringArray, UInt16Array, UInt32Array,
ArrayRef, DictionaryArray, Float32Array, GenericListArray, Int32Array, Int64Array,
Int8Array, Int8DictionaryArray, RecordBatchIterator, RecordBatchOptions, StringArray,
UInt16Array, UInt32Array,
};
use arrow_array::{FixedSizeListArray, Int16Array, Int16DictionaryArray, StructArray};
use arrow_ord::sort::sort_to_indices;
use arrow_schema::{
DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
DataType, Field as ArrowField, Field, Fields as ArrowFields, Schema as ArrowSchema,
};
use lance_arrow::bfloat16::{self, ARROW_EXT_META_KEY, ARROW_EXT_NAME_KEY, BFLOAT16_EXT_NAME};
use lance_datagen::{array, gen, BatchCount, Dimension, RowCount};
use lance_encoding::encoder::COMPRESSION_META_KEY;
use lance_file::version::LanceFileVersion;
use lance_index::{scalar::ScalarIndexParams, vector::DIST_COL, DatasetIndexExt, IndexType};
use lance_linalg::distance::MetricType;
Expand Down Expand Up @@ -4123,4 +4126,72 @@
dataset.latest_version_id().await.unwrap()
);
}

#[tokio::test]
async fn test_nested_field_meta_roundtrips() {
fn item_field_with_meta() -> Field {
let mut item_field = Field::new("item", DataType::Utf8, true);
item_field.set_metadata(HashMap::from([(
COMPRESSION_META_KEY.to_string(),
"zstd".to_string(),
)]));
item_field
}

// generate a list<utf8> array with metadata
fn generate_list_string(batch_size: usize) -> GenericListArray<i32> {
let mut item_field = item_field_with_meta();

Check warning on line 4143 in rust/lance/src/dataset.rs

View workflow job for this annotation

GitHub Actions / linux-arm

variable does not need to be mutable

Check warning on line 4143 in rust/lance/src/dataset.rs

View workflow job for this annotation

GitHub Actions / linux-build (stable)

variable does not need to be mutable

Check warning on line 4143 in rust/lance/src/dataset.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

variable does not need to be mutable

let mut builder = arrow::array::ListBuilder::new(arrow::array::StringBuilder::new())
.with_field(item_field);
while builder.len() < batch_size {
// foo_0, foo_1, foo_2, ...
let names: Vec<Option<String>> =
(0..20).map(|i| Some(format!("foo_{}", i))).collect();
builder.append_value(names);
}
let names_array = builder.finish();
names_array
}

// write a dataset with a nested field with metadata
let item_field = item_field_with_meta();
let names_field = Field::new("names", DataType::List(Arc::new(item_field)), false);
let schema = Arc::new(arrow::datatypes::Schema::new(vec![names_field]));

let names = generate_list_string(10);
let schema_clone = schema.clone();
let mut options = RecordBatchOptions::default();

Check warning on line 4164 in rust/lance/src/dataset.rs

View workflow job for this annotation

GitHub Actions / linux-arm

variable does not need to be mutable

Check warning on line 4164 in rust/lance/src/dataset.rs

View workflow job for this annotation

GitHub Actions / linux-build (stable)

variable does not need to be mutable

Check warning on line 4164 in rust/lance/src/dataset.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

variable does not need to be mutable
// options.match_field_names = false;
let batch =
RecordBatch::try_new_with_options(schema, vec![Arc::new(names)], &options).unwrap();

let batches = RecordBatchIterator::new([Ok(batch)], schema_clone);

let write_params = WriteParams {
mode: Overwrite,
data_storage_version: Option::from(LanceFileVersion::V2_1),
..Default::default()
};

let test_dir = tempdir().unwrap();
let data_path = test_dir.path().to_str().unwrap();

Dataset::write(batches, data_path, Some(write_params))
.await
.unwrap();

// read the dataset
let dataset = Dataset::open(data_path).await.unwrap();

let scanner = dataset.scan();
let result = scanner.try_into_stream().await.unwrap();
let mut batch_stream = result.map(|b| b);
let mut batches = Vec::new();
while let Some(batch) = batch_stream.next().await {
assert!(batch.is_ok());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error occurs at this point, but for some reason, the exact error message is not reported by cargo test (it crashes without providing useful information). However, if the test case is placed in lance/examples/write_read_ds.rs, the error message described in issue #2947 will be displayed.

batches.push(batch.unwrap());
}
assert!(!batches.is_empty());
}
}
Loading