
Add the ability to set the owner of objects when calling _save_spark_df_to_object_store() #375

Closed
max-509 opened this issue Aug 17, 2023 · 1 comment

max-509 (Contributor) commented Aug 17, 2023

Hello! Thank you for this awesome library, which lets me combine the advantages of Spark and Ray.

When I convert a Spark DataFrame into a Ray Dataset, I have only two options for specifying the owner of the serialized partitions:

  1. Each executor owns its own partitions (_use_owner=False).
  2. The RayDP Master owns all serialized partitions (_use_owner=True).

Here is a usage scenario in which neither ownership option is satisfactory.

I want to do some preprocessing in Spark, convert the preprocessed DataFrame into a Ray Dataset, and then stop Spark (call raydp.stop_spark()) to free up Ray cluster resources. But after stopping Spark, I can't use the created Ray Dataset, because the owner of the serialized tables has died.
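A minimal sketch of the failure mode (assuming a standard RayDP setup; ray.data.from_spark is used here for the conversion, and the init_spark arguments are illustrative):

import ray
import raydp

ray.init()
spark = raydp.init_spark(app_name="preprocess", num_executors=2,
                         executor_cores=2, executor_memory="2GB")

df = spark.range(0, 1000)         # stand-in for real Spark-side preprocessing
ds = ray.data.from_spark(df)      # partitions are serialized into the object store

raydp.stop_spark()                # frees the Spark resources in the Ray cluster

ds.count()                        # fails: the owner of the serialized blocks has died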
I suggest extending the function so that it accepts a description of the actor that should become the owner of the serialized partitions. For example:

from dataclasses import dataclass
from typing import Callable, List, Optional

import ray
from ray import ObjectRef
from pyspark import sql


@dataclass
class ObjectsOwner:
    # Name of the registered actor that should become the owner
    actor_name: str
    # Stores the serialized parquet ObjectRefs in the owner actor's state
    # and returns the ObjectRef produced by the .remote() call
    set_reference_as_state: Callable[[ray.actor.ActorHandle, List[ObjectRef]], ObjectRef]


def _save_spark_df_to_object_store(df: sql.DataFrame, use_batch: bool = True,
                                   objects_owner: Optional[ObjectsOwner] = None):
    # call the JVM-side writer from Python
    jvm = df.sql_ctx.sparkSession.sparkContext._jvm
    jdf = df._jdf
    object_store_writer = jvm.org.apache.spark.sql.raydp.ObjectStoreWriter(jdf)
    if objects_owner is None:
        records = object_store_writer.save(use_batch, "")
    else:
        records = object_store_writer.save(use_batch, objects_owner.actor_name)  # !

    record_tuples = [(record.objectId(), record.ownerAddress(), record.numRecords())
                     for record in records]
    blocks, block_sizes = _register_objects(record_tuples)

    if objects_owner is not None:
        actor = ray.get_actor(objects_owner.actor_name)
        ray.get(objects_owner.set_reference_as_state(actor, blocks))  # !

    return blocks, block_sizes
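For illustration, the proposed hook could be used with a named, detached actor like this (BlockHolder and set_blocks are hypothetical names, not part of RayDP):

import ray

@ray.remote
class BlockHolder:
    # Hypothetical long-lived actor that keeps the block references alive
    def __init__(self):
        self._blocks = []

    def set_blocks(self, blocks):
        self._blocks = blocks

holder = BlockHolder.options(name="block_holder", lifetime="detached").remote()

owner = ObjectsOwner(
    actor_name="block_holder",
    set_reference_as_state=lambda actor, blocks: actor.set_blocks.remote(blocks),
)
blocks, block_sizes = _save_spark_df_to_object_store(df, objects_owner=owner)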

I hope that my suggestion will be useful.

kira-lin (Collaborator) commented

Hi @max-509, thanks for using RayDP!
In this case, you can assign ownership to RayDPMaster and use raydp.stop_spark(cleanup_data=False) to stop the session and free up the resources. With cleanup_data set to False, the RayDPMaster actor is not killed, so the data is still accessible.
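A sketch of this workaround (assuming the _use_owner flag from the original post is exposed on the conversion call; the exact entry point may differ):

ds = ray.data.from_spark(df, _use_owner=True)  # RayDPMaster owns the serialized blocks
raydp.stop_spark(cleanup_data=False)           # executors are released, master stays alive
ds.count()                                     # the data is still accessible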

But yes, your suggestion makes sense: ownership should be assignable to a user-specified actor. This should be very easy to add. Are you willing to submit a PR?
