diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 5226855a870b54..9c2124e874207d 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1513,7 +1513,9 @@ def get_serialized_fields(cls): 'parent_dag', '_old_context_manager_dags', 'safe_dag_id', 'last_loaded', '_full_filepath', 'user_defined_filters', 'user_defined_macros', '_schedule_interval', 'partial', '_old_context_manager_dags', - '_pickle_id', '_log', 'is_subdag', 'task_dict' + '_pickle_id', '_log', 'is_subdag', 'task_dict', 'template_searchpath', + 'sla_miss_callback', 'on_success_callback', 'on_failure_callback', + 'template_undefined', 'jinja_environment_kwargs' } return cls.__serialized_fields diff --git a/airflow/serialization/json_schema.py b/airflow/serialization/json_schema.py index 7a09b5b8161754..40b44da8dc842c 100644 --- a/airflow/serialization/json_schema.py +++ b/airflow/serialization/json_schema.py @@ -50,9 +50,9 @@ def iter_errors(self, instance) -> Iterable[jsonschema.exceptions.ValidationErro ... -def load_dag_schema() -> Validator: +def load_dag_schema_dict() -> dict: """ - Load Json Schema for DAG + Load & return Json Schema for DAG as Python dict """ schema_file_name = 'schema.json' schema_file = pkgutil.get_data(__name__, schema_file_name) @@ -61,5 +61,13 @@ def load_dag_schema() -> Validator: raise AirflowException("Schema file {} does not exists".format(schema_file_name)) schema = json.loads(schema_file.decode()) + return schema + + +def load_dag_schema() -> Validator: + """ + Load & Validate Json Schema for DAG + """ + schema = load_dag_schema_dict() jsonschema.Draft7Validator.check_schema(schema) return jsonschema.Draft7Validator(schema) diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index 16e14af665e1de..0f15d27a09bb5a 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -92,7 +92,10 @@ "start_date": { "$ref": "#/definitions/datetime" }, "end_date": { "$ref": "#/definitions/datetime" }, "dagrun_timeout": { "$ref": "#/definitions/timedelta" }, - "doc_md": { "type" : "string"} + "doc_md": { "type" : "string"}, + "_default_view": { "type" : "string"}, + "_access_control": {"$ref": "#/definitions/dict" }, + "is_paused_upon_creation": { "type": "boolean" } }, "required": [ "params", diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index b6bba94bc0cd1e..8e8ed2037caeb6 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -35,6 +35,7 @@ from airflow.models.baseoperator import BaseOperator from airflow.operators.bash_operator import BashOperator from airflow.operators.subdag_operator import SubDagOperator +from airflow.serialization.json_schema import load_dag_schema_dict from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG from tests.test_utils.mock_operators import CustomOperator, CustomOpLink, GoogleLink @@ -53,6 +54,7 @@ } }, "start_date": 1564617600.0, + "is_paused_upon_creation": False, "params": {}, "_dag_id": "simple_dag", "fileloc": None, @@ -107,6 +109,7 @@ def make_simple_dag(): "depends_on_past": False, }, start_date=datetime(2019, 8, 1), + is_paused_upon_creation=False, ) BaseOperator(task_id='simple_task', dag=dag, owner='airflow') CustomOperator(task_id='custom_task', dag=dag) @@ -462,6 +465,18 @@ def test_extra_serialized_field_and_multiple_operator_links(self): google_link_from_plugin = simple_task.get_extra_links(test_date, GoogleLink.name) self.assertEqual("https://www.google.com", google_link_from_plugin) + def test_dag_serialized_fields_with_schema(self): + """ + Additional Properties are disabled on DAGs. This test verifies that all the + keys in DAG.get_serialized_fields are listed in Schema definition. + """ + dag_schema: dict = load_dag_schema_dict()["definitions"]["dag"]["properties"] + + # The parameters we add manually in Serialization needs to be ignored + ignored_keys: set = {"is_subdag", "tasks"} + dag_params: set = set(dag_schema.keys()) - ignored_keys + self.assertEqual(set(DAG.get_serialized_fields()), dag_params) + if __name__ == '__main__': unittest.main()