Skip to content

Commit

Permalink
Replace use_task_execution_date with use_task_logical_date (apach…
Browse files Browse the repository at this point in the history
…e#23983)

* Replace `use_task_execution_date` with `use_task_logical_date`
We have some operators/sensors that use `*_execution_date` as the class parameters. This PR deprecate the usage of these parameters and replace it with `logical_date`.
There is no change in functionality, under the hood the functionality already uses `logical_date` this is just about the parameters name as exposed to the users.
  • Loading branch information
eladkal authored May 28, 2022
1 parent 882535a commit 614b232
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 23 deletions.
15 changes: 12 additions & 3 deletions airflow/operators/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

import datetime
import warnings
from typing import Iterable, Union

from airflow.exceptions import AirflowException
Expand All @@ -39,7 +40,7 @@ class BranchDateTimeOperator(BaseBranchOperator):
``datetime.datetime.now()`` falls below target_lower or above ``target_upper``.
:param target_lower: target lower bound.
:param target_upper: target upper bound.
:param use_task_execution_date: If ``True``, uses task's execution day to compare with targets.
:param use_task_logical_date: If ``True``, uses task's logical date to compare with targets.
Execution date is useful for backfilling. If ``False``, uses system's date.
"""

Expand All @@ -50,6 +51,7 @@ def __init__(
follow_task_ids_if_false: Union[str, Iterable[str]],
target_lower: Union[datetime.datetime, datetime.time, None],
target_upper: Union[datetime.datetime, datetime.time, None],
use_task_logical_date: bool = False,
use_task_execution_date: bool = False,
**kwargs,
) -> None:
Expand All @@ -64,10 +66,17 @@ def __init__(
self.target_upper = target_upper
self.follow_task_ids_if_true = follow_task_ids_if_true
self.follow_task_ids_if_false = follow_task_ids_if_false
self.use_task_execution_date = use_task_execution_date
self.use_task_logical_date = use_task_logical_date
if use_task_execution_date:
self.use_task_logical_date = use_task_execution_date
warnings.warn(
"Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``.",
DeprecationWarning,
stacklevel=2,
)

def choose_branch(self, context: Context) -> Union[str, Iterable[str]]:
if self.use_task_execution_date is True:
if self.use_task_logical_date:
now = timezone.make_naive(context["logical_date"], self.dag.timezone)
else:
now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
Expand Down
16 changes: 12 additions & 4 deletions airflow/operators/weekday.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import warnings
from typing import Iterable, Union

from airflow.operators.branch import BaseBranchOperator
Expand All @@ -41,7 +41,7 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
* ``{WeekDay.TUESDAY}``
* ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
:param use_task_execution_day: If ``True``, uses task's execution day to compare
:param use_task_logical_date: If ``True``, uses task's logical date to compare
with is_today. Execution Date is Useful for backfilling.
If ``False``, uses system's day of the week.
"""
Expand All @@ -52,18 +52,26 @@ def __init__(
follow_task_ids_if_true: Union[str, Iterable[str]],
follow_task_ids_if_false: Union[str, Iterable[str]],
week_day: Union[str, Iterable[str]],
use_task_logical_date: bool = False,
use_task_execution_day: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.follow_task_ids_if_true = follow_task_ids_if_true
self.follow_task_ids_if_false = follow_task_ids_if_false
self.week_day = week_day
self.use_task_execution_day = use_task_execution_day
self.use_task_logical_date = use_task_logical_date
if use_task_execution_day:
self.use_task_logical_date = use_task_execution_day
warnings.warn(
"Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.",
DeprecationWarning,
stacklevel=2,
)
self._week_day_num = WeekDay.validate_week_day(week_day)

def choose_branch(self, context: Context) -> Union[str, Iterable[str]]:
if self.use_task_execution_day:
if self.use_task_logical_date:
now = context["logical_date"]
else:
now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
Expand Down
22 changes: 15 additions & 7 deletions airflow/sensors/weekday.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import warnings

from airflow.sensors.base import BaseSensorOperator
from airflow.utils import timezone
Expand All @@ -33,15 +34,15 @@ class DayOfWeekSensor(BaseSensorOperator):
weekend_check = DayOfWeekSensor(
task_id='weekend_check',
week_day='Saturday',
use_task_execution_day=True,
use_task_logical_date=True,
dag=dag)
**Example** (with multiple day using set): ::
weekend_check = DayOfWeekSensor(
task_id='weekend_check',
week_day={'Saturday', 'Sunday'},
use_task_execution_day=True,
use_task_logical_date=True,
dag=dag)
**Example** (with :class:`~airflow.utils.weekday.WeekDay` enum): ::
Expand All @@ -52,7 +53,7 @@ class DayOfWeekSensor(BaseSensorOperator):
weekend_check = DayOfWeekSensor(
task_id='weekend_check',
week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
use_task_execution_day=True,
use_task_logical_date=True,
dag=dag)
:param week_day: Day of the week to check (full name). Optionally, a set
Expand All @@ -64,16 +65,23 @@ class DayOfWeekSensor(BaseSensorOperator):
* ``{WeekDay.TUESDAY}``
* ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
:param use_task_execution_day: If ``True``, uses task's execution day to compare
:param use_task_logical_date: If ``True``, uses task's logical date to compare
with week_day. Execution Date is Useful for backfilling.
If ``False``, uses system's day of the week. Useful when you
don't want to run anything on weekdays on the system.
"""

def __init__(self, *, week_day, use_task_execution_day=False, **kwargs):
def __init__(self, *, week_day, use_task_logical_date=False, use_task_execution_day=False, **kwargs):
super().__init__(**kwargs)
self.week_day = week_day
self.use_task_execution_day = use_task_execution_day
self.use_task_logical_date = use_task_logical_date
if use_task_execution_day:
self.use_task_logical_date = use_task_execution_day
warnings.warn(
"Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.",
DeprecationWarning,
stacklevel=2,
)
self._week_day_num = WeekDay.validate_week_day(week_day)

def poke(self, context: Context):
Expand All @@ -82,7 +90,7 @@ def poke(self, context: Context):
self.week_day,
WeekDay(timezone.utcnow().isoweekday()).name,
)
if self.use_task_execution_day:
if self.use_task_logical_date:
return context['logical_date'].isoweekday() in self._week_day_num
else:
return timezone.utcnow().isoweekday() in self._week_day_num
21 changes: 19 additions & 2 deletions tests/operators/test_datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import unittest

import freezegun
import pytest

from airflow.exceptions import AirflowException
from airflow.models import DAG, DagRun, TaskInstance as TI
Expand Down Expand Up @@ -225,10 +226,10 @@ def test_branch_datetime_operator_lower_comparison_outside_range(self):
)

@freezegun.freeze_time("2020-12-01 09:00:00")
def test_branch_datetime_operator_use_task_execution_date(self):
def test_branch_datetime_operator_use_task_logical_date(self):
"""Check if BranchDateTimeOperator uses task execution date"""
in_between_date = timezone.datetime(2020, 7, 7, 10, 30, 0)
self.branch_op.use_task_execution_date = True
self.branch_op.use_task_logical_date = True
self.dr = self.dag.create_dagrun(
run_id='manual_exec_date__',
start_date=in_between_date,
Expand All @@ -249,3 +250,19 @@ def test_branch_datetime_operator_use_task_execution_date(self):
'branch_2': State.SKIPPED,
}
)

def test_deprecation_warning(self):
warning_message = (
"""Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``."""
)
with pytest.warns(DeprecationWarning) as warnings:
BranchDateTimeOperator(
task_id='warning',
follow_task_ids_if_true='branch_1',
follow_task_ids_if_false='branch_2',
target_upper=timezone.datetime(2020, 7, 7, 10, 30, 0),
target_lower=timezone.datetime(2020, 7, 7, 10, 30, 0),
use_task_execution_date=True,
dag=self.dag,
)
assert warning_message == str(warnings[0].message)
19 changes: 17 additions & 2 deletions tests/operators/test_weekday.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,14 @@ def test_branch_follow_true(self, _, weekday):

@freeze_time("2021-01-25") # Monday
def test_branch_follow_true_with_execution_date(self):
"""Checks if BranchDayOfWeekOperator follows true branch when set use_task_execution_day"""
"""Checks if BranchDayOfWeekOperator follows true branch when set use_task_logical_date"""

branch_op = BranchDayOfWeekOperator(
task_id="make_choice",
follow_task_ids_if_true="branch_1",
follow_task_ids_if_false="branch_2",
week_day="Wednesday",
use_task_execution_day=True, # We compare to DEFAULT_DATE which is Wednesday
use_task_logical_date=True, # We compare to DEFAULT_DATE which is Wednesday
dag=self.dag,
)

Expand Down Expand Up @@ -274,3 +274,18 @@ def test_branch_xcom_push_true_branch(self):
for ti in tis:
if ti.task_id == 'make_choice':
assert ti.xcom_pull(task_ids='make_choice') == 'branch_1'

def test_deprecation_warning(self):
warning_message = (
"""Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``."""
)
with pytest.warns(DeprecationWarning) as warnings:
BranchDayOfWeekOperator(
task_id="week_day_warn",
follow_task_ids_if_true="branch_1",
follow_task_ids_if_false="branch_2",
week_day="Monday",
use_task_execution_day=True,
dag=self.dag,
)
assert warning_message == str(warnings[0].message)
25 changes: 20 additions & 5 deletions tests/sensors/test_weekday_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def tearDown(self):
)
def test_weekday_sensor_true(self, _, week_day):
op = DayOfWeekSensor(
task_id='weekday_sensor_check_true', week_day=week_day, use_task_execution_day=True, dag=self.dag
task_id='weekday_sensor_check_true', week_day=week_day, use_task_logical_date=True, dag=self.dag
)
op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True)
assert op.week_day == week_day
Expand All @@ -83,7 +83,7 @@ def test_weekday_sensor_false(self):
poke_interval=1,
timeout=2,
week_day='Tuesday',
use_task_execution_day=True,
use_task_logical_date=True,
dag=self.dag,
)
with pytest.raises(AirflowSensorTimeout):
Expand All @@ -95,7 +95,7 @@ def test_invalid_weekday_number(self):
DayOfWeekSensor(
task_id='weekday_sensor_invalid_weekday_num',
week_day=invalid_week_day,
use_task_execution_day=True,
use_task_logical_date=True,
dag=self.dag,
)

Expand All @@ -110,7 +110,7 @@ def test_weekday_sensor_with_invalid_type(self):
DayOfWeekSensor(
task_id='weekday_sensor_check_true',
week_day=invalid_week_day,
use_task_execution_day=True,
use_task_logical_date=True,
dag=self.dag,
)

Expand All @@ -120,8 +120,23 @@ def test_weekday_sensor_timeout_with_set(self):
poke_interval=1,
timeout=2,
week_day={WeekDay.MONDAY, WeekDay.TUESDAY},
use_task_execution_day=True,
use_task_logical_date=True,
dag=self.dag,
)
with pytest.raises(AirflowSensorTimeout):
op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True)

def test_deprecation_warning(self):
warning_message = (
"""Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``."""
)
with pytest.warns(DeprecationWarning) as warnings:
DayOfWeekSensor(
task_id='week_day_warn',
poke_interval=1,
timeout=2,
week_day='Tuesday',
use_task_execution_day=True,
dag=self.dag,
)
assert warning_message == str(warnings[0].message)

0 comments on commit 614b232

Please sign in to comment.