Skip to content

Commit

Permalink
fix: fix out-of-memory in tpch q21 (#850)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>

In `HashSemiJoinExecutor2`, the data chunk builder for each key was reserved
with a capacity of 1024 rows. However, each key only holds 4 rows on average in TPC-H Q21.
Therefore, more than `4*1020*2*1500000 = 12GB` of memory was wasted.

This PR fixes the bug by using a `DataChunkBuilder` with dynamic
capacity. It also replaces vectors with boxed slices in arrays, ensuring
that unused memory is released after the build phase. Finally, it re-enables
q21 in CI.

```
run-q21                 time:   [2.6544 s 2.6956 s 2.7059 s]
                        change: [-61.787% -61.416% -61.045%] (p = 0.05 < 0.05)
```

---------

Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Apr 19, 2024
1 parent e3b34ff commit 7d733e6
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 44 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/bench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ jobs:
- name: Generate TPC-H 1GB dataset
run: make tpch
- name: Run benchmark
# FIXME: skip q21 as it will run out of memory
run: cargo bench --bench tpch -- --output-format bencher "q(1?\d|2[02])$" | tee output.txt
run: cargo bench --bench tpch -- --output-format bencher | tee output.txt
- name: Store benchmark result
if: github.event_name != 'pull_request'
uses: benchmark-action/github-action-benchmark@v1
Expand Down
25 changes: 1 addition & 24 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,27 +92,4 @@ jobs:
run: |
./target/release/risinglight -f tests/sql/tpch/create.sql
./target/release/risinglight -f tests/sql/tpch/import.sql
./target/release/risinglight -f tests/sql/tpch-full/_q1.slt
./target/release/risinglight -f tests/sql/tpch-full/_q2.slt
./target/release/risinglight -f tests/sql/tpch-full/_q3.slt
./target/release/risinglight -f tests/sql/tpch-full/_q4.slt
./target/release/risinglight -f tests/sql/tpch-full/_q5.slt
./target/release/risinglight -f tests/sql/tpch-full/_q6.slt
./target/release/risinglight -f tests/sql/tpch-full/_q7.slt
./target/release/risinglight -f tests/sql/tpch-full/_q8.slt
./target/release/risinglight -f tests/sql/tpch-full/_q9.slt
./target/release/risinglight -f tests/sql/tpch-full/_q10.slt
./target/release/risinglight -f tests/sql/tpch-full/_q11.slt
./target/release/risinglight -f tests/sql/tpch-full/_q12.slt
./target/release/risinglight -f tests/sql/tpch-full/_q13.slt
./target/release/risinglight -f tests/sql/tpch-full/_q14.slt
./target/release/risinglight -f tests/sql/tpch-full/_q15.slt
./target/release/risinglight -f tests/sql/tpch-full/_q16.slt
./target/release/risinglight -f tests/sql/tpch-full/_q17.slt
./target/release/risinglight -f tests/sql/tpch-full/_q18.slt
./target/release/risinglight -f tests/sql/tpch-full/_q19.slt
# FIXME: sqllogictest says the query result is mismatch, but it is actually correct
# ./target/release/risinglight -f tests/sql/tpch-full/_q20.slt
# FIXME: q21 runs out of memory
# ./target/release/risinglight -f tests/sql/tpch-full/_q21.slt
./target/release/risinglight -f tests/sql/tpch-full/_q22.slt
./target/release/risinglight -f tests/sql/tpch-full/_tpch_full.slt
4 changes: 0 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ name = "array"
harness = false
name = "tpch"

[profile.bench]
codegen-units = 1
lto = 'thin'

[workspace]
members = ["proto"]

Expand Down
12 changes: 6 additions & 6 deletions src/array/bytes_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ use crate::types::BlobRef;
/// A collection of variable-length values.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct BytesArray<T: ValueRef + ?Sized> {
offset: Vec<usize>,
offset: Box<[usize]>,
valid: BitVec,
data: Vec<u8>,
data: Box<[u8]>,
_type: PhantomData<T>,
}

