
"Dependency already registered for DAG" warnings during runs in taskflow based tasks #26599

Closed · 1 of 2 tasks
emredjan opened this issue on Sep 22, 2022 · 5 comments
Labels: area:core, kind:bug (This is clearly a bug)

Comments

emredjan commented on Sep 22, 2022

Apache Airflow version

2.4.0

What happened

On version 2.4.0, DAGs with simple taskflow-based tasks (nothing dynamic) were producing "Dependency already registered for DAG" warnings that did not appear prior to 2.4.0, so I wanted to test with a simpler example. I copied the exact DAG from the TaskFlow tutorial documentation, the simple extract-transform-load example located here: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html#example-taskflow-api-pipeline

import json
import pendulum

from airflow.decorators import dag, task

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['example'],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")

    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])

tutorial_taskflow_api()

Running this DAG produces the warnings below, even though it has no complex dependencies and no dynamic task generation:

Log from the last step:

*** Reading local file: /data/apps/airflow//logs/dag_id=tutorial_taskflow_api/run_id=manual__2022-09-22T14:08:19.967374+00:00/task_id=load/attempt=1.log
[2022-09-22, 16:08:23 CEST] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: tutorial_taskflow_api.load manual__2022-09-22T14:08:19.967374+00:00 [queued]>
[2022-09-22, 16:08:23 CEST] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: tutorial_taskflow_api.load manual__2022-09-22T14:08:19.967374+00:00 [queued]>
[2022-09-22, 16:08:23 CEST] {taskinstance.py:1362} INFO - 
--------------------------------------------------------------------------------
[2022-09-22, 16:08:23 CEST] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2022-09-22, 16:08:23 CEST] {taskinstance.py:1364} INFO - 
--------------------------------------------------------------------------------
[2022-09-22, 16:08:23 CEST] {taskinstance.py:1383} INFO - Executing <Task(_PythonDecoratedOperator): load> on 2022-09-22 14:08:19.967374+00:00
[2022-09-22, 16:08:23 CEST] {standard_task_runner.py:54} INFO - Started process 3995388 to run task
[2022-09-22, 16:08:23 CEST] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'tutorial_taskflow_api', 'load', 'manual__2022-09-22T14:08:19.967374+00:00', '--job-id', '1185', '--raw', '--subdir', 'DAGS_FOLDER/dwh/tutorial_taskflow_api.py', '--cfg-path', '/tmp/tmps2oxurlk']
[2022-09-22, 16:08:23 CEST] {standard_task_runner.py:83} INFO - Job 1185: Subtask load
[2022-09-22, 16:08:23 CEST] {dagbag.py:525} INFO - Filling up the DagBag from /data/apps/airflow/dags/dwh/tutorial_taskflow_api.py
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): extract>, transform already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, extract already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): extract>, transform already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, extract already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): extract>, transform already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, extract already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): extract>, transform already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, extract already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): extract>, transform already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, extract already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, load already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): load>, transform already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, load already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): load>, transform already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, load already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): load>, transform already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, load already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): load>, transform already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): transform>, load already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {taskmixin.py:205} WARNING - Dependency <Task(_PythonDecoratedOperator): load>, transform already registered for DAG: tutorial_taskflow_api
[2022-09-22, 16:08:23 CEST] {task_command.py:384} INFO - Running <TaskInstance: tutorial_taskflow_api.load manual__2022-09-22T14:08:19.967374+00:00 [running]> on host svgbiappp045.gbi.int
[2022-09-22, 16:08:23 CEST] {warnings.py:109} WARNING - /data/apps/.pyenv/versions/3.10.5/envs/airflow-py310/lib/python3.10/site-packages/airflow/models/renderedtifields.py:258: SAWarning: Coercing Subquery object into a select() for use in IN(); please pass a select() construct explicitly
  tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(subq2),

[2022-09-22, 16:08:23 CEST] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=tutorial_taskflow_api
AIRFLOW_CTX_TASK_ID=load
AIRFLOW_CTX_EXECUTION_DATE=2022-09-22T14:08:19.967374+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-09-22T14:08:19.967374+00:00
[2022-09-22, 16:08:23 CEST] {logging_mixin.py:117} INFO - Total order value is: 1236.70
[2022-09-22, 16:08:23 CEST] {python.py:177} INFO - Done. Returned value was: None
[2022-09-22, 16:08:23 CEST] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=tutorial_taskflow_api, task_id=load, execution_date=20220922T140819, start_date=20220922T140823, end_date=20220922T140823
[2022-09-22, 16:08:23 CEST] {local_task_job.py:164} INFO - Task exited with return code 0
[2022-09-22, 16:08:23 CEST] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

What you think should happen instead

These tasks should run without warnings about dependencies being already registered.

How to reproduce

Copy the tutorial taskflow DAG and run it on 2.4.0.

Operating System

Red Hat Enterprise Linux 8.6 (Ootpa)

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

CeleryExecutor with rabbitmq, 1 main machine for webserver/scheduler and 1 additional worker node

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

emredjan added the area:core and kind:bug labels on Sep 22, 2022
potiuk added this to the Airflow 2.4.1 milestone on Sep 22, 2022
potiuk (Member) commented on Sep 22, 2022

cc: @ashb - this is the issue which I asked the user to open FYI

ashb (Member) commented on Sep 29, 2022

It turns out these warnings have been happening since 2.3.0, but due to the mistake fixed by #26779 we just never saw them!

Correction: Since 2.1.0!!!!

ashb added a commit to astronomer/airflow that referenced this issue Sep 29, 2022
While investigating apache#26599 and the change from AIP-45, I noticed that
these warning messages weren't new! The only thing that was new was that
we started seeing them.

This is because the logger for BaseOperator and all subclasses is
`airflow.task.operators`, and the `airflow.task` logger is not
configured (with `set_context()`) until we have a TaskInstance, so it
just dropped all messages on the floor!

This changes it so that log messages are propagated to parent loggers by
default, but when we configure a context (and thus have a file to write
to) we stop that. A similar change was made for the `airflow.processor`
(but that is unlikely to suffer the same fate)
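For context, here is a minimal, standalone sketch of the logging behaviour that commit message describes (the handler class and its name are illustrative, not Airflow's actual handler code): a child logger whose parent has an unconfigured handler and propagation disabled silently drops records, while enabling propagation lets them fall through to the root handler.

import logging

# Illustrative stand-in for a task file handler that has nowhere to
# write until set_context() is called. Not Airflow code.
class UnconfiguredHandler(logging.Handler):
    def __init__(self):
        super().__init__()
        self.configured = False

    def set_context(self):
        self.configured = True

    def emit(self, record):
        if not self.configured:
            return  # silently discard: the "dropped on the floor" case
        print("task log:", record.getMessage())

logging.basicConfig(level=logging.WARNING)    # root handler -> console
task_logger = logging.getLogger("airflow.task")
task_logger.addHandler(UnconfiguredHandler())

op_logger = logging.getLogger("airflow.task.operators")

task_logger.propagate = False                 # pre-fix behaviour
op_logger.warning("lost: the handler has no context yet")

task_logger.propagate = True                  # post-fix: falls back to the root handler
op_logger.warning("visible on the console via the root handler")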
ashb (Member) commented on Sep 30, 2022

Found the problem. We're calling `set_xcomarg_deps` once for each class in the hierarchy from inside `apply_defaults`, instead of just once.

And `@task`'s hierarchy is `_PythonDecoratedOperator`, `DecoratedOperator`, `PythonOperator`, `BaseOperator`. Fix coming shortly.
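As a rough illustration of that class of bug (hypothetical names, not Airflow's real `apply_defaults` implementation): when a metaclass wraps every class's `__init__`, a hook intended to run once per instance fires once per class in the hierarchy as the `super().__init__()` chain unwinds, and every call after the first trips the "already registered" warning.

registered = []

def register_dependency(task_id):
    # Stand-in for the dependency registration that logs the warning.
    if task_id in registered:
        print(f"WARNING: Dependency {task_id} already registered")
    else:
        registered.append(task_id)

class WrapInitMeta(type):
    # Wraps each class's own __init__ with an apply_defaults-style hook.
    def __new__(mcls, clsname, bases, ns):
        cls = super().__new__(mcls, clsname, bases, ns)
        if "__init__" in ns:
            original = ns["__init__"]
            def wrapped(self, *args, **kwargs):
                original(self, *args, **kwargs)
                register_dependency(self.task_id)   # runs once per wrapped class
            cls.__init__ = wrapped
        return cls

class BaseOperatorSketch(metaclass=WrapInitMeta):
    def __init__(self, task_id):
        self.task_id = task_id

class PythonOperatorSketch(BaseOperatorSketch):
    def __init__(self, task_id):
        super().__init__(task_id)

class DecoratedOperatorSketch(PythonOperatorSketch):
    def __init__(self, task_id):
        super().__init__(task_id)

DecoratedOperatorSketch("extract")   # registers once, then warns twice

In this sketch the remedy is to make the hook run only once per instance, e.g. only for the outermost (most derived) `__init__` call.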

ashb added a commit that referenced this issue Sep 30, 2022
…26779)

* Ensure the log messages from operators during parsing go somewhere

While investigating #26599 and the change from AIP-45, I noticed that
these warning messages weren't new! The only thing that was new was that
we started seeing them.

This is because the logger for BaseOperator and all subclasses is
`airflow.task.operators`, and the `airflow.task` logger is not
configured (with `set_context()`) until we have a TaskInstance, so it
just dropped all messages on the floor!

This changes it so that log messages are propagated to parent loggers by
default, but when we configure a context (and thus have a file to write
to) we stop that. A similar change was made for the `airflow.processor`
(but that is unlikely to suffer the same fate)

* Give a real row count value so logs don't fail

The ArangoDB sensor test was logging a mock object, which previously was
getting dropped before emitting, but with this change now fails with
"Mock is not an integer" when attempting the  `%d` interpolation.

To avoid making the mock overly specific (`arangodb_client_for_test.db.`
`return_value.aql.execute.return_value.count.return_value`!) I have
changed the test to mock the hook entirely (which is already tested)
ashb (Member) commented on Oct 3, 2022

Fixed, will be in 2.4.2

ashb closed this as completed on Oct 3, 2022
zachliu (Contributor) commented on Oct 6, 2022

Just a simple note: if you're using the LocalExecutor, you'll see the warnings regardless of the DAG you're running.

[Screenshot: 2022-10-06_15-04]

ephraimbuddy pushed a commit that referenced this issue Oct 18, 2022
…26779)


(cherry picked from commit 7363e35)