fix(airflow): Fix nodes grouping #664

Merged: 24 commits, May 10, 2024

Conversation

ElenaKhaustova
Contributor

Description

Fix #655

Development notes

Checklist

  • Opened this PR as a 'Draft Pull Request' if it is work-in-progress
  • Updated the documentation to reflect the code changes
  • Added a description of this change in the relevant RELEASE.md file
  • Added tests to cover my changes

@ElenaKhaustova ElenaKhaustova self-assigned this May 3, 2024
Signed-off-by: Elena Khaustova <[email protected]>
for node, parent_nodes in pipeline.node_dependencies.items():
    for parent in parent_nodes:
        dependencies[parent.name].append(node.name)

# Sort both parent and child nodes to make sure it's deterministic
Contributor Author

This sorting was replaced with the original topological order obtained from Kedro, which is deterministic. With the sort in place, nodes and dependencies ended up in different orders, which was confusing.

@ElenaKhaustova ElenaKhaustova marked this pull request as ready for review May 3, 2024 20:49
@ElenaKhaustova
Contributor Author

@ankatiyar and I have also discussed whether we need to add grouping logic to the e2e test. IMO, grouping logic is currently covered well by unit tests, and we can go without it. The unit tests already run the kedro airflow create --group-in-memory command, which is close to an e2e test. But happy to hear others' thoughts.

Contributor

@ankatiyar ankatiyar left a comment

Manually tested, works well! 💯
ETA: About the e2e test, I think it's still worth adding one, because in the unit tests we are mocking the catalog etc. The bug would have been evident earlier if there had been an e2e test in place. I think we can do it as a follow-up task though; I want to explore other grouping strategies as part of the airflow milestone too.

@ElenaKhaustova ElenaKhaustova requested a review from DimedS May 8, 2024 10:43
Contributor

@DimedS DimedS left a comment

Great work, @ElenaKhaustova ! It looks good to me. I just have a few small questions.

dependencies = {}
for node in pipeline.nodes:
    nodes[node.name] = [node]
    dependencies[node.name] = []
for node, parent_nodes in pipeline.node_dependencies.items():
Contributor

Why do you believe that this line will be deterministic? I mean, it affects the order of .append(node.name).

Contributor Author

We are iterating nodes in the order they were resolved with topological sort, which is deterministic, and nodes and dependencies dictionaries are ordered.
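
The ordering argument above can be illustrated with a small sketch. It does not use the Kedro API: `topo_order` and `parents_of` are hypothetical stand-ins for the topologically sorted `pipeline.nodes` and for `pipeline.node_dependencies`. It assumes only that Python dicts preserve insertion order.

```python
# Hypothetical stand-ins (not the Kedro API): node names in topological
# order, and the parents of each node.
topo_order = ["load", "clean", "features", "train"]
parents_of = {
    "load": [],
    "clean": ["load"],
    "features": ["clean"],
    "train": ["clean", "features"],
}


def build_dependencies(order, parents_of):
    """Map each parent name to its children, following `order` throughout."""
    # Seed every key in topological order; a plain dict keeps that order.
    dependencies = {name: [] for name in order}
    # Visit children in the same order, so each child list is filled
    # in topological order as well.
    for child in order:
        for parent in parents_of[child]:
            dependencies[parent].append(child)
    return dependencies


deps = build_dependencies(topo_order, parents_of)
```

In this sketch each parent's child list depends only on the order in which children are visited, not on the order of the parents inside `parents_of[child]`, because every child is appended to a given parent at most once.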

Contributor

dependencies[parent.name].append(node.name)
I understand that the parents will be deterministic because you initialised an empty dictionary previously. However, could you explain in more detail why the dictionary values containing the children will also be deterministic?

Contributor Author

They're deterministic because all the traversals inside group_memory_nodes function are deterministic as well and based on pipeline.nodes order which is sorted already.

Contributor

Can we add some comments about the intent of keeping it deterministic? It is not obvious from the code. Ideally there would be tests for it, but AFAIK it's a bit tricky to reproduce the randomness. I assume this was only tested manually.

Contributor

They're deterministic because all the traversals inside group_memory_nodes function are deterministic as well and based on pipeline.nodes order which is sorted already.

Does this mean that the previous code was not deterministic (and needed the sorting) because it used defaultdict?

Contributor Author

Yes, exactly

Contributor Author

We tested manually, yes.

Theoretically, we can randomise n test cases and run each test k times to ensure we get the same results within runs. But given that there's no randomness, the results will be the same, so I don't think we need it.

Contributor

The randomness cannot be simulated with n cases; that is another story. But I agree we don't need it here.

Contributor Author

Randomness, no, but you can write a stress test for determinism. It will not guarantee determinism with 100% certainty, but for large n and k it will come close. Anyway, creating an adjacency list is deterministic and dfs is deterministic; apart from those, the other loops rely on the Pipeline API, so there is nowhere for randomness to come from. So I agree on not doing anything about it. 🙂
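
The stress test described above (n random cases, k runs each) could look roughly like the following sketch. The function under test, `build_adjacency`, and the helper `is_stable` are hypothetical names for illustration, not code from this PR. Results are compared as ordered item lists, because plain dict equality ignores key order.

```python
import random


def build_adjacency(nodes, edges):
    """Toy function under test: undirected adjacency lists in input order."""
    adj = {name: [] for name in nodes}
    for a, b in edges:
        adj[a].append(b)
        adj[b].append(a)
    return adj


def is_stable(func, n_cases=50, k_runs=20, seed=0):
    """Return True if func gives identically ordered output on every rerun."""
    rng = random.Random(seed)  # seeded, so the generated cases are repeatable
    for _ in range(n_cases):
        nodes = [f"n{i}" for i in range(rng.randint(2, 12))]
        edges = [tuple(rng.sample(nodes, 2)) for _ in range(rng.randint(0, 15))]
        # Compare ordered items, not dicts: dict == dict ignores ordering.
        first = list(func(nodes, edges).items())
        for _ in range(k_runs - 1):
            if list(func(nodes, edges).items()) != first:
                return False
    return True


stable = is_stable(build_adjacency)
```

A check like this only raises confidence. All reruns happen in one interpreter process, so ordering differences that appear only between processes (for example, set iteration order of strings under a different hash seed) would not be detected by it.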

if node_input in memory_datasets:
    adj_matrix[node.name].add(output_to_node[node_input].name)
    adj_matrix[output_to_node[node_input].name].add(node.name)
parents[output_to_node[node_input].name].add(node.name)
Contributor

To me, it seems like that's a children dictionary, because the values are children, as I understand it.

Contributor Author

Yeah, the key is a parent's name, while the value is the list of its children. Will rename it to parent_to_children for clarity. Thank you!

Contributor Author

Done

Comment on lines 40 to 45
for node_input in node.inputs:
    if node_input in output_to_node:
        if node_input in memory_datasets:
            adj_matrix[node.name].add(output_to_node[node_input].name)
            adj_matrix[output_to_node[node_input].name].add(node.name)
        parent_to_children[output_to_node[node_input].name].add(node.name)
Contributor

Suggested change
- for node_input in node.inputs:
-     if node_input in output_to_node:
-         if node_input in memory_datasets:
-             adj_matrix[node.name].add(output_to_node[node_input].name)
-             adj_matrix[output_to_node[node_input].name].add(node.name)
-         parent_to_children[output_to_node[node_input].name].add(node.name)
+ for node_input in node.inputs:
+     if node_input in output_to_node:
+         parent_to_children[output_to_node[node_input].name].add(node.name)
+         if node_input in memory_datasets:
+             adj_matrix[node.name].add(output_to_node[node_input].name)
+             adj_matrix[output_to_node[node_input].name].add(node.name)

If rearranging it does not change the logic, I prefer to keep the related code closer.
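
For readers without the full diff, here is a self-contained sketch of what the rearranged loop computes. The `Node` dataclass, the `build_graphs` wrapper, and the sample data are assumptions made for illustration (the real code works on Kedro `Node` objects and a catalog-derived set of memory datasets), and the nesting follows the suggestion as reconstructed above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    """Hypothetical stand-in for a Kedro node: a name plus dataset names."""
    name: str
    inputs: tuple
    outputs: tuple


def build_graphs(nodes, memory_datasets):
    # Which node produces each dataset.
    output_to_node = {out: node for node in nodes for out in node.outputs}
    adj_list = {node.name: set() for node in nodes}
    parent_to_children = {node.name: set() for node in nodes}
    for node in nodes:
        for node_input in node.inputs:
            if node_input in output_to_node:
                parent = output_to_node[node_input].name
                # Every producer/consumer pair is a directed dependency.
                parent_to_children[parent].add(node.name)
                # Only memory datasets add an undirected "group together" edge.
                if node_input in memory_datasets:
                    adj_list[node.name].add(parent)
                    adj_list[parent].add(node.name)
    return adj_list, parent_to_children


nodes = [
    Node("load", ("raw",), ("df",)),
    Node("clean", ("df",), ("clean_df",)),
    Node("train", ("clean_df",), ("model",)),
]
adj_list, parent_to_children = build_graphs(nodes, memory_datasets={"df"})
```

Here only `df` is treated as a memory dataset, so `load` and `clean` become neighbours in the adjacency list, while `train` stays on its own but is still recorded as a child of `clean`.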

Contributor Author

Done

"""
memory_datasets = get_memory_datasets(catalog, pipeline)

adj_matrix: dict[str, set] = {node.name: set() for node in pipeline.nodes}
Contributor

Is this really an adjacency matrix? I was thinking https://en.wikipedia.org/wiki/Adjacency_matrix but it seems like it's just another dictionary

Contributor Author

It's an adjacency list: https://en.wikipedia.org/wiki/Adjacency_list. Will rename it
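
To make the naming distinction concrete, the sketch below builds both representations for the same small undirected graph. The node names and the single edge are made up for illustration.

```python
nodes = ["a", "b", "c"]
edges = [("a", "b")]  # one undirected edge; "c" is isolated

# Adjacency list: each node maps to the set of its neighbours.
# This is the shape of the dict[str, set] under discussion.
adj_list = {name: set() for name in nodes}
for u, v in edges:
    adj_list[u].add(v)
    adj_list[v].add(u)

# Adjacency matrix: an n x n grid where cell [i][j] is 1 if i and j are linked.
index = {name: i for i, name in enumerate(nodes)}
adj_matrix = [[0] * len(nodes) for _ in nodes]
for u, v in edges:
    adj_matrix[index[u]][index[v]] = 1
    adj_matrix[index[v]][index[u]] = 1
```

The list form stores only the edges that exist, which suits sparse pipeline graphs; the matrix form always needs n × n cells.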

Contributor Author

Done

Contributor

@noklam noklam left a comment

Left a couple of small comments. Was this tested manually?

My only concern is implementing a new dfs here. I recall that I had to implement a bfs for the SoftFailRunner, and in the end I used some Pipeline API that already does the search. I just want to make sure this has been considered, and whether we see a need to extend the Pipeline API side instead.

example of bfs:
https://github.com/noklam/kedro-softfail-runner/blob/970036ea8d5c969d02ed9150bfd4a2dc4baf967a/kedro_softfail_runner/core.py#L92

@ElenaKhaustova
Contributor Author

ElenaKhaustova commented May 9, 2024

Thank you for the comments, I've addressed them.

I'm not sure I fully understand the concern about using dfs vs bfs. We cannot rely only on the Pipeline API, since we need to adjust the graph by adding new edges so that nodes connected through MemoryDatasets are grouped into one node. So we need an updated adjacency list and a single traversal to find connected components. It makes no difference whether that traversal is a dfs or a bfs. The only issue that might arise with a recursive dfs is if the number of nodes exceeds the default recursion limit, which is around 1000.
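
The recursion-depth concern can be avoided by keeping an explicit stack. The following is a minimal sketch of finding connected components that way; it is not the implementation merged in this PR, and `connected_components` is a hypothetical name. Neighbours are sorted here purely so the order inside each component does not depend on set iteration order, which is a choice made for this example.

```python
import sys


def connected_components(adj_list):
    """Group nodes of an undirected graph, visiting start nodes in dict order."""
    seen = set()
    components = []
    for start in adj_list:
        if start in seen:
            continue
        seen.add(start)
        stack = [start]  # explicit stack instead of recursive calls
        component = []
        while stack:
            current = stack.pop()
            component.append(current)
            for neighbour in sorted(adj_list[current]):
                if neighbour not in seen:
                    seen.add(neighbour)
                    stack.append(neighbour)
        components.append(component)
    return components


small = connected_components({"a": {"b"}, "b": {"a"}, "c": set()})

# A chain longer than the interpreter's recursion limit still works,
# because the traversal depth lives in a list, not on the call stack.
n = sys.getrecursionlimit() + 500
chain = {i: set() for i in range(n)}
for i in range(n - 1):
    chain[i].add(i + 1)
    chain[i + 1].add(i)
long_components = connected_components(chain)
```

Replacing `stack.pop()` with a queue would turn this into a bfs and yield the same components, which matches the point that the choice of traversal does not matter for grouping.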

@ElenaKhaustova ElenaKhaustova requested a review from noklam May 9, 2024 19:38
@noklam
Contributor

noklam commented May 10, 2024

@ElenaKhaustova I haven't dug into this deeply enough to understand it fully. I'll approve it now, since it works perfectly fine and I don't want to block the PR.

kedro-org/kedro#3758 is another topic, but it shows a way to adjust the graph. The API is obviously bad; I just want to show that there may be potential to open this up a little bit.

@ElenaKhaustova
Contributor Author

To be honest, I don't think the API is bad; it's just outside the scope of the Pipeline API.

@ElenaKhaustova ElenaKhaustova merged commit 624ace1 into main May 10, 2024
20 checks passed
@ElenaKhaustova ElenaKhaustova deleted the airflow-bug-nodes-grouping branch May 10, 2024 20:34
tgoelles pushed a commit to tgoelles/kedro-plugins that referenced this pull request Jun 6, 2024
* Update memory dataset checking

Signed-off-by: Ankita Katiyar <[email protected]>

* Built adjacency matrix

Signed-off-by: Elena Khaustova <[email protected]>

* Implemented connectivity components search

Signed-off-by: Elena Khaustova <[email protected]>

* Replaced sort with topological order

Signed-off-by: Elena Khaustova <[email protected]>

* Removed debug output

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed pre-commit errors

Signed-off-by: Elena Khaustova <[email protected]>

* Updated unit tests for node grouping

Signed-off-by: Elena Khaustova <[email protected]>

* Refactored grouping function

Signed-off-by: Elena Khaustova <[email protected]>

* Added clarification comments

Signed-off-by: Elena Khaustova <[email protected]>

* Updated unit test

Signed-off-by: Elena Khaustova <[email protected]>

* Added missed return types

Signed-off-by: Elena Khaustova <[email protected]>

* Linter errors fix

Signed-off-by: Elena Khaustova <[email protected]>

* Fixed mypy errors

Signed-off-by: Elena Khaustova <[email protected]>

* Fixing docs build

Signed-off-by: Elena Khaustova <[email protected]>

* Fixing docs build

Signed-off-by: Elena Khaustova <[email protected]>

* Renamed parent dictionary

Signed-off-by: Elena Khaustova <[email protected]>

* Added comments to clarify the resulting order nodes

Signed-off-by: Elena Khaustova <[email protected]>

* Renamed matrix -> list

Signed-off-by: Elena Khaustova <[email protected]>

* Applied suggested change

Signed-off-by: Elena Khaustova <[email protected]>

* Added missed renamings

Signed-off-by: Elena Khaustova <[email protected]>

---------

Signed-off-by: Ankita Katiyar <[email protected]>
Signed-off-by: Elena Khaustova <[email protected]>
Co-authored-by: Ankita Katiyar <[email protected]>
Signed-off-by: tgoelles <[email protected]>
Successfully merging this pull request may close these issues.

kedro-airflow: Grouping in memory is not working properly
4 participants