Skip to content

Commit

Permalink
inline deduped latest logic
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Oct 12, 2024
1 parent 5bc3ba6 commit 3b79af6
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions crates/store/re_dataframe/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ use crate::{QueryEngine, RecordBatch};
// * [x] sampling support
// * [x] clears
// * [x] pagination (fast)
// * [x] take kernel duplicates all memory
// * [x] dedupe-latest without allocs/copies
// * [ ] allocate null arrays once
// * [ ] overlaps (less dumb)
// * [ ] selector-based `filtered_index`
// * [ ] configurable cache bypass
// * [ ] allocate null arrays once
// * [ ] take kernel duplicates all memory
// * [ ] dedupe-latest without allocs/copies

/// A handle to a dataframe query, ready to be executed.
///
Expand Down Expand Up @@ -623,7 +623,10 @@ impl QueryHandle<'_> {
"the query cache should have already taken care of sorting (and densifying!) the chunk",
);

let chunk = chunk.deduped_latest_on_index(&query.timeline);
// TODO(cmc): Deduplicating the chunk up front with `deduped_latest_on_index` would be
// more elegant, but right now there is no way to avoid allocations and copies when
// using Arrow's `ListArray`.
//
// let chunk = chunk.deduped_latest_on_index(&query.timeline);

(AtomicU64::default(), chunk)
})
Expand Down Expand Up @@ -854,13 +857,28 @@ impl QueryHandle<'_> {
// have an Arrow ListView at our disposal.
let cur_indices = cur_chunk.iter_indices(&state.filtered_index).collect_vec();
let (index_value, cur_row_id) = 'walk: loop {
let Some((index_value, cur_row_id)) =
let Some((mut index_value, mut cur_row_id)) =
cur_indices.get(cur_cursor_value as usize).copied()
else {
continue 'overlaps;
};

if index_value == *cur_index_value {
// TODO(cmc): Because of Arrow's `ListArray` limitations, we inline the
// `deduped_latest_on_index` logic here directly: among consecutive rows that share
// this index value, only the last one is kept. This avoids a lot of unnecessary
// allocations and copies.
while let Some((next_index_value, next_row_id)) =
cur_indices.get(cur_cursor_value as usize + 1).copied()
{
if next_index_value == *cur_index_value {
index_value = next_index_value;
cur_row_id = next_row_id;
cur_cursor_value = cur_cursor.fetch_add(1, Ordering::Relaxed) + 1;
} else {
break;
}
}

break 'walk (index_value, cur_row_id);
}

Expand Down

0 comments on commit 3b79af6

Please sign in to comment.