[RLlib] PolicyMap LRU cache enhancements: Swap out policies (instead of GC'ing and recreating) + use Ray object store (instead of file system). #29513

Merged
Commits (59, showing changes from all commits)
b59d74e
wip
sven1977 Sep 28, 2022
76b3c1c
Merge branch 'master' into policy_map_lru_cache_enhancements
sven1977 Oct 12, 2022
d3c2214
wip
sven1977 Oct 14, 2022
e9f2901
wip
sven1977 Oct 19, 2022
3f256d4
wip
sven1977 Oct 19, 2022
ea12a2e
wip
sven1977 Oct 20, 2022
a9c441a
Merge branch 'master' of https://github.com/ray-project/ray into poli…
sven1977 Oct 27, 2022
9c1be5b
wip
sven1977 Oct 27, 2022
e5def7d
LINT
sven1977 Oct 27, 2022
be5c5bb
Merge branch 'master' of https://github.com/ray-project/ray into poli…
sven1977 Oct 31, 2022
3c77434
wip
sven1977 Oct 31, 2022
763b150
wip
sven1977 Nov 1, 2022
ec6dbed
wip
sven1977 Nov 1, 2022
a42b7df
Merge branch 'master' of https://github.com/ray-project/ray into poli…
sven1977 Nov 1, 2022
492ca66
wip
sven1977 Nov 1, 2022
b9bb2a4
wip
sven1977 Nov 2, 2022
0d6100a
wip
sven1977 Nov 3, 2022
4e10e5b
wip
sven1977 Nov 3, 2022
f10fa48
Merge branch 'master' of https://github.com/ray-project/ray into only…
sven1977 Nov 3, 2022
fec7127
wip
sven1977 Nov 3, 2022
6abd07a
LINT
sven1977 Nov 3, 2022
b0995e2
Merge branch 'master' of https://github.com/ray-project/ray into poli…
sven1977 Nov 3, 2022
1e70273
wip
sven1977 Nov 3, 2022
c605dcb
wip
sven1977 Nov 3, 2022
7918fd3
wip
sven1977 Nov 3, 2022
d98fd28
wip
sven1977 Nov 3, 2022
a8cc1ea
wip
sven1977 Nov 3, 2022
a0575a8
LINT.
sven1977 Nov 3, 2022
717d1c4
wip
sven1977 Nov 3, 2022
963d8a5
Merge branch 'run_regression_test_should_handle_py_files' into policy…
sven1977 Nov 3, 2022
0863f0c
Merge branch 'only_sync_updated_policy_weights' into policy_map_lru_c…
sven1977 Nov 3, 2022
988c943
wip
sven1977 Nov 3, 2022
b29bb06
Merge branch 'master' of https://github.com/ray-project/ray into poli…
sven1977 Nov 17, 2022
035bd59
wip
sven1977 Nov 17, 2022
85fe5b5
wip
sven1977 Nov 17, 2022
7f872b3
wip
sven1977 Nov 17, 2022
b411c7e
Merge branch 'master' of https://github.com/ray-project/ray into poli…
sven1977 Nov 22, 2022
98bbfac
LINT
sven1977 Nov 22, 2022
28bb8e3
Merge branch 'master' of https://github.com/ray-project/ray into poli…
sven1977 Nov 25, 2022
bb8acaa
wip
sven1977 Nov 25, 2022
1a2c28f
Merge branch 'master' of https://github.com/ray-project/ray into poli…
sven1977 Nov 29, 2022
7f657ec
fixes
sven1977 Nov 29, 2022
3165b3b
fixes
sven1977 Nov 29, 2022
ac0e706
fixes
sven1977 Nov 29, 2022
439ec75
wip
sven1977 Nov 29, 2022
7c8838a
wip
sven1977 Nov 29, 2022
9ce7997
fix
sven1977 Nov 29, 2022
352da04
wip
sven1977 Nov 29, 2022
3a37b97
wip
sven1977 Nov 30, 2022
0979b53
wip
sven1977 Nov 30, 2022
8dbc53e
Merge branch 'master' of https://github.com/ray-project/ray into poli…
sven1977 Nov 30, 2022
d16d855
wip
sven1977 Nov 30, 2022
7125de0
Merge branch 'master' of https://github.com/ray-project/ray into poli…
sven1977 Nov 30, 2022
9cfd910
wip
sven1977 Nov 30, 2022
7eed7c0
wip
sven1977 Nov 30, 2022
3efb710
wip
sven1977 Nov 30, 2022
e52d67b
wip
sven1977 Nov 30, 2022
e39237d
wip
sven1977 Nov 30, 2022
e5d1362
wip
sven1977 Nov 30, 2022
4 changes: 2 additions & 2 deletions .buildkite/pipeline.gpu.yml
@@ -6,9 +6,9 @@
# - TUNE_TESTING=1 ./ci/env/install-dependencies.sh
# - pip install -Ur ./python/requirements/ml/requirements_ml_docker.txt
# - ./ci/env/env_info.sh
# - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=gpu,gpu_only python/ray/tune/...
# - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=gpu python/ray/tune/...
sven1977 (Contributor Author) commented:
Cleaned this up for a new test that swaps out policy weights on GPU.


- label: ":tv: :brain: RLlib: GPU Examples {A/B}"
- label: ":tv: :brain: RLlib: GPU Tests"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_RLLIB_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
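The comment above mentions a new GPU test that swaps out policy weights. As a bare-bones illustration of what "swapping out policy weights" means at the tensor level (toy torch modules only, not the actual RLlib test), one could write:

import torch

# Two structurally identical networks; net_b takes over net_a's parameters
# via a plain state-dict copy ("the swap"). Runs on GPU if one is available.
device = "cuda" if torch.cuda.is_available() else "cpu"

net_a = torch.nn.Linear(8, 2).to(device)
net_b = torch.nn.Linear(8, 2).to(device)

net_b.load_state_dict(net_a.state_dict())

x = torch.randn(1, 8, device=device)
assert torch.allclose(net_a(x), net_b(x))  # identical outputs after the swap

The actual test presumably does the same with full Policy objects (including optimizer state); the point is that no object is destroyed or rebuilt, only its parameters are overwritten.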
28 changes: 26 additions & 2 deletions rllib/BUILD
@@ -47,9 +47,9 @@
# the RLlib Team.
# - "needs_gpu": Indicating that a test needs to have a GPU in order to run.
# - "gpu": Indicating that a test may (but doesn't have to) be run in the GPU
# pipeline, defined in .buildkite/pipeline.gpu.yaml.
# pipeline, defined in .buildkite/pipeline.gpu.yml.
# - "multi-gpu": Indicating that a test will definitely be run in the Large GPU
# pipeline, defined in .buildkite/pipeline.gpu.large.yaml.
# pipeline, defined in .buildkite/pipeline.gpu.large.yml.
# - "no_gpu": Indicating that a test should not be run in the GPU pipeline due
# to certain incompatibilities.
# - "no_tf_eager_tracing": Exclude this test from tf-eager tracing tests.
@@ -201,6 +201,16 @@ py_test(
args = ["--dir=tuned_examples/appo"]
)

py_test(
name = "learning_tests_multi_agent_cartpole_w_100_policies_appo",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete"],
size = "large",
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/appo/multi-agent-cartpole-w-100-policies-appo.py"],
args = ["--dir=tuned_examples/appo"]
)

# py_test(
# name = "learning_tests_frozenlake_appo",
# main = "tests/run_regression_tests.py",
@@ -1975,6 +1985,20 @@ py_test(
srcs = ["policy/tests/test_policy.py"]
)

py_test(
name = "policy/tests/test_policy_map",
tags = ["team:rllib", "policy"],
size = "small",
srcs = ["policy/tests/test_policy_map.py"]
)

py_test(
name = "policy/tests/test_policy_state_swapping",
tags = ["team:rllib", "policy", "gpu"],
size = "medium",
srcs = ["policy/tests/test_policy_state_swapping.py"]
)

py_test(
name = "policy/tests/test_rnn_sequencing",
tags = ["team:rllib", "policy"],
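The two new targets, test_policy_map and test_policy_state_swapping, cover the reworked cache. As a rough mental model of the "swap instead of rebuild" idea they exercise (a toy sketch under stated assumptions, not RLlib's actual PolicyMap; build_policy and initial_states are hypothetical hooks):

from collections import OrderedDict

class SwappingLRUCache:
    """Keep at most `capacity` live policy objects; on a cache miss, reuse an
    evicted object and overwrite its state instead of GC'ing and rebuilding."""

    def __init__(self, capacity, build_policy, initial_states):
        self.capacity = capacity
        self.build_policy = build_policy    # () -> fresh policy object
        self.states = dict(initial_states)  # policy_id -> serialized state
        self.live = OrderedDict()           # policy_id -> live policy object

    def __getitem__(self, policy_id):
        if policy_id in self.live:
            self.live.move_to_end(policy_id)  # mark as most recently used
            return self.live[policy_id]
        if len(self.live) < self.capacity:
            policy = self.build_policy()      # built at most `capacity` times
        else:
            # Evict the least recently used entry but keep its object around.
            evicted_id, policy = self.live.popitem(last=False)
            self.states[evicted_id] = policy.get_state()
        # Swapping states only works if all policies share the same network
        # and optimizer layout.
        policy.set_state(self.states[policy_id])
        self.live[policy_id] = policy
        return policy

Because evicted objects are reused, at most `capacity` policy objects are ever constructed, and a cache miss costs one get_state()/set_state() round trip rather than a full policy rebuild.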
73 changes: 57 additions & 16 deletions rllib/algorithms/algorithm_config.py
@@ -3,7 +3,17 @@
from gym.spaces import Space
import logging
import math
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple, Type, Union
from typing import (
Any,
Callable,
Container,
Dict,
Optional,
Tuple,
Type,
TYPE_CHECKING,
Union,
)

import ray
from ray.rllib.evaluation.rollout_worker import RolloutWorker
@@ -12,6 +22,7 @@
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.evaluation.collectors.sample_collector import SampleCollector
from ray.rllib.evaluation.collectors.simple_list_collector import SimpleListCollector
from ray.rllib.evaluation.episode import Episode
from ray.rllib.models import MODEL_DEFAULTS
from ray.rllib.policy.policy import Policy, PolicySpec
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID
@@ -28,6 +39,7 @@
from ray.rllib.utils.from_config import from_config
from ray.rllib.utils.policy import validate_policy_id
from ray.rllib.utils.typing import (
AgentID,
AlgorithmConfigDict,
EnvConfigDict,
EnvType,
@@ -267,11 +279,11 @@ def __init__(self, algo_class=None):
# `self.multi_agent()`
self.policies = {DEFAULT_POLICY_ID: PolicySpec()}
self.policy_map_capacity = 100
self.policy_map_cache = None
self.policy_mapping_fn = (
lambda aid, episode, worker, **kwargs: DEFAULT_POLICY_ID
)
self.policies_to_train = None
self.policy_states_are_swappable = False
self.observation_fn = None
self.count_steps_by = "env_steps"

@@ -344,6 +356,12 @@ def __init__(self, algo_class=None):
self.timesteps_per_iteration = DEPRECATED_VALUE
self.min_iter_time_s = DEPRECATED_VALUE
self.collect_metrics_timeout = DEPRECATED_VALUE
self.min_time_s_per_reporting = DEPRECATED_VALUE
self.min_train_timesteps_per_reporting = DEPRECATED_VALUE
self.min_sample_timesteps_per_reporting = DEPRECATED_VALUE
self.input_evaluation = DEPRECATED_VALUE
self.policy_map_cache = DEPRECATED_VALUE

# The following values have moved because of the new ReplayBuffer API
self.buffer_size = DEPRECATED_VALUE
self.prioritized_replay = DEPRECATED_VALUE
@@ -358,7 +376,6 @@ def __init__(self, algo_class=None):
self.min_time_s_per_reporting = DEPRECATED_VALUE
self.min_train_timesteps_per_reporting = DEPRECATED_VALUE
self.min_sample_timesteps_per_reporting = DEPRECATED_VALUE
self.input_evaluation = DEPRECATED_VALUE
self.horizon = DEPRECATED_VALUE
self.soft_horizon = DEPRECATED_VALUE

@@ -458,9 +475,9 @@ def update_from_dict(
for k in [
"policies",
"policy_map_capacity",
"policy_map_cache",
"policy_mapping_fn",
"policies_to_train",
"policy_states_are_swappable",
"observation_fn",
"count_steps_by",
]
@@ -1601,13 +1618,21 @@ def multi_agent(
self,
*,
policies=NotProvided,
policy_map_capacity=NotProvided,
policy_map_cache=NotProvided,
policy_mapping_fn=NotProvided,
policies_to_train=NotProvided,
observation_fn=NotProvided,
count_steps_by=NotProvided,
policy_map_capacity: Optional[int] = NotProvided,
sven1977 (Contributor Author) commented:
Fixed type annotations.

Contributor reply:
nice.

policy_mapping_fn: Optional[
Callable[[AgentID, "Episode"], PolicyID]
] = NotProvided,
policies_to_train: Optional[
Union[Container[PolicyID], Callable[[PolicyID, SampleBatchType], bool]]
] = NotProvided,
policy_states_are_swappable: Optional[bool] = NotProvided,
observation_fn: Optional[Callable] = NotProvided,
count_steps_by: Optional[str] = NotProvided,
# Deprecated args:
replay_mode=DEPRECATED_VALUE,
# Now done via Ray object store, which has its own cloud-supported
# spillover mechanism.
policy_map_cache=DEPRECATED_VALUE,
) -> "AlgorithmConfig":
"""Sets the config's multi-agent settings.

@@ -1622,9 +1647,6 @@ def multi_agent(
observation- and action spaces of the policies, and any extra config.
policy_map_capacity: Keep this many policies in the "policy_map" (before
writing least-recently used ones to disk/S3).
policy_map_cache: Where to store overflowing (least-recently used) policies?
Could be a directory (str) or an S3 location. None for using the
default output dir.
policy_mapping_fn: Function mapping agent ids to policy ids. The signature
is: `(agent_id, episode, worker, **kwargs) -> PolicyID`.
policies_to_train: Determines those policies that should be updated.
Expand All @@ -1636,6 +1658,19 @@ def multi_agent(
or not, given the particular batch). This allows you to have a policy
trained only on certain data (e.g. when playing against a certain
opponent).
policy_states_are_swappable: Whether all Policy objects in this map can be
"swapped out" via a simple `state = A.get_state(); B.set_state(state)`,
where `A` and `B` are policy instances in this map. You should set
this to True for significantly speeding up the PolicyMap's cache lookup
times, iff your policies all share the same neural network
architecture and optimizer types. If True, the PolicyMap will not
have to garbage collect old, least recently used policies, but instead
keep them in memory and simply override their state with the state of
the most recently accessed one.
For example, in a league-based training setup, you might have 100s of
the same policies in your map (playing against each other in various
combinations), but all of them share the same state structure
(are "swappable").
observation_fn: Optional function that can be used to enhance the local
agent observations to include more state. See
rllib/evaluation/observation_function.py for more info.
@@ -1681,9 +1716,6 @@ def multi_agent(
if policy_map_capacity is not NotProvided:
self.policy_map_capacity = policy_map_capacity

if policy_map_cache is not NotProvided:
self.policy_map_cache = policy_map_cache

if policy_mapping_fn is not NotProvided:
# Attempt to create a `policy_mapping_fn` from config dict. Helpful
# if users would like to specify custom callable classes in yaml files.
@@ -1694,6 +1726,12 @@ def multi_agent(
if observation_fn is not NotProvided:
self.observation_fn = observation_fn

if policy_map_cache != DEPRECATED_VALUE:
deprecation_warning(
old="AlgorithmConfig.multi_agent(policy_map_cache=..)",
error=True,
)

if replay_mode != DEPRECATED_VALUE:
deprecation_warning(
old="AlgorithmConfig.multi_agent(replay_mode=..)",
@@ -1730,6 +1768,9 @@ def multi_agent(
)
self.policies_to_train = policies_to_train

if policy_states_are_swappable is not NotProvided:
self.policy_states_are_swappable = policy_states_are_swappable

return self

def is_multi_agent(self) -> bool:
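Putting the multi_agent() changes together, a minimal configuration sketch of the new flag (the policy ids and mapping function below are made up for illustration):

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.policy.policy import PolicySpec

config = (
    AlgorithmConfig()
    .multi_agent(
        # 100 structurally identical policies, only 10 of them live at a time.
        policies={f"policy_{i}": PolicySpec() for i in range(100)},
        policy_map_capacity=10,
        policy_mapping_fn=(
            lambda agent_id, episode, worker, **kwargs: f"policy_{hash(agent_id) % 100}"
        ),
        # All policies share the same network and optimizer layout, so the
        # PolicyMap may swap their states in place instead of rebuilding them.
        policy_states_are_swappable=True,
    )
)

Note that policy_map_cache is gone entirely: overflowing policy states now go through the Ray object store, which brings its own spillover mechanism, instead of a user-specified directory or S3 path.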
5 changes: 4 additions & 1 deletion rllib/algorithms/apex_dqn/apex_dqn.py
@@ -541,7 +541,10 @@ def sample_from_replay_buffer_place_on_learner_queue_non_blocking(
"""

def wait_on_replay_actors() -> List[Tuple[int, SampleBatchType]]:
"""Wait for the replay actors to finish sampling for timeout seconds."""
"""Wait for the replay actors to finish sampling for timeout seconds.

If the timeout is None, then block on the actors indefinitely.
"""
results = self._replay_actor_manager.fetch_ready_async_reqs(
timeout_seconds=self._replay_req_timeout_s
)
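The added docstring line spells out the usual Ray timeout convention: a numeric timeout returns whatever finished within that window, while None blocks until everything requested is ready. A small generic sketch using core ray.wait (the toy remote task is hypothetical, not the replay-actor manager API):

import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def sample_batch(i):
    # Stand-in for a replay actor's sampling call.
    return i

refs = [sample_batch.remote(i) for i in range(4)]

# Numeric timeout: return whatever finished within 2 seconds.
ready, not_ready = ray.wait(refs, num_returns=len(refs), timeout=2.0)

# timeout=None: block until all requested results are ready.
ready, not_ready = ray.wait(refs, num_returns=len(refs), timeout=None)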
5 changes: 2 additions & 3 deletions rllib/algorithms/dt/tests/test_dt_policy.py
@@ -484,14 +484,13 @@ def test_loss_coef(self):
config2 = config.copy()
config2[f"loss_coef_{key}"] = 10.0
policy2 = DTTorchPolicy(observation_space, action_space, config2)
# copy the weights over so they output the same loss without scaling
policy2.set_state(policy1.get_state())
# Copy the weights over so they output the same loss without scaling.
policy2.set_weights(policy1.get_weights())

loss2 = policy2.loss(policy2.model, policy2.dist_class, batch)
loss2 = loss2.detach().cpu().item()

# compare loss, should be factor of 10 difference
# Compare loss, should be factor of 10 difference.
self.assertAlmostEqual(
loss2 / loss1,
10.0,
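The switch from set_state() to set_weights() is the substantive change here: a policy's full state typically bundles more than the raw model parameters (optimizer state, counters and, depending on the version, the policy's config), so copying the full state could erase the scaled loss_coef the test is measuring. A toy illustration of the distinction (TinyPolicy is a hypothetical stand-in, not RLlib's Policy class):

import torch

class TinyPolicy:
    def __init__(self, loss_coef):
        self.model = torch.nn.Linear(4, 2)
        self.config = {"loss_coef": loss_coef}

    def get_weights(self):
        return {k: v.clone() for k, v in self.model.state_dict().items()}

    def set_weights(self, weights):
        self.model.load_state_dict(weights)

    def get_state(self):
        # Full state: weights plus (here) the config.
        return {"weights": self.get_weights(), "config": dict(self.config)}

    def set_state(self, state):
        self.set_weights(state["weights"])
        self.config = dict(state["config"])

p1, p2 = TinyPolicy(1.0), TinyPolicy(10.0)
p2.set_weights(p1.get_weights())  # same parameters, p2 keeps loss_coef=10.0
assert p2.config["loss_coef"] == 10.0
p2.set_state(p1.get_state())      # would also clobber p2's config
assert p2.config["loss_coef"] == 1.0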