Correctly store non-default Nones in serialized tasks/dags #8772
Conversation
The default schedule_interval for a DAG is `@daily`, so `schedule_interval=None` is actually not the default, but we were not storing _any_ null attributes previously. This meant that upon re-inflating the DAG the schedule_interval would become `@daily`.

This fixes that problem, and extends the test to look at _all_ the serialized attributes in our round-trip tests, rather than just the few that the webserver cared about. It doesn't change the serialization format, it just changes what/when values are stored.

This solution was more complex than I hoped for, but the test case in test_operator_subclass_changing_base_defaults is a real one that the round-trip tests discovered from the DatabricksSubmitRunOperator -- I have just captured it in this test in case that specific operator changes in future.
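The failure mode described above can be reproduced with a toy serializer (all names here are hypothetical; this is a sketch of the bug, not Airflow's actual serialization code):

```python
# Sketch of the bug: a serializer that skips every null attribute cannot
# distinguish "explicitly set to None" from "not set, use the default".
DEFAULTS = {"schedule_interval": "@daily"}  # default reapplied on inflate

def serialize(dag_attrs):
    # Old behaviour: drop all null attributes.
    return {k: v for k, v in dag_attrs.items() if v is not None}

def deserialize(data):
    # Missing keys fall back to the defaults.
    return {**DEFAULTS, **data}

dag = {"schedule_interval": None}  # user explicitly disabled scheduling
round_tripped = deserialize(serialize(dag))
print(round_tripped["schedule_interval"])  # "@daily" -- the explicit None was lost
```

The fix is to compare each value against the constructor's default rather than against `None`, so a non-default `None` is still written out.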
Curious failure:
I wonder why I didn't see that locally.
somewhere
Curious, the DAG with the problem has
Oh, cos it's parsing in a subprocess. Hmmmm! I don't think we need to parse all dags in a subprocess, just one is enough.
Changed it to only parse example_dags (rather than all the provider dags too) in the subprocess, and the rest are just round-tripped. This cut the test time for test_serialized_dags from 8s to 4-5s on my laptop too as a bonus :)
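The subprocess trick can be sketched roughly like this (a hypothetical helper, not the test suite's actual code): running the file in a fresh interpreter surfaces import-time mistakes such as `start_date=utcnow()` without polluting the test process.

```python
import subprocess
import sys

def parses_in_subprocess(dag_source: str) -> bool:
    """Execute DAG source in a fresh interpreter; return True if it
    imports cleanly. Import-time errors stay isolated in the child."""
    result = subprocess.run(
        [sys.executable, "-c", dag_source],
        capture_output=True,
    )
    return result.returncode == 0

# Only one subprocess pass is needed to catch parse-time problems;
# the remaining files can be round-tripped in-process, which is faster.
print(parses_in_subprocess("DAG_ID = 'example'"))        # True
print(parses_in_subprocess("raise ValueError('boom')"))  # False
```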
The main thing I was fixing here was `start_date=utcnow()`, which is always going to be wrong (discovered via a test in #8772). While I was updating the DAG I updated it to use a context manager and shift operators.
@@ -648,6 +683,23 @@ def test_dag_serialized_fields_with_schema(self):
        dag_params: set = set(dag_schema.keys()) - ignored_keys
        self.assertEqual(set(DAG.get_serialized_fields()), dag_params)

    def test_operator_subclass_changing_base_defaults(self):
@kaxil this is the case that means we need to check the MRO for defaults.
I think storing all non-defaults is nicer anyway - we can show these in the UI that way
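The subclass-changes-a-default case can be illustrated with toy classes (hypothetical names; a simplified version of the MRO walk, not Airflow's actual implementation):

```python
import inspect

class BaseOperator:
    def __init__(self, do_xcom_push=True):
        self.do_xcom_push = do_xcom_push

class DatabricksLikeOperator(BaseOperator):
    # The subclass flips the default of do_xcom_push.
    def __init__(self, do_xcom_push=False, **kwargs):
        super().__init__(do_xcom_push=do_xcom_push, **kwargs)

def hardcoded_default(instance, attrname, value):
    """Walk the MRO: treat ``value`` as a skippable default only if the
    base class agrees on it, since inflation restores base defaults."""
    for typ in type(instance).mro():
        params = inspect.signature(typ.__init__).parameters
        if attrname in params and params[attrname].default is value:
            # If a subclass changed BaseOperator's default, we must still
            # serialize the value, or inflation restores the wrong one.
            base = inspect.signature(BaseOperator.__init__).parameters
            return attrname in base and base[attrname].default is value
        if typ is BaseOperator:
            break
    return False

op = DatabricksLikeOperator()
# False: do_xcom_push=False is the subclass default, but it differs from
# BaseOperator's default (True), so it must be stored anyway.
print(hardcoded_default(op, "do_xcom_push", False))
```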
Don't merge it yet, testing something locally.
Just wanted to test it with the following diff:
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 0ab1d80ff..d3a1ff54a 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -416,60 +416,6 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
return op
- @classmethod
- def _is_constructor_param(cls, attrname: str, instance: Any) -> bool:
- # Check all super classes too
- return any(
- attrname in cls.__constructor_params_for_subclass(typ)
- for typ in type(instance).mro()
- )
-
- @classmethod
- def _value_is_hardcoded_default(cls, attrname: str, value: Any, instance: Any) -> bool:
- """
- Check if ``value`` is the default value for ``attrname`` as set by the
- constructor of ``instance``, or any of it's parent classes up
- to-and-including BaseOperator.
-
- .. seealso::
-
- :py:meth:`BaseSerialization._value_is_hardcoded_default`
- """
-
- def _is_default(ctor_params, attrname, value):
- if attrname not in ctor_params:
- return False
- ctor_default = ctor_params[attrname].default
-
- # Also returns True if the value is an empty list or empty dict.
- # This is done to account for the case where the default value of
- # the field is None but has the ``field = field or {}`` set.
- return ctor_default is value or (ctor_default is None and value in [{}, []])
-
- for typ in type(instance).mro():
- ctor_params = cls.__constructor_params_for_subclass(typ)
-
- if _is_default(ctor_params, attrname, value):
- if typ is BaseOperator:
- return True
- # For added fun, if a subclass sets a different default value to the
- # same argument, (i.e. a subclass changes default of do_xcom_push from
- # True to False), we then do want to include it.
- #
- # This is because we set defaults based on BaseOperators
- # defaults, so if we didn't set this when inflating we'd
- # have the wrong value
-
- base_op_ctor_params = cls.__constructor_params_for_subclass(BaseOperator)
- if attrname not in base_op_ctor_params:
- return True
- return base_op_ctor_params[attrname].default == value
-
- if typ is BaseOperator:
- break
-
- return False
-
@classmethod
def _is_excluded(cls, var: Any, attrname: str, op: BaseOperator):
if var is not None and op.has_dag() and attrname.endswith("_date"):
All tests pass except one with the above diff.
Failing test:
def validate_deserialized_task(self, serialized_task, task,):
"""Verify non-airflow operators are casted to BaseOperator."""
assert isinstance(serialized_task, SerializedBaseOperator)
assert not isinstance(task, SerializedBaseOperator)
assert isinstance(task, BaseOperator)
fields_to_check = task.get_serialized_fields() - {
# Checked separately
'_task_type', 'subdag',
# Type is excluded, so don't check it
'_log',
# List vs tuple. Check separately
'template_fields',
# We store the string, real dag has the actual code
'on_failure_callback', 'on_success_callback', 'on_retry_callback',
}
assert serialized_task.task_type == task.task_type
for field in fields_to_check:
> assert getattr(serialized_task, field) == getattr(task, field), \
f'{task.dag.dag_id}.{task.task_id}.{field} does not match'
E AssertionError: example_gcp_gke.pod_task.resources does not match
E assert None == []
E +None
E -[]
tests/serialization/test_dag_serialization.py:353: AssertionError
That fails because we are not storing `[]` as it is "default"-ish (check below), but on re-inflating it gets set to None, which isn't the same as the value in the DAG.
airflow/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Lines 185 to 187 in db1b51d
if kwargs.get('xcom_push') is not None:
    raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
super().__init__(*args, resources=None, **kwargs)
self.resources = self._set_resources(resources)
airflow/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Lines 316 to 320 in db1b51d
@staticmethod
def _set_resources(resources):
    if not resources:
        return []
    return [Resources(**resources)]
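The mismatch comes from the constructor pattern above: `resources=None` is passed to the base class (so it looks like the default and is skipped by the serializer), but the operator then rewrites the attribute to `[]`. A minimal sketch with a hypothetical class:

```python
# Sketch of the mismatch: the serializer sees resources=None (the base
# default) and stores nothing, but the live task holds [] -- so the
# inflated copy (None) no longer equals the original task's attribute.
class PodLikeOperator:
    def __init__(self, resources=None):
        self.resources = self._set_resources(resources)

    @staticmethod
    def _set_resources(resources):
        if not resources:
            return []
        return [resources]

task = PodLikeOperator()
serialized = {}  # resources looked like the default, so it was skipped
inflated_resources = serialized.get("resources")  # None after inflate
print(task.resources, inflated_resources)  # [] None -> round-trip assertion fails
```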
We can still keep the changes in the PR to support serialising all properties of dags in the future. The performance impact should be minimal because of the use of an LRU cache.
Feel free to merge once we add a comment in the doc on why we kept the change.
I was wrong about where that fn was used, so I've removed that complex code and replaced it with this in the test instead:

    if serialized_task.resources is None:
        assert task.resources is None or task.resources == []
    else:
        assert serialized_task.resources == task.resources
(We've got that code in this diff comment, so we can bring it back when we want it, but that will need more changes elsewhere to serialize more fields.)
The default schedule_interval for a DAG is `@daily`, so `schedule_interval=None` is actually not the default, but we were not storing _any_ null attributes previously. This meant that upon re-inflating the DAG the schedule_interval would become @daily. This fixes that problem, and extends the test to look at _all_ the serialized attributes in our round-trip tests, rather than just the few that the webserver cared about. It doesn't change the serialization format, it just changes what/when values were stored. This solution was more complex than I hoped for, but the test case in test_operator_subclass_changing_base_defaults is a real one that the round trip tests discovered from the DatabricksSubmitRunOperator -- I have just captured it in this test in case that specific operator changes in future. (cherry picked from commit a715aa6)
Make sure to mark the boxes below before creating PR: [x]
- In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
- In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
- In case of backwards incompatible changes please leave a note in UPDATING.md.

Read the Pull Request Guidelines for more information.