[Data] Async iter_batches #33510
Conversation
Overall structure makes a lot of sense...
Some meta-review comments:
- Should probably split this PR into a few pieces: common thread pool infra, pieces of the new batching code, and then the integrations
- Should add more types
logger = logging.getLogger(__name__)

PREFETCHER_ACTOR_NAMESPACE = "ray.dataset"
We should really try to get rid of the actor prefetcher in 2.5...
    yield enter_result


def iter_batches(
This function sounds very innocuous when it's actually doing a lot under the hood (creating threads, etc.). How about making it a class, such as ThreadPoolBatcher, to make it clearer when reading the code that we're setting up this state and executing batching in parallel?
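To illustrate, a minimal sketch of what such a class could look like. The name ThreadPoolBatcher and everything below is hypothetical, not the PR's actual helper; it just shows "run fn over an iterator on N threads" as an explicit object:

```python
import queue
import threading
from typing import Callable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")

_SENTINEL = object()


class ThreadPoolBatcher:
    """Runs `fn` over an input iterator on a pool of worker threads.

    Making the pool an explicit object keeps the thread creation and the
    parallel execution visible at the call site.
    """

    def __init__(
        self, fn: Callable[[Iterator[T]], Iterator[U]], num_workers: int
    ) -> None:
        self._fn = fn
        self._num_workers = num_workers

    def run(self, base_iter: Iterator[T]) -> Iterator[U]:
        input_queue: queue.Queue = queue.Queue()
        output_queue: queue.Queue = queue.Queue()

        # For simplicity this sketch drains the input iterator up front;
        # a real implementation would feed it incrementally.
        for item in base_iter:
            input_queue.put(item)
        for _ in range(self._num_workers):
            input_queue.put(_SENTINEL)

        def worker() -> None:
            def pull() -> Iterator[T]:
                # Each worker pulls items off the shared queue until it
                # sees a sentinel.
                while True:
                    item = input_queue.get()
                    if item is _SENTINEL:
                        return
                    yield item

            for out in self._fn(pull()):
                output_queue.put(out)
            output_queue.put(_SENTINEL)

        threads = [
            threading.Thread(target=worker, daemon=True)
            for _ in range(self._num_workers)
        ]
        for t in threads:
            t.start()

        # Yield results until every worker has signaled completion. Output
        # order across workers is not preserved here; that is what the
        # separate order-restoring step is for.
        finished = 0
        while finished < self._num_workers:
            out = output_queue.get()
            if out is _SENTINEL:
                finished += 1
            else:
                yield out
```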
        # Step 5: Make sure to preserve order from threadpool results.
        yield from _preserve_order(batch_iter)
    else:
        # If no batch prefetching is specified, then don't use a threadpool.
It may be preferable to always use a threadpool, but use size 1 if there isn't any prefetching. This avoids two different code paths being in play.
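i.e., something like the following, reusing the hypothetical ThreadPoolBatcher sketched in the comment above (helper names here are placeholders, not the PR's actual API):

```python
# A pool of size 1 runs the computations serially, so the no-prefetch case
# no longer needs its own branch.
num_workers = max(prefetch_batches, 1)
batcher = ThreadPoolBatcher(threadpool_computations, num_workers=num_workers)
yield from _preserve_order(batcher.run(batch_iter))
```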
also be None, meaning the entirety of the last block is included in this
batch. If this value is None, this allows us to eagerly clear the last
block in this batch after reading, since the last block is not included in
any other batches.
Nice, was wondering how the GC would work.
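For readers following along, a sketch of the idea (helper and field names are hypothetical, and the actual freeing mechanism is whatever the PR uses): when the end index of the last block is None, this batch is the block's only consumer, so its reference can be dropped as soon as the batch has been read, letting Ray's reference counting reclaim the object.

```python
from typing import List, Optional

import ray


def _maybe_clear_last_block(
    block_refs: List["ray.ObjectRef"], last_block_end: Optional[int]
) -> None:
    # A None end index means the whole last block belongs to this batch,
    # so no later batch can reference it. Dropping the last reference lets
    # Ray's distributed reference counting reclaim the block eagerly.
    if last_block_end is None:
        block_refs.pop()
```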
def _async_iter_batches(block_refs):
    # Step 1: Construct logical batches based on the metadata.
    batch_iter = _bundle_block_refs_to_logical_batches(
I think you need to add type annotations for each of these iterators.
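For example (the element types below are guesses from the names in the diff, stubbed out for illustration):

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional


@dataclass
class LogicalBatch:
    block_refs: List[object]  # stand-in for ObjectRef[Block]
    starting_block_idx: int
    last_block_end: Optional[int]


def _bundle_block_refs_to_logical_batches(
    block_ref_iter: Iterator[object],  # stand-in for (ObjectRef, metadata)
    batch_size: int,
) -> Iterator[LogicalBatch]:
    ...
```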
    yield from async_batch_iter


def legacy_iter_batches(
Should move legacy code to its own file.
def threadpool_computations(logical_batch_iter: Iterator[LogicalBatch]):
    # Step 4.1: Resolve the blocks.
    resolved_batch_iter = _resolve_blocks(
It seems you do block deletion here, but isn't there a risk of a race condition if a block gets deleted that another thread still needs? I would expect you would have to do deletion at the end in serial order / from a single thread.
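One way to address that, as a sketch (names hypothetical): have the workers only tag each batch with the refs that become free-able, and let the single consuming thread perform the actual deletion after it has yielded the batch, in serial order.

```python
import queue
from typing import Iterator, List, Tuple


def _free_block(ref: object) -> None:
    # Placeholder for whatever freeing mechanism the PR actually uses.
    pass


def consume_in_order(output_queue: "queue.Queue") -> Iterator[object]:
    # Runs on the single consumer thread, so deletions happen in serial
    # order and never race with a worker that still needs the block.
    while True:
        item = output_queue.get()
        if item is None:  # end-of-stream sentinel
            return
        batch, refs_to_free = item  # type: Tuple[object, List[object]]
        yield batch
        for ref in refs_to_free:
            _free_block(ref)
```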
            fetch_queue.put(e, block=True)

    threads = [
        threading.Thread(target=execute_computation, args=(i,))
Do you want to mark these as daemon threads?
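Daemon threads won't block interpreter shutdown if the consumer abandons the iterator mid-stream, e.g. (mirroring the lines above; the `for` clause and `num_workers` are assumed from the surrounding diff):

```python
threads = [
    threading.Thread(target=execute_computation, args=(i,), daemon=True)
    for i in range(num_workers)
]
```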
        batch_iter, fn=threadpool_computations, num_workers=prefetch_batches
    )
    # Step 5: Make sure to preserve order from threadpool results.
    yield from _preserve_order(batch_iter)
Suggested change:
-    yield from _preserve_order(batch_iter)
+    yield from _restore_original_order(batch_iter)
TODO:
Why are these changes needed?
Related issue number
Closes #33508
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.