# --- airflow/sensors/date_time_sensor.py (new file in this patch) ---
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import datetime

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults


class DateTimeSensor(BaseSensorOperator):
    """
    Sensor that succeeds once the current time is past a given datetime.

    Because the wait is expressed as an absolute ``target_time`` (which may
    be templated), the outcome for a given run is idempotent. This covers
    situations that ``TimeSensor`` and ``TimeDeltaSensor`` handle poorly.

    **Example 1**:
        A task must wait for 11am on each ``execution_date``. With
        ``TimeSensor`` or ``TimeDeltaSensor``, every backfill task started
        at 1am has to wait for 10 hours. That wait is unnecessary: a
        backfill task with ``{{ ds }} = '1970-01-01'`` has nothing to wait
        for, because ``1970-01-01T11:00:00`` has already passed.

    **Example 2**:
        A DAG is scheduled to run at 23:00 daily, but one of its tasks must
        run at 01:00 the next day. ``TimeSensor`` would return ``True``
        immediately because 23:00 > 01:00. Instead, we can do this:

        .. code-block:: python

            DateTimeSensor(
                task_id='wait_for_0100',
                target_time='{{ next_execution_date.tomorrow().replace(hour=1) }}',
            )

    :param target_time: datetime after which the job succeeds. (templated)
    :type target_time: str or datetime.datetime
    """

    # ``target_time`` is listed here so that it is rendered as a template.
    template_fields = ("target_time",)

    @apply_defaults
    def __init__(self, target_time, *args, **kwargs):
        """
        Store ``target_time`` as a string on the sensor.

        :param target_time: moment to wait for; a ``datetime.datetime`` is
            converted with ``isoformat()``, a ``str`` is stored unchanged.
        :type target_time: str or datetime.datetime
        :raises TypeError: if ``target_time`` is neither a ``str`` nor a
            ``datetime.datetime``.
        """
        # The base class is initialised first, before the argument is
        # validated, so the remaining positional/keyword arguments are
        # always forwarded.
        super(DateTimeSensor, self).__init__(*args, **kwargs)

        if not isinstance(target_time, (datetime.datetime, str)):
            raise TypeError(
                "Expected str or datetime.datetime type for target_time. Got {}".format(
                    type(target_time)
                )
            )

        # Normalise to a string so the attribute has a single type whichever
        # of the two accepted input types was supplied.
        if isinstance(target_time, datetime.datetime):
            target_time = target_time.isoformat()
        self.target_time = target_time

    def poke(self, context):
        """
        Report whether the target time has been passed.

        :param context: task context; not used by this sensor.
        :return: ``True`` when ``timezone.utcnow()`` is strictly later than
            ``target_time`` as parsed by ``timezone.parse``, else ``False``.
        """
        self.log.info("Checking if the time (%s) has come", self.target_time)
        now = timezone.utcnow()
        deadline = timezone.parse(self.target_time)
        return now > deadline
# --- tests/sensors/test_date_time_sensor.py (new file in this patch) ---
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from mock import patch
from parameterized import parameterized

from airflow.models.dag import DAG
from airflow.sensors.date_time_sensor import DateTimeSensor
from airflow.utils import timezone

# Start date used for the DAG that every sensor in this module is attached to.
DEFAULT_DATE = timezone.datetime(2015, 1, 1)


class TestDateTimeSensor:
    """Tests for ``DateTimeSensor`` construction and ``poke``."""

    @classmethod
    def setup_class(cls):
        """Create one DAG shared by all tests in the class."""
        cls.dag = DAG(
            "test_dag",
            default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
        )

    @parameterized.expand(
        [
            (
                "valid_datetime",
                timezone.datetime(2020, 7, 6, 13, tzinfo=timezone.utc),
                "2020-07-06T13:00:00+00:00",
            ),
            ("valid_str", "20200706T210000+8", "20200706T210000+8"),
        ]
    )
    def test_valid_input(self, task_id, target_time, expected):
        """A datetime is stored in ISO format; a string is stored unchanged."""
        sensor = DateTimeSensor(
            task_id=task_id,
            target_time=target_time,
            dag=self.dag,
        )
        assert sensor.target_time == expected

    def test_invalid_input(self):
        """A value that is neither ``str`` nor ``datetime`` is rejected."""
        # A ``datetime.time`` is deliberately used: close to, but not, an
        # accepted type.
        time_only = timezone.utcnow().time()
        with pytest.raises(TypeError):
            DateTimeSensor(
                task_id="test",
                target_time=time_only,
                dag=self.dag,
            )

    @parameterized.expand(
        [
            (
                "poke_datetime",
                timezone.datetime(2020, 1, 1, 22, 59, tzinfo=timezone.utc),
                True,
            ),
            ("poke_str_extended", "2020-01-01T23:00:00.001+00:00", False),
            # Presumably parsed as UTC+8, i.e. 2020-01-01T22:59:59 UTC, which
            # is just before the mocked "now" -- confirm against the parser.
            ("poke_str_basic_with_tz", "20200102T065959+8", True),
        ]
    )
    @patch(
        "airflow.sensors.date_time_sensor.timezone.utcnow",
        return_value=timezone.datetime(2020, 1, 1, 23, 0, tzinfo=timezone.utc),
    )
    def test_poke(self, task_id, target_time, expected, mock_utcnow):
        """``poke`` compares the target against "now" frozen at 23:00 UTC."""
        sensor = DateTimeSensor(
            task_id=task_id,
            target_time=target_time,
            dag=self.dag,
        )
        # The sensor does not read the context, so ``None`` is sufficient.
        outcome = sensor.poke(None)
        assert outcome == expected