From 3983d2407f1bd9af176161edffea65364b4b94b4 Mon Sep 17 00:00:00 2001 From: kaxil Date: Thu, 2 Jan 2020 14:25:12 +0000 Subject: [PATCH 1/2] [AIRFLOW-6425] Serialization: Add missing DAG parameters to Json Schema --- airflow/models/dag.py | 4 +++- airflow/serialization/json_schema.py | 12 ++++++++++-- airflow/serialization/schema.json | 5 ++++- airflow/serialization/serialized_objects.py | 4 ++-- tests/serialization/test_dag_serialization.py | 15 +++++++++++++++ 5 files changed, 34 insertions(+), 6 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 5226855a870b54..9c2124e874207d 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1513,7 +1513,9 @@ def get_serialized_fields(cls): 'parent_dag', '_old_context_manager_dags', 'safe_dag_id', 'last_loaded', '_full_filepath', 'user_defined_filters', 'user_defined_macros', '_schedule_interval', 'partial', '_old_context_manager_dags', - '_pickle_id', '_log', 'is_subdag', 'task_dict' + '_pickle_id', '_log', 'is_subdag', 'task_dict', 'template_searchpath', + 'sla_miss_callback', 'on_success_callback', 'on_failure_callback', + 'template_undefined', 'jinja_environment_kwargs' } return cls.__serialized_fields diff --git a/airflow/serialization/json_schema.py b/airflow/serialization/json_schema.py index 7a09b5b8161754..0a3e74d3559c84 100644 --- a/airflow/serialization/json_schema.py +++ b/airflow/serialization/json_schema.py @@ -50,9 +50,9 @@ def iter_errors(self, instance) -> Iterable[jsonschema.exceptions.ValidationErro ... -def load_dag_schema() -> Validator: +def load_schema() -> dict: """ - Load Json Schema for DAG + Load & return Json Schema for DAG """ schema_file_name = 'schema.json' schema_file = pkgutil.get_data(__name__, schema_file_name) @@ -61,5 +61,13 @@ def load_dag_schema() -> Validator: raise AirflowException("Schema file {} does not exists".format(schema_file_name)) schema = json.loads(schema_file.decode()) + return schema + + +def load_and_validate_dag_schema() -> Validator: + """ + Load & Validate Json Schema for DAG + """ + schema = load_schema() jsonschema.Draft7Validator.check_schema(schema) return jsonschema.Draft7Validator(schema) diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index 16e14af665e1de..0f15d27a09bb5a 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -92,7 +92,10 @@ "start_date": { "$ref": "#/definitions/datetime" }, "end_date": { "$ref": "#/definitions/datetime" }, "dagrun_timeout": { "$ref": "#/definitions/timedelta" }, - "doc_md": { "type" : "string"} + "doc_md": { "type" : "string"}, + "_default_view": { "type" : "string"}, + "_access_control": {"$ref": "#/definitions/dict" }, + "is_paused_upon_creation": { "type": "boolean" } }, "required": [ "params", diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 959a48e9e3ecfb..fdb1e67adb7ad5 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -30,7 +30,7 @@ from airflow.models import Connection from airflow.models.baseoperator import BaseOperator, BaseOperatorLink from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding -from airflow.serialization.json_schema import Validator, load_dag_schema +from airflow.serialization.json_schema import Validator, load_and_validate_dag_schema from airflow.settings import json from airflow.www.utils import get_python_source @@ -475,7 +475,7 @@ def __get_constructor_defaults(): # pylint: disable=no-method-argument _CONSTRUCTOR_PARAMS = __get_constructor_defaults.__func__() # type: ignore del __get_constructor_defaults - _json_schema = load_dag_schema() + _json_schema = load_and_validate_dag_schema() @classmethod def serialize_dag(cls, dag: DAG) -> dict: diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index b6bba94bc0cd1e..a3390707db4a4c 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -35,6 +35,7 @@ from airflow.models.baseoperator import BaseOperator from airflow.operators.bash_operator import BashOperator from airflow.operators.subdag_operator import SubDagOperator +from airflow.serialization.json_schema import load_schema from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG from tests.test_utils.mock_operators import CustomOperator, CustomOpLink, GoogleLink @@ -53,6 +54,7 @@ } }, "start_date": 1564617600.0, + "is_paused_upon_creation": False, "params": {}, "_dag_id": "simple_dag", "fileloc": None, @@ -107,6 +109,7 @@ def make_simple_dag(): "depends_on_past": False, }, start_date=datetime(2019, 8, 1), + is_paused_upon_creation=False, ) BaseOperator(task_id='simple_task', dag=dag, owner='airflow') CustomOperator(task_id='custom_task', dag=dag) @@ -462,6 +465,18 @@ def test_extra_serialized_field_and_multiple_operator_links(self): google_link_from_plugin = simple_task.get_extra_links(test_date, GoogleLink.name) self.assertEqual("https://www.google.com", google_link_from_plugin) + def test_dag_serialized_fields_with_schema(self): + """ + Additional Properties are disabled on DAGs. This test verifies that all the + keys in DAG.get_serialized_fields are listed in Schema definition. + """ + dag_schema: dict = load_schema()["definitions"]["dag"]["properties"] + + # The parameters we add manually in Serialization needs to be ignored + ignored_keys: set = {"is_subdag", "tasks"} + dag_params: set = set(dag_schema.keys()) - ignored_keys + self.assertEqual(set(DAG.get_serialized_fields()), dag_params) + if __name__ == '__main__': unittest.main() From 1fa2aac90d87fc41e7a1820b056080fae0411082 Mon Sep 17 00:00:00 2001 From: kaxil Date: Thu, 2 Jan 2020 17:19:40 +0000 Subject: [PATCH 2/2] Rename funcs --- airflow/serialization/json_schema.py | 8 ++++---- airflow/serialization/serialized_objects.py | 4 ++-- tests/serialization/test_dag_serialization.py | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/airflow/serialization/json_schema.py b/airflow/serialization/json_schema.py index 0a3e74d3559c84..40b44da8dc842c 100644 --- a/airflow/serialization/json_schema.py +++ b/airflow/serialization/json_schema.py @@ -50,9 +50,9 @@ def iter_errors(self, instance) -> Iterable[jsonschema.exceptions.ValidationErro ... -def load_schema() -> dict: +def load_dag_schema_dict() -> dict: """ - Load & return Json Schema for DAG + Load & return Json Schema for DAG as Python dict """ schema_file_name = 'schema.json' schema_file = pkgutil.get_data(__name__, schema_file_name) @@ -64,10 +64,10 @@ def load_schema() -> dict: return schema -def load_and_validate_dag_schema() -> Validator: +def load_dag_schema() -> Validator: """ Load & Validate Json Schema for DAG """ - schema = load_schema() + schema = load_dag_schema_dict() jsonschema.Draft7Validator.check_schema(schema) return jsonschema.Draft7Validator(schema) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index fdb1e67adb7ad5..959a48e9e3ecfb 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -30,7 +30,7 @@ from airflow.models import Connection from airflow.models.baseoperator import BaseOperator, BaseOperatorLink from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding -from airflow.serialization.json_schema import Validator, load_and_validate_dag_schema +from airflow.serialization.json_schema import Validator, load_dag_schema from airflow.settings import json from airflow.www.utils import get_python_source @@ -475,7 +475,7 @@ def __get_constructor_defaults(): # pylint: disable=no-method-argument _CONSTRUCTOR_PARAMS = __get_constructor_defaults.__func__() # type: ignore del __get_constructor_defaults - _json_schema = load_and_validate_dag_schema() + _json_schema = load_dag_schema() @classmethod def serialize_dag(cls, dag: DAG) -> dict: diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index a3390707db4a4c..8e8ed2037caeb6 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -35,7 +35,7 @@ from airflow.models.baseoperator import BaseOperator from airflow.operators.bash_operator import BashOperator from airflow.operators.subdag_operator import SubDagOperator -from airflow.serialization.json_schema import load_schema +from airflow.serialization.json_schema import load_dag_schema_dict from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG from tests.test_utils.mock_operators import CustomOperator, CustomOpLink, GoogleLink @@ -470,7 +470,7 @@ def test_dag_serialized_fields_with_schema(self): Additional Properties are disabled on DAGs. This test verifies that all the keys in DAG.get_serialized_fields are listed in Schema definition. """ - dag_schema: dict = load_schema()["definitions"]["dag"]["properties"] + dag_schema: dict = load_dag_schema_dict()["definitions"]["dag"]["properties"] # The parameters we add manually in Serialization needs to be ignored ignored_keys: set = {"is_subdag", "tasks"}