[data] Add LanceDB Datasource #44853
File: lance_datasource.py (new file)
@@ -0,0 +1,84 @@
import logging
from typing import TYPE_CHECKING, Dict, Iterator, List, Optional

import lance
import numpy as np
import pyarrow as pa
from lance import LanceFragment

from ray.data import ReadTask
from ray.data.block import BlockMetadata
from ray.data.datasource import Datasource
from ray.util.annotations import DeveloperAPI

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    import pyarrow


@DeveloperAPI
class LanceDatasource(Datasource):
    """Lance datasource, for reading a Lance dataset as a Ray Dataset.

    Parameters
    ----------
    uri : str
        The base URI of the Lance dataset.
    columns : list
        A list of columns to return from the dataset.
    filter : str
        A standard SQL expression used as a predicate for dataset filtering.
    storage_options : dict
        Extra options forwarded to ``lance.dataset`` for object-store
        configuration.
    """

    def __init__(
        self,
        uri: str,
        columns: Optional[List[str]] = None,
        filter: Optional[str] = None,
        storage_options: Optional[Dict[str, str]] = None,
    ):
        self.uri = uri
        self.columns = columns
        self.filter = filter
        self.storage_options = storage_options

        self.lance_ds = lance.dataset(uri=uri, storage_options=storage_options)
        self.fragments = self.lance_ds.get_fragments()

    def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
        # Read multiple fragments in parallel. Each Ray Data block holds a
        # PyArrow Table built from one record batch.
        def _read_fragments(
            fragments: List[LanceFragment],
        ) -> Iterator["pyarrow.Table"]:
            for fragment in fragments:
                batches = fragment.to_batches(columns=self.columns, filter=self.filter)
                for batch in batches:
                    yield pa.Table.from_batches([batch])

        read_tasks = []
        for fragments in np.array_split(self.fragments, parallelism):
            if len(fragments) <= 0:
                continue

            metadata = BlockMetadata(
                num_rows=None,
                schema=None,
                input_files=None,
                size_bytes=None,
                exec_stats=None,
            )

            # Bind the current fragment group via a default argument so each
            # task reads its own split.
            read_task = ReadTask(
                lambda fragments=fragments: _read_fragments(fragments),
                metadata,
            )
            read_tasks.append(read_task)

        return read_tasks

    def estimate_inmemory_data_size(self) -> Optional[int]:
        # TODO: Add memory size estimation to improve auto-tune of parallelism.
        return None
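
A note on the splitting logic in get_read_tasks: np.array_split always returns exactly parallelism groups, producing empty groups when there are fewer fragments than requested tasks, which is exactly what the len(fragments) <= 0 guard skips. A standalone illustration:

import numpy as np

fragments = list(range(10))  # stand-ins for LanceFragment objects

# 10 fragments across 4 tasks: group sizes are as even as possible.
print([len(g) for g in np.array_split(fragments, 4)])  # [3, 3, 2, 2]

# 2 fragments across 4 tasks: two groups come back empty, so the
# `len(fragments) <= 0` guard in get_read_tasks skips them.
print([len(g) for g in np.array_split(fragments[:2], 4)])  # [1, 1, 0, 0]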

File: read_api.py
@@ -54,6 +54,7 @@
    Datasource,
    ImageDatasource,
    JSONDatasource,
    LanceDatasource,
    MongoDatasource,
    NumpyDatasource,
    ParquetBaseDatasource,

@@ -2900,6 +2901,65 @@ def from_torch(
    )

@PublicAPI
def read_lance(
    uri: str,
    *,
    columns: Optional[List[str]] = None,
    filter: Optional[str] = None,
    storage_options: Optional[Dict[str, str]] = None,
    parallelism: int = -1,
    ray_remote_args: Optional[Dict[str, Any]] = None,
    concurrency: Optional[int] = None,
    override_num_blocks: Optional[int] = None,
) -> Dataset:
    """
    Create a :class:`~ray.data.Dataset` from a Lance dataset. The dataset to
    read from is specified by a fully qualified ``uri``. Lance applies any
    requested column projections and filters, and the fragments that satisfy
    the query are distributed across Ray read tasks. The number of tasks is
    determined by ``parallelism``, which can be requested through this
    interface or chosen automatically if unspecified (see the ``parallelism``
    arg below).

    Examples:
        >>> import ray
        >>> ds = ray.data.read_lance(
        ...     uri="./db_name.lance",
        ...     columns=["column_name", "label"],
        ...     filter="label = 2 AND column_name IS NOT NULL",
        ...     parallelism=64
        ... )

    Tip:
        For more details about these Lance concepts, see the following:
        - URI: https://lancedb.github.io/lance/read_and_write.html#object-store-configuration

    Args:
        uri: The URI of the source Lance dataset to read from. Local file
            paths, S3, GCS, and Azure URIs are currently supported.
        columns: The columns to read from the dataset.
            If not specified, all columns are read.
        filter: The filter to apply to the dataset.
            If not specified, no filter is applied.
        storage_options: Extra options forwarded to Lance for object-store
            configuration.
        parallelism: Degree of parallelism to use for the Dataset. Defaults to
            -1, which lets Ray choose the number of read tasks automatically.
        ray_remote_args: Optional arguments to pass to ``ray.remote`` in the
            read tasks.

    Returns:
        A :class:`~ray.data.Dataset` containing the rows read from the Lance
        dataset.
    """  # noqa: E501
    datasource = LanceDatasource(
        uri=uri,
        columns=columns,
        filter=filter,
        storage_options=storage_options,
    )

    return read_datasource(
        datasource=datasource,
        parallelism=parallelism,
        ray_remote_args=ray_remote_args,
        concurrency=concurrency,
        override_num_blocks=override_num_blocks,
    )

Review discussion (on the ``parallelism`` argument): Can Ray work with
datasets that do their own parallelism? For example, could a single fragment
be read with multiple threads?
Reply: Yes, within each Ray task (per-fragment), multi-threading is allowed.
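
To illustrate that reply, here is a hypothetical helper, not part of the PR: a read task's body is ordinary Python, so it may read its fragment group with a thread pool. The worker count is arbitrary, and the sketch assumes each fragment yields at least one batch:

from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional

import pyarrow as pa
from lance import LanceFragment


def read_fragments_threaded(
    fragments: List[LanceFragment],
    columns: Optional[List[str]] = None,
    filter: Optional[str] = None,
) -> List[pa.Table]:
    # Read one fragment into a single Table; assumes the fragment yields at
    # least one batch (pa.Table.from_batches needs an explicit schema otherwise).
    def read_one(fragment: LanceFragment) -> pa.Table:
        return pa.Table.from_batches(
            fragment.to_batches(columns=columns, filter=filter)
        )

    # max_workers=4 is an arbitrary choice for illustration.
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(read_one, fragments))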

def _get_datasource_or_legacy_reader(
    ds: Datasource,
    ctx: DataContext,

File: test_lance.py (new file)
@@ -0,0 +1,87 @@
import os

import lance
import pyarrow as pa
import pytest
from pytest_lazyfixture import lazy_fixture

from ray.data import read_lance
from ray.data.datasource.path_util import _unwrap_protocol


@pytest.mark.parametrize(
    "fs,data_path",
    [
        (None, lazy_fixture("local_path")),
        (lazy_fixture("local_fs"), lazy_fixture("local_path")),
        (lazy_fixture("s3_fs"), lazy_fixture("s3_path")),
        (
            lazy_fixture("s3_fs_with_space"),
            lazy_fixture("s3_path_with_space"),
        ),  # Path contains space.
        (
            lazy_fixture("s3_fs_with_anonymous_crendential"),
            lazy_fixture("s3_path_with_anonymous_crendential"),
        ),
    ],
)
def test_lance_read_basic(fs, data_path):
    df1 = pa.table({"one": [2, 1, 3, 4, 6, 5], "two": ["b", "a", "c", "e", "g", "f"]})
    setup_data_path = _unwrap_protocol(data_path)
    path = os.path.join(setup_data_path, "test.lance")
    lance.write_dataset(df1, path)

    ds_lance = lance.dataset(path)
    df2 = pa.table(
        {
            "one": [1, 2, 3, 4, 5, 6],
            "three": [4, 5, 8, 9, 12, 13],
            "four": ["u", "v", "w", "x", "y", "z"],
        }
    )
    ds_lance.merge(df2, "one")

    ds = read_lance(path)

    # Test metadata-only lance ops.
    assert ds.count() == 6
    assert ds.size_bytes() > 0
    assert ds.schema() is not None

    # TODO(brentb): input_files test.
    # input_files = ds.input_files()
    # assert len(input_files) == 2, input_files
    # assert ".lance" in str(input_files)

    assert (
        " ".join(str(ds).split())
        == "Dataset( num_rows=6, schema={one: int64, two: string, three: int64, four: string} )"  # noqa: E501
    ), ds
    assert (
        " ".join(repr(ds).split())
        == "Dataset( num_rows=6, schema={one: int64, two: string, three: int64, four: string} )"  # noqa: E501
    ), ds

    # Forces a data read.
    values = [[s["one"], s["two"]] for s in ds.take_all()]
    assert sorted(values) == [
        [1, "a"],
        [2, "b"],
        [3, "c"],
        [4, "e"],
        [5, "f"],
        [6, "g"],
    ]

    # Test column selection.
    ds = read_lance(path, columns=["one"])
    values = [s["one"] for s in ds.take()]
    assert sorted(values) == [1, 2, 3, 4, 5, 6]
    assert ds.schema().names == ["one"]

    # Test concurrency.
    ds = read_lance(path, concurrency=1)
    values = [s["one"] for s in ds.take()]
    assert sorted(values) == [1, 2, 3, 4, 5, 6]
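
As a quick manual check outside the pytest fixtures, a minimal local roundtrip might look like this (the /tmp path is illustrative):

import lance
import pyarrow as pa
import ray

path = "/tmp/demo.lance"  # illustrative local path
table = pa.table({"one": [1, 2, 3], "two": ["a", "b", "c"]})
lance.write_dataset(table, path, mode="overwrite")  # overwrite so reruns work

ds = ray.data.read_lance(path)
assert ds.count() == 3
print(ds.take_all())  # [{'one': 1, 'two': 'a'}, ...]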
Review comment: When can this happen?