diff --git a/Cargo.lock b/Cargo.lock index 17d91117815a..bcd19b27731d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3588,6 +3588,19 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308d96db8debc727c3fd9744aac51751243420e46edf401010908da7f8d5e57c" +[[package]] +name = "ndarray" +version = "0.15.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adb12d4e967ec485a5f71c6311fe28158e9d6f4bc4a447b474184d0f91a8fa32" +dependencies = [ + "matrixmultiply", + "num-complex", + "num-integer", + "num-traits", + "rawpointer", +] + [[package]] name = "ndarray" version = "0.16.1" @@ -3609,7 +3622,7 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f093b3db6fd194718dcdeea6bd8c829417deae904e3fcc7732dabcd4416d25d8" dependencies = [ - "ndarray", + "ndarray 0.16.1", "rand", "rand_distr", ] @@ -3846,6 +3859,21 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" +[[package]] +name = "numpy" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec170733ca37175f5d75a5bea5911d6ff45d2cd52849ce98b685394e4f2f37f4" +dependencies = [ + "libc", + "ndarray 0.15.6", + "num-complex", + "num-integer", + "num-traits", + "pyo3", + "rustc-hash", +] + [[package]] name = "objc" version = "0.2.7" @@ -5457,7 +5485,7 @@ dependencies = [ "document-features", "itertools 0.13.0", "libc", - "ndarray", + "ndarray 0.16.1", "ndarray-rand", "once_cell", "parking_lot", @@ -5647,7 +5675,7 @@ dependencies = [ "bytemuck", "egui", "half 2.3.1", - "ndarray", + "ndarray 0.16.1", "re_chunk_store", "re_data_ui", "re_log_types", @@ -5798,7 +5826,7 @@ dependencies = [ "linked-hash-map", "mime_guess2", "mint", - "ndarray", + "ndarray 0.16.1", "nohash-hasher", "once_cell", "ply-rs", @@ -6015,7 +6043,7 @@ dependencies = [ "indexmap 2.1.0", "itertools 
0.13.0", "linked-hash-map", - "ndarray", + "ndarray 0.16.1", "nohash-hasher", "once_cell", "parking_lot", @@ -6288,6 +6316,7 @@ dependencies = [ "infer", "itertools 0.13.0", "mimalloc", + "numpy", "once_cell", "parking_lot", "pyo3", @@ -6460,7 +6489,7 @@ dependencies = [ "clap", "half 2.3.1", "image", - "ndarray", + "ndarray 0.16.1", "re_log", "rerun", ] @@ -6532,7 +6561,7 @@ version = "0.19.0-alpha.1+dev" dependencies = [ "anyhow", "clap", - "ndarray", + "ndarray 0.16.1", "re_log", "rerun", ] @@ -7015,7 +7044,7 @@ name = "snippets" version = "0.19.0-alpha.1+dev" dependencies = [ "itertools 0.13.0", - "ndarray", + "ndarray 0.16.1", "rand", "rand_distr", "re_build_tools", @@ -7187,7 +7216,7 @@ dependencies = [ "clap", "glam", "itertools 0.13.0", - "ndarray", + "ndarray 0.16.1", "ndarray-rand", "rand", "re_log", diff --git a/Cargo.toml b/Cargo.toml index 59262497c974..4865b52f5196 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -195,7 +195,7 @@ memory-stats = "1.1" # This version is not pinned to avoid creating version requirement conflicts, # but other packages pin it to exactly "=0.1.37" mimalloc = "0.1.37" -mime_guess2 = "2.0" # infer MIME type by file extension, and map mime to file extension +mime_guess2 = "2.0" # infer MIME type by file extension, and map mime to file extension mint = "0.5.9" re_mp4 = "0.1.0" natord = "1.0.9" @@ -206,6 +206,9 @@ nohash-hasher = "0.2" notify = { version = "6.1.1", features = ["macos_kqueue"] } num-derive = "0.4" num-traits = "0.2" +# TODO(#7676) This pulls in an older ndarray. Remove it from the skip list in `deny.toml` and +# close the issue when updating to 0.22. 
+numpy = "0.21" once_cell = "1.17" # No lazy_static - use `std::sync::OnceLock` or `once_cell` instead ordered-float = "4.2" parking_lot = "0.12" diff --git a/deny.toml b/deny.toml index d99c623c3e5d..2e1166e5f5e7 100644 --- a/deny.toml +++ b/deny.toml @@ -55,6 +55,7 @@ skip = [ { name = "hashbrown" }, # Old version used by polar-rs { name = "libloading" }, # Old version used by ash (vulkan binding), newer version used by khronos-egl { name = "memoffset" }, # Small crate + { name = "ndarray" }, # Needed by `numpy<0.22` in `rerun_py` { name = "prettyplease" }, # Old version being used by prost { name = "pulldown-cmark" }, # Build-dependency via `ply-rs` (!). TODO(emilk): use a better crate for .ply parsing { name = "raw-window-handle" }, # Pretty small crate; some crates still on old version diff --git a/rerun_py/Cargo.toml b/rerun_py/Cargo.toml index 9695a6bf9764..8a5ae242eb66 100644 --- a/rerun_py/Cargo.toml +++ b/rerun_py/Cargo.toml @@ -62,6 +62,7 @@ infer.workspace = true # TODO(#5875): `mimalloc` starts leaking OS pages starting with `0.1.38`. # When the bug is fixed, change this back to `mimalloc = { workspace = true, …`. 
mimalloc = { version = "=0.1.37", features = ["local_dynamic_tls"] } +numpy.workspace = true once_cell.workspace = true parking_lot.workspace = true pyo3 = { workspace = true, features = ["abi3-py38"] } diff --git a/rerun_py/rerun_bindings/rerun_bindings.pyi b/rerun_py/rerun_bindings/rerun_bindings.pyi index ed9d606eae2f..72dcdb44801a 100644 --- a/rerun_py/rerun_bindings/rerun_bindings.pyi +++ b/rerun_py/rerun_bindings/rerun_bindings.pyi @@ -3,7 +3,7 @@ from typing import Optional, Sequence import pyarrow as pa -from .types import AnyColumn, ComponentLike, ViewContentsLike +from .types import AnyColumn, ComponentLike, IndexValuesLike, ViewContentsLike class IndexColumnDescriptor: """A column containing the index values for when the component data was updated.""" @@ -57,6 +57,16 @@ class RecordingView: """Filter the view to only include data between the given index time values.""" ... + def filter_index_values(self, values: IndexValuesLike) -> RecordingView: + """ + Filter the view to only include data at the given index values. + + This requires index values to be a precise match. Index values in Rerun are + represented as i64 sequence counts or nanoseconds. This API does not expose an interface + in floating point seconds, as the numerical conversion would risk false mismatches. + """ + ... + def select(self, *args: AnyColumn, columns: Optional[Sequence[AnyColumn]] = None) -> pa.RecordBatchReader: ... 
class Recording: diff --git a/rerun_py/rerun_bindings/types.py b/rerun_py/rerun_bindings/types.py index 7ad4207a8b63..87c219f21c9c 100644 --- a/rerun_py/rerun_bindings/types.py +++ b/rerun_py/rerun_bindings/types.py @@ -2,23 +2,27 @@ from typing import TYPE_CHECKING, Sequence, TypeAlias, Union +import numpy as np +import numpy.typing as npt +import pyarrow as pa + if TYPE_CHECKING: from rerun._baseclasses import ComponentMixin from .rerun_bindings import ( ComponentColumnDescriptor as ComponentColumnDescriptor, ComponentColumnSelector as ComponentColumnSelector, - TimeColumnDescriptor as TimeColumnDescriptor, - TimeColumnSelector as TimeColumnSelector, + IndexColumnDescriptor as IndexColumnDescriptor, + IndexColumnSelector as IndexColumnSelector, ) ComponentLike: TypeAlias = Union[str, type["ComponentMixin"]] AnyColumn: TypeAlias = Union[ - "TimeColumnDescriptor", "ComponentColumnDescriptor", - "TimeColumnSelector", "ComponentColumnSelector", + "IndexColumnDescriptor", + "IndexColumnSelector", ] AnyComponentColumn: TypeAlias = Union[ @@ -30,3 +34,5 @@ str, dict[str, Union[AnyColumn, Sequence[ComponentLike]]], ] + +IndexValuesLike: TypeAlias = Union[npt.NDArray[np.int_], pa.Int64Array] diff --git a/rerun_py/src/dataframe.rs b/rerun_py/src/dataframe.rs index dc7b027537d3..76698e5b7657 100644 --- a/rerun_py/src/dataframe.rs +++ b/rerun_py/src/dataframe.rs @@ -5,9 +5,10 @@ use std::collections::{BTreeMap, BTreeSet}; use arrow::{ - array::{RecordBatchIterator, RecordBatchReader}, + array::{make_array, Array, ArrayData, Int64Array, RecordBatchIterator, RecordBatchReader}, pyarrow::PyArrowType, }; +use numpy::PyArrayMethods as _; use pyo3::{ exceptions::{PyRuntimeError, PyTypeError, PyValueError}, prelude::*, @@ -195,6 +196,108 @@ impl AnyComponentColumn { } } +#[derive(FromPyObject)] +enum IndexValuesLike<'py> { + PyArrow(PyArrowType<ArrayData>), + NumPy(numpy::PyArrayLike1<'py, i64>), + + // Catch all to support ChunkedArray and other types + #[pyo3(transparent)] +
CatchAll(Bound<'py, PyAny>), +} + +impl<'py> IndexValuesLike<'py> { + fn to_index_values(&self) -> PyResult<BTreeSet<re_chunk_store::TimeInt>> { + match self { + Self::PyArrow(array) => { + let array = make_array(array.0.clone()); + + let int_array = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| { + PyTypeError::new_err("pyarrow.Array for IndexValuesLike must be of type int64.") + })?; + + let values: BTreeSet<re_chunk_store::TimeInt> = int_array + .iter() + .map(|v| { + v.map_or_else( + || re_chunk_store::TimeInt::STATIC, + // The use of temporal here should be fine even if the data is + // not actually temporal. The important thing is we are converting + // from an i64 input + re_chunk_store::TimeInt::new_temporal, + ) + }) + .collect(); + + if values.len() != int_array.len() { + return Err(PyValueError::new_err("Index values must be unique.")); + } + + Ok(values) + } + Self::NumPy(array) => { + let values: BTreeSet<re_chunk_store::TimeInt> = array + .readonly() + .as_array() + .iter() + // The use of temporal here should be fine even if the data is + // not actually temporal. The important thing is we are converting + // from an i64 input + .map(|v| re_chunk_store::TimeInt::new_temporal(*v)) + .collect(); + + if values.len() != array.len()? { + return Err(PyValueError::new_err("Index values must be unique.")); + } + + Ok(values) + } + Self::CatchAll(any) => { + // If any has the `.chunks` attribute, we can try to treat each chunk as a pyarrow array + if let Ok(chunks) = any.getattr("chunks") { + let mut values = BTreeSet::new(); + for chunk in chunks.iter()? { + let chunk = chunk?.extract::<PyArrowType<ArrayData>>()?; + let array = make_array(chunk.0.clone()); + + let int_array = + array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| { + PyTypeError::new_err( + "pyarrow.Array for IndexValuesLike must be of type int64.", + ) + })?; + + values.extend( + int_array + .iter() + .map(|v| { + v.map_or_else( + || re_chunk_store::TimeInt::STATIC, + // The use of temporal here should be fine even if the data is + // not actually temporal.
The important thing is we are converting + // from an i64 input + re_chunk_store::TimeInt::new_temporal, + ) + }) + .collect::<Vec<_>>(), + ); + } + + if values.len() != any.len()? { + return Err(PyValueError::new_err("Index values must be unique.")); + } + + Ok(values) + } else { + Err(PyTypeError::new_err( + "IndexValuesLike must be a pyarrow.Array, pyarrow.ChunkedArray, or numpy.ndarray", + )) + } + } + } + } +} + struct ComponentLike(re_sdk::ComponentName); impl FromPyObject<'_> for ComponentLike { @@ -438,6 +541,18 @@ impl PyRecordingView { query_expression, }) } + + fn filter_index_values(&self, values: IndexValuesLike<'_>) -> PyResult<Self> { + let values = values.to_index_values()?; + + let mut query_expression = self.query_expression.clone(); + query_expression.filtered_index_values = Some(values); + + Ok(Self { + recording: self.recording.clone(), + query_expression, + }) + } } impl PyRecording { diff --git a/rerun_py/tests/unit/test_dataframe.py b/rerun_py/tests/unit/test_dataframe.py index 93183f42f8d1..06b967bdb937 100644 --- a/rerun_py/tests/unit/test_dataframe.py +++ b/rerun_py/tests/unit/test_dataframe.py @@ -5,6 +5,7 @@ import uuid import pyarrow as pa +import pytest import rerun as rr APP_ID = "rerun_example_test_recording" @@ -59,6 +60,32 @@ def setup_method(self) -> None: self.recording = rr.dataframe.load_recording(rrd) + self.expected_index0 = pa.array( + [1], + type=pa.int64(), + ) + + self.expected_index1 = pa.array( + [7], + type=pa.int64(), + ) + + self.expected_pos0 = pa.array( + [ + [1, 2, 3], + [4, 5, 6], + [7, 8, 9], + ], + type=rr.components.Position3D.arrow_type(), + ) + + self.expected_pos1 = pa.array( + [ + [10, 11, 12], + ], + type=rr.components.Position3D.arrow_type(), + ) + def test_recording_info(self) -> None: assert self.recording.application_id() == APP_ID assert self.recording.recording_id() == str(RECORDING_ID) @@ -75,48 +102,91 @@ def test_full_view(self) -> None: def test_select_columns(self) -> None: view =
self.recording.view(index="my_index", contents="points") - log_time = rr.dataframe.IndexColumnSelector("my_index") + index_col = rr.dataframe.IndexColumnSelector("my_index") pos = rr.dataframe.ComponentColumnSelector("points", rr.components.Position3D) - batches = view.select(log_time, pos) + batches = view.select(index_col, pos) table = pa.Table.from_batches(batches, batches.schema) # points assert table.num_columns == 2 assert table.num_rows == 2 - expected_index0 = pa.array( - [1], - type=pa.int64(), - ) + assert table.column("my_index")[0].equals(self.expected_index0[0]) + assert table.column("my_index")[1].equals(self.expected_index1[0]) + assert table.column("/points:Position3D")[0].values.equals(self.expected_pos0) + assert table.column("/points:Position3D")[1].values.equals(self.expected_pos1) - expected_index1 = pa.array( - [7], - type=pa.int64(), - ) + def test_index_values(self) -> None: + view = self.recording.view(index="my_index", contents="points") + view = view.filter_index_values([1, 7, 9]) - expected_pos0 = pa.array( - [ - [1, 2, 3], - [4, 5, 6], - [7, 8, 9], - ], - type=rr.components.Position3D.arrow_type(), - ) + batches = view.select() + table = pa.Table.from_batches(batches, batches.schema) - expected_pos1 = pa.array( - [ - [10, 11, 12], - ], - type=rr.components.Position3D.arrow_type(), - ) + # my_index, log_time, log_tick, points, colors + assert table.num_columns == 5 + assert table.num_rows == 2 + + assert table.column("my_index")[0].equals(self.expected_index0[0]) + assert table.column("my_index")[1].equals(self.expected_index1[0]) + + # This is a chunked array + new_selection_chunked = table.column("my_index").take([1]) + + # This is a single array + new_selection = new_selection_chunked.combine_chunks() + + view2 = view.filter_index_values(new_selection_chunked) + batches = view2.select() + table2 = pa.Table.from_batches(batches, batches.schema) + + # my_index, log_time, log_tick, points, colors + assert table2.num_columns == 5 + assert 
table2.num_rows == 1 + + assert table2.column("my_index")[0].equals(self.expected_index1[0]) + + view3 = view.filter_index_values(new_selection) + batches = view3.select() + table3 = pa.Table.from_batches(batches, batches.schema) + + assert table3 == table2 + + # Manually create a pyarrow array with no matches + view4 = view.filter_index_values(pa.array([8], type=pa.int64())) + batches = view4.select() + table4 = pa.Table.from_batches(batches, batches.schema) + + assert table4.num_rows == 0 + + # Manually create a chunked array with 1 match + manual_chunked_selection = pa.chunked_array([ + pa.array([2], type=pa.int64()), + pa.array([3, 7, 8], type=pa.int64()), + pa.array([], type=pa.int64()), + pa.array([9, 10, 11], type=pa.int64()), + ]) + + # Confirm len is num elements, not num chunks + assert len(manual_chunked_selection) == 7 + assert len(manual_chunked_selection.chunks) == 4 + + view5 = view.filter_index_values(manual_chunked_selection) + batches = view5.select() + table5 = pa.Table.from_batches(batches, batches.schema) + + assert table5.num_rows == 1 + + # Exceptions + with pytest.raises(ValueError): + view.filter_index_values(pa.array([8, 8], type=pa.int64())) - print(table.schema) + with pytest.raises(TypeError): + view.filter_index_values("1") - assert table.column("my_index")[0].equals(expected_index0[0]) - assert table.column("my_index")[1].equals(expected_index1[0]) - assert table.column("/points:Position3D")[0].values.equals(expected_pos0) - assert table.column("/points:Position3D")[1].values.equals(expected_pos1) + with pytest.raises(TypeError): + view.filter_index_values(pa.array([1.0, 2.0], type=pa.float64())) def test_view_syntax(self) -> None: good_content_expressions = [