Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple cron expressions in schedule_interval #24733

Closed
wants to merge 11 commits into from
13 changes: 10 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@


DagStateChangeCallback = Callable[[Context], None]
ScheduleInterval = Union[None, str, timedelta, relativedelta]
MultiCron = Union[List[str], Set[str], Tuple[str, ...]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This additional type alias doesn’t seem to be necessary?

(Also this can probably be simply Collection[str])

ScheduleInterval = Union[None, str, timedelta, relativedelta, MultiCron]

# FIXME: Ideally this should be Union[Literal[NOTSET], ScheduleInterval],
# but Mypy cannot handle that right now. Track progress of PEP 661 for progress.
Expand Down Expand Up @@ -168,7 +169,9 @@ def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone) -> Timet
return OnceTimetable()
if isinstance(interval, (timedelta, relativedelta)):
return DeltaDataIntervalTimetable(interval)
if isinstance(interval, str):
if isinstance(interval, str) or (
isinstance(interval, (list, set, tuple)) and all(isinstance(element, str) for element in interval)
):
Comment on lines +172 to +174
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if isinstance(interval, str) or (
isinstance(interval, (list, set, tuple)) and all(isinstance(element, str) for element in interval)
):
if isinstance(interval, str) or (
isinstance(interval, Collection) and all(isinstance(element, str) for element in interval)
):

return CronDataIntervalTimetable(interval, timezone)
raise ValueError(f"{interval!r} is not a valid schedule_interval.")

Expand Down Expand Up @@ -2500,7 +2503,11 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=NEW_SESSION):
orm_dag.max_active_tasks = dag.max_active_tasks
orm_dag.max_active_runs = dag.max_active_runs
orm_dag.has_task_concurrency_limits = any(t.max_active_tis_per_dag is not None for t in dag.tasks)
orm_dag.schedule_interval = dag.schedule_interval
orm_dag.schedule_interval = (
list(dag.schedule_interval)
if isinstance(dag.schedule_interval, set)
else dag.schedule_interval
)
Comment on lines +2506 to +2510
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it necessary to special-case set?

orm_dag.timetable_description = dag.timetable.description

