
Commit

refactor(ingest): clean up pipeline init error handling (datahub-proj…
hsheth2 authored and cccs-Dustin committed Feb 1, 2023
1 parent e110244 commit 32defe9
Showing 2 changed files with 39 additions and 48 deletions.
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/api/sink.py
@@ -77,8 +77,8 @@ def on_failure(
        pass


-SinkReportType = TypeVar("SinkReportType", bound=SinkReport)
-SinkConfig = TypeVar("SinkConfig", bound=ConfigModel)
+SinkReportType = TypeVar("SinkReportType", bound=SinkReport, covariant=True)
+SinkConfig = TypeVar("SinkConfig", bound=ConfigModel, covariant=True)
Self = TypeVar("Self", bound="Sink")


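Making SinkReportType and SinkConfig covariant is what lets the new sink: Sink[ConfigModel, SinkReport] annotation in pipeline.py (below) accept sinks whose config and report types are subclasses. A minimal, self-contained sketch of that behavior; ConfigModel and SinkReport are simplified stand-ins here, and the FileSink* classes are hypothetical, not the repo's actual definitions:

from typing import Generic, TypeVar

# Simplified stand-ins for the real classes in datahub.configuration.common
# and datahub.ingestion.api.sink.
class ConfigModel: ...
class SinkReport: ...

# Hypothetical concrete subclasses, purely for illustration.
class FileSinkConfig(ConfigModel): ...
class FileSinkReport(SinkReport): ...

SinkConfig = TypeVar("SinkConfig", bound=ConfigModel, covariant=True)
SinkReportType = TypeVar("SinkReportType", bound=SinkReport, covariant=True)

class Sink(Generic[SinkConfig, SinkReportType]): ...

class FileSink(Sink[FileSinkConfig, FileSinkReport]): ...

# With covariant parameters a type checker accepts this assignment, which is
# what the broad `sink: Sink[ConfigModel, SinkReport]` annotation relies on;
# with invariant parameters it would be rejected.
sink: Sink[ConfigModel, SinkReport] = FileSink()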
83 changes: 37 additions & 46 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -1,23 +1,28 @@
+import contextlib
import itertools
import logging
import os
import platform
import sys
import time
from dataclasses import dataclass
-from typing import Any, Dict, Iterable, List, NoReturn, Optional, cast
+from typing import Any, Dict, Iterable, Iterator, List, Optional, cast

import click
import humanfriendly
import psutil

import datahub
-from datahub.configuration.common import IgnorableError, PipelineExecutionError
+from datahub.configuration.common import (
+    ConfigModel,
+    IgnorableError,
+    PipelineExecutionError,
+)
from datahub.ingestion.api.committable import CommitPolicy
from datahub.ingestion.api.common import EndOfStream, PipelineContext, RecordEnvelope
from datahub.ingestion.api.pipeline_run_listener import PipelineRunListener
from datahub.ingestion.api.report import Report
-from datahub.ingestion.api.sink import Sink, WriteCallback
+from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
from datahub.ingestion.api.source import Extractor, Source
from datahub.ingestion.api.transform import Transformer
from datahub.ingestion.extractor.extractor_registry import extractor_registry
@@ -101,6 +106,16 @@ class PipelineInitError(Exception):
    pass


+@contextlib.contextmanager
+def _add_init_error_context(step: str) -> Iterator[None]:
+    """Enriches any exceptions raised with information about the step that failed."""
+
+    try:
+        yield
+    except Exception as e:
+        raise PipelineInitError(f"Failed to {step}: {e}") from e
+
+
@dataclass
class CliReport(Report):
    cli_version: str = datahub.nice_version_name()
@@ -121,12 +136,9 @@ class Pipeline:
    ctx: PipelineContext
    source: Source
    extractor: Extractor
-    sink: Sink
+    sink: Sink[ConfigModel, SinkReport]
    transformers: List[Transformer]

-    def _raise_initialization_error(self, e: Exception, msg: str) -> NoReturn:
-        raise PipelineInitError(f"{msg}: {e}") from e
-
    def __init__(
        self,
        config: PipelineConfig,
@@ -146,7 +158,7 @@ def __init__(
        self.last_time_printed = int(time.time())
        self.cli_report = CliReport()

-        try:
+        with _add_init_error_context("set up framework context"):
            self.ctx = PipelineContext(
                run_id=self.config.run_id,
                datahub_api=self.config.datahub_api,
@@ -155,64 +167,43 @@ def __init__(
                preview_mode=preview_mode,
                pipeline_config=self.config,
            )
-        except Exception as e:
-            self._raise_initialization_error(e, "Failed to set up framework context")

        sink_type = self.config.sink.type
-        try:
+        with _add_init_error_context(f"find a registered sink for type {sink_type}"):
            sink_class = sink_registry.get(sink_type)
-        except Exception as e:
-            self._raise_initialization_error(
-                e, f"Failed to find a registered sink for type {sink_type}"
-            )

-        try:
+        with _add_init_error_context(f"configure the sink ({sink_type})"):
            sink_config = self.config.sink.dict().get("config") or {}
-            self.sink: Sink = sink_class.create(sink_config, self.ctx)
-            logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")
+            self.sink = sink_class.create(sink_config, self.ctx)
+            logger.debug(f"Sink type {self.config.sink.type} ({sink_class}) configured")
            logger.info(f"Sink configured successfully. {self.sink.configured()}")
-        except Exception as e:
-            self._raise_initialization_error(
-                e, f"Failed to configure sink ({sink_type})"
-            )

        # once a sink is configured, we can configure reporting immediately to get observability
-        try:
+        with _add_init_error_context("configure reporters"):
            self._configure_reporting(report_to, no_default_report)
-        except Exception as e:
-            self._raise_initialization_error(e, "Failed to configure reporters")

-        try:
-            source_type = self.config.source.type
+        source_type = self.config.source.type
+        with _add_init_error_context(
+            f"find a registered source for type {source_type}"
+        ):
            source_class = source_registry.get(source_type)
-        except Exception as e:
-            self._raise_initialization_error(e, "Failed to create source")

-        try:
-            self.source: Source = source_class.create(
+        with _add_init_error_context(f"configure the source ({source_type})"):
+            self.source = source_class.create(
                self.config.source.dict().get("config", {}), self.ctx
            )
-            logger.debug(f"Source type:{source_type},{source_class} configured")
+            logger.debug(f"Source type {source_type} ({source_class}) configured")
            logger.info("Source configured successfully.")
-        except Exception as e:
-            self._raise_initialization_error(
-                e, f"Failed to configure source ({source_type})"
-            )

-        try:
-            extractor_class = extractor_registry.get(self.config.source.extractor)
+        extractor_type = self.config.source.extractor
+        with _add_init_error_context(f"configure the extractor ({extractor_type})"):
+            extractor_class = extractor_registry.get(extractor_type)
            self.extractor = extractor_class(
                self.config.source.extractor_config, self.ctx
            )
-        except Exception as e:
-            self._raise_initialization_error(
-                e, f"Failed to configure extractor ({self.config.source.extractor})"
-            )

-        try:
+        with _add_init_error_context("configure transformers"):
            self._configure_transforms()
-        except ValueError as e:
-            self._raise_initialization_error(e, "Failed to configure transformers")

    def _configure_transforms(self) -> None:
        self.transformers = []
@@ -523,7 +514,7 @@ def pretty_print_summary(
            )
            num_failures_sink = len(self.sink.get_report().failures)
            click.secho(
-                f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with at least {num_failures_source+num_failures_sink} failures {'so far' if currently_running else ''}; produced {workunits_produced} events {duration_message}",
+                f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with at least {num_failures_source+num_failures_sink} failures{' so far' if currently_running else ''}; produced {workunits_produced} events {duration_message}",
                fg=self._get_text_color(
                    running=currently_running,
                    failures=True,
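The new _add_init_error_context helper replaces each try/except/_raise_initialization_error block in __init__ with a single reusable context manager. A standalone sketch of how the pattern behaves; the failing step and its error message are made up for illustration:

import contextlib
from typing import Iterator


class PipelineInitError(Exception):
    pass


@contextlib.contextmanager
def _add_init_error_context(step: str) -> Iterator[None]:
    """Enriches any exceptions raised with information about the step that failed."""
    try:
        yield
    except Exception as e:
        raise PipelineInitError(f"Failed to {step}: {e}") from e


# Hypothetical failing step, purely for illustration.
try:
    with _add_init_error_context("configure the sink (console)"):
        raise ValueError("unknown field 'filename'")
except PipelineInitError as err:
    print(err)                  # Failed to configure the sink (console): unknown field 'filename'
    print(repr(err.__cause__))  # ValueError("unknown field 'filename'"), preserved by `raise ... from e`

Because the wrapper re-raises with `from e`, callers still see the original exception as __cause__ in tracebacks while getting a uniform "Failed to <step>" message per initialization step.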
