Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: collect int/float/boolean/date page-level statistics on write #1346

Merged
Merged 20 commits on Oct 10, 2023.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,13 @@ impl FileFragment {
let filename = format!("{}.lance", Uuid::new_v4());
let mut fragment = Fragment::with_file(id as u64, &filename, &schema, 0);
let full_path = base_path.child(DATA_DIR).child(filename.clone());
let mut writer = FileWriter::try_new(&object_store, &full_path, schema.clone()).await?;
let mut writer = FileWriter::try_new(
&object_store,
&full_path,
schema.clone(),
&Default::default(),
)
.await?;

progress.begin(&fragment, writer.multipart_id()).await?;

Expand Down
1 change: 1 addition & 0 deletions rust/lance/src/dataset/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl Updater {
self.fragment.dataset().object_store.as_ref(),
&full_path,
schema,
&Default::default(),
)
.await
}
Expand Down
10 changes: 7 additions & 3 deletions rust/lance/src/dataset/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,13 @@ impl WriterGenerator {
fragment.add_file(&data_file_path, &self.schema);

let full_path = self.base_dir.child(DATA_DIR).child(data_file_path);
let writer =
FileWriter::try_new(self.object_store.as_ref(), &full_path, self.schema.clone())
.await?;
let writer = FileWriter::try_new(
self.object_store.as_ref(),
&full_path,
self.schema.clone(),
&Default::default(),
)
.await?;

Ok((writer, fragment))
}
Expand Down
33 changes: 21 additions & 12 deletions rust/lance/src/encodings/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,9 @@ mod tests {
false,
)]));
let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
.await
.unwrap();

let array = Int32Array::from_iter_values(0..1000);

Expand Down Expand Up @@ -794,9 +796,10 @@ mod tests {
true,
)]));
let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
.await
.unwrap();
let mut file_writer =
FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
.await
.unwrap();

let array = BooleanArray::from((0..120).map(|v| v % 5 == 0).collect::<Vec<_>>());
for i in 0..10 {
Expand All @@ -812,7 +815,9 @@ mod tests {
assert_eq!(batch.column_by_name("b").unwrap().as_ref(), &array);

let array = BooleanArray::from(vec![Some(true), Some(false), None]);
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
.await
.unwrap();
file_writer
.write(&[RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array)]).unwrap()])
.await
Expand All @@ -838,7 +843,9 @@ mod tests {
false,
)]));
let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
.await
.unwrap();

for i in (0..100).step_by(4) {
let slice: FixedSizeListArray = fixed_size_list.slice(i, 4);
Expand Down Expand Up @@ -869,9 +876,10 @@ mod tests {
false,
)]));
let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
.await
.unwrap();
let mut file_writer =
FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
.await
.unwrap();

let array = BooleanArray::from((0..120).map(|v| v % 5 == 0).collect::<Vec<_>>());
let batch =
Expand Down Expand Up @@ -903,9 +911,10 @@ mod tests {
false,
)]));
let schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
.await
.unwrap();
let mut file_writer =
FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
.await
.unwrap();

let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::<Vec<_>>());
let batch =
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/index/vector/graph/persisted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ pub async fn write_graph<V: Vertex + Clone + Sync + Send>(
]));
let schema = Schema::try_from(arrow_schema.as_ref())?;

let mut writer = FileWriter::try_new(object_store, path, schema).await?;
let mut writer = FileWriter::try_new(object_store, path, schema, &Default::default()).await?;
for nodes in graph.nodes.as_slice().chunks(params.batch_size) {
let mut vertex_builder =
FixedSizeBinaryBuilder::with_capacity(nodes.len(), binary_size as i32);
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/index/vector/ivf/shuffler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl ShufflerBuilder {
flush_size: flush_threshold, // TODO: change to parameterized value later.
temp_dir,
parted_groups: BTreeMap::new(),
writer: FileWriter::with_object_writer(writer, lance_schema)?,
writer: FileWriter::with_object_writer(writer, lance_schema, &Default::default())?,
})
}

