From 614b2329c1603ef1e2199044e2cc9e4b7332c2e0 Mon Sep 17 00:00:00 2001 From: eladkal <45845474+eladkal@users.noreply.github.com> Date: Sat, 28 May 2022 23:10:39 +0300 Subject: [PATCH] Replace `use_task_execution_date` with `use_task_logical_date` (#23983) * Replace `use_task_execution_date` with `use_task_logical_date` We have some operators/sensors that use `*_execution_date` as the class parameters. This PR deprecate the usage of these parameters and replace it with `logical_date`. There is no change in functionality, under the hood the functionality already uses `logical_date` this is just about the parameters name as exposed to the users. --- airflow/operators/datetime.py | 15 ++++++++++++--- airflow/operators/weekday.py | 16 ++++++++++++---- airflow/sensors/weekday.py | 22 +++++++++++++++------- tests/operators/test_datetime.py | 21 +++++++++++++++++++-- tests/operators/test_weekday.py | 19 +++++++++++++++++-- tests/sensors/test_weekday_sensor.py | 25 ++++++++++++++++++++----- 6 files changed, 95 insertions(+), 23 deletions(-) diff --git a/airflow/operators/datetime.py b/airflow/operators/datetime.py index c37a4f9d50c11c..c5a423d5638684 100644 --- a/airflow/operators/datetime.py +++ b/airflow/operators/datetime.py @@ -16,6 +16,7 @@ # under the License. import datetime +import warnings from typing import Iterable, Union from airflow.exceptions import AirflowException @@ -39,7 +40,7 @@ class BranchDateTimeOperator(BaseBranchOperator): ``datetime.datetime.now()`` falls below target_lower or above ``target_upper``. :param target_lower: target lower bound. :param target_upper: target upper bound. - :param use_task_execution_date: If ``True``, uses task's execution day to compare with targets. + :param use_task_logical_date: If ``True``, uses task's logical date to compare with targets. Execution date is useful for backfilling. If ``False``, uses system's date. """ @@ -50,6 +51,7 @@ def __init__( follow_task_ids_if_false: Union[str, Iterable[str]], target_lower: Union[datetime.datetime, datetime.time, None], target_upper: Union[datetime.datetime, datetime.time, None], + use_task_logical_date: bool = False, use_task_execution_date: bool = False, **kwargs, ) -> None: @@ -64,10 +66,17 @@ def __init__( self.target_upper = target_upper self.follow_task_ids_if_true = follow_task_ids_if_true self.follow_task_ids_if_false = follow_task_ids_if_false - self.use_task_execution_date = use_task_execution_date + self.use_task_logical_date = use_task_logical_date + if use_task_execution_date: + self.use_task_logical_date = use_task_execution_date + warnings.warn( + "Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``.", + DeprecationWarning, + stacklevel=2, + ) def choose_branch(self, context: Context) -> Union[str, Iterable[str]]: - if self.use_task_execution_date is True: + if self.use_task_logical_date: now = timezone.make_naive(context["logical_date"], self.dag.timezone) else: now = timezone.make_naive(timezone.utcnow(), self.dag.timezone) diff --git a/airflow/operators/weekday.py b/airflow/operators/weekday.py index fb35079fe0d3ee..b23d57e9fb1d48 100644 --- a/airflow/operators/weekday.py +++ b/airflow/operators/weekday.py @@ -15,7 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +import warnings from typing import Iterable, Union from airflow.operators.branch import BaseBranchOperator @@ -41,7 +41,7 @@ class BranchDayOfWeekOperator(BaseBranchOperator): * ``{WeekDay.TUESDAY}`` * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}`` - :param use_task_execution_day: If ``True``, uses task's execution day to compare + :param use_task_logical_date: If ``True``, uses task's logical date to compare with is_today. Execution Date is Useful for backfilling. If ``False``, uses system's day of the week. """ @@ -52,6 +52,7 @@ def __init__( follow_task_ids_if_true: Union[str, Iterable[str]], follow_task_ids_if_false: Union[str, Iterable[str]], week_day: Union[str, Iterable[str]], + use_task_logical_date: bool = False, use_task_execution_day: bool = False, **kwargs, ) -> None: @@ -59,11 +60,18 @@ def __init__( self.follow_task_ids_if_true = follow_task_ids_if_true self.follow_task_ids_if_false = follow_task_ids_if_false self.week_day = week_day - self.use_task_execution_day = use_task_execution_day + self.use_task_logical_date = use_task_logical_date + if use_task_execution_day: + self.use_task_logical_date = use_task_execution_day + warnings.warn( + "Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.", + DeprecationWarning, + stacklevel=2, + ) self._week_day_num = WeekDay.validate_week_day(week_day) def choose_branch(self, context: Context) -> Union[str, Iterable[str]]: - if self.use_task_execution_day: + if self.use_task_logical_date: now = context["logical_date"] else: now = timezone.make_naive(timezone.utcnow(), self.dag.timezone) diff --git a/airflow/sensors/weekday.py b/airflow/sensors/weekday.py index bdf9275e107b5a..5bb4db646f7c4f 100644 --- a/airflow/sensors/weekday.py +++ b/airflow/sensors/weekday.py @@ -15,6 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import warnings from airflow.sensors.base import BaseSensorOperator from airflow.utils import timezone @@ -33,7 +34,7 @@ class DayOfWeekSensor(BaseSensorOperator): weekend_check = DayOfWeekSensor( task_id='weekend_check', week_day='Saturday', - use_task_execution_day=True, + use_task_logical_date=True, dag=dag) **Example** (with multiple day using set): :: @@ -41,7 +42,7 @@ class DayOfWeekSensor(BaseSensorOperator): weekend_check = DayOfWeekSensor( task_id='weekend_check', week_day={'Saturday', 'Sunday'}, - use_task_execution_day=True, + use_task_logical_date=True, dag=dag) **Example** (with :class:`~airflow.utils.weekday.WeekDay` enum): :: @@ -52,7 +53,7 @@ class DayOfWeekSensor(BaseSensorOperator): weekend_check = DayOfWeekSensor( task_id='weekend_check', week_day={WeekDay.SATURDAY, WeekDay.SUNDAY}, - use_task_execution_day=True, + use_task_logical_date=True, dag=dag) :param week_day: Day of the week to check (full name). Optionally, a set @@ -64,16 +65,23 @@ class DayOfWeekSensor(BaseSensorOperator): * ``{WeekDay.TUESDAY}`` * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}`` - :param use_task_execution_day: If ``True``, uses task's execution day to compare + :param use_task_logical_date: If ``True``, uses task's logical date to compare with week_day. Execution Date is Useful for backfilling. If ``False``, uses system's day of the week. Useful when you don't want to run anything on weekdays on the system. """ - def __init__(self, *, week_day, use_task_execution_day=False, **kwargs): + def __init__(self, *, week_day, use_task_logical_date=False, use_task_execution_day=False, **kwargs): super().__init__(**kwargs) self.week_day = week_day - self.use_task_execution_day = use_task_execution_day + self.use_task_logical_date = use_task_logical_date + if use_task_execution_day: + self.use_task_logical_date = use_task_execution_day + warnings.warn( + "Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.", + DeprecationWarning, + stacklevel=2, + ) self._week_day_num = WeekDay.validate_week_day(week_day) def poke(self, context: Context): @@ -82,7 +90,7 @@ def poke(self, context: Context): self.week_day, WeekDay(timezone.utcnow().isoweekday()).name, ) - if self.use_task_execution_day: + if self.use_task_logical_date: return context['logical_date'].isoweekday() in self._week_day_num else: return timezone.utcnow().isoweekday() in self._week_day_num diff --git a/tests/operators/test_datetime.py b/tests/operators/test_datetime.py index 2bf4ff5d4a8ad2..bb1f9282a465da 100644 --- a/tests/operators/test_datetime.py +++ b/tests/operators/test_datetime.py @@ -20,6 +20,7 @@ import unittest import freezegun +import pytest from airflow.exceptions import AirflowException from airflow.models import DAG, DagRun, TaskInstance as TI @@ -225,10 +226,10 @@ def test_branch_datetime_operator_lower_comparison_outside_range(self): ) @freezegun.freeze_time("2020-12-01 09:00:00") - def test_branch_datetime_operator_use_task_execution_date(self): + def test_branch_datetime_operator_use_task_logical_date(self): """Check if BranchDateTimeOperator uses task execution date""" in_between_date = timezone.datetime(2020, 7, 7, 10, 30, 0) - self.branch_op.use_task_execution_date = True + self.branch_op.use_task_logical_date = True self.dr = self.dag.create_dagrun( run_id='manual_exec_date__', start_date=in_between_date, @@ -249,3 +250,19 @@ def test_branch_datetime_operator_use_task_execution_date(self): 'branch_2': State.SKIPPED, } ) + + def test_deprecation_warning(self): + warning_message = ( + """Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``.""" + ) + with pytest.warns(DeprecationWarning) as warnings: + BranchDateTimeOperator( + task_id='warning', + follow_task_ids_if_true='branch_1', + follow_task_ids_if_false='branch_2', + target_upper=timezone.datetime(2020, 7, 7, 10, 30, 0), + target_lower=timezone.datetime(2020, 7, 7, 10, 30, 0), + use_task_execution_date=True, + dag=self.dag, + ) + assert warning_message == str(warnings[0].message) diff --git a/tests/operators/test_weekday.py b/tests/operators/test_weekday.py index 69ab21a6df84eb..5185e1728cf5b4 100644 --- a/tests/operators/test_weekday.py +++ b/tests/operators/test_weekday.py @@ -134,14 +134,14 @@ def test_branch_follow_true(self, _, weekday): @freeze_time("2021-01-25") # Monday def test_branch_follow_true_with_execution_date(self): - """Checks if BranchDayOfWeekOperator follows true branch when set use_task_execution_day""" + """Checks if BranchDayOfWeekOperator follows true branch when set use_task_logical_date""" branch_op = BranchDayOfWeekOperator( task_id="make_choice", follow_task_ids_if_true="branch_1", follow_task_ids_if_false="branch_2", week_day="Wednesday", - use_task_execution_day=True, # We compare to DEFAULT_DATE which is Wednesday + use_task_logical_date=True, # We compare to DEFAULT_DATE which is Wednesday dag=self.dag, ) @@ -274,3 +274,18 @@ def test_branch_xcom_push_true_branch(self): for ti in tis: if ti.task_id == 'make_choice': assert ti.xcom_pull(task_ids='make_choice') == 'branch_1' + + def test_deprecation_warning(self): + warning_message = ( + """Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.""" + ) + with pytest.warns(DeprecationWarning) as warnings: + BranchDayOfWeekOperator( + task_id="week_day_warn", + follow_task_ids_if_true="branch_1", + follow_task_ids_if_false="branch_2", + week_day="Monday", + use_task_execution_day=True, + dag=self.dag, + ) + assert warning_message == str(warnings[0].message) diff --git a/tests/sensors/test_weekday_sensor.py b/tests/sensors/test_weekday_sensor.py index 04e133fa1410ce..5aa8bdbf823b8c 100644 --- a/tests/sensors/test_weekday_sensor.py +++ b/tests/sensors/test_weekday_sensor.py @@ -72,7 +72,7 @@ def tearDown(self): ) def test_weekday_sensor_true(self, _, week_day): op = DayOfWeekSensor( - task_id='weekday_sensor_check_true', week_day=week_day, use_task_execution_day=True, dag=self.dag + task_id='weekday_sensor_check_true', week_day=week_day, use_task_logical_date=True, dag=self.dag ) op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True) assert op.week_day == week_day @@ -83,7 +83,7 @@ def test_weekday_sensor_false(self): poke_interval=1, timeout=2, week_day='Tuesday', - use_task_execution_day=True, + use_task_logical_date=True, dag=self.dag, ) with pytest.raises(AirflowSensorTimeout): @@ -95,7 +95,7 @@ def test_invalid_weekday_number(self): DayOfWeekSensor( task_id='weekday_sensor_invalid_weekday_num', week_day=invalid_week_day, - use_task_execution_day=True, + use_task_logical_date=True, dag=self.dag, ) @@ -110,7 +110,7 @@ def test_weekday_sensor_with_invalid_type(self): DayOfWeekSensor( task_id='weekday_sensor_check_true', week_day=invalid_week_day, - use_task_execution_day=True, + use_task_logical_date=True, dag=self.dag, ) @@ -120,8 +120,23 @@ def test_weekday_sensor_timeout_with_set(self): poke_interval=1, timeout=2, week_day={WeekDay.MONDAY, WeekDay.TUESDAY}, - use_task_execution_day=True, + use_task_logical_date=True, dag=self.dag, ) with pytest.raises(AirflowSensorTimeout): op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True) + + def test_deprecation_warning(self): + warning_message = ( + """Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.""" + ) + with pytest.warns(DeprecationWarning) as warnings: + DayOfWeekSensor( + task_id='week_day_warn', + poke_interval=1, + timeout=2, + week_day='Tuesday', + use_task_execution_day=True, + dag=self.dag, + ) + assert warning_message == str(warnings[0].message)