From e390d6635c26536f5896e5a08f21e16584eadec6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= Date: Sat, 1 Feb 2020 17:57:31 +0100 Subject: [PATCH 1/2] [AIRFLOW-6706] Lazy load operator extra links --- airflow/plugins_manager.py | 30 +-------------------- airflow/serialization/serialized_objects.py | 22 ++++++++++----- 2 files changed, 16 insertions(+), 36 deletions(-) diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 84002cdd86ab7..02874893eac1f 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -23,7 +23,7 @@ import re import sys import types -from typing import Any, Callable, Dict, List, Optional, Set, Type +from typing import Any, Callable, Dict, List, Optional, Type import pkg_resources @@ -113,33 +113,6 @@ def load_entrypoint_plugins(entry_points, airflow_plugins): return airflow_plugins -def register_inbuilt_operator_links() -> None: - """ - Register all the Operators Links that are already defined for the operators - in the "airflow" project. Example: QDSLink (Operator Link for Qubole Operator) - - This is required to populate the "whitelist" of allowed classes when deserializing operator links - """ - inbuilt_operator_links: Set[Type] = set() - - try: - from airflow.providers.google.cloud.operators.bigquery import BigQueryConsoleLink, BigQueryConsoleIndexableLink # noqa E501 # pylint: disable=R0401,line-too-long - inbuilt_operator_links.update([BigQueryConsoleLink, BigQueryConsoleIndexableLink]) - except ImportError: - pass - - try: - from airflow.providers.qubole.operators.qubole import QDSLink # pylint: disable=R0401 - inbuilt_operator_links.update([QDSLink]) - except ImportError: - pass - - registered_operator_link_classes.update({ - "{}.{}".format(link.__module__, link.__name__): link - for link in inbuilt_operator_links - }) - - def is_valid_plugin(plugin_obj, existing_plugins): """ Check whether a potential object is a subclass of @@ -329,4 +302,3 @@ def integrate_plugins() -> None: integrate_hook_plugins() integrate_executor_plugins() integrate_macro_plugins() - register_inbuilt_operator_links() diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 3634a4448056f..e48e735488967 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -20,7 +20,7 @@ import enum import logging from inspect import Parameter, signature -from typing import Any, Dict, Iterable, Optional, Set, Union +from typing import Any, Dict, Iterable, List, Optional, Set, Union import cattr import pendulum @@ -32,8 +32,15 @@ from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding from airflow.serialization.json_schema import Validator, load_dag_schema from airflow.settings import json +from airflow.utils.module_loading import import_string from airflow.www.utils import get_python_source +WHITELIST_OPERATOR_EXTRA_LINKS: List[str] = [ + "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink", + "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink", + "airflow.providers.qubole.operators.qubole.QDSLink" +] + class BaseSerialization: """BaseSerialization provides utils for serialization.""" @@ -418,15 +425,16 @@ def _deserialize_operator_extra_links( # } # ) - _operator_link_class, data = list(_operator_links_source.items())[0] - - if _operator_link_class in registered_operator_link_classes: - single_op_link_class_name = registered_operator_link_classes[_operator_link_class] + _operator_link_class_path, data = list(_operator_links_source.items())[0] + if _operator_link_class_path in WHITELIST_OPERATOR_EXTRA_LINKS: + single_op_link_class = import_string(_operator_link_class_path) + elif _operator_link_class_path in registered_operator_link_classes: + single_op_link_class = registered_operator_link_classes[_operator_link_class_path] else: - raise KeyError("Operator Link class %r not registered" % _operator_link_class) + raise KeyError("Operator Link class %r not registered" % _operator_link_class_path) op_predefined_extra_link: BaseOperatorLink = cattr.structure( - data, single_op_link_class_name) + data, single_op_link_class) op_predefined_extra_links.update( {op_predefined_extra_link.name: op_predefined_extra_link} From 7511cd88c36421c8e05722d67979eab4ab4c7989 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= Date: Sat, 1 Feb 2020 21:54:37 +0100 Subject: [PATCH 2/2] fixup! [AIRFLOW-6706] Lazy load operator extra links --- airflow/serialization/serialized_objects.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index e48e735488967..5d570bb0a13cd 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -35,7 +35,7 @@ from airflow.utils.module_loading import import_string from airflow.www.utils import get_python_source -WHITELIST_OPERATOR_EXTRA_LINKS: List[str] = [ +BUILTIN_OPERATOR_EXTRA_LINKS: List[str] = [ "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink", "airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink", "airflow.providers.qubole.operators.qubole.QDSLink" @@ -426,7 +426,7 @@ def _deserialize_operator_extra_links( # ) _operator_link_class_path, data = list(_operator_links_source.items())[0] - if _operator_link_class_path in WHITELIST_OPERATOR_EXTRA_LINKS: + if _operator_link_class_path in BUILTIN_OPERATOR_EXTRA_LINKS: single_op_link_class = import_string(_operator_link_class_path) elif _operator_link_class_path in registered_operator_link_classes: single_op_link_class = registered_operator_link_classes[_operator_link_class_path]