Merge branch 'master' into jp--datahub-slack-docs
jayacryl authored Jul 17, 2024
2 parents f9a8325 + 5f79621 commit ac4adf7
Showing 24 changed files with 708 additions and 507 deletions.
19 changes: 9 additions & 10 deletions metadata-ingestion/scripts/modeldocgen.py
@@ -8,12 +8,12 @@
from dataclasses import Field, dataclass, field
from enum import auto
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Iterable
from typing import Any, Dict, Iterable, List, Optional, Tuple

import avro.schema
import click

from datahub.configuration.common import ConfigEnum, ConfigModel
from datahub.configuration.common import ConfigEnum, PermissiveConfigModel
from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
@@ -22,7 +22,9 @@
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
BrowsePathsClass,
BrowsePathsV2Class,
DatasetPropertiesClass,
DatasetSnapshotClass,
ForeignKeyConstraintClass,
@@ -34,8 +36,6 @@
StringTypeClass,
SubTypesClass,
TagAssociationClass,
BrowsePathsV2Class,
BrowsePathEntryClass,
)

logger = logging.getLogger(__name__)
@@ -493,30 +493,29 @@ def strip_types(field_path: str) -> str:
],
)


@dataclass
class EntityAspectName:
entityName: str
aspectName: str


@dataclass
class AspectPluginConfig:
class AspectPluginConfig(PermissiveConfigModel):
className: str
enabled: bool
supportedEntityAspectNames: List[EntityAspectName]
supportedEntityAspectNames: List[EntityAspectName] = []
packageScan: Optional[List[str]] = None
supportedOperations: Optional[List[str]] = None


@dataclass
class PluginConfiguration:
class PluginConfiguration(PermissiveConfigModel):
aspectPayloadValidators: Optional[List[AspectPluginConfig]] = None
mutationHooks: Optional[List[AspectPluginConfig]] = None
mclSideEffects: Optional[List[AspectPluginConfig]] = None
mcpSideEffects: Optional[List[AspectPluginConfig]] = None


class EntityRegistry(ConfigModel):
class EntityRegistry(PermissiveConfigModel):
entities: List[EntityDefinition]
events: Optional[List[EventDefinition]]
plugins: Optional[PluginConfiguration] = None
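
Moving AspectPluginConfig, PluginConfiguration, and EntityRegistry onto PermissiveConfigModel means the entity-registry config is parsed leniently: unrecognized keys are kept rather than rejected, and supportedEntityAspectNames becomes optional. A minimal standalone sketch of that behavior using plain pydantic (the class name and field values below are illustrative, not DataHub's actual classes):

from typing import List, Optional

from pydantic import BaseModel


class AspectPluginConfigSketch(BaseModel):
    class Config:
        extra = "allow"  # keep unknown keys instead of raising a validation error

    className: str
    enabled: bool
    supportedEntityAspectNames: List[str] = []
    packageScan: Optional[List[str]] = None


cfg = AspectPluginConfigSketch(
    className="com.example.MyAspectValidator",  # hypothetical plugin class
    enabled=True,
    someFutureOption=True,  # extra key from a newer registry file: tolerated, not fatal
)
print(cfg.enabled)
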
35 changes: 31 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -1,3 +1,4 @@
import contextlib
import datetime
import logging
from abc import ABCMeta, abstractmethod
@@ -10,6 +11,7 @@
Dict,
Generic,
Iterable,
Iterator,
List,
Optional,
Sequence,
@@ -97,6 +99,7 @@ def report_log(
context: Optional[str] = None,
exc: Optional[BaseException] = None,
log: bool = False,
stacklevel: int = 1,
) -> None:
"""
Report a user-facing warning for the ingestion run.
@@ -109,7 +112,8 @@ def report_log(
exc: The exception associated with the event. We'll show the stack trace when in debug mode.
"""

stacklevel = 2
# One for this method, and one for the containing report_* call.
stacklevel = stacklevel + 2

log_key = f"{title}-{message}"
entries = self._entries[level]
@@ -118,6 +122,8 @@
context = f"{context[:_MAX_CONTEXT_STRING_LENGTH]} ..."

log_content = f"{message} => {context}" if context else message
if title:
log_content = f"{title}: {log_content}"
if exc:
log_content += f"{log_content}: {exc}"

Expand Down Expand Up @@ -255,9 +261,10 @@ def report_failure(
context: Optional[str] = None,
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
) -> None:
self._structured_logs.report_log(
StructuredLogLevel.ERROR, message, title, context, exc, log=False
StructuredLogLevel.ERROR, message, title, context, exc, log=log
)

def failure(
@@ -266,9 +273,10 @@ def failure(
context: Optional[str] = None,
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
) -> None:
self._structured_logs.report_log(
StructuredLogLevel.ERROR, message, title, context, exc, log=True
StructuredLogLevel.ERROR, message, title, context, exc, log=log
)

def info(
@@ -277,11 +285,30 @@ def info(
context: Optional[str] = None,
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
) -> None:
self._structured_logs.report_log(
StructuredLogLevel.INFO, message, title, context, exc, log=True
StructuredLogLevel.INFO, message, title, context, exc, log=log
)

@contextlib.contextmanager
def report_exc(
self,
message: LiteralString,
title: Optional[LiteralString] = None,
context: Optional[str] = None,
level: StructuredLogLevel = StructuredLogLevel.ERROR,
) -> Iterator[None]:
# Convenience method that helps avoid boilerplate try/except blocks.
# TODO: I'm not super happy with the naming here - it's not obvious that this
# suppresses the exception in addition to reporting it.
try:
yield
except Exception as exc:
self._structured_logs.report_log(
level, message=message, title=title, context=context, exc=exc
)

def __post_init__(self) -> None:
self.start_time = datetime.datetime.now()
self.running_time: datetime.timedelta = datetime.timedelta(seconds=0)
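
The new report_exc context manager added above replaces repetitive try/except blocks around source calls: any exception raised inside the with-block is recorded as a structured log entry and then suppressed. A self-contained sketch of the same suppress-and-report pattern, with print standing in for the real structured-log reporting:

import contextlib
from typing import Iterator, Optional


@contextlib.contextmanager
def report_exc_sketch(message: str, title: Optional[str] = None) -> Iterator[None]:
    # Simplified stand-in for SourceReport.report_exc: report the error, then suppress it.
    try:
        yield
    except Exception as exc:
        print(f"{title or 'ERROR'}: {message}: {exc}")


with report_exc_sketch("Failed to fetch schema metadata", title="Schema fetch error"):
    raise RuntimeError("connection reset")  # reported, not re-raised

print("ingestion continues with the next unit of work")
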
40 changes: 28 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -379,13 +379,19 @@ def _notify_reporters_on_ingestion_completion(self) -> None:
for reporter in self.reporters:
try:
reporter.on_completion(
status="CANCELLED"
if self.final_status == PipelineStatus.CANCELLED
else "FAILURE"
if self.has_failures()
else "SUCCESS"
if self.final_status == PipelineStatus.COMPLETED
else "UNKNOWN",
status=(
"CANCELLED"
if self.final_status == PipelineStatus.CANCELLED
else (
"FAILURE"
if self.has_failures()
else (
"SUCCESS"
if self.final_status == PipelineStatus.COMPLETED
else "UNKNOWN"
)
)
),
report=self._get_structured_report(),
ctx=self.ctx,
)
@@ -425,7 +431,7 @@ def _time_to_print(self) -> bool:
return True
return False

def run(self) -> None:
def run(self) -> None: # noqa: C901
with contextlib.ExitStack() as stack:
if self.config.flags.generate_memory_profiles:
import memray
@@ -436,6 +442,8 @@ def run(self) -> None:
)
)

stack.enter_context(self.sink)

self.final_status = PipelineStatus.UNKNOWN
self._notify_reporters_on_ingestion_start()
callback = None
@@ -460,7 +468,17 @@ def run(self) -> None:
if not self.dry_run:
self.sink.handle_work_unit_start(wu)
try:
record_envelopes = self.extractor.get_records(wu)
# Most of this code is meant to be fully stream-based instead of generating all records into memory.
# However, the extractor in particular will never generate a particularly large list. We want the
# exception reporting to be associated with the source, and not the transformer. As such, we
# need to materialize the generator returned by get_records().
record_envelopes = list(self.extractor.get_records(wu))
except Exception as e:
self.source.get_report().failure(
"Source produced bad metadata", context=wu.id, exc=e
)
continue
try:
for record_envelope in self.transform(record_envelopes):
if not self.dry_run:
try:
@@ -482,9 +500,9 @@ def run(self) -> None:
)
# TODO: Transformer errors should cause the pipeline to fail.

self.extractor.close()
if not self.dry_run:
self.sink.handle_work_unit_end(wu)
self.extractor.close()
self.source.close()
# no more data is coming, we need to let the transformers produce any additional records if they are holding on to state
for record_envelope in self.transform(
@@ -518,8 +536,6 @@ def run(self) -> None:

self._notify_reporters_on_ingestion_completion()

self.sink.close()

def transform(self, records: Iterable[RecordEnvelope]) -> Iterable[RecordEnvelope]:
"""
Transforms the given sequence of records by passing the records through the transformers
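
The list() wrapped around extractor.get_records() in the pipeline change above exists because a generator's exceptions only surface when it is consumed; materializing the records up front means a bad record is raised, and reported, against the source rather than deep inside the transformer loop. A small standalone illustration of that ordering:

from typing import Iterator


def get_records() -> Iterator[str]:
    yield "good record"
    raise ValueError("source produced bad metadata")


# Lazy iteration: the error would only appear later, wherever the generator is consumed.
lazy = get_records()

# Materializing up front raises here, so the failure can be attributed to the source.
try:
    records = list(get_records())
except ValueError as exc:
    print(f"reported against the source: {exc}")
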
@@ -11,14 +11,13 @@
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.snowflake_config import (
SnowflakeIdentifierConfig,
SnowflakeV2Config,
)
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeIdentifierMixin
from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeIdentifierBuilder,
)
from datahub.metadata.com.linkedin.pegasus2avro.assertion import (
AssertionResult,
AssertionResultType,
@@ -40,23 +39,20 @@ class DataQualityMonitoringResult(BaseModel):
VALUE: int


class SnowflakeAssertionsHandler(SnowflakeIdentifierMixin):
class SnowflakeAssertionsHandler:
def __init__(
self,
config: SnowflakeV2Config,
report: SnowflakeV2Report,
connection: SnowflakeConnection,
identifiers: SnowflakeIdentifierBuilder,
) -> None:
self.config = config
self.report = report
self.logger = logger
self.connection = connection
self.identifiers = identifiers
self._urns_processed: List[str] = []

@property
def identifier_config(self) -> SnowflakeIdentifierConfig:
return self.config

def get_assertion_workunits(
self, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:
@@ -80,10 +76,10 @@ def _gen_platform_instance_wu(self, urn: str) -> MetadataWorkUnit:
return MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=DataPlatformInstance(
platform=make_data_platform_urn(self.platform),
platform=make_data_platform_urn(self.identifiers.platform),
instance=(
make_dataplatform_instance_urn(
self.platform, self.config.platform_instance
self.identifiers.platform, self.config.platform_instance
)
if self.config.platform_instance
else None
@@ -98,7 +94,7 @@ def _process_result_row(
result = DataQualityMonitoringResult.parse_obj(result_row)
assertion_guid = result.METRIC_NAME.split("__")[-1].lower()
status = bool(result.VALUE) # 1 if PASS, 0 if FAIL
assertee = self.get_dataset_identifier(
assertee = self.identifiers.get_dataset_identifier(
result.TABLE_NAME, result.TABLE_SCHEMA, result.TABLE_DATABASE
)
if assertee in discovered_datasets:
Expand All @@ -107,7 +103,7 @@ def _process_result_row(
aspect=AssertionRunEvent(
timestampMillis=datetime_to_ts_millis(result.MEASUREMENT_TIME),
runId=result.MEASUREMENT_TIME.strftime("%Y-%m-%dT%H:%M:%SZ"),
asserteeUrn=self.gen_dataset_urn(assertee),
asserteeUrn=self.identifiers.gen_dataset_urn(assertee),
status=AssertionRunStatus.COMPLETE,
assertionUrn=make_assertion_urn(assertion_guid),
result=AssertionResult(
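
The assertion-handler hunks above swap inheritance (SnowflakeIdentifierMixin) for composition: the handler now receives a SnowflakeIdentifierBuilder and asks it for dataset identifiers and URNs. A rough, self-contained sketch of the resulting shape, with simplified names and URN logic rather than the actual DataHub implementation:

from dataclasses import dataclass


@dataclass
class IdentifierBuilderSketch:
    # Stand-in for SnowflakeIdentifierBuilder: owns platform naming and URN generation.
    platform: str = "snowflake"

    def get_dataset_identifier(self, table: str, schema: str, db: str) -> str:
        return f"{db}.{schema}.{table}".lower()

    def gen_dataset_urn(self, dataset_identifier: str) -> str:
        return f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_identifier},PROD)"


class AssertionsHandlerSketch:
    # Composition instead of the former mixin: identifier logic lives in one injected object.
    def __init__(self, identifiers: IdentifierBuilderSketch) -> None:
        self.identifiers = identifiers

    def assertee_urn(self, table: str, schema: str, db: str) -> str:
        return self.identifiers.gen_dataset_urn(
            self.identifiers.get_dataset_identifier(table, schema, db)
        )


handler = AssertionsHandlerSketch(IdentifierBuilderSketch())
print(handler.assertee_urn("ORDERS", "PUBLIC", "ANALYTICS"))
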
@@ -131,6 +131,7 @@ class SnowflakeIdentifierConfig(
# Changing default value here.
convert_urns_to_lowercase: bool = Field(
default=True,
description="Whether to convert dataset urns to lowercase.",
)


@@ -210,8 +211,13 @@ class SnowflakeV2Config(
description="Populates view->view and table->view column lineage using DataHub's sql parser.",
)

lazy_schema_resolver: bool = Field(
use_queries_v2: bool = Field(
default=False,
description="If enabled, uses the new queries extractor to extract queries from snowflake.",
)

lazy_schema_resolver: bool = Field(
default=True,
description="If enabled, uses lazy schema resolver to resolve schemas for tables and views. "
"This is useful if you have a large number of schemas and want to avoid bulk fetching the schema for each table/view.",
)
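
The two flags touched in this config hunk behave like ordinary pydantic booleans on the source config: use_queries_v2 is new and defaults to False, while lazy_schema_resolver defaults to True. A minimal sketch (assuming pydantic is installed) of how such flags are declared and overridden; this is an illustrative model, not the full SnowflakeV2Config:

from pydantic import BaseModel, Field


class SnowflakeFlagsSketch(BaseModel):
    use_queries_v2: bool = Field(
        default=False,
        description="If enabled, uses the new queries extractor to extract queries from snowflake.",
    )
    lazy_schema_resolver: bool = Field(
        default=True,
        description="Resolve schemas lazily instead of bulk-fetching every table/view schema.",
    )


# A recipe would set these under the source config, e.g. {"use_queries_v2": True}.
print(SnowflakeFlagsSketch(use_queries_v2=True))
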
