Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DateTimeSensor #9697

Merged
merged 2 commits into from
Jul 23, 2020
Merged

Add DateTimeSensor #9697

merged 2 commits into from
Jul 23, 2020

Conversation

22quinn
Copy link
Contributor

@22quinn 22quinn commented Jul 6, 2020

Related to #9609

A major advantage of this new sensor is idempotence for the target_time.

It handles some cases for which TimeSensor and TimeDeltaSensor are not suited.

Example 1) If a task needs to wait for 11am on each execution_date. Using TimeSensor or TimeDeltaSensor, all backfill tasks started at 1am have to wait for 10 hours. This is unnecessary, e.g. a backfill task with {{ ds }} = '1970-01-01' does not need to wait because 1970-01-01T11:00:00 has already passed.

Example 2) If a DAG is scheduled to run at 23:00 daily, but one of the tasks is required to run at 01:00 next day, using TimeSensor will return True immediately because 23:00 > 01:00. Instead, we can use DateTimeSensor:

        DateTimeSensor(
            task_id='wait_for_0100',
            target_time='{{ next_execution_date.tomorrow().replace(hour=1) }}',
            dag=dag
        )

On another topic - I think TimeSensor, TimeDeltaSensor, and this DateTimeSensor should probably be put in the same file sensors/time.py. What does everyone think? But that will be a breaking change. Will have to do it in a separate PR.


Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Unit tests coverage for changes (not needed for documentation changes)
  • Target Github ISSUE in description if exists
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

with pytest.raises(TypeError):
DateTimeSensor(
task_id="test", target_time=timezone.utcnow().time(), dag=self.dag,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adding a test for poke method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added and rebased


def poke(self, context: Dict) -> bool:
self.log.info("Checking if the time (%s) has come", self.target_time)
return timezone.utcnow() > timezone.parse(self.target_time)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if it should be changed to>=? Same applies to TimeSensor and TimeDeltaSensor. The impact of changing to >= is that the task can succeed one poke interval earlier in some cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say that > is safer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just thinking >= is more mathematically correct. But now I realize it's extremely rare to have = because utcnow() has microsecond precision. So nvm

@turbaszek
Copy link
Member

turbaszek commented Jul 23, 2020

@zikun before we merge, I think it would be helpful to add some info from PR description to docstring:

    A major advantage of this sensor is idempotence for the ``target_time``.
    It handles some cases for which ``TimeSensor`` and ``TimeDeltaSensor`` are not suited.

    **Example**: 1 ::
        If a task needs to wait for 11am on each ``execution_date``. Using
        ``TimeSensor`` or ``TimeDeltaSensor``, all backfill tasks started at 1am have to wait for 10 hours.
        This is unnecessary, e.g. a backfill task with ``{{ ds }} = '1970-01-01'`` does not need to wait
        because ``1970-01-01T11:00:00`` has already passed.

    **Example**: 2 ::
        If a DAG is scheduled to run at 23:00 daily, but one of the tasks is required
        to run at 01:00 next day, using ``TimeSensor`` will return ``True`` immediately because 23:00 > 01:00.
        Instead, we can use ``DateTimeSensor``:

        .. code-block:: python
            DateTimeSensor(
                task_id='wait_for_0100',
                target_time='{{ next_execution_date.tomorrow().replace(hour=1) }}',
                dag=dag
            )

Hope this is ok formatted rst but I'm not sure

@22quinn
Copy link
Contributor Author

22quinn commented Jul 23, 2020

I tried on rst online viewer. It should work.

@turbaszek turbaszek merged commit 243b704 into apache:master Jul 23, 2020
@22quinn 22quinn deleted the datetime-sensor branch July 24, 2020 02:13
@kaxil kaxil added this to the Airflow 1.10.12 milestone Aug 14, 2020
kaxil pushed a commit that referenced this pull request Aug 14, 2020
* Add DateTimeSensor

(cherry picked from commit 243b704)
kaxil pushed a commit that referenced this pull request Aug 15, 2020
* Add DateTimeSensor

(cherry picked from commit 243b704)
kaxil pushed a commit that referenced this pull request Aug 15, 2020
* Add DateTimeSensor

(cherry picked from commit 243b704)
cfei18 pushed a commit to cfei18/incubator-airflow that referenced this pull request Mar 5, 2021
* Add DateTimeSensor

(cherry picked from commit 243b704)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants