diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
index 5751b2d934267..d31b65f003426 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -45,13 +45,16 @@
 from sqlalchemy.exc import ProgrammingError
 from typing_extensions import Concatenate, ParamSpec
 
+from datahub.emitter import mce_builder
 from datahub.emitter.mce_builder import get_sys_time
+from datahub.ingestion.graph.client import get_default_graph
 from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
 from datahub.ingestion.source.profiling.common import (
     Cardinality,
     convert_to_cardinality,
 )
 from datahub.ingestion.source.sql.sql_common import SQLSourceReport
+from datahub.metadata.com.linkedin.pegasus2avro.schema import EditableSchemaMetadata
 from datahub.metadata.schema_classes import (
     DatasetFieldProfileClass,
     DatasetProfileClass,
@@ -296,6 +299,9 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
 
     query_combiner: SQLAlchemyQueryCombiner
 
+    platform: str
+    env: str
+
     def _get_columns_to_profile(self) -> List[str]:
         if not self.config.any_field_level_metrics_enabled():
             return []
@@ -670,6 +676,16 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
         profile.columnCount = len(all_columns)
         columns_to_profile = set(self._get_columns_to_profile())
 
+        (
+            ignore_table_sampling,
+            columns_list_to_ignore_sampling,
+        ) = _get_columns_to_ignore_sampling(
+            self.dataset_name,
+            self.config.tags_to_ignore_sampling,
+            self.platform,
+            self.env,
+        )
+
         logger.debug(f"profiling {self.dataset_name}: flushing stage 1 queries")
         self.query_combiner.flush()
 
@@ -732,76 +748,80 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
             if not profile.rowCount:
                 continue
 
-            self._get_dataset_column_sample_values(column_profile, column)
-
             if (
-                type_ == ProfilerDataType.INT
-                or type_ == ProfilerDataType.FLOAT
-                or type_ == ProfilerDataType.NUMERIC
+                not ignore_table_sampling
+                and column not in columns_list_to_ignore_sampling
             ):
-                self._get_dataset_column_min(column_profile, column)
-                self._get_dataset_column_max(column_profile, column)
-                self._get_dataset_column_mean(column_profile, column)
-                self._get_dataset_column_median(column_profile, column)
-                self._get_dataset_column_stdev(column_profile, column)
-
-                if cardinality in [
-                    Cardinality.ONE,
-                    Cardinality.TWO,
-                    Cardinality.VERY_FEW,
-                ]:
-                    self._get_dataset_column_distinct_value_frequencies(
-                        column_profile,
-                        column,
-                    )
-                if cardinality in {
-                    Cardinality.FEW,
-                    Cardinality.MANY,
-                    Cardinality.VERY_MANY,
-                }:
-                    self._get_dataset_column_quantiles(column_profile, column)
-                    self._get_dataset_column_histogram(column_profile, column)
-
-            elif type_ == ProfilerDataType.STRING:
-                if cardinality in [
-                    Cardinality.ONE,
-                    Cardinality.TWO,
-                    Cardinality.VERY_FEW,
-                    Cardinality.FEW,
-                ]:
-                    self._get_dataset_column_distinct_value_frequencies(
-                        column_profile,
-                        column,
-                    )
+                self._get_dataset_column_sample_values(column_profile, column)
 
-            elif type_ == ProfilerDataType.DATETIME:
-                self._get_dataset_column_min(column_profile, column)
-                self._get_dataset_column_max(column_profile, column)
-
-                # FIXME: Re-add histogram once kl_divergence has been modified to support datetimes
-
-                if cardinality in [
-                    Cardinality.ONE,
-                    Cardinality.TWO,
-                    Cardinality.VERY_FEW,
-                    Cardinality.FEW,
-                ]:
-                    self._get_dataset_column_distinct_value_frequencies(
-                        column_profile,
-                        column,
-                    )
+                if (
+                    type_ == ProfilerDataType.INT
+                    or type_ == ProfilerDataType.FLOAT
+                    or type_ == ProfilerDataType.NUMERIC
+                ):
+                    self._get_dataset_column_min(column_profile, column)
+                    self._get_dataset_column_max(column_profile, column)
+                    self._get_dataset_column_mean(column_profile, column)
+                    self._get_dataset_column_median(column_profile, column)
+                    self._get_dataset_column_stdev(column_profile, column)
+
+                    if cardinality in [
+                        Cardinality.ONE,
+                        Cardinality.TWO,
+                        Cardinality.VERY_FEW,
+                    ]:
+                        self._get_dataset_column_distinct_value_frequencies(
+                            column_profile,
+                            column,
+                        )
+                    if cardinality in {
+                        Cardinality.FEW,
+                        Cardinality.MANY,
+                        Cardinality.VERY_MANY,
+                    }:
+                        self._get_dataset_column_quantiles(column_profile, column)
+                        self._get_dataset_column_histogram(column_profile, column)
+
+                elif type_ == ProfilerDataType.STRING:
+                    if cardinality in [
+                        Cardinality.ONE,
+                        Cardinality.TWO,
+                        Cardinality.VERY_FEW,
+                        Cardinality.FEW,
+                    ]:
+                        self._get_dataset_column_distinct_value_frequencies(
+                            column_profile,
+                            column,
+                        )
 
-            else:
-                if cardinality in [
-                    Cardinality.ONE,
-                    Cardinality.TWO,
-                    Cardinality.VERY_FEW,
-                    Cardinality.FEW,
-                ]:
-                    self._get_dataset_column_distinct_value_frequencies(
-                        column_profile,
-                        column,
-                    )
+                elif type_ == ProfilerDataType.DATETIME:
+                    self._get_dataset_column_min(column_profile, column)
+                    self._get_dataset_column_max(column_profile, column)
+
+                    # FIXME: Re-add histogram once kl_divergence has been modified to support datetimes
+
+                    if cardinality in [
+                        Cardinality.ONE,
+                        Cardinality.TWO,
+                        Cardinality.VERY_FEW,
+                        Cardinality.FEW,
+                    ]:
+                        self._get_dataset_column_distinct_value_frequencies(
+                            column_profile,
+                            column,
+                        )
+
+                else:
+                    if cardinality in [
+                        Cardinality.ONE,
+                        Cardinality.TWO,
+                        Cardinality.VERY_FEW,
+                        Cardinality.FEW,
+                    ]:
+                        self._get_dataset_column_distinct_value_frequencies(
+                            column_profile,
+                            column,
+                        )
 
         logger.debug(f"profiling {self.dataset_name}: flushing stage 3 queries")
         self.query_combiner.flush()
@@ -896,6 +916,7 @@ class DatahubGEProfiler:
 
     base_engine: Engine
     platform: str  # passed from parent source config
+    env: str
 
     # The actual value doesn't matter, it just matters that we use it consistently throughout.
     _datasource_name_base: str = "my_sqlalchemy_datasource"
@@ -906,12 +927,15 @@ def __init__(
         report: SQLSourceReport,
         config: GEProfilingConfig,
         platform: str,
+        env: str = "PROD",
     ):
         self.report = report
         self.config = config
         self.times_taken = []
         self.total_row_count = 0
 
+        self.env = env
+
         # TRICKY: The call to `.engine` is quite important here. Connection.connect()
         # returns a "branched" connection, which does not actually use a new underlying
         # DB-API object from the connection pool. Engine.connect() does what we want to
@@ -1151,6 +1175,8 @@ def _generate_single_profile(
                     self.report,
                     custom_sql,
                     query_combiner,
+                    self.platform,
+                    self.env,
                 ).generate_dataset_profile()
 
                 time_taken = timer.elapsed_seconds()
@@ -1309,3 +1335,44 @@ def create_bigquery_temp_table(
         return bigquery_temp_table
     finally:
         raw_connection.close()
+
+
+def _get_columns_to_ignore_sampling(
+    dataset_name: str, tags_to_ignore: Optional[List[str]], platform: str, env: str
+) -> Tuple[bool, List[str]]:
+    logger.debug("Collecting columns to ignore for sampling")
+
+    ignore_table: bool = False
+    columns_to_ignore: List[str] = []
+
+    if not tags_to_ignore:
+        return ignore_table, columns_to_ignore
+
+    dataset_urn = mce_builder.make_dataset_urn(
+        name=dataset_name, platform=platform, env=env
+    )
+
+    datahub_graph = get_default_graph()
+
+    dataset_tags = datahub_graph.get_tags(dataset_urn)
+    if dataset_tags:
+        ignore_table = any(
+            tag_association.tag.split("urn:li:tag:")[1] in tags_to_ignore
+            for tag_association in dataset_tags.tags
+        )
+
+    if not ignore_table:
+        metadata = datahub_graph.get_aspect(
+            entity_urn=dataset_urn, aspect_type=EditableSchemaMetadata
+        )
+
+        if metadata:
+            for schemaField in metadata.editableSchemaFieldInfo:
+                if schemaField.globalTags:
+                    columns_to_ignore.extend(
+                        schemaField.fieldPath
+                        for tag_association in schemaField.globalTags.tags
+                        if tag_association.tag.split("urn:li:tag:")[1] in tags_to_ignore
+                    )
+
+    return ignore_table, columns_to_ignore
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
index f340a7b41b7af..2ca6828e0bdfa 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py
@@ -172,6 +172,14 @@ class GEProfilingConfig(ConfigModel):
         description="Whether to profile external tables. Only Snowflake and Redshift supports this.",
     )
 
+    tags_to_ignore_sampling: Optional[List[str]] = pydantic.Field(
+        default=None,
+        description=(
+            "Fixed list of tags to ignore sampling."
+            " If not specified, tables will be sampled based on `use_sampling`."
+        ),
+    )
+
     @pydantic.root_validator(pre=True)
     def deprecate_bigquery_temp_table_schema(cls, values):
         # TODO: Update docs to remove mention of this field.
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
index 8e4b6ea83b459..bb10dc95973a7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -1166,6 +1166,7 @@ def get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler":
             report=self.report,
             config=self.config.profiling,
             platform=self.platform,
+            env=self.config.env,
         )
 
     def get_profile_args(self) -> Dict:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
index 30fad9ad584c1..f051dc9e5b550 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py
@@ -226,6 +226,7 @@ def get_profiler_instance(
             report=self.report,
             config=self.config.profiling,
             platform=self.platform,
+            env=self.config.env,
         )
 
     def is_dataset_eligible_for_profiling(
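
Usage note (not part of the patch): the snippet below is a minimal sketch of how the new `tags_to_ignore_sampling` option could be exercised from Python once this change is applied. The tag names "pii" and "sensitive" are hypothetical, and the rest of the ingestion recipe is omitted.

    from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig

    # Parse a (hypothetical) profiling section of an ingestion recipe into the pydantic model.
    # Datasets or schema fields carrying one of these tags are skipped when sample values
    # are collected; everything else still follows the existing `use_sampling` behavior.
    profiling = GEProfilingConfig.parse_obj(
        {
            "enabled": True,
            "use_sampling": True,
            "tags_to_ignore_sampling": ["pii", "sensitive"],
        }
    )

Because `_get_columns_to_ignore_sampling` resolves tags through `get_default_graph()`, setting this option assumes a reachable DataHub graph client is configured for the ingestion run; when it is left unset, the helper returns early and profiling behaves exactly as before.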