
[RLlib] Add eval worker sub-env fault tolerance test. #26276

Merged
Changes from all commits (30 commits)
37 changes: 34 additions & 3 deletions rllib/BUILD
@@ -497,10 +497,30 @@ py_test(
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete"],
size = "large",
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/pg/cartpole-crashing-with_remote-envs-pg.yaml"],
data = ["tuned_examples/pg/cartpole-crashing-with-remote-envs-pg.yaml"],
args = ["--yaml-dir=tuned_examples/pg"]
)

py_test(
name = "learning_tests_multi_agent_cartpole_crashing_pg",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete"],
size = "large",
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/pg/multi-agent-cartpole-crashing-pg.yaml"],
args = ["--yaml-dir=tuned_examples/pg"]
)

py_test(
name = "learning_tests_multi_agent_cartpole_crashing_with_remote_envs_pg",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete"],
size = "large",
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/pg/multi-agent-cartpole-crashing-with-remote-envs-pg.yaml"],
args = ["--yaml-dir=tuned_examples/pg", "--num-cpus=14"]
)

py_test(
name = "learning_tests_cartpole_pg_fake_gpus",
main = "tests/run_regression_tests.py",
@@ -1665,14 +1685,16 @@ py_test(
name = "test_json_reader",
tags = ["team:rllib", "offline"],
size = "small",
srcs = ["offline/tests/test_json_reader.py"]
srcs = ["offline/tests/test_json_reader.py"],
data = ["tests/data/pendulum/large.json"],
)

py_test(
name = "test_ope",
tags = ["team:rllib", "offline", "torch_only"],
size = "medium",
srcs = ["offline/estimators/tests/test_ope.py"]
srcs = ["offline/estimators/tests/test_ope.py"],
data = ["tests/data/cartpole/large.json"],
)


@@ -2485,6 +2507,15 @@ py_test(
args = ["--num-cpus=4", "--as-test", "--framework=torch"]
)

py_test(
name = "examples/custom_eval_parallel_to_training_torch",
main = "examples/custom_eval.py",
tags = ["team:rllib", "exclusive", "examples", "examples_C", "examples_C_UtoZ"],
size = "medium",
srcs = ["examples/custom_eval.py"],
args = ["--num-cpus=4", "--as-test", "--framework=torch", "--evaluation-parallel-to-training"]
)

py_test(
name = "examples/custom_experiment",
main = "examples/custom_experiment.py",
54 changes: 40 additions & 14 deletions rllib/algorithms/algorithm.py
@@ -29,7 +29,7 @@

import ray
from ray.actor import ActorHandle
from ray.exceptions import RayActorError, RayError
from ray.exceptions import GetTimeoutError, RayActorError, RayError
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.algorithms.callbacks import DefaultCallbacks
from ray.rllib.env.env_context import EnvContext
@@ -768,7 +768,8 @@ def duration_fn(num_units_done):
env_steps_this_iter += batch.env_steps()
metrics = collect_metrics(
self.workers.local_worker(),
keep_custom_metrics=self.config["keep_per_episode_custom_metrics"],
keep_custom_metrics=eval_cfg["keep_per_episode_custom_metrics"],
timeout_seconds=eval_cfg["metrics_episode_collection_timeout_s"],
[Author comment] Bug: the timeout setting was not being passed to the eval collect-metrics call.

)

# Evaluation worker set only has local worker.
@@ -794,16 +795,35 @@ def duration_fn(num_units_done):
break

round_ += 1
batches = ray.get(
[
w.sample.remote()
for i, w in enumerate(
self.evaluation_workers.remote_workers()
try:
batches = ray.get(
[
w.sample.remote()
for i, w in enumerate(
self.evaluation_workers.remote_workers()
)
if i * (1 if unit == "episodes" else rollout * num_envs)
< units_left_to_do
],
timeout=self.config["evaluation_sample_timeout_s"],
)
except GetTimeoutError:
logger.warning(
"Calling `sample()` on your remote evaluation worker(s) "
"resulted in a timeout (after the configured "
f"{self.config['evaluation_sample_timeout_s']} seconds)! "
"Try to set `evaluation_sample_timeout_s` in your config"
" to a larger value."
+ (
" If your episodes don't terminate easily, you may "
"also want to set `evaluation_duration_unit` to "
"'timesteps' (instead of 'episodes')."
if unit == "episodes"
else ""
)
if i * (1 if unit == "episodes" else rollout * num_envs)
< units_left_to_do
]
)
)
break

_agent_steps = sum(b.agent_steps() for b in batches)
_env_steps = sum(b.env_steps() for b in batches)
# 1 episode per returned batch.
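
(For context, a minimal, self-contained sketch of the timeout pattern the new code above relies on: `ray.get()` with a `timeout` argument raises `GetTimeoutError` when remote `sample()`-style calls do not return in time. The actor and the sleep below are illustrative stand-ins, not RLlib code.)

```python
# Minimal sketch of ray.get(..., timeout=...) with GetTimeoutError handling.
import time

import ray
from ray.exceptions import GetTimeoutError

ray.init(ignore_reinit_error=True)

@ray.remote
class Sampler:
    def sample(self):
        time.sleep(10.0)  # Simulate an episode that never finishes in time.
        return "batch"

workers = [Sampler.remote() for _ in range(2)]
try:
    batches = ray.get([w.sample.remote() for w in workers], timeout=1.0)
except GetTimeoutError:
    # Same failure mode the warning above describes: either raise the timeout
    # or count "timesteps" instead of "episodes" for the evaluation duration.
    print("sample() timed out after 1 second")
```
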
@@ -834,6 +854,7 @@ def duration_fn(num_units_done):
self.evaluation_workers.local_worker(),
self.evaluation_workers.remote_workers(),
keep_custom_metrics=self.config["keep_per_episode_custom_metrics"],
timeout_seconds=eval_cfg["metrics_episode_collection_timeout_s"],
)
metrics[NUM_AGENT_STEPS_SAMPLED_THIS_ITER] = agent_steps_this_iter
metrics[NUM_ENV_STEPS_SAMPLED_THIS_ITER] = env_steps_this_iter
@@ -2328,6 +2349,14 @@ def _run_one_evaluation(
"recreate_failed_workers"
),
)

# Add number of healthy evaluation workers after this iteration.
[Author comment] This metric was missing.

eval_results["evaluation"]["num_healthy_workers"] = (
len(self.evaluation_workers.remote_workers())
if self.evaluation_workers is not None
else 0
)

return eval_results

def _run_one_training_iteration_and_evaluation_in_parallel(
@@ -2387,9 +2416,6 @@ def _compile_iteration_results(
# Evaluation results.
if "evaluation" in iteration_results:
results["evaluation"] = iteration_results.pop("evaluation")
results["evaluation"]["num_healthy_workers"] = len(
self.evaluation_workers.remote_workers()
)

# Custom metrics and episode media.
results["custom_metrics"] = iteration_results.pop("custom_metrics", {})
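
(Before the config changes below, a quick usage sketch of where the relocated `num_healthy_workers` metric now shows up. The PG-on-CartPole setup and the worker count are illustrative assumptions, not part of this PR.)

```python
# Hedged usage sketch: the metric is now attached inside `_run_one_evaluation()`,
# so it appears under results["evaluation"] whenever an evaluation actually ran.
import ray.rllib.algorithms.pg as pg

config = (
    pg.PGConfig()
    .environment(env="CartPole-v1")
    .evaluation(evaluation_interval=1, evaluation_num_workers=2)
)
algo = config.build()
results = algo.train()
# Expect 2 here as long as both evaluation workers stayed healthy.
print(results["evaluation"]["num_healthy_workers"])
```
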
15 changes: 12 additions & 3 deletions rllib/algorithms/algorithm_config.py
@@ -174,6 +174,7 @@ def __init__(self, algo_class=None):
self.evaluation_interval = None
self.evaluation_duration = 10
self.evaluation_duration_unit = "episodes"
self.evaluation_sample_timeout_s = 180.0
[Author comment] New config value. If the timeout is breached, a meaningful warning is logged, including suggestions for how to fix it.

self.evaluation_parallel_to_training = False
self.evaluation_config = {}
self.off_policy_estimation_methods = {}
@@ -187,7 +188,7 @@ def __init__(self, algo_class=None):

# `self.reporting()`
self.keep_per_episode_custom_metrics = False
self.metrics_episode_collection_timeout_s = 180
self.metrics_episode_collection_timeout_s = 60.0
self.metrics_num_episodes_for_smoothing = 100
self.min_time_s_per_iteration = None
self.min_train_timesteps_per_iteration = 0
@@ -803,6 +804,7 @@ def evaluation(
evaluation_interval: Optional[int] = None,
evaluation_duration: Optional[int] = None,
evaluation_duration_unit: Optional[str] = None,
evaluation_sample_timeout_s: Optional[float] = None,
evaluation_parallel_to_training: Optional[bool] = None,
evaluation_config: Optional[
Union["AlgorithmConfig", PartialAlgorithmConfigDict]
@@ -832,6 +834,11 @@
- For `evaluation_parallel_to_training=False`: Error.
evaluation_duration_unit: The unit, with which to count the evaluation
duration. Either "episodes" (default) or "timesteps".
evaluation_sample_timeout_s: The timeout (in seconds) for the ray.get call
to the remote evaluation workers' `sample()` method. After this time, the
user receives a warning plus instructions on how to fix the issue: make
sure the episodes terminate, increase the timeout, or switch to
`evaluation_duration_unit=timesteps`.
evaluation_parallel_to_training: Whether to run evaluation in parallel to
a Algorithm.train() call using threading. Default=False.
E.g. evaluation_interval=2 -> For every other training iteration,
@@ -880,6 +887,8 @@
self.evaluation_duration = evaluation_duration
if evaluation_duration_unit is not None:
self.evaluation_duration_unit = evaluation_duration_unit
if evaluation_sample_timeout_s is not None:
self.evaluation_sample_timeout_s = evaluation_sample_timeout_s
if evaluation_parallel_to_training is not None:
self.evaluation_parallel_to_training = evaluation_parallel_to_training
if evaluation_config is not None:
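
(A hedged configuration sketch showing how a user would opt into the new timeout through this setter; the concrete values and the PG/CartPole setup are illustrative only.)

```python
# Illustrative only: combining the new `evaluation_sample_timeout_s` with a
# timestep-based evaluation duration, as the warning text above suggests.
import ray.rllib.algorithms.pg as pg

config = (
    pg.PGConfig()
    .environment(env="CartPole-v1")
    .evaluation(
        evaluation_interval=1,
        evaluation_num_workers=2,
        # Cap the blocking ray.get() on the eval workers' sample() calls.
        evaluation_sample_timeout_s=60.0,
        # If episodes may never terminate on their own, counting timesteps
        # avoids waiting for whole episodes and running into the timeout.
        evaluation_duration=1000,
        evaluation_duration_unit="timesteps",
    )
)
algo = config.build()
print(algo.config["evaluation_sample_timeout_s"])  # -> 60.0
```
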
@@ -909,7 +918,7 @@ def offline_data(
actions_in_input_normalized=None,
input_evaluation=None,
off_policy_estimation_methods=None,
postprocess_inputs=None,
postprocess_inputs=None, # `def postprocess_trajectory()`
shuffle_buffer_size=None,
output=None,
output_config=None,
@@ -1080,7 +1089,7 @@ def reporting(
self,
*,
keep_per_episode_custom_metrics: Optional[bool] = None,
metrics_episode_collection_timeout_s: Optional[int] = None,
metrics_episode_collection_timeout_s: Optional[float] = None,
metrics_num_episodes_for_smoothing: Optional[int] = None,
min_time_s_per_iteration: Optional[int] = None,
min_train_timesteps_per_iteration: Optional[int] = None,
13 changes: 7 additions & 6 deletions rllib/algorithms/tests/test_algorithm.py
@@ -1,4 +1,5 @@
import copy

import gym
import numpy as np
import os
@@ -230,24 +231,24 @@ def test_evaluation_wo_evaluation_worker_set(self):
for _ in framework_iterator(frameworks=("tf", "torch")):
# Setup algorithm w/o evaluation worker set and still call
# evaluate() -> Expect error.
algo_wo_env_on_driver = config.build()
algo_wo_env_on_local_worker = config.build()
self.assertRaisesRegex(
ValueError,
"Cannot evaluate w/o an evaluation worker set",
algo_wo_env_on_driver.evaluate,
algo_wo_env_on_local_worker.evaluate,
)
algo_wo_env_on_driver.stop()
algo_wo_env_on_local_worker.stop()

# Try again using `create_env_on_driver=True`.
# This force-adds the env on the local-worker, so this Algorithm
# can `evaluate` even though it doesn't have an evaluation-worker
# set.
config.create_env_on_local_worker = True
algo_w_env_on_driver = config.build()
results = algo_w_env_on_driver.evaluate()
algo_w_env_on_local_worker = config.build()
results = algo_w_env_on_local_worker.evaluate()
assert "evaluation" in results
assert "episode_reward_mean" in results["evaluation"]
algo_w_env_on_driver.stop()
algo_w_env_on_local_worker.stop()
config.create_env_on_local_worker = False

def test_space_inference_from_remote_workers(self):
26 changes: 26 additions & 0 deletions rllib/algorithms/tests/test_worker_failures.py
@@ -1,8 +1,11 @@
import gym
import numpy as np
import unittest

import ray
import ray.rllib.algorithms.pg as pg
from ray.rllib.algorithms.registry import get_algorithm_class
from ray.rllib.examples.env.random_env import RandomEnv
from ray.rllib.utils.test_utils import framework_iterator
from ray.tune.registry import register_env

@@ -264,6 +267,29 @@ def test_eval_workers_failing_fatal(self):
eval_only=True,
)

def test_eval_workers_on_infinite_episodes(self):
"""Tests whether eval workers warn appropriately after some episode timeout."""
# Create infinitely running episodes, but with horizon setting (RLlib will
# auto-terminate the episode). However, in the eval workers, don't set a
# horizon -> Expect warning and no proper evaluation results.
config = (
pg.PGConfig()
.rollouts(num_rollout_workers=2, horizon=100)
.reporting(metrics_episode_collection_timeout_s=5.0)
.environment(env=RandomEnv, env_config={"p_done": 0.0})
.evaluation(
evaluation_num_workers=2,
evaluation_interval=1,
evaluation_sample_timeout_s=5.0,
evaluation_config={
"horizon": None,
},
)
)
algo = config.build()
results = algo.train()
self.assertTrue(np.isnan(results["evaluation"]["episode_reward_mean"]))


if __name__ == "__main__":
import sys
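
(For readers unfamiliar with `RandomEnv`: with `p_done=0.0` its episodes never terminate on their own, which is exactly what makes the eval-side timeout trigger in the new test above. A small hedged sanity sketch, using the old-style gym API RLlib targets here.)

```python
# Sanity sketch of the never-terminating env behavior the new test relies on.
from ray.rllib.examples.env.random_env import RandomEnv

env = RandomEnv(config={"p_done": 0.0})
obs = env.reset()
for _ in range(5):
    obs, reward, done, info = env.step(env.action_space.sample())
    # With p_done=0.0, termination only ever comes from RLlib's `horizon`
    # setting -- which the eval workers in the test deliberately leave unset.
    assert not done
```
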
2 changes: 2 additions & 0 deletions rllib/env/base_env.py
@@ -79,6 +79,7 @@ def to_base_env(
num_envs: int = 1,
remote_envs: bool = False,
remote_env_batch_wait_ms: int = 0,
restart_failed_sub_environments: bool = False,
) -> "BaseEnv":
"""Converts an RLlib-supported env into a BaseEnv object.
@@ -451,6 +452,7 @@ def convert_to_base_env(
num_envs=num_envs,
remote_envs=remote_envs,
remote_env_batch_wait_ms=remote_env_batch_wait_ms,
restart_failed_sub_environments=restart_failed_sub_environments,
)
# `env` is not a BaseEnv yet -> Need to convert/vectorize.
else:
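
(A hedged sketch of how the newly threaded-through flag would be passed at this level; it assumes `convert_to_base_env` is importable from `ray.rllib.env.base_env`, and a single CartPole sub-env keeps the example minimal.)

```python
# Illustrative only: passing the new flag when vectorizing a plain gym env.
import gym
from ray.rllib.env.base_env import convert_to_base_env

base_env = convert_to_base_env(
    gym.make("CartPole-v0"),
    num_envs=1,
    # New parameter plumbed through in this PR: crashed sub-environments get
    # restarted instead of failing the surrounding rollout worker.
    restart_failed_sub_environments=True,
)
print(type(base_env).__name__)
```
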
1 change: 1 addition & 0 deletions rllib/env/external_env.py
@@ -202,6 +202,7 @@ def to_base_env(
num_envs: int = 1,
remote_envs: bool = False,
remote_env_batch_wait_ms: int = 0,
restart_failed_sub_environments: bool = False,
) -> "BaseEnv":
"""Converts an RLlib MultiAgentEnv into a BaseEnv object.
Expand Down