[RLlib] Fix type hints for original_batches in callbacks. #24214

Merged
6 changes: 3 additions & 3 deletions rllib/agents/callbacks.py
@@ -1,7 +1,7 @@
import numpy as np
import os
import tracemalloc
-from typing import Dict, Optional, TYPE_CHECKING
+from typing import Dict, Optional, Tuple, TYPE_CHECKING

from ray.rllib.env.base_env import BaseEnv
from ray.rllib.env.env_context import EnvContext
@@ -196,7 +196,7 @@ def on_postprocess_trajectory(
policy_id: PolicyID,
policies: Dict[PolicyID, Policy],
postprocessed_batch: SampleBatch,
-original_batches: Dict[AgentID, SampleBatch],
+original_batches: Dict[AgentID, Tuple[Policy, SampleBatch]],
Member:

what, that's surprising ...
how can we mix these 2 things together?

Contributor (Author):

pre_batches = {}
for (eps_id, agent_id), collector in self.agent_collectors.items():
    # Build only if there is data and agent is part of given episode.
    if collector.agent_steps == 0 or eps_id != episode_id:
        continue
    pid = self.agent_key_to_policy_id[(eps_id, agent_id)]
    policy = self.policy_map[pid]
    pre_batch = collector.build(policy.view_requirements)
    pre_batches[agent_id] = (policy, pre_batch)

pre_batches is a dict whose values have the type Tuple[Policy, SampleBatch]. It is then passed to on_postprocess_trajectory under the name original_batches.

for agent_id, post_batch in sorted(post_batches.items()):
    agent_key = (episode_id, agent_id)
    pid = self.agent_key_to_policy_id[agent_key]
    policy = self.policy_map[pid]
    self.callbacks.on_postprocess_trajectory(
        worker=get_global_worker(),
        episode=episode,
        agent_id=agent_id,
        policy_id=pid,
        policies=self.policy_map,
        postprocessed_batch=post_batch,
        original_batches=pre_batches,
    )

Member:

oh it's Tuple, not Union, I got scared.
Thanks!

Contributor:

:) @gjoliver

I think there are only very few places in RLlib where we mix different types, e.g. in a return value (for example in the sampler code's _process_observations()), and no, we probably shouldn't do this.
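
For illustration only (not part of this PR): a minimal sketch of a user-defined callback that relies on the corrected hint and unpacks the per-agent (Policy, SampleBatch) tuples. The class name and the print format are hypothetical; the signature follows the one shown in this diff.

from typing import Dict, Tuple

from ray.rllib.agents.callbacks import DefaultCallbacks
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.sample_batch import SampleBatch


class TupleAwareCallbacks(DefaultCallbacks):
    def on_postprocess_trajectory(
        self,
        *,
        worker,
        episode,
        agent_id,
        policy_id,
        policies,
        postprocessed_batch: SampleBatch,
        original_batches: Dict[str, Tuple[Policy, SampleBatch]],
        **kwargs,
    ) -> None:
        # Each value is a (Policy, SampleBatch) tuple, not a bare SampleBatch.
        orig_policy, orig_batch = original_batches[agent_id]
        print(
            "agent {}: {} original steps, {} postprocessed steps".format(
                agent_id, orig_batch.count, postprocessed_batch.count
            )
        )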

**kwargs,
) -> None:
"""Called immediately after a policy's postprocess_fn is called.
@@ -470,7 +470,7 @@ def on_postprocess_trajectory(
policy_id: PolicyID,
policies: Dict[PolicyID, Policy],
postprocessed_batch: SampleBatch,
-original_batches: Dict[AgentID, SampleBatch],
+original_batches: Dict[AgentID, Tuple[Policy, SampleBatch]],
**kwargs,
) -> None:
for callback in self._callback_list:
4 changes: 2 additions & 2 deletions rllib/examples/custom_metrics_and_callbacks.py
@@ -4,7 +4,7 @@
custom metric.
"""

-from typing import Dict
+from typing import Dict, Tuple
import argparse
import numpy as np
import os
@@ -129,7 +129,7 @@ def on_postprocess_trajectory(
policy_id: str,
policies: Dict[str, Policy],
postprocessed_batch: SampleBatch,
-original_batches: Dict[str, SampleBatch],
+original_batches: Dict[str, Tuple[Policy, SampleBatch]],
**kwargs
):
print("postprocessed {} steps".format(postprocessed_batch.count))
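
For context (not part of the diff): in the RLlib version this PR targets, a callbacks class like the one in this example is registered through the trainer config's "callbacks" key and passed as a class, not an instance. A minimal sketch; the algorithm, environment, and stopping criterion below are placeholder choices, and MyCallbacks is only a stand-in for the callbacks class defined in the example file.

import ray
from ray import tune
from ray.rllib.agents.callbacks import DefaultCallbacks


class MyCallbacks(DefaultCallbacks):
    """Stand-in for the callbacks class from the example file."""
    pass


ray.init()
tune.run(
    "PG",
    config={
        "env": "CartPole-v0",
        "callbacks": MyCallbacks,  # pass the class, not an instance
        "num_workers": 0,
    },
    stop={"training_iteration": 1},
)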