
Add iceberg datasource #46889

Merged
merged 87 commits, Aug 15, 2024
Commits (87)
6dd7660
Add iceberg datasource
dev-goyal Jan 8, 2024
f91ba16
Split files
dev-goyal Jan 10, 2024
a89e31e
Formatting
dev-goyal Jan 10, 2024
1f60623
Add documentation
dev-goyal Jan 10, 2024
39852a8
Update python/ray/data/datasource/iceberg_datasource.py
dev-goyal Mar 20, 2024
12ed16e
Update python/ray/data/datasource/iceberg_datasource.py
dev-goyal Mar 20, 2024
0e16956
PR review comments incorporated
dev-goyal Mar 20, 2024
847fa8b
Fix imports
dev-goyal Apr 21, 2024
36bc297
Add tests
dev-goyal Apr 22, 2024
9cac4ac
test commit
dev-goyal Apr 22, 2024
9ff359a
Remove catalog type param
dev-goyal Apr 22, 2024
8364af7
Lint
dev-goyal Apr 22, 2024
7faa45f
Rebase
dev-goyal Apr 22, 2024
1aca250
Revert irrelevant files
dev-goyal Apr 22, 2024
484b615
Add docs and test dependency
dev-goyal Apr 22, 2024
9f2e5a3
Add catalog example
dev-goyal Apr 22, 2024
24ad9aa
Fix pre-build
dev-goyal Apr 22, 2024
8fa746c
PR review comments
dev-goyal Apr 27, 2024
fab9cac
Painfully rebase
dev-goyal Jul 31, 2024
314a15e
Restore
dev-goyal Jul 31, 2024
eff9cb3
Update read_api
dev-goyal Jul 31, 2024
698cf9c
Incorporate comments
dev-goyal Aug 7, 2024
cb2a20e
Rebase
dev-goyal Aug 7, 2024
cb8fc10
Add back iceberg
dev-goyal Aug 7, 2024
00537a4
Add read_api back
dev-goyal Aug 7, 2024
a230c8f
Add doc to loading_data
dev-goyal Aug 7, 2024
529d491
Update API for lint error
dev-goyal Aug 7, 2024
8e0d82a
Add test
dev-goyal Aug 7, 2024
1ee28b8
Intermediate commit with updated tests
dev-goyal Aug 7, 2024
cdc99ee
Intermediate commit with updated tests
dev-goyal Aug 7, 2024
c324290
Try test
dev-goyal Aug 7, 2024
e9bbabd
Try solo test
dev-goyal Aug 7, 2024
4304925
Linter
dev-goyal Aug 7, 2024
318da73
[RLlib; docs] RLlib docs redo: New API stack Episodes (SingleAgentEpi…
sven1977 Aug 7, 2024
6b95f77
fix broken asyncio tests for aiohttp 3.10.0 (#46988)
alanwguo Aug 7, 2024
ba62c8d
Clean up tests
dev-goyal Aug 7, 2024
ba012d1
Clean up tests
dev-goyal Aug 7, 2024
91c7b22
PR review comments aug 7
dev-goyal Aug 7, 2024
27f8f48
[serve] Fix Router race condition (#46864)
tungh2 Aug 7, 2024
1e06c21
[Core] Remove actor_register_async config (#46998)
jjyao Aug 7, 2024
5edc25a
[Data] Correctly refresh elapsed time in `ProgressBar` output (#46974)
scottjlee Aug 7, 2024
71f0cae
Clean up tests, bump sqlalchemy
dev-goyal Aug 8, 2024
ece8f17
Linter
dev-goyal Aug 8, 2024
0b28033
[core] change all callbacks to move to save copies. (#46971)
rynewang Aug 7, 2024
d55d5c9
[core] Fix raylet CHECK failure from runtime env creation failure. (#…
rynewang Aug 7, 2024
0927a33
[Serve/Logs] Break serve status details across multiple lines (#46486)
galenhwang Aug 7, 2024
1cf5707
Manually bump sqlalchemy
dev-goyal Aug 8, 2024
304b98d
Try pyiceberg with sqlite
dev-goyal Aug 8, 2024
6ba08b2
[dashbord] fix typo in memray command (#47006)
rynewang Aug 8, 2024
c333af7
Try pyiceberg with sqlite
dev-goyal Aug 8, 2024
919007a
[RLlib] Add missing `__init__.py` to `algorithms/dqn` folder. (#46898)
MaxVanDijck Aug 8, 2024
c4c72d9
[Core] Rename ReferenceCounter::Reference::on_delete to ReferenceCoun…
jjyao Aug 8, 2024
537e5ff
Try just in time pyiceberg install
dev-goyal Aug 8, 2024
af38948
Try just in time pyiceberg install
dev-goyal Aug 8, 2024
9b6c720
[Doc] Fix Typo in build-wheel-manylinux2014.sh (#44666)
slfan1989 Aug 8, 2024
9a5b922
[data][doc] update authenticaion to authentication (#45478)
deepyaman Aug 8, 2024
5521609
[data] Only dump state once on progress bar close (#46928)
omatthew98 Aug 8, 2024
a978710
Version Switcher Part 3 (#47019)
cristianjd Aug 8, 2024
5666df5
[train] Updates to support `xgboost==2.1.0` (#46667)
justinvyu Aug 8, 2024
bde712c
Remove SQL extra
dev-goyal Aug 8, 2024
8b365e9
Merge branch 'master' into restored_iceberg
dev-goyal Aug 8, 2024
6654abc
Bump ax-platform
dev-goyal Aug 8, 2024
324a735
Bump aim
dev-goyal Aug 8, 2024
ac43efb
Update requirements_compiled using CI
dev-goyal Aug 8, 2024
c08cc71
Remove ax sql optional
dev-goyal Aug 9, 2024
6655438
Merge branch 'master' into restored_iceberg
dev-goyal Aug 9, 2024
65042ee
Unpin ax-platform
dev-goyal Aug 9, 2024
36a3389
Merge branch 'master' into restored_iceberg
dev-goyal Aug 9, 2024
bd07635
Remove ax sql optional
dev-goyal Aug 9, 2024
2c5be90
Merge branch 'master' into restored_iceberg
dev-goyal Aug 10, 2024
9291620
Merge branch 'master' into restored_iceberg
dev-goyal Aug 13, 2024
e9e4159
Update requirements_compiled by hand
dev-goyal Aug 13, 2024
c84357a
Merge branch 'master' into restored_iceberg
dev-goyal Aug 13, 2024
3e5a115
Update requirements_compiled using CI
dev-goyal Aug 13, 2024
7eadfa5
Update min pyarrow version
dev-goyal Aug 13, 2024
c6cad80
Remove newline
dev-goyal Aug 13, 2024
9e803cf
Merge branch 'master' into restored_iceberg
dev-goyal Aug 13, 2024
a369681
User fixture to exclude pyarrow < 9
dev-goyal Aug 13, 2024
6959444
Merge branch 'master' into restored_iceberg
dev-goyal Aug 13, 2024
a466acf
Merge branch 'master' into restored_iceberg
dev-goyal Aug 13, 2024
02ade4f
PR review comments
dev-goyal Aug 15, 2024
bd6f032
Merge branch 'master' into restored_iceberg
dev-goyal Aug 15, 2024
7a724d3
Merge branch 'master' into restored_iceberg
dev-goyal Aug 15, 2024
14caa19
Update python/ray/data/_internal/datasource/iceberg_datasource.py
dev-goyal Aug 15, 2024
2342d18
Update python/ray/data/_internal/datasource/iceberg_datasource.py
dev-goyal Aug 15, 2024
acf2c2b
Merge branch 'master' into restored_iceberg
dev-goyal Aug 15, 2024
0031576
Updates
dev-goyal Aug 15, 2024
9 changes: 9 additions & 0 deletions doc/source/data/api/input_output.rst
@@ -186,6 +186,15 @@ Delta Sharing

read_delta_sharing_tables

Iceberg
-------

.. autosummary::
:nosignatures:
:toctree: doc/

read_iceberg

Lance
-----

27 changes: 27 additions & 0 deletions doc/source/data/loading-data.rst
@@ -531,6 +531,33 @@ Ray Data interoperates with distributed data processing frameworks like
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}

.. tab-item:: Iceberg

To create a :class:`~ray.data.dataset.Dataset` from an `Iceberg Table
<https://iceberg.apache.org>`__,
call :func:`~ray.data.read_iceberg`. This function creates a ``Dataset`` backed by
the distributed files that underlie the Iceberg table.

..
.. testcode::
:skipif: True

>>> import ray
>>> from pyiceberg.expressions import EqualTo
>>> ds = ray.data.read_iceberg(
... table_identifier="db_name.table_name",
... row_filter=EqualTo("column_name", "literal_value"),
... catalog_kwargs={"name": "default", "type": "glue"}
... )


.. testoutput::

{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}

.. tab-item:: Modin

To create a :class:`~ray.data.dataset.Dataset` from a Modin DataFrame, call
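As a quick orientation for reviewers, here is a minimal sketch of how the new reader is meant to be called once this lands. The table identifier, column names, and Glue catalog settings below are placeholders rather than anything defined in this PR, and the snippet assumes pyiceberg is installed and the catalog is reachable.

import ray
from pyiceberg.expressions import GreaterThanOrEqual

# Hypothetical table and catalog, used purely for illustration.
ds = ray.data.read_iceberg(
    table_identifier="db_name.table_name",
    row_filter=GreaterThanOrEqual("event_ts", "2024-01-01T00:00:00"),
    selected_fields=("event_ts", "user_id"),
    catalog_kwargs={"name": "default", "type": "glue"},
)

# The result is an ordinary Ray Dataset, so the usual operations apply.
print(ds.schema())
print(ds.count())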
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -386,6 +386,14 @@ py_test(
deps = ["//:ray_lib", ":conftest"],
)

py_test(
    name = "test_iceberg",
    size = "medium",
    srcs = ["tests/test_iceberg.py"],
    tags = ["team:data", "exclusive"],
    deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_mars",
size = "medium",
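The new test_iceberg target points at tests/test_iceberg.py, which isn't shown in this view. As a rough, hypothetical sketch of the kind of end-to-end check such a test could contain (the commit history suggests the tests rely on pyiceberg's SQLite-backed SqlCatalog; the paths and table names here are made up), something like:

import os

import pyarrow as pa
import ray
from pyiceberg.catalog.sql import SqlCatalog

warehouse = "/tmp/iceberg_warehouse"  # placeholder location
os.makedirs(warehouse, exist_ok=True)

# Create a local SQLite-backed catalog and a small table to read back through Ray Data.
catalog = SqlCatalog(
    "local",
    uri=f"sqlite:///{warehouse}/catalog.db",
    warehouse=f"file://{warehouse}",
)
catalog.create_namespace("db")
table = catalog.create_table(
    "db.events",
    schema=pa.schema([("id", pa.int64()), ("name", pa.string())]),
)
table.append(pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]}))

ds = ray.data.read_iceberg(
    table_identifier="db.events",
    catalog_kwargs={
        "name": "local",
        "type": "sql",
        "uri": f"sqlite:///{warehouse}/catalog.db",
        "warehouse": f"file://{warehouse}",
    },
)
assert ds.count() == 3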
2 changes: 2 additions & 0 deletions python/ray/data/__init__.py
@@ -49,6 +49,7 @@
read_databricks_tables,
read_datasource,
read_delta_sharing_tables,
read_iceberg,
read_images,
read_json,
read_lance,
@@ -139,6 +140,7 @@
"read_csv",
"read_datasource",
"read_delta_sharing_tables",
"read_iceberg",
"read_images",
"read_json",
"read_lance",
230 changes: 230 additions & 0 deletions python/ray/data/_internal/datasource/iceberg_datasource.py
@@ -0,0 +1,230 @@
"""
Module to read an iceberg table into a Ray Dataset, by using the Ray Datasource API.
"""

import heapq
import itertools
import logging
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Union

from ray.data._internal.util import _check_import
from ray.data.block import Block, BlockMetadata
from ray.data.datasource.datasource import Datasource, ReadTask
from ray.util.annotations import DeveloperAPI

if TYPE_CHECKING:
    from pyiceberg.catalog import Catalog
    from pyiceberg.expressions import BooleanExpression
    from pyiceberg.manifest import DataFile, DataFileContent
    from pyiceberg.table import DataScan, FileScanTask, Schema

logger = logging.getLogger(__name__)


@DeveloperAPI
class IcebergDatasource(Datasource):
"""
Iceberg datasource to read Iceberg tables into a Ray Dataset. This module heavily
uses PyIceberg to read iceberg tables. All the routines in this class override
`ray.data.Datasource`.
"""

def __init__(
self,
table_identifier: str,
row_filter: Union[str, "BooleanExpression"] = None,
selected_fields: Tuple[str, ...] = ("*",),
snapshot_id: Optional[int] = None,
scan_kwargs: Optional[Dict[str, Any]] = None,
catalog_kwargs: Optional[Dict[str, Any]] = None,
):
"""
Initialize an IcebergDatasource.
Args:
table_identifier: Fully qualified table identifier (i.e.,
"db_name.table_name")
row_filter: A PyIceberg BooleanExpression to use to filter the data *prior*
to reading
selected_fields: Which columns from the data to read, passed directly to
PyIceberg's load functions
snapshot_id: Optional snapshot ID for the Iceberg table
scan_kwargs: Optional arguments to pass to PyIceberg's Table.scan()
function
catalog_kwargs: Optional arguments to use when setting up the Iceberg
catalog
"""
_check_import(self, module="pyiceberg", package="pyiceberg")
from pyiceberg.expressions import AlwaysTrue

        self._scan_kwargs = scan_kwargs if scan_kwargs is not None else {}
        self._catalog_kwargs = catalog_kwargs if catalog_kwargs is not None else {}

        if "name" in self._catalog_kwargs:
            self._catalog_name = self._catalog_kwargs.pop("name")
        else:
            self._catalog_name = "default"

        self.table_identifier = table_identifier

        self._row_filter = row_filter if row_filter is not None else AlwaysTrue()
        self._selected_fields = selected_fields

        if snapshot_id:
            self._scan_kwargs["snapshot_id"] = snapshot_id

        self._plan_files = None

    def _get_catalog(self) -> "Catalog":
        from pyiceberg import catalog

        return catalog.load_catalog(self._catalog_name, **self._catalog_kwargs)

    @property
    def plan_files(self) -> List["FileScanTask"]:
        """
        Return the plan files specified by this query
        """
        # Calculate and cache the plan_files if they don't already exist
        if self._plan_files is None:
            data_scan = self._get_data_scan()
            self._plan_files = data_scan.plan_files()

        return self._plan_files

    def _get_data_scan(self) -> "DataScan":
        catalog = self._get_catalog()
        table = catalog.load_table(self.table_identifier)

        data_scan = table.scan(
            row_filter=self._row_filter,
            selected_fields=self._selected_fields,
            **self._scan_kwargs,
        )

        return data_scan

    def estimate_inmemory_data_size(self) -> Optional[int]:
        # Approximate the size by using the plan files - this will not
        # incorporate the deletes, but that's a reasonable approximation
        return sum(task.file.file_size_in_bytes for task in self.plan_files)

    @staticmethod
    def _distribute_tasks_into_equal_chunks(
        plan_files: Iterable["FileScanTask"], n_chunks: int
    ) -> List[List["FileScanTask"]]:
        """
        Implement a greedy knapsack algorithm to distribute the files in the scan
        across tasks, based on their file size, as evenly as possible
        """
        chunks = [list() for _ in range(n_chunks)]

        chunk_sizes = [(0, chunk_id) for chunk_id in range(n_chunks)]
        heapq.heapify(chunk_sizes)

        # From largest to smallest, add the plan files to the smallest chunk one at a
        # time
        for plan_file in sorted(
            plan_files, key=lambda f: f.file.file_size_in_bytes, reverse=True
        ):
            smallest_chunk = heapq.heappop(chunk_sizes)
            chunks[smallest_chunk[1]].append(plan_file)
            heapq.heappush(
                chunk_sizes,
                (
                    smallest_chunk[0] + plan_file.file.file_size_in_bytes,
                    smallest_chunk[1],
                ),
            )

        return chunks

    def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
        from pyiceberg.io import pyarrow as pyi_pa_io

        # DataFileContent is needed at runtime below; the module-level import is
        # guarded by TYPE_CHECKING, so import it here as well.
        from pyiceberg.manifest import DataFileContent

        def _get_read_task(
            tasks: Iterable["FileScanTask"],
            table_identifier: str,
            schema: "Schema",
        ) -> Iterable[Block]:
            # Closure so we can pass this callable as an argument to ReadTask

            # Both the catalog and tbl attributes cannot be pickled, which means they
            # must be instantiated within this function (as opposed to being attributes
            # of the IcebergDatasource class)
            catalog = self._get_catalog()
            tbl = catalog.load_table(table_identifier)

            # Use the PyIceberg API to read only a single task (specifically, a
            # FileScanTask) - note that this is not as simple as reading a single
            # parquet file, as there might be delete files, etc. associated, so we
            # must use the PyIceberg API for the projection.
            yield pyi_pa_io.project_table(
                tasks=tasks,
                table_metadata=tbl.metadata,
                io=tbl.io,
                row_filter=self._row_filter,
                projected_schema=schema,
                case_sensitive=self._scan_kwargs.get("case_sensitive", True),
                limit=self._scan_kwargs.get("limit"),
            )

        # Get the PyIceberg scan
        data_scan = self._get_data_scan()
        # Get the plan files in this query
        plan_files = self.plan_files

        # Get the projected schema for this scan, given all the row filters,
        # snapshot ID, etc.
        projected_schema = data_scan.projection()
        # Get the arrow schema, to set in the metadata
        pya_schema = pyi_pa_io.schema_to_pyarrow(projected_schema)

        # Set the n_chunks to the min of the number of plan files and the actual
        # requested n_chunks, so that there are no empty tasks
        if parallelism > len(plan_files):
            parallelism = len(plan_files)
            logger.warning(
                f"Reducing the parallelism to {parallelism}, as that is the "
                "number of files"
            )

        read_tasks = []
        # Chunk the plan files based on the requested parallelism
        for chunk_tasks in IcebergDatasource._distribute_tasks_into_equal_chunks(
            plan_files, parallelism
        ):
            unique_deletes: Set["DataFile"] = set(
                itertools.chain.from_iterable(
                    [task.delete_files for task in chunk_tasks]
                )
            )
            # Get a rough estimate of the number of deletes by just looking at
            # position deletes. Equality deletes are harder to estimate, as they
            # can delete multiple rows.
            position_delete_count = sum(
                delete.record_count
                for delete in unique_deletes
                if delete.content == DataFileContent.POSITION_DELETES
            )
            metadata = BlockMetadata(
                num_rows=sum(task.file.record_count for task in chunk_tasks)
                - position_delete_count,
                size_bytes=sum(task.length for task in chunk_tasks),
                schema=pya_schema,
                input_files=[task.file.file_path for task in chunk_tasks],
                exec_stats=None,
            )
            read_tasks.append(
                ReadTask(
                    read_fn=lambda tasks=chunk_tasks: _get_read_task(
                        tasks=tasks,
                        table_identifier=self.table_identifier,
                        schema=projected_schema,
                    ),
                    metadata=metadata,
                )
            )

        return read_tasks
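A note on the chunking helper above: the balancing behaviour of _distribute_tasks_into_equal_chunks is easiest to see on plain numbers. The following self-contained sketch mirrors the same largest-first, smallest-chunk heap algorithm, with bare integer sizes standing in for FileScanTask file sizes, so it runs without pyiceberg:

import heapq
from typing import Iterable, List


def distribute_into_equal_chunks(sizes: Iterable[int], n_chunks: int) -> List[List[int]]:
    # Keep a min-heap of (current_chunk_total, chunk_id) and always hand the
    # next-largest item to the currently smallest chunk.
    chunks: List[List[int]] = [[] for _ in range(n_chunks)]
    heap = [(0, chunk_id) for chunk_id in range(n_chunks)]
    heapq.heapify(heap)
    for size in sorted(sizes, reverse=True):
        total, chunk_id = heapq.heappop(heap)
        chunks[chunk_id].append(size)
        heapq.heappush(heap, (total + size, chunk_id))
    return chunks


# Five hypothetical plan-file sizes (in MB) split across two read tasks.
print(distribute_into_equal_chunks([512, 256, 128, 128, 64], 2))
# [[512, 64], [256, 128, 128]] -> chunk totals of 576 and 512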