# Patch summary (unified git diff touching four files). This header sits before
# the first "diff --git" line and is commentary only; the patch payload below is
# unchanged.
#
# 1. airflow/jobs/scheduler_job.py
#    - Adds an import of DummyOperator.
#    - In the hunk labelled `_schedule_task_instances`: immediately after a task
#      instance is set to State.SCHEDULED, its state is overridden to
#      State.SUCCESS when ti.task is a DummyOperator instance AND neither
#      on_execute_callback nor on_success_callback is set on the task.
#    - NOTE(review): isinstance() also matches subclasses of DummyOperator;
#      confirm that skipping execution is intended for subclasses as well.
#    - NOTE(review): only ti.state is assigned in this hunk; whether other
#      fields of the task instance need to be filled in for a task that never
#      runs cannot be determined from this patch - confirm.
#
# 2. airflow/operators/dummy_operator.py
#    - Extends the DummyOperator docstring with the sentence "The task is
#      evaluated by the scheduler but never processed by the executor."
#    - NOTE(review): the scheduler hunk leaves dummy tasks that define
#      on_execute_callback / on_success_callback in the SCHEDULED state (and the
#      new test expects 'scheduled' for them), so "never" appears to hold only
#      for callback-free dummy tasks - confirm the wording.
#
# 3. tests/dags/test_only_dummy_tasks.py (new file, 43 lines)
#    - Defines DAG "test_only_dummy_tasks" (schedule_interval='@once',
#      start_date 2016-01-01) with five DummyOperator tasks:
#      test_task_a >> test_task_b, a standalone test_task_c, and two standalone
#      tasks that set on_execute_callback / on_success_callback respectively.
#
# 4. tests/jobs/test_scheduler_job.py
#    - Adds test_should_mark_dummy_task_as_success, which processes the DAG file
#      twice. Expected states after the first pass: a and c 'success', b None,
#      both callback tasks 'scheduled'. After the second pass b is 'success' too.
#    - In the hunk labelled `test_scheduler_reschedule`: the task is changed
#      from DummyOperator to BashOperator(bash_command='echo 1').
#      NOTE(review): presumably so the task still reaches the executor now that
#      plain dummy tasks are short-circuited - confirm.
#
# NOTE(review): the original newlines of this diff appear to have been collapsed
# (hunk headers and hunk bodies share physical lines). It presumably needs its
# line structure restored before patch tooling can apply it.
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index fdd7c13753a224..64aa2f0fd224e5 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -43,6 +43,7 @@ from airflow.models import DAG, DagModel, SlaMiss, errors from airflow.models.dagrun import DagRun from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKeyType +from airflow.operators.dummy_operator import DummyOperator from airflow.stats import Stats from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies import SCHEDULED_DEPS @@ -912,6 +913,11 @@ def _schedule_task_instances( # Task starts out in the scheduled state. All tasks in the # scheduled state will be sent to the executor ti.state = State.SCHEDULED + # If the task is dummy, then mark it as done automatically + if isinstance(ti.task, DummyOperator) \ + and not ti.task.on_execute_callback \ + and not ti.task.on_success_callback: + ti.state = State.SUCCESS # Also save this task instance to the DB. self.log.info("Creating / updating %s in ORM", ti) diff --git a/airflow/operators/dummy_operator.py b/airflow/operators/dummy_operator.py index 734b9a79761d6e..119d2d2cd63998 100644 --- a/airflow/operators/dummy_operator.py +++ b/airflow/operators/dummy_operator.py @@ -24,6 +24,8 @@ class DummyOperator(BaseOperator): """ Operator that does literally nothing. It can be used to group tasks in a DAG. + + The task is evaluated by the scheduler but never processed by the executor. """ ui_color = '#e8f7e4' diff --git a/tests/dags/test_only_dummy_tasks.py b/tests/dags/test_only_dummy_tasks.py new file mode 100644 index 00000000000000..8b0f04d8e5c1c4 --- /dev/null +++ b/tests/dags/test_only_dummy_tasks.py @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from datetime import datetime + +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator + +DEFAULT_DATE = datetime(2016, 1, 1) + +default_args = { + "owner": "airflow", + "start_date": DEFAULT_DATE, +} + +dag = DAG(dag_id="test_only_dummy_tasks", default_args=default_args, schedule_interval='@once') + +with dag: + task_a = DummyOperator(task_id="test_task_a") + + task_b = DummyOperator(task_id="test_task_b") + + task_a >> task_b + + task_c = DummyOperator(task_id="test_task_c") + + task_d = DummyOperator(task_id="test_task_on_execute", on_execute_callback=lambda *args, **kwargs: 1) + + task_e = DummyOperator(task_id="test_task_on_success", on_success_callback=lambda *args, **kwargs: 1) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 04be18ad00667c..275b3faa0a43de 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1082,6 +1082,50 @@ def test_should_parse_only_unpaused_dags(self): self.assertEqual(['test_multiple_dags__dag_2'], [dag.dag_id for dag in simple_dags]) self.assertEqual({'test_multiple_dags__dag_2'}, {ti.dag_id for ti in tis}) + def test_should_mark_dummy_task_as_success(self): + dag_file = os.path.join( + os.path.dirname(os.path.realpath(__file__)), '../dags/test_only_dummy_tasks.py' + ) + dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) + with 
create_session() as session: + session.query(TaskInstance).delete() + session.query(DagModel).delete() + + dagbag = DagBag(dag_folder=dag_file, include_examples=False) + dagbag.sync_to_db() + + simple_dags, import_errors_count = dag_file_processor.process_file( + file_path=dag_file, failure_callback_requests=[] + ) + with create_session() as session: + tis = session.query(TaskInstance).all() + + self.assertEqual(0, import_errors_count) + self.assertEqual(['test_only_dummy_tasks'], [dag.dag_id for dag in simple_dags]) + self.assertEqual(5, len(tis)) + self.assertEqual({ + ('test_task_a', 'success'), + ('test_task_b', None), + ('test_task_c', 'success'), + ('test_task_on_execute', 'scheduled'), + ('test_task_on_success', 'scheduled'), + }, {(ti.task_id, ti.state) for ti in tis}) + + dag_file_processor.process_file( + file_path=dag_file, failure_callback_requests=[] + ) + with create_session() as session: + tis = session.query(TaskInstance).all() + + self.assertEqual(5, len(tis)) + self.assertEqual({ + ('test_task_a', 'success'), + ('test_task_b', 'success'), + ('test_task_c', 'success'), + ('test_task_on_execute', 'scheduled'), + ('test_task_on_success', 'scheduled'), + }, {(ti.task_id, ti.state) for ti in tis}) + class TestDagFileProcessorQueriesCount(unittest.TestCase): """ @@ -2493,10 +2537,12 @@ def test_scheduler_reschedule(self): dag = DAG( dag_id='test_scheduler_reschedule', start_date=DEFAULT_DATE) - dummy_task = DummyOperator( + dummy_task = BashOperator( task_id='dummy', dag=dag, - owner='airflow') + owner='airflow', + bash_command='echo 1', + ) dag.clear() dag.is_subdag = False