What happened + What you expected to happen
When I try to call Dataset.zip() on two datasets, where one has a nested dict column and the other doesn't, zip() fails with the error ValueError: Cannot zip <class 'ray.data._internal.pandas_block.PandasBlockAccessor'> with block of type <class 'pyarrow.lib.Table'>.
However, it works if a nested dict column is added to the other dataset as well.
So, it seems like the decision to use PandasBlockAccessor vs. ArrowBlockAccessor is predicated on this distinction: whether there are nested dicts or not. That suggests a hacky way out for me: I can force the second dataset onto PandasBlockAccessor by adding an extra, unnecessary column that contains nested dicts, and then zip them.
I am wondering if there is a less hacky way to achieve the same. I am new to Ray, so the terminology I use here may not be Ray-specific. Please help me understand the zip() function. I have included code to reproduce this issue. Also, please let me know if I am using map_batches() in an unintended way.
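For reference, the workaround I describe above would look roughly like this, using p1 and p2 from the reproduction script below. This is only a sketch; the add_dummy_nested_column helper and the _dummy column name are made up for illustration:

def add_dummy_nested_column(df):
    # Nested dicts can't be stored in an Arrow column, so adding this
    # column should keep the block in the pandas format.
    df["_dummy"] = [{"x": 0}] * len(df)
    return df

p2_pandas = p2.map_batches(add_dummy_nested_column, batch_format="pandas")
Dataset.zip(p1, p2_pandas).show()  # both sides should now be pandas blocks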
Versions / Dependencies
Ray - 2.63.0
Python 3.10.12
Pyarrow - 13.0.0
Reproduction script
import numpy as np
import ray
from ray.data import Dataset

pokedex = ray.data.read_json('https://raw.githubusercontent.com/Biuni/PokemonGO-Pokedex/master/pokedex.json')

def convert(batch):
    # The JSON has a single top-level "pokemon" key holding a list of dicts.
    batch = batch["pokemon"]
    new_batch = {"name": [], "info": []}
    for item in batch[0]:
        new_batch["name"].append(item["name"])
        new_batch["info"].append(item)
    return new_batch

p1 = pokedex.map_batches(convert)  # This Dataset has nested dicts in it

def create_id(batch):
    ids = []
    for item in batch["info"]:
        ids.append(item["id"])
    return {"id": np.array(ids)}

p2 = p1.map_batches(create_id)  # This Dataset does not have nested dicts

Dataset.zip(p1, p2).show()  # This throws the following error
2023-09-05 17:44:22,426 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON->SplitBlocks(20)] -> TaskPoolMapOperator[MapBatches(convert)], InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON->SplitBlocks(20)] -> TaskPoolMapOperator[MapBatches(convert)->MapBatches(create_id)] -> ZipOperator[Zip]
2023-09-05 17:44:22,426 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=True, actor_locality_enabled=True, verbose_progress=False)
2023-09-05 17:44:22,426 INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
Traceback (most recent call last):
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3508, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-90-0bfde85b5e5f>", line 1, in <module>
ray.data.Dataset.zip(p1, p1.map_batches(create_id)).show()
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/dataset.py", line 2227, in show
for row in self.take(limit):
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/dataset.py", line 2185, in take
for row in self.iter_rows():
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/iterator.py", line 237, in iter_rows
for batch in self.iter_batches(**iter_batch_args):
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/iterator.py", line 159, in iter_batches
block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/_internal/iterator/iterator_impl.py", line 32, in _to_block_iterator
block_iterator, stats, executor = ds._plan.execute_to_iterator()
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 538, in execute_to_iterator
block_iter = itertools.chain([next(gen)], gen)
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 51, in execute_to_legacy_block_iterator
for bundle in bundle_iter:
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces.py", line 548, in __next__
return self.get_next()
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 129, in get_next
raise item
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 187, in run
while self._scheduling_loop_step(self._topology) and not self._shutdown:
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 267, in _scheduling_loop_step
update_operator_states(topology)
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 359, in update_operator_states
op.all_inputs_done()
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/_internal/execution/operators/zip_operator.py", line 63, in all_inputs_done
self._output_buffer, self._stats = self._zip(
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/_internal/execution/operators/zip_operator.py", line 179, in _zip
output_metadata = ray.get(output_metadata)
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
return fn(*args, **kwargs)
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
return func(*args, **kwargs)
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::_zip_one_block() (pid=83227, ip=127.0.0.1)
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/_internal/execution/operators/zip_operator.py", line 242, in _zip_one_block
result = BlockAccessor.for_block(block).zip(other_block)
File "/Users/kvasist/opt/miniconda3/envs/AWSMargaretContainers/lib/python3.10/site-packages/ray/data/_internal/table_block.py", line 209, in zip
raise ValueError(
ValueError: Cannot zip <class 'ray.data._internal.pandas_block.PandasBlockAccessor'> with block of type <class 'pyarrow.lib.Table'>
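For completeness: the mismatch does not arise if I derive the new column within the same dataset instead of zipping two datasets. A minimal sketch under the same setup as the reproduction script (add_id is a made-up helper):

def add_id(batch):
    # Add "id" alongside the existing columns in a single map_batches
    # call, so no zip across block types is needed.
    batch["id"] = np.array([item["id"] for item in batch["info"]])
    return batch

p1.map_batches(add_id).show()

This avoids zip() entirely, but it doesn't answer whether zip() itself is supposed to handle mixed pandas/Arrow blocks.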
Issue Severity
High: It blocks me from completing my task.