Expand Down Expand Up @@ -108,8 +108,8 @@ impl<T: ValueRef + ?Sized> ArrayFromDataExt for BytesArray<T> {
}
Self {
valid,
data,
offset,
data: data.into(),
offset: offset.into(),
_type: PhantomData,
}
}
Expand Down Expand Up @@ -197,8 +197,8 @@ impl<T: ValueRef + ?Sized> ArrayBuilder for BytesArrayBuilder<T> {
fn take(&mut self) -> BytesArray<T> {
BytesArray {
valid: mem::take(&mut self.valid),
data: mem::take(&mut self.data),
offset: mem::replace(&mut self.offset, vec![0]),
data: mem::take(&mut self.data).into(),
offset: mem::replace(&mut self.offset, vec![0]).into(),
_type: PhantomData,
}
}
Expand Down
14 changes: 13 additions & 1 deletion src/array/data_chunk_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ impl DataChunkBuilder {
}
}

/// Create a [`DataChunkBuilder`] with unbounded capacity.
pub fn unbounded<'a>(data_types: impl IntoIterator<Item = &'a DataType>) -> Self {
let array_builders = data_types.into_iter().map(ArrayBuilderImpl::new).collect();
DataChunkBuilder {
array_builders,
size: 0,
capacity: usize::MAX,
}
}

/// Push a row in the Iterator.
///
/// The row is accepted as an iterator of [`DataValue`], and it's required that the size of row
Expand Down Expand Up @@ -86,7 +96,9 @@ impl DataChunkBuilder {
.iter_mut()
.map(|builder| {
let chunk = builder.take();
builder.reserve(capacity);
if capacity != usize::MAX {
builder.reserve(capacity);
}
chunk
})
.collect(),
Expand Down
12 changes: 6 additions & 6 deletions src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::types::{NativeType, F32, F64};
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct PrimitiveArray<T: NativeType> {
valid: BitVec,
data: Vec<T>,
data: Box<[T]>,
}

// Enable `collect()` an array from iterator of `Option<T>`.
Expand All @@ -34,7 +34,7 @@ impl<T: NativeType> FromIterator<Option<T>> for PrimitiveArray<T> {
// Enable `collect()` an array from iterator of `T`.
impl<T: NativeType> FromIterator<T> for PrimitiveArray<T> {
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
let data: Vec<T> = iter.into_iter().collect();
let data: Box<[T]> = iter.into_iter().collect();
let size = data.len();
Self {
data,
Expand All @@ -45,7 +45,7 @@ impl<T: NativeType> FromIterator<T> for PrimitiveArray<T> {

impl FromIterator<f32> for PrimitiveArray<F32> {
fn from_iter<I: IntoIterator<Item = f32>>(iter: I) -> Self {
let data: Vec<F32> = iter.into_iter().map(F32::from).collect();
let data: Box<[F32]> = iter.into_iter().map(F32::from).collect();
let size = data.len();
Self {
data,
Expand All @@ -56,7 +56,7 @@ impl FromIterator<f32> for PrimitiveArray<F32> {

impl FromIterator<f64> for PrimitiveArray<F64> {
fn from_iter<I: IntoIterator<Item = f64>>(iter: I) -> Self {
let data: Vec<F64> = iter.into_iter().map(F64::from).collect();
let data: Box<[F64]> = iter.into_iter().map(F64::from).collect();
let size = data.len();
Self {
data,
Expand Down Expand Up @@ -172,7 +172,7 @@ impl<T: NativeType> ArrayBuilder for PrimitiveArrayBuilder<T> {
fn take(&mut self) -> PrimitiveArray<T> {
PrimitiveArray {
valid: mem::take(&mut self.valid),
data: mem::take(&mut self.data),
data: mem::take(&mut self.data).into(),
}
}
}
Expand All @@ -192,7 +192,7 @@ impl PrimitiveArray<bool> {
impl PrimitiveArray<Decimal> {
/// Rescale the decimals.
pub fn rescale(&mut self, scale: u8) {
for v in &mut self.data {
for v in self.data.iter_mut() {
v.rescale(scale as u32);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl HashSemiJoinExecutor2 {
for (key, row) in keys_chunk.rows().zip(chunk.rows()) {
let chunk = key_set
.entry(key.values().collect())
.or_insert_with(|| DataChunkBuilder::new(&self.right_types, 1024))
.or_insert_with(|| DataChunkBuilder::unbounded(&self.right_types))
.push_row(row.values());
assert!(chunk.is_none());
}
Expand Down

0 comments on commit 7d733e6

Please sign in to comment.