[data] Add LanceDB Datasource #44853

Closed
23 commits
be6051e
add: New LanceDB datasource for Ray Data
brent-anyscale Apr 18, 2024
2db26aa
upd: datasource __init__
brent-anyscale Apr 18, 2024
50f2bef
upd: read_api.py - fix linting errors with line length
brent-anyscale Apr 18, 2024
43f3dd2
upd: rename lancedb resources to lance resources
brent-anyscale Apr 23, 2024
4577925
upd: Additional updates to remove DB from Lance resources
brent-anyscale Apr 23, 2024
e2d7419
upd: Additional updates to remove DB from lance name
brent-anyscale Apr 23, 2024
94ce5f0
Merge branch 'ray-project:master' into anyscalebrent/lancedb_datasource
brent-anyscale Apr 23, 2024
5f4b253
upd: Lance ReadAPI comment for AZ support
brent-anyscale Apr 23, 2024
486b71e
upd: Include limk to LanceDB docs in read_api.py
brent-anyscale Apr 23, 2024
55ce8e6
upd: lance_datasource - remove header comment
brent-anyscale Apr 23, 2024
2afef9a
upd: Change init params to Optional instead of Unions
brent-anyscale Apr 23, 2024
757113a
upd: lance_datasource - change to use to_batches
brent-anyscale Apr 24, 2024
931c6fe
upd: lance_datasource - set parallelism based on number of fragments
brent-anyscale Apr 24, 2024
5de8caa
upd: lance_datasource - change from yield to return
brent-anyscale Apr 24, 2024
db4c528
upd: lance_dataset comment - changing for consistent naming
brent-anyscale Apr 24, 2024
f39b18e
upd: lance_datasource - changed how fragment reading is performed
brent-anyscale Apr 24, 2024
9b07881
upd: lance datasource - comments updated
brent-anyscale Apr 24, 2024
cf3e700
upd: lance_datasource Add storage options to pass to Lance
brent-anyscale Apr 24, 2024
02b4835
Merge branch 'ray-project:master' into anyscalebrent/lancedb_datasource
brent-anyscale Apr 26, 2024
072ca1c
upd: lance datasource
brent-anyscale Apr 26, 2024
eb07726
upd: lance tests linting updates
brent-anyscale Apr 26, 2024
a26625a
upd: data-test-requirements - add lancedb
brent-anyscale Apr 26, 2024
9b0d4d1
upd: data BUILD - and Lance test
brent-anyscale Apr 26, 2024
2 changes: 2 additions & 0 deletions python/ray/data/__init__.py
@@ -48,6 +48,7 @@
read_datasource,
read_images,
read_json,
read_lance,
read_mongo,
read_numpy,
read_parquet,
@@ -128,6 +129,7 @@
"read_datasource",
"read_images",
"read_json",
"read_lance",
"read_numpy",
"read_mongo",
"read_parquet",
3 changes: 3 additions & 0 deletions python/ray/data/datasource/__init__.py
@@ -1,3 +1,5 @@
from ray.data.datasource.lance_datasource import LanceDatasource

from ray.data.datasource.avro_datasource import AvroDatasource
from ray.data.datasource.bigquery_datasink import _BigQueryDatasink
from ray.data.datasource.bigquery_datasource import BigQueryDatasource
@@ -91,6 +93,7 @@
"ImageDatasource",
"_JSONDatasink",
"JSONDatasource",
"LanceDatasource",
"_NumpyDatasink",
"NumpyDatasource",
"ParquetBaseDatasource",
78 changes: 78 additions & 0 deletions python/ray/data/datasource/lance_datasource.py
@@ -0,0 +1,78 @@
import logging
from typing import List, Optional

import lance
from lance import LanceFragment

from ray.data import ReadTask
from ray.data.block import Block, BlockMetadata
from ray.data.datasource import Datasource
from ray.util.annotations import DeveloperAPI

import pyarrow as pa

logger = logging.getLogger(__name__)


@DeveloperAPI
class LanceDatasource(Datasource):
"""Lance Datasource.

Read a Lance dataset as a Ray Dataset.

Parameters
----------
uri : str
The base URI of the Lance dataset.
columns : list of str, optional
A list of columns to return from the dataset.
If not specified, all columns are read.
filter : str, optional
A standard SQL expression used as a predicate for filtering the dataset.
"""

def __init__(
self,
uri: str,
columns: Optional[List[str]] = None,
filter: Optional[str] = None,
):
self.uri = uri
self.columns = columns
self.filter = filter

self.lance_ds = lance.dataset(uri)
self.fragments = self.lance_ds.get_fragments()

def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
# To begin with, read one fragment at a time.
# Each yielded Ray Data block is a PyArrow Table built from one RecordBatch.
def _read_single_fragment(fragment: LanceFragment) -> Block:
# Fetch table from the fragment
batches = fragment.to_batches(columns=self.columns, filter=self.filter)
for batch in batches:
yield pa.Table.from_batches([batch])

read_tasks = []
for fragment in self.fragments:
data_files = [data_file.path() for data_file in fragment.data_files()]

metadata = BlockMetadata(
num_rows=fragment.count_rows(),
size_bytes=None,
schema=fragment.schema,
input_files=data_files,
exec_stats=None,
)

read_task = ReadTask(
lambda fragment=fragment: [_read_single_fragment(fragment)],
metadata,
)
read_tasks.append(read_task)

return read_tasks

def estimate_inmemory_data_size(self) -> Optional[int]:
# TODO: Add memory size estimation to improve auto-tune of parallelism.
return None
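The scheme above — one deferred `ReadTask` per fragment, with the fragment bound through a lambda default argument so each task captures its own fragment rather than the loop variable — can be illustrated with a stdlib-only sketch. The `FRAGMENTS` data and `make_read_tasks` helper here are hypothetical stand-ins, not Lance or Ray APIs:

```python
from typing import Callable, Iterator, List

# Hypothetical stand-in for Lance fragments: each "fragment" is a list of rows.
FRAGMENTS = [
    [{"id": 0}, {"id": 1}],
    [{"id": 2}],
]


def make_read_tasks(fragments: List[list]) -> List[Callable[[], Iterator[list]]]:
    tasks = []
    for fragment in fragments:
        # Bind the current fragment via a default argument, exactly as the
        # diff does with `lambda fragment=fragment: ...`. Without the default
        # argument every task would close over the same loop variable and
        # all tasks would read the last fragment.
        tasks.append(lambda fragment=fragment: iter([fragment]))
    return tasks


tasks = make_read_tasks(FRAGMENTS)
# Nothing is read until a task is invoked; Ray would run these remotely.
blocks = [block for task in tasks for block in task()]
```

The deferred invocation is the point: building the task list is cheap metadata work, while the actual fragment reads happen only when Ray executes each task.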
54 changes: 54 additions & 0 deletions python/ray/data/read_api.py
@@ -54,6 +54,7 @@
Datasource,
ImageDatasource,
JSONDatasource,
LanceDatasource,
MongoDatasource,
NumpyDatasource,
ParquetBaseDatasource,
@@ -2900,6 +2901,59 @@ def from_torch(
)


@PublicAPI
def read_lance(
*,
uri: str,
columns: Optional[List[str]] = None,
filter: Optional[str] = None,
parallelism: int = -1,
ray_remote_args: Optional[Dict[str, Any]] = None,
) -> Dataset:
"""
Create a :class:`~ray.data.Dataset` from a Lance dataset. The dataset to read
is specified by a fully qualified ``uri``. Lance applies any requested column
projections and filters, and the files that satisfy the query are distributed
across Ray read tasks. The number of tasks is determined by ``parallelism``,
which can be requested from this interface or chosen automatically if
unspecified (see the ``parallelism`` arg below).

Examples:
>>> import ray
>>> ds = ray.data.read_lance(
... uri="./db_name.lance",
... columns=["column_name", "label"],
... filter="label = 2 AND column_name IS NOT NULL",
... parallelism=64
... )

Tip:

For more details about these Lance concepts, see the following:
- URI: https://lancedb.github.io/lance/read_and_write.html#object-store-configuration

Args:
uri: The URI of the source Lance dataset to read from.
Local file paths, S3, GCS, and Azure URIs are currently supported.
columns: The columns to read from the dataset.
If not specified, all columns are read.
filter: The filter to apply to the dataset.
If not specified, no filter is applied.
parallelism: The requested degree of parallelism for the Dataset.

Reviewer (Contributor):
Can Ray work with datasets that do their own parallelism? For example, you could read a fragment with batch_readahead and Lance will do multi-threading on its own. This is a bit more efficient than kicking off a bunch of "read_batch" tasks, since we only have to read/parse/decode the metadata a single time instead of multiple times.

Author:
Yes, within each Ray task (per-fragment), multi-threading is allowed.
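The reviewer's point — overlapping I/O inside a single task instead of spawning many small tasks — can be sketched in plain Python. This is an illustrative readahead pattern using only the stdlib, not Lance's actual implementation (Lance handles `batch_readahead` natively):

```python
import queue
import threading
from typing import Iterable, Iterator


def read_with_readahead(batches: Iterable, readahead: int = 2) -> Iterator:
    """Yield batches while a background thread prefetches up to `readahead` ahead."""
    q: queue.Queue = queue.Queue(maxsize=readahead)
    sentinel = object()  # marks end of stream

    def producer() -> None:
        # Fills the queue; blocks once `readahead` batches are waiting,
        # so prefetching never runs unboundedly ahead of the consumer.
        for batch in batches:
            q.put(batch)
        q.put(sentinel)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = q.get()
        if item is sentinel:
            return
        yield item


out = list(read_with_readahead([[1], [2], [3]]))
```

Within one per-fragment Ray task, this kind of prefetching means the fragment's metadata is read and decoded once, while batch fetches still overlap with downstream processing.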

ray_remote_args: Optional arguments to pass to ``ray.remote`` in the read tasks.

Returns:
A :class:`~ray.data.Dataset` holding the records read from the Lance dataset.
""" # noqa: E501
datasource = LanceDatasource(uri=uri, columns=columns, filter=filter)

dataset = read_datasource(
datasource=datasource, parallelism=parallelism, ray_remote_args=ray_remote_args
)

return dataset


def _get_datasource_or_legacy_reader(
ds: Datasource,
ctx: DataContext,