Skip to content

Commit

Permalink
Allow scalar indices to be disabled for a query
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed Sep 26, 2024
1 parent 51126ab commit 83482f8
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 2 deletions.
26 changes: 26 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ def scanner(
fast_search: bool = False,
io_buffer_size: Optional[int] = None,
late_materialization: Optional[bool | List[str]] = None,
use_scalar_index: Optional[bool] = None,
) -> LanceScanner:
"""Return a Scanner that can support various pushdowns.
Expand Down Expand Up @@ -310,6 +311,10 @@ def scanner(
number of rows (or be empty) if the rows closest to the query do not
match the filter. It's generally good when the filter is not very
selective.
use_scalar_index: bool, default True
Lance will automatically use scalar indices to optimize a query. In some
corner cases this can make query performance worse and this parameter can
be used to disable scalar indices in these cases.
late_materialization: bool or List[str], default None
Allows custom control over late materialization. Late materialization
fetches non-query columns using a take operation after the filter. This
Expand Down Expand Up @@ -378,6 +383,7 @@ def scanner(
.with_row_id(with_row_id)
.with_row_address(with_row_address)
.use_stats(use_stats)
.use_scalar_index(use_scalar_index)
.fast_search(fast_search)
)
if full_text_query is not None:
Expand Down Expand Up @@ -430,6 +436,7 @@ def to_table(
full_text_query: Optional[Union[str, dict]] = None,
io_buffer_size: Optional[int] = None,
late_materialization: Optional[bool | List[str]] = None,
use_scalar_index: Optional[bool] = None,
) -> pa.Table:
"""Read the data into memory as a pyarrow Table.
Expand Down Expand Up @@ -479,6 +486,9 @@ def to_table(
late_materialization: bool or List[str], default None
Allows custom control over late materialization. See
``ScannerBuilder.late_materialization`` for more information.
use_scalar_index: bool, default True
Allows custom control over scalar index usage. See
``ScannerBuilder.use_scalar_index`` for more information.
with_row_id: bool, default False
Return row ID.
with_row_address: bool, default False
Expand Down Expand Up @@ -512,6 +522,7 @@ def to_table(
batch_readahead=batch_readahead,
fragment_readahead=fragment_readahead,
late_materialization=late_materialization,
use_scalar_index=use_scalar_index,
scan_in_order=scan_in_order,
prefilter=prefilter,
with_row_id=with_row_id,
Expand Down Expand Up @@ -572,6 +583,7 @@ def to_batches(
full_text_query: Optional[Union[str, dict]] = None,
io_buffer_size: Optional[int] = None,
late_materialization: Optional[bool | List[str]] = None,
use_scalar_index: Optional[bool] = None,
**kwargs,
) -> Iterator[pa.RecordBatch]:
"""Read the dataset as materialized record batches.
Expand All @@ -596,6 +608,7 @@ def to_batches(
batch_readahead=batch_readahead,
fragment_readahead=fragment_readahead,
late_materialization=late_materialization,
use_scalar_index=use_scalar_index,
scan_in_order=scan_in_order,
prefilter=prefilter,
with_row_id=with_row_id,
Expand Down Expand Up @@ -2305,6 +2318,7 @@ def __init__(self, ds: LanceDataset):
self._use_stats = True
self._fast_search = None
self._full_text_query = None
self._use_scalar_index = None

def batch_size(self, batch_size: int) -> ScannerBuilder:
"""Set batch size for Scanner"""
Expand Down Expand Up @@ -2468,6 +2482,17 @@ def use_stats(self, use_stats: bool = True) -> ScannerBuilder:
self._use_stats = use_stats
return self

def use_scalar_index(self, use_scalar_index: bool = True) -> ScannerBuilder:
"""
Set whether scalar indices should be used in a query
Scans will use scalar indices, when available, to optimize queries with filters.
However, in some corner cases, scalar indices may make performance worse. This
parameter allows users to disable scalar indices in these cases.
"""
self._use_scalar_index = use_scalar_index
return self

def with_fragments(
self, fragments: Optional[Iterable[LanceFragment]]
) -> ScannerBuilder:
Expand Down Expand Up @@ -2583,6 +2608,7 @@ def to_scanner(self) -> LanceScanner:
self._fast_search,
self._full_text_query,
self._late_materialization,
self._use_scalar_index,
)
return LanceScanner(scanner, self.ds)

Expand Down
16 changes: 16 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2444,6 +2444,22 @@ def test_late_materialization_batch_size(tmp_path: Path):
assert batch.num_rows == 32


def test_use_scalar_index(tmp_path: Path):
table = pa.table({"filter": range(100)})
dataset = lance.write_dataset(table, tmp_path)
dataset.create_scalar_index("filter", "BTREE")

assert "MaterializeIndex" in dataset.scanner(filter="filter = 10").explain_plan(
True
)
assert "MaterializeIndex" in dataset.scanner(
filter="filter = 10", use_scalar_index=True
).explain_plan(True)
assert "MaterializeIndex" not in dataset.scanner(
filter="filter = 10", use_scalar_index=False
).explain_plan(True)


EXPECTED_DEFAULT_STORAGE_VERSION = "2.0"
EXPECTED_MAJOR_VERSION = 2
EXPECTED_MINOR_VERSION = 0
Expand Down
5 changes: 5 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ impl Dataset {
fast_search: Option<bool>,
full_text_query: Option<&PyDict>,
late_materialization: Option<PyObject>,
use_scalar_index: Option<bool>,
) -> PyResult<Scanner> {
let mut scanner: LanceScanner = self_.ds.scan();
match (columns, columns_with_transform) {
Expand Down Expand Up @@ -675,6 +676,10 @@ impl Dataset {
}
}

if let Some(use_scalar_index) = use_scalar_index {
scanner.use_scalar_index(use_scalar_index);
}

if let Some(nearest) = nearest {
let column = nearest
.get_item("column")?
Expand Down
56 changes: 54 additions & 2 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,13 @@ pub struct Scanner {

nearest: Option<Query>,

/// If false, do not use any scalar indices for the scan
///
/// This can be used to pick a more efficient plan for certain queries where
/// scalar indices do not work well (though we should also improve our planning
/// to handle this better in the future as well)
use_scalar_index: bool,

/// Scan the dataset with a meta column: "_rowid"
with_row_id: bool,

Expand Down Expand Up @@ -296,6 +303,7 @@ impl Scanner {
ordered: true,
fragments: None,
fast_search: false,
use_scalar_index: true,
}
}

Expand Down Expand Up @@ -530,6 +538,16 @@ impl Scanner {
self
}

/// Set whether to use scalar index.
///
/// By default, scalar indices will be used to optimize a query if available.
/// However, in some corner cases, scalar indices may not be the best choice.
/// This option allows users to disable scalar indices for a query.
pub fn use_scalar_index(&mut self, use_scalar_index: bool) -> &mut Self {
self.use_scalar_index = use_scalar_index;
self
}

/// Set limit and offset.
///
/// If offset is set, the first offset rows will be skipped. If limit is set,
Expand Down Expand Up @@ -1016,8 +1034,7 @@ impl Scanner {
});
}
// Scalar indices are only used when prefiltering
// TODO: Should we use them when postfiltering if there is no vector search?
let use_scalar_index = self.prefilter || self.nearest.is_none();
let use_scalar_index = self.use_scalar_index && (self.prefilter || self.nearest.is_none());

let planner = Planner::new(Arc::new(self.dataset.schema().into()));

Expand Down Expand Up @@ -4602,6 +4619,26 @@ mod test {
)
.await?;

assert_plan_equals(
&dataset.dataset,
|scan| {
Ok(scan
.nearest("vec", &q, 5)?
.use_scalar_index(false)
.filter("i > 10")?
.prefilter(true))
},
"ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance]
Take: columns=\"_distance, _rowid, (i), (s), (vec)\"
CoalesceBatchesExec: target_batch_size=8192
SortExec: TopK(fetch=5), expr=...
ANNSubIndex: name=..., k=5, deltas=1
ANNIvfPartition: uuid=..., nprobes=1, deltas=1
FilterExec: i@0 > 10
LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false",
)
.await?;

dataset.append_new_data().await?;

assert_plan_equals(
Expand Down Expand Up @@ -4680,6 +4717,21 @@ mod test {
)
.await?;

assert_plan_equals(
&dataset.dataset,
|scan| {
scan.project(&["s"])?
.use_scalar_index(false)
.filter("i > 10")
},
"ProjectionExec: expr=[s@2 as s]
Take: columns=\"i, _rowid, (s)\"
CoalesceBatchesExec: target_batch_size=8192
FilterExec: i@0 > 10
LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=true",
)
.await?;

// Empty projection
assert_plan_equals(
&dataset.dataset,
Expand Down

0 comments on commit 83482f8

Please sign in to comment.