Skip to content

Commit

Permalink
CLDN-1784 - Refactor code to pyiceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
cccs-eric committed Jan 13, 2023
1 parent cf5d295 commit 06f8205
Show file tree
Hide file tree
Showing 40 changed files with 991 additions and 1,809 deletions.
31 changes: 27 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,38 @@ def create(cls, config_dict: Dict, ctx: PipelineContext) -> "IcebergSource":
return cls(config, ctx)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
for dataset_path, dataset_name in self.config.get_paths(): # Tuple[str, str]
try:
catalog = self.config.get_catalog()
datasets = []
if catalog:
for namespace in catalog.list_namespaces():
for tableName in catalog.list_tables(namespace):
dataset_name = ".".join(tableName)
if not self.config.table_pattern.allowed(dataset_name):
# Dataset name is rejected by pattern, report as dropped.
self.report.report_dropped(dataset_name)
continue
else:
datasets.append((tableName, dataset_name))
else:
# Will be obsolete once HadoopCatalog is supported by PyIceberg, or we migrate to REST catalog
for (
dataset_path,
dataset_name,
) in self.config.get_paths(): # Tuple[str, str]
if not self.config.table_pattern.allowed(dataset_name):
# Path is rejected by pattern, report as dropped.
# Dataset name is rejected by pattern, report as dropped.
self.report.report_dropped(dataset_name)
continue
else:
datasets.append((dataset_path, dataset_name))

for dataset_path, dataset_name in datasets:
try:
# Try to load an Iceberg table. Might not contain one, this will be caught by NoSuchIcebergTableError.
table = self.config.load_table(dataset_name, dataset_path)
if isinstance(dataset_path, str):
table = self.config.load_table(dataset_name, dataset_path)
else:
table = catalog.load_table(dataset_path)
yield from self._create_iceberg_workunit(dataset_name, table)
dataset_urn: str = make_dataset_urn_with_platform_instance(
self.platform,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pydantic
from fsspec import AbstractFileSystem, filesystem
from pydantic import Field, root_validator
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.exceptions import NoSuchIcebergTableError
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.serializers import FromInputFile
Expand Down Expand Up @@ -58,6 +59,21 @@ class IcebergSourceStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
_entity_types: List[str] = pydantic.Field(default=["table"])


class IcebergCatalogConfig(ConfigModel):
    """
    Iceberg catalog config.

    Mirrors PyIceberg's catalog configuration; see
    https://py.iceberg.apache.org/configuration/
    """

    # Catalog name, passed through to PyIceberg's load_catalog().
    name: str = Field(
        description="Name of catalog",
    )
    # Free-form properties forwarded verbatim to PyIceberg as keyword
    # arguments (e.g. catalog URI, credentials) — see the PyIceberg
    # documentation for the valid keys of each catalog type.
    conf: Dict[str, str] = Field(
        description="Catalog specific configuration. See [PyIceberg documentation](https://py.iceberg.apache.org/configuration/) for details.",
    )


class IcebergSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
# Override the stateful_ingestion config param with the Iceberg custom stateful ingestion config in the IcebergSourceConfig
stateful_ingestion: Optional[IcebergSourceStatefulIngestionConfig] = pydantic.Field(
Expand All @@ -71,6 +87,10 @@ class IcebergSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
default=None,
description="Local path to crawl for Iceberg tables. This is one filesystem type supported by this source and **only one can be configured**.",
)
catalog: Optional[IcebergCatalogConfig] = Field(
default=None,
description="Catalog configuration where to find Iceberg tables.",
)
max_path_depth: int = Field(
default=2,
description="Maximum folder depth to crawl for Iceberg tables. Folders deeper than this value will be silently ignored.",
Expand Down Expand Up @@ -106,13 +126,24 @@ def validate_platform_instance(cls: "IcebergSourceConfig", values: Dict) -> Dict
def _ensure_one_filesystem_is_configured(
cls: "IcebergSourceConfig", values: Dict
) -> Dict:
if values.get("adls") and values.get("localfs"):
count = sum(
[
1
for x in [
values.get("catalog"),
values.get("adls"),
values.get("localfs"),
]
if x is not None
]
)
if count == 0:
raise ConfigurationError(
"Only one filesystem can be configured: adls or localfs"
"One filesystem (catalog or adls or localfs) needs to be configured."
)
elif not values.get("adls") and not values.get("localfs"):
elif count > 1:
raise ConfigurationError(
"One filesystem (adls or localfs) needs to be configured."
"Only one filesystem can be configured: catalog or adls or localfs"
)
return values

Expand Down Expand Up @@ -206,6 +237,13 @@ def get_paths(self) -> Iterable[Tuple[str, str]]:
else:
raise ConfigurationError("No filesystem client configured")

def get_catalog(self) -> Optional["Catalog"]:
    """Instantiate a PyIceberg catalog from this source's configuration.

    Returns:
        The loaded PyIceberg ``Catalog``, or ``None`` when no catalog is
        configured (i.e. when the adls/localfs filesystems are used
        instead).

    Note: the original annotation said ``Catalog``, but the method
    returns ``None`` on the no-catalog path, so ``Optional`` is correct.
    """
    if self.catalog is None:
        return None
    # Delegate to PyIceberg, which picks the catalog implementation
    # from the supplied configuration properties.
    return load_catalog(name=self.catalog.name, **self.catalog.conf)


@dataclass
class IcebergSourceReport(StaleEntityRemovalSourceReport):
Expand Down
3 changes: 3 additions & 0 deletions metadata-ingestion/tests/integration/iceberg/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Folders created by Iceberg's docker-compose
notebooks/
warehouse/
63 changes: 63 additions & 0 deletions metadata-ingestion/tests/integration/iceberg/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Local Iceberg integration-test stack: Spark with Iceberg support,
# an Iceberg REST catalog, and MinIO as the S3-compatible object store.
version: "3"

services:
  # Spark container used to create/populate Iceberg tables for the tests.
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    depends_on:
      - rest
      - minio
    volumes:
      - ./warehouse:/home/iceberg/warehouse
      - ./notebooks:/home/iceberg/notebooks/notebooks
      - ./setup:/home/iceberg/setup
    environment:
      # Credentials must match the MinIO root user/password below.
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    ports:
      - 8888:8888
      - 8080:8080
      - 10000:10000
      - 10001:10001
    links:
      - rest:rest
      - minio:minio
  # Iceberg REST catalog, storing table data/metadata in the MinIO warehouse bucket.
  rest:
    image: tabulario/iceberg-rest:0.2.0
    ports:
      - 8181:8181
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3a://warehouse/wh/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
  # S3-compatible object store backing the Iceberg warehouse.
  minio:
    image: minio/minio
    container_name: minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
    ports:
      - 9001:9001 # MinIO web console (--console-address below)
      - 9000:9000 # S3 API endpoint (CATALOG_S3_ENDPOINT above)
    command: ["server", "/data", "--console-address", ":9001"]
  # One-shot helper: waits for MinIO, then (re)creates a public
  # "warehouse" bucket and exits.
  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      exit 0;
      "
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": true}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00"
}
}
]
Loading

0 comments on commit 06f8205

Please sign in to comment.