-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding node_id to ExecutionPlanProperties #12186
base: main
Are you sure you want to change the base?
Conversation
…notation to combined create_physical_plan API
We had similar challenges when using DataFusion in a context similar to yours (checkpointing etc.) I have consulted with my team on how we solved them, and discussed this general approach. I can share the following general thoughts/concerns:
Therefore, we think the right approach is to traverse the tree (for plans or streams, depending on your use case), generate the IDs as you see fit, and store it downstream in a map container that associates nodes with your IDs. I don't think doing this upstream inside Apologies for not being able to provide feedback and start the detailed discussion before you prepared a PR, but in my defense you were too fast :) I think it would be great to add the traversal code and the map approach I mentioned as an example to upstream for future users who want to do something like this. This is an approach we are following for indexing work as well |
Thanks for the feedback @ozankabak . Would it make sense to add this traversal code in the utils? I would be nice for this to be available in the core library itself. |
Let's create a new example file, add the traversal logic and a simple of demonstration of how to store the node <-> id association via a map in that file. I think it will be a fairly succinct yet guiding example for many others. You may run into a difficulty creating or inserting into a map with We wanted to make |
Taking a second look at implementing this the using a global hashmap with pointers to Arc and solution is somewhat more hacky and error prone and as developers trying to build on top of DataFusion makes our experience feel pretty brittle. While we are interested in using the node_id for stateful checkpointing, it does make sense for this to be on an ExecutionPlan since it is a node on PhysicalPlan graph.
Streams can easily, derive their id "{node_id}_{stream_id}"
Presumably for any user interested in using the "node_id", they'd make their operators implement "with_node_id".
For additional types of node_ids, I suppose one could maintain them in a HashMap<usize, T> where usize is the node_id and not a ptr to Arc |
Can you share a link to your implementation attempt? This is a little surprising, I'd like to go over it and understand what is going on. |
@ozankabak would love any pointers. this code is admittedly a quick draft. so we pass in a hashmap and recursivesly traverse the tree.
then when executing the plan, we need to actually annotate this and set a global hash map singleton.
now when creating a Stream, we do need to pass a reference to the ExecutionPlan it is tied to in order for it to figure out the channel_tag/checkpoint_tag it is supposed to use to coordinate checkpoints. Btw the annotation outputted is something like -
In contrast this PR annotates the node_ids during
|
Just chiming in -- the implementation in this PR seems quite reasonable to me. While there are definitely ways to hack around the limitation of not having Node IDs, those strategies would be quite vulnerable to upstream breaking changes and given datafusion's goal of being extensible it makes makes sense for these to be a core a feature of the library. |
Thanks for the sketch, I see the challenge. After reviewing your sketch and going back and inspecting our solution in our fork, it seems to me like it may indeed make sense to add this functionality natively to upstream DF. That being said, I am worried about a few things (default value being Next week, I (or maybe @berkaysynnada) will work on a draft PR to flesh out some ideas on our end, then we can compare/contrast and arrive at a final design together. Then we can get some extra community review and merge this if noone else has other concerns or objections (whether this functionality should belong to core etc.) Thanks for awesome collaboration. |
Just wanted to leave a note to stress that I haven't forgotten about this -- this is on our shortlist to focus on after resolving some urgent issues. |
thanks for taking the time. looking forward to continuing the discussion. |
Which issue does this PR close?
Closes #11364
Rationale for this change
Currently ExecutionPlans dont have an identifier associated with them, making it hard to distinguish between the nodes for
usecases such as snapshotting continuous pipelines, displaying node metrics in a UI etc.
What changes are included in this PR?
Changes to -
ExecutionPlanProperties
to add node_idOption<usize>
ExecutionPlan
to addwith_node_id()
method to return a copy of the ExecutionPlan with assigned node id.SessionState
to add node_id annotation to finalized physical plans.physical-plan/src/node_id.rs
to traverse ExecutionPlans and generate deterministic ids for the whole tree.Are these changes tested?
Added asserts to an existing test in
datafusion-examples/src/planner_api.rs
.Are there any user-facing changes?
No