Skip to content

Commit

Permalink
feat(ingest): support view lineage for all sqlalchemy sources
Browse files Browse the repository at this point in the history
Additional Changes:
1. Support incremental lineage for all sqlalchemy sources
2. Keep column level lineage enabled and incremental lineage disabled by default
3. Monkey-patch hive dialect to extract hive view definitions to extract lineage
4. Fix incremental_lineage_helper for empty upstreams

Pending Followup Changes:
1. Support postgres-like partial view definitions
  • Loading branch information
mayurinehate committed Oct 19, 2023
1 parent 269c4ea commit 75f63a2
Show file tree
Hide file tree
Showing 19 changed files with 1,473 additions and 292 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/configuration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ class VersionedConfig(ConfigModel):

class LineageConfig(ConfigModel):
incremental_lineage: bool = Field(
default=True,
default=False,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def process_sql_parsing_result(
user: Optional[UserUrn] = None,
custom_operation_type: Optional[str] = None,
include_urns: Optional[Set[DatasetUrn]] = None,
include_column_lineage: bool = True,
) -> Iterable[MetadataWorkUnit]:
"""Process a single query and yield any generated workunits.
Expand All @@ -130,7 +131,9 @@ def process_sql_parsing_result(
_merge_lineage_data(
downstream_urn=downstream_urn,
upstream_urns=result.in_tables,
column_lineage=result.column_lineage,
column_lineage=result.column_lineage
if include_column_lineage
else None,
upstream_edges=self._lineage_map[downstream_urn],
query_timestamp=query_timestamp,
is_view_ddl=is_view_ddl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,13 @@ def auto_incremental_lineage(
if len(wu.metadata.proposedSnapshot.aspects) > 0:
yield wu

yield _lineage_wu_via_read_modify_write(
graph, urn, lineage_aspect, wu.metadata.systemMetadata
) if lineage_aspect.fineGrainedLineages else _convert_upstream_lineage_to_patch(
urn, lineage_aspect, wu.metadata.systemMetadata
)
if lineage_aspect.fineGrainedLineages:
yield _lineage_wu_via_read_modify_write(
graph, urn, lineage_aspect, wu.metadata.systemMetadata
)
elif lineage_aspect.upstreams:
yield _convert_upstream_lineage_to_patch(
urn, lineage_aspect, wu.metadata.systemMetadata
)
else:
yield wu
24 changes: 24 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from datahub.emitter.mcp_builder import mcps_from_mce
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
Expand Down Expand Up @@ -215,12 +216,35 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
)
):
auto_lowercase_dataset_urns = auto_lowercase_urns

incremental_lineage_processor: Optional[MetadataWorkUnitProcessor] = None
if (
self.ctx.pipeline_config
and self.ctx.pipeline_config.source
and self.ctx.pipeline_config.source.config
):
incremental_lineage = (
hasattr(
self.ctx.pipeline_config.source.config,
"incremental_lineage",
)
and self.ctx.pipeline_config.source.config.incremental_lineage
) or (
hasattr(self.ctx.pipeline_config.source.config, "get")
and self.ctx.pipeline_config.source.config.get("incremental_lineage")
)
incremental_lineage_processor = partial(
auto_incremental_lineage,
self.ctx.graph,
incremental_lineage,
)
return [
auto_lowercase_dataset_urns,
auto_status_aspect,
auto_materialize_referenced_tags,
browse_path_processor,
partial(auto_workunit_reporter, self.get_report()),
incremental_lineage_processor,
]

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,15 @@ def _gen_workunit_from_sql_parsing_result(
self,
dataset_identifier: str,
result: SqlParsingResult,
) -> MetadataWorkUnit:
) -> Iterable[MetadataWorkUnit]:
upstreams, fine_upstreams = self.get_upstreams_from_sql_parsing_result(
self.dataset_urn_builder(dataset_identifier), result
)
self.report.num_views_with_upstreams += 1
return self._create_upstream_lineage_workunit(
dataset_identifier, upstreams, fine_upstreams
)
if upstreams:
self.report.num_views_with_upstreams += 1
yield self._create_upstream_lineage_workunit(
dataset_identifier, upstreams, fine_upstreams
)

def _gen_workunits_from_query_result(
self,
Expand Down Expand Up @@ -251,7 +252,7 @@ def get_view_upstream_workunits(
)
if result:
views_processed.add(view_identifier)
yield self._gen_workunit_from_sql_parsing_result(
yield from self._gen_workunit_from_sql_parsing_result(
view_identifier, result
)
self.report.view_lineage_parse_secs = timer.elapsed_seconds()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import os.path
import platform
from dataclasses import dataclass
from functools import partial
from typing import Callable, Dict, Iterable, List, Optional, Union

import pandas as pd
Expand All @@ -27,7 +26,6 @@
platform_name,
support_status,
)
from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
from datahub.ingestion.api.source import (
CapabilityReport,
MetadataWorkUnitProcessor,
Expand Down Expand Up @@ -513,11 +511,6 @@ def _init_schema_resolver(self) -> SchemaResolver:
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
partial(
auto_incremental_lineage,
self.ctx.graph,
self.config.incremental_lineage,
),
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
Expand Down
86 changes: 75 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import json
import logging
import re
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Iterable, List, Optional, Union

from pydantic.class_validators import validator
from pydantic.fields import Field

# This import verifies that the dependencies are available.
from pyhive import hive # noqa: F401
from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveTimestamp
from sqlalchemy.engine.reflection import Inspector

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
Expand All @@ -18,8 +21,10 @@
platform_name,
support_status,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.sql.sql_common import register_custom_type
from datahub.ingestion.source.sql.sql_common import SqlWorkUnit, register_custom_type
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
from datahub.ingestion.source.sql.two_tier_sql_source import (
TwoTierSQLAlchemyConfig,
TwoTierSQLAlchemySource,
Expand All @@ -31,6 +36,7 @@
SchemaField,
TimeTypeClass,
)
from datahub.metadata.schema_classes import ViewPropertiesClass
from datahub.utilities import config_clean
from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column

Expand Down Expand Up @@ -90,19 +96,39 @@ def dbapi_get_columns_patched(self, connection, table_name, schema=None, **kw):
logger.warning(f"Failed to patch method due to {e}")


try:
from pyhive.sqlalchemy_hive import HiveDialect

@reflection.cache # type: ignore
def get_view_names_patched(self, connection, schema=None, **kw):
query = "SHOW VIEWS"
if schema:
query += " IN " + self.identifier_preparer.quote_identifier(schema)
return [row[0] for row in connection.execute(query)]

@reflection.cache # type: ignore
def get_view_definition_patched(self, connection, view_name, schema=None, **kw):
full_table = self.identifier_preparer.quote_identifier(view_name)
if schema:
full_table = "{}.{}".format(
self.identifier_preparer.quote_identifier(schema),
self.identifier_preparer.quote_identifier(view_name),
)
row = connection.execute("SHOW CREATE TABLE {}".format(full_table)).fetchone()
return row[0]

HiveDialect.get_view_names = get_view_names_patched
HiveDialect.get_view_definition = get_view_definition_patched
except ModuleNotFoundError:
pass
except Exception as e:
logger.warning(f"Failed to patch method due to {e}")


class HiveConfig(TwoTierSQLAlchemyConfig):
# defaults
scheme = Field(default="hive", hidden_from_docs=True)

# Hive SQLAlchemy connector returns views as tables.
# See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273.
# Disabling views helps us prevent this duplication.
include_views = Field(
default=False,
hidden_from_docs=True,
description="Hive SQLAlchemy connector returns views as tables. See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273. Disabling views helps us prevent this duplication.",
)

@validator("host_port")
def clean_host_port(cls, v):
return config_clean.remove_protocol(v)
Expand Down Expand Up @@ -174,3 +200,41 @@ def get_schema_fields_for_column(
return new_fields

return fields

# Hive SQLAlchemy connector returns views as tables in get_table_names.
# See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273.
# This override makes sure that we ingest view definitions for views
def _process_view(
self,
dataset_name: str,
inspector: Inspector,
schema: str,
view: str,
sql_config: SQLCommonConfig,
) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)

try:
view_definition = inspector.get_view_definition(view, schema)
if view_definition is None:
view_definition = ""
else:
# Some dialects return a TextClause instead of a raw string,
# so we need to convert them to a string.
view_definition = str(view_definition)
except NotImplementedError:
view_definition = ""

if view_definition:
view_properties_aspect = ViewPropertiesClass(
materialized=False, viewLanguage="SQL", viewLogic=view_definition
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=view_properties_aspect,
).as_workunit()
Loading

0 comments on commit 75f63a2

Please sign in to comment.