[RLlib] MultiAgentEpisode: Fix/enhance cut() API. #44677

Merged · 12 commits · Apr 12, 2024
67 changes: 42 additions & 25 deletions rllib/env/multi_agent_episode.py
@@ -422,14 +422,15 @@ def add_env_step(
) - {"__all__"}
for agent_id in agent_ids_with_data:
if agent_id not in self.agent_episodes:
self.agent_episodes[agent_id] = SingleAgentEpisode(
sa_episode = SingleAgentEpisode(
agent_id=agent_id,
module_id=self.module_for(agent_id),
multi_agent_episode_id=self.id_,
observation_space=self.observation_space.get(agent_id),
action_space=self.action_space.get(agent_id),
)
sa_episode: SingleAgentEpisode = self.agent_episodes[agent_id]
else:
sa_episode = self.agent_episodes.get(agent_id)

# Collect value to be passed (at end of for-loop) into `add_env_step()`
# call.
@@ -489,7 +490,6 @@ def add_env_step(
agent_id, None
)
_reward = self._hanging_rewards_end.pop(agent_id, 0.0) + _reward
# _agent_step = len(sa_episode)
# First observation for this agent, we have no hanging action.
# ... [done]? ... -> [1st obs for agent ID]
else:
@@ -506,6 +506,10 @@
)
# Make `add_env_reset` call and continue with next agent.
sa_episode.add_env_reset(observation=_observation, infos=_infos)
# Add possible reward to begin cache.
self._hanging_rewards_begin[agent_id] += _reward
# Now that the SAEps is valid, add it to our dict.
self.agent_episodes[agent_id] = sa_episode
continue

# CASE 3: Step is started (by an action), but not completed (no next obs).
@@ -584,9 +588,12 @@
_reward = self._hanging_rewards_end.pop(agent_id, 0.0) + _reward
# The agent is still alive, just add current reward to cache.
else:
self._hanging_rewards_end[agent_id] = (
self._hanging_rewards_end.get(agent_id, 0.0) + _reward
)
# But has never stepped in this episode -> add to begin cache.
if agent_id not in self.agent_episodes:
self._hanging_rewards_begin[agent_id] += _reward
# Otherwise, add to end cache.
else:
self._hanging_rewards_end[agent_id] += _reward

# If agent is stepping, add timestep to `SingleAgentEpisode`.
if _observation is not None:
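
The hunk above routes a reward that arrives without an accompanying observation into one of two caches: if the agent has not yet started a SingleAgentEpisode in this chunk, the reward accumulates in the begin cache; otherwise it accumulates in the end cache. A minimal, self-contained sketch of that rule (plain Python with hypothetical names, not part of this diff):

from collections import defaultdict

# Hypothetical stand-ins for `self.agent_episodes` and the two reward caches.
agent_episodes = {"a0": object()}           # a0 has already started its episode; a1 has not
hanging_rewards_begin = defaultdict(float)  # rewards received before an agent's first obs
hanging_rewards_end = defaultdict(float)    # rewards received after an agent's last finished step

def route_hanging_reward(agent_id, reward):
    if agent_id not in agent_episodes:
        # Agent has never stepped in this chunk -> begin cache.
        hanging_rewards_begin[agent_id] += reward
    else:
        # Agent has stepped before, but its current step is unfinished -> end cache.
        hanging_rewards_end[agent_id] += reward

route_hanging_reward("a1", 1.0)  # ends up in hanging_rewards_begin["a1"]
route_hanging_reward("a0", 0.5)  # ends up in hanging_rewards_end["a0"]
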
@@ -829,13 +836,6 @@ def cut(self, len_lookback_buffer: int = 0) -> "MultiAgentEpisode":
successor = MultiAgentEpisode(
# Same ID.
id_=self.id_,
# Same agent IDs.
# Same single agents' episode IDs.
agent_episode_ids=self.agent_episode_ids,
agent_module_ids={
aid: self.agent_episodes[aid].module_id for aid in self.agent_ids
},
agent_to_module_mapping_fn=self.agent_to_module_mapping_fn,
observations=self.get_observations(
indices=indices_obs_and_infos, return_list=True
),
@@ -853,17 +853,28 @@ def cut(self, len_lookback_buffer: int = 0) -> "MultiAgentEpisode":
),
terminateds=self.get_terminateds(),
truncateds=self.get_truncateds(),
# Continue with `self`'s current timestep.
# Continue with `self`'s current timesteps.
env_t_started=self.env_t,
agent_t_started={
aid: self.agent_episodes[aid].t
for aid in self.agent_ids
if not self.agent_episodes[aid].is_done
},
# Same AgentIDs and SingleAgentEpisode IDs.
agent_episode_ids=self.agent_episode_ids,
agent_module_ids={
aid: self.agent_episodes[aid].module_id for aid in self.agent_ids
},
agent_to_module_mapping_fn=self.agent_to_module_mapping_fn,
# All data we provided to the c'tor goes into the lookback buffer.
len_lookback_buffer="auto",
)

# Copy over the current hanging values.
# TODO (sven): These should go into `_begin` instead (follow up PR, in which
# we'll fix the behavior of `cut()`).
successor._hanging_actions_end = copy.deepcopy(self._hanging_actions_end)
successor._hanging_rewards_end = self._hanging_rewards_end.copy()
successor._hanging_extra_model_outputs_end = copy.deepcopy(
# Copy over the hanging (end) values into the hanging (begin) chaches of the
# successor.

Collaborator comment: "chaches" -> "caches". Also, can we leave a note here on why we need the _hanging_actions_begin cache? Why not write it into the successor._hanging_actions_end ones?

successor._hanging_actions_begin = copy.deepcopy(self._hanging_actions_end)
successor._hanging_rewards_begin = self._hanging_rewards_end.copy()
successor._hanging_extra_model_outputs_begin = copy.deepcopy(
self._hanging_extra_model_outputs_end
)

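The cut() change above is what the review comment asks about: the predecessor's hanging end values become the successor's hanging begin values, because from the successor's point of view they belong to steps that were started before its first timestep. A usage sketch mirroring the new test_cut case further down (the import path ray.rllib.env.multi_agent_episode is assumed here; everything else is taken from the test):

from ray.rllib.env.multi_agent_episode import MultiAgentEpisode

# a1 acts at t=0, then receives only rewards and no further observations.
observations = [{"a0": 0, "a1": 0}, {"a0": 1}, {"a0": 2}, {"a0": 3}]
episode = MultiAgentEpisode(
    observations=observations,
    actions=observations[:-1],
    rewards=[
        {"a0": 0.0, "a1": 0.0},
        {"a0": 0.1, "a1": 0.1},
        {"a0": 0.2, "a1": 0.2},
    ],
    len_lookback_buffer=0,
)
successor = episode.cut()
# a1's hanging action (0) and its accumulated partial rewards (0.1 + 0.2) now
# sit at the *beginning* of the successor chunk, waiting to be resolved once
# a1 receives its first observation after the cut.
print(successor._hanging_actions_begin)  # contains {"a1": 0}
print(successor._hanging_rewards_begin)  # contains {"a1": 0.3}
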
@@ -1631,12 +1642,12 @@ def get_sample_batch(self) -> MultiAgentBatch:

def get_return(
self,
consider_hanging_rewards: bool = False,
include_hanging_rewards: bool = False,
) -> float:
"""Returns all-agent return.

Args:
consider_hanging_rewards: Whether we should also consider
include_hanging_rewards: Whether we should also consider
hanging rewards when calculating the overall return. Agents might
have received partial rewards, i.e. rewards without an
observation. These are stored in the "hanging" caches (begin and end)
@@ -1650,7 +1661,7 @@ def get_return(
env_return = sum(
agent_eps.get_return() for agent_eps in self.agent_episodes.values()
)
if consider_hanging_rewards:
if include_hanging_rewards:
for hanging_r in self._hanging_rewards_begin.values():
env_return += hanging_r
for hanging_r in self._hanging_rewards_end.values():
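
A short usage sketch of the renamed flag (the import path ray.rllib.env.multi_agent_episode is assumed; the scenario mirrors the hanging-reward cases in the tests below):

from ray.rllib.env.multi_agent_episode import MultiAgentEpisode

# a1 acts at t=0 and receives a reward, but never gets a next observation,
# so that reward stays "hanging" (cached, not yet part of a finished step).
episode = MultiAgentEpisode(
    observations=[{"a0": 0, "a1": 0}, {"a0": 1}],
    actions=[{"a0": 0, "a1": 0}],
    rewards=[{"a0": 0.1, "a1": 0.5}],
    len_lookback_buffer=0,
)
print(episode.get_return())                              # completed-step rewards only
print(episode.get_return(include_hanging_rewards=True))  # additionally counts a1's cached 0.5
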
@@ -1829,11 +1840,13 @@ def _init_single_agent_episodes(
len(observations_per_agent[agent_id]) - 1
)

# Those agents that did NOT step get self.SKIP_ENV_TS_TAG added to their
# mapping.
# Those agents that did NOT step:
# - Get self.SKIP_ENV_TS_TAG added to their env_t_to_agent_t mapping.
# - Get their reward (if any) added up.
for agent_id in all_agent_ids:
if agent_id not in obs and agent_id not in done_per_agent:
self.env_t_to_agent_t[agent_id].append(self.SKIP_ENV_TS_TAG)
self._hanging_rewards_end[agent_id] += rew.get(agent_id, 0.0)

# Update per-agent lookback buffer sizes to be used when creating the
# individual `SingleAgentEpisode` objects below.
@@ -2427,6 +2440,10 @@ def _get_hanging_value(self, what: str, agent_id: AgentID) -> Any:

def _del_hanging(self, agent_id: AgentID) -> None:
"""Deletes all hanging action, reward, extra_model_outputs of given agent."""
self._hanging_actions_begin.pop(agent_id, None)
self._hanging_extra_model_outputs_begin.pop(agent_id, None)
self._hanging_rewards_begin.pop(agent_id, None)

self._hanging_actions_end.pop(agent_id, None)
self._hanging_extra_model_outputs_end.pop(agent_id, None)
self._hanging_rewards_end.pop(agent_id, None)
184 changes: 173 additions & 11 deletions rllib/env/tests/test_multi_agent_episode.py
@@ -508,13 +508,14 @@ def test_add_env_step(self):
terminateds=terminated,
truncateds=truncated,
)
# Assert that the action buffer for agent 4 is full.
# Assert that the action cache for agent 4 is used.
# Note, agent 4 acts, but receives no observation.
# Note also, all other buffers are always full, due to their defaults.
# Note also, all other caches are always used, due to their defaults.
self.assertTrue(episode._hanging_actions_end["agent_4"] is not None)
# Assert that the reward buffers of agents 3 and 5 are at 1.0.
# Assert that the reward caches of agents 3 and 5 are there.
# For agent_5 (b/c it has never done anything), we add to the begin cache.
check(episode._hanging_rewards_end["agent_3"], 2.2)
check(episode._hanging_rewards_end["agent_5"], 1.0)
check(episode._hanging_rewards_begin["agent_5"], 1.0)

def test_get_observations(self):
# Generate simple records for a multi agent environment.
@@ -2210,15 +2211,176 @@ def test_other_getters(self):
# --- is_terminated, is_truncated ---

def test_cut(self):
# Simple multi-agent episode, in which all agents always step.
episode = self._create_simple_episode(
[
{"a0": 0, "a1": 0},
{"a0": 1, "a1": 1},
{"a0": 2, "a1": 2},
]
)
successor = episode.cut()
check(len(successor), 0)
check(successor.env_t_started, 2)
check(successor.env_t, 2)
check(successor.env_t_to_agent_t, {"a0": [0], "a1": [0]})
a0 = successor.agent_episodes["a0"]
a1 = successor.agent_episodes["a1"]
check((len(a0), len(a1)), (0, 0))
check((a0.t_started, a1.t_started), (2, 2))
check((a0.t, a1.t), (2, 2))
check((a0.observations, a1.observations), ([2], [2]))
check((a0.actions, a1.actions), ([], []))
check((a0.rewards, a1.rewards), ([], []))
check(successor._hanging_actions_end, {})
check(successor._hanging_rewards_end, {})
check(successor._hanging_extra_model_outputs_end, {})

# Multi-agent episode with lookback buffer, in which all agents always step.
episode = self._create_simple_episode(
[
{"a0": 0, "a1": 0},
{"a0": 1, "a1": 1},
{"a0": 2, "a1": 2},
{"a0": 3, "a1": 3},
],
len_lookback_buffer=2,
)
# Cut with lookback=0 argument (default).
successor = episode.cut()
check(len(successor), 0)
check(successor.env_t_started, 1)
check(successor.env_t, 1)
check(successor.env_t_to_agent_t, {"a0": [0], "a1": [0]})
a0 = successor.agent_episodes["a0"]
a1 = successor.agent_episodes["a1"]
check((len(a0), len(a1)), (0, 0))
check((a0.t_started, a1.t_started), (1, 1))
check((a0.t, a1.t), (1, 1))
check((a0.observations, a1.observations), ([3], [3]))
check((a0.actions, a1.actions), ([], []))
check((a0.rewards, a1.rewards), ([], []))
check(successor._hanging_actions_end, {})
check(successor._hanging_rewards_end, {})
check(successor._hanging_extra_model_outputs_end, {})
# Cut with lookback=2 argument.
successor = episode.cut(len_lookback_buffer=2)
check(len(successor), 0)
check(successor.env_t_started, 1)
check(successor.env_t, 1)
check(successor.env_t_to_agent_t["a0"].data, [0, 1, 2])
check(successor.env_t_to_agent_t["a1"].data, [0, 1, 2])
check(successor.env_t_to_agent_t["a0"].lookback, 2)
check(successor.env_t_to_agent_t["a1"].lookback, 2)
a0 = successor.agent_episodes["a0"]
a1 = successor.agent_episodes["a1"]
check((len(a0), len(a1)), (0, 0))
check((a0.t_started, a1.t_started), (1, 1))
check((a0.t, a1.t), (1, 1))
check((a0.observations, a1.observations), ([3], [3]))
check((a0.actions, a1.actions), ([], []))
check((a0.rewards, a1.rewards), ([], []))
check(successor._hanging_actions_end, {})
check(successor._hanging_rewards_end, {})
check(successor._hanging_extra_model_outputs_end, {})

# Multi-agent episode, in which one agent has a long sequence of not acting,
# but does receive (intermittent/hanging) rewards during this time.
observations = [
{"a0": 0, "a1": 0}, # 0
{"a0": 1}, # 1
{"a0": 2}, # 2
{"a0": 3}, # 3
]
episode = MultiAgentEpisode(
observations=observations,
actions=observations[:-1],
rewards=[
{"a0": 0.0, "a1": 0.0}, # 0
{"a0": 0.1, "a1": 0.1}, # 1
{"a0": 0.2, "a1": 0.2}, # 2
],
len_lookback_buffer=0,
)
successor = episode.cut()
check(len(successor), 0)
check(successor.env_t_started, 3)
check(successor.env_t, 3)
a0 = successor.agent_episodes["a0"]
self.assertTrue("a1" not in successor.agent_episodes)
check(len(a0), 0)
check(a0.t_started, 3)
check(a0.t, 3)
check(a0.observations, [3])
check(a0.actions, [])
check(a0.rewards, [])
check(successor._hanging_actions_begin, {"a1": 0})
check(successor._hanging_rewards_begin, {"a1": 0.3})
check(successor._hanging_extra_model_outputs_begin, {"a1": {}})
check(successor._hanging_actions_end, {})
check(successor._hanging_rewards_end, {"a1": 0.0})
check(successor._hanging_extra_model_outputs_end, {})
# Add a few timesteps to successor and test the resulting episode.
successor.add_env_step(
observations={"a0": 4},
actions={"a0": 3},
rewards={"a0": 0.3, "a1": 0.3},
)
check(len(successor), 1)
check(successor.env_t_started, 3)
check(successor.env_t, 4)
# Just b/c we added an intermittent reward for a1 does not mean it should
# already have a SAEps in `successor`. It still hasn't received its first obs
# yet after the cut.
self.assertTrue("a1" not in successor.agent_episodes)
check(len(a0), 1)
check(a0.t_started, 3)
check(a0.t, 4)
check(a0.observations, [3, 4])
check(a0.actions, [3])
check(a0.rewards, [0.3])
check(successor._hanging_actions_begin, {"a1": 0})
check(successor._hanging_rewards_begin, {"a1": 0.6})
check(successor._hanging_extra_model_outputs_begin, {"a1": {}})
check(successor._hanging_actions_end, {})
check(successor._hanging_rewards_end, {"a1": 0.0})
check(successor._hanging_extra_model_outputs_end, {})
# Now a1 actually does receive its next obs.
successor.add_env_step(
observations={"a0": 5, "a1": 5}, # <- this is a1's 1st obs in this chunk
actions={"a0": 4},
rewards={"a0": 0.4, "a1": 0.4},
)
check(len(successor), 2)
check(successor.env_t_started, 3)
check(successor.env_t, 5)
a1 = successor.agent_episodes["a1"]
check((len(a0), len(a1)), (2, 0))
check((a0.t_started, a1.t_started), (3, 0))
check((a0.t, a1.t), (5, 0))
check((a0.observations, a1.observations), ([3, 4, 5], [5]))
check((a0.actions, a1.actions), ([3, 4], []))
check((a0.rewards, a1.rewards), ([0.3, 0.4], []))
# Begin caches keep accumulating a1's rewards.
check(successor._hanging_actions_begin, {"a1": 0})
check(successor._hanging_rewards_begin, {"a1": 1.0})
check(successor._hanging_extra_model_outputs_begin, {"a1": {}})
# But end caches are now empty (due to a1's observation/finished step).
check(successor._hanging_actions_end, {})
check(successor._hanging_rewards_end, {"a1": 0.0})
check(successor._hanging_extra_model_outputs_end, {})

# Generate a simple multi-agent episode and check all internals after
# construction.
observations = [{"a0": 0, "a1": 0}, {"a1": 1}, {"a1": 2}, {"a1": 3}]
actions = [{"a0": 0, "a1": 0}, {"a1": 1}, {"a1": 2}]
rewards = [{"a0": 0.1, "a1": 0.1}, {"a1": 0.2}, {"a1": 0.3}]
episode_1 = MultiAgentEpisode(
observations=observations, actions=actions, rewards=rewards
episode_1 = self._create_simple_episode(
[
{"a0": 0, "a1": 0},
{"a1": 1},
{"a1": 2},
{"a1": 3},
],
len_lookback_buffer="auto",
)

episode_2 = episode_1.cut()
check(episode_1.id_, episode_2.id_)
check(len(episode_1), 0)
@@ -3126,7 +3288,7 @@ def test_get_return(self):
# `get_return()`.
buffered_rewards = sum(episode._hanging_rewards_end.values())
self.assertTrue(
episode.get_return(consider_hanging_rewards=True),
episode.get_return(include_hanging_rewards=True),
agent_returns + buffered_rewards,
)
