From 850bc2cc9d1ed8841c7173e8a1e15bbd5a925b13 Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 27 Jun 2022 12:25:27 +0200 Subject: [PATCH 1/2] Option to define path spec for Redshift lineage generation Fixing s3 urn generation for Redshift copy in lineage generation --- .../datahub/configuration/source_common.py | 2 +- .../datahub/ingestion/source/aws/path_spec.py | 188 ++++++++++++++++++ .../src/datahub/ingestion/source/s3/config.py | 166 +--------------- .../src/datahub/ingestion/source/s3/source.py | 45 ++--- .../datahub/ingestion/source/sql/redshift.py | 38 +++- 5 files changed, 245 insertions(+), 194 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/aws/path_spec.py diff --git a/metadata-ingestion/src/datahub/configuration/source_common.py b/metadata-ingestion/src/datahub/configuration/source_common.py index db71e3b068d55..b65be96ac3e91 100644 --- a/metadata-ingestion/src/datahub/configuration/source_common.py +++ b/metadata-ingestion/src/datahub/configuration/source_common.py @@ -1,4 +1,4 @@ -from typing import Dict, Optional +from typing import Dict, List, Optional from pydantic import validator from pydantic.fields import Field diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/path_spec.py b/metadata-ingestion/src/datahub/ingestion/source/aws/path_spec.py new file mode 100644 index 0000000000000..0f3be16b396ff --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/path_spec.py @@ -0,0 +1,188 @@ +import logging +import os +import re +from typing import Any, Dict, List, Optional, Tuple, Union + +import parse +import pydantic +from pydantic.fields import Field +from wcmatch import pathlib + +from datahub.configuration.common import ConfigModel +from datahub.ingestion.source.aws.s3_util import is_s3_uri + +# hide annoying debug errors from py4j +logging.getLogger("py4j").setLevel(logging.ERROR) +logger: logging.Logger = logging.getLogger(__name__) + +SUPPORTED_FILE_TYPES: List[str] = ["csv", "tsv", "json", "parquet", "avro"] +SUPPORTED_COMPRESSIONS: List[str] = ["gz", "bz2"] + + +class PathSpec(ConfigModel): + class Config: + arbitrary_types_allowed = True + + include: str = Field( + description="Path to table (s3 or local file system). Name variable {table} is used to mark the folder with dataset. In absence of {table}, file level dataset will be created. Check below examples for more details." + ) + exclude: Optional[List[str]] = Field( + default=None, + description="list of paths in glob pattern which will be excluded while scanning for the datasets", + ) + file_types: List[str] = Field( + default=SUPPORTED_FILE_TYPES, + description="Files with extenstions specified here (subset of default value) only will be scanned to create dataset. Other files will be omitted.", + ) + + default_extension: Optional[str] = Field( + description="For files without extension it will assume the specified file type. If it is not set the files without extensions will be skipped.", + ) + + table_name: Optional[str] = Field( + default=None, + description="Display name of the dataset.Combination of named variableds from include path and strings", + ) + + enable_compression: bool = Field( + default=True, + description="Enable or disable processing compressed files. Currenly .gz and .bz files are supported.", + ) + + sample_files: bool = Field( + default=True, + description="Not listing all the files but only taking a handful amount of sample file to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled", + ) + + # to be set internally + _parsable_include: str + _compiled_include: parse.Parser + _glob_include: str + _is_s3: bool + + def allowed(self, path: str) -> bool: + logger.debug(f"Checking file to inclusion: {path}") + if not pathlib.PurePath(path).globmatch( + self._glob_include, flags=pathlib.GLOBSTAR + ): + return False + logger.debug(f"{path} matched include ") + if self.exclude: + for exclude_path in self.exclude: + if pathlib.PurePath(path).globmatch( + exclude_path, flags=pathlib.GLOBSTAR + ): + return False + logger.debug(f"{path} is not excluded") + ext = os.path.splitext(path)[1].strip(".") + + if (ext == "" and self.default_extension is None) and ( + ext != "*" and ext not in self.file_types + ): + return False + + logger.debug(f"{path} had selected extension {ext}") + logger.debug(f"{path} allowed for dataset creation") + return True + + def is_s3(self): + return self._is_s3 + + @classmethod + def get_parsable_include(cls, include: str) -> str: + parsable_include = include + for i in range(parsable_include.count("*")): + parsable_include = parsable_include.replace("*", f"{{folder[{i}]}}", 1) + return parsable_include + + def get_named_vars(self, path: str) -> Union[None, parse.Result, parse.Match]: + return self._compiled_include.parse(path) + + @pydantic.root_validator() + def validate_path_spec(cls, values: Dict) -> Dict[str, Any]: + + if "**" in values["include"]: + raise ValueError("path_spec.include cannot contain '**'") + + if values.get("file_types") is None: + values["file_types"] = SUPPORTED_FILE_TYPES + else: + for file_type in values["file_types"]: + if file_type not in SUPPORTED_FILE_TYPES: + raise ValueError( + f"file type {file_type} not in supported file types. Please specify one from {SUPPORTED_FILE_TYPES}" + ) + + if values.get("default_extension") is not None: + if values.get("default_extension") not in SUPPORTED_FILE_TYPES: + raise ValueError( + f"default extension {values.get('default_extension')} not in supported default file extension. Please specify one from {SUPPORTED_FILE_TYPES}" + ) + + include_ext = os.path.splitext(values["include"])[1].strip(".") + if ( + include_ext not in values["file_types"] + and include_ext != "*" + and not values["default_extension"] + and include_ext not in SUPPORTED_COMPRESSIONS + ): + raise ValueError( + f"file type specified ({include_ext}) in path_spec.include is not in specified file " + f'types. Please select one from {values.get("file_types")} or specify ".*" to allow all types' + ) + + values["_parsable_include"] = PathSpec.get_parsable_include(values["include"]) + logger.debug(f'Setting _parsable_include: {values.get("_parsable_include")}') + compiled_include_tmp = parse.compile(values["_parsable_include"]) + values["_compiled_include"] = compiled_include_tmp + logger.debug(f'Setting _compiled_include: {values["_compiled_include"]}') + values["_glob_include"] = re.sub(r"\{[^}]+\}", "*", values["include"]) + logger.debug(f'Setting _glob_include: {values.get("_glob_include")}') + + if values.get("table_name") is None: + if "{table}" in values["include"]: + values["table_name"] = "{table}" + else: + logger.debug(f"include fields: {compiled_include_tmp.named_fields}") + logger.debug( + f"table_name fields: {parse.compile(values['table_name']).named_fields}" + ) + if not all( + x in values["_compiled_include"].named_fields + for x in parse.compile(values["table_name"]).named_fields + ): + raise ValueError( + "Not all named variables used in path_spec.table_name are specified in " + "path_spec.include" + ) + + if values.get("exclude") is not None: + for exclude_path in values["exclude"]: + if len(parse.compile(exclude_path).named_fields) != 0: + raise ValueError( + "path_spec.exclude should not contain any named variables" + ) + + values["_is_s3"] = is_s3_uri(values["include"]) + if not values["_is_s3"]: + # Sampling only makes sense on s3 currently + values["sample_files"] = False + logger.debug(f'Setting _is_s3: {values.get("_is_s3")}') + return values + + def _extract_table_name(self, named_vars: dict) -> str: + if self.table_name is None: + raise ValueError("path_spec.table_name is not set") + return self.table_name.format_map(named_vars) + + def extract_table_name_and_path(self, path: str) -> Tuple[str, str]: + parsed_vars = self.get_named_vars(path) + if parsed_vars is None or "table" not in parsed_vars.named: + return os.path.basename(path), path + else: + include = self.include + depth = include.count("/", 0, include.find("{table}")) + table_path = ( + "/".join(path.split("/")[:depth]) + "/" + parsed_vars.named["table"] + ) + return self._extract_table_name(parsed_vars.named), table_path diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py index 43fa4a5c4bede..dfffdcaa9708d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py @@ -1,181 +1,23 @@ import logging -import os -import re -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional -import parse import pydantic from pydantic.fields import Field -from wcmatch import pathlib -from datahub.configuration.common import AllowDenyPattern, ConfigModel +from datahub.configuration.common import AllowDenyPattern from datahub.configuration.source_common import ( EnvBasedSourceConfigBase, PlatformSourceConfigBase, ) from datahub.ingestion.source.aws.aws_common import AwsSourceConfig -from datahub.ingestion.source.aws.s3_util import get_bucket_name, is_s3_uri +from datahub.ingestion.source.aws.path_spec import PathSpec +from datahub.ingestion.source.aws.s3_util import get_bucket_name from datahub.ingestion.source.s3.profiling import DataLakeProfilerConfig # hide annoying debug errors from py4j logging.getLogger("py4j").setLevel(logging.ERROR) logger: logging.Logger = logging.getLogger(__name__) -SUPPORTED_FILE_TYPES: List[str] = ["csv", "tsv", "json", "parquet", "avro"] -SUPPORTED_COMPRESSIONS: List[str] = ["gz", "bz2"] - - -class PathSpec(ConfigModel): - class Config: - arbitrary_types_allowed = True - - include: str = Field( - description="Path to table (s3 or local file system). Name variable {table} is used to mark the folder with dataset. In absence of {table}, file level dataset will be created. Check below examples for more details." - ) - exclude: Optional[List[str]] = Field( - default=None, - description="list of paths in glob pattern which will be excluded while scanning for the datasets", - ) - file_types: List[str] = Field( - default=SUPPORTED_FILE_TYPES, - description="Files with extenstions specified here (subset of default value) only will be scanned to create dataset. Other files will be omitted.", - ) - - default_extension: Optional[str] = Field( - description="For files without extension it will assume the specified file type. If it is not set the files without extensions will be skipped.", - ) - - table_name: Optional[str] = Field( - default=None, - description="Display name of the dataset.Combination of named variableds from include path and strings", - ) - - enable_compression: bool = Field( - default=True, - description="Enable or disable processing compressed files. Currenly .gz and .bz files are supported.", - ) - - sample_files: bool = Field( - default=True, - description="Not listing all the files but only taking a handful amount of sample file to infer the schema. File count and file size calculation will be disabled. This can affect performance significantly if enabled", - ) - - # to be set internally - _parsable_include: str - _compiled_include: parse.Parser - _glob_include: str - _is_s3: bool - - def allowed(self, path: str) -> bool: - logger.debug(f"Checking file to inclusion: {path}") - if not pathlib.PurePath(path).globmatch( - self._glob_include, flags=pathlib.GLOBSTAR - ): - return False - logger.debug(f"{path} matched include ") - if self.exclude: - for exclude_path in self.exclude: - if pathlib.PurePath(path).globmatch( - exclude_path, flags=pathlib.GLOBSTAR - ): - return False - logger.debug(f"{path} is not excluded") - ext = os.path.splitext(path)[1].strip(".") - - if (ext == "" and self.default_extension is None) and ( - ext != "*" and ext not in self.file_types - ): - return False - - logger.debug(f"{path} had selected extension {ext}") - logger.debug(f"{path} allowed for dataset creation") - return True - - def is_s3(self): - return self._is_s3 - - @classmethod - def get_parsable_include(cls, include: str) -> str: - parsable_include = include - for i in range(parsable_include.count("*")): - parsable_include = parsable_include.replace("*", f"{{folder[{i}]}}", 1) - return parsable_include - - def get_named_vars(self, path: str) -> Union[None, parse.Result, parse.Match]: - return self._compiled_include.parse(path) - - @pydantic.root_validator() - def validate_path_spec(cls, values: Dict) -> Dict[str, Any]: - - if "**" in values["include"]: - raise ValueError("path_spec.include cannot contain '**'") - - if values.get("file_types") is None: - values["file_types"] = SUPPORTED_FILE_TYPES - else: - for file_type in values["file_types"]: - if file_type not in SUPPORTED_FILE_TYPES: - raise ValueError( - f"file type {file_type} not in supported file types. Please specify one from {SUPPORTED_FILE_TYPES}" - ) - - if values.get("default_extension") is not None: - if values.get("default_extension") not in SUPPORTED_FILE_TYPES: - raise ValueError( - f"default extension {values.get('default_extension')} not in supported default file extension. Please specify one from {SUPPORTED_FILE_TYPES}" - ) - - include_ext = os.path.splitext(values["include"])[1].strip(".") - if ( - include_ext not in values["file_types"] - and include_ext != "*" - and not values["default_extension"] - and include_ext not in SUPPORTED_COMPRESSIONS - ): - raise ValueError( - f"file type specified ({include_ext}) in path_spec.include is not in specified file " - f'types. Please select one from {values.get("file_types")} or specify ".*" to allow all types' - ) - - values["_parsable_include"] = PathSpec.get_parsable_include(values["include"]) - logger.debug(f'Setting _parsable_include: {values.get("_parsable_include")}') - compiled_include_tmp = parse.compile(values["_parsable_include"]) - values["_compiled_include"] = compiled_include_tmp - logger.debug(f'Setting _compiled_include: {values["_compiled_include"]}') - values["_glob_include"] = re.sub(r"\{[^}]+\}", "*", values["include"]) - logger.debug(f'Setting _glob_include: {values.get("_glob_include")}') - - if values.get("table_name") is None: - if "{table}" in values["include"]: - values["table_name"] = "{table}" - else: - logger.debug(f"include fields: {compiled_include_tmp.named_fields}") - logger.debug( - f"table_name fields: {parse.compile(values['table_name']).named_fields}" - ) - if not all( - x in values["_compiled_include"].named_fields - for x in parse.compile(values["table_name"]).named_fields - ): - raise ValueError( - "Not all named variables used in path_spec.table_name are specified in " - "path_spec.include" - ) - - if values.get("exclude") is not None: - for exclude_path in values["exclude"]: - if len(parse.compile(exclude_path).named_fields) != 0: - raise ValueError( - "path_spec.exclude should not contain any named variables" - ) - - values["_is_s3"] = is_s3_uri(values["include"]) - if not values["_is_s3"]: - # Sampling only makes sense on s3 currently - values["sample_files"] = False - logger.debug(f'Setting _is_s3: {values.get("_is_s3")}') - return values - class DataLakeSourceConfig(PlatformSourceConfigBase, EnvBasedSourceConfigBase): path_specs: Optional[List[PathSpec]] = Field( diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index 29f54dc9449e3..dcdce5f4ff59d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -60,13 +60,14 @@ ) from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.aws.path_spec import PathSpec from datahub.ingestion.source.aws.s3_util import ( get_bucket_name, get_bucket_relative_path, get_key_prefix, strip_s3_prefix, ) -from datahub.ingestion.source.s3.config import DataLakeSourceConfig, PathSpec +from datahub.ingestion.source.s3.config import DataLakeSourceConfig from datahub.ingestion.source.s3.profiling import _SingleTableProfiler from datahub.ingestion.source.s3.report import DataLakeSourceReport from datahub.ingestion.source.schema_inference import avro, csv_tsv, json, parquet @@ -474,7 +475,8 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: extension = pathlib.Path(table_data.full_path).suffix if path_spec.enable_compression and ( - extension[1:] in datahub.ingestion.source.s3.config.SUPPORTED_COMPRESSIONS + extension[1:] + in datahub.ingestion.source.aws.path_spec.SUPPORTED_COMPRESSIONS ): # Removing the compression extension and using the one before that like .json.gz -> .json extension = pathlib.Path(table_data.full_path).with_suffix("").suffix @@ -756,35 +758,18 @@ def extract_table_data( ) -> TableData: logger.debug(f"Getting table data for path: {path}") - parsed_vars = path_spec.get_named_vars(path) + table_name, table_path = path_spec.extract_table_name_and_path(path) table_data = None - if parsed_vars is None or "table" not in parsed_vars.named: - table_data = TableData( - display_name=os.path.basename(path), - is_s3=path_spec.is_s3(), - full_path=path, - partitions=None, - timestamp=timestamp, - table_path=path, - number_of_files=1, - size_in_bytes=size, - ) - else: - include = path_spec.include - depth = include.count("/", 0, include.find("{table}")) - table_path = ( - "/".join(path.split("/")[:depth]) + "/" + parsed_vars.named["table"] - ) - table_data = TableData( - display_name=self.extract_table_name(path_spec, parsed_vars.named), - is_s3=path_spec.is_s3(), - full_path=path, - partitions=None, - timestamp=timestamp, - table_path=table_path, - number_of_files=1, - size_in_bytes=size, - ) + table_data = TableData( + display_name=table_name, + is_s3=path_spec.is_s3(), + full_path=path, + partitions=None, + timestamp=timestamp, + table_path=table_path, + number_of_files=1, + size_in_bytes=size, + ) return table_data def resolve_templated_folders(self, bucket_name: str, prefix: str) -> Iterable[str]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py index da4db1eb6b1dd..04bc8435250f2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py @@ -18,6 +18,7 @@ from sqllineage.runner import LineageRunner import datahub.emitter.mce_builder as builder +from datahub.configuration import ConfigModel from datahub.configuration.source_common import DatasetLineageProviderConfigBase from datahub.configuration.time_window_config import BaseTimeWindowConfig from datahub.emitter import mce_builder @@ -32,6 +33,9 @@ support_status, ) from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.aws import s3_util +from datahub.ingestion.source.aws.path_spec import PathSpec +from datahub.ingestion.source.aws.s3_util import strip_s3_prefix from datahub.ingestion.source.sql.postgres import PostgresConfig from datahub.ingestion.source.sql.sql_common import ( SQLAlchemySource, @@ -101,8 +105,31 @@ def __post_init__(self): self.dataset_lineage_type = DatasetLineageTypeClass.TRANSFORMED +class S3LineageProviderConfig(ConfigModel): + """ + Any source that produces s3 lineage from/to Datasets should inherit this class. + """ + + path_specs: List[PathSpec] = Field( + description="List of PathSpec. See below the details about PathSpec" + ) + + +class DatasetS3LineageProviderConfigBase(ConfigModel): + """ + Any source that produces s3 lineage from/to Datasets should inherit this class. + """ + + s3_lineage_config: Optional[S3LineageProviderConfig] = Field( + default=None, description="Common config for S3 lineage generation" + ) + + class RedshiftConfig( - PostgresConfig, BaseTimeWindowConfig, DatasetLineageProviderConfigBase + PostgresConfig, + BaseTimeWindowConfig, + DatasetLineageProviderConfigBase, + DatasetS3LineageProviderConfigBase, ): # Although Amazon Redshift is compatible with Postgres's wire format, # we actually want to use the sqlalchemy-redshift package and dialect @@ -672,6 +699,14 @@ def get_db_name(self, inspector: Inspector = None) -> str: db_name = db_alias return db_name + def _get_s3_path(self, path: str) -> str: + if self.config.s3_lineage_config: + for path_spec in self.config.s3_lineage_config.path_specs: + if path_spec.allowed(path): + table_name, table_path = path_spec.extract_table_name_and_path(path) + return table_path + return path + def _populate_lineage_map( self, query: str, lineage_type: LineageCollectorType ) -> None: @@ -747,6 +782,7 @@ def _populate_lineage_map( f"Only s3 source supported with copy. The source was: {path}.", ) continue + path = strip_s3_prefix(self._get_s3_path(path)) else: platform = LineageDatasetPlatform.REDSHIFT path = f'{db_name}.{db_row["source_schema"]}.{db_row["source_table"]}' From 396daaaf013220040e5afa2834e6e58c3ca979ea Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 27 Jun 2022 13:07:38 +0200 Subject: [PATCH 2/2] Flake8 fixes --- metadata-ingestion/src/datahub/configuration/source_common.py | 2 +- metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/configuration/source_common.py b/metadata-ingestion/src/datahub/configuration/source_common.py index b65be96ac3e91..db71e3b068d55 100644 --- a/metadata-ingestion/src/datahub/configuration/source_common.py +++ b/metadata-ingestion/src/datahub/configuration/source_common.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional +from typing import Dict, Optional from pydantic import validator from pydantic.fields import Field diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py index 04bc8435250f2..51b7a821becc4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/redshift.py @@ -33,7 +33,6 @@ support_status, ) from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.aws import s3_util from datahub.ingestion.source.aws.path_spec import PathSpec from datahub.ingestion.source.aws.s3_util import strip_s3_prefix from datahub.ingestion.source.sql.postgres import PostgresConfig