Expand Down
58 changes: 38 additions & 20 deletions rust/lance/src/io/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,9 @@ mod tests {
let path = Path::from("/foo");

// Write 10 batches.
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
.await
.unwrap();
for batch_id in 0..10 {
let value_range = batch_id * 10..batch_id * 10 + 10;
let columns: Vec<ArrayRef> = vec![
Expand Down Expand Up @@ -1083,7 +1085,9 @@ mod tests {
}
schema.set_dictionary(&batches[0]).unwrap();

let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
.await
.unwrap();
for batch in batches.iter() {
file_writer.write(&[batch.clone()]).await.unwrap();
}
Expand Down Expand Up @@ -1125,9 +1129,10 @@ mod tests {
let (store, path) = ObjectStore::from_uri("memory:///foo").await.unwrap();

// Write 10 batches.
let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
.await
.unwrap();
let mut file_writer =
FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
.await
.unwrap();
for batch_id in 0..10 {
let value_range = batch_id * 10..batch_id * 10 + 10;
let columns: Vec<ArrayRef> = vec![
Expand Down Expand Up @@ -1183,9 +1188,10 @@ mod tests {
let (store, path) = ObjectStore::from_uri("memory:///foo").await.unwrap();

// Write 10 batches.
let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
.await
.unwrap();
let mut file_writer =
FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
.await
.unwrap();
for batch_id in 0..10 {
let value_range = batch_id * 10..batch_id * 10 + 10;
let columns: Vec<ArrayRef> = vec![
Expand Down Expand Up @@ -1252,7 +1258,9 @@ mod tests {
)]));
let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap();

let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
.await
.unwrap();
file_writer.write(&[batch.clone()]).await.unwrap();
file_writer.finish().await.unwrap();

Expand Down Expand Up @@ -1296,7 +1304,9 @@ mod tests {
.collect::<Vec<_>>();
let batches_ref = batches.iter().collect::<Vec<_>>();

let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
.await
.unwrap();
file_writer.write(&batches).await.unwrap();
file_writer.finish().await.unwrap();

Expand All @@ -1316,7 +1326,9 @@ mod tests {
let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();

let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
.await
.unwrap();
file_writer.write(&[batch]).await.unwrap();
file_writer.finish().await.unwrap();

Expand Down Expand Up @@ -1494,9 +1506,10 @@ mod tests {
// write to a lance file
let store = ObjectStore::memory();
let path = Path::from("/takes");
let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
.await
.unwrap();
let mut file_writer =
FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
.await
.unwrap();
file_writer.write(&[batch]).await.unwrap();
file_writer.finish().await.unwrap();

Expand Down Expand Up @@ -1604,9 +1617,10 @@ mod tests {
let store = ObjectStore::memory();
let path = Path::from("/take_list");
let schema: Schema = (&arrow_schema).try_into().unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema.clone())
.await
.unwrap();
let mut file_writer =
FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
.await
.unwrap();
file_writer.write(&[batch]).await.unwrap();
file_writer.finish().await.unwrap();

Expand Down Expand Up @@ -1674,7 +1688,9 @@ mod tests {
.unwrap();

let schema: Schema = (&arrow_schema).try_into().unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
.await
.unwrap();
file_writer.write(&[batch.clone()]).await.unwrap();
file_writer.finish().await.unwrap();

Expand All @@ -1698,7 +1714,9 @@ mod tests {
// write to a lance file
let store = ObjectStore::memory();
let path = Path::from("/read_range");
let mut file_writer = FileWriter::try_new(&store, &path, schema).await.unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, schema, &Default::default())
.await
.unwrap();
file_writer.write(&[batch]).await.unwrap();
file_writer.finish().await.unwrap();

Expand Down Expand Up @@ -1761,7 +1779,7 @@ mod tests {

let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]);
let schema = Schema::try_from(&arrow_schema).unwrap();
let mut writer = FileWriter::try_new(&store, &path, schema.clone())
let mut writer = FileWriter::try_new(&store, &path, schema.clone(), &Default::default())
.await
.unwrap();
for i in 0..10 {
Expand Down
Loading