
[Datasets] Provide more efficient + intuitive block clearing semantics for different execution modes #24127

Merged 2 commits into ray-project:master on Apr 30, 2022

Conversation

@clarkzinzow (Contributor) commented Apr 22, 2022:

TL;DR: Don't clear for eager, clear all but non-lazy input blocks if lazy, clear everything if pipelining.

This PR provides more efficient and intuitive block clearing semantics for eager mode, lazy mode, and pipelining, while still supporting multiple operations applied to the same base dataset, i.e. fan-out. For example, two different map operations are applied to the same base ds below:

```python
ds = ray.data.range(10).map(lambda x: x + 1)
ds1 = ds.map(lambda x: 2 * x)
ds2 = ds.map(lambda x: 3 * x)
```

If we naively cleared the base dataset's blocks when executing the map that produces ds1, the map producing ds2 would fail.

Desired Semantics

  • Eager mode - don’t clear input blocks, thereby supporting fan-out from cached data at any point in the stage chain without triggering unexpected recomputation.
  • Lazy mode - if the datasource is lazy, clear the input blocks for every stage, relying on recomputation via stage lineage if fan-out occurs; if the datasource is non-lazy, don't clear the execution plan's source blocks when executing the first stage, but do clear the input blocks for every subsequent stage.
  • Pipelines - Same as lazy mode, although the only fan-out that can occur is from the pipeline source blocks when repeating a dataset/pipeline, so unintended intermediate recomputation will never happen.
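The three bullets above can be condensed into a single decision function. This is a hypothetical sketch of the decision logic only (`should_clear_input_blocks` and its parameters are illustrative names, not Ray's actual ExecutionPlan API):

```python
def should_clear_input_blocks(mode: str, stage_idx: int, lazy_datasource: bool) -> bool:
    """Sketch of the desired block-clearing semantics described above.

    mode: "eager", "lazy", or "pipeline" (illustrative encoding).
    stage_idx: 0-based index of the stage being executed in the chain.
    lazy_datasource: whether the source blocks can be recomputed via read tasks.
    """
    if mode == "eager":
        # Never clear: fan-out reuses the cached snapshot blocks.
        return False
    # Lazy mode and pipelines share the same policy:
    if lazy_datasource:
        # Everything is recomputable from the read stage, so always clear.
        return True
    # Non-lazy datasource: the source blocks cannot be recomputed, so keep
    # them for the first stage, then clear inputs for every later stage.
    return stage_idx > 0
```

The pipeline case behaves like lazy mode because the only fan-out a pipeline can see is from its source blocks when repeating a dataset.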

Related issue number

Closes #18791

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

```diff
-    self, clear_input_blocks: bool = True, force_read: bool = False
+    self,
+    clear_input_blocks: bool = True,
+    destroy_unrecoverable_input_blocks: bool = False,
```

A reviewer (Contributor) commented on this change:

I like the name of this argument!

@ericl (Contributor) left a comment:

LGTM at a high level. @jianoaix can you review?

@jjyao (Collaborator) commented Apr 24, 2022:

What's fan-out here?

@clarkzinzow (Contributor, Author) replied:

@jjyao By fan-out, we mean more than one dataset operation applied to a single base dataset, e.g. two different map operations applied to the same base ds in this example:

```python
ds = ray.data.range(10).map(lambda x: x + 1)
ds1 = ds.map(lambda x: 2 * x)
ds2 = ds.map(lambda x: 3 * x)
```

I'll add this to the PR description!

@jianoaix (Contributor) commented:

Actually, with fan-out, I think there is an issue with the execution plan, which becomes a DAG rather than a chain.

For example, continuing the code snippet, we can have a fan-in: `ds3 = ds1.union(ds2)`. Now the lineage of `ds3` is a DAG, and users can continue this pattern to construct complex DAGs.
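To illustrate why fan-in breaks a chain-shaped lineage, here is a toy model of the DAG being discussed (purely illustrative: `Plan` and `paths_to_read` are hypothetical stand-ins, not Ray's actual ExecutionPlan API):

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Plan:
    """Toy lineage node: an operation plus its input plans."""
    op: str
    inputs: List["Plan"] = field(default_factory=list)


source = Plan("read")                  # ray.data.range(10)
base = Plan("map_plus_1", [source])    # ds
ds1 = Plan("map_times_2", [base])
ds2 = Plan("map_times_3", [base])
ds3 = Plan("union", [ds1, ds2])        # fan-in: ds3 has two parents


def paths_to_read(node: Plan) -> int:
    """Count distinct lineage paths back to the read stage."""
    if not node.inputs:
        return 1
    return sum(paths_to_read(p) for p in node.inputs)
```

Since `ds3` reaches the read stage along two paths, a linear chain can no longer represent its lineage, which is the concern raised here.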

@clarkzinzow (Contributor, Author) replied:

@jianoaix Agreed that an explicit DAG representation would be more ideal (that's the future plan), but what do you see as the concrete current issue with fan-out here? Fan-out is supported in eager mode via reusing the base dataset's snapshot blocks, and is supported in lazy mode via recomputation from source, so fan-out should be covered. I.e., the stage DAG for fan-out of lazy datasets is currently supported by independent chains for each dataset sink, with the chain going back to the shared source dataset.

> For example, continue the code snippet, we can have a fan-in: ds3 = ds1.union(ds2). Now the lineage of ds3 is a DAG. And user can continue such pattern to construct complex DAG.

Right now, lazy fan-in isn't supported: all fan-in operations trigger computation for lazy datasets. One thing that's currently untested is lazy fan-out after a union. Upon a quick eye-test, it looks like it might not work if one or more of the unioned datasets are created from non-lazy datasources, since recomputing from the base won't work on the dummy read tasks.

@jianoaix (Contributor) commented:

With the lineage/execution plan being used for both eager and lazy datasets (e.g. the other PR claimed the eager dataset can also serialize its lineage), isn't this already a problem for both cases, since a linear data structure is used for what is really a DAG?

@clarkzinzow (Contributor, Author) replied:

Synced offline; opened this PR to make it explicit that lineage-based serialization of unions/zips is not supported.

Resolved review threads on python/ray/data/impl/plan.py, python/ray/data/impl/pipeline_executor.py, and python/ray/data/tests/test_optimize.py.
@clarkzinzow force-pushed the datasets/feat/block-clearing branch 2 times, most recently from 37d9297 to 7473238 on April 29, 2022 at 21:13.
```python
meminfo = memory_summary(info["address"], stats_only=True)
# TODO(ekl) we can add this back once we clear inputs on eager exec as well.
# assert "Spilled" not in meminfo, meminfo
assert "Spilled" not in meminfo, meminfo
```
@clarkzinzow (Contributor, Author) commented:
This test might prove to be flaky due to the memory-reclaiming issues within Ray, so we may need to put this in a retry loop similar to `test_memory_release_lazy_shuffle`.

A reviewer (Contributor) replied:
Ack. Even 2s not enough to escape the reclaiming uncertainty?

@clarkzinzow (Contributor, Author) replied:
1s was a bit flaky; I didn't see 2s fail while testing locally, but who knows in CI. For context, we use an "OOM grace period" of 2 seconds in core Ray, where we let reference counting + garbage collection and object spilling free up space, so I'd expect this to be roughly enough.
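A retry loop like the one referenced for `test_memory_release_lazy_shuffle` could be sketched as a generic polling helper (an illustrative sketch only: `wait_until` is a hypothetical name, not the utility actually used in Ray's test suite):

```python
import time


def wait_until(condition, timeout_s: float = 10.0, interval_s: float = 0.5) -> bool:
    """Poll `condition` until it returns True or `timeout_s` elapses.

    Useful for checks like "no blocks were spilled" that depend on
    reference counting and garbage collection catching up asynchronously.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval_s)
    # One final check after the deadline, so a late release still passes.
    return condition()
```

Wrapping the spill assertion in such a loop trades a fixed sleep for a bounded wait, which tends to be both faster on the happy path and less flaky in CI.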

@jianoaix (Contributor) left a comment:

LGTM, just a few minor comments.

```python
"""Force full evaluation of the blocks of this dataset.

This can be used to read all blocks into memory. By default, Datasets
doesn't read blocks from the datasource until the first transform.

Args:
    preserve_original: Whether the original unexecuted dataset should be
```
A reviewer (Contributor) commented:

What is the "original dataset" for a dataset? (I think users may have the same question when reading this API.)

The author (Contributor) replied:

Ah, I thought that when calling `d2 = ds.fully_executed(preserve_original=False)`, it would be obvious that "original" refers to `ds`. 🤔

The reviewer (Contributor) replied:

I see. I was thinking of a chain of operations, in which case it's unclear which of them should be considered "original" (maybe the first one? maybe each intermediate one? or something else?).

Further resolved review threads on python/ray/data/impl/plan.py.
```python
import time

# TODO(Clark): Remove this sleep once we have fixed memory pressure handling.
time.sleep(2)
```
A reviewer (Contributor) commented:

File a bug to track this issue?

The author (Contributor) replied:

Yep, can do!


@ericl ericl merged commit f725552 into ray-project:master Apr 30, 2022
Successfully merging this pull request may close these issues:

  • Optimize memory usage of datasets