Airflow plugin does not support dynamic task mapping #6606

Closed
cccs-seb opened this issue Dec 2, 2022 · 4 comments

Labels
  • accepted: An Issue that is confirmed as a bug by the DataHub Maintainers.
  • bug: Bug report
  • ingestion: PR or Issue related to the ingestion of metadata

Comments

cccs-seb (Contributor) commented on Dec 2, 2022

Describe the bug

Any DAG task that uses Airflow's dynamic task mapping fails when the DataHub Airflow plugin is enabled, because the plugin's task policy can't set the on-success/on-failure callbacks on the mapped task.

Versions:

  • apache-airflow==2.4.2
  • acryl-datahub-airflow-plugin==0.9.2.2

Here is an example stack trace:

[2022-12-01 23:21:36,883] {dagbag.py:322} DEBUG - Importing /home/jovyan/airflow/dags/taskflow.py
Setting task policy for Dag: taskflow Task: make_list
Setting task policy for Dag: taskflow Task: printer
[2022-12-01 23:21:36,904] {dagbag.py:564} ERROR - can't set attribute
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/airflow/models/dagbag.py", line 551, in collect_dags
    found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)
  File "/opt/conda/lib/python3.8/site-packages/airflow/models/dagbag.py", line 307, in process_file
    found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)
  File "/opt/conda/lib/python3.8/site-packages/airflow/models/dagbag.py", line 437, in _process_modules
    self.bag_dag(dag=dag, root_dag=dag)
  File "/opt/conda/lib/python3.8/site-packages/airflow/models/dagbag.py", line 464, in bag_dag
    self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)
  File "/opt/conda/lib/python3.8/site-packages/airflow/models/dagbag.py", line 481, in _bag_dag
    settings.task_policy(task)
  File "/opt/conda/lib/python3.8/site-packages/datahub_provider/_plugin.py", line 292, in custom_task_policy
    task_policy(task)
  File "/opt/conda/lib/python3.8/site-packages/datahub_provider/_plugin.py", line 281, in task_policy
    task.on_failure_callback = _wrap_on_failure_callback(task.on_failure_callback)
AttributeError: can't set attribute

To Reproduce
Steps to reproduce the behavior:

Here is a basic DAG that crashes when run with DataHub configured as the lineage backend (using the plugin).

from __future__ import annotations

import pendulum

from airflow.decorators import dag, task

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC")
)
def taskflow():

    @task()
    def make_list():
        return ['Hello', 'World']

    @task()
    def printer(val):
        print(val)
    
    printer.expand(val=make_list())

taskflow()

This bug appears to have been reported before as well: #5583

Expected behavior
I'm not quite sure how we should handle lineage for task mapping. At the very least, the lineage for these types of tasks should either be skipped or trigger a warning.

We would like to use the latest version of this plugin, which supports Airflow 2.4, but several of our users rely on dynamic task mapping, and upgrading would break their tasks.

Desktop:

  • Linux (Ubuntu), using Airflow's official Docker image

cccs-seb added the bug label on Dec 2, 2022
hsheth2 added the accepted and ingestion labels on Dec 6, 2022
cccs-seb (Contributor, Author) commented on Dec 7, 2022

I looked into this briefly, and it seems we can't update task properties when the operator is a `MappedOperator`. I've asked a related question on an existing Airflow issue: apache/airflow#24547 (comment)
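The "can't set attribute" error can be reproduced in plain Python. This is a minimal sketch, assuming (as the traceback suggests) that in Airflow 2.4 the mapped operator exposes `on_failure_callback` as a read-only property with no setter. `FakeMappedOperator`, its `partial_kwargs` dict, and `try_assign` are hypothetical stand-ins, not Airflow's actual implementation.

```python
class FakeMappedOperator:
    """Hypothetical stand-in for a mapped operator whose callback is read-only."""

    def __init__(self, on_failure_callback=None):
        # Hypothetical storage for arguments that would be passed via .partial()
        self.partial_kwargs = {"on_failure_callback": on_failure_callback}

    @property
    def on_failure_callback(self):
        # Getter only: no setter is defined, so assignment raises AttributeError.
        return self.partial_kwargs.get("on_failure_callback")


def try_assign(task, callback):
    """Attempt the same assignment the plugin's task policy performs.

    Returns None on success, or the AttributeError message on failure.
    """
    try:
        task.on_failure_callback = callback
        return None
    except AttributeError as exc:
        # Python 3.8 reports "can't set attribute"; newer versions word it differently.
        return str(exc)


if __name__ == "__main__":
    print(try_assign(FakeMappedOperator(), lambda context: None))
```

A regular object with a plain attribute accepts the same assignment, which is why only mapped tasks trip the policy.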

As a temporary workaround, we filter out mapped tasks by checking `isinstance(task, MappedOperator)` before the line `task.on_failure_callback = _wrap_on_failure_callback(task.on_failure_callback)` in the plugin's task policy. That allows the policy to still be applied to non-mapped operators.
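The guard described above can be sketched as follows. This is an illustration under stated assumptions, not the plugin's code: `FakeBaseOperator`, `FakeMappedOperator`, and `wrap_callback` are hypothetical stand-ins for a regular Airflow operator, Airflow's `MappedOperator`, and the plugin's internal `_wrap_on_failure_callback`. In a real deployment the `isinstance` check would target Airflow's `MappedOperator`. The sketch also logs a warning when a task is skipped, along the lines suggested later in this thread.

```python
import logging

logger = logging.getLogger("datahub_policy_sketch")


class FakeBaseOperator:
    """Hypothetical stand-in for a regular operator: callbacks are plain attributes."""

    def __init__(self, task_id, on_failure_callback=None):
        self.task_id = task_id
        self.on_failure_callback = on_failure_callback


class FakeMappedOperator:
    """Hypothetical stand-in for a mapped operator: the callback is read-only."""

    def __init__(self, task_id, on_failure_callback=None):
        self.task_id = task_id
        self._on_failure_callback = on_failure_callback

    @property
    def on_failure_callback(self):
        return self._on_failure_callback


def wrap_callback(original):
    """Hypothetical wrapper: emit lineage (omitted here), then call the user's callback."""

    def wrapped(context):
        # The real plugin would emit lineage metadata at this point.
        if original is not None:
            original(context)

    wrapped.is_wrapped = True  # marker so callers can tell the callback was wrapped
    return wrapped


def task_policy(task):
    """Wrap the failure callback, skipping mapped tasks instead of crashing."""
    if isinstance(task, FakeMappedOperator):
        logger.warning(
            "Skipping DataHub lineage for mapped task %s: callbacks can't be set",
            task.task_id,
        )
        return
    task.on_failure_callback = wrap_callback(task.on_failure_callback)
```

The design choice is to degrade gracefully: the DAG still loads, unmapped tasks keep their lineage callbacks, and mapped tasks simply go without lineage until callbacks can be set on them.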

@hsheth2 I can put up a PR for the above solution, if it's deemed acceptable, while we wait for a response from the Airflow maintainers.

hsheth2 (Collaborator) commented on Dec 8, 2022

Yep, that workaround seems pretty reasonable, and a PR would be great! We'll probably want to print a warning and add a note about this in the docs as well; happy to provide guidance on that.

cccs-seb (Contributor, Author) commented on Dec 8, 2022

I talked to the Airflow folks about the issue, and I'll be putting up a PR to allow setting those properties on MappedOperators. It's been labelled a feature, so I'm not sure when it will be released.

In the meantime, I'll submit a PR for this issue.

hsheth2 (Collaborator) commented on Jun 28, 2024

Going to close this, as #6738 was merged.

hsheth2 closed this as completed on Jun 28, 2024
2 participants