Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ingest): clean up pipeline init error handling #6817

Merged
merged 1 commit into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/api/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ def on_failure(
pass


SinkReportType = TypeVar("SinkReportType", bound=SinkReport)
SinkConfig = TypeVar("SinkConfig", bound=ConfigModel)
SinkReportType = TypeVar("SinkReportType", bound=SinkReport, covariant=True)
SinkConfig = TypeVar("SinkConfig", bound=ConfigModel, covariant=True)
Self = TypeVar("Self", bound="Sink")


Expand Down
83 changes: 37 additions & 46 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
import contextlib
import itertools
import logging
import os
import platform
import sys
import time
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, NoReturn, Optional, cast
from typing import Any, Dict, Iterable, Iterator, List, Optional, cast

import click
import humanfriendly
import psutil

import datahub
from datahub.configuration.common import IgnorableError, PipelineExecutionError
from datahub.configuration.common import (
ConfigModel,
IgnorableError,
PipelineExecutionError,
)
from datahub.ingestion.api.committable import CommitPolicy
from datahub.ingestion.api.common import EndOfStream, PipelineContext, RecordEnvelope
from datahub.ingestion.api.pipeline_run_listener import PipelineRunListener
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.sink import Sink, WriteCallback
from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
from datahub.ingestion.api.source import Extractor, Source
from datahub.ingestion.api.transform import Transformer
from datahub.ingestion.extractor.extractor_registry import extractor_registry
Expand Down Expand Up @@ -101,6 +106,16 @@ class PipelineInitError(Exception):
pass


@contextlib.contextmanager
def _add_init_error_context(step: str) -> Iterator[None]:
"""Enriches any exceptions raised with information about the step that failed."""

try:
yield
except Exception as e:
raise PipelineInitError(f"Failed to {step}: {e}") from e
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failed while executing step {step}?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This keeps the error messages the same as before, e.g. "Failed to set up framework context" — I didn't want to make the errors too verbose.



@dataclass
class CliReport(Report):
cli_version: str = datahub.nice_version_name()
Expand All @@ -121,12 +136,9 @@ class Pipeline:
ctx: PipelineContext
source: Source
extractor: Extractor
sink: Sink
sink: Sink[ConfigModel, SinkReport]
transformers: List[Transformer]

def _raise_initialization_error(self, e: Exception, msg: str) -> NoReturn:
raise PipelineInitError(f"{msg}: {e}") from e

def __init__(
self,
config: PipelineConfig,
Expand All @@ -146,7 +158,7 @@ def __init__(
self.last_time_printed = int(time.time())
self.cli_report = CliReport()

try:
with _add_init_error_context("set up framework context"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this pattern..

self.ctx = PipelineContext(
run_id=self.config.run_id,
datahub_api=self.config.datahub_api,
Expand All @@ -155,64 +167,43 @@ def __init__(
preview_mode=preview_mode,
pipeline_config=self.config,
)
except Exception as e:
self._raise_initialization_error(e, "Failed to set up framework context")

sink_type = self.config.sink.type
try:
with _add_init_error_context(f"find a registered sink for type {sink_type}"):
sink_class = sink_registry.get(sink_type)
except Exception as e:
self._raise_initialization_error(
e, f"Failed to find a registered sink for type {sink_type}"
)

try:
with _add_init_error_context(f"configure the sink ({sink_type})"):
sink_config = self.config.sink.dict().get("config") or {}
self.sink: Sink = sink_class.create(sink_config, self.ctx)
logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")
self.sink = sink_class.create(sink_config, self.ctx)
logger.debug(f"Sink type {self.config.sink.type} ({sink_class}) configured")
logger.info(f"Sink configured successfully. {self.sink.configured()}")
except Exception as e:
self._raise_initialization_error(
e, f"Failed to configure sink ({sink_type})"
)

# once a sink is configured, we can configure reporting immediately to get observability
try:
with _add_init_error_context("configure reporters"):
self._configure_reporting(report_to, no_default_report)
except Exception as e:
self._raise_initialization_error(e, "Failed to configure reporters")

try:
source_type = self.config.source.type
source_type = self.config.source.type
with _add_init_error_context(
f"find a registered source for type {source_type}"
):
source_class = source_registry.get(source_type)
except Exception as e:
self._raise_initialization_error(e, "Failed to create source")

try:
self.source: Source = source_class.create(
with _add_init_error_context(f"configure the source ({source_type})"):
self.source = source_class.create(
self.config.source.dict().get("config", {}), self.ctx
)
logger.debug(f"Source type:{source_type},{source_class} configured")
logger.debug(f"Source type {source_type} ({source_class}) configured")
logger.info("Source configured successfully.")
except Exception as e:
self._raise_initialization_error(
e, f"Failed to configure source ({source_type})"
)

try:
extractor_class = extractor_registry.get(self.config.source.extractor)
extractor_type = self.config.source.extractor
with _add_init_error_context(f"configure the extractor ({extractor_type})"):
extractor_class = extractor_registry.get(extractor_type)
self.extractor = extractor_class(
self.config.source.extractor_config, self.ctx
)
except Exception as e:
self._raise_initialization_error(
e, f"Failed to configure extractor ({self.config.source.extractor})"
)

try:
with _add_init_error_context("configure transformers"):
self._configure_transforms()
except ValueError as e:
self._raise_initialization_error(e, "Failed to configure transformers")

def _configure_transforms(self) -> None:
self.transformers = []
Expand Down Expand Up @@ -512,7 +503,7 @@ def pretty_print_summary(
)
num_failures_sink = len(self.sink.get_report().failures)
click.secho(
f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with at least {num_failures_source+num_failures_sink} failures {'so far' if currently_running else ''}; produced {workunits_produced} events {duration_message}",
f"{'⏳' if currently_running else ''} Pipeline {'running' if currently_running else 'finished'} with at least {num_failures_source+num_failures_sink} failures{' so far' if currently_running else ''}; produced {workunits_produced} events {duration_message}",
hsheth2 marked this conversation as resolved.
Show resolved Hide resolved
fg=self._get_text_color(
running=currently_running,
failures=True,
Expand Down