[data] Add LanceDB Datasource #44853
Conversation
This PR adds a new datasource for Ray Data that reads from LanceDB. The datasource is a thin wrapper around the LanceDB Python client that lets users read data from LanceDB into Ray Data.

On branch anyscalebrent/lancedb_datasource. Changes to be committed:
- modified: python/ray/data/__init__.py
- modified: python/ray/data/datasource/__init__.py
- new file: python/ray/data/datasource/lancedb_datasource.py
- modified: python/ray/data/read_api.py
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/datasource/__init__.py modified: python/ray/data/datasource/lancedb_datasource.py
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/read_api.py
self.columns = columns
self.filter = filter

self.lance_ds = lance.dataset(uri)
I'm not sure if pickling preserves this today (we should fix that if it doesn't), but it might also be worth exposing the storage_options parameter from lance.dataset(). That would allow users to pass down credentials and other configuration for using an object store (such as S3).
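A minimal sketch of that shape (assuming `storage_options` is forwarded straight to `lance.dataset()`; the class layout and the `__reduce__` approach are illustrative, not the PR's actual code):

```python
import lance
from typing import Dict, Optional


class LanceDatasource:
    def __init__(
        self,
        uri: str,
        storage_options: Optional[Dict[str, str]] = None,
    ):
        self.uri = uri
        # Forwarded to Lance so object-store credentials and config
        # (e.g. S3 keys, region, endpoint) reach the underlying reader.
        self.storage_options = storage_options
        self.lance_ds = lance.dataset(uri, storage_options=storage_options)

    def __reduce__(self):
        # Rebuild the dataset handle on unpickle rather than assuming
        # the lance.dataset object itself pickles cleanly.
        return (self.__class__, (self.uri, self.storage_options))
```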
def __init__(
    self,
    uri: str,
It might be worth just supporting passing in an already configured LanceDataset. Then you don't have to reproduce all the same options.
Yeah, either that or allowing kwargs to get passed down.
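For example (a hedged sketch; the helper name is made up, and `lance.LanceDataset` is assumed as the configured-dataset type):

```python
import lance
from typing import Any, Union


def _resolve_dataset(
    uri_or_ds: Union[str, lance.LanceDataset],
    **lance_kwargs: Any,
) -> lance.LanceDataset:
    # Accept an already configured LanceDataset as-is; otherwise open
    # one from a URI, forwarding extra options to lance.dataset().
    if isinstance(uri_or_ds, lance.LanceDataset):
        return uri_or_ds
    return lance.dataset(uri_or_ds, **lance_kwargs)
```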
read_task = ReadTask(
    lambda fragment=fragment: [_read_single_fragment(fragment)],
    metadata,
)
Fragments could be 100 GB or more. Maybe something we can implement later, but IMO it would be nice to let the user set the block size they want (in terms of # of rows), and then just slice the files according to that block size. We support partial scans of files.
Agreed, "per-fragment" is probably too course for the long term. Right now though it might be tricky to know, up front, exactly how many batches will be generated (will depend on the row group size of the fragment which I'm not sure we make available)
I think we could probably add an API that, given a read size, will tell you how many batches will be generated for a fragment. Would that work?
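A rough sketch of what such an estimate could look like today (assuming `count_rows()` on a fragment, which Lance fragments do expose; the real count may still differ once row-group boundaries are taken into account):

```python
def estimate_num_batches(fragment, batch_size: int) -> int:
    # Upper-bound estimate: ceil(num_rows / batch_size). The true count
    # can differ because batches don't cross row-group boundaries.
    num_rows = fragment.count_rows()
    return -(-num_rows // batch_size)
```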
Alternatively, if Ray supported the idea of a streaming source (e.g. a ReadTask that returns a RecordBatchReader), then a per-fragment API might actually work quite well.
Yes, Ray Data supports streaming reads in batches. Should we use https://lancedb.github.io/lance/read_and_write.html#iterative-read for the per-fragment API?
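A sketch of what a streaming per-fragment read could look like (assuming the `to_batches()` iterative-read API from the linked docs accepts column projection and a filter; the helper name is illustrative):

```python
import pyarrow as pa


def _stream_fragment(fragment, columns=None, filter=None):
    # Yield one small Arrow table per batch instead of materializing
    # the whole fragment, so Ray Data can consume blocks as a stream.
    for batch in fragment.to_batches(columns=columns, filter=filter):
        yield pa.Table.from_batches([batch])
```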
Thanks for doing this. Here are a few thoughts from a Lance perspective.
    If not specified, all columns are read.
filter: The filter to apply to the dataset.
    If not specified, no filter is applied.
parallelism: Degree of parallelism to use for the Dataset
Can Ray work with datasets that do their own parallelism? For example, you could read a fragment with batch_readahead and Lance will do multi-threading on its own. This is a bit more efficient than kicking off a bunch of "read_batch" tasks, since we only have to read/parse/decode the metadata a single time instead of multiple times.
Yes, within each Ray task (per-fragment), multi-threading is allowed.
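For illustration, a hedged sketch of combining the two: one Ray task per fragment, with Lance's own read-ahead threads inside it. `batch_readahead` is the scanner option mentioned above; the wrapper function is made up, and it assumes the scanner accepts a list of fragments to scan.

```python
def _read_fragment_with_readahead(lance_ds, fragment, batch_readahead=4):
    # Inside a single Ray task, let Lance prefetch batches on its own
    # threads; metadata is read/parsed/decoded only once per fragment.
    scanner = lance_ds.scanner(
        fragments=[fragment],
        batch_readahead=batch_readahead,
    )
    return scanner.to_table()
```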
Thank you @brent-anyscale for working on this!
return table

read_tasks = []
for fragment in self.fragments:
parallelism is not respected here. We should compare the value of parallelism and fragments to make sure the number of ReadTasks is no more than parallelism.
Should be resolved - can you review?
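One way the cap could be implemented (an illustrative sketch, not necessarily what the PR does): group fragments so that at most `parallelism` ReadTasks are produced, with each task reading its group of fragments sequentially.

```python
def _group_fragments(fragments, parallelism):
    # Produce at most `parallelism` groups; each group becomes one
    # ReadTask that reads its fragments one after another.
    parallelism = min(parallelism, len(fragments))
    step = -(-len(fragments) // parallelism)  # ceiling division
    return [
        fragments[i : i + step] for i in range(0, len(fragments), step)
    ]
```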
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/__init__.py modified: python/ray/data/datasource/__init__.py renamed: python/ray/data/datasource/lancedb_datasource.py -> python/ray/data/datasource/lance_datasource.py modified: python/ray/data/read_api.py
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/datasource/__init__.py modified: python/ray/data/datasource/lance_datasource.py
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/__init__.py modified: python/ray/data/datasource/__init__.py
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/read_api.py Signed-off-by: Brent Bain <[email protected]>
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/read_api.py
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/datasource/lance_datasource.py
Signed-off-by: Brent Bain <[email protected]> The __init__ method of the LanceDatasource class now uses Optional instead of Union for the parameters. Changes to be committed: modified: python/ray/data/datasource/lance_datasource.py
This change updates lance_datasource to a simpler implementation of to_batches. Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/datasource/lance_datasource.py
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/datasource/lance_datasource.py
Yield isn't working as expected. Changing back to return. Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/datasource/lance_datasource.py
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/datasource/lance_datasource.py
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/datasource/lance_datasource.py
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/datasource/lance_datasource.py
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/datasource/lance_datasource.py
if parallelism > len(self.fragments):
    parallelism = len(self.fragments)
    logger.warning(
        f"Reducing the parallelism to {parallelism}, as that is the "
        "number of files"
    )
This is not the behavior we want, let's remove this.
if len(fragments) <= 0:
    continue
When can this happen?
Changes to lance_datasource parallelism handling. Added initial test for lance_datasource. Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/datasource/lance_datasource.py modified: python/ray/data/read_api.py new file: python/ray/data/tests/test_lance.py
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/datasource/lance_datasource.py modified: python/ray/data/tests/test_lance.py
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/requirements/ml/data-test-requirements.txt
Signed-off-by: Brent Bain <[email protected]> Changes to be committed: modified: python/ray/data/BUILD
Closing in favor of #45106
Why are these changes needed?
This PR adds the capability to load a LanceDB dataset into a Ray Dataset.
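A hedged usage sketch (the entry-point name `read_lance` is an assumption based on the changes this PR makes to `read_api.py`; the exact name and signature may differ):

```python
import ray

# Load a Lance dataset into a Ray Dataset, projecting columns and
# pushing a filter down to the Lance reader.
ds = ray.data.read_lance(
    "s3://bucket/path/my_dataset.lance",  # hypothetical URI
    columns=["id", "vector"],
    filter="id > 100",
)
print(ds.schema())
```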
Related issue number
Checks
- I've signed off every commit (`git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.