[Data] Improve stability of Parquet metadata prefetch task #42044
Conversation
Signed-off-by: Cheng Su <[email protected]>
A unit test would be hard to understand here, since it would depend on a failure during metadata prefetching. I can run the Parquet metadata prefetching release test to make sure this still works - https://github.com/ray-project/ray/blob/master/release/release_tests.yaml#L5485
Signed-off-by: Cheng Su <[email protected]>
# The application-level exceptions to retry for metadata prefetching task.
# Default to retry on `OSError` because AWS S3 would throw this transient
# error when load is too high.
RETRY_EXCEPTIONS_FOR_META_FETCH_TASK = [OSError]
Should we add this in general for all `FileBasedDatasource` reads?

OPEN_FILE_RETRY_ON_ERRORS = ["AWS Error SLOW_DOWN"]
Only Parquet has this metadata prefetching, so we can add it here first. For all file-based datasources, it looks like the exponential backoff retry is working.
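For reference, a minimal sketch of the kind of exponential-backoff retry on file opens that the comment above refers to; the helper name, retry list, and attempt cap here are illustrative assumptions, not the actual Ray Data implementation:

```python
import random
import time

OPEN_FILE_RETRY_ON_ERRORS = ["AWS Error SLOW_DOWN"]
OPEN_FILE_MAX_ATTEMPTS = 10  # assumed cap, not taken from the Ray source


def open_file_with_retry(open_file):
    """Call `open_file()` and retry transient errors with exponential backoff."""
    for attempt in range(OPEN_FILE_MAX_ATTEMPTS):
        try:
            return open_file()
        except OSError as e:
            retryable = any(msg in str(e) for msg in OPEN_FILE_RETRY_ON_ERRORS)
            if not retryable or attempt == OPEN_FILE_MAX_ATTEMPTS - 1:
                raise
            # Exponential backoff with a little jitter before the next attempt.
            time.sleep((2 ** attempt) * 0.1 + random.uniform(0, 0.1))
```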
@@ -47,6 +47,15 @@
 FRAGMENTS_PER_META_FETCH = 6
 PARALLELIZE_META_FETCH_THRESHOLD = 24

+# The `num_cpus` for each metadata prefetching task.
+# Default to 0.5 instead of 1 because it is cheaper than normal read task.
+NUM_CPUS_FOR_META_FETCH_TASK = 0.5
So currently there is no configurable way (e.g. through `DataContext`) to modify this, right?
Yes, I'm hesitant to add it to `DataContext` (which is exposed to users). If we see that users need this value, we can expose it later. For now, let's start with a variable, so at least we (internal developers) can change it.
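As a concrete illustration of "change the variable", a sketch of what an internal developer could do to tune prefetch concurrency; the module path is an assumption based on where these constants appear to live, and the S3 path is hypothetical:

```python
import ray
from ray.data.datasource import parquet_datasource

# Lower the per-task CPU request so more metadata prefetch tasks can run
# concurrently on each node (0.25 is an arbitrary example value).
parquet_datasource.NUM_CPUS_FOR_META_FETCH_TASK = 0.25

# Any large S3 Parquet directory would exercise the metadata prefetch path.
ds = ray.data.read_parquet("s3://my-bucket/parquet-data/")
```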
@@ -206,7 +206,10 @@ def test_parquet_read_meta_provider(ray_start_regular_shared, fs, data_path):
     pq.write_table(table, path2, filesystem=fs)

     class TestMetadataProvider(DefaultParquetMetadataProvider):
-        def prefetch_file_metadata(self, fragments):
+        def prefetch_file_metadata(self, fragments, **ray_remote_args):
+            assert ray_remote_args["num_cpus"] == 0.5
Suggested change:
-            assert ray_remote_args["num_cpus"] == 0.5
+            assert ray_remote_args["num_cpus"] == NUM_CPUS_FOR_META_FETCH_TASK
updated.
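To illustrate the provider hook the test above exercises: a minimal sketch of a custom metadata provider that observes the remote args now forwarded to `prefetch_file_metadata`. The subclass name and the call to `super()` are assumptions for illustration, not code from this PR:

```python
from ray.data.datasource import DefaultParquetMetadataProvider


class LoggingMetadataProvider(DefaultParquetMetadataProvider):
    def prefetch_file_metadata(self, fragments, **ray_remote_args):
        # After this PR, the caller passes the remote args (num_cpus, SPREAD
        # scheduling, retry_exceptions) through to the provider.
        print(
            f"Prefetching metadata for {len(fragments)} fragments "
            f"with remote args: {ray_remote_args}"
        )
        return super().prefetch_file_metadata(fragments, **ray_remote_args)
```

Such a provider could then be supplied via `read_parquet`'s `meta_provider` argument.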
Signed-off-by: Cheng Su <[email protected]>
Reran the Parquet metadata release test, no regression - https://buildkite.com/ray-project/release/builds/4714#018c89f0-5bde-4011-835d-90d244d2c3f0
Why are these changes needed?
This PR fixes the Parquet metadata prefetching task when reading a large number of Parquet files on S3 (>50k). Before this PR, the Parquet metadata prefetch tasks ran on the head node (with the `DEFAULT` scheduling strategy) and did not retry on S3 transient exceptions, so they could fail very quickly: too many requests launched from the same node get throttled by S3.

This PR does 3 things:
* Changes the scheduling strategy to `SPREAD`, the same as read tasks, to spread metadata prefetch tasks across the cluster. This avoids hitting S3 with too many requests from the same node.
* Auto-retries on `OSError`, which S3 raises for transient errors such as `Access Denied` and `Read Timeout`.
* Extracts the `num_cpus` default value into a variable, so we can tune it to control the concurrency of metadata prefetch tasks for a particular workload; sometimes `num_cpus=0.5` does not work well.
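Taken together, a minimal sketch of how these three options could be applied to a metadata prefetch task; this is illustrative rather than the actual Ray Data internals, and `_fetch_metadata` / `fragments` are hypothetical names:

```python
import ray

# Defaults introduced/extracted in this PR (values from the diff above).
NUM_CPUS_FOR_META_FETCH_TASK = 0.5
RETRY_EXCEPTIONS_FOR_META_FETCH_TASK = [OSError]


@ray.remote(
    # Cheaper than a normal read task, so request half a CPU.
    num_cpus=NUM_CPUS_FOR_META_FETCH_TASK,
    # Spread tasks across the cluster instead of piling onto the head node.
    scheduling_strategy="SPREAD",
    # Let Ray retry the task on transient S3 OSErrors.
    retry_exceptions=RETRY_EXCEPTIONS_FOR_META_FETCH_TASK,
)
def _fetch_metadata(fragments):
    # Hypothetical stand-in for reading each fragment's Parquet footer.
    return [f.metadata for f in fragments]
```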
Related issue number
Checks
- I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I've added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.