
tasks with none_failed trigger rule do not run when all upstream tasks are skipped #14319

Closed · n-oden opened this issue Feb 19, 2021 · 5 comments
Labels: area:Scheduler (including HA scheduler), kind:bug, pending-response


n-oden commented Feb 19, 2021

Apache Airflow version: 1.10.12
Kubernetes version (if you are using kubernetes) (use kubectl version): v1.16.15-gke.6000

Environment:

  • Cloud provider or hardware configuration: Google Kubernetes Engine
  • OS (e.g. from /etc/os-release): "Container-Optimized OS"
BUILD_ID=12371.1088.0
NAME="Container-Optimized OS"
KERNEL_COMMIT_ID=52bdab9330bdd9e50dc967f8aa850829921ca217
GOOGLE_CRASH_ID=Lakitu
VERSION_ID=77
BUG_REPORT_URL="https://cloud.google.com/container-optimized-os/docs/resources/support-policy#contact_us"
PRETTY_NAME="Container-Optimized OS from Google"
VERSION=77
GOOGLE_METRICS_PRODUCT_ID=26
HOME_URL="https://cloud.google.com/container-optimized-os/docs"
ID=cos
  • Kernel (e.g. uname -a): Linux gke-services-1-default-pool-3ef08c09-v95l 4.19.112+ #1 SMP Sat Oct 10 13:45:37 PDT 2020 x86_64 Intel(R) Xeon(R) CPU @ 2.30GHz GenuineIntel GNU/Linux
  • Install tools: helm, chart airflow-7.16.0
  • Others:

What happened:

Even after #7464, we are finding that tasks with the none_failed trigger rule are still being skipped when their direct upstream task is skipped.

Consider a simple three-task DAG in which the first step is a GoogleCloudStoragePrefixSensor, followed by a processing task and ending with a heartbeat-check operator:

check_for_late_data >> run_statekeeper >> passive_check

The passive_check task is configured with the NONE_FAILED trigger rule:

passive_check = PassiveCheckOperator(
    task_id="passive_check",
    dag=dag,
    trigger_rule=TriggerRule.NONE_FAILED)
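
For reference, the none_failed rule is documented to let a task run as long as no upstream task is in the failed or upstream_failed state; skipped upstreams are fine. A minimal sketch of those semantics (the helper below is hypothetical, not Airflow's actual implementation):

# Hypothetical helper illustrating none_failed semantics: run unless some
# upstream task failed (directly or via upstream_failed); skipped is OK.
def none_failed_should_run(upstream_states):
    return all(s not in ("failed", "upstream_failed") for s in upstream_states)

assert none_failed_should_run(["skipped", "skipped"])       # all skipped: runs
assert none_failed_should_run(["success", "skipped"])       # mixed: runs
assert not none_failed_should_run(["failed", "skipped"])    # any failure: blocked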

The GCS sensor exits as follows when it finds no keys:

[2021-02-12 00:32:18,130] {taskinstance.py:1025} INFO - Marking task as SKIPPED.dag_id=pipeline_v1, task_id=check_for_late_data, execution_date=20210211T003000, start_date=20210212T003017, end_date=
[2021-02-12 00:32:18,130] {taskinstance.py:1070} INFO - Marking task as SUCCESS.dag_id=pipeline_v1, task_id=check_for_late_data, execution_date=20210211T003000, start_date=20210212T003017, end_date=20210212T003218

The intermediate step is also skipped, as intended (it uses the default all_success trigger rule). But the final step is skipped as well, which should not happen:

[screenshot: DAG run with all three tasks marked as skipped]

The same thing happens if we put a dummy shared start task upstream:
[screenshot: same result with a shared dummy start task upstream]

What you expected to happen:

The passive_check task should have run: its trigger rule is none_failed, and no tasks upstream of it have failed; they have only been skipped.

As this was allegedly fixed in #7464, I suspect that either something has regressed since 1.10.10 or there is a corner case not yet caught.

How to reproduce it:

The following DAG reproduces the issue (presuming that you have a working Google Cloud Platform connection and a GCS bucket that the DAG can read):

import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor

SOURCE_FILE_BUCKET = "GCS_BUCKET_NAME"  # replace this with a GCS bucket under your control
SOURCE_FILE_PREFIX = "nofileshere/"  # make sure there are zero keys under this prefix

with DAG(
    dag_id="simple_skip",
    start_date=datetime.datetime(2021, 2, 19),
    schedule_interval=None,
) as dag:
    find_no_data = GoogleCloudStoragePrefixSensor(
        dag=dag,
        task_id="find_no_data",
        soft_fail=True,
        timeout=60 * 2,
        bucket=SOURCE_FILE_BUCKET,
        prefix=SOURCE_FILE_PREFIX,
    )
    step_1 = DummyOperator(task_id="step_1", dag=dag)
    step_2 = DummyOperator(task_id="step_2", dag=dag, trigger_rule="none_failed")
    find_no_data >> step_1 >> step_2
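
If you do not have a GCS connection handy, the same soft_fail code path should be reachable with any BaseSensorOperator subclass whose poke() never succeeds; a hypothetical GCS-free stand-in for the sensor above (my sketch, not part of the original report):

# Hypothetical stand-in sensor: poke() never returns True, so the sensor
# times out, and with soft_fail=True it hits the same skip logic in
# BaseSensorOperator as the GCS prefix sensor.
from airflow.sensors.base_sensor_operator import BaseSensorOperator

class AlwaysFalseSensor(BaseSensorOperator):
    def poke(self, context):
        return False

find_no_data = AlwaysFalseSensor(
    dag=dag,
    task_id="find_no_data",
    soft_fail=True,
    timeout=10,
    poke_interval=5,
)
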
n-oden added the kind:bug label on Feb 19, 2021

boring-cyborg bot commented Feb 19, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

vikramkoka added the area:Scheduler label on Feb 19, 2021

n-oden commented Feb 19, 2021

As it turns out, this has been fixed between 1.10.12 and HEAD, but has not been released yet.

The issue was in https://github.com/apache/airflow/blob/1.10.12/airflow/sensors/base_sensor_operator.py#L112-L114:

                if self.soft_fail and not context['ti'].is_eligible_to_retry():
                    self._do_skip_downstream_tasks(context)
                    raise AirflowSkipException('Snap. Time is OUT.')

Any sensor that inherited BaseSensorOperator with soft_fail set to True would raise AirflowSkipException, but only after manually skipping all downstream tasks in _do_skip_downstream_tasks(), which bypasses the downstream tasks' trigger rules entirely:

    def _do_skip_downstream_tasks(self, context):
        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
        self.log.debug("Downstream task_ids %s", downstream_tasks)
        if downstream_tasks:
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

This appears to have been fixed in HEAD: in https://github.com/apache/airflow/blob/master/airflow/sensors/base.py#L233-L236 only the exception is raised, and the _do_skip_downstream_tasks() method no longer exists.
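
Paraphrasing the fixed timeout handling (a sketch based on the 1.10 snippet above, not the verbatim master source):

# Sketch of the fixed behavior: on timeout the sensor raises only for
# itself; downstream tasks are left to be resolved by their own trigger
# rules instead of being force-skipped.
if self.soft_fail and not context['ti'].is_eligible_to_retry():
    raise AirflowSkipException('Snap. Time is OUT.')
raise AirflowSensorTimeout('Snap. Time is OUT.')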

And as a side note: looking at https://github.com/apache/airflow/blob/master/airflow/models/taskinstance.py#L1142-L1157, the _run_raw_task() method is primarily a very large try/except block that attempts to run the task and then handles each possible exception in turn. But the exception handler for AirflowSkipException does not return, so the flow falls out of the try/except block and continues on L1182 with the task being marked as a success. You can actually see this in the log snippet above:

[2021-02-12 00:32:18,130] {taskinstance.py:1025} INFO - Marking task as SKIPPED.dag_id=pipeline_v1, task_id=check_for_late_data, execution_date=20210211T003000, start_date=20210212T003017, end_date=
[2021-02-12 00:32:18,130] {taskinstance.py:1070} INFO - Marking task as SUCCESS.dag_id=pipeline_v1, task_id=check_for_late_data, execution_date=20210211T003000, start_date=20210212T003017, end_date=20210212T003218

I suspect that there should be a raise or a return at the end of that block?
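
A minimal reduction of that control flow (hypothetical names, not Airflow's actual code):

# Hypothetical reduction of the fall-through described above: the except
# block handles the skip but does not return, so execution continues and
# the success path runs as well, producing both log lines.
def _run_raw_task(task):
    try:
        task.execute()
    except AirflowSkipException:
        log.info("Marking task as SKIPPED.")
        # missing `return` here
    log.info("Marking task as SUCCESS.")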


eladkal commented Sep 19, 2021

Hi @n-oden, this issue is reported against 1.10. Please let us know if the issue still happens on the latest Airflow version.


karuhanga commented Sep 30, 2021

Same behavior through to 1.10.15. Looks like we'd need to upgrade to 2.x.


josh-fell commented Oct 16, 2021

I can confirm this is fixed in Airflow 2.1.0 and Airflow 2.2.0.

DAG code

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule


@task
def task1():
    raise AirflowSkipException

@task
def task2():
    raise AirflowSkipException

@task
def task3():
    raise AirflowSkipException


with DAG(
    dag_id="__test__",
    start_date=datetime(2021, 1, 1),
    schedule_interval="0 * * * *",
    tags=["test"],
    catchup=False,
) as dag:
    end = DummyOperator(task_id="end", trigger_rule=TriggerRule.NONE_FAILED)

    task1() >> task2() >> end
    task3() >> end
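
When this DAG runs on a fixed version, task1, task2, and task3 end up skipped while end still executes, which is exactly the none_failed behavior the original report expected.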

Sample DAG run
[screenshot: sample DAG run with task1, task2, and task3 skipped and end executed]

eladkal closed this as completed on Oct 16, 2021