[Data] Add num_rows_per_file parameter to file-based writes #42694
Conversation
Looks good at a high level. Please ping me for another review when the PR is ready.
python/ray/data/dataset.py
@@ -2771,6 +2772,8 @@ def write_parquet(
    instead of ``arrow_parquet_args`` if any of your write arguments
    can't be pickled, or if you'd like to lazily resolve the write
    arguments for each dataset block.
num_rows_per_file: The number of rows to write to each file. If ``None``,
    Ray Data writes a system-chosen number of rows to each file.
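For context, a minimal usage sketch of the argument added in this diff; the dataset and output path are illustrative and not part of the change:

import ray

# Toy dataset standing in for real data.
ds = ray.data.range(10_000)

# Target roughly 1,000 rows per Parquet file. Omitting the argument (or
# passing None) lets Ray Data pick the number of rows per file.
ds.write_parquet("/tmp/output", num_rows_per_file=1_000)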
> The number of rows to write to each file.

Add a note that this number is not accurate? The actual number of rows will be slightly larger.

> If ``None``, Ray Data writes a system-chosen number of rows to each file.

We can say the size is based on the max target block size by default.
It's typically less, right? Unless each block contains more rows than the number specified?

> We can say the size is based on the max target block size by default.

Not strongly opposed, but this is kinda leaking our implementation? Not sure if we want to expose this as part of the interface.
> It's typically less, right? Unless each block contains more rows than the number specified?

Why is it less? IIRC, the bundler will accumulate input blocks until the total number of rows is more than the target.

> Not strongly opposed, but this is kinda leaking our implementation? Not sure if we want to expose this as part of the interface.

Agreed, we shouldn't leak the implementation. I felt "system-chosen" was too vague and initially wanted to suggest something like "the file size will be based on the upstream outputs", which leaked more details. Then I changed it to "based on the max target size". It still leaks implementation to some extent, but that doesn't seem too bad because the block size is already a public API.

I don't have a strong preference either; it's fine to keep it as it is.
> Why is it less? IIRC, the bundler will accumulate input blocks until the total number of rows is more than the target.

Not exactly. If an input block would cause the number of rows to exceed the target, it's excluded.

Here's a simple example: you add two two-row bundles to a bundler with a target of three rows. When you get the next bundle, it has two rows instead of four.
import pyarrow as pa

import ray
from ray.data._internal.execution.interfaces import RefBundle
from ray.data._internal.execution.operators.map_operator import _BlockRefBundler
from ray.data.block import BlockAccessor

# Build a two-row block and wrap it in a RefBundle.
table = pa.Table.from_pylist([{"spam": 0}, {"spam": 1}])
metadata = BlockAccessor.for_block(table).get_metadata(None, None)
bundle = RefBundle(((ray.put(table), metadata),), owns_blocks=True)

# Add two two-row bundles to a bundler with a target of three rows.
bundler = _BlockRefBundler(3)
bundler.add_bundle(bundle)
bundler.add_bundle(bundle)

# The second bundle would push the total past the target, so it's excluded
# from the next output bundle.
assert bundler.has_bundle()
print(bundler.get_next_bundle().num_rows())  # 2
#42694 introduced a num_rows_per_file parameter to write APIs. With the introduction of this API, users don't need to call Dataset.repartition to control the number of output files. This PR updates the documentation accordingly.
Why are these changes needed?
To avoid creating too many files, you might want to write more rows to each file. But there's no way to do so without resorting to hacks. This PR fixes the issue by adding a num_rows_per_file parameter (see the sketch below).
Related issue number
Fixes #41219
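A rough sketch of the workaround this replaces, with a hypothetical dataset and placeholder paths; the repartition count of 10 is only illustrative:

import ray

ds = ray.data.range(100_000)

# Previous workaround: repartition into N blocks to indirectly control
# the number (and size) of output files.
ds.repartition(10).write_parquet("/tmp/out_via_repartition")

# With num_rows_per_file, the intent is stated directly on the write call.
ds.write_parquet("/tmp/out_via_num_rows", num_rows_per_file=10_000)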
Checks
- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.