
[Ray DATA] preprocessor OneHotEncoder AttributeError: 'ClientObjectRef' object has no attribute 'exec_stats' #28262

Closed
Alxe1 opened this issue Sep 2, 2022 · 8 comments
Assignees
Labels
bug Something that is supposed to be working; but isn't data Ray Data-related issues P1 Issue that should be fixed within a few weeks

Comments

@Alxe1

Alxe1 commented Sep 2, 2022

What happened + What you expected to happen

I preprocessed data using the ray.data.preprocessors OneHotEncoder, following the usual workflow:

one_hot_encoder = OneHotEncoder(columns=bucket_features)
one_hot_encoder.fit(ray_dataset)
transformed_dataset = one_hot_encoder.transform(ray_dataset)
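For context, this is conceptually what the fit/transform pair does: fit collects the distinct categories, and transform maps each value to an indicator vector. A minimal pure-Python sketch (the helper names here are illustrative, not Ray's actual implementation):

```python
def fit_one_hot(values):
    """'fit' step: collect the ordered list of distinct categories."""
    return sorted(set(values))

def transform_one_hot(values, categories):
    """'transform' step: map each value to a 0/1 indicator vector
    over the fitted categories."""
    index = {c: i for i, c in enumerate(categories)}
    encoded = []
    for v in values:
        row = [0] * len(categories)
        if v in index:  # categories unseen at fit time encode as all zeros
            row[index[v]] = 1
        encoded.append(row)
    return encoded

cats = fit_one_hot(["a", "b", "a", "c"])
rows = transform_one_hot(["a", "c", "b"], cats)
```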

When transforming the Ray dataset, it raised an error:

Traceback (most recent call last):
  File "/mnt1/mytest/test.py", line 542, in <module>
    train_test()
  File "/mnt1/mytest/test.py", line 506, in train_test
    config, dataset = preprocessing_data(ray_dataset)
  File "/mnt1/mytest/test.py", line 408, in preprocessing_data
    transformed_dataset = one_hot_encoder.transform(ray_dataset)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/data/preprocessor.py", line 145, in transform
    self._transform_stats = transformed_ds.stats()
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/data/dataset.py", line 3374, in stats
    return self._plan.stats().summary_string()
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/data/_internal/stats.py", line 235, in summary_string
    parent_sum = p.summary_string(already_printed)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/data/_internal/stats.py", line 247, in summary_string
    out += self._summarize_blocks(metadata, is_substage=False)
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/data/_internal/stats.py", line 289, in _summarize_blocks
    exec_stats = [m.exec_stats for m in blocks if m.exec_stats is not None]
  File "/mnt/softwares/hvd_env/lib/python3.7/site-packages/ray/data/_internal/stats.py", line 289, in <listcomp>
    exec_stats = [m.exec_stats for m in blocks if m.exec_stats is not None]
AttributeError: 'ClientObjectRef' object has no attribute 'exec_stats'

The Ray dataset is converted from a PySpark DataFrame (because I am on Spark 2.4.6 I cannot use RayDP, so I convert the PySpark DataFrame myself):

import os
import time
from typing import Union, List

import pyspark.sql
import ray
from pyspark.serializers import ArrowStreamSerializer
from pyspark.traceback_utils import SCCallSiteSync

import pyarrow as pa

from ray.types import ObjectRef
from ray.data import Dataset
from ray.data._internal.arrow_block import ArrowRow
from ray.data._internal.block_list import BlockList
from ray.data._internal.plan import ExecutionPlan
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.stats import DatasetStats
from ray.data.block import BlockExecStats, BlockAccessor, BlockMetadata


def _get_metadata(table: Union["pyarrow.Table", "pandas.DataFrame"]) -> BlockMetadata:
    stats = BlockExecStats.builder()
    return BlockAccessor.for_block(table).get_metadata(input_files=None, exec_stats=stats.build())


class SparkDFToRayDataset(object):
    """Convert a PySpark DataFrame to a Ray Dataset[ArrowRow]."""

    def __init__(self, sdf: pyspark.sql.DataFrame):
        self._df = sdf
        # noinspection PyProtectedMember
        self._jdf = sdf._jdf
        # noinspection PyProtectedMember
        self._sc = sdf.sql_ctx._sc

    def _collect_arrow_table_refs(self, batch_size):
        with SCCallSiteSync(self._sc):
            # noinspection PyProtectedMember
            from pyspark.rdd import _load_from_socket
            port, auth_secret, jsocket_auth_server = self._jdf.collectAsArrowToPython()
            try:
                buffer_batches = []
                obj_refs = []
                for arrow_record_batch in _load_from_socket((port, auth_secret), ArrowStreamSerializer()):
                    buffer_batches.append(arrow_record_batch)
                    if len(buffer_batches) >= batch_size:
                        # Flush the buffered record batches into one Arrow table
                        # and put it into the Ray object store.
                        _pa_table = pa.Table.from_batches(buffer_batches)
                        _obj_ref = ray.put(_pa_table)
                        obj_refs.append(_obj_ref)
                        # reset
                        buffer_batches = []
                if len(buffer_batches) != 0:
                    # Flush the final partial buffer.
                    _pa_table = pa.Table.from_batches(buffer_batches)
                    _obj_ref = ray.put(_pa_table)
                    obj_refs.append(_obj_ref)

                return obj_refs
            finally:
                # Join serving thread and raise any exceptions
                jsocket_auth_server.getResult()

    def from_arrow_refs(self,
                        tables: Union[ObjectRef[Union["pyarrow.Table", bytes]],
                                      List[ObjectRef[Union["pyarrow.Table", bytes]]]]) -> Dataset[ArrowRow]:
        if isinstance(tables, ray.ObjectRef):
            tables = [tables]

        get_metadata = cached_remote_fn(_get_metadata)
        metadata = [get_metadata.remote(t) for t in tables]
        execution_plan = ExecutionPlan(
            BlockList(tables, ray.get(metadata), owned_by_consumer=True),
            DatasetStats(stages={"from_arrow_refs": metadata}, parent=None),
            run_by_consumer=True)

        return Dataset(execution_plan, 0, False)

    def to_ray_dataset(self, batch_size):
        arrow_table_refs = self._collect_arrow_table_refs(batch_size=batch_size)
        ray_dataset = self.from_arrow_refs(arrow_table_refs)
        return ray_dataset

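Aside: the buffering logic in `_collect_arrow_table_refs` is, in isolation, just grouping a stream into chunks of at most `batch_size` record batches before each `ray.put`. A minimal sketch of that chunking pattern on its own (pure Python, no Spark or Ray; the function name is illustrative):

```python
def chunk_batches(stream, batch_size):
    """Group an iterable into lists of at most batch_size items,
    mirroring the flush-on-threshold pattern used above."""
    buffer = []
    for item in stream:
        buffer.append(item)
        if len(buffer) >= batch_size:
            yield buffer  # flush a full chunk
            buffer = []
    if buffer:
        yield buffer  # flush the final partial chunk

chunks = list(chunk_batches(range(7), 3))
```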
The code can print the Ray dataset, but the preprocessor cannot transform it. How can I deal with it?

Versions / Dependencies

ray 2.0.0

Reproduction script

See the code in "What happened" above.

Issue Severity

No response

@Alxe1 Alxe1 added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Sep 2, 2022
@matthewdeng matthewdeng added P1 Issue that should be fixed within a few weeks data Ray Data-related issues and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Sep 2, 2022
@matthewdeng
Contributor

Hey, can you try this? It looks like the unresolved metadata is being passed into the DatasetStats constructor.

       metadata = [get_metadata.remote(t) for t in tables]
+     metadata = ray.get(metadata)
       execution_plan = ExecutionPlan(
-           BlockList(tables, ray.get(metadata), owned_by_consumer=True),
+           BlockList(tables, metadata, owned_by_consumer=True),
           DatasetStats(stages={"from_arrow_refs": metadata}, parent=None),
           run_by_consumer=True)

@matthewdeng matthewdeng self-assigned this Sep 2, 2022
@Alxe1
Author

Alxe1 commented Sep 2, 2022

Hey, can you try this? It looks like the unresolved metadata is being passed into the DatasetStats constructor.

       metadata = [get_metadata.remote(t) for t in tables]
+     metadata = ray.get(metadata)
       execution_plan = ExecutionPlan(
-           BlockList(tables, ray.get(metadata), owned_by_consumer=True),
+           BlockList(tables, metadata, owned_by_consumer=True),
           DatasetStats(stages={"from_arrow_refs": metadata}, parent=None),
           run_by_consumer=True)

That solved it. Thank you very much!

@richardliaw
Contributor

Great to hear! @Alxe1 could you share more about what you're trying to do with Ray and Spark?

@Alxe1
Author

Alxe1 commented Sep 3, 2022

Great to hear! @Alxe1 could you share more about what you're trying to do with Ray and Spark?

All my data is in a Hive data warehouse, so I use Spark SQL to read and join tables, and then use Spark to preprocess the data: string indexing, bucketizing, one-hot encoding, and so on. I find some of Ray's preprocessors are more efficient than Spark's, but Ray only has a few preprocessors, so I want to make the most of them. Most importantly, I can use Ray to train and test deep learning models in distributed mode.
In short, I use Spark to read and preprocess data and Ray to train models. I also hope Ray can merge the RayDP project and support Spark 2.x; I think many users are still on Spark 2.x rather than Spark 3.x.

@richardliaw
Contributor

Thanks, that's great to hear!

Which preprocessors would you like us to add to Ray?

@Alxe1
Author

Alxe1 commented Sep 3, 2022

Thanks, that's great to hear!

Which preprocessors would you like us to add to Ray?

One important preprocessor, I think, is K-bins discretization for continuous features; others such as TF-IDF and binarization transforms would also be useful. :)
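(For reference, K-bins discretization just maps each continuous value to the index of the bin whose edges bracket it. A minimal pure-Python sketch using uniform-width bins, as a stopgap until a built-in preprocessor exists; the function names here are illustrative, not any Ray API:)

```python
import bisect

def fit_uniform_bins(values, k):
    """'fit' step: compute the k-1 interior edges of k uniform-width bins."""
    lo, hi = min(values), max(values)
    width = (hi - lo) / k
    return [lo + width * i for i in range(1, k)]

def discretize(values, edges):
    """'transform' step: map each value to the index of its bin."""
    return [bisect.bisect_right(edges, v) for v in values]

edges = fit_uniform_bins([0.0, 2.0, 4.0, 6.0, 8.0, 10.0], k=5)
bins = discretize([0.5, 3.0, 9.9], edges)
```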

@richardliaw
Contributor

Captured those as issues #28301 and #28302!

@richardliaw
Contributor

Closing this issue for now.
