Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show Deprecation warning on duplicate Task ids #8728

Merged
merged 4 commits into from
May 7, 2020

Conversation

kaxil
Copy link
Member

@kaxil kaxil commented May 5, 2020

We already raise the warning at https://github.com/apache/airflow/blob/1.10.10/airflow/models/dag.py#L1318-L1328 but that wasn't enough.

We raise an error for Airflow 2.0 . PR where we did that: https://github.com/apache/airflow/pull/6549/files

Before:

airflow ❯ python -c "import airflow; from airflow.operators.dummy_operator import DummyOperator; dag = airflow.DAG(dag_id='test', start_date=airflow.utils.timezone.utcnow()); t1 = DummyOperator(dag=dag, task_id='t1'); t1_2 = DummyOperator(dag=dag, task_id='t1')"

After:

airflow ❯ python -c "import airflow; from airflow.operators.dummy_operator import DummyOperator; dag = airflow.DAG(dag_id='test', start_date=airflow.utils.timezone.utcnow()); t1 = DummyOperator(dag=dag, task_id='t1'); t1_2 = DummyOperator(dag=dag, task_id='t1')"
/opt/airflow/airflow/models/baseoperator.py:555: PendingDeprecationWarning: The requested task could not be added to the DAG because a task with task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to overwrite a task will raise an exception.
  category=PendingDeprecationWarning)

Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Unit tests coverage for changes (not needed for documentation changes)
  • Target Github ISSUE in description if exists
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@kaxil kaxil requested a review from ashb May 5, 2020 23:04
Copy link
Contributor

@jhtimmins jhtimmins left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall. Basically two things worth noting.

  1. A few naming ambiguities in the tests that could be clarified.
  2. The tests are currently testing the content of warning message strings. This is making them brittle.

PendingDeprecationWarning,
"The requested task could not be added to the DAG because a task with "
"task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to "
"overwrite a task will raise an exception."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing for the exact response message is likely to make this case brittle. Here are some possible alternatives:

  1. Create a class constant on Dag that stores this string, and use it both in the implementation and the test.
  2. Same as 1, but instead put it in a warnings.py file or similar.
  3. Same as 1, but add some type of Warning or Alert mixin.
  4. Create a child class of PendingDeprecationWarning that encompasses the message body, then just text for the Exception type.
  5. Only modify the test and check for the presence of t1 and 2.0.

4 probably makes the most sense. Can add 5 to it for extra safety that string interpolation is working.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main reason we test for the exact same message is otherwise, even if someone changes the message in warnings.py the test would still pass.

tests/models/test_dag.py Outdated Show resolved Hide resolved
tests/models/test_dag.py Outdated Show resolved Hide resolved
tests/models/test_dag.py Outdated Show resolved Hide resolved
@kaxil kaxil added this to the Airflow 1.10.11 milestone May 6, 2020
@kaxil
Copy link
Member Author

kaxil commented May 6, 2020

CI failures are unrelated. @potiuk any idea?

@potiuk
Copy link
Member

potiuk commented May 7, 2020

Will be fixed by #8758

@@ -546,7 +546,8 @@ def dag(self, dag):
"The DAG assigned to {} can not be changed.".format(self))
elif self.task_id not in dag.task_dict:
dag.add_task(self)

elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] != self:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want dag.task_dict[self.task_id] != self, or dag.task_dict[self.task_id] is not self.

The former use __eq__, the later checks for exactly the same object.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated, I will update Airflow Master as we currently raise errors after == check

https://github.com/kaxil/airflow/blob/c3a46b9fec618489314a4ac8d4be289481e8edd3/airflow/models/baseoperator.py#L603-L606

@kaxil kaxil merged commit 1ef5f13 into apache:v1-10-test May 7, 2020
@kaxil kaxil deleted the raise-warning branch May 7, 2020 17:18
@ashb
Copy link
Member

ashb commented May 7, 2020

Only cos it's related to this PR, not that it's a bug with this PR

And I think I've just found a problem in our example dags caused by us using != in master:

delete_bucket_1 = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_1)
delete_bucket_2 = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_2)

@tooptoop4
Copy link
Contributor

@kaxil this is causing issue for me if i have same taskid in 2 different dags

@kaxil
Copy link
Member Author

kaxil commented Oct 8, 2020

@kaxil this is causing issue for me if i have same taskid in 2 different dags

It should not show you warning for different dags, unless you are using context manager and have as dag: for both of your DAGs

@tooptoop4
Copy link
Contributor

@kaxil raised #11354

@JeffryMAC
Copy link

JeffryMAC commented Oct 8, 2020

@kaxil I use context manager and as dag for every DAG I have.
the "as dag" is not the dag_id. why it matters?

@kaxil
Copy link
Member Author

kaxil commented Oct 8, 2020

@kaxil I use context manager and as dag for every DAG I have.
the "as dag" is not the dag_id. why it matters?

You are right, in theory it should not, although I roughly remember I had to change:

with DAG(dag_id='d1') as dag:
     t1 = BashOperator(...)

with DAG(dag_id='d2') as dag:
     t1 = BashOperator(...)

to

with DAG(dag_id='d1') as dag_1:
     t1 = BashOperator(...)

with DAG(dag_id='d2') as dag_2:
     t1 = BashOperator(...)

Can't remember why and when though

cfei18 pushed a commit to cfei18/incubator-airflow that referenced this pull request Mar 5, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type:improvement Changelog: Improvements
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants