Skip to content

Commit

Permalink
Handle no Dagrun in DagrunIdDep (apache#8389) (apache#11343)
Browse files Browse the repository at this point in the history
Co-authored-by: Ben Nadler <[email protected]>
  • Loading branch information
2 people authored and RaviTezu committed Oct 25, 2020
1 parent 3cca334 commit d4ecba0
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
4 changes: 2 additions & 2 deletions airflow/ti_deps/deps/dagrun_id_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ def _get_dep_statuses(self, ti, session, dep_context=None):
from airflow.jobs import BackfillJob # To avoid a circular dependency
dagrun = ti.get_dagrun(session)

if not dagrun.run_id or not match(BackfillJob.ID_PREFIX + '.*', dagrun.run_id):
if not dagrun or not dagrun.run_id or not match(BackfillJob.ID_PREFIX + '.*', dagrun.run_id):
yield self._passing_status(
reason="Task's DagRun run_id is either NULL "
reason="Task's DagRun doesn't exist or the run_id is either NULL "
"or doesn't start with {}".format(BackfillJob.ID_PREFIX))
else:
yield self._failing_status(
Expand Down
7 changes: 7 additions & 0 deletions tests/ti_deps/deps/test_dagrun_id_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,10 @@ def test_dagrun_id_is_not_backfill(self):
dagrun.run_id = None
ti = Mock(get_dagrun=Mock(return_value=dagrun))
self.assertTrue(DagrunIdDep().is_met(ti=ti))

def test_dagrun_is_none(self):
"""
Task instances which don't yet have an associated dagrun.
"""
ti = Mock(get_dagrun=Mock(return_value=None))
self.assertTrue(DagrunIdDep().is_met(ti=ti))

0 comments on commit d4ecba0

Please sign in to comment.