[<Ray component: Ray>] map_groups doesn't support actor #41406

Closed
Bye-legumes opened this issue Nov 27, 2023 · 10 comments · Fixed by #45310
Assignees: bveeramani
Labels: bug (Something that is supposed to be working; but isn't), data (Ray Data-related issues), @external-author-action-required (Alternate tag for PRs where the author doesn't have labeling permission), P0 (Issues that should be fixed in short order), ray 2.10, size:small

Comments

@Bye-legumes
Contributor

Bye-legumes commented Nov 27, 2023

What happened + What you expected to happen

What happened

When I used the groupby + map_groups operation with the batch_size parameter, I got a "multiple values" error. When I tried the same operation without the batch_size parameter, I got an exception requiring the batch_size parameter.

sorted_ds = ds.groupby(key="a").map_groups(
    DEMO,
    num_gpus=0.1,
    batch_size=1000,
    batch_format="pandas",
    compute=ActorPoolStrategy(min_size=10, max_size=200),
)

For the code above, there is an error:

  File "gpb.py", line 41, in <module>
    sorted_ds = ds.groupby(key="a").map_groups(DEMO,num_gpus=0.1,
  File "/home/zhilong/miniconda3/lib/python3.8/site-packages/ray/data/grouped_data.py", line 336, in map_groups
    return sorted_ds.map_batches(
TypeError: map_batches() got multiple values for keyword argument 'batch_size'

If I delete the keyword batch_size, the error is:

ValueError: `batch_size` must be provided to `map_batches` when requesting GPUs. The optimal batch size depends on the model, data, and GPU used. It is recommended to use the largest batch size that doesn't result in your GPU device running out of memory. You can view the GPU memory usage via the Ray dashboard.

What I expected

Ray Data should be able to perform groupby + map_groups with actors, as described in the docs: https://docs.ray.io/en/latest/data/api/doc/ray.data.grouped_data.GroupedData.map_groups.html

Versions / Dependencies

ray 2.8.0

Reproduction script

import ray
from ray.data import ActorPoolStrategy

ray.init(address="123.123.123.123:6274")

ctx = ray.data.context.DatasetContext.get_current()
use_push_based_shuffle = True
num_items = 100001
parallelism = 200
import pandas as pd
import numpy as np
import time


t1 = time.time()
original = ctx.use_push_based_shuffle
ctx.use_push_based_shuffle = use_push_based_shuffle

a = list(reversed(range(num_items)))

shard = int(np.ceil(num_items / parallelism))
b = [1]*1
offset = 0
dfs = []
while offset < num_items:
    dfs.append(
        pd.DataFrame(
            {"a": a[offset : offset + shard], "b": [b]*len(a[offset : offset + shard])}
        )
    )
    offset += shard
if offset < num_items:
    dfs.append(pd.DataFrame({"a": a[offset:], "b": b[offset:]}))
ds = ray.data.from_pandas(dfs)

class DEMO:
    def __init__(self):
        pass

    def __call__(self, a):
        return a

sorted_ds = ds.groupby(key="a").map_groups(
    DEMO,
    num_gpus=0.1,
    batch_size=1000,
    batch_format="pandas",
    compute=ActorPoolStrategy(min_size=10, max_size=200),
)
sorted_ds.show(5)
print(f"time used : \n{time.time()-t1}")

Issue Severity

Medium: It is a significant difficulty but I can work around it.

@Bye-legumes added the bug (Something that is supposed to be working; but isn't) and triage (Needs triage: priority, bug/not-bug, and owning component) labels on Nov 27, 2023
@Bye-legumes changed the title from "[<Ray component: Ray>] map_group doen't support acotr" to "[<Ray component: Ray>] map_group doen't support actor" on Nov 27, 2023
@anyscalesam added the data (Ray Data-related issues) label on Nov 28, 2023
@Bye-legumes
Contributor Author

Bye-legumes commented Nov 28, 2023

In Ray 2.4.0, the error for map_groups + actor is the same as the one in #26244.

@anyscalesam added the P0 (Issues that should be fixed in short order) label and removed the triage label on Nov 28, 2023
@anyscalesam
Contributor

After discussion, we should change map_groups() to allow (and require) batch_size iff a GPU num is set. @scottjlee to follow up on this change.

@Bye-legumes
Contributor Author

I think it's not only a problem of GPU + batch_size. Currently, I cannot run map_groups + actor at all.

sorted_ds = ds.groupby(key="a").map_groups(
    DEMO,
    compute=ray.data.ActorPoolStrategy(size=10),
    batch_format="pandas",
)

It fails with this error:

2023-11-29 11:03:55,624  WARNING actor_pool_map_operator.py:271 -- To ensure full parallelization across an actor pool of size 10, the Dataset should consist of at least 10 distinct blocks. Consider increasing the parallelism when creating the Dataset.
Traceback (most recent call last):
  File "python/ray/_raylet.pyx", line 347, in ray._raylet.StreamingObjectRefGenerator._next_sync
  File "python/ray/_raylet.pyx", line 4643, in ray._raylet.CoreWorker.try_read_next_object_ref_stream
  File "python/ray/_raylet.pyx", line 447, in ray._raylet.check_status
ray.exceptions.ObjectRefStreamEndOfStreamError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 80, in on_data_ready
    meta = ray.get(next(self._streaming_gen))
  File "python/ray/_raylet.pyx", line 302, in ray._raylet.StreamingObjectRefGenerator.__next__
  File "python/ray/_raylet.pyx", line 365, in ray._raylet.StreamingObjectRefGenerator._next_sync
StopIteration

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/zhilong/ray_28_bug/gpb.py", line 48, in <module>
    sorted_ds.show(5)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/dataset.py", line 2466, in show
    for row in self.take(limit):
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/dataset.py", line 2390, in take
    for row in limited_ds.iter_rows():
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/iterator.py", line 219, in _wrapped_iterator
    for batch in batch_iterable:
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/iterator.py", line 164, in _create_iterator
    block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/iterator/iterator_impl.py", line 32, in _to_block_iterator
    block_iterator, stats, executor = ds._plan.execute_to_iterator()
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 548, in execute_to_iterator
    block_iter = itertools.chain([next(gen)], gen)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 54, in execute_to_legacy_block_iterator
    for bundle in bundle_iter:
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 141, in get_next
    raise item
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 201, in run
    while self._scheduling_loop_step(self._topology) and not self._shutdown:
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 252, in _scheduling_loop_step
    process_completed_tasks(topology, self._backpressure_policies)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 365, in process_completed_tasks
    num_blocks_read = task.on_data_ready(
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/physical_operator.py", line 88, in on_data_ready
    ex = ray.get(block_ref)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/_private/worker.py", line 2563, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TypeError): ray::MapBatches(group_fn)() (pid=3943892, ip=10.218.163.85, actor_id=501d51070249f810d044d51801000000, repr=MapWorker(MapBatches(group_fn)))
    yield from _map_task(
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 416, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 215, in __call__
    yield from self._batch_fn(input, ctx)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 190, in transform_fn
    res = fn(batch)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 120, in fn
    return op_fn(item, *fn_args, **fn_kwargs)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/grouped_data.py", line 362, in group_fn
    builder.add_batch(applied)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/delegating_block_builder.py", line 39, in add_batch
    return self.add_block(block)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/_internal/delegating_block_builder.py", line 42, in add_block
    accessor = BlockAccessor.for_block(block)
  File "/home/zhilong/miniconda3/envs/ray28/lib/python3.10/site-packages/ray/data/block.py", line 402, in for_block
    raise TypeError("Not a block type: {} ({})".format(block, type(block)))
TypeError: Not a block type: <__main__.DEMO object at 0x7f112590b490> (<class '__main__.DEMO'>)

It works with a function as the input:

sorted_ds = ds.groupby(key="a").map_groups(test, batch_format="pandas")

Here is the full code:

import ray
ray.init(address = "10.218.163.85:6274")
from ray.data import ActorPoolStrategy

ctx = ray.data.context.DatasetContext.get_current()
use_push_based_shuffle = True
num_items = 100001
parallelism = 200
import pandas as pd
import numpy as np
import time


t1 = time.time()
original = ctx.use_push_based_shuffle
ctx.use_push_based_shuffle = use_push_based_shuffle

a = list(reversed(range(num_items)))
a = [x%10 for x in a]
shard = int(np.ceil(num_items / parallelism))
b = [1]*1
offset = 0
dfs = []
while offset < num_items:
    dfs.append(
        pd.DataFrame(
            {"a": a[offset : offset + shard], "b": [b]*len(a[offset : offset + shard])}
        )
    )
    offset += shard
if offset < num_items:
    dfs.append(pd.DataFrame({"a": a[offset:], "b": b[offset:]}))
ds = ray.data.from_pandas(dfs)

class DEMO:
    def __init__(self,*args, **kwargs):
        print(args)
        #print(kwargs)
        pass
    
    def __call__(self, a):
        return a
def test(a):
    return a
sorted_ds = ds.groupby(key="a").map_groups(
    DEMO,
    compute=ray.data.ActorPoolStrategy(size=10),
    batch_format="pandas",
)
sorted_ds.show(5)
print(f"time used : \n{time.time()-t1}")


@wingkitlee0
Contributor

Ya, I think map_groups does not support a "callable class" yet. It currently only supports running the mapping function as actors (e.g., compute=ray.data.ActorPoolStrategy()).
In your example, using DEMO() would work if you are okay with creating the instance first:

sorted_ds = ds.groupby(key="a").map_groups(
    DEMO(),
    compute=ray.data.ActorPoolStrategy(size=10),
    batch_format="pandas",
)

@Bye-legumes
Contributor Author

Bye-legumes commented Dec 1, 2023

@wingkitlee0 I see, thanks! Based on my reading of the code, map_groups is indeed built on top of map_batches, but with the batch_size parameter set to None. (This also explains the original TypeError: a user-supplied batch_size collides with the batch_size=None that map_groups already passes to map_batches.)
This means that if the original data is stored across different nodes in a distributed way, all the data will be gathered onto one single node, since batch_size is None. Therefore, the memory requirements could be very large, depending on the size of the data being processed. Also, is the maximum parallelism then limited by a single node, or can one batch of data span different nodes (given that batch_to_block is used inside group_fn)?
Thanks!

    # Note we set batch_size=None here, so it will use the entire block as a batch,
    # which ensures that each group will be contained within a batch in entirety.
    return sorted_ds.map_batches(
        group_fn,
        batch_size=None,
        compute=compute,
        batch_format=batch_format,
        fn_args=fn_args,
        fn_kwargs=fn_kwargs,
        **ray_remote_args,
    )

@Bye-legumes
Contributor Author

@wingkitlee0 Also, what is the motivation for using map_batches(DEMO) but map_groups(DEMO())? Can we unify them? I think map_groups and map_batches should have similar parameters, and the difference is that map_groups is performed on grouped data. Something I want to understand is the difference between using one map_batches + group_fn in map_groups versus using [map_batches(group) for group in grouped data].

@wingkitlee0
Contributor

[Not from the Ray team, but I have used map_groups quite a lot lately] You may find previous discussions about map_groups useful (search on GitHub).

all the data will be gathered onto one single node, since batch_size is None

According to the Ray team, the data are materialized but not necessarily gathered onto one node. (After all, sorting is distributed.)

what is the motivation for using map_batches(DEMO) but map_groups(DEMO())

I believe this is simply because callable classes haven't been implemented for it yet. This should be a straightforward PR unless there are some limitations (in that case the Ray team knows better). A rough sketch of what that could look like follows.
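
As a hypothetical sketch of how callable-class support could look if map_groups simply forwarded the class to map_batches (the fn_constructor_kwargs name is borrowed from map_batches and is an assumption, not a confirmed map_groups API):

# Hypothetical call shape, assuming map_groups learns to forward a callable
# class and its constructor arguments to map_batches (not supported in Ray 2.8,
# which is what this issue is about).
sorted_ds = ds.groupby(key="a").map_groups(
    DEMO,  # the class itself, not an instance
    fn_constructor_kwargs={"model_path": "/tmp/model"},  # assumed parameter
    compute=ray.data.ActorPoolStrategy(size=10),
    batch_format="pandas",
)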

the difference between using one map_batches + group_fn in map_groups versus using [map_batches(group) for group in grouped data]

It's about block boundaries (in Ray) vs. group boundaries (in the data). We want the data that goes into the mapping function to contain the whole group; a minimal example follows.
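
For illustration, a minimal, self-contained sketch (using a plain function, which already works per this thread, and assuming the documented map_groups semantics) showing that each UDF invocation receives an entire group rather than an arbitrary slice of a block:

import ray
import pandas as pd

ds = ray.data.from_items([{"a": i % 3, "b": i} for i in range(9)])

def summarize(group: pd.DataFrame) -> pd.DataFrame:
    # Each call sees every row that shares the same value of "a".
    return pd.DataFrame({"a": [group["a"].iloc[0]], "count": [len(group)]})

out = ds.groupby("a").map_groups(summarize, batch_format="pandas")
print(out.take_all())  # one row per group, each with count == 3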

@Bye-legumes
Contributor Author

@wingkitlee0 Thanks a lot for your clarification!

@wayi1

wayi1 commented Feb 26, 2024

I encountered this issue as well. I have some relatively expensive state to initialize in an actor. However, unlike the map_batches, map, or flat_map operators, the map_groups op does not support a fn_constructor_kwargs arg.

The workaround without using an actor means that I have to initialize such state per call. Really looking forward to Ray 2.10 for this feature. (A possible workaround sketch follows.)
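
One possible workaround in the meantime (a sketch, not an official API; load_expensive_state is a hypothetical placeholder) is to pass a pre-built instance, as suggested earlier in the thread, and defer the expensive setup to the first call so it happens once per actor rather than once per batch:

class ExpensiveUDF:
    def __init__(self):
        # Keep __init__ cheap: the instance is pickled and shipped to each actor.
        self._state = None

    def __call__(self, group):
        if self._state is None:
            # Built lazily on the first call, then reused by the same actor
            # for every subsequent group it processes.
            self._state = load_expensive_state()  # hypothetical helper
        return group

sorted_ds = ds.groupby(key="a").map_groups(
    ExpensiveUDF(),
    compute=ray.data.ActorPoolStrategy(size=10),
    batch_format="pandas",
)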

@anyscalesam
Contributor

@Bye-legumes - per our meeting last week, do you think you'd have the bandwidth in May to pick this up? We can help shepherd/review to get this merged, but it will be faster to resolve/mitigate this issue, especially since it's impacting your scenario/use case.

@anyscalesam added the @external-author-action-required (Alternate tag for PRs where the author doesn't have labeling permission) label on Apr 26, 2024
@bveeramani self-assigned this on May 13, 2024
bveeramani added a commit that referenced this issue May 14, 2024