Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Link PhysicalOperator to its LogicalOperator #47986

Merged

Conversation

scottjlee
Copy link
Contributor

@scottjlee scottjlee commented Oct 11, 2024

Why are these changes needed?

It can be useful to know the LogicalOperator which was translated to the particular PhysicalOperator. For example, if we are trying to determine the exact type of Read operation, the PhysicalOperator will be a Map operator, which is ambiguous (we can only tell by examining the operator name, which is not reliable).

This PR links each PhysicalOperator to its originating LogicalOperator upon creation in the execution plan optimizer, so that we can access this information during execution.

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Comment on lines 122 to 123
physical_op = plan_fn(logical_op, physical_children)
physical_op.set_logical_operator(logical_op)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this work for logical operators that create multiple physical operators (e.g., Read which creates an InputDataBuffer and a TaskPoolOperator)? Would PhysicalOperator._logical_operator be None for everything but the last physical operator created by a single logical operator?

Copy link
Contributor Author

@scottjlee scottjlee Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. yeah, the current behavior would be exactly as you describe above. alternatively, i could add it to each physical operator created, but that would require a bigger change to PhysicalOperator.__init__() to accept the source LogicalOperator as a parameter.

i can see that also being useful, but requires a bigger change. thoughts on if it's worth the additional complexity here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the Read -> InputDataBuffer->TaskPoolMapOperator was the only case i could come up with where one LogicalOperator produces multiple PhysicalOperators. so that's why i'm wondering if it's worth the extra change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, i can just manually call set_logical_operator() in plan_read_op, so no need to add it as a constructor arg.

Copy link
Contributor

@raulchen raulchen Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another approach is that you iterate the DAG here and set logical operator for those don't have it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the physical DAG is already generated, how would we know which logical operator to set?

for example (ignoring operator fusion), if we have Read -> Map, which gets translated to `InputDataBufer -> TaskPoolMapOperator_1 -> TaskPoolMapOperator_2, and the mapping of physical -> logical operator is:

{
    Read: TaskPoolMapOperator_1,
    Map: TaskPoolMapOperator_2
}

how would we know that the InputDataBuffer also originated from the Read op, unless we explicitly set it during the translation from logical to physical op?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This _plan function generates the DAG in a post-order traverse order.
So at this line, all nodes without logical operators must be created by the current logical operator.

@bveeramani bveeramani self-requested a review October 11, 2024 16:50
@@ -188,6 +188,9 @@ def __init__(
self._estimated_num_output_bundles = None
self._estimated_output_num_rows = None
self._execution_completed = False
# The LogicalOperator which was translated to create this PhysicalOperator.
# Set via `PhysicalOperator.set_logical_operator()`.
self._logical_op = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be a list for fused physical operators.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. i was looking at this, and realized that during operator fusion, we actually construct a new LogicalOperator to represent the fused PhysicalOperators. For example for fusing two Map->Map ops, we construct a new logical Map op: https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/logical/rules/operator_fusion.py#L355-L360

so i was thinking we should link this newly constructed logical op, instead of combining the list of linked logical ops from the physical operators being fused, since this most closely reflects what's going on under the hood. let me know if you think we should still maintain the full list of logical operators (i can still see that being useful for keeping track of where linked logical operators end up).

@scottjlee scottjlee enabled auto-merge (squash) October 15, 2024 19:33
@github-actions github-actions bot added the go add ONLY when ready to merge, run all tests label Oct 15, 2024
@scottjlee scottjlee merged commit 347af0e into ray-project:master Oct 15, 2024
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants