Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/snowflake): optionally emit all upstreams irrespective of recipe pattern #7842

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,36 +228,32 @@ def _populate_table_lineage(self):
def get_table_upstream_workunits(self, discovered_tables):
if self.config.include_table_lineage:
for dataset_name in discovered_tables:
if self._is_dataset_pattern_allowed(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern check is not required, as its already present on discovered_tables here.

dataset_name, SnowflakeObjectDomain.TABLE
):
dataset_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
upstream_lineage = self._get_upstream_lineage_info(dataset_name)
if upstream_lineage is not None:
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=upstream_lineage
).as_workunit()
dataset_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
upstream_lineage = self._get_upstream_lineage_info(dataset_name)
if upstream_lineage is not None:
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=upstream_lineage
).as_workunit()

def get_view_upstream_workunits(self, discovered_views):
if self.config.include_view_lineage:
for view_name in discovered_views:
if self._is_dataset_pattern_allowed(view_name, "view"):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern check is not required, as its already present on discovered_views here.

dataset_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
view_name,
self.config.platform_instance,
self.config.env,
)
upstream_lineage = self._get_upstream_lineage_info(view_name)
if upstream_lineage is not None:
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=upstream_lineage
).as_workunit()
dataset_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
view_name,
self.config.platform_instance,
self.config.env,
)
upstream_lineage = self._get_upstream_lineage_info(view_name)
if upstream_lineage is not None:
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=upstream_lineage
).as_workunit()

def _get_upstream_lineage_info(
self, dataset_name: str
Expand Down Expand Up @@ -442,13 +438,7 @@ def _process_table_lineage_row(self, db_row):
upstream_table_name = self.get_dataset_identifier_from_qualified_name(
db_row["UPSTREAM_TABLE_NAME"]
)
if not self._is_dataset_pattern_allowed(
key, SnowflakeObjectDomain.TABLE
) or not (
self._is_dataset_pattern_allowed(
upstream_table_name, SnowflakeObjectDomain.TABLE
)
):
if not self._is_dataset_pattern_allowed(key, SnowflakeObjectDomain.TABLE):
return
self._lineage_map[key].update_lineage(
# (<upstream_table_name>, <json_list_of_upstream_columns>, <json_list_of_downstream_columns>)
Expand Down Expand Up @@ -499,8 +489,6 @@ def _process_view_upstream_lineage_row(self, db_row):
if not self._is_dataset_pattern_allowed(
dataset_name=view_name,
dataset_type=db_row["REFERENCING_OBJECT_DOMAIN"],
) or not self._is_dataset_pattern_allowed(
view_upstream, db_row["REFERENCED_OBJECT_DOMAIN"]
):
return
# key is the downstream view name
Expand Down Expand Up @@ -554,8 +542,6 @@ def _process_view_downstream_lineage_row(self, db_row):
db_row["DOWNSTREAM_TABLE_NAME"]
)
if not self._is_dataset_pattern_allowed(
view_name, db_row["VIEW_DOMAIN"]
) or not self._is_dataset_pattern_allowed(
downstream_table, db_row["DOWNSTREAM_TABLE_DOMAIN"]
):
return
Expand Down Expand Up @@ -651,13 +637,7 @@ def build_finegrained_lineage_upstreams(
) -> List[str]:
column_upstreams = []
for upstream_col in fine_upstream.inputColumns:
if (
upstream_col.objectName
and upstream_col.columnName
and self._is_dataset_pattern_allowed(
upstream_col.objectName, upstream_col.objectDomain
)
):
if upstream_col.objectName and upstream_col.columnName:
upstream_dataset_name = self.get_dataset_identifier_from_qualified_name(
upstream_col.objectName
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,7 @@ def _process_add_single_upstream(self, upstreams, upstream_table):
upstream_name = self.get_dataset_identifier_from_qualified_name(
upstream_table["upstream_object_name"]
)
if upstream_name and self._is_dataset_pattern_allowed(
upstream_name, upstream_table["upstream_object_domain"]
):
if upstream_name:
upstreams.append(
UpstreamClass(
dataset=builder.make_dataset_urn_with_platform_instance(
Expand Down Expand Up @@ -482,13 +480,7 @@ def build_finegrained_lineage_upstreams(
) -> List[str]:
column_upstreams = []
for upstream_col in fine_upstream.inputColumns:
if (
upstream_col.objectName
and upstream_col.columnName
and self._is_dataset_pattern_allowed(
upstream_col.objectName, upstream_col.objectDomain
)
):
if upstream_col.objectName and upstream_col.columnName:
upstream_dataset_name = self.get_dataset_identifier_from_qualified_name(
upstream_col.objectName
)
Expand Down
60 changes: 58 additions & 2 deletions metadata-ingestion/tests/integration/snowflake/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,30 @@ def default_query_results(query): # noqa: C901
),
}
for op_idx in range(1, NUM_OPS + 1)
] + [
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is in in mocked query result for legacy lineage method.

{
"DOWNSTREAM_TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_1",
"UPSTREAM_TABLE_NAME": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
"UPSTREAM_TABLE_COLUMNS": json.dumps(
[{"columnId": 0, "columnName": "COL_1"}]
),
"DOWNSTREAM_TABLE_COLUMNS": json.dumps(
[
{
"columnId": 0,
"columnName": "COL_1",
"directSources": [
{
"columnName": "COL_1",
"objectDomain": "Table",
"objectId": 0,
"objectName": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
}
],
}
]
),
}
]
elif query in (
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
Expand All @@ -307,7 +331,11 @@ def default_query_results(query): # noqa: C901
{
"upstream_object_name": "TEST_DB.TEST_SCHEMA.VIEW_1",
"upstream_object_domain": "VIEW",
}
},
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is in in mocked query result for new optimised lineage method.

{
"upstream_object_name": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
"upstream_object_domain": "TABLE",
},
]
if op_idx == 1
else []
Expand All @@ -329,6 +357,24 @@ def default_query_results(query): # noqa: C901
}
for col_idx in range(1, NUM_COLS + 1)
]
+ ( # This additional upstream is only for TABLE_1
[
{
"column_name": "COL_1",
"upstreams": [
[
{
"object_name": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
"object_domain": "Table",
"column_name": "COL_1",
}
]
],
}
]
if op_idx == 1
else []
)
),
}
for op_idx in range(1, NUM_OPS + 1)
Expand All @@ -347,8 +393,18 @@ def default_query_results(query): # noqa: C901
{
"upstream_object_name": "TEST_DB.TEST_SCHEMA.TABLE_2",
"upstream_object_domain": "TABLE",
}
},
]
+ ( # This additional upstream is only for TABLE_1
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is in in mocked query result for new optimised lineage method, table lineage only.

[
{
"upstream_object_name": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
"upstream_object_domain": "TABLE",
},
]
if op_idx == 1
else []
)
),
}
for op_idx in range(1, NUM_OPS + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3589,7 +3589,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.view_2,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
Expand All @@ -3600,25 +3600,9 @@
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,other_db.other_schema.table_1,PROD)",
"type": "TRANSFORMED"
}
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
},
{
"auditStamp": {
"time": 0,
Expand All @@ -3637,6 +3621,17 @@
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,other_db.other_schema.table_1,PROD),col_1)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_1)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
Expand Down Expand Up @@ -4979,6 +4974,30 @@
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.view_2,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)",
"type": "TRANSFORMED"
}
]
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-2022_06_07-17_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
Expand Down
Loading