Skip to content

Commit

Permalink
Pipe through to delta lake read
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Jun 14, 2024
1 parent 8d9ea6e commit a7c7512
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 4 deletions.
16 changes: 12 additions & 4 deletions daft/io/_delta_lake.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from daft.daft import IOConfig, NativeStorageConfig, ScanOperatorHandle, StorageConfig
from daft.dataframe import DataFrame
from daft.io.catalog import DataCatalogTable
from daft.io.unity_catalog import UnityCatalogTable
from daft.logical.builder import LogicalPlanBuilder


Expand All @@ -25,7 +26,7 @@ def read_delta_lake(

@PublicAPI
def read_deltalake(
table: Union[str, DataCatalogTable],
table: Union[str, DataCatalogTable, UnityCatalogTable],
io_config: Optional["IOConfig"] = None,
_multithreaded_io: Optional[bool] = None,
) -> DataFrame:
Expand Down Expand Up @@ -56,20 +57,27 @@ def read_deltalake(
"""
from daft.delta_lake.delta_lake_scan import DeltaLakeScanOperator

io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config

# If running on Ray, we want to limit the amount of concurrency and requests being made.
# This is because each Ray worker process receives its own pool of thread workers and connections
multithreaded_io = not context.get_context().is_ray_runner if _multithreaded_io is None else _multithreaded_io

io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config
storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, io_config))

if isinstance(table, str):
table_uri = table
elif isinstance(table, DataCatalogTable):
table_uri = table.table_uri(io_config)
elif isinstance(table, UnityCatalogTable):
table_uri = table.table_uri

Check warning on line 72 in daft/io/_delta_lake.py

View check run for this annotation

Codecov / codecov/patch

daft/io/_delta_lake.py#L71-L72

Added lines #L71 - L72 were not covered by tests

# Override the storage_config with the one provided by Unity catalog
table_io_config = table.io_config
if table_io_config is not None:
storage_config = StorageConfig.native(NativeStorageConfig(multithreaded_io, table_io_config))

Check warning on line 77 in daft/io/_delta_lake.py

View check run for this annotation

Codecov / codecov/patch

daft/io/_delta_lake.py#L75-L77

Added lines #L75 - L77 were not covered by tests
else:
raise ValueError(
f"table argument must be a table URI string or a DataCatalogTable instance, but got: {type(table)}, {table}"
f"table argument must be a table URI string, DataCatalogTable or UnityCatalogTable instance, but got: {type(table)}, {table}"
)
delta_lake_operator = DeltaLakeScanOperator(table_uri, storage_config=storage_config)

Expand Down
57 changes: 57 additions & 0 deletions daft/io/unity_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from __future__ import annotations

import dataclasses

import requests

from daft.io import IOConfig, S3Config


@dataclasses.dataclass(frozen=True)
class UnityCatalogTable:
    """Immutable handle to a table that was resolved through Unity Catalog.

    Attributes:
        table_uri: Storage location of the table.
        io_config: IO configuration (e.g. temporary credentials) to use when
            reading the table, or ``None`` when the catalog supplied none.
    """

    table_uri: str
    # Defaults to None so a table can be described by its URI alone.
    io_config: IOConfig | None = None


class UnityCatalog:
    """Minimal client for the Unity Catalog REST API (``/api/2.1/unity-catalog``).

    Args:
        endpoint: Base URL of the Unity Catalog server. Trailing slashes are
            stripped so that request paths can be appended safely.
        token: Optional bearer token. When given (and non-empty) it is sent in
            the ``Authorization`` header of every request.
    """

    # ``requests`` waits forever by default; bound every call so that a stalled
    # server cannot hang the caller indefinitely.
    _REQUEST_TIMEOUT_SECONDS = 60

    def __init__(self, endpoint: str, token: str | None = None):
        self._endpoint = endpoint.rstrip("/")
        self._token_header = {"Authorization": f"Bearer {token}"} if token else {}

    def list_schemas(self):
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError("Listing schemas not yet implemented.")

    def list_tables(self, schema: str):
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError("Listing tables not yet implemented.")

    def load_table(self, name: str) -> UnityCatalogTable:
        """Resolve a table by name and fetch temporary read credentials for it.

        Args:
            name: Table name, appended verbatim to the ``tables/`` API path.

        Returns:
            A ``UnityCatalogTable`` carrying the table's storage location and,
            when the server returned AWS temporary credentials, an ``IOConfig``
            populated with them (``None`` otherwise).

        Raises:
            requests.HTTPError: If either request returns an error status.
            KeyError: If the table metadata lacks ``table_id`` or
                ``storage_location``.
        """
        # Look up the table metadata to obtain its ID and storage location.
        table_response = requests.get(
            f"{self._endpoint}/api/2.1/unity-catalog/tables/{name}",
            headers=self._token_header,
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        # Fail loudly on HTTP errors rather than with a KeyError on the error payload.
        table_response.raise_for_status()
        table_info = table_response.json()
        table_id = table_info["table_id"]
        table_uri = table_info["storage_location"]

        # Ask the catalog for temporary READ credentials scoped to this table.
        credentials_response = requests.post(
            f"{self._endpoint}/api/2.1/unity-catalog/temporary-table-credentials",
            json={"table_id": table_id, "operation": "READ"},
            headers=self._token_header,
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )
        credentials_response.raise_for_status()

        # ``.get`` (not ``[...]``): the key may be absent, in which case no
        # IOConfig is attached to the returned table.
        aws_temp_credentials = credentials_response.json().get("aws_temp_credentials")
        io_config = None
        if aws_temp_credentials is not None:
            io_config = IOConfig(
                s3=S3Config(
                    key_id=aws_temp_credentials.get("access_key_id"),
                    access_key=aws_temp_credentials.get("secret_access_key"),
                    session_token=aws_temp_credentials.get("session_token"),
                )
            )

        return UnityCatalogTable(
            table_uri=table_uri,
            io_config=io_config,
        )

0 comments on commit a7c7512

Please sign in to comment.