
[RLlib] Cleanup examples folder 04: Curriculum and checkpoint-by-custom-criteria examples moved to new API stack. #44706

Merged (8 commits) on Apr 14, 2024
6 changes: 2 additions & 4 deletions rllib/BUILD
@@ -2114,14 +2114,13 @@ py_test(
srcs = ["examples/checkpoints/cartpole_dqn_export.py"],
)

#@OldAPIStack
py_test(
name = "examples/checkpoints/checkpoint_by_custom_criteria",
main = "examples/checkpoints/checkpoint_by_custom_criteria.py",
tags = ["team:rllib", "exclusive", "examples"],
size = "medium",
srcs = ["examples/checkpoints/checkpoint_by_custom_criteria.py"],
args = ["--stop-iters=3 --num-cpus=3"]
args = ["--enable-new-api-stack", "--stop-reward=150.0", "--num-cpus=9"]
)

#@OldAPIStack
@@ -2252,14 +2251,13 @@ py_test(
# subdirectory: curriculum/
# ....................................

#@OldAPIStack
py_test(
name = "examples/curriculum/curriculum_learning",
main = "examples/curriculum/curriculum_learning.py",
tags = ["team:rllib", "exclusive", "examples"],
size = "medium",
srcs = ["examples/curriculum/curriculum_learning.py"],
args = ["--as-test", "--stop-reward=800.0"]
args = ["--enable-new-api-stack", "--as-test"]
)

# subdirectory: debugging/
6 changes: 1 addition & 5 deletions rllib/algorithms/algorithm_config.py
@@ -1468,11 +1468,7 @@ def environment(
if env is not NotProvided:
self.env = env
if env_config is not NotProvided:
deep_update(
self.env_config,
env_config,
True,
)
deep_update(self.env_config, env_config, True)
if observation_space is not NotProvided:
self.observation_space = observation_space
if action_space is not NotProvided:
36 changes: 29 additions & 7 deletions rllib/env/multi_agent_env_runner.py
@@ -1,4 +1,5 @@
import gymnasium as gym
import logging

from collections import defaultdict
from functools import partial
@@ -19,6 +20,8 @@
from ray.util.annotations import PublicAPI
from ray.tune.registry import ENV_CREATOR, _global_registry

logger = logging.getLogger("ray.rllib")


@PublicAPI(stability="alpha")
class MultiAgentEnvRunner(EnvRunner):
@@ -52,7 +55,7 @@ def __init__(self, config: AlgorithmConfig, **kwargs):
# Create the vectorized gymnasium env.
self.env: Optional[gym.Wrapper] = None
self.num_envs: int = 0
self._make_env()
self.make_env()

# Global counter for environment steps from all workers. This is
# needed for schedulers used by `RLModule`s.
@@ -666,13 +669,24 @@ def assert_healthy(self):
# Make sure, we have built our gym.vector.Env and RLModule properly.
assert self.env and self.module

@override(EnvRunner)
def stop(self):
# Note, `MultiAgentEnv` inherits `close()`-method from `gym.Env`.
self.env.close()
def make_env(self):
"""Creates a MultiAgentEnv (is-a gymnasium env).

Note that users can change the EnvRunner's config (e.g. change
`self.config.env_config`) and then call this method to create new environments
with the updated configuration.
"""
# If an env already exists, try closing it first (to allow it to properly
# cleanup).
if self.env is not None:
try:
self.env.close()
except Exception as e:
logger.warning(
"Tried closing the existing env (multi-agent), but failed with "
f"error: {e.args[0]}"
)

def _make_env(self):
"""Creates a MultiAgentEnv (is-a gymnasium env)."""
env_ctx = self.config.env_config
if not isinstance(env_ctx, EnvContext):
env_ctx = EnvContext(
@@ -714,13 +728,21 @@ def _make_env(self):
"to inherit from `ray.rllib.env.multi_agent_env.MultiAgentEnv`."
)

# Set the flag to reset all envs upon the next `sample()` call.
self._needs_initial_reset = True

# Call the `on_environment_created` callback.
self._callbacks.on_environment_created(
env_runner=self,
env=self.env,
env_context=env_ctx,
)

@override(EnvRunner)
def stop(self):
# Note, `MultiAgentEnv` inherits `close()`-method from `gym.Env`.
self.env.close()

def _make_module(self):
# Create our own instance of the (single-agent) `RLModule` (which
# then needs to be weight-synced) each iteration.
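The make_env() method added above is public (it replaces the private _make_env()) so that user code can mutate the runner's env_config and then rebuild the environment, e.g. for curriculum learning. Below is a minimal, hedged sketch of that pattern; the helper name and the "task" config key are illustrative assumptions, not part of this PR.

# Sketch only: bump a hypothetical "task" entry in the runner's env_config and
# rebuild the env so that the next `sample()` call uses the new task.
def promote_to_next_task(env_runner) -> None:
    # `env_runner` is a SingleAgentEnvRunner or MultiAgentEnvRunner.
    current_task = env_runner.config.env_config.get("task", 0)
    env_runner.config.env_config["task"] = current_task + 1
    # `make_env()` closes the old env (if any), re-reads `config.env_config`,
    # sets `_needs_initial_reset`, and fires `on_environment_created` again.
    env_runner.make_env()

In a full setup this helper would typically be broadcast to all remote EnvRunners from an Algorithm-level callback; the broadcast mechanism is omitted here.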
35 changes: 28 additions & 7 deletions rllib/env/single_agent_env_runner.py
@@ -1,4 +1,5 @@
import gymnasium as gym
import logging
import tree

from collections import defaultdict
@@ -24,6 +25,7 @@
from ray.util.annotations import PublicAPI

_, tf, _ = try_import_tf()
logger = logging.getLogger("ray.rllib")


@PublicAPI(stability="alpha")
@@ -48,7 +50,7 @@ def __init__(self, config: AlgorithmConfig, **kwargs):
# Create the vectorized gymnasium env.
self.env: Optional[gym.Wrapper] = None
self.num_envs: int = 0
self._make_env()
self.make_env()

# Global counter for environment steps from all workers. This is
# needed for schedulers used by `RLModule`s.
@@ -627,13 +629,24 @@ def assert_healthy(self):
# Make sure, we have built our gym.vector.Env and RLModule properly.
assert self.env and self.module

@override(EnvRunner)
def stop(self):
# Close our env object via gymnasium's API.
self.env.close()
def make_env(self) -> None:
"""Creates a vectorized gymnasium env and stores it in `self.env`.

Note that users can change the EnvRunner's config (e.g. change
`self.config.env_config`) and then call this method to create new environments
with the updated configuration.
"""
# If an env already exists, try closing it first (to allow it to properly
# cleanup).
if self.env is not None:
try:
self.env.close()
except Exception as e:
logger.warning(
"Tried closing the existing env, but failed with error: "
f"{e.args[0]}"
)

def _make_env(self):
"""Creates a vectorized gymnasium env."""
env_ctx = self.config.env_config
if not isinstance(env_ctx, EnvContext):
env_ctx = EnvContext(
@@ -672,13 +685,21 @@ def _make_env(self):
self.num_envs: int = self.env.num_envs
assert self.num_envs == self.config.num_envs_per_worker

# Set the flag to reset all envs upon the next `sample()` call.
self._needs_initial_reset = True

# Call the `on_environment_created` callback.
self._callbacks.on_environment_created(
env_runner=self,
env=self.env,
env_context=env_ctx,
)

@override(EnvRunner)
def stop(self):
# Close our env object via gymnasium's API.
self.env.close()

def _new_episode(self):
return SingleAgentEpisode(
observation_space=self.env.single_observation_space,
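Both runners now invoke the on_environment_created callback at the end of make_env(), so every (re-)creation of an env can be observed, including those triggered by the pattern sketched after the multi-agent runner diff above. A minimal, hedged example of hooking that event follows; the class name is illustrative, and the keyword arguments mirror the call site in this diff.

from ray.rllib.algorithms.callbacks import DefaultCallbacks


class LogEnvCreations(DefaultCallbacks):
    """Sketch: log every env (re-)creation on an EnvRunner (not part of this PR)."""

    def on_environment_created(self, *, env_runner, env, env_context, **kwargs):
        # `env_context` behaves like a dict and also carries the worker index.
        print(
            f"EnvRunner (worker_index={env_context.worker_index}) created env "
            f"{env} with env_config={dict(env_context)}"
        )

Such a callback class would be registered on the AlgorithmConfig, e.g. via config.callbacks(LogEnvCreations).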
143 changes: 85 additions & 58 deletions rllib/examples/checkpoints/checkpoint_by_custom_criteria.py
@@ -1,67 +1,90 @@
# TODO (sven): Move this example script into the new API stack.
"""Example extracting a checkpoint from n trials using one or more custom criteria.

import argparse
import os
This example:
- runs a simple CartPole experiment with three different learning rates (three tune
"trials"). During the experiment, for each trial, we create a checkpoint at each
iteration.
- at the end of the experiment, we compare the trials and pick the one that performed
best, based on the criterion: Lowest episode count per single iteration (for CartPole,
a low episode count means the episodes are very long and thus the reward is also very
high).
- from that best trial (with the lowest episode count), we then pick those checkpoints
that a) have the lowest policy loss (good) and b) have the highest value function loss
(bad).

import ray
from ray import air, tune
from ray.tune.registry import get_trainable_cls

parser = argparse.ArgumentParser()
parser.add_argument(
"--run", type=str, default="PPO", help="The RLlib-registered algorithm to use."
)
parser.add_argument("--num-cpus", type=int, default=0)
parser.add_argument(
"--framework",
choices=["tf", "tf2", "torch"],
default="torch",
help="The DL framework specifier.",
How to run this script
----------------------
`python [script file name].py --enable-new-api-stack`

For debugging, use the following additional command line options
`--no-tune --num-env-runners=0`
which should allow you to set breakpoints anywhere in the RLlib code and
have the execution stop there for inspection and debugging.
[Review comment, Collaborator]: Works also with tune, but --local-mode :)

[Review comment, Contributor Author]: Absolutely! I'm always afraid we are going to get rid of Ray local-mode at some point. Also, for any number of Learner workers > 0, local mode doesn't work (not sure why, actually).


For logging to your WandB account, use:
`--wandb-key=[your WandB API key] --wandb-project=[some project name]
--wandb-run-name=[optional: WandB run name (within the defined project)]`


Results to expect
-----------------
In the console output, you can see the performance of the three different learning
rates used here:

+-----------------------------+------------+-----------------+--------+--------+
| Trial name | status | loc | lr | iter |
|-----------------------------+------------+-----------------+--------+--------+
| PPO_CartPole-v1_d7dbe_00000 | TERMINATED | 127.0.0.1:98487 | 0.01 | 17 |
| PPO_CartPole-v1_d7dbe_00001 | TERMINATED | 127.0.0.1:98488 | 0.001 | 8 |
| PPO_CartPole-v1_d7dbe_00002 | TERMINATED | 127.0.0.1:98489 | 0.0001 | 9 |
+-----------------------------+------------+-----------------+--------+--------+

+------------------+-------+----------+----------------------+----------------------+
| total time (s) | ts | reward | episode_reward_max | episode_reward_min |
|------------------+-------+----------+----------------------+----------------------+
| 28.1068 | 39797 | 151.11 | 500 | 12 |
| 13.304 | 18728 | 158.91 | 500 | 15 |
| 14.8848 | 21069 | 167.36 | 500 | 13 |
+------------------+-------+----------+----------------------+----------------------+

+--------------------+
| episode_len_mean |
|--------------------|
| 151.11 |
| 158.91 |
| 167.36 |
+--------------------+
"""

from ray import tune
from ray.rllib.utils.test_utils import (
add_rllib_example_script_args,
run_rllib_example_script_experiment,
)
parser.add_argument("--stop-iters", type=int, default=200)
parser.add_argument("--stop-timesteps", type=int, default=100000)
parser.add_argument("--stop-reward", type=float, default=150.0)
parser.add_argument(
"--local-mode",
action="store_true",
help="Init Ray in local mode for easier debugging.",
from ray.tune.registry import get_trainable_cls

parser = add_rllib_example_script_args(
default_reward=450.0, default_timesteps=100000, default_iters=200
)


if __name__ == "__main__":
args = parser.parse_args()

ray.init(num_cpus=args.num_cpus or None, local_mode=args.local_mode)
# Force-set `args.checkpoint_freq` to 1.
args.checkpoint_freq = 1

# Simple generic config.
config = (
get_trainable_cls(args.run)
base_config = (
get_trainable_cls(args.algo)
.get_default_config()
.environment("CartPole-v1")
# Run with tracing enabled for tf2.
.framework(args.framework)
# Run 3 trials.
.training(
lr=tune.grid_search([0.01, 0.001, 0.0001]), train_batch_size=2341
) # TEST
# Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
.resources(num_gpus=int(os.environ.get("RLLIB_NUM_GPUS", "0")))
.training(lr=tune.grid_search([0.01, 0.001, 0.0001]), train_batch_size=2341)
)

stop = {
"training_iteration": args.stop_iters,
"timesteps_total": args.stop_timesteps,
"episode_reward_mean": args.stop_reward,
}

# Run tune for some iterations and generate checkpoints.
tuner = tune.Tuner(
args.run,
param_space=config.to_dict(),
run_config=air.RunConfig(
stop=stop, checkpoint_config=air.CheckpointConfig(checkpoint_frequency=1)
),
)
results = tuner.fit()
results = run_rllib_example_script_experiment(base_config, args)

# Get the best of the 3 trials by using some metric.
# NOTE: Choosing the min `episodes_this_iter` automatically picks the trial
@@ -79,28 +102,32 @@
best_result = results.get_best_result(metric=metric, mode="min", scope="all")
value_best_metric = best_result.metrics_dataframe[metric].min()
print(
"Best trial's lowest episode length (over all "
"iterations): {}".format(value_best_metric)
f"Best trial was the one with lr={best_result.metrics['config']['lr']}. "
"Reached lowest episode count ({value_best_metric}) in a single iteration."
)

# Confirm, we picked the right trial.
assert value_best_metric <= results.get_dataframe()[metric].min()

# Get the best checkpoints from the trial, based on different metrics.
# Checkpoint with the lowest policy loss value:
if config._enable_new_api_stack:
if args.enable_new_api_stack:
policy_loss_key = "info/learner/default_policy/policy_loss"
else:
policy_loss_key = "info/learner/default_policy/learner_stats/policy_loss"
ckpt = results.get_best_result(metric=policy_loss_key, mode="min").checkpoint
print("Lowest pol-loss: {}".format(ckpt))
best_result = results.get_best_result(metric=policy_loss_key, mode="min")
ckpt = best_result.checkpoint
lowest_policy_loss = best_result.metrics_dataframe[policy_loss_key].min()
[Review comment, Collaborator]: We could also ask here for the best checkpoint along the training path: best_result.get_best_checkpoint(metric=policy_loss_key, mode="min")

[Review comment, Contributor Author]: Ah, cool, so ckpt = best_result.checkpoint returns the very last checkpoint only? And if the last is not the best one, it's better to do best_result.get_best_checkpoint(metric=policy_loss_key, mode="min")?

[Review comment, Contributor Author]: This actually doesn't seem to work well with nested keys. If I do best_result.get_best_checkpoint(policy_loss_key, mode="min"), I get:

RuntimeError: Invalid metric name ('info', 'learner', 'default_policy', 'learner_stats', 'policy_loss')! You may choose from the following metrics: dict_keys(['custom_metrics', 'episode_media', 'info', 'sampler_results', 'episode_reward_max', 'episode_reward_min', 'episode_reward_mean', 'episode_len_mean', 'episodes_this_iter', 'episodes_timesteps_total', 'policy_reward_min', 'policy_reward_max', 'policy_reward_mean', 'hist_stats', 'sampler_perf', 'num_faulty_episodes', 'connector_metrics', 'num_healthy_workers', 'num_in_flight_async_reqs', 'num_remote_worker_restarts', 'num_agent_steps_sampled', 'num_agent_steps_trained', 'num_env_steps_sampled', 'num_env_steps_trained', 'num_env_steps_sampled_this_iter', 'num_env_steps_trained_this_iter', 'num_env_steps_sampled_throughput_per_sec', 'num_env_steps_trained_throughput_per_sec', 'timesteps_total', 'num_steps_trained_this_iter', 'agent_timesteps_total', 'timers', 'counters', 'done', 'episodes_total', 'training_iteration', 'trial_id', 'date', 'timestamp', 'time_this_iter_s', 'time_total_s', 'pid', 'hostname', 'node_ip', 'config', 'time_since_restore', 'iterations_since_restore', 'perf', 'experiment_tag']).

print(f"Checkpoint w/ lowest policy loss: {ckpt}")
print(f"Lowest policy loss: {lowest_policy_loss}")

# Checkpoint with the highest value-function loss:
if config._enable_new_api_stack:
if args.enable_new_api_stack:
vf_loss_key = "info/learner/default_policy/vf_loss"
else:
vf_loss_key = "info/learner/default_policy/learner_stats/vf_loss"
ckpt = results.get_best_result(metric=vf_loss_key, mode="max").checkpoint
print("Highest vf-loss: {}".format(ckpt))

ray.shutdown()
best_result = results.get_best_result(metric=vf_loss_key, mode="max")
ckpt = best_result.checkpoint
highest_value_fn_loss = best_result.metrics_dataframe[vf_loss_key].max()
[Review comment, Collaborator]: Here as well.

print(f"Checkpoint w/ highest value function loss: {ckpt}")
print(f"Highest value function loss: {highest_value_fn_loss}")