run: Optional[DagRun] = most_recent_runs.get(dag.dag_id)
Expand Down
45 changes: 44 additions & 1 deletion airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,42 @@
}
}
},
"set": {
"description": "A Python set",
"type": "object",
"properties": {
"__type": {
"type": "string",
"const": "set"
},
"__var": {
"type": "array"
}
},
"required": [
"__type",
"__var"
],
"additionalProperties": false
},
"tuple": {
"description": "A Python tuple",
"type": "object",
"properties": {
"__type": {
"type": "string",
"const": "tuple"
},
"__var": {
"type": "array"
}
},
"required": [
"__type",
"__var"
],
"additionalProperties": false
},
"timezone": {
"anyOf": [
{ "type": "string" },
Expand Down Expand Up @@ -115,7 +151,14 @@
{ "type": "null" },
{ "type": "string" },
{ "$ref": "#/definitions/typed_timedelta" },
{ "$ref": "#/definitions/typed_relativedelta" }
{ "$ref": "#/definitions/typed_relativedelta" },
{
"type": "array",
"description": "List of cron expressions (string)",
"items": { "type": "string" }
},
{ "$ref": "#/definitions/set" },
{ "$ref": "#/definitions/tuple" }
]
},
"schedule_on":{
Expand Down
119 changes: 51 additions & 68 deletions airflow/timetables/interval.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,18 @@
# under the License.

import datetime
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, List, Optional, Set, Tuple, Union

from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
from croniter import CroniterBadCronError, CroniterBadDateError, croniter
from dateutil.relativedelta import relativedelta
from pendulum import DateTime
from pendulum.tz.timezone import Timezone

from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowTimetableInvalid
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.utils.dates import cron_presets
from airflow.utils.timezone import convert_to_utc, make_aware, make_naive
from airflow.utils.timezone import convert_to_utc

Delta = Union[datetime.timedelta, relativedelta]

Expand Down Expand Up @@ -97,111 +96,95 @@ def next_dagrun_info(
return DagRunInfo.interval(start=start, end=end)


def _is_schedule_fixed(expression: str) -> bool:
"""Figures out if the schedule has a fixed time (e.g. 3 AM every day).

:return: True if the schedule has a fixed time, False if not.
class CronDataIntervalTimetable(_DataIntervalTimetable):
"""
Timetable that schedules data intervals using one or more cron expressions.

Detection is done by "peeking" the next two cron trigger time; if the
two times have the same minute and hour value, the schedule is fixed,
and we *don't* need to perform the DST fix.
This corresponds to ``schedule_interval=<cron>``, where ``<cron>`` is either:
- A five-segment cron representation
- One of ``cron_presets``
- Or a collection containing values from the above

This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
"""
cron = croniter(expression)
next_a = cron.get_next(datetime.datetime)
next_b = cron.get_next(datetime.datetime)
return next_b.minute == next_a.minute and next_b.hour == next_a.hour

def __init__(
self, crons: Union[str, List[str], Set[str], Tuple[str, ...]], timezone: Union[str, Timezone]
) -> None:
Comment on lines +111 to +113
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of normalizing here, I wonder if it’s easier to make this only accept List[str] and normalize in DAG.__init__ instead.

"""
:param crons: One or more cron expressions
:param timezone:
"""
if isinstance(crons, str):
crons = [crons]

class CronDataIntervalTimetable(_DataIntervalTimetable):
"""Timetable that schedules data intervals with a cron expression.
self._expressions = {cron_presets.get(expression, expression) for expression in crons}
cron_descriptions = set()

This corresponds to ``schedule_interval=<cron>``, where ``<cron>`` is either
a five/six-segment representation, or one of ``cron_presets``.
for cron_expression in self._expressions:
descriptor = ExpressionDescriptor(
expression=cron_expression, casing_type=CasingTypeEnum.Sentence, use_24hour_time_format=True
)

The implementation extends on croniter to add timezone awareness. This is
because croniter works only with naive timestamps, and cannot consider DST
when determining the next/previous time.
try:
# Check if cron expression contains more than 5 elements and avoid evaluation for now as
# Croniter has inconsistent evaluation with other libraries
if len(croniter(cron_expression).expanded) > 5:
raise FormatException()
interval_description = descriptor.get_description()
except (CroniterBadCronError, FormatException, MissingFieldException):
interval_description = ""

Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
"""
cron_descriptions.add(interval_description)

def __init__(self, cron: str, timezone: Union[str, Timezone]) -> None:
self._expression = cron_presets.get(cron, cron)
self.description = " and ".join(cron_descriptions)

if isinstance(timezone, str):
timezone = Timezone(timezone)
self._timezone = timezone

descriptor = ExpressionDescriptor(
expression=self._expression, casing_type=CasingTypeEnum.Sentence, use_24hour_time_format=True
)
try:
# checking for more than 5 parameters in Cron and avoiding evaluation for now,
# as Croniter has inconsistent evaluation with other libraries
if len(croniter(self._expression).expanded) > 5:
raise FormatException()
interval_description = descriptor.get_description()
except (CroniterBadCronError, FormatException, MissingFieldException):
interval_description = ""
self.description = interval_description

@classmethod
def deserialize(cls, data: Dict[str, Any]) -> "Timetable":
from airflow.serialization.serialized_objects import decode_timezone

return cls(data["expression"], decode_timezone(data["timezone"]))
return cls(data["expressions"], decode_timezone(data["timezone"]))

def __eq__(self, other: Any) -> bool:
"""Both expression and timezone should match.

"""
Both expression and timezone should match.
This is only for testing purposes and should not be relied on otherwise.
"""
if not isinstance(other, CronDataIntervalTimetable):
return NotImplemented
return self._expression == other._expression and self._timezone == other._timezone
return self._expressions == other._expressions and self._timezone == other._timezone

@property
def summary(self) -> str:
return self._expression
return ", ".join(self._expressions)

def serialize(self) -> Dict[str, Any]:
from airflow.serialization.serialized_objects import encode_timezone

return {"expression": self._expression, "timezone": encode_timezone(self._timezone)}
return {"expressions": self._expressions, "timezone": encode_timezone(self._timezone)}

def validate(self) -> None:
try:
croniter(self._expression)
for cron_expression in self._expressions:
croniter(cron_expression)
except (CroniterBadCronError, CroniterBadDateError) as e:
raise AirflowTimetableInvalid(str(e))

@cached_property
def _should_fix_dst(self) -> bool:
# This is lazy so instantiating a schedule does not immediately raise
# an exception. Validity is checked with validate() during DAG-bagging.
return not _is_schedule_fixed(self._expression)

def _get_next(self, current: DateTime) -> DateTime:
"""Get the first schedule after specified time, with DST fixed."""
naive = make_naive(current, self._timezone)
cron = croniter(self._expression, start_time=naive)
scheduled = cron.get_next(datetime.datetime)
if not self._should_fix_dst:
return convert_to_utc(make_aware(scheduled, self._timezone))
delta = scheduled - naive
return convert_to_utc(current.in_timezone(self._timezone) + delta)
"""Get the first schedule after specified time in UTC."""
crons = {croniter(expression, start_time=current) for expression in self._expressions}
earliest_datetime = min(cron.get_next(datetime.datetime) for cron in crons)
return convert_to_utc(earliest_datetime)

def _get_prev(self, current: DateTime) -> DateTime:
"""Get the first schedule before specified time, with DST fixed."""
naive = make_naive(current, self._timezone)
cron = croniter(self._expression, start_time=naive)
scheduled = cron.get_prev(datetime.datetime)
if not self._should_fix_dst:
return convert_to_utc(make_aware(scheduled, self._timezone))
delta = naive - scheduled
return convert_to_utc(current.in_timezone(self._timezone) - delta)
"""Get the first schedule before specified time in UTC."""
crons = {croniter(expression, start_time=current) for expression in self._expressions}
latest_datetime = max(cron.get_prev(datetime.datetime) for cron in crons)
return convert_to_utc(latest_datetime)

def _align(self, current: DateTime) -> DateTime:
"""Get the next scheduled time.
Expand Down
4 changes: 2 additions & 2 deletions airflow/utils/dates.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import warnings
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional, Set, Tuple, Union

from croniter import croniter
from dateutil.relativedelta import relativedelta # for doctest
Expand All @@ -39,7 +39,7 @@ def date_range(
start_date: datetime,
end_date: Optional[datetime] = None,
num: Optional[int] = None,
delta: Optional[Union[str, timedelta, relativedelta]] = None,
delta: Optional[Union[str, timedelta, relativedelta, List[str], Set[str], Tuple[str, ...]]] = None,
) -> List[datetime]:
"""
Get a set of dates as a list based on a start, end and delta, delta
Expand Down
42 changes: 29 additions & 13 deletions docs/apache-airflow/concepts/dags.rst
Original file line number Diff line number Diff line change
Expand Up @@ -140,28 +140,41 @@ You can also provide an ``.airflowignore`` file inside your ``DAG_FOLDER``, or a
Running DAGs
------------

DAGs will run in one of two ways:
A DAG can run in two ways:

- When they are *triggered* either manually or via the API
- On a defined *schedule*, which is defined as part of the DAG
- *Manually triggered* via the UI or API
- Or *scheduled*, defined by ``schedule_interval`` on the DAG. For example::

DAGs do not *require* a schedule, but it's very common to define one. You define it via the ``schedule_interval`` argument, like this::

with DAG("my_daily_dag", schedule_interval="@daily"):
...
from airflow.models import DAG

The ``schedule_interval`` argument takes any value that is a valid `Crontab <https://en.wikipedia.org/wiki/Cron>`_ schedule value, so you could also do::

with DAG("my_daily_dag", schedule_interval="0 * * * *"):
with DAG("my_dag", schedule_interval="0 0 * * *"):
...

.. tip::

For more information on ``schedule_interval`` values, see :doc:`DAG Run </dag-run>`.
This DAG will run every day at 00:00, as defined by the cron expression given to ``schedule_interval``. The value of ``schedule_interval`` can take several types:

+------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-----------------------------------+
| Type | When to use | Example |
+==========================================+======================================================================================================================+===================================+
| ``None`` | No schedule, use for manually triggered DAGs | ``None`` |
+------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-----------------------------------+
| Cron expression (``str``) | To run at cron-based intervals + ``"0 0 * * *"`` |
+------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-----------------------------------+
| Cron preset (``str``) | Convenience cron expression for readability + ``"@daily"`` |
+------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-----------------------------------+
| List of cron expressions/presets | To run at intervals that cannot be expressed by a single cron expression. + ``["@daily", "0 3 * * MON,TUE"]`` |
+------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-----------------------------------+
| ``datetime.timedelta`` | To run at frequency-based intervals. Useful if your interval cannot be expressed by cron e.g. ``timedelta(days=3)``. + ``timedelta(days=3)`` |
+------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-----------------------------------+
| ``dateutil.relativedelta.relativedelta`` | To express an interval in weeks, months, or years (which timedelta cannot do natively). + ``relativedelta(months=1)`` |
+------------------------------------------+----------------------------------------------------------------------------------------------------------------------+-----------------------------------+

.. tip::
A convenient tool for converting cron expressions to human language is `Crontab.guru <https://crontab.guru/>`_.

If ``schedule_interval`` is not enough to express the DAG's schedule, see :doc:`Timetables </howto/timetable>`.
For more information on ``logical date``, see :ref:`data-interval` and
BasPH marked this conversation as resolved.
Show resolved Hide resolved
:ref:`faq:what-does-execution-date-mean`.
For a full list of all available cron presets, see :ref:`dag-run:cron-presets`. If ``schedule_interval`` does not provide enough flexibility to express the DAG's schedule, see :doc:`Timetables </howto/timetable>`.

Every time you run a DAG, you are creating a new instance of that DAG which
Airflow calls a :doc:`DAG Run </dag-run>`. DAG Runs can run in parallel for the
Expand Down Expand Up @@ -197,6 +210,9 @@ schedule interval put in place, the logical date is going to indicate the time
at which it marks the start of the data interval, where the DAG run's start
date would then be the logical date + scheduled interval.

.. tip::
For more information on ``logical date``, see :ref:`data-interval` and :ref:`faq:what-does-execution-date-mean`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
For more information on ``logical date``, see :ref:`data-interval` and :ref:`faq:what-does-execution-date-mean`.
For more information on *logical date*, see :ref:`data-interval` and :ref:`faq:what-does-execution-date-mean`.

nit (this word is not code so let’s not make it look like code)


DAG Assignment
--------------

Expand Down
2 changes: 2 additions & 0 deletions docs/apache-airflow/dag-run.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ There are two possible terminal states for the DAG Run:
Be careful if some of your tasks have defined some specific `trigger rule <dags.html#trigger-rules>`_.
These can lead to some unexpected behavior, e.g. if you have a leaf task with trigger rule `"all_done"`, it will be executed regardless of the states of the rest of the tasks and if it will succeed, then the whole DAG Run will also be marked as ``success``, even if something failed in the middle.

.. _dag-run:cron-presets:

Cron Presets
''''''''''''

Expand Down
Loading