[RLlib; Offline RL] Support writing and reading composite spaces samples. (#47046)
simonsays1980 authored Aug 13, 2024
1 parent 0e03169 commit e066289
Showing 8 changed files with 195 additions and 26 deletions.
4 changes: 4 additions & 0 deletions rllib/algorithms/algorithm.py
@@ -871,6 +871,10 @@ def setup(self, config: AlgorithmConfig) -> None:
else:
self.offline_data.learner_handles = [self.learner_group._learner]

# Provide the `OfflineData` instance with space information. It might
# need it for reading recorded experiences.
self.offline_data.spaces = self.env_runner_group.get_spaces()

# Run `on_algorithm_init` callback after initialization is done.
self.callbacks.on_algorithm_init(algorithm=self, metrics_logger=self.metrics)

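For orientation, the mapping that `self.env_runner_group.get_spaces()` hands to `OfflineData` keys space pairs by ID; the `__env__` entry used further below holds the raw environment spaces. A minimal sketch with made-up CartPole-like spaces (the dict contents here are illustrative, not taken from this change):

```python
import gymnasium as gym

# Hypothetical illustration of the structure `OfflineData.spaces` receives; the
# real dict comes from `self.env_runner_group.get_spaces()` and may contain
# additional per-module entries.
spaces = {
    "__env__": (
        gym.spaces.Box(low=-1.0, high=1.0, shape=(4,)),  # observation space
        gym.spaces.Discrete(2),  # action space
    ),
}
obs_space, action_space = spaces["__env__"]
```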
31 changes: 20 additions & 11 deletions rllib/algorithms/algorithm_config.py
@@ -23,6 +23,7 @@
import ray
from ray.rllib.algorithms.callbacks import DefaultCallbacks
from ray.rllib.core import DEFAULT_MODULE_ID
from ray.rllib.core.columns import Columns
from ray.rllib.core.rl_module import validate_module_id
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.core.rl_module.rl_module import RLModuleSpec
@@ -433,6 +434,8 @@ def __init__(self, algo_class: Optional[type] = None):
self.input_read_method_kwargs = {}
self.input_read_schema = {}
self.input_read_episodes = False
self.input_compress_columns = [Columns.OBS, Columns.NEXT_OBS]
self.input_spaces_jsonable = True
self.map_batches_kwargs = {}
self.iter_batches_kwargs = {}
self.prelearner_class = None
@@ -444,7 +447,7 @@ def __init__(self, algo_class: Optional[type] = None):
self.shuffle_buffer_size = 0
self.output = None
self.output_config = {}
self.output_compress_columns = ["obs", "new_obs"]
self.output_compress_columns = [Columns.OBS, Columns.NEXT_OBS]
self.output_max_file_size = 64 * 1024 * 1024
self.output_max_rows_per_file = None
self.output_write_method = "write_parquet"
@@ -2388,6 +2391,7 @@ def offline_data(
input_read_method_kwargs: Optional[Dict] = NotProvided,
input_read_schema: Optional[Dict[str, str]] = NotProvided,
input_read_episodes: Optional[bool] = NotProvided,
input_compress_columns: Optional[List[str]] = NotProvided,
map_batches_kwargs: Optional[Dict] = NotProvided,
iter_batches_kwargs: Optional[Dict] = NotProvided,
prelearner_class: Optional[Type] = NotProvided,
@@ -2399,7 +2403,7 @@ def offline_data(
shuffle_buffer_size: Optional[int] = NotProvided,
output: Optional[str] = NotProvided,
output_config: Optional[Dict] = NotProvided,
output_compress_columns: Optional[bool] = NotProvided,
output_compress_columns: Optional[List[str]] = NotProvided,
output_max_file_size: Optional[float] = NotProvided,
output_max_rows_per_file: Optional[int] = NotProvided,
output_write_method: Optional[str] = NotProvided,
@@ -2440,16 +2444,19 @@ def offline_data(
schema used is `ray.rllib.offline.offline_data.SCHEMA`. If your data set
already contains the names in this schema, no `input_read_schema` is
needed.
input_read_episodes: If offline data is already stored in RLlib's
input_read_episodes: Whether offline data is already stored in RLlib's
`EpisodeType` format, i.e. `ray.rllib.env.SingleAgentEpisode` (multi
-agent is planned but not supported, yet). Reading directly episodes
avoids an additional transforming step and is usually faster and
therefore the adviced format when your application remains fully inside
of RLlib's schema. The other format is a columnar format and is agnostic
to the RL framework used. Use the latter format, if you are unsure when
to use the data or in which RL framework. The default is to read column
data, i.e. `False`. See also `output_write_episodes` to define the
output data format when recording.
-agent is planned but not supported, yet). Reading episodes directly
avoids additional transform steps and is usually faster and
therefore the recommended format when your application remains fully
inside of RLlib's schema. The other format is a columnar format and is
agnostic to the RL framework used. Use the latter format, if you are
unsure when to use the data or in which RL framework. The default is
to read column data, i.e. `False`. See also `output_write_episodes`
to define the output data format when recording.
input_compress_columns: What input columns are compressed with LZ4 in the
input data. This also applies when the data is stored in `RLlib`'s
`SingleAgentEpisode` format (`MultiAgentEpisode` is not supported, yet).
map_batches_kwargs: `kwargs` for the `map_batches` method. These will be
passed into the `ray.data.Dataset.map_batches` method when sampling
without checking. If no arguments are passed in, the default arguments `{
@@ -2543,6 +2550,8 @@ def offline_data(
self.input_read_schema = input_read_schema
if input_read_episodes is not NotProvided:
self.input_read_episodes = input_read_episodes
if input_compress_columns is not NotProvided:
self.input_compress_columns = input_compress_columns
if map_batches_kwargs is not NotProvided:
self.map_batches_kwargs = map_batches_kwargs
if iter_batches_kwargs is not NotProvided:
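The two new read options introduced above, `input_read_episodes` and `input_compress_columns`, are set through `AlgorithmConfig.offline_data()`. A minimal usage sketch, assuming a BC setup on CartPole with a placeholder dataset path (neither is part of this change):

```python
from ray.rllib.algorithms.bc import BCConfig
from ray.rllib.core.columns import Columns

config = (
    BCConfig()
    .environment(env="CartPole-v1")
    .offline_data(
        # Placeholder path to previously recorded data.
        input_="local:///tmp/cartpole/",
        # The data was recorded in columnar format, not as episodes.
        input_read_episodes=False,
        # Columns that were LZ4-compressed when the data was written.
        input_compress_columns=[Columns.OBS, Columns.NEXT_OBS, Columns.ACTIONS],
    )
)
```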
2 changes: 2 additions & 0 deletions rllib/offline/offline_data.py
@@ -77,6 +77,7 @@ def sample(
fn_constructor_kwargs={
"config": self.config,
"learner": self.learner_handles[0],
"spaces": self.spaces["__env__"],
},
batch_size=num_samples,
**self.map_batches_kwargs,
@@ -106,6 +107,7 @@ def sample(
fn_constructor_kwargs={
"config": self.config,
"learner": self.learner_handles,
"spaces": self.spaces["__env__"],
"locality_hints": self.locality_hints,
"module_spec": self.module_spec,
"module_state": module_state,
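The `spaces` entry is handed to the prelearner's constructor through Ray Data's stateful `map_batches` API via `fn_constructor_kwargs`. A generic, self-contained sketch of that pattern (the class, column name, and kwargs below are illustrative only, not RLlib's actual ones):

```python
import ray


class AddOffset:
    # Ray Data passes `fn_constructor_kwargs` to this constructor once per actor.
    def __init__(self, offset: int):
        self.offset = offset

    # Each call receives one batch; with `batch_format="numpy"` this is a dict
    # of NumPy arrays keyed by column name.
    def __call__(self, batch):
        batch["value"] = batch["value"] + self.offset
        return batch


ds = ray.data.from_items([{"value": i} for i in range(8)])
out = ds.map_batches(
    AddOffset,
    fn_constructor_kwargs={"offset": 10},
    batch_format="numpy",
    concurrency=1,
).take_all()
```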
27 changes: 21 additions & 6 deletions rllib/offline/offline_env_runner.py
@@ -11,6 +11,7 @@
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
from ray.rllib.utils.annotations import override
from ray.rllib.utils.compression import pack_if_needed
from ray.rllib.utils.spaces.space_utils import to_jsonable_if_needed
from ray.rllib.utils.typing import EpisodeType

logger = logging.Logger(__file__)
@@ -209,6 +210,8 @@ def _map_episodes_to_data(self, samples: List[EpisodeType]) -> None:
samples: List of episodes to be converted.
"""
# Loop through all sampled episodes.
obs_space = self.env.observation_space
action_space = self.env.action_space
for sample in samples:
# Loop through all items of the episode.
for i in range(len(sample)):
@@ -217,18 +220,30 @@ def _map_episodes_to_data(self, samples: List[EpisodeType]) -> None:
Columns.AGENT_ID: sample.agent_id,
Columns.MODULE_ID: sample.module_id,
# Compress observations, if requested.
Columns.OBS: pack_if_needed(sample.get_observations(i))
Columns.OBS: pack_if_needed(
to_jsonable_if_needed(sample.get_observations(i), obs_space)
)
if Columns.OBS in self.output_compress_columns
else sample.get_observations(i),
else to_jsonable_if_needed(
sample.get_observations(i), obs_space
),
# Compress actions, if requested.
Columns.ACTIONS: pack_if_needed(sample.get_actions(i))
Columns.ACTIONS: pack_if_needed(
to_jsonable_if_needed(sample.get_actions(i), action_space)
)
if Columns.ACTIONS in self.output_compress_columns
else sample.get_actions(i),
else to_jsonable_if_needed(
sample.get_actions(i), action_space
),
Columns.REWARDS: sample.get_rewards(i),
# Compress next observations, if requested.
Columns.NEXT_OBS: pack_if_needed(sample.get_observations(i + 1))
Columns.NEXT_OBS: pack_if_needed(
to_jsonable_if_needed(sample.get_observations(i + 1), obs_space)
)
if Columns.NEXT_OBS in self.output_compress_columns
else sample.get_observations(i + 1),
else to_jsonable_if_needed(
sample.get_observations(i + 1), obs_space
),
Columns.TERMINATEDS: False
if i < len(sample) - 1
else sample.is_terminated,
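For intuition on the JSONable step used above: gymnasium's composite spaces can turn their samples into plain, JSON-friendly Python structures and restore them again, which is what makes `Dict`/`Tuple` observations writable to Parquet. A standalone sketch using gymnasium directly (independent of RLlib's `to_jsonable_if_needed` helper; the space is made up):

```python
import gymnasium as gym

space = gym.spaces.Dict(
    {
        "pos": gym.spaces.Box(low=-1.0, high=1.0, shape=(2,)),
        "mode": gym.spaces.Discrete(3),
    }
)
sample = space.sample()

# `to_jsonable`/`from_jsonable` operate on lists of samples.
jsonable = space.to_jsonable([sample])  # nested lists/ints, JSON-friendly
restored = space.from_jsonable(jsonable)[0]  # back to a dict of arrays/ints

assert space.contains(restored)
```

In the recorder above, the JSONable result is additionally LZ4-packed via `pack_if_needed` whenever the column appears in `output_compress_columns`.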
74 changes: 67 additions & 7 deletions rllib/offline/offline_prelearner.py
@@ -1,8 +1,9 @@
import gymnasium as gym
import numpy as np
import random
import ray
from ray.actor import ActorHandle
from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING
from typing import Any, Dict, List, Optional, Union, Tuple, TYPE_CHECKING

from ray.rllib.core.columns import Columns
from ray.rllib.core.learner import Learner
@@ -15,6 +16,7 @@
OverrideToImplementCustomLogic_CallToSuperRecommended,
)
from ray.rllib.utils.compression import unpack_if_needed
from ray.rllib.utils.spaces.space_utils import from_jsonable_if_needed
from ray.rllib.utils.typing import EpisodeType, ModuleID

if TYPE_CHECKING:
@@ -80,6 +82,7 @@ def __init__(
self,
config: "AlgorithmConfig",
learner: Union[Learner, list[ActorHandle]],
spaces: Optional[Tuple[gym.Space, gym.Space]] = None,
locality_hints: Optional[list] = None,
module_spec: Optional[MultiRLModuleSpec] = None,
module_state: Optional[Dict[ModuleID, Any]] = None,
@@ -117,10 +120,16 @@ def __init__(
# Build the module from spec. Note, this will be a MultiRLModule.
self._module = module_spec.build()
self._module.set_state(module_state)

# Store the observation and action space, if defined; otherwise set
# them to `None`. Note, if `None`, `from_jsonable_if_needed` does not
# convert the input space samples.
self.observation_space, self.action_space = spaces or (None, None)

# Build the learner connector pipeline.
self._learner_connector = self.config.build_learner_connector(
input_observation_space=None,
input_action_space=None,
input_observation_space=self.observation_space,
input_action_space=self.action_space,
)
# Cache the policies to be trained to update weights only for these.
self._policies_to_train = self.config.policies_to_train
@@ -141,7 +150,12 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, List[EpisodeType]]
self._is_multi_agent,
batch,
schema=SCHEMA | self.config.input_read_schema,
finalize=False,
input_compress_columns=self.config.input_compress_columns,
observation_space=self.observation_space,
action_space=self.action_space,
)["episodes"]

# TODO (simon): Make syncing work. Right now this becomes blocking or never
# receives weights. Learners appear to be not accessible via other actors.
# Increase the counter for updating the module.
@@ -216,9 +230,25 @@ def _map_to_episodes(
batch: Dict[str, np.ndarray],
schema: Dict[str, str] = SCHEMA,
finalize: bool = False,
input_compress_columns: Optional[List[str]] = None,
observation_space: gym.Space = None,
action_space: gym.Space = None,
) -> Dict[str, List[EpisodeType]]:
"""Maps a batch of data to episodes."""

# Set to empty list, if `None`.
input_compress_columns = input_compress_columns or []

# If spaces are given, we can use the space-specific
# conversion method to convert space samples.
if observation_space and action_space:
convert = from_jsonable_if_needed
# Otherwise we use an identity function.
else:

def convert(sample, space):
return sample

episodes = []
# TODO (simon): Give users possibility to provide a custom schema.
for i, obs in enumerate(batch[schema[Columns.OBS]]):
@@ -248,17 +278,39 @@ def _map_to_episodes(
episode = SingleAgentEpisode(
id_=batch[schema[Columns.EPS_ID]][i],
agent_id=agent_id,
# Observations might be (a) serialized and/or (b) converted
# to a JSONable (when a composite space was used). We deserialize
# and then convert back from JSONable to a space sample.
observations=[
unpack_if_needed(obs),
unpack_if_needed(batch[schema[Columns.NEXT_OBS]][i]),
convert(unpack_if_needed(obs), observation_space)
if Columns.OBS in input_compress_columns
else convert(obs, observation_space),
convert(
unpack_if_needed(batch[schema[Columns.NEXT_OBS]][i]),
observation_space,
)
if Columns.NEXT_OBS in input_compress_columns
else convert(
batch[schema[Columns.NEXT_OBS]][i], observation_space
),
],
infos=[
{},
batch[schema[Columns.INFOS]][i]
if schema[Columns.INFOS] in batch
else {},
],
actions=[batch[schema[Columns.ACTIONS]][i]],
# Actions might be (a) serialized and/or (b) converted to a JSONable
# (when a composite space was used). We deserialize and then
# convert back from JSONable to a space sample.
actions=[
convert(
unpack_if_needed(batch[schema[Columns.ACTIONS]][i]),
action_space,
)
if Columns.ACTIONS in input_compress_columns
else convert(batch[schema[Columns.ACTIONS]][i], action_space)
],
rewards=[batch[schema[Columns.REWARDS]][i]],
terminated=batch[
schema[Columns.TERMINATEDS]
@@ -272,8 +324,16 @@
# t_started=batch[Columns.T if Columns.T in batch else
# "unroll_id"][i][0],
# TODO (simon): Single-dimensional columns are not supported.
# Extra model outputs might be serialized. We deserialize them here
# if needed.
# TODO (simon): Check, if we need here also reconversion from
# JSONable in case of composite spaces.
extra_model_outputs={
k: [v[i]]
k: [
unpack_if_needed(v[i])
if k in input_compress_columns
else v[i]
]
for k, v in batch.items()
if (k not in schema and k not in schema.values())
},
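The read path inverts the write path: `unpack_if_needed` undoes the optional LZ4 packing and `from_jsonable_if_needed` undoes the composite-space conversion. A small round-trip sketch for the compression half, using RLlib's helpers (the observation value is made up):

```python
import numpy as np

from ray.rllib.utils.compression import pack_if_needed, unpack_if_needed

obs = np.random.random((4,)).astype(np.float32)

# Compresses the array when LZ4 is available; otherwise returns it unchanged.
packed = pack_if_needed(obs)
# Only decompresses values that are actually compressed, so it is safe to call
# on both compressed and uncompressed columns.
restored = unpack_if_needed(packed)

np.testing.assert_allclose(obs, restored)
```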
2 changes: 1 addition & 1 deletion rllib/tuned_examples/bc/cartpole_bc.py
@@ -68,7 +68,7 @@
# To increase learning speed with multiple learners,
# increase the learning rate correspondingly.
lr=0.0008 * max(1, args.num_gpus**0.5),
train_batch_size_per_learner=2000,
train_batch_size_per_learner=256,
)
)

7 changes: 6 additions & 1 deletion rllib/tuned_examples/bc/cartpole_recording.py
@@ -45,8 +45,13 @@
)
.offline_data(
output="local:///tmp/cartpole/",
output_write_episodes=True,
output_write_episodes=False,
output_max_rows_per_file=1000,
# LZ4-compress columns 'obs', 'new_obs', and 'actions' to
# save disk space and increase performance. Note, this means
# that you have to use `input_compress_columns` in the same
# way when using the data for training in `RLlib`.
output_compress_columns=["obs", "new_obs", "actions"],
)
)

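When training from this recording, the read side has to mirror these write settings, as the comment above notes. A plausible counterpart config (the choice of BC and the environment are just examples, not part of this change):

```python
from ray.rllib.algorithms.bc import BCConfig

config = (
    BCConfig()
    .environment(env="CartPole-v1")
    .offline_data(
        # Path the recording above writes to.
        input_="local:///tmp/cartpole/",
        # The recorder writes columnar data (`output_write_episodes=False`),
        # so read columns rather than episodes.
        input_read_episodes=False,
        # Mirror the columns that were LZ4-compressed on write.
        input_compress_columns=["obs", "new_obs", "actions"],
    )
)
```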