diff --git a/metadata-ingestion/src/datahub/ingestion/api/sink.py b/metadata-ingestion/src/datahub/ingestion/api/sink.py index 460b56683cb53..e2f67c00c1e69 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/sink.py +++ b/metadata-ingestion/src/datahub/ingestion/api/sink.py @@ -77,8 +77,8 @@ def on_failure( pass -SinkReportType = TypeVar("SinkReportType", bound=SinkReport) -SinkConfig = TypeVar("SinkConfig", bound=ConfigModel) +SinkReportType = TypeVar("SinkReportType", bound=SinkReport, covariant=True) +SinkConfig = TypeVar("SinkConfig", bound=ConfigModel, covariant=True) Self = TypeVar("Self", bound="Sink") diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 7e2fad0165565..cf91109b2b211 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -1,3 +1,4 @@ +import contextlib import itertools import logging import os @@ -5,19 +6,23 @@ import sys import time from dataclasses import dataclass -from typing import Any, Dict, Iterable, List, NoReturn, Optional, cast +from typing import Any, Dict, Iterable, Iterator, List, Optional, cast import click import humanfriendly import psutil import datahub -from datahub.configuration.common import IgnorableError, PipelineExecutionError +from datahub.configuration.common import ( + ConfigModel, + IgnorableError, + PipelineExecutionError, +) from datahub.ingestion.api.committable import CommitPolicy from datahub.ingestion.api.common import EndOfStream, PipelineContext, RecordEnvelope from datahub.ingestion.api.pipeline_run_listener import PipelineRunListener from datahub.ingestion.api.report import Report -from datahub.ingestion.api.sink import Sink, WriteCallback +from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback from datahub.ingestion.api.source import Extractor, Source from datahub.ingestion.api.transform import Transformer from datahub.ingestion.extractor.extractor_registry import extractor_registry @@ -101,6 +106,16 @@ class PipelineInitError(Exception): pass +@contextlib.contextmanager +def _add_init_error_context(step: str) -> Iterator[None]: + """Enriches any exceptions raised with information about the step that failed.""" + + try: + yield + except Exception as e: + raise PipelineInitError(f"Failed to {step}: {e}") from e + + @dataclass class CliReport(Report): cli_version: str = datahub.nice_version_name() @@ -121,12 +136,9 @@ class Pipeline: ctx: PipelineContext source: Source extractor: Extractor - sink: Sink + sink: Sink[ConfigModel, SinkReport] transformers: List[Transformer] - def _raise_initialization_error(self, e: Exception, msg: str) -> NoReturn: - raise PipelineInitError(f"{msg}: {e}") from e - def __init__( self, config: PipelineConfig, @@ -146,7 +158,7 @@ def __init__( self.last_time_printed = int(time.time()) self.cli_report = CliReport() - try: + with _add_init_error_context("set up framework context"): self.ctx = PipelineContext( run_id=self.config.run_id, datahub_api=self.config.datahub_api, @@ -155,64 +167,43 @@ def __init__( preview_mode=preview_mode, pipeline_config=self.config, ) - except Exception as e: - self._raise_initialization_error(e, "Failed to set up framework context") sink_type = self.config.sink.type - try: + with _add_init_error_context(f"find a registered sink for type {sink_type}"): sink_class = sink_registry.get(sink_type) - except Exception as e: - self._raise_initialization_error( - e, f"Failed to find a registered sink for type {sink_type}" - ) - try: + with _add_init_error_context(f"configure the sink ({sink_type})"): sink_config = self.config.sink.dict().get("config") or {} - self.sink: Sink = sink_class.create(sink_config, self.ctx) - logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured") + self.sink = sink_class.create(sink_config, self.ctx) + logger.debug(f"Sink type {self.config.sink.type} ({sink_class}) configured") logger.info(f"Sink configured successfully. {self.sink.configured()}") - except Exception as e: - self._raise_initialization_error( - e, f"Failed to configure sink ({sink_type})" - ) # once a sink is configured, we can configure reporting immediately to get observability - try: + with _add_init_error_context("configure reporters"): self._configure_reporting(report_to, no_default_report) - except Exception as e: - self._raise_initialization_error(e, "Failed to configure reporters") - try: - source_type = self.config.source.type + source_type = self.config.source.type + with _add_init_error_context( + f"find a registered source for type {source_type}" + ): source_class = source_registry.get(source_type) - except Exception as e: - self._raise_initialization_error(e, "Failed to create source") - try: - self.source: Source = source_class.create( + with _add_init_error_context(f"configure the source ({source_type})"): + self.source = source_class.create( self.config.source.dict().get("config", {}), self.ctx ) - logger.debug(f"Source type:{source_type},{source_class} configured") + logger.debug(f"Source type {source_type} ({source_class}) configured") logger.info("Source configured successfully.") - except Exception as e: - self._raise_initialization_error( - e, f"Failed to configure source ({source_type})" - ) - try: - extractor_class = extractor_registry.get(self.config.source.extractor) + extractor_type = self.config.source.extractor + with _add_init_error_context(f"configure the extractor ({extractor_type})"): + extractor_class = extractor_registry.get(extractor_type) self.extractor = extractor_class( self.config.source.extractor_config, self.ctx ) - except Exception as e: - self._raise_initialization_error( - e, f"Failed to configure extractor ({self.config.source.extractor})" - ) - try: + with _add_init_error_context("configure transformers"): self._configure_transforms() - except ValueError as e: - self._raise_initialization_error(e, "Failed to configure transformers") def _configure_transforms(self) -> None: self.transformers = [] @@ -523,7 +514,7 @@ def pretty_print_summary( ) num_failures_sink = len(self.sink.get_report().failures) click.secho( - f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with at least {num_failures_source+num_failures_sink} failures {'so far' if currently_running else ''}; produced {workunits_produced} events {duration_message}", + f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with at least {num_failures_source+num_failures_sink} failures{' so far' if currently_running else ''}; produced {workunits_produced} events {duration_message}", fg=self._get_text_color( running=currently_running, failures=True,