Skip to content

Commit

Permalink
feat(ingestion/trino): Add sibling support in ingestion (#9853)
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhamjagtap639 authored Feb 26, 2024
1 parent 1c5e66d commit 5921a33
Show file tree
Hide file tree
Showing 6 changed files with 4,834 additions and 222 deletions.
9 changes: 9 additions & 0 deletions metadata-ingestion/docs/sources/trino/trino_recipe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ source:
# options:
# connect_args:
# http_scheme: http

# Optional -- A mapping of trino catalog to its connector details like connector database, env and platform instance.
# This configuration is used to ingest lineage of datasets to connectors. Use catalog name as key.
# catalog_to_connector_details:
# catalog_name:
# connector_database: db_name
# connector_platform: connector_platform_name
# platform_instance: cloud_instance
# env: DEV

sink:
# sink configs
261 changes: 231 additions & 30 deletions metadata-ingestion/src/datahub/ingestion/source/sql/trino.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
import functools
import json
import logging
import uuid
from textwrap import dedent
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Iterable, List, Optional, Union

import sqlalchemy
import trino
from packaging import version
from pydantic.fields import Field
from sqlalchemy import exc, sql
from sqlalchemy.engine import reflection
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql import sqltypes
from sqlalchemy.types import TypeEngine
from trino.exceptions import TrinoQueryError
from trino.sqlalchemy import datatype
from trino.sqlalchemy.dialect import TrinoDialect

from datahub.configuration.source_common import (
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
Expand All @@ -25,12 +33,23 @@
platform_name,
support_status,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.sql.sql_common import (
SQLAlchemySource,
SqlWorkUnit,
register_custom_type,
)
from datahub.ingestion.source.sql.sql_config import BasicSQLAlchemyConfig
from datahub.ingestion.source.sql.sql_config import (
BasicSQLAlchemyConfig,
SQLCommonConfig,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import Siblings
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
Upstream,
UpstreamLineage,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
MapTypeClass,
NumberTypeClass,
Expand All @@ -42,11 +61,46 @@
register_custom_type(datatype.MAP, MapTypeClass)
register_custom_type(datatype.DOUBLE, NumberTypeClass)


KNOWN_CONNECTOR_PLATFORM_MAPPING = {
"clickhouse": "clickhouse",
"hive": "hive",
"glue": "glue",
"iceberg": "iceberg",
"mysql": "mysql",
"postgresql": "postgres",
"redshift": "redshift",
"bigquery": "bigquery",
"snowflake_distributed": "snowflake",
"snowflake_parallel": "snowflake",
"snowflake_jdbc": "snowflake",
}

TWO_TIER_CONNECTORS = ["clickhouse", "hive", "glue", "mysql", "iceberg"]

PROPERTIES_TABLE_SUPPORTED_CONNECTORS = ["hive", "iceberg"]

# Type JSON was introduced in trino sqlalchemy dialect in version 0.317.0
if version.parse(trino.__version__) >= version.parse("0.317.0"):
register_custom_type(datatype.JSON, RecordTypeClass)


@functools.lru_cache()
def gen_catalog_connector_dict(engine: Engine) -> Dict[str, str]:
query = dedent(
"""
SELECT *
FROM "system"."metadata"."catalogs"
"""
).strip()
res = engine.execute(sql.text(query))
return {row.catalog_name: row.connector_name for row in res}


def get_catalog_connector_name(engine: Engine, catalog_name: str) -> Optional[str]:
return gen_catalog_connector_dict(engine).get(catalog_name)


# Read only table names and skip view names, as view names will also be returned
# from get_view_names
@reflection.cache # type: ignore
Expand All @@ -69,26 +123,27 @@ def get_table_names(self, connection, schema: str = None, **kw): # type: ignore
@reflection.cache # type: ignore
def get_table_comment(self, connection, table_name: str, schema: str = None, **kw): # type: ignore
try:
properties_table = self._get_full_table(f"{table_name}$properties", schema)
query = f"SELECT * FROM {properties_table}"
row = connection.execute(sql.text(query)).fetchone()

# Generate properties dictionary.
properties = {}
if row:
for col_name, col_value in row.items():
if col_value is not None:
properties[col_name] = col_value

return {"text": properties.get("comment", None), "properties": properties}
# Fallback to default trino-sqlalchemy behaviour if `$properties` table doesn't exist
except TrinoQueryError:
return self.get_table_comment_default(connection, table_name, schema)
# Exception raised when using Starburst Delta Connector that falls back to a Hive Catalog
except exc.ProgrammingError as e:
if isinstance(e.orig, TrinoQueryError):
catalog_name = self._get_default_catalog_name(connection)
if catalog_name is None:
raise exc.NoSuchTableError("catalog is required in connection")
connector_name = get_catalog_connector_name(connection.engine, catalog_name)
if connector_name is None:
return {}
if connector_name in PROPERTIES_TABLE_SUPPORTED_CONNECTORS:
properties_table = self._get_full_table(f"{table_name}$properties", schema)
query = f"SELECT * FROM {properties_table}"
row = connection.execute(sql.text(query)).fetchone()

# Generate properties dictionary.
properties = {}
if row:
for col_name, col_value in row.items():
if col_value is not None:
properties[col_name] = col_value

return {"text": properties.get("comment", None), "properties": properties}
else:
return self.get_table_comment_default(connection, table_name, schema)
raise
except Exception:
return {}

Expand Down Expand Up @@ -131,19 +186,38 @@ def _get_columns(self, connection, table_name, schema: str = None, **kw): # typ
TrinoDialect._get_columns = _get_columns


class ConnectorDetail(PlatformInstanceConfigMixin, EnvConfigMixin):
connector_database: Optional[str] = Field(default=None, description="")
connector_platform: Optional[str] = Field(
default=None,
description="A connector's actual platform name. If not provided, will take from metadata tables"
"Eg: hive catalog can have a connector platform as 'hive' or 'glue' or some other metastore.",
)


class TrinoConfig(BasicSQLAlchemyConfig):
# defaults
scheme: str = Field(default="trino", description="", hidden_from_docs=True)
database: str = Field(description="database (catalog)")

catalog_to_connector_details: Dict[str, ConnectorDetail] = Field(
default={},
description="A mapping of trino catalog to its connector details like connector database, env and platform instance."
"This configuration is used to build lineage to the underlying connector. Use catalog name as key.",
)

ingest_lineage_to_connectors: bool = Field(
default=True,
description="Whether lineage of datasets to connectors should be ingested",
)

trino_as_primary: bool = Field(
default=True,
description="Experimental feature. Whether trino dataset should be primary entity of the set of siblings",
)

def get_identifier(self: BasicSQLAlchemyConfig, schema: str, table: str) -> str:
identifier = f"{schema}.{table}"
if self.database: # TODO: this should be required field
identifier = f"{self.database}.{identifier}"
return (
f"{self.platform_instance}.{identifier}"
if self.platform_instance
else identifier
)
return f"{self.database}.{schema}.{table}"


@platform_name("Trino", doc_order=1)
Expand Down Expand Up @@ -175,6 +249,133 @@ def get_db_name(self, inspector: Inspector) -> str:
else:
return super().get_db_name(inspector)

def _get_source_dataset_urn(
self,
dataset_name: str,
inspector: Inspector,
schema: str,
table: str,
) -> Optional[str]:
catalog_name = dataset_name.split(".")[0]
connector_name = get_catalog_connector_name(inspector.engine, catalog_name)
if not connector_name:
return None
connector_details = self.config.catalog_to_connector_details.get(
catalog_name, ConnectorDetail()
)
connector_platform_name = KNOWN_CONNECTOR_PLATFORM_MAPPING.get(
connector_details.connector_platform or connector_name
)
if not connector_platform_name:
logging.debug(f"Platform '{connector_platform_name}' is not yet supported.")
return None

if connector_platform_name in TWO_TIER_CONNECTORS: # connector is two tier
return make_dataset_urn_with_platform_instance(
platform=connector_platform_name,
name=f"{schema}.{table}",
platform_instance=connector_details.platform_instance,
env=connector_details.env,
)
elif connector_details.connector_database: # else connector is three tier
return make_dataset_urn_with_platform_instance(
platform=connector_platform_name,
name=f"{connector_details.connector_database}.{schema}.{table}",
platform_instance=connector_details.platform_instance,
env=connector_details.env,
)
else:
logging.warning(f"Connector database missing for Catalog '{catalog_name}'.")
return None

def gen_siblings_workunit(
self,
dataset_urn: str,
source_dataset_urn: str,
) -> Iterable[MetadataWorkUnit]:
"""
Generate sibling workunit for both trino dataset and its connector source dataset
"""
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=Siblings(
primary=self.config.trino_as_primary, siblings=[source_dataset_urn]
),
).as_workunit()

yield MetadataChangeProposalWrapper(
entityUrn=source_dataset_urn,
aspect=Siblings(
primary=not self.config.trino_as_primary, siblings=[dataset_urn]
),
).as_workunit()

def gen_lineage_workunit(
self,
dataset_urn: str,
source_dataset_urn: str,
) -> Iterable[MetadataWorkUnit]:
"""
Generate dataset to source connector lineage workunit
"""
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=UpstreamLineage(
upstreams=[
Upstream(dataset=source_dataset_urn, type=DatasetLineageType.VIEW)
]
),
).as_workunit()

def _process_table(
self,
dataset_name: str,
inspector: Inspector,
schema: str,
table: str,
sql_config: SQLCommonConfig,
) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
yield from super()._process_table(
dataset_name, inspector, schema, table, sql_config
)
if self.config.ingest_lineage_to_connectors:
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
source_dataset_urn = self._get_source_dataset_urn(
dataset_name, inspector, schema, table
)
if source_dataset_urn:
yield from self.gen_siblings_workunit(dataset_urn, source_dataset_urn)
yield from self.gen_lineage_workunit(dataset_urn, source_dataset_urn)

def _process_view(
self,
dataset_name: str,
inspector: Inspector,
schema: str,
view: str,
sql_config: SQLCommonConfig,
) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
yield from super()._process_view(
dataset_name, inspector, schema, view, sql_config
)
if self.config.ingest_lineage_to_connectors:
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
source_dataset_urn = self._get_source_dataset_urn(
dataset_name, inspector, schema, view
)
if source_dataset_urn:
yield from self.gen_siblings_workunit(dataset_urn, source_dataset_urn)

@classmethod
def create(cls, config_dict, ctx):
config = TrinoConfig.parse_obj(config_dict)
Expand Down
Loading

0 comments on commit 5921a33

Please sign in to comment.