[rllib] Rename PolicyEvaluator => RolloutWorker #4820

Merged (26 commits) on Jun 2, 2019
Changes from 11 commits
4 changes: 2 additions & 2 deletions ci/jenkins_tests/run_rllib_tests.sh
@@ -302,7 +302,7 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/tests/test_checkpoint_restore.py

docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/tests/test_policy_evaluator.py
/ray/ci/suppress_output python /ray/python/ray/rllib/tests/test_rollout_worker.py

docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/tests/test_nested_spaces.py
@@ -387,7 +387,7 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/examples/custom_loss.py --iters=2

docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/examples/policy_evaluator_custom_workflow.py
/ray/ci/suppress_output python /ray/python/ray/rllib/examples/rollout_worker_custom_workflow.py

docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/ci/suppress_output python /ray/python/ray/rllib/examples/custom_metrics_and_callbacks.py --num-iters=2
6 changes: 3 additions & 3 deletions doc/source/rllib-concepts.rst
@@ -48,9 +48,9 @@ Most interaction with deep learning frameworks is isolated to the `Policy interf
Policy Evaluation
-----------------

Given an environment and policy, policy evaluation produces `batches <https://github.com/ray-project/ray/blob/master/python/ray/rllib/policy/sample_batch.py>`__ of experiences. This is your classic "environment interaction loop". Efficient policy evaluation can be burdensome to get right, especially when leveraging vectorization, RNNs, or when operating in a multi-agent environment. RLlib provides a `PolicyEvaluator <https://github.com/ray-project/ray/blob/master/python/ray/rllib/evaluation/policy_evaluator.py>`__ class that manages all of this, and this class is used in most RLlib algorithms.
Given an environment and policy, policy evaluation produces `batches <https://github.com/ray-project/ray/blob/master/python/ray/rllib/policy/sample_batch.py>`__ of experiences. This is your classic "environment interaction loop". Efficient policy evaluation can be burdensome to get right, especially when leveraging vectorization, RNNs, or when operating in a multi-agent environment. RLlib provides a `RolloutWorker <https://github.com/ray-project/ray/blob/master/python/ray/rllib/evaluation/rollout_worker.py>`__ class that manages all of this, and this class is used in most RLlib algorithms.

You can use policy evaluation standalone to produce batches of experiences. This can be done by calling ``ev.sample()`` on an evaluator instance, or ``ev.sample.remote()`` in parallel on evaluator instances created as Ray actors (see ``PolicyEvaluator.as_remote()``).
You can use rollout workers standalone to produce batches of experiences. This can be done by calling ``worker.sample()`` on a worker instance, or ``worker.sample.remote()`` in parallel on worker instances created as Ray actors (see `WorkerSet <https://github.com/ray-project/ray/blob/master/python/ray/rllib/evaluation/worker_set.py>`__).

Here is an example of creating a set of policy evaluation actors and using them to gather experiences in parallel. The trajectories are concatenated, the policy learns on the trajectory batch, and then we broadcast the policy weights to the evaluators for the next round of rollouts:
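A minimal sketch of that pattern, using the ``RolloutWorker`` and ``SampleBatch`` exports added in this PR; ``CustomPolicy`` is a placeholder for a ``Policy`` subclass of your own, and the ``as_remote()``, ``concat_samples()``, and ``learn_on_batch()`` calls are assumptions here rather than taken from this diff:

.. code-block:: python

    import gym
    import ray
    from ray.rllib import RolloutWorker, SampleBatch

    ray.init()

    # CustomPolicy is a placeholder for your own Policy subclass.
    env = gym.make("CartPole-v0")
    policy = CustomPolicy(env.observation_space, env.action_space, {})

    # Create remote rollout workers as Ray actors (assumed as_remote() helper).
    workers = [
        RolloutWorker.as_remote().remote(lambda _: gym.make("CartPole-v0"),
                                         CustomPolicy)
        for _ in range(10)
    ]

    for _ in range(100):
        # Gather experiences from all workers in parallel and concatenate them.
        batch = SampleBatch.concat_samples(
            ray.get([w.sample.remote() for w in workers]))

        # The local policy learns on the combined trajectory batch.
        policy.learn_on_batch(batch)

        # Broadcast the updated weights for the next round of rollouts.
        weights = ray.put({"default_policy": policy.get_weights()})
        for w in workers:
            w.set_weights.remote(weights)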

@@ -108,7 +108,7 @@ This is how the example in the previous section looks when written using a polic
Trainers
--------

Trainers are the boilerplate classes that put the above components together, making algorithms accessible via Python API and the command line. They manage algorithm configuration, setup of the policy evaluators and optimizer, and collection of training metrics. Trainers also implement the `Trainable API <https://ray.readthedocs.io/en/latest/tune-usage.html#training-api>`__ for easy experiment management.
Trainers are the boilerplate classes that put the above components together, making algorithms accessible via Python API and the command line. They manage algorithm configuration, setup of the rollout workers and optimizer, and collection of training metrics. Trainers also implement the `Trainable API <https://ray.readthedocs.io/en/latest/tune-usage.html#training-api>`__ for easy experiment management.

Example of two equivalent ways of interacting with the PPO trainer:
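A rough sketch of what those two usages can look like; the environment and ``train_batch_size`` below are placeholder values, not the example from the docs:

.. code-block:: python

    import ray
    from ray import tune
    from ray.rllib.agents.ppo import PPOTrainer

    ray.init()

    # 1) Drive training yourself through the Python API.
    trainer = PPOTrainer(env="CartPole-v0",
                         config={"train_batch_size": 4000})
    for _ in range(10):
        print(trainer.train())  # one training iteration per call

    # 2) Equivalent: let Tune manage the training loop, checkpoints, and logging.
    tune.run("PPO", config={"env": "CartPole-v0", "train_batch_size": 4000})

The same kind of run can also be launched from the command line, e.g. ``rllib train --run=PPO --env=CartPole-v0``.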

2 changes: 1 addition & 1 deletion doc/source/rllib-examples.rst
@@ -22,7 +22,7 @@ Training Workflows
Example of how to adjust the configuration of an environment over time.
- `Custom metrics <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/custom_metrics_and_callbacks.py>`__:
Example of how to output custom training metrics to TensorBoard.
- `Using policy evaluators directly for control over the whole training workflow <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/policy_evaluator_custom_workflow.py>`__:
- `Using rollout workers directly for control over the whole training workflow <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/rollout_worker_custom_workflow.py>`__:
Example of how to use RLlib's lower-level building blocks to implement a fully customized training workflow.

Custom Envs and Models
18 changes: 9 additions & 9 deletions doc/source/rllib-training.rst
@@ -178,27 +178,27 @@ Custom Training Workflows

In the `basic training example <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/custom_env.py>`__, Tune will call ``train()`` on your trainer once per iteration and report the new training results. Sometimes, it is desirable to have full control over training, but still run inside Tune. Tune supports `custom trainable functions <tune-usage.html#training-api>`__ that can be used to implement `custom training workflows (example) <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/custom_train_fn.py>`__.
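As a rough sketch of that pattern (the trainer choice, iteration count, and config below are illustrative, not the linked example):

.. code-block:: python

    import ray
    from ray import tune
    from ray.rllib.agents.ppo import PPOTrainer

    def my_train_fn(config, reporter):
        # Build a trainer inside the Tune trial and drive training manually.
        trainer = PPOTrainer(env="CartPole-v0", config=config)
        for _ in range(10):
            result = trainer.train()
            reporter(**result)  # report metrics back to Tune after each iteration
        trainer.stop()

    ray.init()
    tune.run(my_train_fn, config={"num_workers": 2})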

For even finer-grained control over training, you can use RLlib's lower-level `building blocks <rllib-concepts.html>`__ directly to implement `fully customized training workflows <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/policy_evaluator_custom_workflow.py>`__.
For even finer-grained control over training, you can use RLlib's lower-level `building blocks <rllib-concepts.html>`__ directly to implement `fully customized training workflows <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/rollout_worker_custom_workflow.py>`__.

Accessing Policy State
~~~~~~~~~~~~~~~~~~~~~~
It is common to need to access a trainer's internal state, e.g., to set or get internal weights. In RLlib trainer state is replicated across multiple *policy evaluators* (Ray actors) in the cluster. However, you can easily get and update this state between calls to ``train()`` via ``trainer.optimizer.foreach_evaluator()`` or ``trainer.optimizer.foreach_evaluator_with_index()``. These functions take a lambda function that is applied with the evaluator as an arg. You can also return values from these functions and those will be returned as a list.
It is common to need to access a trainer's internal state, e.g., to set or get internal weights. In RLlib trainer state is replicated across multiple *rollout workers* (Ray actors) in the cluster. However, you can easily get and update this state between calls to ``train()`` via ``trainer.workers.foreach_worker()`` or ``trainer.workers.foreach_worker_with_index()``. These functions take a lambda function that is applied with the worker as an arg. You can also return values from these functions and those will be returned as a list.

You can also access just the "master" copy of the trainer state through ``trainer.get_policy()`` or ``trainer.local_evaluator``, but note that updates here may not be immediately reflected in remote replicas if you have configured ``num_workers > 0``. For example, to access the weights of a local TF policy, you can run ``trainer.get_policy().get_weights()``. This is also equivalent to ``trainer.local_evaluator.policy_map["default_policy"].get_weights()``:
You can also access just the "master" copy of the trainer state through ``trainer.get_policy()`` or ``trainer.workers.local_worker()``, but note that updates here may not be immediately reflected in remote replicas if you have configured ``num_workers > 0``. For example, to access the weights of a local TF policy, you can run ``trainer.get_policy().get_weights()``. This is also equivalent to ``trainer.workers.local_worker().policy_map["default_policy"].get_weights()``:

.. code-block:: python

# Get weights of the default local policy
trainer.get_policy().get_weights()

# Same as above
trainer.local_evaluator.policy_map["default_policy"].get_weights()
trainer.workers.local_worker().policy_map["default_policy"].get_weights()

# Get list of weights of each evaluator, including remote replicas
trainer.optimizer.foreach_evaluator(lambda ev: ev.get_policy().get_weights())
# Get list of weights of each worker, including remote replicas
trainer.workers.foreach_worker(lambda ev: ev.get_policy().get_weights())

# Same as above
trainer.optimizer.foreach_evaluator_with_index(lambda ev, i: ev.get_policy().get_weights())
trainer.workers.foreach_worker_with_index(lambda ev, i: ev.get_policy().get_weights())

Global Coordination
~~~~~~~~~~~~~~~~~~~
@@ -299,7 +299,7 @@ Approach 1: Use the Trainer API and update the environment between calls to ``tr
phase = 1
else:
phase = 0
trainer.optimizer.foreach_evaluator(
trainer.workers.foreach_worker(
lambda ev: ev.foreach_env(
lambda env: env.set_phase(phase)))

@@ -333,7 +333,7 @@ Approach 2: Use the callbacks API to update the environment on new training resu
else:
phase = 0
trainer = info["trainer"]
trainer.optimizer.foreach_evaluator(
trainer.workers.foreach_worker(
lambda ev: ev.foreach_env(
lambda env: env.set_phase(phase)))

3 changes: 2 additions & 1 deletion python/ray/rllib/__init__.py
@@ -10,7 +10,7 @@

from ray.rllib.evaluation.policy_graph import PolicyGraph
from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
from ray.rllib.evaluation.policy_evaluator import PolicyEvaluator
from ray.rllib.evaluation.rollout_worker import RolloutWorker
from ray.rllib.env.base_env import BaseEnv
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.env.vector_env import VectorEnv
@@ -49,6 +49,7 @@ def _register_all():
"PolicyGraph",
"TFPolicy",
"TFPolicyGraph",
"RolloutWorker",
"PolicyEvaluator",
"SampleBatch",
"BaseEnv",
4 changes: 1 addition & 3 deletions python/ray/rllib/agents/a3c/a2c.py
@@ -26,6 +26,4 @@ class A2CTrainer(A3CTrainer):
@override(A3CTrainer)
def _make_optimizer(self):
return SyncSamplesOptimizer(
self.local_evaluator,
self.remote_evaluators,
train_batch_size=self.config["train_batch_size"])
self.workers, train_batch_size=self.config["train_batch_size"])
9 changes: 3 additions & 6 deletions python/ray/rllib/agents/a3c/a3c.py
@@ -57,10 +57,8 @@ def _init(self, config, env_creator):
if config["entropy_coeff"] < 0:
raise DeprecationWarning("entropy_coeff must be >= 0")

self.local_evaluator = self.make_local_evaluator(
env_creator, policy_cls)
self.remote_evaluators = self.make_remote_evaluators(
env_creator, policy_cls, config["num_workers"])
self.workers = self._make_workers(env_creator, policy_cls, config,
config["num_workers"])
self.optimizer = self._make_optimizer()

@override(Trainer)
@@ -75,6 +73,5 @@ def _train(self):
return result

def _make_optimizer(self):
return AsyncGradientsOptimizer(self.local_evaluator,
self.remote_evaluators,
return AsyncGradientsOptimizer(self.workers,
**self.config["optimizer"])
2 changes: 1 addition & 1 deletion python/ray/rllib/agents/ddpg/apex.py
@@ -48,7 +48,7 @@ def update_target_if_needed(self):
# Ape-X updates based on num steps trained, not sampled
if self.optimizer.num_steps_trained - self.last_target_update_ts > \
self.config["target_network_update_freq"]:
self.local_evaluator.foreach_trainable_policy(
self.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.update_target())
self.last_target_update_ts = self.optimizer.num_steps_trained
self.num_target_updates += 1
4 changes: 2 additions & 2 deletions python/ray/rllib/agents/ddpg/ddpg.py
@@ -171,9 +171,9 @@ def _train(self):
if pure_expl_steps:
# tell workers whether they should do pure exploration
only_explore = self.global_timestep < pure_expl_steps
self.local_evaluator.foreach_trainable_policy(
self.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.set_pure_exploration_phase(only_explore))
for e in self.remote_evaluators:
for e in self.workers.remote_workers():
e.foreach_trainable_policy.remote(
lambda p, _: p.set_pure_exploration_phase(only_explore))
return super(DDPGTrainer, self)._train()
2 changes: 1 addition & 1 deletion python/ray/rllib/agents/ddpg/ddpg_policy.py
@@ -515,7 +515,7 @@ def make_uniform_random_actions():

stochastic_actions = tf.cond(
# need to condition on noise_scale > 0 because zeroing
# noise_scale is how evaluator signals no noise should be used
# noise_scale is how a worker signals no noise should be used
# (this is ugly and should be fixed by adding an "eval_mode"
# config flag or something)
tf.logical_and(enable_pure_exploration, noise_scale > 0),
2 changes: 1 addition & 1 deletion python/ray/rllib/agents/dqn/apex.py
@@ -51,7 +51,7 @@ def update_target_if_needed(self):
# Ape-X updates based on num steps trained, not sampled
if self.optimizer.num_steps_trained - self.last_target_update_ts > \
self.config["target_network_update_freq"]:
self.local_evaluator.foreach_trainable_policy(
self.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.update_target())
self.last_target_update_ts = self.optimizer.num_steps_trained
self.num_target_updates += 1
42 changes: 21 additions & 21 deletions python/ray/rllib/agents/dqn/dqn.py
@@ -196,26 +196,26 @@ def on_episode_end(info):
config["callbacks"]["on_episode_end"] = tune.function(
on_episode_end)

self.local_evaluator = self.make_local_evaluator(
env_creator, self._policy)

def create_remote_evaluators():
return self.make_remote_evaluators(env_creator, self._policy,
config["num_workers"])

if config["optimizer_class"] != "AsyncReplayOptimizer":
self.remote_evaluators = create_remote_evaluators()
self.workers = self._make_workers(
env_creator,
self._policy,
config,
num_workers=self.config["num_workers"])
workers_needed = 0
else:
# Hack to workaround https://github.com/ray-project/ray/issues/2541
self.remote_evaluators = None
self.workers = self._make_workers(
env_creator, self._policy, config, num_workers=0)
workers_needed = self.config["num_workers"]

self.optimizer = getattr(optimizers, config["optimizer_class"])(
self.local_evaluator, self.remote_evaluators,
**config["optimizer"])
# Create the remote evaluators *after* the replay actors
if self.remote_evaluators is None:
self.remote_evaluators = create_remote_evaluators()
self.optimizer._set_evaluators(self.remote_evaluators)
self.workers, **config["optimizer"])

# Create the remote workers *after* the replay actors
if workers_needed > 0:
self.workers.add_workers(workers_needed)
self.optimizer._set_workers(self.workers.remote_workers())

self.last_target_update_ts = 0
self.num_target_updates = 0
@@ -226,9 +226,9 @@ def _train(self):

# Update worker explorations
exp_vals = [self.exploration0.value(self.global_timestep)]
self.local_evaluator.foreach_trainable_policy(
self.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.set_epsilon(exp_vals[0]))
for i, e in enumerate(self.remote_evaluators):
for i, e in enumerate(self.workers.remote_workers()):
exp_val = self.explorations[i].value(self.global_timestep)
e.foreach_trainable_policy.remote(
lambda p, _: p.set_epsilon(exp_val))
@@ -245,8 +245,8 @@ def _train(self):
if self.config["per_worker_exploration"]:
# Only collect metrics from the third of workers with lowest eps
result = self.collect_metrics(
selected_evaluators=self.remote_evaluators[
-len(self.remote_evaluators) // 3:])
selected_workers=self.workers.remote_workers()[
-len(self.workers.remote_workers()) // 3:])
else:
result = self.collect_metrics()

@@ -263,7 +263,7 @@ def _train(self):
def update_target_if_needed(self):
if self.global_timestep - self.last_target_update_ts > \
self.config["target_network_update_freq"]:
self.local_evaluator.foreach_trainable_policy(
self.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.update_target())
self.last_target_update_ts = self.global_timestep
self.num_target_updates += 1
@@ -275,7 +275,7 @@ def global_timestep(self):
def _evaluate(self):
logger.info("Evaluating current policy for {} episodes".format(
self.config["evaluation_num_episodes"]))
self.evaluation_ev.restore(self.local_evaluator.save())
self.evaluation_ev.restore(self.workers.local_worker().save())
self.evaluation_ev.foreach_policy(lambda p, _: p.set_epsilon(0))
for _ in range(self.config["evaluation_num_episodes"]):
self.evaluation_ev.sample()
10 changes: 5 additions & 5 deletions python/ray/rllib/agents/es/es.py
@@ -192,7 +192,7 @@ def _init(self, config, env_creator):

# Create the actors.
logger.info("Creating actors.")
self.workers = [
self._workers = [
Worker.remote(config, policy_params, env_creator, noise_id)
for _ in range(config["num_workers"])
]
@@ -270,7 +270,7 @@ def _train(self):
# Now sync the filters
FilterManager.synchronize({
DEFAULT_POLICY_ID: self.policy.get_filter()
}, self.workers)
}, self._workers)

info = {
"weights_norm": np.square(theta).sum(),
@@ -296,7 +296,7 @@ def compute_action(self, observation):
@override(Trainer)
def _stop(self):
# workaround for https://github.com/ray-project/ray/issues/1516
for w in self.workers:
for w in self._workers:
w.__ray_terminate__.remote()

def _collect_results(self, theta_id, min_episodes, min_timesteps):
@@ -307,7 +307,7 @@ def _collect_results(self, theta_id, min_episodes, min_timesteps):
"Collected {} episodes {} timesteps so far this iter".format(
num_episodes, num_timesteps))
rollout_ids = [
worker.do_rollouts.remote(theta_id) for worker in self.workers
worker.do_rollouts.remote(theta_id) for worker in self._workers
]
# Get the results of the rollouts.
for result in ray_get_and_free(rollout_ids):
@@ -334,4 +334,4 @@ def __setstate__(self, state):
self.policy.set_filter(state["filter"])
FilterManager.synchronize({
DEFAULT_POLICY_ID: self.policy.get_filter()
}, self.workers)
}, self._workers)