Skip to content

Commit

Permalink
[AIRFLOW-6706] Lazy load operator extra links (apache#7327)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj authored Feb 2, 2020
1 parent 58cc390 commit b180e4b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 36 deletions.
30 changes: 1 addition & 29 deletions airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import re
import sys
import types
from typing import Any, Callable, Dict, List, Optional, Set, Type
from typing import Any, Callable, Dict, List, Optional, Type

import pkg_resources

Expand Down Expand Up @@ -113,33 +113,6 @@ def load_entrypoint_plugins(entry_points, airflow_plugins):
return airflow_plugins


def register_inbuilt_operator_links() -> None:
"""
Register all the Operators Links that are already defined for the operators
in the "airflow" project. Example: QDSLink (Operator Link for Qubole Operator)
This is required to populate the "whitelist" of allowed classes when deserializing operator links
"""
inbuilt_operator_links: Set[Type] = set()

try:
from airflow.providers.google.cloud.operators.bigquery import BigQueryConsoleLink, BigQueryConsoleIndexableLink # noqa E501 # pylint: disable=R0401,line-too-long
inbuilt_operator_links.update([BigQueryConsoleLink, BigQueryConsoleIndexableLink])
except ImportError:
pass

try:
from airflow.providers.qubole.operators.qubole import QDSLink # pylint: disable=R0401
inbuilt_operator_links.update([QDSLink])
except ImportError:
pass

registered_operator_link_classes.update({
"{}.{}".format(link.__module__, link.__name__): link
for link in inbuilt_operator_links
})


def is_valid_plugin(plugin_obj, existing_plugins):
"""
Check whether a potential object is a subclass of
Expand Down Expand Up @@ -329,4 +302,3 @@ def integrate_plugins() -> None:
integrate_hook_plugins()
integrate_executor_plugins()
integrate_macro_plugins()
register_inbuilt_operator_links()
22 changes: 15 additions & 7 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import enum
import logging
from inspect import Parameter, signature
from typing import Any, Dict, Iterable, Optional, Set, Union
from typing import Any, Dict, Iterable, List, Optional, Set, Union

import cattr
import pendulum
Expand All @@ -32,8 +32,15 @@
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
from airflow.serialization.json_schema import Validator, load_dag_schema
from airflow.settings import json
from airflow.utils.module_loading import import_string
from airflow.www.utils import get_python_source

BUILTIN_OPERATOR_EXTRA_LINKS: List[str] = [
"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink",
"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink",
"airflow.providers.qubole.operators.qubole.QDSLink"
]


class BaseSerialization:
"""BaseSerialization provides utils for serialization."""
Expand Down Expand Up @@ -418,15 +425,16 @@ def _deserialize_operator_extra_links(
# }
# )

_operator_link_class, data = list(_operator_links_source.items())[0]

if _operator_link_class in registered_operator_link_classes:
single_op_link_class_name = registered_operator_link_classes[_operator_link_class]
_operator_link_class_path, data = list(_operator_links_source.items())[0]
if _operator_link_class_path in BUILTIN_OPERATOR_EXTRA_LINKS:
single_op_link_class = import_string(_operator_link_class_path)
elif _operator_link_class_path in registered_operator_link_classes:
single_op_link_class = registered_operator_link_classes[_operator_link_class_path]
else:
raise KeyError("Operator Link class %r not registered" % _operator_link_class)
raise KeyError("Operator Link class %r not registered" % _operator_link_class_path)

op_predefined_extra_link: BaseOperatorLink = cattr.structure(
data, single_op_link_class_name)
data, single_op_link_class)

op_predefined_extra_links.update(
{op_predefined_extra_link.name: op_predefined_extra_link}
Expand Down

0 comments on commit b180e4b

Please sign in to comment.