[RLlib] restart_failed_sub_environments now works for MA cases and crashes during `reset()`; +more tests and logging; add eval worker sub-env fault tolerance test. (ray-project#26276)

Signed-off-by: Xiaowei Jiang <[email protected]>
sven1977 authored and xwjiang2010 committed Jul 19, 2022
1 parent 617fca7 commit 00e161c
Showing 22 changed files with 402 additions and 116 deletions.
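For orientation, here is a minimal sketch of enabling the fault-tolerance behavior this commit extends. The exact home of `restart_failed_sub_environments` may differ between RLlib versions; it is assumed here to be a `rollouts()` argument:

```python
import ray.rllib.algorithms.pg as pg

config = (
    pg.PGConfig()
    .environment(env="CartPole-v1")
    # Assumption: `restart_failed_sub_environments` is a `rollouts()` kwarg.
    # If True, a sub-environment that crashes (per this commit: also during
    # `reset()` and in multi-agent setups) is restarted in place instead of
    # the whole rollout worker failing.
    .rollouts(num_rollout_workers=2, restart_failed_sub_environments=True)
    .evaluation(evaluation_num_workers=2, evaluation_interval=1)
)
algo = config.build()
results = algo.train()
```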
37 changes: 34 additions & 3 deletions rllib/BUILD
@@ -497,10 +497,30 @@ py_test(
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete"],
size = "large",
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/pg/cartpole-crashing-with_remote-envs-pg.yaml"],
data = ["tuned_examples/pg/cartpole-crashing-with-remote-envs-pg.yaml"],
args = ["--yaml-dir=tuned_examples/pg"]
)

py_test(
name = "learning_tests_multi_agent_cartpole_crashing_pg",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete"],
size = "large",
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/pg/multi-agent-cartpole-crashing-pg.yaml"],
args = ["--yaml-dir=tuned_examples/pg"]
)

py_test(
name = "learning_tests_multi_agent_cartpole_crashing_with_remote_envs_pg",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete"],
size = "large",
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/pg/multi-agent-cartpole-crashing-with-remote-envs-pg.yaml"],
args = ["--yaml-dir=tuned_examples/pg", "--num-cpus=14"]
)

py_test(
name = "learning_tests_cartpole_pg_fake_gpus",
main = "tests/run_regression_tests.py",
@@ -1665,14 +1685,16 @@ py_test(
name = "test_json_reader",
tags = ["team:rllib", "offline"],
size = "small",
srcs = ["offline/tests/test_json_reader.py"]
srcs = ["offline/tests/test_json_reader.py"],
data = ["tests/data/pendulum/large.json"],
)

py_test(
name = "test_ope",
tags = ["team:rllib", "offline", "torch_only"],
size = "medium",
srcs = ["offline/estimators/tests/test_ope.py"]
srcs = ["offline/estimators/tests/test_ope.py"],
data = ["tests/data/cartpole/large.json"],
)


@@ -2485,6 +2507,15 @@ py_test(
args = ["--num-cpus=4", "--as-test", "--framework=torch"]
)

py_test(
name = "examples/custom_eval_parallel_to_training_torch",
main = "examples/custom_eval.py",
tags = ["team:rllib", "exclusive", "examples", "examples_C", "examples_C_UtoZ"],
size = "medium",
srcs = ["examples/custom_eval.py"],
args = ["--num-cpus=4", "--as-test", "--framework=torch", "--evaluation-parallel-to-training"]
)

py_test(
name = "examples/custom_experiment",
main = "examples/custom_experiment.py",
54 changes: 40 additions & 14 deletions rllib/algorithms/algorithm.py
@@ -29,7 +29,7 @@

import ray
from ray.actor import ActorHandle
-from ray.exceptions import RayActorError, RayError
+from ray.exceptions import GetTimeoutError, RayActorError, RayError
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.algorithms.callbacks import DefaultCallbacks
from ray.rllib.env.env_context import EnvContext
@@ -768,7 +768,8 @@ def duration_fn(num_units_done):
env_steps_this_iter += batch.env_steps()
metrics = collect_metrics(
self.workers.local_worker(),
-keep_custom_metrics=self.config["keep_per_episode_custom_metrics"],
+keep_custom_metrics=eval_cfg["keep_per_episode_custom_metrics"],
+timeout_seconds=eval_cfg["metrics_episode_collection_timeout_s"],
)

# Evaluation worker set only has local worker.
@@ -794,16 +795,35 @@ def duration_fn(num_units_done):
break

round_ += 1
-batches = ray.get(
-    [
-        w.sample.remote()
-        for i, w in enumerate(
-            self.evaluation_workers.remote_workers()
-        )
-        if i * (1 if unit == "episodes" else rollout * num_envs)
-        < units_left_to_do
-    ]
-)
+try:
+    batches = ray.get(
+        [
+            w.sample.remote()
+            for i, w in enumerate(
+                self.evaluation_workers.remote_workers()
+            )
+            if i * (1 if unit == "episodes" else rollout * num_envs)
+            < units_left_to_do
+        ],
+        timeout=self.config["evaluation_sample_timeout_s"],
+    )
+except GetTimeoutError:
+    logger.warning(
+        "Calling `sample()` on your remote evaluation worker(s) "
+        "resulted in a timeout (after the configured "
+        f"{self.config['evaluation_sample_timeout_s']} seconds)! "
+        "Try to set `evaluation_sample_timeout_s` in your config"
+        " to a larger value."
+        + (
+            " If your episodes don't terminate easily, you may "
+            "also want to set `evaluation_duration_unit` to "
+            "'timesteps' (instead of 'episodes')."
+            if unit == "episodes"
+            else ""
+        )
+    )
+    break

_agent_steps = sum(b.agent_steps() for b in batches)
_env_steps = sum(b.env_steps() for b in batches)
# 1 episode per returned batch.
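The timeout branch above hinges on the `timeout` argument of `ray.get()`, which raises `GetTimeoutError` when results do not arrive in time. A self-contained sketch of that pattern (the `slow_sample` task is illustrative only, standing in for a worker's `sample.remote()`):

```python
import time

import ray
from ray.exceptions import GetTimeoutError

@ray.remote
def slow_sample():
    # Simulate an evaluation worker stuck in a never-ending episode.
    time.sleep(10.0)
    return "batch"

ray.init()
try:
    # Wait at most 1 second for the pending result(s).
    batches = ray.get([slow_sample.remote()], timeout=1.0)
except GetTimeoutError:
    print("sample() timed out; consider raising the timeout")
```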
@@ -834,6 +854,7 @@ def duration_fn(num_units_done):
self.evaluation_workers.local_worker(),
self.evaluation_workers.remote_workers(),
keep_custom_metrics=self.config["keep_per_episode_custom_metrics"],
timeout_seconds=eval_cfg["metrics_episode_collection_timeout_s"],
)
metrics[NUM_AGENT_STEPS_SAMPLED_THIS_ITER] = agent_steps_this_iter
metrics[NUM_ENV_STEPS_SAMPLED_THIS_ITER] = env_steps_this_iter
@@ -2328,6 +2349,14 @@ def _run_one_evaluation(
"recreate_failed_workers"
),
)

# Add number of healthy evaluation workers after this iteration.
eval_results["evaluation"]["num_healthy_workers"] = (
len(self.evaluation_workers.remote_workers())
if self.evaluation_workers is not None
else 0
)

return eval_results

def _run_one_training_iteration_and_evaluation_in_parallel(
@@ -2387,9 +2416,6 @@ def _compile_iteration_results(
# Evaluation results.
if "evaluation" in iteration_results:
results["evaluation"] = iteration_results.pop("evaluation")
results["evaluation"]["num_healthy_workers"] = len(
self.evaluation_workers.remote_workers()
)

# Custom metrics and episode media.
results["custom_metrics"] = iteration_results.pop("custom_metrics", {})
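With the `num_healthy_workers` bookkeeping moved into `_run_one_evaluation()`, the count is present in the evaluation results even when an iteration errors out early. A hedged read-side sketch (assuming an `algo` built with evaluation workers, as in the config sketch near the top):

```python
results = algo.train()
if "evaluation" in results:
    # Number of remote evaluation workers still alive after this iteration.
    print(results["evaluation"]["num_healthy_workers"])
```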
15 changes: 12 additions & 3 deletions rllib/algorithms/algorithm_config.py
@@ -174,6 +174,7 @@ def __init__(self, algo_class=None):
self.evaluation_interval = None
self.evaluation_duration = 10
self.evaluation_duration_unit = "episodes"
self.evaluation_sample_timeout_s = 180.0
self.evaluation_parallel_to_training = False
self.evaluation_config = {}
self.off_policy_estimation_methods = {}
@@ -187,7 +188,7 @@ def __init__(self, algo_class=None):

# `self.reporting()`
self.keep_per_episode_custom_metrics = False
-self.metrics_episode_collection_timeout_s = 180
+self.metrics_episode_collection_timeout_s = 60.0
self.metrics_num_episodes_for_smoothing = 100
self.min_time_s_per_iteration = None
self.min_train_timesteps_per_iteration = 0
@@ -803,6 +804,7 @@ def evaluation(
evaluation_interval: Optional[int] = None,
evaluation_duration: Optional[int] = None,
evaluation_duration_unit: Optional[str] = None,
evaluation_sample_timeout_s: Optional[float] = None,
evaluation_parallel_to_training: Optional[bool] = None,
evaluation_config: Optional[
Union["AlgorithmConfig", PartialAlgorithmConfigDict]
@@ -832,6 +834,11 @@
- For `evaluation_parallel_to_training=False`: Error.
evaluation_duration_unit: The unit with which to count the evaluation
duration. Either "episodes" (default) or "timesteps".
evaluation_sample_timeout_s: The timeout (in seconds) for the `ray.get()` call
on the remote evaluation workers' `sample()` method. After this time, the
user receives a warning plus instructions on how to fix the issue: make sure
the episodes terminate, increase the timeout, or switch to
`evaluation_duration_unit=timesteps`.
evaluation_parallel_to_training: Whether to run evaluation in parallel to
an `Algorithm.train()` call, using threading. Default=False.
E.g. evaluation_interval=2 -> For every other training iteration,
@@ -880,6 +887,8 @@
self.evaluation_duration = evaluation_duration
if evaluation_duration_unit is not None:
self.evaluation_duration_unit = evaluation_duration_unit
if evaluation_sample_timeout_s is not None:
self.evaluation_sample_timeout_s = evaluation_sample_timeout_s
if evaluation_parallel_to_training is not None:
self.evaluation_parallel_to_training = evaluation_parallel_to_training
if evaluation_config is not None:
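A usage sketch for the new setting, mirroring the eval-worker test added further down in this diff:

```python
import ray.rllib.algorithms.pg as pg

config = (
    pg.PGConfig()
    .environment(env="CartPole-v1")
    .evaluation(
        evaluation_interval=1,
        evaluation_num_workers=2,
        # Give up on a round of evaluation sampling (with a warning)
        # once the workers' sample() calls exceed 60 seconds.
        evaluation_sample_timeout_s=60.0,
    )
)
```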
@@ -909,7 +918,7 @@ def offline_data(
actions_in_input_normalized=None,
input_evaluation=None,
off_policy_estimation_methods=None,
-postprocess_inputs=None,
+postprocess_inputs=None,  # `def postprocess_trajectory()`
shuffle_buffer_size=None,
output=None,
output_config=None,
@@ -1080,7 +1089,7 @@ def reporting(
self,
*,
keep_per_episode_custom_metrics: Optional[bool] = None,
-metrics_episode_collection_timeout_s: Optional[int] = None,
+metrics_episode_collection_timeout_s: Optional[float] = None,
metrics_num_episodes_for_smoothing: Optional[int] = None,
min_time_s_per_iteration: Optional[int] = None,
min_train_timesteps_per_iteration: Optional[int] = None,
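The episode-metrics collection timeout is now typed as a float, and its default drops from 180 to 60.0 seconds; a short sketch of setting it explicitly (continuing from a `config` as in the sketches above):

```python
config = config.reporting(metrics_episode_collection_timeout_s=60.0)
```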
13 changes: 7 additions & 6 deletions rllib/algorithms/tests/test_algorithm.py
@@ -1,4 +1,5 @@
import copy

import gym
import numpy as np
import os
@@ -230,24 +231,24 @@ def test_evaluation_wo_evaluation_worker_set(self):
for _ in framework_iterator(frameworks=("tf", "torch")):
# Setup algorithm w/o evaluation worker set and still call
# evaluate() -> Expect error.
-algo_wo_env_on_driver = config.build()
+algo_wo_env_on_local_worker = config.build()
self.assertRaisesRegex(
ValueError,
"Cannot evaluate w/o an evaluation worker set",
-algo_wo_env_on_driver.evaluate,
+algo_wo_env_on_local_worker.evaluate,
)
-algo_wo_env_on_driver.stop()
+algo_wo_env_on_local_worker.stop()

# Try again using `create_env_on_local_worker=True`.
# This force-adds the env on the local-worker, so this Algorithm
# can `evaluate` even though it doesn't have an evaluation-worker
# set.
config.create_env_on_local_worker = True
-algo_w_env_on_driver = config.build()
-results = algo_w_env_on_driver.evaluate()
+algo_w_env_on_local_worker = config.build()
+results = algo_w_env_on_local_worker.evaluate()
assert "evaluation" in results
assert "episode_reward_mean" in results["evaluation"]
-algo_w_env_on_driver.stop()
+algo_w_env_on_local_worker.stop()
config.create_env_on_local_worker = False

def test_space_inference_from_remote_workers(self):
26 changes: 26 additions & 0 deletions rllib/algorithms/tests/test_worker_failures.py
@@ -1,8 +1,11 @@
import gym
import numpy as np
import unittest

import ray
import ray.rllib.algorithms.pg as pg
from ray.rllib.algorithms.registry import get_algorithm_class
from ray.rllib.examples.env.random_env import RandomEnv
from ray.rllib.utils.test_utils import framework_iterator
from ray.tune.registry import register_env

@@ -264,6 +267,29 @@ def test_eval_workers_failing_fatal(self):
eval_only=True,
)

def test_eval_workers_on_infinite_episodes(self):
"""Tests whether eval workers warn appropriately after some episode timeout."""
# Create infinitely running episodes, but set a horizon so that RLlib
# auto-terminates each episode. In the eval workers, however, don't set a
# horizon -> expect a warning and no proper evaluation results.
config = (
pg.PGConfig()
.rollouts(num_rollout_workers=2, horizon=100)
.reporting(metrics_episode_collection_timeout_s=5.0)
.environment(env=RandomEnv, env_config={"p_done": 0.0})
.evaluation(
evaluation_num_workers=2,
evaluation_interval=1,
evaluation_sample_timeout_s=5.0,
evaluation_config={
"horizon": None,
},
)
)
algo = config.build()
results = algo.train()
self.assertTrue(np.isnan(results["evaluation"]["episode_reward_mean"]))


if __name__ == "__main__":
import sys
2 changes: 2 additions & 0 deletions rllib/env/base_env.py
@@ -79,6 +79,7 @@ def to_base_env(
num_envs: int = 1,
remote_envs: bool = False,
remote_env_batch_wait_ms: int = 0,
restart_failed_sub_environments: bool = False,
) -> "BaseEnv":
"""Converts an RLlib-supported env into a BaseEnv object.
@@ -451,6 +452,7 @@ def convert_to_base_env(
num_envs=num_envs,
remote_envs=remote_envs,
remote_env_batch_wait_ms=remote_env_batch_wait_ms,
restart_failed_sub_environments=restart_failed_sub_environments,
)
# `env` is not a BaseEnv yet -> Need to convert/vectorize.
else:
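A hypothetical call showing what the new flag plumbs through: converting a plain gym env into a `BaseEnv` whose crashed sub-environments get restarted. Passing the env as the first argument is an assumption based on this diff's signature:

```python
import gym
from ray.rllib.env.base_env import convert_to_base_env

# Assumption: `convert_to_base_env` accepts the env to wrap as its
# first argument; remaining kwargs as per the signature in this diff.
base_env = convert_to_base_env(
    gym.make("CartPole-v1"),
    restart_failed_sub_environments=True,
)
```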
1 change: 1 addition & 0 deletions rllib/env/external_env.py
@@ -202,6 +202,7 @@ def to_base_env(
num_envs: int = 1,
remote_envs: bool = False,
remote_env_batch_wait_ms: int = 0,
restart_failed_sub_environments: bool = False,
) -> "BaseEnv":
"""Converts an RLlib MultiAgentEnv into a BaseEnv object.
(Diffs for the remaining changed files are not shown here.)
