Added possibility to set custom actor owner when converting a Spark DataFrame to a Ray Dataset (#376)

* Added possibility to set custom actor owner when converting a Spark DataFrame to a Ray Dataset

* Fixed owner errors in estimators

* Fixed pylint errors

* Fixed test_data_ownership_transfer() test

* Fixed test_data_ownership_transfer() test

---------

Co-authored-by: Vershinin Maxim WX1123714 <[email protected]>
max-509 and Vershinin Maxim WX1123714 authored Oct 19, 2023
1 parent a93059b commit 55248ab
Showing 6 changed files with 142 additions and 35 deletions.
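
For orientation, a minimal usage sketch of the new owner parameter with the RayDP master actor as the owner of the converted blocks. This is a sketch, not part of the commit: the app name and data are illustrative, and it assumes a running Ray cluster; the API calls (get_raydp_master_owner, spark_dataframe_to_ray_dataset, stop_spark) follow the diff below.

import ray
import raydp
from raydp.spark import spark_dataframe_to_ray_dataset, get_raydp_master_owner

ray.init()
spark = raydp.init_spark(app_name="owner_example",
                         num_executors=1,
                         executor_cores=1,
                         executor_memory="500M")
df = spark.range(0, 1000)

# Hand ownership of the serialized partitions to the RayDP master actor,
# so the Ray dataset stays readable after the Spark executors exit.
ds = spark_dataframe_to_ray_dataset(df,
                                    parallelism=4,
                                    owner=get_raydp_master_owner(spark))

raydp.stop_spark(cleanup_data=False)  # blocks survive: the RayDP master owns them
print(ds.count())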
6 changes: 5 additions & 1 deletion python/raydp/spark/__init__.py
@@ -15,7 +15,9 @@
# limitations under the License.
#

from .dataset import spark_dataframe_to_ray_dataset, \
from .dataset import PartitionObjectsOwner, \
get_raydp_master_owner, \
spark_dataframe_to_ray_dataset, \
ray_dataset_to_spark_dataframe, \
from_spark_recoverable
from .interfaces import SparkEstimatorInterface
@@ -24,6 +26,8 @@
__all__ = [
"SparkCluster",
"SparkEstimatorInterface",
"PartitionObjectsOwner",
"get_raydp_master_owner",
"spark_dataframe_to_ray_dataset",
"ray_dataset_to_spark_dataframe",
"from_spark_recoverable"
51 changes: 37 additions & 14 deletions python/raydp/spark/dataset.py
@@ -13,16 +13,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import uuid
from typing import Callable, Dict, List, NoReturn, Optional, Iterable, Union
from dataclasses import dataclass

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType
from pyspark.sql.pandas.types import from_arrow_type
@@ -132,6 +133,30 @@ def with_row_ids(self, new_row_ids) -> "ParquetPiece":
return ParquetPiece(self.piece, self.columns, self.partitions, new_row_ids, num_rows)


@dataclass
class PartitionObjectsOwner:
# Actor owner name
actor_name: str
# Function that set serialized parquet objects to actor owner state
# and return result of .remote() calling
set_reference_as_state: Callable[[ray.actor.ActorHandle, List[ObjectRef]], ObjectRef]


def get_raydp_master_owner(spark: Optional[SparkSession] = None) -> PartitionObjectsOwner:
if spark is None:
spark = SparkSession.getActiveSession()
obj_holder_name = spark.sparkContext.appName + RAYDP_SPARK_MASTER_SUFFIX

def raydp_master_set_reference_as_state(
raydp_master_actor: ray.actor.ActorHandle,
objects: List[ObjectRef]) -> ObjectRef:
return raydp_master_actor.add_objects.remote(uuid.uuid4(), objects)

return PartitionObjectsOwner(
obj_holder_name,
raydp_master_set_reference_as_state)


@client_mode_wrap
def _register_objects(records):
worker = ray.worker.global_worker
@@ -147,36 +172,34 @@ def _register_objects(records):
return blocks, block_sizes

def _save_spark_df_to_object_store(df: sql.DataFrame, use_batch: bool = True,
_use_owner: bool = False):
owner: Union[PartitionObjectsOwner, None] = None):
# call java function from python
jvm = df.sql_ctx.sparkSession.sparkContext._jvm
jdf = df._jdf
object_store_writer = jvm.org.apache.spark.sql.raydp.ObjectStoreWriter(jdf)
obj_holder_name = df.sql_ctx.sparkSession.sparkContext.appName + RAYDP_SPARK_MASTER_SUFFIX
if _use_owner is True:
records = object_store_writer.save(use_batch, obj_holder_name)
else:
records = object_store_writer.save(use_batch, "")
actor_owner_name = ""
if owner is not None:
actor_owner_name = owner.actor_name
records = object_store_writer.save(use_batch, actor_owner_name)

record_tuples = [(record.objectId(), record.ownerAddress(), record.numRecords())
for record in records]
for record in records]
blocks, block_sizes = _register_objects(record_tuples)

if _use_owner is True:
holder = ray.get_actor(obj_holder_name)
df_id = uuid.uuid4()
ray.get(holder.add_objects.remote(df_id, blocks))
if owner is not None:
actor_owner = ray.get_actor(actor_owner_name)
ray.get(owner.set_reference_as_state(actor_owner, blocks))

return blocks, block_sizes

def spark_dataframe_to_ray_dataset(df: sql.DataFrame,
parallelism: Optional[int] = None,
_use_owner: bool = False):
owner: Union[PartitionObjectsOwner, None] = None):
num_part = df.rdd.getNumPartitions()
if parallelism is not None:
if parallelism != num_part:
df = df.repartition(parallelism)
blocks, _ = _save_spark_df_to_object_store(df, False, _use_owner)
blocks, _ = _save_spark_df_to_object_store(df, False, owner)
return from_arrow_refs(blocks)

# This is an experimental API for now.
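
The owner does not have to be the RayDP master: PartitionObjectsOwner accepts any named user actor, which is what the new test below exercises. A condensed sketch under the same assumptions as above; the actor class and its method names are illustrative, not part of the RayDP API.

import ray
import raydp
from raydp.spark import spark_dataframe_to_ray_dataset, PartitionObjectsOwner

@ray.remote
class BlockHolder:
    # Named actor that keeps references to the converted Arrow blocks alive.
    def __init__(self):
        self.blocks = None

    def ready(self):
        pass

    def set_objects(self, blocks):
        self.blocks = blocks

ray.init()
spark = raydp.init_spark(app_name="custom_owner_example", num_executors=1,
                         executor_cores=1, executor_memory="500M")
df = spark.range(0, 1000)

holder = BlockHolder.options(name="block_holder").remote()
ray.get(holder.ready.remote())  # make sure the named actor exists before converting

owner = PartitionObjectsOwner(
    actor_name="block_holder",
    set_reference_as_state=lambda actor, objects: actor.set_objects.remote(objects))

ds = spark_dataframe_to_ray_dataset(df, parallelism=4, owner=owner)
raydp.stop_spark(cleanup_data=False)  # data survives: BlockHolder now owns the blocks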
89 changes: 80 additions & 9 deletions python/raydp/tests/test_data_owner_transfer.py
@@ -1,18 +1,22 @@

import sys
import time
from typing import Any

import pytest
import ray
from ray._private.client_mode_hook import client_mode_wrap
from ray.exceptions import RayTaskError, OwnerDiedError
import raydp
from raydp.spark import PartitionObjectsOwner

from raydp.spark import get_raydp_master_owner


def gen_test_data():
from pyspark.sql.session import SparkSession
s = SparkSession.getActiveSession()

data = []
tmp = [("ming", 20, 15552211521),
("hong", 19, 13287994007),
@@ -30,10 +34,10 @@ def gen_test_data():
@client_mode_wrap
def ray_gc():
ray._private.internal_api.global_gc()

def test_fail_without_data_ownership_transfer(ray_cluster):
"""
Test shutting down Spark worker after data been put
Test shutting down Spark worker after data been put
into Ray object store without data ownership transfer.
This test should be throw error of data inaccessible after
its owner (e.g. Spark JVM process) has terminated, which is expected.
@@ -83,7 +87,7 @@ def test_fail_without_data_ownership_transfer(ray_cluster):

def test_data_ownership_transfer(ray_cluster):
"""
Test shutting down Spark worker after data been put
Test shutting down Spark worker after data been put
into Ray object store with data ownership transfer.
This test should be able to execute till the end without crash as expected.
"""
@@ -93,7 +97,7 @@ def test_data_ownership_transfer(ray_cluster):

from raydp.spark.dataset import spark_dataframe_to_ray_dataset
import numpy as np

num_executor = 1

spark = raydp.init_spark(
@@ -110,7 +114,8 @@ def test_data_ownership_transfer(ray_cluster):

# convert data from spark dataframe to ray dataset,
# and transfer data ownership to dedicated Object Holder (Singleton)
ds = spark_dataframe_to_ray_dataset(df_train, parallelism=4, _use_owner=True)
ds = spark_dataframe_to_ray_dataset(df_train, parallelism=4,
owner=get_raydp_master_owner(df_train.sql_ctx.sparkSession))

# display data
ds.show(5)
@@ -127,11 +132,77 @@ def test_data_ownership_transfer(ray_cluster):
# confirm that data is still available from object store!
# sanity check the dataset is as functional as normal
assert np.isnan(ds.mean('Age')) is not True

# final clean up
raydp.stop_spark()


def test_custom_ownership_transfer_custom_actor(ray_cluster):
"""
Test shutting down Spark worker after data been put
into Ray object store with data ownership transfer to custom user actor.
This test should be able to execute till the end without crash as expected.
"""

@ray.remote
class CustomActor:
objects: Any

def wake(self):
pass

def set_objects(self, objects):
self.objects = objects

if not ray.worker.global_worker.connected:
pytest.skip("Skip this test if using ray client")

from raydp.spark.dataset import spark_dataframe_to_ray_dataset
import numpy as np

num_executor = 1

spark = raydp.init_spark(
app_name="example",
num_executors=num_executor,
executor_cores=1,
executor_memory="500M"
)

df_train = gen_test_data()

resource_stats = ray.available_resources()
cpu_cnt = resource_stats['CPU']

# create owner
owner_actor_name = 'owner_actor_name'
actor = CustomActor.options(name=owner_actor_name).remote()
# waiting for the actor to be created
ray.get(actor.wake.remote())

# convert data from spark dataframe to ray dataset,
# and transfer data ownership to dedicated Object Holder (Singleton)
ds = spark_dataframe_to_ray_dataset(df_train, parallelism=4, owner=PartitionObjectsOwner(
owner_actor_name,
lambda actor, objects: actor.set_objects.remote(objects)))

# display data
ds.show(5)

# release resource by shutting down spark Java process
raydp.stop_spark()
ray_gc() # ensure GC kicked in
time.sleep(3)

# confirm that resources has been recycled
resource_stats = ray.available_resources()
assert resource_stats['CPU'] == cpu_cnt + num_executor

# confirm that data is still available from object store!
# sanity check the dataset is as functional as normal
assert np.isnan(ds.mean('Age')) is not True


def test_api_compatibility(ray_cluster):
"""
Test the changes been made are not to break public APIs.
@@ -159,13 +230,13 @@ def test_api_compatibility(ray_cluster):
# confirm that resources is still being occupied
resource_stats = ray.available_resources()
assert resource_stats['CPU'] == cpu_cnt

# final clean up
raydp.stop_spark()

if __name__ == '__main__':
sys.exit(pytest.main(["-v", __file__]))

# test_api_compatibility()
# test_data_ownership_transfer()
# test_fail_without_data_ownership_transfer()
9 changes: 6 additions & 3 deletions python/raydp/tf/estimator.py
@@ -32,7 +32,7 @@
from raydp.estimator import EstimatorInterface
from raydp.spark.interfaces import SparkEstimatorInterface, DF, OPTIONAL_DF
from raydp import stop_spark
from raydp.spark import spark_dataframe_to_ray_dataset
from raydp.spark import spark_dataframe_to_ray_dataset, get_raydp_master_owner

class TFEstimator(EstimatorInterface, SparkEstimatorInterface):
def __init__(self,
@@ -255,12 +255,15 @@ def fit_on_spark(self,
evaluate_df.write.parquet(path+"/test", compression=compression)
evaluate_ds = read_parquet(path+"/test")
else:
owner = None
if stop_spark_after_conversion:
owner = get_raydp_master_owner(train_df.sql_ctx.sparkSession)
train_ds = spark_dataframe_to_ray_dataset(train_df,
_use_owner=stop_spark_after_conversion)
owner=owner)
if evaluate_df is not None:
evaluate_df = self._check_and_convert(evaluate_df)
evaluate_ds = spark_dataframe_to_ray_dataset(evaluate_df,
_use_owner=stop_spark_after_conversion)
owner=owner)
if stop_spark_after_conversion:
stop_spark(cleanup_data=False)
return self.fit(
9 changes: 6 additions & 3 deletions python/raydp/torch/estimator.py
@@ -25,7 +25,7 @@
from raydp.spark.interfaces import SparkEstimatorInterface, DF, OPTIONAL_DF
from raydp.torch.torch_metrics import TorchMetric
from raydp import stop_spark
from raydp.spark import spark_dataframe_to_ray_dataset
from raydp.spark import spark_dataframe_to_ray_dataset, get_raydp_master_owner
from raydp.torch.config import TorchConfig

import ray
@@ -365,12 +365,15 @@ def fit_on_spark(self,
evaluate_df.write.parquet(path+"/test", compression=compression)
evaluate_ds = ray.data.read_parquet(path+"/test")
else:
owner = None
if stop_spark_after_conversion:
owner = get_raydp_master_owner(train_df.sql_ctx.sparkSession)
train_ds = spark_dataframe_to_ray_dataset(train_df,
_use_owner=stop_spark_after_conversion)
owner=owner)
if evaluate_df is not None:
evaluate_df = self._check_and_convert(evaluate_df)
evaluate_ds = spark_dataframe_to_ray_dataset(evaluate_df,
_use_owner=stop_spark_after_conversion)
owner=owner)
if stop_spark_after_conversion:
stop_spark(cleanup_data=False)
return self.fit(
13 changes: 8 additions & 5 deletions python/raydp/xgboost/estimator.py
@@ -20,7 +20,7 @@
from raydp.estimator import EstimatorInterface
from raydp.spark.interfaces import SparkEstimatorInterface, DF, OPTIONAL_DF
from raydp import stop_spark
from raydp.spark import spark_dataframe_to_ray_dataset
from raydp.spark import spark_dataframe_to_ray_dataset, get_raydp_master_owner

import ray
from ray.air.config import ScalingConfig, RunConfig, FailureConfig
@@ -99,14 +99,17 @@ def fit_on_spark(self,
evaluate_df.write.parquet(path+"/test", compression=compression)
evaluate_ds = ray.data.read_parquet(path+"/test")
else:
owner = None
if stop_spark_after_conversion:
owner = get_raydp_master_owner(train_df.sql_ctx.sparkSession)
train_ds = spark_dataframe_to_ray_dataset(train_df,
parallelism=self._num_workers,
_use_owner=stop_spark_after_conversion)
parallelism=self._num_workers,
owner=owner)
if evaluate_df is not None:
evaluate_df = self._check_and_convert(evaluate_df)
evaluate_ds = spark_dataframe_to_ray_dataset(evaluate_df,
parallelism=self._num_workers,
_use_owner=stop_spark_after_conversion)
parallelism=self._num_workers,
owner=owner)
if stop_spark_after_conversion:
stop_spark(cleanup_data=False)
return self.fit(
