Skip to content

Commit

Permalink
Stop Vortex benchmarks from overriding each other (#572)
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS authored Aug 8, 2024
1 parent 31bb3ba commit 2e6e6ba
Showing 1 changed file with 71 additions and 69 deletions.
140 changes: 71 additions & 69 deletions bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,84 +194,86 @@ async fn register_vortex_file(
schema: &Schema,
enable_compression: bool,
) -> anyhow::Result<()> {
let vtx_file = idempotent_async(
&file.with_extension("").with_extension("vtx"),
|vtx_file| async move {
let record_batches = session
.read_csv(
file.to_str().unwrap(),
CsvReadOptions::default()
.delimiter(b'|')
.has_header(false)
.file_extension("tbl")
.schema(schema),
)
.await?
.collect()
.await?;
let path = if enable_compression {
file.with_extension("").with_extension("vtxcmp")
} else {
file.with_extension("").with_extension("vtxucmp")
};
let vtx_file = idempotent_async(&path, |vtx_file| async move {
let record_batches = session
.read_csv(
file.to_str().unwrap(),
CsvReadOptions::default()
.delimiter(b'|')
.has_header(false)
.file_extension("tbl")
.schema(schema),
)
.await?
.collect()
.await?;

// Create a ChunkedArray from the set of chunks.
let sts = record_batches
.iter()
.cloned()
.map(Array::from)
.map(|a| a.into_struct().unwrap())
.collect::<Vec<_>>();
// Create a ChunkedArray from the set of chunks.
let sts = record_batches
.iter()
.cloned()
.map(Array::from)
.map(|a| a.into_struct().unwrap())
.collect::<Vec<_>>();

let mut arrays_map: HashMap<Arc<str>, Vec<Array>> = HashMap::default();
let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();
let mut arrays_map: HashMap<Arc<str>, Vec<Array>> = HashMap::default();
let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();

for st in sts.into_iter() {
let struct_dtype = st.dtype().as_struct().unwrap();
let names = struct_dtype.names().iter();
let types = struct_dtype.dtypes().iter();
for st in sts.into_iter() {
let struct_dtype = st.dtype().as_struct().unwrap();
let names = struct_dtype.names().iter();
let types = struct_dtype.dtypes().iter();

for (field_name, field_type) in names.zip(types) {
let val = arrays_map.entry(field_name.clone()).or_default();
val.push(st.field_by_name(field_name).unwrap());
for (field_name, field_type) in names.zip(types) {
let val = arrays_map.entry(field_name.clone()).or_default();
val.push(st.field_by_name(field_name).unwrap());

types_map.insert(field_name.clone(), field_type.clone());
}
types_map.insert(field_name.clone(), field_type.clone());
}
}

let fields = schema
.fields()
.iter()
.map(|field| {
let name: Arc<str> = field.name().as_str().into();
let dtype = types_map.get(&name).unwrap().clone();
let chunks = arrays_map.remove(&name).unwrap();

(
name.clone(),
ChunkedArray::try_new(chunks, dtype).unwrap().into_array(),
)
})
.collect::<Vec<_>>();

let fields = schema
.fields()
.iter()
.map(|field| {
let name: Arc<str> = field.name().as_str().into();
let dtype = types_map.get(&name).unwrap().clone();
let chunks = arrays_map.remove(&name).unwrap();

(
name.clone(),
ChunkedArray::try_new(chunks, dtype).unwrap().into_array(),
)
})
.collect::<Vec<_>>();

let data = StructArray::from_fields(&fields).into_array();

let data = if enable_compression {
let compressor = SamplingCompressor::default();
compressor.compress(&data, None)?.into_array()
} else {
data
};

let f = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&vtx_file)
.await?;
let data = StructArray::from_fields(&fields).into_array();

let data = if enable_compression {
let compressor = SamplingCompressor::default();
compressor.compress(&data, None)?.into_array()
} else {
data
};

let mut writer = LayoutWriter::new(f);
writer = writer.write_array_columns(data).await?;
writer.finalize().await?;
let f = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&vtx_file)
.await?;

anyhow::Ok(())
},
)
let mut writer = LayoutWriter::new(f);
writer = writer.write_array_columns(data).await?;
writer.finalize().await?;

anyhow::Ok(())
})
.await?;

let f = OpenOptions::new()
Expand Down

0 comments on commit 2e6e6ba

Please sign in to comment.