Retry for TaskGroup #21867
Comments
What if a TaskGroup consists of 20 tasks and one of them is an HTTP task that just retried on some transient error? Do we really want to retry the whole task group because of it? |
It depends on the use case. Of course, if you have a lot of tasks in a group, you may not want the entire group to be executed again. In my case, for example (described in the "additional explanation" section), I have only 2 tasks in the same task group, so it would definitely be useful for small groups. This is just an additional option that users of task groups could have. |
I think every developer can answer this question for themselves. It depends on the architecture of each DAG... The main question is how to decide when a TaskGroup has failed.
|
SubDagOperator, which is now deprecated, inherits from BaseSensorOperator -> BaseOperator, so it has retry functionality. |
Hi, I believe TaskGroup is supposed to have the same "retry" feature that SubDagOperator had. In other words, a TaskGroup (in my opinion) should be like a "sub-pipeline" that can be combined arbitrarily within the "main pipeline". Another situation might look like this: |
Hi, up to now I considered TaskGroup mostly a "visual feature". If we start adding additional "subdag" features to it, we are going to gravitate toward a "subdag" concept, which is about to be dropped. If I needed more complex logic in a group of tasks, I would use the approach of DAGs triggering other DAGs via TriggerDagRunOperator. Regards. |
I completely understand your opinion, but I still think this feature would be very useful. In my case I would need it in a lot of pipelines, so without it I'll need to create a lot of DAGs just for triggering them (of course I can group some of them, but that would still require some "DAG trigger" DAGs). I know it is a very specific use case, but I'm definitely not alone with this problem. |
I expect that some of the work necessary to make this possible will be done as part of AIP-42. That feature lets you take a list of n items (in XCom, say) and tell Airflow to create n tasks for those items. The task group part of it isn't done yet, but when it is, you'll be able to have it create n task groups as well. Seems like scheduling the execution of the nth task group in an expansion would use much of the same code as rerunning a task group. |
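To illustrate the overlap between mapping a task group and rerunning one, here is a toy model in plain Python (not Airflow's implementation): expanding a group over n items yields one copy of each task per `map_index`, and re-running the nth copy means selecting exactly the task instances with that index. The function names `expand_group` and `group_copy` are hypothetical; only the ideas that mapped task instances are distinguished by a `map_index` and that task IDs inside a group are prefixed with the group ID mirror Airflow.

```python
from typing import Any, Dict, List


def expand_group(group_id: str, task_ids: List[str], items: List[Any]) -> List[Dict[str, Any]]:
    """One task instance per (item, task) pair; each item gets its own map_index."""
    return [
        {"task_id": f"{group_id}.{task_id}", "map_index": index, "item": item}
        for index, item in enumerate(items)
        for task_id in task_ids
    ]


def group_copy(instances: List[Dict[str, Any]], map_index: int) -> List[str]:
    """Task IDs that make up the nth copy of the group, i.e. what a rerun would clear."""
    return [ti["task_id"] for ti in instances if ti["map_index"] == map_index]


instances = expand_group("load", ["fetch", "store"], ["a.csv", "b.csv", "c.csv"])
print(len(instances))            # 6
print(group_copy(instances, 1))  # ['load.fetch', 'load.store']
```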
Hello, this would be very interesting; retries on TaskGroup could add another dimension to workflow management. Regards, |
Hey, adding on here. We also encountered a situation where we need to have a retry for the entire task group (in our case, the task group checks if a cluster is up before sending a step to EMR. if the step fails we want to make sure the cluster is up before re-sending the step). I think it'll be helpful to have this mechanism. Regards |
I think this case is covered by AIP-52 Automatic setup and teardown tasks and not directly related to task groups |
Hi everyone! I'm a product manager, and I happened to come across this open issue. It seems like a very useful feature, and I thought it would be a great addition to our backlog. To help move things along, I've put together a set of requirements for this feature. I'd love for you to take a look and share any thoughts or concerns you might have. If anyone is available and interested in implementing this, please feel free to use the requirements as a starting point. Requirements:
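Example 1: Task group with 3 tasks that is retried when any task fails, and retries only the failed tasks:
```python
from airflow.decorators import task_group

# task_a, task_b and task_c are @task-decorated functions defined elsewhere.
# `retries` and `retry_strategy` are proposed parameters; @task_group does not accept them today.
@task_group(group_id="tasks_with_retries", retries=3, retry_strategy="failed_tasks")
def task_group_example():
    task_a()
    task_b()
    task_c()
```
Example 2: Task group with 3 tasks that is retried only when task_b fails, and retries all tasks:
```python
from airflow.decorators import task_group
from airflow.utils.state import TaskInstanceState


# Defined before the decorator that references it. Hypothetical signature: receives the
# group's task instances for the current try (operators themselves carry no state).
def custom_condition(task_instances) -> bool:
    # Task IDs inside a group are prefixed with the group_id by default.
    return any(
        ti.task_id == "tasks_with_retries.task_b" and ti.state == TaskInstanceState.FAILED
        for ti in task_instances
    )


# `retry_condition` is a proposed parameter, like `retries` above.
@task_group(group_id="tasks_with_retries", retries=3, retry_condition=custom_condition)
def task_group_example():
    task_a()
    task_b()
    task_c()
```
Note: code snippets are only meant as a rough example. |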
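To make the two proposed behaviours concrete, the following plain-Python simulation (no Airflow involved) models a group with `retries=N`. It assumes the semantics sketched in the examples: the group is re-run while its retry condition holds (by default, "any task failed"), and `retry_strategy` selects whether only the failed tasks (`"failed_tasks"`) or every task (`"all_tasks"`) is re-run. `run_group` and the `"all_tasks"` value are hypothetical names chosen for illustration.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical model of the proposed TaskGroup retry semantics (not an Airflow API).
# Each task is a callable that receives the group attempt number and returns True on success.
TaskFn = Callable[[int], bool]


def run_group(
    tasks: Dict[str, TaskFn],
    retries: int,
    retry_strategy: str = "failed_tasks",
    retry_condition: Optional[Callable[[Dict[str, str]], bool]] = None,
) -> Tuple[Dict[str, str], List[List[str]]]:
    """Return the final task states and the task IDs executed in each group attempt."""
    if retry_strategy not in ("failed_tasks", "all_tasks"):
        raise ValueError(f"unknown retry_strategy: {retry_strategy}")
    states: Dict[str, str] = {}
    runs: List[List[str]] = []
    to_run = list(tasks)  # the first attempt runs every task
    for attempt in range(retries + 1):
        runs.append(list(to_run))
        for task_id in to_run:
            states[task_id] = "success" if tasks[task_id](attempt) else "failed"
        failed = [t for t in tasks if states[t] == "failed"]
        # Default condition (Example 1): retry the group if any task failed.
        should_retry = retry_condition(states) if retry_condition else bool(failed)
        if not should_retry:
            break
        to_run = failed if retry_strategy == "failed_tasks" else list(tasks)
    return states, runs


# task_b fails on the first attempt and succeeds from the second one onwards.
tasks = {
    "task_a": lambda attempt: True,
    "task_b": lambda attempt: attempt >= 1,
    "task_c": lambda attempt: True,
}
print(run_group(tasks, retries=3, retry_strategy="failed_tasks")[1])
# [['task_a', 'task_b', 'task_c'], ['task_b']]
print(run_group(tasks, retries=3, retry_strategy="all_tasks")[1])
# [['task_a', 'task_b', 'task_c'], ['task_a', 'task_b', 'task_c']]
```

Passing `retry_condition=lambda states: states.get("task_b") == "failed"` together with `retry_strategy="all_tasks"` mirrors the shape of Example 2: a failure of any other task then ends the group without a retry.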
Hello @shubham22, any progress for this feature? |
@weirdjh - no, not yet. The team has been busy with other projects, including deferrable operators for Amazon provider and multi-tenancy. I will provide an update here as soon as we start development on this. Thank you for your patience. |
@shubham22 , I'd like to contribute to this project by developing this feature. I think it will be enough to start with the requirements you have created in your previous comment. Is it possible? |
Feel free |
Absolutely. Thanks @weirdjh for the initiative. The main reason I shared the requirements here is to engage those who might be interested and available. Please feel free to reach out (here or on Slack) if there's anything I can help with or if you want to discuss any particular requirements. |
hey @weirdjh did you make any progress on this ? If not, do you mind if we unassign it from you ? |
@vandonr-amz oh okay |
I'll take a shot at it, if someone with the rights can assign me |
Did |
Just something to consider: the definition of "last" can be tricky, as a TaskGroup can have multiple leaf nodes, so this option probably needs to be split further into all leaves / a specific leaf, or something of the sort. We also need to consider the case of a TaskGroup inside a TaskGroup. Would the inner TaskGroup be considered a single leaf node? |
I don't see this as very valuable. This option doesn't exist for DAGs, and I think setting individual task retries covers enough of that need. This can be done separately, and IMO should only be a UI & API option, like it is for DAGs. |
good point. I'll start without the |
I had added that requirement in response to below comment. Users could have any number of tasks in their task group and providing them flexibility on what should be retried would help. That said, yes, this could be available as an API/UI instead of configuration for task group model. Interested to hear what others think about it.
|
This is exactly the reason I kept |
If I don't cancel B, can I still start a new run of A while it finishes the previous try? And D after that? That might be greedy, but it'd save time. It also means that it's more complicated than just handling it in
Also, what status should tasks have when their task group is going to be retried? Their last known status (succeeded, failed, ...)? Or |
I am not following this discussion too closely (and I have other things to look at, so sorry, I won't chime in for now), but just a small reminder that this discussion definitely has scope and impact on the core design of a key Airflow component, so whatever is discussed here should be brought to the devlist; it may even be worth mentioning on the devlist that this discussion is happening here. (I do not want to stop the discussion, it seems really cool; I just wanted to make sure to stress this aspect :) |
Yeah, I think it's actually probably AIP-worthy because it's a big change and rather complicated, with lots of corners. |
@dstandish - in the past, we have added listeners and notifiers without AIP and I believe this is a simpler change than that. It does have edge cases, but we can start with simpler implementation. IMO, AIP -> feedback -> voting -> PR -> feedback, just adds more steps to having this available to the users. Happy to disagree and commit, if others share the same opinion. |
I think it's more @shubham22 - this is why I pointed it out as well. Just the level of comments up here and the need to show diagrams to explain the complexity is enough to say "let's discuss it more seriously than in a PR, because we might miss some voices in the discussion". While I think there are many things we might optimize for "speed to get in the hands of our users", adding new features that might have potentially difficult rough edge cases is not one of those. For those kinds of features we optimize for "let's make sure we do not have to handle a big number of issues from confused users". We must take it "slow", in the sense that we must make sure everyone has a chance to participate and that we make an effort to involve that "everyone" to flag potential problems. And yes, it makes the decision process slower. By design. This is a good thing. I think comparing this to listeners and notifiers is a pretty unfair comparison. To be honest, both listeners and notifiers have far less far-reaching impact on existing users. Those are completely new features that were designed to support new things:
In any case, existing users will have to deliberately choose to use notifiers or listeners, and it will not affect their normal flows. Not so with task group retry. This is something that will be immediately available to everyone who uses task groups. And they will try it when they see it. It requires exactly 0 effort and 0 deliberate actions on the users' side: they will "just see a new option to clear task groups" when they upgrade Airflow. And I agree with @dstandish - there are edge cases. Likely nasty ones. I'd say retrying task groups is much closer to "setup/teardown" than to "notifications/listeners". And I know how much time was spent discussing the edge cases of setup/teardown. A LOT. The AIP was written and then updated a few months later to reflect them. Nobody wants an AIP and a deeper discussion for the sake of it. We just think it needs to be well thought through before we implement it. |
Fair. I had to try to see if it can reduce some effort : ) |
Yeah it will be nice to have "living doc" of sorts, a draft AIP, where the proposal can be clearly outlined, edge cases and caveats noted, and we can refine and iterate. Will be easier for folks who have not been following to jump in and see the current state and assess it. Could create a slack channel for discussion of the points of uncertainty in the proposal. |
Warning... this is a long one... just tried to catch up on the current state and share some thoughts and some reflections based on learnings in AIP-52 With that... Re
And
So, a task group's leaves are well-defined, no matter how many task groups are contained in it. It's {x for x in group if not has_downstream_task_in_group(x)}. And the rule for determining DAG run state is whether any of those leaves is failed / upstream failed. So this seems like a non-issue, except for... do you allow a retryable group within a retryable group... oy... I guess in principle there's no obvious problem. The retry of the outer implies retry of the inner. Concerning this debate:
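The leaf rule stated above can be sketched in plain Python. This assumes a toy representation rather than Airflow internals: a group is a flat set of task IDs (nested groups are simply flattened into it) and `downstream` maps each task ID to its direct downstream task IDs. The helpers `group_leaves` and `group_failed` are hypothetical names.

```python
from typing import Dict, Set

# Toy model, not Airflow internals: task IDs plus a map of direct downstream edges.
FAILED_STATES = {"failed", "upstream_failed"}


def group_leaves(group: Set[str], downstream: Dict[str, Set[str]]) -> Set[str]:
    """Tasks in the group that have no downstream task inside the same group."""
    return {t for t in group if not (downstream.get(t, set()) & group)}


def group_failed(group: Set[str], downstream: Dict[str, Set[str]], states: Dict[str, str]) -> bool:
    """Same rule as for a DAG run: failed if any leaf is failed or upstream_failed."""
    return any(states.get(t) in FAILED_STATES for t in group_leaves(group, downstream))


# The outer group contains a nested group ("inner.*"); membership is flattened,
# so nesting does not change which tasks are leaves.
downstream = {
    "a": {"inner.x"},
    "inner.x": {"inner.y"},
    "inner.y": {"after_group"},  # an edge leaving the group does not disqualify a leaf
    "b": set(),
}
outer = {"a", "inner.x", "inner.y", "b"}
states = {"a": "success", "inner.x": "success", "inner.y": "failed", "b": "success"}

print(sorted(group_leaves(outer, downstream)))  # ['b', 'inner.y']
print(group_failed(outer, downstream, states))  # True
```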
A few related notes before discussing this. During AIP-52, late in the process, the idea emerged to add a flag to operators which would in effect mean "should I be considered for DAG run state". Essentially, letting any task "opt out" if it is a leaf. Currently teardown tasks are ignored in this way by default, but it's overridable. We could extend this option to all tasks, and it could be generalized to "should I be considered for container state", i.e. group or DAG. Similarly, another flag could come into existence: "should I be counted as a leaf when arrowing from a task group". Again, teardowns are ignored by default in this scenario. This isn't overridable, but you can simply add the downstream relationship after the fact if you want it. Again, not implemented, but it could be. And now, coming back to the above discussion: it might make sense to simply delegate to the task itself whether it should be rerun on retries of the container (group or DAG). Certainly, though, clearing should always clear all the tasks in the container, no?
So yeah, I think given the above discussion, delegating to task seems simpler / more flexible, no?
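The "delegate to the task" idea could look roughly like this. Both flags are hypothetical (neither `consider_for_group_state` nor `rerun_on_group_retry` exists on Airflow operators); the sketch only shows how a retryable group would consult them, with a teardown-like task opting out of the group state in the way the comment describes teardowns opting out of DAG run state.

```python
from dataclasses import dataclass
from typing import Dict, List, Set

FAILED_STATES = {"failed", "upstream_failed"}


@dataclass
class TaskSpec:
    task_id: str
    # Hypothetical flag: should this leaf be considered when computing the group's state?
    consider_for_group_state: bool = True
    # Hypothetical flag: should this task be re-run when the enclosing group retries?
    rerun_on_group_retry: bool = True


def group_state(leaves: List[TaskSpec], states: Dict[str, str]) -> str:
    """Failed if any opted-in leaf is failed; opted-out leaves are ignored."""
    counted = [t for t in leaves if t.consider_for_group_state]
    return "failed" if any(states[t.task_id] in FAILED_STATES for t in counted) else "success"


def tasks_to_rerun(tasks: List[TaskSpec]) -> Set[str]:
    """Tasks the group would re-run on retry."""
    return {t.task_id for t in tasks if t.rerun_on_group_retry}


extract = TaskSpec("extract", rerun_on_group_retry=False)  # e.g. an expensive, reusable step
load = TaskSpec("load")
cleanup = TaskSpec("cleanup", consider_for_group_state=False)  # teardown-like leaf

states = {"extract": "success", "load": "success", "cleanup": "failed"}
print(group_state([load, cleanup], states))              # success
print(sorted(tasks_to_rerun([extract, load, cleanup])))  # ['cleanup', 'load']
```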
and
You may not be aware, but recently added was a
that seems problematic.
Clear, i.e. no status, would seem to be the obvious choice, consistent with current behavior. A task can't be run if it's in "success" state, so why delay clearing it? Indeed, even if you don't delay it, I can imagine a race condition where the DAG may end before there's a chance to retry the task group; of course this would be more problematic with a delay. Also consider the following. Note first that you can have things outside a group which are downstream of things inside a group. So what happens when you have tasks that branch off downstream from tasks in the middle of your retryable task group? In effect, by retrying your task group, you are clearing "upstream". But do you also "recurse downstream", i.e. any tasks that are cleared in the group will be recursed downstream, and everything else, even outside the group, will also be cleared? In light of this, perhaps a retryable task group should not be able to have any external downstreams apart from those that connect to its leaves. Indeed, there may still be a problem with that, because if there are multiple leaves in the group, then maybe one is successful and one is failed, and maybe the successful leaf has already triggered its downstreams, which are all done now, but the other leaf failed, so the group retries. So now do you clear the downstreams of the success branch? Probably not. In view of this, perhaps what you want is more like SubDag, where you can't have inter-DAG task deps (so no arrowing directly to tasks in a retryable group) and all you can do is wait for the black box to complete successfully. Indeed, thinking this over, it crossed my mind that perhaps what we want is to reimplement SubDag as an inline construct, something different from task groups:

```python
with dag:
    with TaskGroup:
        with SubDag:
            with TaskGroup:
                ...
```

One thing that I encountered when working on setup and teardown was that, when we first started on it, I think it was having TaskGroup take on too much. I.e. it wanted to make setup and teardown attributes of TaskGroup. And this interfered with TaskGroup as an arbitrary container of logic, because it added the constraint that a setup or teardown could only happen at the beginning or end of a task group, and only one each in a group. This would require either a big refactor of many DAGs or disuse of setup / teardown. And it interfered with the use of TaskGroup as an arbitrary container of tasks, be it for visual grouping or business logic. And with task groups also being an authoring tool for mapping, the same issue arises there. With retrying, it actually seems substantially less problematic than what I describe with AIP-52, but I share the anecdote simply to note that we should be mindful of having one construct do too much. Not that we are necessarily doing that with this. The other side of that is the risk of proliferation of too many similar constructs. But I think an inline SubDag has at least some potential worth giving a thought. |
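The downstream-clearing concern can be made concrete on a toy graph. In this plain-Python sketch (hypothetical helpers, not Airflow's clearing code), `g.transform` sits in the middle of a retryable group but also feeds `report` outside it. Recursing downstream from the group would clear `report` and `publish` as well, and a validation check like `non_leaf_external_edges` could flag such edges if, as suggested, retryable groups were only allowed external downstreams from their leaves.

```python
from collections import deque
from typing import Dict, Set, Tuple


def tasks_to_clear(group: Set[str], downstream: Dict[str, Set[str]], recurse_downstream: bool) -> Set[str]:
    """Tasks cleared when the group retries, optionally following edges downstream (BFS)."""
    cleared = set(group)
    if not recurse_downstream:
        return cleared
    queue = deque(group)
    while queue:
        for child in downstream.get(queue.popleft(), set()):
            if child not in cleared:
                cleared.add(child)
                queue.append(child)
    return cleared


def non_leaf_external_edges(group: Set[str], downstream: Dict[str, Set[str]]) -> Set[Tuple[str, str]]:
    """Edges that leave the group from a task that is not one of the group's leaves."""
    edges: Set[Tuple[str, str]] = set()
    for task in group:
        children = downstream.get(task, set())
        if children & group:  # has a downstream inside the group, so it is not a leaf
            edges |= {(task, child) for child in children - group}
    return edges


downstream = {
    "g.extract": {"g.transform"},
    "g.transform": {"g.load", "report"},  # branches out of the middle of the group
    "g.load": {"publish"},                # leaves the group from a leaf
}
group = {"g.extract", "g.transform", "g.load"}

print(sorted(tasks_to_clear(group, downstream, recurse_downstream=False)))
# ['g.extract', 'g.load', 'g.transform']
print(sorted(tasks_to_clear(group, downstream, recurse_downstream=True)))
# ['g.extract', 'g.load', 'g.transform', 'publish', 'report']
print(non_leaf_external_edges(group, downstream))
# {('g.transform', 'report')}
```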
Thank you for the very detailed explanation, @dstandish. Your insights are appreciated, and it took me three days to muster the courage to read through it 😅
Yes, the notion of retrying the inner group when retrying the outer, as well as configuring retrying explicitly for just the inner task group, seems intuitive and should not cause any problems.
Agree.
Conceptually, delegating this decision to a task, so that it can declare whether it should be considered to determine the container state, appears both reasonable and clean. I also like that it aligns with the logic used for a teardown task, making it easier for users to learn and apply.
Here, I think an opinionated approach would be easier to adopt and configure. Essentially, users would either want to retry failed tasks or the whole container in most cases, no? Too many task-level configurations might overly complicate the task model. Should we go down this path, I anticipate a name like
Yea, I could buy this as users would want more flexibility here and delegating to the task provides that.
Great callout. I hadn't previously considered the recurse downstream functionality. I also wonder if we really need to? For the sake of simplicity, we could allow users to choose
Personally, I'd be against this, especially given that we have discouraged users from using subdags due to performance issues. I think we should look into extending task groups and make them more functional, rather than reverting to extend subdags or reimplement them. Both are quite similar concepts, we should adopt one and enrich it, so that users don't need to read yet another documentation page on when to use what. |
I'm not sure how I feel about this. I don't think we need to discuss cases like:
This goes way way way beyond the definition of edge case. |
Looking forward to this feature, any progress? Many thanks. |
Generally, if no one reports here, there is no progress. But if you want to work on it, feel free; otherwise someone will have to start working on it and complete it. This is how open source works: the most certain way of getting something in is to implement it, the second best is to find someone who can do it, and other than that it's "when someone will do it", with no progress reporting or anyone tracking it. |
@potiuk many thanks for your reply, I've learned about it. As a summary here: at this point in time, there may be a lack of support for implementing a repeating loop of tasks. In the history, so would you mind giving some advice on how to work around this use case in the current version of Airflow? I think that would be good for newcomers, so they can learn and find some solutions before someone takes up this feature. Many thanks. |
It's not always possible to give advice; sometimes the advice is "if you need it, contribute it". Generally speaking, this is an open source project, and expecting that one single person can always provide the answers is demanding too much. Maybe someone can give a better answer, but I think you can split it into another DAG, use TriggerDagRun, and retry on errors. Another option would be to contribute that feature, as mentioned. This is free software and you get it for free, so you get it "as is", even if it is somewhat inconvenient; contributing is generally a good idea and I encourage you to do so. Or maybe even find and pay someone to do it if you don't feel like it. Other than that, you need to wait until someone does it. Simply put, that's how the dynamics of an open-source project work. |
Discussed in #21333
Originally posted by sartyukhov February 4, 2022
Description
Hello!
Previously, a SubDag was used to organize tasks into groups. Now you've introduced TaskGroups to the world.
It's nice and very clever. But it has one big disadvantage compared to SubDag: it can't be repeated.
Use case/motivation
For example:
In a project I have two tasks (A >> B):
A - collect data (PythonOperator)
B - update materialized view in Postgres (PostgresOperator)
'A' could collect only part of the data and mark itself as failed (as far as I know, there is no "half-failed" status). But task 'B' should run regardless of A's result (trigger_rule="all_done", for example) to update the matview with the partial data.
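Why `all_done` matters here can be shown with a simplified model of the two trigger rules involved. This is plain Python, not Airflow's implementation; it handles only `all_success` and `all_done` and a reduced set of task states.

```python
from typing import Iterable

# Simplified set of "finished" upstream states for this sketch.
FINISHED = {"success", "failed", "skipped", "upstream_failed"}


def can_run(trigger_rule: str, upstream_states: Iterable[str]) -> bool:
    """Decide whether a task may start, given its upstream task states."""
    states = list(upstream_states)
    if trigger_rule == "all_success":
        return all(s == "success" for s in states)
    if trigger_rule == "all_done":
        return all(s in FINISHED for s in states)
    raise ValueError(f"trigger rule not modelled here: {trigger_rule}")


print(can_run("all_success", ["failed"]))  # False: in Airflow, B would end up upstream_failed
print(can_run("all_done", ["failed"]))     # True: B still refreshes the view with partial data
print(can_run("all_done", ["running"]))    # False: A has not finished yet
```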
After about an hour, I would like to repeat that process (A >> B).
With SubDag I could do that:
and that's it: C marks the DAG as failed, which triggers a retry.
But TaskGroup does not have a retry parameter.
I also can't retry the whole DAG, because it's big.
I also don't want to update the materialized view inside task 'A', because that way I can't do [A0, A1..An] >> B (update the materialized view just once for several collects).
I hope it's possible. Or maybe it could be done some other way.
Thanks in advance.
Additional explanation on the use case (from #21333)
I have a specific use case where this feature would be useful. It is like:
There is a task to do one thing
There is a second task (which depends on the first one) that does another thing; if this one fails, I'll need to re-run the entire DAG. I can't do both processes in the same task due to some limitations (I work with different Java drivers in each one), and retrying the same task doesn't solve the problem, because the result of this task determines whether or not the first DAG needs to be re-executed.
Clearing the previous task(s) isn't good either, because it causes an infinite loop until everything succeeds, which is not what I want; I would need only some 3-5 retries before it stays in a failed state.
My workaround for this was creating a DAG that triggers this DAG, so if the triggered DAG's state is failed, it re-executes it the number of times I set. However, as you can see, it requires creating 2 DAGs to solve the problem.
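The workaround above boils down to a capped retry loop around a whole child run. This plain-Python sketch models only that control flow: `run_child_dag` is a hypothetical stand-in for triggering the child DAG and waiting for its final state, and it assumes the child reports either `"success"` or `"failed"`.

```python
from typing import Callable, List, Tuple


def retry_child_run(run_child_dag: Callable[[int], str], max_attempts: int) -> Tuple[str, List[str]]:
    """Re-trigger the whole child run until it succeeds or attempts run out."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    outcomes: List[str] = []
    for attempt in range(1, max_attempts + 1):
        outcomes.append(run_child_dag(attempt))
        if outcomes[-1] == "success":
            break
    return outcomes[-1], outcomes


def fake_child(attempt: int) -> str:
    # Both tasks run on every attempt; the second one only succeeds from the 3rd attempt on,
    # so the whole child run fails twice before succeeding.
    first_ok = True
    second_ok = attempt >= 3
    return "success" if first_ok and second_ok else "failed"


print(retry_child_run(fake_child, max_attempts=5))  # ('success', ['failed', 'failed', 'success'])
print(retry_child_run(fake_child, max_attempts=2))  # ('failed', ['failed', 'failed'])
```

The cap (`max_attempts`) plays the role of the 3-5 retries mentioned above, avoiding the endless loop that clearing upstream tasks would cause.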
Related issues
No response