
[Core feature] Create or document a fast way to query the status of large workflows #4056

Open
Tom-Newton opened this issue Sep 20, 2023 · 10 comments
Labels
backlogged (For internal use. Reserved for contributor team workflow.), enhancement (New feature or request)

Comments

@Tom-Newton
Contributor

Tom-Newton commented Sep 20, 2023

Motivation: Why do you think this is important?

For large workflows with thousands of nodes it's difficult to monitor the overall progress. At this scale we need to be able to get things like counts of how many nodes or sub-workflows are in each status, e.g. 900 succeeded, 200 unknown, 2 failed, 98 in progress.

I think supporting large workflows like this would be a valuable feature, and it's critical to what I want to do with Flyte.

Goal: What should the final outcome look like, ideally?

I think the ideal would be an interface similar to remote.sync_execution(sync_nodes=True), but fast for large workflows. This would be very flexible.

Other interfaces would also be fine - I'm mostly just interested in it being fast.

Describe alternatives you've considered

The Flyte UI:
It displays lists and graphs, but at the scale of thousands of nodes these are impossible to parse by eye. Additionally, it tends to crash my browser.

flytectl get execution
It can get information about nodes when using --details, but the result seems to be incomplete. Writing it to a .yaml file and searching, I find quite a lot of nodes are missing.

flytekit remote.sync_execution(sync_nodes=True)
This does fetch all the important information and could certainly be parsed by some Python code to extract whatever metrics are needed. The problem is that it takes about 12 minutes to run on a workflow with 3000 nodes.
EDIT: It actually doesn't fetch information about nodes that haven't started processing yet, so nodes that would show with unknown status in the UI are missed.

Propose: Link/Inline OR Additional context

No response

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@Tom-Newton Tom-Newton added enhancement New feature or request untriaged This issue has not yet been looked at by the Maintainers labels Sep 20, 2023
@welcome

welcome bot commented Sep 20, 2023

Thank you for opening your first issue here! 🛠

@brndnblck brndnblck added the backlogged For internal use. Reserved for contributor team workflow. label Sep 21, 2023
@kumare3
Contributor

kumare3 commented Sep 21, 2023

cc @pingsutw: when we added system tags, did we add a query method for this in list executions?

@pingsutw
Member

@Tom-Newton
Contributor Author

Tom-Newton commented Sep 21, 2023

Thanks for the replies. I gave the tags filtering a try, but I don't really understand how the tags can help. I want to see the status of all the nodes in a particular workflow quickly, but I can't find any API that can list all the nodes in a workflow. All the APIs seem to list only immediate child nodes, rather than the whole graph.

I don't really think filtering is the problem, all the APIs I've tested already have too much filtering.

@wild-endeavor wild-endeavor removed the untriaged This issue has not yet been looked at by the Maintainers label Sep 22, 2023
@eapolinario eapolinario added backlogged For internal use. Reserved for contributor team workflow. and removed backlogged For internal use. Reserved for contributor team workflow. labels Sep 22, 2023
@Tom-Newton
Contributor Author

I would still be keen to know if there is a better way, but I ended up creating a version of remote.sync_execution(sync_nodes=True) that makes gRPC requests in many parallel threads, by wrapping iterate_node_executions and running it with asyncio. The 12 minutes is now more like 10 seconds.
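(The actual code is only in the attachment further down the thread, but the general pattern can be sketched: fan the blocking per-node gRPC calls out onto a thread pool from an asyncio event loop, then aggregate phases with a Counter. This is a minimal hypothetical sketch; fetch_node_execution is a stand-in stub, not flytekit's real API.)

```python
import asyncio
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def fetch_node_execution(node_id: str) -> str:
    """Hypothetical stand-in for a blocking per-node gRPC call.

    A real version would query flyteadmin and return the node's phase.
    """
    return "FAILED" if node_id == "n2" else "SUCCEEDED"


async def gather_statuses(node_ids: list[str]) -> Counter:
    loop = asyncio.get_running_loop()
    # Dispatch every blocking call onto the pool so they run concurrently.
    with ThreadPoolExecutor(max_workers=32) as pool:
        phases = await asyncio.gather(
            *(loop.run_in_executor(pool, fetch_node_execution, n) for n in node_ids)
        )
    return Counter(phases)


counts = asyncio.run(gather_statuses([f"n{i}" for i in range(5)]))
print(counts)  # Counter({'SUCCEEDED': 4, 'FAILED': 1})
```

With blocking stubs replaced by real per-node requests, the wall-clock time is bounded by the slowest batch rather than the sum of all round trips, which is consistent with the 12-minutes-to-10-seconds speedup described above.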

@EngHabu
Contributor

EngHabu commented Sep 26, 2023

This is awesome, @Tom-Newton...
flytekit (particularly remote) needs asyncio everywhere... Would you be open to redoing your change within the flytekit codebase to support asyncio? Even if we do it one API at a time (or create a separate async remote type with a smaller interface initially, until we migrate all APIs).

@Tom-Newton
Contributor Author

Tom-Newton commented Sep 26, 2023

What I did was a fairly naive approach that could definitely be optimised. The best way, though, would be to add native support for asyncio in flytekit, as you say. Anything that uses gRPC could probably benefit. I'm a total noob when it comes to asyncio and gRPC, but it looks like we could use https://grpc.github.io/grpc/python/grpc_asyncio.html. Potentially I could contribute to this, but probably not in the near term.

@Tom-Newton
Contributor Author

If anyone is interested in the code I'm using personally:
flyte_remote.py.txt (it's got a .txt extension because GitHub doesn't allow uploading .py)

@Tom-Newton
Contributor Author

I just saw #2193, which seems to be about an async interface to flyteadmin; that's one possible solution to this.


dosubot bot commented Feb 5, 2024


Based on your requirements, I would recommend using the ListNodeExecutions function in the NodeExecutionManager class. This function lists all node executions for a given workflow execution and can be used to efficiently query the status of large workflows in Flyte.

Here is a sample usage of the ListNodeExecutions function:

// Assumed imports (paths vary across flyteidl/flyteadmin versions):
//
//     "context"
//     "fmt"
//     "log"
//
//     "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
//     "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

nodeExecutionListRequest := admin.NodeExecutionListRequest{
    WorkflowExecutionId: &core.WorkflowExecutionIdentifier{
        Project: "my_project",
        Domain:  "my_domain",
        Name:    "my_workflow_execution_name",
    },
    Filters: "eq(phase, SUCCEEDED)",
    Limit:   100,
    Token:   "",
    SortBy:  &admin.Sort{Key: "created_at", Direction: admin.Sort_DESCENDING},
}

nodeExecutionList, err := nodeExecutionManager.ListNodeExecutions(context.Background(), nodeExecutionListRequest)
if err != nil {
    log.Fatalf("Failed to list node executions: %v", err)
}

for _, nodeExecution := range nodeExecutionList.NodeExecutions {
    fmt.Printf("Node execution: %v\n", nodeExecution)
}

In this example, the Filters field filters node executions by phase, the Limit field caps how many node executions are returned per request, the Token field is used for pagination, and the SortBy field sorts the results by creation time in descending order.

Please note that you need to replace "my_project", "my_domain", and "my_workflow_execution_name" with your actual project, domain, and workflow execution name. Also, replace "eq(phase, SUCCEEDED)" with your actual filter condition.

The ListNodeExecutions function handles large workflows with thousands of nodes by implementing pagination. This is done by using a limit and an offset to retrieve a subset of node executions at a time. If the number of retrieved node executions equals the limit, a new token is created for the next page. This token is the sum of the offset and the number of retrieved node executions. This way, the function can handle large workflows with thousands of nodes by retrieving and processing a manageable number of nodes at a time.
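The token-as-offset scheme described above can be illustrated with a small self-contained sketch. Here list_node_executions is a hypothetical stand-in for the real ListNodeExecutions call, and the token is treated as a stringified offset (in the real API it is an opaque string):

```python
def list_node_executions(data, token, limit):
    # Hypothetical paginated list call: the token encodes the offset.
    offset = int(token) if token else 0
    page = data[offset : offset + limit]
    # A next-page token is only issued when the page is full.
    next_token = str(offset + len(page)) if len(page) == limit else ""
    return page, next_token


def fetch_all(data, limit=100):
    """Follow tokens until the server stops issuing one."""
    results, token = [], ""
    while True:
        page, token = list_node_executions(data, token, limit)
        results.extend(page)
        if not token:
            break
    return results


all_nodes = fetch_all([f"n{i}" for i in range(250)], limit=100)
print(len(all_nodes))  # 250
```

For 250 nodes and a limit of 100, this makes three requests: two full pages, then a final page of 50 after which no token is returned and the loop stops.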

You can find more details about the ListNodeExecutions function in the flyteadmin/pkg/manager/impl/node_execution_manager.go file.

Regarding the async interface to Flyte admin discussed in issue #2193, it could potentially improve the performance of querying the status of large workflows. However, it would require significant changes to the Flyte admin and might not be available in the near future. For now, I would recommend using the ListNodeExecutions function as it is designed to handle large workflows efficiently.

I hope this helps! If you have any further questions, feel free to ask.


Development

No branches or pull requests

7 participants