Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Propagate configs to Ray remote functions #1707

Merged
merged 10 commits into from
Dec 8, 2023

Conversation

jaychia
Copy link
Contributor

@jaychia jaychia commented Dec 7, 2023

General Changes

  1. Adds serde and pickling __reduce__ impl for PyDaftConfig so it can be sent over the wire in Ray
  2. Parametrizes our runners (both PyRunner and RayRunner) with a DaftConfig object upon initialization.
  3. We thus also have to make it illegal from the user-API to call set_config once the runners have been instantiated
  4. I also consolidated our _RUNNER and _DAFT_CONTEXT global singletons in the daft.context module -- the runner now resides inside the DaftContext object

Overall, these changes allow us to have:

  • Python-side singleton for DaftContext containing a lazily-initialized Runner
  • The lazily-initialized Runner is initialized with the DaftConfig at time-of-initialization (when any dataframes are created or executed)
  • All subsequent code that is executed by the runner (whether locally or on Ray) should use the config that is held by the Runner, instead of reaching out to the global singleton

RayRunner changes

  1. For every call to @ray.remote, ensure that we accept a daft_config as the first arg and call set_config as the first action
  2. When the RayRunner is initialized, we call a ray.put to put the DaftConfig into the cluster store that objectref. This objectref is also passed around to various other datastructures that may potentially call ray.remote functions, such as the RayRunnerIO, RayPartitionSet and RayMaterializedResult so that they can correctly parametrize their calls with the same config.

@github-actions github-actions bot added the enhancement New feature or request label Dec 7, 2023
Copy link

codecov bot commented Dec 7, 2023

Codecov Report

Merging #1707 (21bec7c) into main (a53cd51) will decrease coverage by 0.19%.
Report is 3 commits behind head on main.
The diff coverage is 81.94%.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1707      +/-   ##
==========================================
- Coverage   85.11%   84.92%   -0.19%     
==========================================
  Files          55       55              
  Lines        5368     5414      +46     
==========================================
+ Hits         4569     4598      +29     
- Misses        799      816      +17     
Files Coverage Δ
daft/runners/pyrunner.py 96.36% <100.00%> (-0.03%) ⬇️
daft/runners/ray_runner.py 88.02% <97.29%> (-0.73%) ⬇️
daft/context.py 77.08% <62.50%> (+0.67%) ⬆️

... and 9 files with indirect coverage changes

daft/context.py Outdated Show resolved Hide resolved
daft/context.py Outdated Show resolved Hide resolved
@clarkzinzow
Copy link
Contributor

clarkzinzow commented Dec 8, 2023

@jaychia Quick high-level question: what's the primary motivation for attaching the PyDaftConfig to the runners instead of fetching from the global singleton when it's time to submit a Ray task? It seems like attaching it to the runner doesn't give us much except some extra complexity, but I might be missing something.

@clarkzinzow
Copy link
Contributor

clarkzinzow commented Dec 8, 2023

Main motivators that come to mind are:

  1. ensuring that the config is frozen from when execution starts to when execution finishes to prevent config drift across Ray workers,
  2. ensuring that the ray.put()-broadcasted config is properly garbage collected by keeping it out of the global scope, and tying it's lifecycle to the runner is an easy way to ensure that.

Is that right?

jaychia and others added 2 commits December 7, 2023 19:19
Co-authored-by: Clark Zinzow <[email protected]>
Co-authored-by: Clark Zinzow <[email protected]>
@jaychia
Copy link
Contributor Author

jaychia commented Dec 8, 2023

Main motivators that come to mind are:

  1. ensuring that the config is frozen from when execution starts to when execution finishes to prevent config drift across Ray workers,
  2. ensuring that the ray.put()-broadcasted config is properly garbage collected by keeping it out of the global scope, and tying it's lifecycle to the runner is an easy way to ensure that.

Is that right?

Yup! My primary motivation was that if we could tie the lifetime of the config to the lifetime of the runner as much as possible, then we have a guarantee that all the configs are consistent all remote calls in a given run on a given runner.

Ideally the lifecycle of objects should look something like:

  1. "Setup stage": set all configs programmatically
  2. "Execution stage": config is used to instantiate the runner and is then thrown away.

(2) isn't quite true atm because we keep a redundant copy of the daft_config on the context object still... But I disable messing with it by not allowing calls to set_config once the runners have been instantiated

@jaychia
Copy link
Contributor Author

jaychia commented Dec 8, 2023

Also, I couldn't find a way around certain ray.remote functional calls needing the ObjectRef "version" of the daft_config, because simply grabbing the singleton like so: get_context().daft_config only gives the local copy of the config instead of the one that we put in the cluster...

The easiest way was to just store the ObjectRef on the runner and manage its lifetime there

@jaychia jaychia merged commit 37e7329 into main Dec 8, 2023
39 of 40 checks passed
@jaychia jaychia deleted the jay/propagate-config-ray-remote branch December 8, 2023 06:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants