diff --git a/airflow/providers/openlineage/extractors/manager.py b/airflow/providers/openlineage/extractors/manager.py index d9bc49350d824f..81425d62558fe4 100644 --- a/airflow/providers/openlineage/extractors/manager.py +++ b/airflow/providers/openlineage/extractors/manager.py @@ -16,7 +16,6 @@ # under the License. from __future__ import annotations -from contextlib import suppress from typing import TYPE_CHECKING, Iterator from airflow.providers.openlineage import conf @@ -24,9 +23,11 @@ from airflow.providers.openlineage.extractors.base import DefaultExtractor from airflow.providers.openlineage.extractors.bash import BashExtractor from airflow.providers.openlineage.extractors.python import PythonExtractor -from airflow.providers.openlineage.utils.utils import get_unknown_source_attribute_run_facet +from airflow.providers.openlineage.utils.utils import ( + get_unknown_source_attribute_run_facet, + try_import_from_string, +) from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.module_loading import import_string if TYPE_CHECKING: from openlineage.client.run import Dataset @@ -35,11 +36,6 @@ from airflow.models import Operator -def try_import_from_string(string): - with suppress(ImportError): - return import_string(string) - - def _iter_extractor_types() -> Iterator[type[BaseExtractor]]: if PythonExtractor is not None: yield PythonExtractor @@ -61,10 +57,15 @@ def __init__(self): self.extractors[operator_class] = extractor for extractor_path in conf.custom_extractors(): - extractor: type[BaseExtractor] = try_import_from_string(extractor_path) + extractor: type[BaseExtractor] | None = try_import_from_string(extractor_path) + if not extractor: + self.log.warning( + "OpenLineage is unable to import custom extractor `%s`; will ignore it.", extractor_path + ) + continue for operator_class in extractor.get_operator_classnames(): if operator_class in self.extractors: - self.log.debug( + self.log.warning( "Duplicate OpenLineage custom extractor found for `%s`. " "`%s` will be used instead of `%s`", operator_class, diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index bc5dcb2fefe7fc..ff6ad63970a665 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -43,6 +43,7 @@ ) from airflow.utils.context import AirflowContextDeprecationWarning from airflow.utils.log.secrets_masker import Redactable, Redacted, SecretsMasker, should_hide_value_for_key +from airflow.utils.module_loading import import_string if TYPE_CHECKING: from airflow.models import DagRun, TaskInstance @@ -52,6 +53,11 @@ _NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" +def try_import_from_string(string: str) -> Any: + with suppress(ImportError): + return import_string(string) + + def get_operator_class(task: BaseOperator) -> type: if task.__class__.__name__ in ("DecoratedMappedOperator", "MappedOperator"): return task.operator_class