Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6706] Lazy load operator extra links #7327

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 1 addition & 29 deletions airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import re
import sys
import types
from typing import Any, Callable, Dict, List, Optional, Set, Type
from typing import Any, Callable, Dict, List, Optional, Type

import pkg_resources

Expand Down Expand Up @@ -113,33 +113,6 @@ def load_entrypoint_plugins(entry_points, airflow_plugins):
return airflow_plugins


def register_inbuilt_operator_links() -> None:
"""
Register all the Operators Links that are already defined for the operators
in the "airflow" project. Example: QDSLink (Operator Link for Qubole Operator)
This is required to populate the "whitelist" of allowed classes when deserializing operator links
"""
inbuilt_operator_links: Set[Type] = set()

try:
from airflow.providers.google.cloud.operators.bigquery import BigQueryConsoleLink, BigQueryConsoleIndexableLink # noqa E501 # pylint: disable=R0401,line-too-long
inbuilt_operator_links.update([BigQueryConsoleLink, BigQueryConsoleIndexableLink])
except ImportError:
pass

try:
from airflow.providers.qubole.operators.qubole import QDSLink # pylint: disable=R0401
inbuilt_operator_links.update([QDSLink])
except ImportError:
pass

registered_operator_link_classes.update({
"{}.{}".format(link.__module__, link.__name__): link
for link in inbuilt_operator_links
})


def is_valid_plugin(plugin_obj, existing_plugins):
"""
Check whether a potential object is a subclass of
Expand Down Expand Up @@ -329,4 +302,3 @@ def integrate_plugins() -> None:
integrate_hook_plugins()
integrate_executor_plugins()
integrate_macro_plugins()
register_inbuilt_operator_links()
22 changes: 15 additions & 7 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import enum
import logging
from inspect import Parameter, signature
from typing import Any, Dict, Iterable, Optional, Set, Union
from typing import Any, Dict, Iterable, List, Optional, Set, Union

import cattr
import pendulum
Expand All @@ -32,8 +32,15 @@
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
from airflow.serialization.json_schema import Validator, load_dag_schema
from airflow.settings import json
from airflow.utils.module_loading import import_string
from airflow.www.utils import get_python_source

BUILTIN_OPERATOR_EXTRA_LINKS: List[str] = [
"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink",
"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink",
"airflow.providers.qubole.operators.qubole.QDSLink"
]


class BaseSerialization:
"""BaseSerialization provides utils for serialization."""
Expand Down Expand Up @@ -418,15 +425,16 @@ def _deserialize_operator_extra_links(
# }
# )

_operator_link_class, data = list(_operator_links_source.items())[0]

if _operator_link_class in registered_operator_link_classes:
single_op_link_class_name = registered_operator_link_classes[_operator_link_class]
_operator_link_class_path, data = list(_operator_links_source.items())[0]
if _operator_link_class_path in BUILTIN_OPERATOR_EXTRA_LINKS:
single_op_link_class = import_string(_operator_link_class_path)
elif _operator_link_class_path in registered_operator_link_classes:
single_op_link_class = registered_operator_link_classes[_operator_link_class_path]
else:
raise KeyError("Operator Link class %r not registered" % _operator_link_class)
raise KeyError("Operator Link class %r not registered" % _operator_link_class_path)

op_predefined_extra_link: BaseOperatorLink = cattr.structure(
data, single_op_link_class_name)
data, single_op_link_class)

op_predefined_extra_links.update(
{op_predefined_extra_link.name: op_predefined_extra_link}
Expand Down