Add iceberg datasource (ray-project#46889)
## Why are these changes needed?

This PR adds the capability to load an Iceberg table into a Ray Dataset.
The existing PyIceberg functionality can only materialize the entire Iceberg
table into a single `pyarrow` table, which is then converted to a Ray Dataset.
This PR instead provides a streaming implementation, in which the files
underlying the Iceberg table are distributed across the read tasks of a Ray
Dataset.
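
A rough sketch of the intended usage (the table identifier and catalog settings below are placeholders, not values shipped by this PR):

```python
import ray
from pyiceberg.expressions import EqualTo

# Plan files are distributed across Ray read tasks instead of materializing
# the whole table into a single pyarrow table first.
ds = ray.data.read_iceberg(
    table_identifier="db_name.table_name",               # placeholder identifier
    row_filter=EqualTo("column_name", "literal_value"),  # optional filter pushed down to PyIceberg
    catalog_kwargs={"name": "default", "type": "glue"},  # placeholder catalog config
)
ds.show(3)
```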

## Related issue number

## Checks

- [x] I've signed off every commit (by using the `-s` flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [x] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Dev <[email protected]>
Signed-off-by: dev-goyal <[email protected]>
Signed-off-by: Alan Guo <[email protected]>
Signed-off-by: tungh2 <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Scott Lee <[email protected]>
Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Galen Wang <[email protected]>
Signed-off-by: Shilun Fan <[email protected]>
Signed-off-by: Deepyaman Datta <[email protected]>
Signed-off-by: Matthew Owen <[email protected]>
Signed-off-by: cristianjd <[email protected]>
Signed-off-by: Justin Yu <[email protected]>
Co-authored-by: Fokko Driesprong <[email protected]>
Co-authored-by: Sven Mika <[email protected]>
Co-authored-by: Alan Guo <[email protected]>
Co-authored-by: tungh2 <[email protected]>
Co-authored-by: Jiajun Yao <[email protected]>
Co-authored-by: Scott Lee <[email protected]>
Co-authored-by: Ruiyang Wang <[email protected]>
Co-authored-by: Galen Wang <[email protected]>
Co-authored-by: Max van Dijck <[email protected]>
Co-authored-by: slfan1989 <[email protected]>
Co-authored-by: Deepyaman Datta <[email protected]>
Co-authored-by: Samuel Chan <[email protected]>
Co-authored-by: Matthew Owen <[email protected]>
Co-authored-by: cristianjd <[email protected]>
Co-authored-by: Justin Yu <[email protected]>
16 people authored and simonsays1980 committed Aug 16, 2024
1 parent 8e0fd3d commit a7d4adf
Showing 10 changed files with 653 additions and 14 deletions.
9 changes: 9 additions & 0 deletions doc/source/data/api/input_output.rst
@@ -186,6 +186,15 @@ Delta Sharing

   read_delta_sharing_tables

Iceberg
-------

.. autosummary::
   :nosignatures:
   :toctree: doc/

   read_iceberg

Lance
-----

27 changes: 27 additions & 0 deletions doc/source/data/loading-data.rst
@@ -531,6 +531,33 @@ Ray Data interoperates with distributed data processing frameworks like

            {'col1': 1, 'col2': '1'}
            {'col1': 2, 'col2': '2'}

    .. tab-item:: Iceberg

        To create a :class:`~ray.data.dataset.Dataset` from an `Iceberg Table
        <https://iceberg.apache.org>`__,
        call :func:`~ray.data.read_iceberg`. This function creates a ``Dataset`` backed by
        the distributed files that underlie the Iceberg table.

        .. testcode::
            :skipif: True

            import ray
            from pyiceberg.expressions import EqualTo

            ds = ray.data.read_iceberg(
                table_identifier="db_name.table_name",
                row_filter=EqualTo("column_name", "literal_value"),
                catalog_kwargs={"name": "default", "type": "glue"},
            )
            ds.show(3)

        .. testoutput::

            {'col1': 0, 'col2': '0'}
            {'col1': 1, 'col2': '1'}
            {'col1': 2, 'col2': '2'}

    .. tab-item:: Modin

        To create a :class:`~ray.data.dataset.Dataset` from a Modin DataFrame, call
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -386,6 +386,14 @@ py_test(
    deps = ["//:ray_lib", ":conftest"],
)

py_test(
    name = "test_iceberg",
    size = "medium",
    srcs = ["tests/test_iceberg.py"],
    tags = ["team:data", "exclusive"],
    deps = ["//:ray_lib", ":conftest"],
)

py_test(
    name = "test_mars",
    size = "medium",
2 changes: 2 additions & 0 deletions python/ray/data/__init__.py
@@ -49,6 +49,7 @@
    read_databricks_tables,
    read_datasource,
    read_delta_sharing_tables,
    read_iceberg,
    read_images,
    read_json,
    read_lance,
@@ -139,6 +140,7 @@
    "read_csv",
    "read_datasource",
    "read_delta_sharing_tables",
    "read_iceberg",
    "read_images",
    "read_json",
    "read_lance",
230 changes: 230 additions & 0 deletions python/ray/data/_internal/datasource/iceberg_datasource.py
@@ -0,0 +1,230 @@
"""
Module to read an iceberg table into a Ray Dataset, by using the Ray Datasource API.
"""

import heapq
import itertools
import logging
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Union

from ray.data._internal.util import _check_import
from ray.data.block import Block, BlockMetadata
from ray.data.datasource.datasource import Datasource, ReadTask
from ray.util.annotations import DeveloperAPI

if TYPE_CHECKING:
from pyiceberg.catalog import Catalog
from pyiceberg.expressions import BooleanExpression
from pyiceberg.manifest import DataFile, DataFileContent
from pyiceberg.table import DataScan, FileScanTask, Schema

logger = logging.getLogger(__name__)


@DeveloperAPI
class IcebergDatasource(Datasource):
"""
Iceberg datasource to read Iceberg tables into a Ray Dataset. This module heavily
uses PyIceberg to read iceberg tables. All the routines in this class override
`ray.data.Datasource`.
"""

def __init__(
self,
table_identifier: str,
row_filter: Union[str, "BooleanExpression"] = None,
selected_fields: Tuple[str, ...] = ("*",),
snapshot_id: Optional[int] = None,
scan_kwargs: Optional[Dict[str, Any]] = None,
catalog_kwargs: Optional[Dict[str, Any]] = None,
):
"""
Initialize an IcebergDatasource.
Args:
table_identifier: Fully qualified table identifier (i.e.,
"db_name.table_name")
row_filter: A PyIceberg BooleanExpression to use to filter the data *prior*
to reading
selected_fields: Which columns from the data to read, passed directly to
PyIceberg's load functions
snapshot_id: Optional snapshot ID for the Iceberg table
scan_kwargs: Optional arguments to pass to PyIceberg's Table.scan()
function
catalog_kwargs: Optional arguments to use when setting up the Iceberg
catalog
"""
_check_import(self, module="pyiceberg", package="pyiceberg")
from pyiceberg.expressions import AlwaysTrue

self._scan_kwargs = scan_kwargs if scan_kwargs is not None else {}
self._catalog_kwargs = catalog_kwargs if catalog_kwargs is not None else {}

if "name" in self._catalog_kwargs:
self._catalog_name = self._catalog_kwargs.pop("name")
else:
self._catalog_name = "default"

self.table_identifier = table_identifier

self._row_filter = row_filter if row_filter is not None else AlwaysTrue()
self._selected_fields = selected_fields

if snapshot_id:
self._scan_kwargs["snapshot_id"] = snapshot_id

self._plan_files = None

def _get_catalog(self) -> "Catalog":
from pyiceberg import catalog

return catalog.load_catalog(self._catalog_name, **self._catalog_kwargs)

@property
def plan_files(self) -> List["FileScanTask"]:
"""
Return the plan files specified by this query
"""
# Calculate and cache the plan_files if they don't already exist
if self._plan_files is None:
data_scan = self._get_data_scan()
self._plan_files = data_scan.plan_files()

return self._plan_files

def _get_data_scan(self) -> "DataScan":
catalog = self._get_catalog()
table = catalog.load_table(self.table_identifier)

data_scan = table.scan(
row_filter=self._row_filter,
selected_fields=self._selected_fields,
**self._scan_kwargs,
)

return data_scan

def estimate_inmemory_data_size(self) -> Optional[int]:
# Approximate the size by using the plan files - this will not
# incorporate the deletes, but that's a reasonable approximation
# task
return sum(task.file.file_size_in_bytes for task in self.plan_files)

@staticmethod
def _distribute_tasks_into_equal_chunks(
plan_files: Iterable["FileScanTask"], n_chunks: int
) -> List[List["FileScanTask"]]:
"""
Implement a greedy knapsack algorithm to distribute the files in the scan
across tasks, based on their file size, as evenly as possible
"""
chunks = [list() for _ in range(n_chunks)]

chunk_sizes = [(0, chunk_id) for chunk_id in range(n_chunks)]
heapq.heapify(chunk_sizes)

# From largest to smallest, add the plan files to the smallest chunk one at a
# time
for plan_file in sorted(
plan_files, key=lambda f: f.file.file_size_in_bytes, reverse=True
):
smallest_chunk = heapq.heappop(chunk_sizes)
chunks[smallest_chunk[1]].append(plan_file)
heapq.heappush(
chunk_sizes,
(
smallest_chunk[0] + plan_file.file.file_size_in_bytes,
smallest_chunk[1],
),
)

return chunks

def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
from pyiceberg.io import pyarrow as pyi_pa_io

def _get_read_task(
tasks: Iterable["FileScanTask"],
table_identifier: str,
schema: "Schema",
) -> Iterable[Block]:
# Closure so we can pass this callable as an argument to ReadTask

# Both the catalog and tbl attributes cannot be pickled, which means they
# must be instantiated within this function (as opposed to being attributes
# of the IcebergDatasource class)
catalog = self._get_catalog()
tbl = catalog.load_table(table_identifier)

# Use the PyIceberg API to read only a single task (specifically, a
# FileScanTask) - note that this is not as simple as reading a single
# parquet file, as there might be delete files, etc. associated, so we
# must use the PyIceberg API for the projection.
yield pyi_pa_io.project_table(
tasks=tasks,
table_metadata=tbl.metadata,
io=tbl.io,
row_filter=self._row_filter,
projected_schema=schema,
case_sensitive=self._scan_kwargs.get("case_sensitive", True),
limit=self._scan_kwargs.get("limit"),
)

# Get the PyIceberg scan
data_scan = self._get_data_scan()
# Get the plan files in this query
plan_files = self.plan_files

# Get the projected schema for this scan, given all the row filters,
# snapshot ID, etc.
projected_schema = data_scan.projection()
# Get the arrow schema, to set in the metadata
pya_schema = pyi_pa_io.schema_to_pyarrow(projected_schema)

# Set the n_chunks to the min of the number of plan files and the actual
# requested n_chunks, so that there are no empty tasks
if parallelism > len(list(plan_files)):
parallelism = len(list(plan_files))
logger.warning(
f"Reducing the parallelism to {parallelism}, as that is the"
"number of files"
)

read_tasks = []
# Chunk the plan files based on the requested parallelism
for chunk_tasks in IcebergDatasource._distribute_tasks_into_equal_chunks(
plan_files, parallelism
):
unique_deletes: Set[DataFile] = set(
itertools.chain.from_iterable(
[task.delete_files for task in chunk_tasks]
)
)
# Get a rough estimate of the number of deletes by just looking at
# position deletes. Equality deletes are harder to estimate, as they
# can delete multiple rows.
position_delete_count = sum(
delete.record_count
for delete in unique_deletes
if delete.content == DataFileContent.POSITION_DELETES
)
metadata = BlockMetadata(
num_rows=sum(task.file.record_count for task in chunk_tasks)
- position_delete_count,
size_bytes=sum(task.length for task in chunk_tasks),
schema=pya_schema,
input_files=[task.file.file_path for task in chunk_tasks],
exec_stats=None,
)
read_tasks.append(
ReadTask(
read_fn=lambda tasks=chunk_tasks: _get_read_task(
tasks=tasks,
table_identifier=self.table_identifier,
schema=projected_schema,
),
metadata=metadata,
)
)

return read_tasks
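
For intuition, the greedy balancing performed by `_distribute_tasks_into_equal_chunks` can be shown as a standalone sketch that uses plain integer sizes in place of `FileScanTask` objects:

```python
import heapq
from typing import List


def distribute_sizes(sizes: List[int], n_chunks: int) -> List[List[int]]:
    """Greedy balancing: always give the next-largest item to the currently
    smallest chunk, mirroring the heap-based logic in the datasource."""
    chunks = [[] for _ in range(n_chunks)]
    heap = [(0, chunk_id) for chunk_id in range(n_chunks)]  # (total_size, chunk_id)
    heapq.heapify(heap)
    for size in sorted(sizes, reverse=True):
        total, chunk_id = heapq.heappop(heap)
        chunks[chunk_id].append(size)
        heapq.heappush(heap, (total + size, chunk_id))
    return chunks


# Five "files" of different sizes spread across two read tasks:
print(distribute_sizes([100, 80, 30, 20, 10], 2))
# [[100, 20], [80, 30, 10]] -> both chunks total 120
```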