fix(ingestion/hive): ignore sampling for tagged column/table #10096

Merged (3 commits) on Apr 4, 2024
199 changes: 133 additions & 66 deletions metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -45,13 +45,16 @@
from sqlalchemy.exc import ProgrammingError
from typing_extensions import Concatenate, ParamSpec

from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import get_sys_time
from datahub.ingestion.graph.client import get_default_graph
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.profiling.common import (
Cardinality,
convert_to_cardinality,
)
from datahub.ingestion.source.sql.sql_common import SQLSourceReport
from datahub.metadata.com.linkedin.pegasus2avro.schema import EditableSchemaMetadata
from datahub.metadata.schema_classes import (
DatasetFieldProfileClass,
DatasetProfileClass,
@@ -296,6 +299,9 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):

query_combiner: SQLAlchemyQueryCombiner

platform: str
env: str

def _get_columns_to_profile(self) -> List[str]:
if not self.config.any_field_level_metrics_enabled():
return []
@@ -670,6 +676,16 @@ def generate_dataset_profile( # noqa: C901 (complexity)
profile.columnCount = len(all_columns)
columns_to_profile = set(self._get_columns_to_profile())

(
ignore_table_sampling,
columns_list_to_ignore_sampling,
) = _get_columns_to_ignore_sampling(
self.dataset_name,
self.config.tags_to_ignore_sampling,
self.platform,
self.env,
)

logger.debug(f"profiling {self.dataset_name}: flushing stage 1 queries")
self.query_combiner.flush()

@@ -732,76 +748,80 @@ def generate_dataset_profile( # noqa: C901 (complexity)
if not profile.rowCount:
continue

self._get_dataset_column_sample_values(column_profile, column)

if (
type_ == ProfilerDataType.INT
or type_ == ProfilerDataType.FLOAT
or type_ == ProfilerDataType.NUMERIC
not ignore_table_sampling
and column not in columns_list_to_ignore_sampling
):
self._get_dataset_column_min(column_profile, column)
self._get_dataset_column_max(column_profile, column)
self._get_dataset_column_mean(column_profile, column)
self._get_dataset_column_median(column_profile, column)
self._get_dataset_column_stdev(column_profile, column)

if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
if cardinality in {
Cardinality.FEW,
Cardinality.MANY,
Cardinality.VERY_MANY,
}:
self._get_dataset_column_quantiles(column_profile, column)
self._get_dataset_column_histogram(column_profile, column)

elif type_ == ProfilerDataType.STRING:
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
self._get_dataset_column_sample_values(column_profile, column)

elif type_ == ProfilerDataType.DATETIME:
self._get_dataset_column_min(column_profile, column)
self._get_dataset_column_max(column_profile, column)

# FIXME: Re-add histogram once kl_divergence has been modified to support datetimes

if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
if (
type_ == ProfilerDataType.INT
or type_ == ProfilerDataType.FLOAT
or type_ == ProfilerDataType.NUMERIC
):
self._get_dataset_column_min(column_profile, column)
self._get_dataset_column_max(column_profile, column)
self._get_dataset_column_mean(column_profile, column)
self._get_dataset_column_median(column_profile, column)
self._get_dataset_column_stdev(column_profile, column)

if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
if cardinality in {
Cardinality.FEW,
Cardinality.MANY,
Cardinality.VERY_MANY,
}:
self._get_dataset_column_quantiles(column_profile, column)
self._get_dataset_column_histogram(column_profile, column)

elif type_ == ProfilerDataType.STRING:
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)

else:
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)
elif type_ == ProfilerDataType.DATETIME:
self._get_dataset_column_min(column_profile, column)
self._get_dataset_column_max(column_profile, column)

# FIXME: Re-add histogram once kl_divergence has been modified to support datetimes

if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)

else:
if cardinality in [
Cardinality.ONE,
Cardinality.TWO,
Cardinality.VERY_FEW,
Cardinality.FEW,
]:
self._get_dataset_column_distinct_value_frequencies(
column_profile,
column,
)

logger.debug(f"profiling {self.dataset_name}: flushing stage 3 queries")
self.query_combiner.flush()
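Taken together, the hunk above appears to pull `_get_dataset_column_sample_values` ahead of the per-type branches and gate the remaining column statistics on the new ignore checks. A minimal standalone sketch of that control flow, with the profiler calls replaced by plain stand-ins and `user_id`/`email` as hypothetical column names:

```python
from typing import List, Set


def profile_column(
    column: str,
    ignore_table_sampling: bool,
    columns_list_to_ignore_sampling: List[str],
    collected: Set[str],
) -> None:
    # Sample values are gathered for every column, tagged or not.
    collected.add(f"{column}: sample values")

    # Min/max/mean/median/stdev, distinct-value frequencies, quantiles and
    # histograms are skipped when the table or this column carries an ignore tag.
    if not ignore_table_sampling and column not in columns_list_to_ignore_sampling:
        collected.add(f"{column}: detailed stats")


collected: Set[str] = set()
for col in ["user_id", "email"]:
    profile_column(col, False, ["email"], collected)
print(sorted(collected))
# ['email: sample values', 'user_id: detailed stats', 'user_id: sample values']
```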
@@ -896,6 +916,7 @@ class DatahubGEProfiler:

base_engine: Engine
platform: str # passed from parent source config
env: str

# The actual value doesn't matter, it just matters that we use it consistently throughout.
_datasource_name_base: str = "my_sqlalchemy_datasource"
@@ -906,12 +927,15 @@ def __init__(
report: SQLSourceReport,
config: GEProfilingConfig,
platform: str,
env: str = "PROD",
):
self.report = report
self.config = config
self.times_taken = []
self.total_row_count = 0

self.env = env

# TRICKY: The call to `.engine` is quite important here. Connection.connect()
# returns a "branched" connection, which does not actually use a new underlying
# DB-API object from the connection pool. Engine.connect() does what we want to
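The new `env` parameter (defaulting to `"PROD"`) exists so the profiler can build the dataset URN used for the tag lookup. A hedged illustration of that URN construction, using made-up platform and table names:

```python
from datahub.emitter import mce_builder

# platform comes from the parent source config and env from the new parameter;
# "hive" and "db.sales" are hypothetical values for illustration only.
dataset_urn = mce_builder.make_dataset_urn(name="db.sales", platform="hive", env="PROD")
print(dataset_urn)
# urn:li:dataset:(urn:li:dataPlatform:hive,db.sales,PROD)
```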
@@ -1151,6 +1175,8 @@ def _generate_single_profile(
self.report,
custom_sql,
query_combiner,
self.platform,
self.env,
).generate_dataset_profile()

time_taken = timer.elapsed_seconds()
@@ -1309,3 +1335,44 @@ def create_bigquery_temp_table(
return bigquery_temp_table
finally:
raw_connection.close()


def _get_columns_to_ignore_sampling(
dataset_name: str, tags_to_ignore: Optional[List[str]], platform: str, env: str
) -> Tuple[bool, List[str]]:
logger.debug("Collecting columns to ignore for sampling")

ignore_table: bool = False
columns_to_ignore: List[str] = []

if not tags_to_ignore:
return ignore_table, columns_to_ignore

dataset_urn = mce_builder.make_dataset_urn(
name=dataset_name, platform=platform, env=env
)

datahub_graph = get_default_graph()

dataset_tags = datahub_graph.get_tags(dataset_urn)
if dataset_tags:
ignore_table = any(
tag_association.tag.split("urn:li:tag:")[1] in tags_to_ignore
for tag_association in dataset_tags.tags
)

if not ignore_table:
metadata = datahub_graph.get_aspect(
entity_urn=dataset_urn, aspect_type=EditableSchemaMetadata
)

if metadata:
for schemaField in metadata.editableSchemaFieldInfo:
if schemaField.globalTags:
columns_to_ignore.extend(
schemaField.fieldPath
for tag_association in schemaField.globalTags.tags
if tag_association.tag.split("urn:li:tag:")[1] in tags_to_ignore
)

return ignore_table, columns_to_ignore
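The helper above asks the default DataHub graph for the dataset- and column-level tags; the matching itself is only a `"urn:li:tag:"` prefix strip plus a membership test. A self-contained sketch of that matching step on plain data, with no graph client and made-up tag URNs:

```python
from typing import Dict, List, Tuple


def match_ignore_tags(
    table_tag_urns: List[str],
    column_tag_urns: Dict[str, List[str]],
    tags_to_ignore: List[str],
) -> Tuple[bool, List[str]]:
    def tag_name(urn: str) -> str:
        # "urn:li:tag:PII" -> "PII", the same split used by the helper above
        return urn.split("urn:li:tag:")[1]

    # Any matching table-level tag short-circuits the column scan.
    ignore_table = any(tag_name(urn) in tags_to_ignore for urn in table_tag_urns)

    columns_to_ignore: List[str] = []
    if not ignore_table:
        for column, urns in column_tag_urns.items():
            if any(tag_name(urn) in tags_to_ignore for urn in urns):
                columns_to_ignore.append(column)

    return ignore_table, columns_to_ignore


# Hypothetical dataset: only the "email" column carries an ignore tag.
print(match_ignore_tags([], {"email": ["urn:li:tag:PII"], "id": []}, ["PII"]))
# (False, ['email'])
```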
@@ -172,6 +172,14 @@ class GEProfilingConfig(ConfigModel):
description="Whether to profile external tables. Only Snowflake and Redshift supports this.",
)

tags_to_ignore_sampling: Optional[List[str]] = pydantic.Field(
default=None,
description=(
"Fixed list of tags to ignore sampling."
" If not specified, tables will be sampled based on `use_sampling`."
),
)

@pydantic.root_validator(pre=True)
def deprecate_bigquery_temp_table_schema(cls, values):
# TODO: Update docs to remove mention of this field.
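One plausible way to exercise the new option when building the profiling config directly, assuming the other `GEProfilingConfig` fields keep their defaults (the tag names are invented; in a real recipe this would normally live under the source's `profiling` block):

```python
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig

# Tables or columns tagged "PII" or "no_profile" (hypothetical tag names) would be
# excluded from the detailed column statistics described above.
profiling_config = GEProfilingConfig(tags_to_ignore_sampling=["PII", "no_profile"])
print(profiling_config.tags_to_ignore_sampling)
```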
@@ -1166,6 +1166,7 @@ def get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler":
report=self.report,
config=self.config.profiling,
platform=self.platform,
env=self.config.env,
)

def get_profile_args(self) -> Dict:
@@ -226,6 +226,7 @@ def get_profiler_instance(
report=self.report,
config=self.config.profiling,
platform=self.platform,
env=self.config.env,
)

def is_dataset_eligible_for_profiling(