
Implement named actors. #1424

Closed

robertnishihara opened this issue Jan 15, 2018 · 13 comments
@robertnishihara
Collaborator

robertnishihara commented Jan 15, 2018

The goal of "named actors" is to make it possible for one job to get a handle to an actor that was created by a different job (both jobs would have to be part of the same Ray cluster).

We have to decide on the semantics as well as the API. @pcmoritz proposed some possible APIs.

Assuming we have created an actor with

a = ActorClass.remote()

Possible API 1

# Create named actor.
ray.actors["actor1"] = a

# Get named actor handle (done by a different driver).
a = ray.actors["actor1"]

Possible API 2

# Create named actor.
ray.register_actor("actor1", a)

# Get named actor handle (done by a different driver).
a = ray.get_actor("actor1")

Possible API 3

Another approach, suggested by @stephanie-wang, is to just create a single global actor (global to the cluster) that every driver has a handle to, and which users can use to implement named actors themselves if they want to.
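
As a rough sketch of this idea (the ActorRegistry class and its method names below are hypothetical, not an existing Ray API, and this assumes actor handles can be passed to and returned from actor methods):

import ray

@ray.remote
class ActorRegistry:
    def __init__(self):
        self.handles = {}

    def register(self, name, handle):
        # Store an actor handle under a user-chosen name.
        self.handles[name] = handle

    def lookup(self, name):
        # Return the handle previously registered under this name.
        return self.handles[name]

# Driver 1: register an existing actor handle under a name.
registry.register.remote("actor1", a)

# Driver 2: look up the handle by name.
a = ray.get(registry.lookup.remote("actor1"))

How each driver obtains the handle to the registry itself is the piece Ray would have to provide.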

Possible API 4

@ray.remote(name="actor1")
class ActorClass(object):
    pass

# Create named actor.
a = ActorClass.remote()

# The second instantiation would raise an Exception because
# only one actor can be created with a given name.
a_new = ActorClass.remote()  # ERROR!

# Get named actor handle (done by a different driver).
a = ray.get_actor("actor1")
@yaroslavvb
Contributor

One use case that would be useful to support is having separate actors for different workers in a parameter server architecture, i.e., in the diagram below, 3 actors have different recovery procedures. More specifically, actor1 may restore its dataset queue checkpoint from /efs/checkpoints/actor1.

[diagram: parameter server with per-worker actors]

So when, say, worker1's machine fails and its actor needs to be recreated on a new machine, it would be the "same" actor in a sense, i.e., it would load the dataset checkpoint from /efs/checkpoints/actor1 and continue the work from that location.
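
A rough sketch of that recovery pattern (the Worker class, its name argument, and the checkpoint layout below are illustrative, not part of Ray):

import os
import numpy as np
import ray

@ray.remote
class Worker:
    def __init__(self, name):
        # The actor's name doubles as its checkpoint key, e.g. "actor1".
        self.name = name
        self.checkpoint_file = "/efs/checkpoints/" + name + ".npy"
        if os.path.exists(self.checkpoint_file):
            # On recreation after a failure, resume from the saved state.
            self.params = np.load(self.checkpoint_file)
        else:
            self.params = np.zeros(10)

    def save_checkpoint(self):
        np.save(self.checkpoint_file, self.params)

With named actors, recreating "actor1" on a new machine would run the same __init__ and pick up where the old instance left off.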

@robertnishihara
Collaborator Author

For completeness, a few other API suggestions that were made.

Some of these could be more general and include non-actor objects.

Possible API 5 (very similar to API 2)

# Publishing.
ray.set_name(a, "actor1")  # Could work for object IDs also.

# Retrieving.
a = ray.get_by_name("actor1")

Possible API 6

The main difference here is that it anticipates potentially adding properties other than "names" (do we even want that?).

Also, properties imply that an actor could have multiple properties and that multiple actors could have the same property.

# Publishing.
ray.set_property(a, name="actor1")  # Could work for object IDs also.

# Retrieving.
list_of_handles = ray.get_by_property(name="actor1")

Possible API 7

# Publishing.
ray.put(a, name="actor1")  # Could work for object IDs also.

# Retrieving
a = ray.get(name="actor1")

@ericl
Contributor

ericl commented May 5, 2018

Another (most minimal) approach is to allow actor handles to be serializable. The user could then save the handle ID and communicate it out of band to other jobs.

This would have advantages and pitfalls similar to allowing object IDs to be serializable.

The main downside I can see is that you still can't allocate actors with a specific ID, just like you can't put objects to a specific ID. This restriction could be relaxed, however.

Edit: this is similar to API 7, but doesn't necessarily require ray.put.

@pcmoritz
Contributor

pcmoritz commented May 5, 2018

I like @ericl's approach if it can be implemented without too many downsides (i.e., if creating an actor from the serialized handle is cheap, it doesn't introduce overhead in the common case of unnamed actors, and we don't need a list of "exported actors" for garbage-collection reasons). In that case the user can just have their own way of managing the actors and we don't impose an API on them.

@robertnishihara
Collaborator Author

I think that should work. The main thing we need to make sure of is that when we deserialize an actor handle, we get a new actor_handle_id. cc @stephanie-wang

@robertnishihara
Collaborator Author

One thought. Instead of creating an actor first and then turning it into a "named actor", it probably makes more sense to "create it as a named actor", e.g.,

a = ActorClass.create_named_actor(args=[], kwargs={}, name='actor1')

@dementrock

Looking forward to this feature!

@robertnishihara
Collaborator Author

robertnishihara commented May 8, 2018

Once #2007 is merged (probably later tonight), the following will work (not the "cleanest" API, but it contains the bulk of the functionality).

In interpreter 1:

import cloudpickle
import numpy as np
import ray

ray.init()

@ray.remote
class ParameterServer:
    def __init__(self):
        self.params = np.zeros(10)
    def get(self):
        return self.params
    def update(self, update):
        self.params += update

ps = ParameterServer.remote()

@ray.remote
def f(ps):
    ps.update.remote(np.ones(10))

ray.get(ps.get.remote())  # array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

f.remote(ps)

ray.get(ps.get.remote())  # array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])

cloudpickle.dumps(ps)  # b'\x80\x04\x95>\x03\x00\x00\x00\x00\x00\x00\x8c\tray.actor\x94\x8c\x0bActorHandle\x94\x93\x94)\x81\x94}\x94(\x8c\x08actor_id\x94C\x14\xed\x83\xdb\xf6\xb6\x8d\xca\xcc\xf1W1k\x86\xd72\xfe\x19\x10\x12S\x94\x8c\nclass_name\x94C\x0fParameterServer\x94\x8c\x0bactor_forks\x94K\x01\x8c\x0cactor_cursor\x94C\x14ba_\x8c\x8d\xbd\x90\x14b\x810\x80\xb9\xb2\x82\x1c\xe0o\x9c\xaa\x94\x8c\ractor_counter\x94K\x00\x8c\x12actor_method_names\x94]\x94(\x8c\x08__init__\x94\x8c\x12__ray_checkpoint__\x94\x8c\x1a__ray_checkpoint_restore__\x94\x8c\x1f__ray_restore_from_checkpoint__\x94\x8c\x17__ray_save_checkpoint__\x94\x8c\x11__ray_terminate__\x94\x8c\x03get\x94\x8c\x06update\x94e\x8c\x1cactor_method_num_return_vals\x94]\x94(K\x01K\x01K\x01K\x01K\x01K\x01K\x01K\x01e\x8c\x11method_signatures\x94}\x94(h\x0f\x8c\rray.signature\x94\x8c\x11FunctionSignature\x94\x93\x94(]\x94]\x94]\x94\x8f\x94h\x0ft\x94\x81\x94h\x10h\x1d(]\x94]\x94]\x94\x8f\x94h\x10t\x94\x81\x94h\x11h\x1d(]\x94]\x94]\x94\x8f\x94h\x11t\x94\x81\x94h\x12h\x1d(]\x94]\x94]\x94\x8f\x94h\x12t\x94\x81\x94h\x13h\x1d(]\x94]\x94]\x94\x8f\x94h\x13t\x94\x81\x94h\x14h\x1d(]\x94]\x94]\x94\x8f\x94h\x14t\x94\x81\x94h\x15h\x1d(]\x94]\x94]\x94\x8f\x94h\x15t\x94\x81\x94h\x16h\x1d(]\x94h\x16a]\x94\x8c\x08funcsigs\x94\x8c\x06_empty\x94\x93\x94a]\x94\x89a\x8f\x94h\x16t\x94\x81\x94u\x8c\x1eactor_creation_dummy_object_id\x94C\x14hc\x8e\xadpM\xe8\xdc\x1f7Miy\xbf\xaf\xeb\xc2\xc0Y\xa3\x94\x8c\x11actor_method_cpus\x94K\x01\x8c\x0factor_driver_id\x94C\x14DY\xc3\x1e\nm\xcfJ\xc7:\xf0m\xc0\x9d_\x8b\x90\xe1\xca~\x94\x8c\x18previous_actor_handle_id\x94C\x14\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x94\x8c\x0bray_forking\x94\x89ub.'

In interpreter 2:

import cloudpickle
import ray

ray.init(redis_address=...)  # Fill this in with the correct address.

ps = cloudpickle.loads(...)  # Fill in the output of cloudpickle.dumps from above

ray.get(ps.get.remote())  # array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])

@dementrock

It seems that the above usage works when two interpreters are on the same machine, but doesn't work when they sit on different machines within the same network (the second interpreter simply hangs after calling ray.get). I already launched ray on the second machine via ray start --redis-address=.... Any idea why?

@robertnishihara
Collaborator Author

robertnishihara commented May 17, 2018

It seems to work for me when the two interpreters are on different machines.

Can you double check the following:

  • Are enough ports open between the two machines? The relevant ports are
    • one for the primary redis shard (can be set with --redis-port on the head machine)
    • one for each other redis shard (can be set with --redis-shard-ports on the head machine)
    • one for the object manager (can be set with --object-manager-port on each machine)
  • If you do ray.global_state.client_table(), is it aware of both machines?
  • Can you verify that tasks can be scheduled remotely and that you can retrieve their return values? E.g., start the remote machine with --num-gpus=1 and then verify that you can create a GPU task and get the result (a minimal sketch of this check follows this list).
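
For that last check, something like the following should work (assuming the remote machine was started with --num-gpus=1; the gpu_check function name is arbitrary):

import socket
import ray

ray.init(redis_address=...)  # same Redis address as the head node

@ray.remote(num_gpus=1)
def gpu_check():
    # This task can only be scheduled on the machine that advertises a GPU.
    return socket.gethostname()

print(ray.get(gpu_check.remote()))  # should print the remote machine's hostname without hanging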

EDIT:

Also, can you check that

  • The same version of Ray is installed on both machines.
  • You're using a sufficiently recent version of Ray, e.g.,
    pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.4.0-cp36-cp36m-manylinux1_x86_64.whl
    

@robertnishihara
Collaborator Author

@dementrock any updates about this?

@raulchen
Contributor

raulchen commented Aug 3, 2018

One thought: we may also want to unregister an actor (for example, (1) when a driver exits, unregister all named actors it registered; or (2) switch a named actor to another instance).
Considering this point, I think API 2 looks most natural.

ray.register("actor1", actor) # or maybe name it 'register_actor' to be more specific
actor = ray.get_actor("actor1")
ray.unregister("actor1")

@robertnishihara
Collaborator Author

Implemented in an experimental form.
