Skip to content

Commit

Permalink
Don't schedule dummy tasks (apache#7880)
Browse files Browse the repository at this point in the history
(cherry picked from commit d87c59d)
  • Loading branch information
mik-laj authored and kaxil committed Mar 30, 2020
1 parent b886436 commit c496caa
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 2 deletions.
5 changes: 5 additions & 0 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from airflow.models import DagRun, SlaMiss, errors
from airflow.settings import Stats
from airflow.ti_deps.dep_context import DepContext, SCHEDULEABLE_STATES, SCHEDULED_DEPS
from airflow.operators.dummy_operator import DummyOperator
from airflow.ti_deps.deps.pool_slots_available_dep import STATES_TO_COUNT_AS_RUNNING
from airflow.utils import asciiart, helpers, timezone
from airflow.utils.dag_processing import (AbstractDagFileProcessor,
Expand Down Expand Up @@ -1630,6 +1631,10 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
# Task starts out in the scheduled state. All tasks in the
# scheduled state will be sent to the executor
ti.state = State.SCHEDULED
# If the task is dummy, then mark it as done automatically
if isinstance(ti.task, DummyOperator) \
and not ti.task.on_success_callback:
ti.state = State.SUCCESS

# Also save this task instance to the DB.
self.log.info("Creating / updating %s in ORM", ti)
Expand Down
2 changes: 2 additions & 0 deletions airflow/operators/dummy_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class DummyOperator(BaseOperator):
"""
Operator that does literally nothing. It can be used to group tasks in a
DAG.
The task is evaluated by the scheduler but never processed by the executor.
"""

ui_color = '#e8f7e4'
Expand Down
43 changes: 43 additions & 0 deletions tests/dags/test_only_dummy_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

DEFAULT_DATE = datetime(2016, 1, 1)

default_args = {
"owner": "airflow",
"start_date": DEFAULT_DATE,
}

dag = DAG(dag_id="test_only_dummy_tasks", default_args=default_args, schedule_interval='@once')

with dag:
task_a = DummyOperator(task_id="test_task_a")

task_b = DummyOperator(task_id="test_task_b")

task_a >> task_b

task_c = DummyOperator(task_id="test_task_c")

task_d = DummyOperator(task_id="test_task_on_execute", on_success_callback=lambda *args, **kwargs: 1)

task_e = DummyOperator(task_id="test_task_on_success", on_success_callback=lambda *args, **kwargs: 1)
49 changes: 47 additions & 2 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import psutil
import pytest
import six
from airflow.models.taskinstance import TaskInstance
from parameterized import parameterized

import airflow.example_dags
Expand Down Expand Up @@ -1828,10 +1829,12 @@ def test_scheduler_reschedule(self):
dag = DAG(
dag_id='test_scheduler_reschedule',
start_date=DEFAULT_DATE)
dummy_task = DummyOperator(
dummy_task = BashOperator(
task_id='dummy',
dag=dag,
owner='airflow')
owner='airflow',
bash_command='echo 1',
)

dag.clear()
dag.is_subdag = False
Expand Down Expand Up @@ -2855,3 +2858,45 @@ def test_reset_orphaned_tasks_with_orphans(self):
self.assertEqual(state, ti.state)

session.close()

def test_should_mark_dummy_task_as_success(self):
dag_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)), '../dags/test_only_dummy_tasks.py'
)
scheduler_job = SchedulerJob(dag_ids=[], log=mock.MagicMock())
with create_session() as session:
session.query(TaskInstance).delete()
session.query(DagModel).delete()

dagbag = DagBag(dag_folder=dag_file, include_examples=False)
dagbag.get_dag('test_only_dummy_tasks').sync_to_db()

simple_dags, import_errors_count = scheduler_job.process_file(
file_path=dag_file, zombies=[]
)
with create_session() as session:
tis = session.query(TaskInstance).all()

self.assertEqual(0, import_errors_count)
self.assertEqual(['test_only_dummy_tasks'], [dag.dag_id for dag in simple_dags])
self.assertEqual(5, len(tis))
self.assertEqual({
('test_task_a', 'success'),
('test_task_b', None),
('test_task_c', 'success'),
('test_task_on_execute', 'scheduled'),
('test_task_on_success', 'scheduled'),
}, {(ti.task_id, ti.state) for ti in tis})

scheduler_job.process_file(file_path=dag_file, zombies=[])
with create_session() as session:
tis = session.query(TaskInstance).all()

self.assertEqual(5, len(tis))
self.assertEqual({
('test_task_a', 'success'),
('test_task_b', 'success'),
('test_task_c', 'success'),
('test_task_on_execute', 'scheduled'),
('test_task_on_success', 'scheduled'),
}, {(ti.task_id, ti.state) for ti in tis})

0 comments on commit c496caa

Please sign in to comment.