Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dataframe v2: inline deduped latest logic #7705

Merged
merged 2 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions crates/store/re_dataframe/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ use crate::{QueryEngine, RecordBatch};
// * [x] sampling support
// * [x] clears
// * [x] pagination (fast)
// * [x] take kernel duplicates all memory
// * [x] dedupe-latest without allocs/copies
// * [ ] allocate null arrays once
// * [ ] overlaps (less dumb)
// * [ ] selector-based `filtered_index`
// * [ ] configurable cache bypass
// * [ ] allocate null arrays once
// * [ ] take kernel duplicates all memory
// * [ ] dedupe-latest without allocs/copies

/// A handle to a dataframe query, ready to be executed.
///
Expand Down Expand Up @@ -623,7 +623,10 @@ impl QueryHandle<'_> {
"the query cache should have already taken care of sorting (and densifying!) the chunk",
);

let chunk = chunk.deduped_latest_on_index(&query.timeline);
// TODO(cmc): That'd be more elegant, but right now there is no way to
// avoid allocations and copies when using Arrow's `ListArray`.
//
// let chunk = chunk.deduped_latest_on_index(&query.timeline);

(AtomicU64::default(), chunk)
})
Expand Down Expand Up @@ -854,13 +857,28 @@ impl QueryHandle<'_> {
// have an Arrow ListView at our disposal.
let cur_indices = cur_chunk.iter_indices(&state.filtered_index).collect_vec();
let (index_value, cur_row_id) = 'walk: loop {
let Some((index_value, cur_row_id)) =
let Some((mut index_value, mut cur_row_id)) =
cur_indices.get(cur_cursor_value as usize).copied()
else {
continue 'overlaps;
};

if index_value == *cur_index_value {
// TODO(cmc): Because of Arrow's `ListArray` limitations, we inline the
// "deduped_latest_on_index" logic here directly, which prevents a lot of
// unnecessary allocations and copies.
while let Some((next_index_value, next_row_id)) =
cur_indices.get(cur_cursor_value as usize + 1).copied()
{
if next_index_value == *cur_index_value {
index_value = next_index_value;
cur_row_id = next_row_id;
cur_cursor_value = cur_cursor.fetch_add(1, Ordering::Relaxed) + 1;
} else {
break;
}
}

break 'walk (index_value, cur_row_id);
}

Expand Down
3 changes: 2 additions & 1 deletion scripts/ci/check_large_files_allow_list.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Cargo.lock
CHANGELOG.md
Cargo.lock
crates/build/re_types_builder/src/reflection.rs
crates/store/re_dataframe/src/query.rs
crates/store/re_types/src/datatypes/tensor_buffer.rs
crates/viewer/re_ui/data/Inter-Medium.otf
crates/viewer/re_viewer/src/reflection/mod.rs
Expand Down
Loading