Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lineage source): add fine grained lineage support #7904

Merged
merged 15 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metadata-ingestion/examples/mce_files/bootstrap_mce.json
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@
"upstreamType": "FIELD_SET",
"upstreams": ["urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD),event_data)"],
"downstreamType": "FIELD",
"upstreams": ["urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)"],
"downstreams": ["urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_name)"],
anshbansal marked this conversation as resolved.
Show resolved Hide resolved
"confidenceScore": 1.0
}]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
)

logger = logging.getLogger(__name__)

Expand All @@ -49,9 +53,44 @@ def type_must_be_supported(cls, v: str) -> str:
return v


class FineGrainedLineageConfig(ConfigModel):
upstreamType: str = "FIELD_SET"
upstreams: Optional[List[str]]
downstreamType: str = "FIELD"
downstreams: Optional[List[str]]
transformOperation: Optional[str]
confidenceScore: Optional[float] = 1.0

@validator("upstreamType")
def upstream_type_must_be_supported(cls, v: str) -> str:
allowed_types = [
FineGrainedLineageUpstreamType.FIELD_SET,
FineGrainedLineageUpstreamType.DATASET,
FineGrainedLineageUpstreamType.NONE,
]
if v not in allowed_types:
raise ConfigurationError(
anshbansal marked this conversation as resolved.
Show resolved Hide resolved
f"Upstream Type must be one of {allowed_types}, {v} is not yet supported."
)
return v

@validator("downstreamType")
def downstream_type_must_be_supported(cls, v: str) -> str:
allowed_types = [
FineGrainedLineageDownstreamType.FIELD_SET,
FineGrainedLineageDownstreamType.FIELD,
]
if v not in allowed_types:
raise ValueError(
f"Downstream Type must be one of {allowed_types}, {v} is not yet supported."
)
return v


class EntityNodeConfig(ConfigModel):
entity: EntityConfig
upstream: Optional[List["EntityNodeConfig"]]
fineGrainedLineages: Optional[List[FineGrainedLineageConfig]]


# https://pydantic-docs.helpmanual.io/usage/postponed_annotations/ required for when you reference a model within itself
Expand Down Expand Up @@ -133,6 +172,7 @@ def _get_lineage_mcp(
entity_node: EntityNodeConfig, preserve_upstream: bool
) -> Optional[MetadataChangeProposalWrapper]:
new_upstreams: List[models.UpstreamClass] = []
new_fine_grained_lineages: List[models.FineGrainedLineageClass] = []
# if this entity has upstream nodes defined, we'll want to do some work.
# if no upstream nodes are present, we don't emit an MCP for it.
if not entity_node.upstream:
Expand Down Expand Up @@ -179,8 +219,22 @@ def _get_lineage_mcp(
f"Entity type: {upstream_entity.type} is unsupported. "
f"Upstream lineage will be skipped for {upstream_entity.name}->{entity.name}"
)
for fine_grained_lineage in entity_node.fineGrainedLineages or []:
new_fine_grained_lineages.append(
models.FineGrainedLineageClass(
upstreams=fine_grained_lineage.upstreams,
upstreamType=fine_grained_lineage.upstreamType,
downstreams=fine_grained_lineage.downstreams,
downstreamType=fine_grained_lineage.downstreamType,
confidenceScore=fine_grained_lineage.confidenceScore,
transformOperation=fine_grained_lineage.transformOperation,
)
)

return MetadataChangeProposalWrapper(
entityUrn=entity_urn,
aspect=models.UpstreamLineageClass(upstreams=new_upstreams),
aspect=models.UpstreamLineageClass(
upstreams=new_upstreams,
fineGrainedLineages=new_fine_grained_lineages,
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"fineGrainedLineages": [],
"upstreams": [
{
"auditStamp": {
Expand Down Expand Up @@ -38,6 +39,7 @@
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"fineGrainedLineages": [],
"upstreams": [
{
"auditStamp": {
Expand Down
24 changes: 23 additions & 1 deletion metadata-ingestion/tests/unit/test_file_lineage_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from datahub.configuration.common import ConfigurationError
from datahub.ingestion.source.metadata.lineage import LineageConfig, _get_lineage_mcp
from datahub.metadata.schema_classes import UpstreamClass
from datahub.metadata.schema_classes import FineGrainedLineageClass, UpstreamClass

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -38,6 +38,15 @@ def basic_mcp():
type: dataset
env: DEV
platform: kafka
fineGrainedLineages:
- upstreamType: FIELD_SET
upstreams:
- urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,topic1,PROD),user_id)
downstreamType: FIELD_SET
downstreams:
- urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,topic3,PROD),user_id)
confidenceScore: 0.9
transformOperation: func1
"""
config = yaml.safe_load(sample_lineage)
lineage_config: LineageConfig = LineageConfig.parse_obj(config)
Expand Down Expand Up @@ -171,6 +180,19 @@ def test_basic_lineage_upstream_urns(basic_mcp):
)


def test_basic_lineage_finegrained_upstream_urns(basic_mcp):
"""
Checks to see if the finegrained urns are correct for a basic_mcp example
"""
fine_grained_lineage: FineGrainedLineageClass = (
basic_mcp.aspect.fineGrainedLineages[0]
)
assert fine_grained_lineage.upstreamType == "FIELD_SET"
assert fine_grained_lineage.downstreamType == "FIELD_SET"
assert fine_grained_lineage.confidenceScore == 0.9
assert fine_grained_lineage.transformOperation == "func1"


def test_unsupported_entity_type():
"""
Checks to see how we handle the case of unsupported entity types.
Expand Down