[BUG] Fix actor pool initialization in ray client mode #3028
Conversation
CodSpeed Performance Report: Merging #3028 will not alter performance.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #3028 +/- ##
==========================================
- Coverage 78.45% 78.26% -0.20%
==========================================
Files 611 611
Lines 72619 72804 +185
==========================================
+ Hits 56975 56979 +4
- Misses 15644 15825 +181
Sweet! Seems like making the Scheduler the ActorPoolManager solved most of the problems. Pretty good.
I'm not 100% sure we want to add another test to our matrix though. Let me check with @samster25
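For reference, a minimal sketch of that shape, with assumed signatures rather than the actual Daft interfaces: the scheduler itself implements ActorPoolManager, so actor_pool_context always creates the pool in the process where scheduling happens, even when that process is itself a Ray actor (ray client mode).

```python
# Minimal sketch (assumed signatures, not the actual Daft code): the scheduler
# implements ActorPoolManager, so actor_pool_context opens the pool wherever
# the scheduler is running.
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Iterator


class ActorPoolManager(ABC):
    @abstractmethod
    def actor_pool_context(self, name: str, num_actors: int) -> Iterator[str]:
        """Context manager that yields an id for a pool living for the block's duration."""


class Scheduler(ActorPoolManager):
    def __init__(self) -> None:
        self._actor_pools: dict[str, list[object]] = {}

    @contextmanager
    def actor_pool_context(self, name: str, num_actors: int) -> Iterator[str]:
        # In the real runner these entries would be Ray actor handles or local workers.
        pool_id = f"pool-{name}"
        self._actor_pools[pool_id] = [object() for _ in range(num_actors)]
        try:
            yield pool_id
        finally:
            del self._actor_pools[pool_id]
```

Callers can then write "with scheduler.actor_pool_context(...) as pool_id:" without caring where the scheduler lives.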
self, psets: dict[str, list[PartitionT]], results_buffer_size: int | None
self,
psets: dict[str, list[PartitionT]],
actor_pool_manager: physical_plan.ActorPoolManager,
Nice
@@ -271,6 +271,92 @@ def wait(self) -> None:
        raise NotImplementedError()


class LocalPartitionSet(PartitionSet[MicroPartition]):
Is this a pure movement of the class from pyrunner.py? Why did we decide to move it?
It was being imported by dataframe.py and ray_runner.py, which was causing some circular import issues. I figured that since it was being used outside of the pyrunner, I would move it out to simplify the dependency tree.
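Roughly, the shape of that restructuring (module paths below are illustrative; the exact target module isn't named in this diff):

```python
# Before: dataframe.py and ray_runner.py reached into the PyRunner module just
# for this class, which pulled pyrunner's own imports along and closed a cycle.
#
#   # dataframe.py / ray_runner.py
#   from daft.runners.pyrunner import LocalPartitionSet
#
# After: LocalPartitionSet lives in a leaf module with no runner dependencies,
# so dataframe.py, pyrunner.py, and ray_runner.py can all import it safely.
#
#   # dataframe.py / pyrunner.py / ray_runner.py
#   from daft.runners.partitioning import LocalPartitionSet  # hypothetical path
```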
@@ -259,6 +260,7 @@ fn physical_plan_to_partition_tasks(
    physical_plan: &PhysicalPlan,
    py: Python,
    psets: &HashMap<String, Vec<PyObject>>,
    actor_pool_manager: &PyObject,
Maybe a good idea to actually wrap this into an ExecutionContext? We could start using that to thread other things through as well (e.g. the execution config).
Not blocking for this PR, but could be great to formalize in the future.
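Sketched in Python for brevity (the suggestion is about the Rust physical_plan_to_partition_tasks signature), the idea would be a single context object threaded through plan translation; the name and fields below are hypothetical:

```python
# Hypothetical ExecutionContext, not implemented in this PR: bundle the
# arguments that plan translation needs instead of growing its signature.
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ExecutionContext:
    psets: dict[str, list[Any]]           # input partition sets, keyed by cache entry
    actor_pool_manager: Any                # the scheduler acting as ActorPoolManager
    execution_config: Any | None = None    # could be threaded through here too


def physical_plan_to_partition_tasks(physical_plan: Any, ctx: ExecutionContext) -> Any:
    # Translation would read everything it needs off `ctx` rather than taking
    # more and more positional arguments as new execution state is added.
    ...
```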
That might make sense. I'll try that
After looking at it for a bit, I don't think it would be the best idea, since the execution config is actually put into the ray object store and sent to all of the tasks, whereas the actor pool manager does not need to leave the scheduler actor.
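For context, a rough illustration of that distinction, using plain Ray calls rather than Daft's actual wiring:

```python
# The execution config is broadcast via the Ray object store so every task can
# read it; the actor pool manager is ordinary local state on the scheduler and
# never needs to be serialized or shipped anywhere.
import ray

ray.init()

execution_config = {"default_morsel_size": 131072}   # illustrative contents
config_ref = ray.put(execution_config)               # lives in the object store


@ray.remote
def run_task(cfg: dict) -> int:
    # Ray resolves the ObjectRef argument, so cfg arrives as the plain dict.
    return cfg["default_morsel_size"]


print(ray.get(run_task.remote(config_ref)))
```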
tests/test_resource_requests.py (Outdated)
df = df.with_column(
    "more_cpu_request",
    assert_resources_2(col("id"), num_cpus=1),
)
Why rm?
See my note above
This PR moves the actor_pool_context method from the ray runner to the scheduler, and routes the relevant ActorPoolManager implementation to actor_pool_project. That way, it does not accidentally get the wrong actor pool context when the scheduler is running on a ray actor, which is the case in ray client mode.

With this change, in addition to separating the actor_pool_context method out of Runner into ActorPoolManager, I also move some other things around to clean things up, especially so that ray_runner.py no longer depends on pyrunner.py.

Note:
- One of the tests was cut down because it needed 3 CPUs to run, which, combined with the 1 CPU for the scheduler actor, meant no more tasks could even be scheduled. I plan on adding more informative errors/warnings when this happens in a future PR.
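To make the routing described above concrete, here is a compressed sketch with simplified signatures and a hypothetical submit_to_pool helper (the real actor_pool_project is a generator over partition tasks with more bookkeeping):

```python
# actor_pool_project receives the ActorPoolManager (the scheduler) explicitly,
# so the pool is opened wherever the scheduler lives, rather than reaching for
# a global Runner that may sit in a different process under ray client mode.
from typing import Any, Iterator


def submit_to_pool(pool_id: str, task: Any, projection: Any) -> Any:
    # Hypothetical stand-in for dispatching `task` to an actor in `pool_id`.
    return (pool_id, projection, task)


def actor_pool_project(
    child_tasks: Iterator[Any],
    projection: Any,
    actor_pool_manager: Any,   # the Scheduler, via the ActorPoolManager interface
    num_actors: int,
) -> Iterator[Any]:
    with actor_pool_manager.actor_pool_context("actor_pool_project", num_actors) as pool_id:
        for task in child_tasks:
            yield submit_to_pool(pool_id, task, projection)
```

Paired with the Scheduler sketch earlier in this thread, driving this generator with any ActorPoolManager implementation exercises the whole path, regardless of whether that manager is a local object or a Ray actor.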