[Data] Add read API for Delta sharing tables (#46072)
Databricks recommends using Delta Sharing rather than the SQL Statement
Execution API to share data with external systems. The Statement Execution
API (used by the current databricks_uc_datasource) has a 100 GB limit, and
data beyond that limit is truncated:

https://github.com/ray-project/ray/blob/master/python/ray/data/datasource/databricks_uc_datasource.py#L99-L103

This limit is a deliberate design decision by Databricks. Delta Sharing has
no such limit; I have tested pulling 180+ GB of data from Databricks into
Ray Data locally.

With this change, transferring data from Databricks, one of the most widely
used and trusted data management systems, to Ray becomes much easier. Without
it, the best workaround is to run a Spark job that dumps the data to cloud
storage (such as S3) and then read it from there.
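
For illustration, here is a minimal sketch of how the new API is used. The profile file name and the share/schema/table coordinates below are placeholders; the profile JSON is assumed to follow the Delta Sharing quick-start format (a small JSON file containing the sharing server endpoint and a bearer token).

```python
import ray

# The URL format is "<profile-file-path>#<share-name>.<schema-name>.<table-name>".
# "my_profile.json" and the table coordinates below are hypothetical placeholders.
ds = ray.data.read_delta_sharing_tables(
    url="my_profile.json#my_share.my_schema.my_table",
    limit=100_000,  # optional: cap the number of rows loaded
    version=1,      # optional: read a specific snapshot of the table
)
print(ds.count())
```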

---------

Signed-off-by: Wenyue Liu <[email protected]>
brucebismarck authored Jul 17, 2024
1 parent 424a876 commit 593d04a
Showing 8 changed files with 522 additions and 2 deletions.
11 changes: 9 additions & 2 deletions doc/source/data/api/input_output.rst
@@ -177,6 +177,15 @@ Databricks

   read_databricks_tables

Delta Sharing
-------------

.. autosummary::
   :nosignatures:
   :toctree: doc/

   read_delta_sharing_tables

Lance
-----

@@ -315,5 +324,3 @@ MetadataProvider API
   datasource.DefaultFileMetadataProvider
   datasource.ParquetMetadataProvider
   datasource.FastFileMetadataProvider


8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -602,3 +602,11 @@ py_test(
    tags = ["team:data", "exclusive"],
    deps = ["//:ray_lib", ":conftest"],
)

py_test(
    name = "test_delta_sharing",
    size = "small",
    srcs = ["tests/test_delta_sharing.py"],
    tags = ["team:data", "exclusive"],
    deps = ["//:ray_lib", ":conftest"],
)
2 changes: 2 additions & 0 deletions python/ray/data/__init__.py
@@ -47,6 +47,7 @@
    read_csv,
    read_databricks_tables,
    read_datasource,
    read_delta_sharing_tables,
    read_images,
    read_json,
    read_lance,
@@ -136,6 +137,7 @@
    "read_binary_files",
    "read_csv",
    "read_datasource",
    "read_delta_sharing_tables",
    "read_images",
    "read_json",
    "read_lance",
2 changes: 2 additions & 0 deletions python/ray/data/datasource/__init__.py
@@ -6,6 +6,7 @@
    Reader,
    ReadTask,
)
from ray.data.datasource.delta_sharing_datasource import DeltaSharingDatasource
from ray.data.datasource.file_based_datasource import (
    FileBasedDatasource,
    _S3FileSystemWrapper,
@@ -38,6 +39,7 @@
    "Connection",
    "Datasink",
    "Datasource",
    "DeltaSharingDatasource",
    "DefaultFileMetadataProvider",
    "DummyOutputDatasink",
    "FastFileMetadataProvider",
128 changes: 128 additions & 0 deletions python/ray/data/datasource/delta_sharing_datasource.py
@@ -0,0 +1,128 @@
import logging
from json import loads
from typing import List, Optional, Tuple

import numpy as np

from ray.data._internal.util import _check_import
from ray.data.block import BlockMetadata
from ray.data.datasource.datasource import Datasource, ReadTask
from ray.util.annotations import DeveloperAPI

logger = logging.getLogger(__name__)


@DeveloperAPI
class DeltaSharingDatasource(Datasource):
    def __init__(
        self,
        url: str,
        json_predicate_hints: Optional[str] = None,
        limit: Optional[int] = None,
        version: Optional[int] = None,
        timestamp: Optional[str] = None,
    ):
        _check_import(self, module="delta_sharing", package="delta-sharing")

        if limit is not None:
            assert (
                isinstance(limit, int) and limit >= 0
            ), "'limit' must be a non-negative int"

        self._url = url
        self._json_predicate_hints = json_predicate_hints
        self._limit = limit
        self._version = version
        self._timestamp = timestamp

    def estimate_inmemory_data_size(self) -> Optional[int]:
        return None

    def _read_files(self, files, converters):
        """Read files with Delta Sharing."""
        from delta_sharing.reader import DeltaSharingReader

        for file in files:
            yield DeltaSharingReader._to_pandas(
                action=file, converters=converters, for_cdf=False, limit=None
            )

    def setup_delta_sharing_connections(self, url: str):
        """
        Set up Delta Sharing connections based on the url.

        :param url: a url under the format "<profile>#<share>.<schema>.<table>"
        :return: a tuple of (Table, DataSharingRestClient)
        """
        from delta_sharing.protocol import DeltaSharingProfile, Table
        from delta_sharing.rest_client import DataSharingRestClient

        profile_str, share, schema, table_str = _parse_delta_sharing_url(url)
        table = Table(name=table_str, share=share, schema=schema)

        profile = DeltaSharingProfile.read_from_file(profile_str)
        rest_client = DataSharingRestClient(profile)
        return table, rest_client

    def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
        assert parallelism > 0, f"Invalid parallelism {parallelism}"
        from delta_sharing.converter import to_converters

        self._table, self._rest_client = self.setup_delta_sharing_connections(
            self._url
        )
        self._response = self._rest_client.list_files_in_table(
            self._table,
            jsonPredicateHints=self._json_predicate_hints,
            limitHint=self._limit,
            version=self._version,
            timestamp=self._timestamp,
        )

        if len(self._response.add_files) == 0 or self._limit == 0:
            logger.warning(
                "No files found in the Delta Sharing table, or the limit is 0."
            )

        schema_json = loads(self._response.metadata.schema_string)
        self._converters = to_converters(schema_json)

        read_tasks = []
        # Split the file list across read tasks, preserving the original order.
        for files in np.array_split(self._response.add_files, parallelism):
            files = files.tolist()
            metadata = BlockMetadata(
                num_rows=None,
                schema=None,
                input_files=files,
                size_bytes=None,
                exec_stats=None,
            )
            converters = self._converters
            read_task = ReadTask(
                lambda f=files: self._read_files(f, converters),
                metadata,
            )
            read_tasks.append(read_task)

        return read_tasks


def _parse_delta_sharing_url(url: str) -> Tuple[str, str, str, str]:
    """
    Parse a Delta Sharing url. Adapted from delta_sharing's ``_parse_url`` function:
    https://github.com/delta-io/delta-sharing/blob/main/python/delta_sharing/delta_sharing.py#L36

    Args:
        url: a url under the format "<profile>#<share>.<schema>.<table>"

    Returns:
        a tuple with parsed (profile, share, schema, table)
    """
    shape_index = url.rfind("#")
    if shape_index < 0:
        raise ValueError(f"Invalid 'url': {url}")
    profile = url[0:shape_index]
    fragments = url[shape_index + 1 :].split(".")
    if len(fragments) != 3:
        raise ValueError(f"Invalid 'url': {url}")
    share, schema, table = fragments
    if len(profile) == 0 or len(share) == 0 or len(schema) == 0 or len(table) == 0:
        raise ValueError(f"Invalid 'url': {url}")
    return (profile, share, schema, table)
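
For context, a minimal sketch of how this datasource plugs into ray.data.read_datasource (this is what the new read_delta_sharing_tables wrapper does under the hood). The profile path and table coordinates are hypothetical, and the profile file is assumed to be a valid Delta Sharing profile:

```python
import ray
from ray.data.datasource.delta_sharing_datasource import DeltaSharingDatasource

# Hypothetical inputs: "profile.json" must be a valid Delta Sharing profile file,
# and my_share.my_schema.my_table must exist on the sharing server.
datasource = DeltaSharingDatasource(
    url="profile.json#my_share.my_schema.my_table",
    limit=1000,
)

# get_read_tasks() splits the table's files across read tasks; read_datasource
# turns those tasks into a Dataset.
ds = ray.data.read_datasource(datasource, override_num_blocks=4)
print(ds.schema())
```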
87 changes: 87 additions & 0 deletions python/ray/data/read_api.py
@@ -64,6 +64,7 @@
    BaseFileMetadataProvider,
    Connection,
    Datasource,
    DeltaSharingDatasource,
    ParquetMetadataProvider,
    PathPartitionFilter,
)
@@ -2644,6 +2645,92 @@ def from_arrow_refs(
    )


@PublicAPI(stability="alpha")
def read_delta_sharing_tables(
    url: str,
    *,
    limit: Optional[int] = None,
    version: Optional[int] = None,
    timestamp: Optional[str] = None,
    json_predicate_hints: Optional[str] = None,
    ray_remote_args: Optional[Dict[str, Any]] = None,
    concurrency: Optional[int] = None,
    override_num_blocks: Optional[int] = None,
) -> Dataset:
    """Read data from a Delta Sharing table.

    See the Delta Sharing project at
    https://github.com/delta-io/delta-sharing/tree/main for more details.

    This function reads data from a Delta Sharing table specified by the URL.
    It supports various options such as limiting the number of rows, specifying
    a version or timestamp, and configuring concurrency.

    Before calling this function, ensure that the URL is correctly formatted
    to point to the Delta Sharing table you want to access, and that a valid
    Delta Sharing profile file is available in the working directory.

    Examples:

        .. testcode::
            :skipif: True

            import ray

            ds = ray.data.read_delta_sharing_tables(
                url="your-profile.json#your-share-name.your-schema-name.your-table-name",
                limit=100000,
                version=1,
            )

    Args:
        url: A URL under the format
            "<profile-file-path>#<share-name>.<schema-name>.<table-name>".
            An example can be found at
            https://github.com/delta-io/delta-sharing/blob/main/README.md#quick-start.
        limit: A non-negative integer. Load only the ``limit`` rows if the
            parameter is specified. Use this optional parameter to explore the
            shared table without loading the entire table into memory.
        version: A non-negative integer. Load the snapshot of the table at
            the specified version.
        timestamp: A timestamp to specify the version of the table to read.
        json_predicate_hints: Predicate hints to be applied to the table. For more
            details, see
            https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#json-predicates-for-filtering.
        ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control the number of tasks to run concurrently. This doesn't change
            the total number of tasks run or the total number of output blocks. By
            default, concurrency is dynamically decided based on the available
            resources.
        override_num_blocks: Override the number of output blocks from all read
            tasks. By default, the number of output blocks is dynamically decided
            based on input data size and available resources. You shouldn't
            manually set this value in most cases.

    Returns:
        A :class:`Dataset` containing the queried data.

    Raises:
        ValueError: If the URL is not properly formatted or if there is an issue
            with the Delta Sharing table connection.
    """
    datasource = DeltaSharingDatasource(
        url=url,
        json_predicate_hints=json_predicate_hints,
        limit=limit,
        version=version,
        timestamp=timestamp,
    )
    # The Delta Sharing limit is applied at the add_files level, so the read
    # doesn't return exactly ``limit`` rows; it reduces the number of files
    # (and therefore rows) returned instead.
    return ray.data.read_datasource(
        datasource=datasource,
        ray_remote_args=ray_remote_args,
        concurrency=concurrency,
        override_num_blocks=override_num_blocks,
    )


@PublicAPI
def from_spark(
    df: "pyspark.sql.DataFrame",