
feat(delta-lake): extract table history into operation aspect #5277

Merged

Changes from all commits
22 commits
c76ba53
feat(delta-lake): add ingestion source delta-lake
MugdhaHardikar-GSLab Jun 27, 2022
42ae6c3
feat(delta-lake): refactor config
MugdhaHardikar-GSLab Jun 27, 2022
c0fd016
Update data_platforms.json
MugdhaHardikar-GSLab Jun 27, 2022
6f04654
Enhance documentation
MugdhaHardikar-GSLab Jun 27, 2022
51b15a1
add cred file format
MugdhaHardikar-GSLab Jun 27, 2022
1a61ee3
merge
shirshanka Jun 27, 2022
a715366
minor doc fixes
shirshanka Jun 27, 2022
58a7946
fix(delta-lake): fix dependency issue for snowflake due to s3_util
MugdhaHardikar-GSLab Jun 28, 2022
40300d5
Merge branch 'master' into delta-lake-base
MugdhaHardikar-GSLab Jun 28, 2022
1670ced
Update s3_util.py
MugdhaHardikar-GSLab Jun 28, 2022
e63491b
fix(delta-lake): add missed file
MugdhaHardikar-GSLab Jun 28, 2022
4c3601e
Merge branch 'datahub-project:master' into delta-lake-base
MugdhaHardikar-GSLab Jun 28, 2022
5c646cf
lint fix
MugdhaHardikar-GSLab Jun 28, 2022
f4c244d
lint fix
MugdhaHardikar-GSLab Jun 28, 2022
9cea8f4
feat(delta-lake): extract table history into operation aspect
MugdhaHardikar-GSLab Jun 28, 2022
8d64a43
support limited version history
MugdhaHardikar-GSLab Jul 8, 2022
e80d21d
modify operation_custom_properties population
MugdhaHardikar-GSLab Jul 12, 2022
280ebd4
Merge remote-tracking branch 'linkedinoss/master' into delta-lake-base
shirshanka Aug 7, 2022
524a6ae
address review comments
shirshanka Aug 7, 2022
144b332
changing config name and defaulting to lookback of 1
shirshanka Aug 7, 2022
3aa7ce8
Merge branch 'master' into delta-lake-base
shirshanka Aug 7, 2022
9c50fef
fix lint
shirshanka Aug 7, 2022
@@ -59,6 +59,10 @@ class Config:
         default=AllowDenyPattern.allow_all(),
         description="regex patterns for tables to filter in ingestion.",
     )
+    version_history_lookback: Optional[int] = Field(
+        default=1,
+        description="Number of previous version histories to be ingested. Defaults to 1. If set to -1 all version history will be ingested.",
+    )
 
     s3: Optional[S3] = Field()
 
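For context on the new option, here is a minimal sketch (not part of this diff) of constructing the source config with a custom lookback. The local base_path is hypothetical, and the import assumes the delta_lake package layout this source lives in.

from datahub.ingestion.source.delta_lake.config import DeltaLakeSourceConfig

# Hypothetical local table path; version_history_lookback=5 keeps the
# last five history entries when operation aspects are emitted.
config = DeltaLakeSourceConfig.parse_obj(
    {
        "base_path": "/data/my-delta-table",
        "version_history_lookback": 5,
    }
)
print(config.version_history_lookback)  # 5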
@@ -72,14 +76,20 @@ def is_s3(self):
     def get_complete_path(self):
         return self._complete_path
 
+    @pydantic.validator("version_history_lookback")
+    def negative_version_history_implies_no_limit(cls, v):
+        if v and v < 0:
+            return None
+        return v
+
     @pydantic.root_validator()
     def validate_config(cls, values: Dict) -> Dict[str, Any]:
-        values["_is_s3"] = is_s3_uri(values["base_path"])
+        values["_is_s3"] = is_s3_uri(values.get("base_path", ""))
         if values["_is_s3"]:
-            if values["s3"] is None:
+            if values.get("s3") is None:
                 raise ValueError("s3 config must be set for s3 path")
-        values["_complete_path"] = values["base_path"]
-        if values["relative_path"] is not None:
+        values["_complete_path"] = values.get("base_path")
+        if values.get("relative_path") is not None:
             values[
                 "_complete_path"
             ] = f"{values['_complete_path'].rstrip('/')}/{values['relative_path'].lstrip('/')}"
@@ -24,11 +24,7 @@ def read_delta_table(
 
     except PyDeltaTableError as e:
         if "Not a Delta table" not in str(e):
-            import pdb
-
-            pdb.set_trace()
             raise e
 
     return delta_table
 
 
@@ -1,5 +1,7 @@
 import logging
 import os
+import time
+from datetime import datetime
 from typing import Callable, Iterable, List
 
 from deltalake import DeltaTable
@@ -8,6 +10,7 @@
     make_data_platform_urn,
     make_dataset_urn_with_platform_instance,
 )
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import PipelineContext, WorkUnit
 from datahub.ingestion.api.decorators import (
     SourceCapability,
@@ -41,8 +44,11 @@
     SchemaMetadata,
 )
 from datahub.metadata.schema_classes import (
+    ChangeTypeClass,
     DatasetPropertiesClass,
     NullTypeClass,
+    OperationClass,
+    OperationTypeClass,
     OtherSchemaClass,
 )
 from datahub.telemetry import telemetry
@@ -54,6 +60,19 @@
     "platform",
 ]
 
+OPERATION_STATEMENT_TYPES = {
+    "INSERT": OperationTypeClass.INSERT,
+    "UPDATE": OperationTypeClass.UPDATE,
+    "DELETE": OperationTypeClass.DELETE,
+    "MERGE": OperationTypeClass.UPDATE,
+    "CREATE": OperationTypeClass.CREATE,
+    "CREATE_TABLE_AS_SELECT": OperationTypeClass.CREATE,
+    "CREATE_SCHEMA": OperationTypeClass.CREATE,
+    "DROP_TABLE": OperationTypeClass.DROP,
+    "REPLACE TABLE AS SELECT": OperationTypeClass.UPDATE,
+    "COPY INTO": OperationTypeClass.UPDATE,
+}
+
 
 @platform_name("Delta Lake", id="delta-lake")
 @config_class(DeltaLakeSourceConfig)
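OPERATION_STATEMENT_TYPES drives the operationType mapping below; any operation outside the table (e.g. Delta's OPTIMIZE or VACUUM) falls back to a CUSTOM operation that preserves the raw name. Illustration only:

statement_type = OPERATION_STATEMENT_TYPES.get("OPTIMIZE", OperationTypeClass.CUSTOM)
custom_type = "OPTIMIZE" if statement_type == OperationTypeClass.CUSTOM else None
assert statement_type == OperationTypeClass.CUSTOM
assert custom_type == "OPTIMIZE"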
@@ -122,6 +141,58 @@ def get_fields(self, delta_table: DeltaTable) -> List[SchemaField]:
 
         return fields
 
+    def _create_operation_aspect_wu(
+        self, delta_table: DeltaTable, dataset_urn: str
+    ) -> Iterable[MetadataWorkUnit]:
+        for hist in delta_table.history(
+            limit=self.source_config.version_history_lookback
+        ):
+
+            # History schema picked up from https://docs.delta.io/latest/delta-utility.html#retrieve-delta-table-history
+            reported_time: int = int(time.time() * 1000)
+            last_updated_timestamp: int = hist["timestamp"]
+            statement_type = OPERATION_STATEMENT_TYPES.get(
+                hist.get("operation"), OperationTypeClass.CUSTOM
+            )
+            custom_type = (
+                hist.get("operation")
+                if statement_type == OperationTypeClass.CUSTOM
+                else None
+            )
+
+            operation_custom_properties = dict()
+            for key, val in hist.items():
+                if val is not None:
+                    if isinstance(val, dict):
+                        for k, v in val.items():
+                            if v is not None:
+                                operation_custom_properties[f"{key}_{k}"] = str(v)
+                    else:
+                        operation_custom_properties[key] = str(val)
+            operation_custom_properties.pop("timestamp", None)
+            operation_custom_properties.pop("operation", None)
+            operation_aspect = OperationClass(
+                timestampMillis=reported_time,
+                lastUpdatedTimestamp=last_updated_timestamp,
+                operationType=statement_type,
+                customOperationType=custom_type,
+                customProperties=operation_custom_properties,
+            )
+
+            mcp = MetadataChangeProposalWrapper(
+                entityType="dataset",
+                aspectName="operation",
+                changeType=ChangeTypeClass.UPSERT,
+                entityUrn=dataset_urn,
+                aspect=operation_aspect,
+            )
+            operational_wu = MetadataWorkUnit(
+                id=f"{datetime.fromtimestamp(last_updated_timestamp / 1000).isoformat()}-operation-aspect-{dataset_urn}",
+                mcp=mcp,
+            )
+            self.report.report_workunit(operational_wu)
+            yield operational_wu
+
     def ingest_table(
         self, delta_table: DeltaTable, path: str
     ) -> Iterable[MetadataWorkUnit]:
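To make the flattening loop concrete, here is a made-up history entry (shape per the Delta history docs linked above) and the customProperties it would yield:

hist = {
    "version": 1,
    "timestamp": 1657203600000,
    "operation": "INSERT",
    "operationParameters": {"mode": "Append", "partitionBy": None},
    "operationMetrics": {"numOutputRows": "100"},
}
# Nested dicts are flattened to "<parent>_<child>" keys, None values are
# skipped, and "timestamp"/"operation" are popped because they feed the
# dedicated lastUpdatedTimestamp/operationType fields:
# {
#   "version": "1",
#   "operationParameters_mode": "Append",
#   "operationMetrics_numOutputRows": "100",
# }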
@@ -164,11 +235,6 @@ def ingest_table(
             "version": str(delta_table.version()),
             "location": self.source_config.get_complete_path(),
         }
-        customProperties.update(delta_table.history()[-1])
-        customProperties["version_creation_time"] = customProperties["timestamp"]
-        del customProperties["timestamp"]
-        for key in customProperties.keys():
-            customProperties[key] = str(customProperties[key])
 
         dataset_properties = DatasetPropertiesClass(
             description=delta_table.metadata().description,
@@ -221,6 +287,8 @@ def ingest_table(
             self.report.report_workunit(wu)
             yield wu
 
+        yield from self._create_operation_aspect_wu(delta_table, dataset_urn)
+
     def process_folder(
         self, path: str, get_folders: Callable[[str], Iterable[str]]
     ) -> Iterable[MetadataWorkUnit]:
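End-to-end, a run exercising the new behavior might look like the following sketch (the table path is hypothetical; Pipeline.create is DataHub's standard programmatic runner and "console" its debugging sink). Each retained history entry is emitted as one operation aspect work unit.

from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "delta-lake",
            "config": {
                "base_path": "/data/my-delta-table",
                "version_history_lookback": 5,
            },
        },
        "sink": {"type": "console"},
    }
)
pipeline.run()
pipeline.raise_from_status()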