-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Check for TaskGroup in _PythonDecoratedOperator #12312
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, indeed. Didn't think of this!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
The PR needs to run all tests because it modifies core of Airflow! Please rebase it to latest master or ask committer to re-run it! |
7fa6611
to
5c7fc78
Compare
Thanks for raising and addressing the issue! Change looks good. Just a few minor suggestions. |
@@ -165,7 +166,7 @@ def __init__( | |||
multiple_outputs: bool = False, | |||
**kwargs, | |||
) -> None: | |||
kwargs['task_id'] = self._get_unique_task_id(task_id, kwargs.get('dag')) | |||
kwargs['task_id'] = self._get_unique_task_id(task_id, kwargs.get('dag'), kwargs.get('task_group')) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of doing kwargs.get
, I suggest having dag
and task_group
as two keyword arguments with defaults. I.e. like this:
def __init__(
self, self,
*, *,
python_callable: Callable, python_callable: Callable,
task_id: str, task_id: str,
op_args: Tuple[Any], op_args: Tuple[Any],
op_kwargs: Dict[str, Any], op_kwargs: Dict[str, Any],
multiple_outputs: bool = False, multiple_outputs: bool = False,
dag=None,
task_group=None,
**kwargs, **kwargs,
) -> None: ) -> None:
kwargs['task_id'] = self._get_unique_task_id(task_id, dag, task_group)
...
super().__init__(dag=dag, task_group=task_group, **kwargs)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was something I wanted suggest when we were doing the @task
PR. However, I like that we access the kwargs and then we pass them to super constructor. Additionally they are not visible in signature so users don't have to think about them. But no strong opinion on my side
That being said, I wasn't super aware of the feature of dynamically generating unique task_id when For example, if there are many I think we should make it clear somewhere that the dynamic task_id feature should be used carefully if the DAG is expected to change in the future. |
This is a valid problem. I would be in favor of describing this in docs as I'm not sure if we will be able to solve this problem somehow (disabling id auto generation is rather no go for me). @casassg @kaxil @dimberman @ashb WDYT? |
Agree on adding this on the tutorial and concepts documentation for |
Crucial feature of functions decorated by @task is to be able to invoke them multiple times in single DAG. To do this we are generating custom task_id for each invocation. However, this didn't work with TaskGroup as the task_id is already altered by adding group_id prefix. This PR fixes it. closes: apache#12309
Co-authored-by: Kaxil Naik <[email protected]>
08338c1
to
d1e8760
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotta love the Knights of Nii
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
Crucial feature of functions decorated by
@task
is to be ableto invoke them multiple times in single DAG. To do this we are
generating custom task_id for each invocation. However, this didn't
work with TaskGroup as the task_id is already altered by adding group_id
prefix. This PR fixes it.
closes: #12309
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.