Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PERF] Spread scan tasks over Ray cluster. #1950

Merged
merged 3 commits into from
Feb 26, 2024
Merged

Conversation

clarkzinzow
Copy link
Contributor

This PR forces a SPREAD scheduling strategy for scan tasks when using the Ray runner. This should result in better load balancing of read tasks across the Ray cluster, yielding:

  • better utilization of the aggregate network bandwidth of the cluster,
  • better memory stability due to a more even post-read object distribution,
  • better performance of downstream parallel compute operations due to a more even distribution of data over the compute bandwidth of the cluster.

Closes #1940

)
]


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to move these to execution_step.py since it's a more sensible location for them, and to not introduce an otherwise unnecessary ray_runner.py -> rust_physical_plan_shim.py dependency.

daft/runners/ray_runner.py Outdated Show resolved Hide resolved
Copy link

codecov bot commented Feb 26, 2024

Codecov Report

Attention: Patch coverage is 88.00000% with 3 lines in your changes are missing coverage. Please review.

Project coverage is 83.93%. Comparing base (0a51db1) to head (7a5126b).

Additional details and impacted files

Impacted file tree graph

@@           Coverage Diff           @@
##             main    #1950   +/-   ##
=======================================
  Coverage   83.93%   83.93%           
=======================================
  Files          55       55           
  Lines        6111     6112    +1     
=======================================
+ Hits         5129     5130    +1     
  Misses        982      982           
Files Coverage Δ
daft/execution/rust_physical_plan_shim.py 95.45% <100.00%> (+2.35%) ⬆️
daft/runners/ray_runner.py 89.81% <100.00%> (+0.04%) ⬆️
daft/execution/execution_step.py 92.84% <85.71%> (-0.40%) ⬇️

@clarkzinzow
Copy link
Contributor Author

clarkzinzow commented Feb 26, 2024

Benchmarking

Setup

28 GiB Parquet data, 32 partitions, 8 i3.2xlarge Ray nodes each with 61 GiB RAM (18.3 GiB object store)

Results

Before PR

  • Most of the reads go to 2-4 nodes
  • 10s of GBs are spilled
  • During one of the runs, one of the workers OOMed
  • Mean execution time (3 trials): 70 seconds

Screenshot 2024-02-26 at 2 51 48 PM

Screenshot 2024-02-26 at 2 52 09 PM

After PR

  • Even spread of reads across all nodes
  • No data spilling
  • Mean execution time (3 trials): 25 seconds

Screenshot 2024-02-26 at 2 47 16 PM

Screenshot 2024-02-26 at 2 48 17 PM

@clarkzinzow
Copy link
Contributor Author

I'm considering this change to be validated, merging now!

@clarkzinzow clarkzinzow merged commit 1a94752 into main Feb 26, 2024
29 checks passed
@clarkzinzow clarkzinzow deleted the clark/spread-scans branch February 26, 2024 23:17
samster25 pushed a commit that referenced this pull request Feb 27, 2024
This PR forces a `SPREAD` scheduling strategy for scan tasks when using
the Ray runner. This should result in better load balancing of read
tasks across the Ray cluster, yielding:
- better utilization of the aggregate network bandwidth of the cluster,
- better memory stability due to a more even post-read object
distribution,
- better performance of downstream parallel compute operations due to a
more even distribution of data over the compute bandwidth of the
cluster.

Closes #1940
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Make Scan tasks use a spread scheduling strategy
2 participants