-
Notifications
You must be signed in to change notification settings - Fork 159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] Fix actor pool project splitting when column is not renamed #2998
Conversation
CodSpeed Performance ReportMerging #2998 will not alter performanceComparing Summary
|
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #2998 +/- ##
==========================================
+ Coverage 77.80% 78.12% +0.31%
==========================================
Files 602 602
Lines 71892 71461 -431
==========================================
- Hits 55938 55830 -108
+ Misses 15954 15631 -323
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome
recursive_count: usize, | ||
) -> DaftResult<Transformed<Arc<LogicalPlan>>> { | ||
// TODO: eliminate the need for recursive calls by doing a post-order traversal of the plan tree. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent :)
src/daft-plan/src/logical_optimization/rules/split_actor_pool_projects.rs
Outdated
Show resolved
Hide resolved
Co-authored-by: Jay Chia <[email protected]>
…entual-Inc#2998) Previously, this would fail: ```py import os os.environ["DAFT_ENABLE_ACTOR_POOL_PROJECTIONS"] = "1" import daft from daft import udf @udf( return_dtype=daft.DataType.int64(), batch_size=1 ) class MyUDF: def __init__(self): # import time # time.sleep(10) pass def __call__(self, _): # import time # time.sleep(10) import os pid = os.getpid() return [pid] MyUDF = MyUDF.with_concurrency(4) df = daft.from_pydict({"a": list(range(10))}) df = df.into_partitions(4) df = df.select(MyUDF(df["a"])) df = df.select(MyUDF(df["a"])) df.show() ``` This is because when we split the project into multiple actor pool projects, we create new names for intermediate columns and lose the information about the original name. This PR fixes that by adding an alias to the end of the actor pool projects. --------- Co-authored-by: Jay Chia <[email protected]>
Previously, this would fail:
This is because when we split the project into multiple actor pool projects, we create new names for intermediate columns and lose the information about the original name. This PR fixes that by adding an alias to the end of the actor pool projects.