diff --git a/python/ray/actor.py b/python/ray/actor.py index ea29780024f..180b903ec33 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -869,7 +869,8 @@ def _serialization_helper(self, ray_forking): _ray_actor_creation_dummy_object_id.id(), "actor_method_cpus": self._ray_actor_method_cpus, "actor_driver_id": self._ray_actor_driver_id.id(), - "previous_actor_handle_id": self._ray_actor_handle_id.id(), + "previous_actor_handle_id": self._ray_actor_handle_id.id() + if self._ray_actor_handle_id else None, "ray_forking": ray_forking } diff --git a/python/ray/experimental/__init__.py b/python/ray/experimental/__init__.py index e9697eee48d..58005f44300 100644 --- a/python/ray/experimental/__init__.py +++ b/python/ray/experimental/__init__.py @@ -7,10 +7,11 @@ flush_redis_unsafe, flush_task_and_object_metadata_unsafe, flush_finished_tasks_unsafe, flush_evicted_objects_unsafe, _flush_finished_tasks_unsafe_shard, _flush_evicted_objects_unsafe_shard) +from .named_actors import get_actor, register_actor __all__ = [ "TensorFlowVariables", "flush_redis_unsafe", "flush_task_and_object_metadata_unsafe", "flush_finished_tasks_unsafe", "flush_evicted_objects_unsafe", "_flush_finished_tasks_unsafe_shard", - "_flush_evicted_objects_unsafe_shard" + "_flush_evicted_objects_unsafe_shard", "get_actor", "register_actor" ] diff --git a/python/ray/experimental/named_actors.py b/python/ray/experimental/named_actors.py new file mode 100644 index 00000000000..9ae7972fc37 --- /dev/null +++ b/python/ray/experimental/named_actors.py @@ -0,0 +1,61 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import ray +import ray.cloudpickle as pickle + + +def _calculate_key(name): + """Generate a Redis key with the given name. + + Args: + name: The name of the named actor. + + Returns: + The key to use for storing a named actor in Redis. + """ + return b"Actor:" + name.encode("ascii") + + +def get_actor(name): + """Get a named actor which was previously created. + + If the actor doesn't exist, an exception will be raised. + + Args: + name: The name of the named actor. + + Returns: + The ActorHandle object corresponding to the name. + """ + worker = ray.worker.get_global_worker() + actor_hash = _calculate_key(name) + pickled_state = worker.redis_client.hget(actor_hash, name) + if pickled_state is None: + raise ValueError("The actor with name={} doesn't exist".format(name)) + handle = pickle.loads(pickled_state) + return handle + + +def register_actor(name, actor_handle): + """Register a named actor under a string key. + + Args: + name: The name of the named actor. + actor_handle: The actor object to be associated with this name + """ + worker = ray.worker.get_global_worker() + if not isinstance(name, str): + raise TypeError("The name argument must be a string.") + if not isinstance(actor_handle, ray.actor.ActorHandle): + raise TypeError("The actor_handle argument must be an ActorHandle " + "object.") + actor_hash = _calculate_key(name) + pickled_state = pickle.dumps(actor_handle) + + # Add the actor to Redis if it does not already exist. + updated = worker.redis_client.hsetnx(actor_hash, name, pickled_state) + if updated == 0: + raise ValueError( + "Error: the actor with name={} already exists".format(name)) diff --git a/test/actor_test.py b/test/actor_test.py index 594a82f67b7..4509748cf87 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1906,6 +1906,44 @@ def method(self): # we should also test this from a different driver. ray.get(new_f.method.remote()) + def testRegisterAndGetNamedActors(self): + # TODO(heyucongtom): We should test this from another driver. + ray.worker.init(num_workers=1) + + @ray.remote + class Foo(object): + def __init__(self): + self.x = 0 + + def method(self): + self.x += 1 + return self.x + + f1 = Foo.remote() + # Test saving f. + ray.experimental.register_actor("f1", f1) + # Test getting f. + f2 = ray.experimental.get_actor("f1") + self.assertEqual(f1._actor_id, f2._actor_id) + + # Test same name register shall raise error. + with self.assertRaises(ValueError): + ray.experimental.register_actor("f1", f2) + + # Test register with wrong object type. + with self.assertRaises(TypeError): + ray.experimental.register_actor("f3", 1) + + # Test getting a nonexistent actor. + with self.assertRaises(ValueError): + ray.experimental.get_actor("nonexistent") + + # Test method + self.assertEqual(ray.get(f1.method.remote()), 1) + self.assertEqual(ray.get(f2.method.remote()), 2) + self.assertEqual(ray.get(f1.method.remote()), 3) + self.assertEqual(ray.get(f2.method.remote()), 4) + class ActorPlacementAndResources(unittest.TestCase): def tearDown(self):