[RLlib; Offline RL] Offline performance cleanup. (ray-project#47731)
Signed-off-by: ujjawal-khare <[email protected]>
simonsays1980 authored and ujjawal-khare committed Oct 15, 2024
1 parent 0e50a0f commit 961a705
Showing 8 changed files with 92 additions and 54 deletions.
25 changes: 25 additions & 0 deletions rllib/algorithms/algorithm_config.py
@@ -2636,6 +2636,31 @@ def offline_data(
`MultiAgentEpisode` not supported, yet). Note,
`rllib.core.columns.Columns.OBS` will also try to decompress
`rllib.core.columns.Columns.NEXT_OBS`.
+    materialize_data: Whether the raw data should be materialized in memory.
+        This boosts performance, but requires enough memory to avoid an OOM, so
+        make sure that your cluster has the resources available. For very large
+        data you might want to switch to streaming mode by setting this to
+        `False` (default). If your algorithm does not need the RLModule in the
+        Learner connector pipeline or all (learner) connectors are stateless
+        you should consider setting `materialize_mapped_data` to `True`
+        instead (and set `materialize_data` to `False`). If your data does not
+        fit into memory and your Learner connector pipeline requires an RLModule
+        or is stateful, set both `materialize_data` and
+        `materialize_mapped_data` to `False`.
+    materialize_mapped_data: Whether the data should be materialized after
+        running it through the Learner connector pipeline (i.e. after running
+        the `OfflinePreLearner`). This improves performance, but should only be
+        used in case the (learner) connector pipeline does not require an
+        RLModule and the (learner) connector pipeline is stateless. For example,
+        MARWIL's Learner connector pipeline requires the RLModule for value
+        function predictions and training batches would become stale after some
+        iterations, causing learning degradation or divergence. Also ensure that
+        your cluster has enough memory available to avoid an OOM. If set to
+        `True`, make sure that `materialize_data` is set to `False` to
+        avoid materialization of two datasets. If your data does not fit into
+        memory and your Learner connector pipeline requires an RLModule or is
+        stateful, set both `materialize_data` and `materialize_mapped_data` to
+        `False`.
map_batches_kwargs: Keyword args for the `map_batches` method. These are
passed into the `ray.data.Dataset.map_batches` method when sampling, without
further validation. If no arguments are passed in, the default arguments
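For context beyond the diff: a minimal sketch of how the two new flags combine, assuming a BC-style setup whose learner connector pipeline is stateless and needs no RLModule (the input path is hypothetical):

```python
from ray.rllib.algorithms.bc import BCConfig

config = (
    BCConfig()
    .environment(env="CartPole-v1")
    .offline_data(
        input_=["/tmp/cartpole-offline-data"],  # hypothetical path
        # Stream the raw data instead of materializing it ...
        materialize_data=False,
        # ... and materialize only the batches produced by the
        # `OfflinePreLearner`; safe here because BC's learner connector
        # pipeline is stateless and does not need the RLModule.
        materialize_mapped_data=True,
    )
)
```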
5 changes: 4 additions & 1 deletion rllib/algorithms/marwil/tests/test_marwil.py
@@ -172,7 +172,10 @@ def possibly_masked_mean(data_):
.unwrapped()
.forward_train({k: v for k, v in batch[DEFAULT_MODULE_ID].items()})
)
-    advantages = batch[DEFAULT_MODULE_ID][Columns.ADVANTAGES].detach().cpu().numpy()
+    advantages = (
+        batch[DEFAULT_MODULE_ID][Columns.VALUE_TARGETS].detach().cpu().numpy()
+        - fwd_out["vf_preds"].detach().cpu().numpy()
+    )
advantages_squared = possibly_masked_mean(np.square(advantages))
c_2 = 100.0 + 1e-8 * (advantages_squared - 100.0)
c = np.sqrt(c_2)
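The constants in this assertion mirror MARWIL's moving average of squared advantages. A standalone numpy sketch of the same computation (advantage values are toy numbers, `beta` is a hypothetical coefficient):

```python
import numpy as np

# Toy advantage estimates: value targets minus value predictions.
advantages = np.array([0.5, -1.2, 2.0])
beta = 1.0

# Moving average of the squared advantages, updated from a start value
# of 100.0 with a tiny momentum of 1e-8, as in the test above.
c_2 = 100.0 + 1e-8 * (np.mean(np.square(advantages)) - 100.0)
c = np.sqrt(c_2)

# MARWIL's exponentially weighted imitation weights.
weights = np.exp(beta * (advantages / c))
```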
4 changes: 1 addition & 3 deletions rllib/algorithms/marwil/torch/marwil_torch_learner.py
@@ -63,9 +63,7 @@ def possibly_masked_mean(data_):
# Otherwise, compute advantages.
else:
# cumulative_rewards = batch[Columns.ADVANTAGES]
-            value_fn_out = module.compute_values(
-                batch, embeddings=fwd_out.get(Columns.EMBEDDINGS)
-            )
+            value_fn_out = fwd_out[Columns.VF_PREDS]
advantages = batch[Columns.VALUE_TARGETS] - value_fn_out
advantages_squared_mean = possibly_masked_mean(torch.pow(advantages, 2.0))

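The change above reuses the value predictions already computed in `forward_train` instead of running a second pass through the value head. A minimal PyTorch sketch of the pattern (toy module, hypothetical key names):

```python
import torch
import torch.nn as nn

class ToyModule(nn.Module):
    """Toy actor-critic whose train forward also returns value preds."""

    def __init__(self):
        super().__init__()
        self.encoder = nn.Linear(4, 8)
        self.pi = nn.Linear(8, 2)
        self.vf = nn.Linear(8, 1)

    def forward_train(self, obs):
        embeddings = self.encoder(obs)
        return {
            "action_dist_inputs": self.pi(embeddings),
            # Returning `vf_preds` here lets the loss reuse them ...
            "vf_preds": self.vf(embeddings).squeeze(-1),
        }

module = ToyModule()
fwd_out = module.forward_train(torch.randn(32, 4))
# ... so no separate value-head pass is needed when computing the loss.
value_fn_out = fwd_out["vf_preds"]
```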
1 change: 0 additions & 1 deletion rllib/core/learner/learner.py
@@ -1095,7 +1095,6 @@ def update_from_iterator(
)

self._check_is_built()
-        # minibatch_size = minibatch_size or 32

# Call `before_gradient_based_update` to allow for non-gradient based
# preparations-, logging-, and update logic to happen.
61 changes: 30 additions & 31 deletions rllib/offline/offline_data.py
@@ -37,9 +37,17 @@ def __init__(self, config: AlgorithmConfig):
self.default_read_method_kwargs | self.config.input_read_method_kwargs
)

+        # If data should be materialized.
+        self.materialize_data = config.materialize_data
+        # If mapped data should be materialized.
+        self.materialize_mapped_data = config.materialize_mapped_data
+        # Flag to identify if data has already been mapped with the
+        # `OfflinePreLearner`.
+        self.data_is_mapped = False
+
        # Set the filesystem.
-        self.filesystem = self.config.output_filesystem
-        self.filesystem_kwargs = self.config.output_filesystem_kwargs
+        self.filesystem = self.config.input_filesystem
+        self.filesystem_kwargs = self.config.input_filesystem_kwargs
self.filesystem_object = None

# If a specific filesystem is given, set it up. Note, this could
@@ -51,13 +59,13 @@

self.filesystem_object = gcsfs.GCSFileSystem(**self.filesystem_kwargs)
elif self.filesystem == "s3":
-            from pyarrow import fs
-
-            self.filesystem_object = fs.S3FileSystem(**self.filesystem_kwargs)
+            self.filesystem_object = pyarrow.fs.S3FileSystem(**self.filesystem_kwargs)
elif self.filesystem == "abs":
import adlfs

self.filesystem_object = adlfs.AzureBlobFileSystem(**self.filesystem_kwargs)
+        elif isinstance(self.filesystem, pyarrow.fs.FileSystem):
+            self.filesystem_object = self.filesystem
elif self.filesystem is not None:
raise ValueError(
f"Unknown filesystem: {self.filesystem}. Filesystems can be "
@@ -106,32 +114,23 @@ def sample(
return_iterator: bool = False,
num_shards: int = 1,
):
-        if (
-            not return_iterator or (return_iterator and num_shards <= 1)
-        ) and not self.batch_iterator:
-            # If no iterator should be returned, or if we want to return a single
-            # batch iterator, we instantiate the batch iterator once, here.
-            # TODO (simon, sven): The iterator depends on the `num_samples`, i.e.
-            # sampling later with a different batch size would need a
-            # reinstantiation of the iterator.
-            self.batch_iterator = self.data.map_batches(
-                self.prelearner_class,
-                fn_constructor_kwargs={
-                    "config": self.config,
-                    "learner": self.learner_handles[0],
-                    "spaces": self.spaces[INPUT_ENV_SPACES],
-                },
-                batch_size=num_samples,
-                **self.map_batches_kwargs,
-            ).iter_batches(
-                batch_size=num_samples,
-                **self.iter_batches_kwargs,
-            )
-
-        # Do we want to return an iterator or a single batch?
-        if return_iterator:
-            # In case of multiple shards, we return multiple
-            # `StreamingSplitIterator` instances.
+        # Materialize the mapped data, if necessary. This runs for all the
+        # data the `OfflinePreLearner` logic and maps them to `MultiAgentBatch`es.
+        # TODO (simon, sven): This would never update the module nor the
+        # connectors. If this is needed we have to check if we give
+        # (a) only an iterator and let the learner and OfflinePreLearner
+        # communicate through the object storage. This only works when
+        # not materializing.
+        # (b) Rematerialize the data every couple of iterations. This is
+        # costly.
+        if not self.data_is_mapped:
+            # Constructor `kwargs` for the `OfflinePreLearner`.
+            fn_constructor_kwargs = {
+                "config": self.config,
+                "learner": self.learner_handles[0],
+                "spaces": self.spaces[INPUT_ENV_SPACES],
+            }
+            # If we have multiple learners, add to the constructor `kwargs`.
if num_shards > 1:
# Call here the learner to get an up-to-date module state.
# TODO (simon): This is a workaround as long as learners cannot
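The map-then-iterate pipeline at the heart of this rewrite can be sketched standalone; the transform class below is a toy, stateless stand-in for the `OfflinePreLearner` (all values hypothetical):

```python
import ray

class ToyPreLearner:
    """Stateless stand-in for the `OfflinePreLearner` transform."""

    def __call__(self, batch):
        # Map a raw batch to a "training-ready" one (here: trivial math).
        batch["id"] = batch["id"] * 2
        return batch

ds = ray.data.range(1_000)

# Run the transform in two actor workers, then iterate training batches.
iterator = ds.map_batches(
    ToyPreLearner,
    batch_size=128,
    concurrency=2,
).iter_batches(
    batch_size=128,
    prefetch_batches=1,
)

for batch in iterator:
    pass  # train on `batch`
```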
8 changes: 2 additions & 6 deletions rllib/tuned_examples/bc/cartpole_bc.py
@@ -66,19 +66,15 @@
# The number of iterations to be run per learner when in multi-learner
# mode in a single RLlib training iteration. Leave this to `None` to
# run an entire epoch on the dataset during a single RLlib training
-        # iteration. For single-learner mode, 1 is the only option.
+        # iteration. For single-learner mode 1 is the only option.
dataset_num_iters_per_learner=1 if args.num_gpus == 0 else None,
)
.training(
-        train_batch_size_per_learner=1024,
        # To increase learning speed with multiple learners,
        # increase the learning rate correspondingly.
        lr=0.0008 * max(1, args.num_gpus**0.5),
-    )
-    .rl_module(
-        model_config=DefaultModelConfig(
-            fcnet_hiddens=[256, 256],
-        ),
+        train_batch_size_per_learner=1024,
)
)

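As a side note on the learning-rate comment above, the square-root scaling rule works out as follows (GPU counts hypothetical):

```python
# lr = 0.0008 * max(1, num_gpus ** 0.5)
for num_gpus in (0, 1, 4, 9):
    print(num_gpus, 0.0008 * max(1, num_gpus**0.5))
# -> 0: 0.0008, 1: 0.0008, 4: 0.0016, 9: 0.0024
```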
23 changes: 15 additions & 8 deletions rllib/tuned_examples/cql/pendulum_cql.py
@@ -39,14 +39,21 @@
)
.offline_data(
input_=[data_path.as_posix()],
-        # Define the number of reading blocks, these should be larger than 1
-        # and aligned with the data size.
-        input_read_method_kwargs={"override_num_blocks": max(args.num_gpus, 2)},
-        # Concurrency defines the number of processes that run the
-        # `map_batches` transformations. This should be aligned with the
-        # 'prefetch_batches' argument in 'iter_batches_kwargs'.
-        map_batches_kwargs={"concurrency": max(2, args.num_gpus * 2)},
-        actions_in_input_normalized=True,
+        # The `kwargs` for the `input_read_method`. We override the
+        # number of blocks to pull at once b/c our dataset is small.
+        input_read_method_kwargs={"override_num_blocks": max(args.num_gpus * 2, 2)},
+        # The `kwargs` for the `map_batches` method in which our
+        # `OfflinePreLearner` is run. 2 data workers should be run
+        # concurrently.
+        map_batches_kwargs={"concurrency": 2, "num_cpus": 2},
+        # The `kwargs` for the `iter_batches` method. Due to the small
+        # dataset we choose only a single batch to prefetch.
+        iter_batches_kwargs={"prefetch_batches": 1},
+        # The number of iterations to be run per learner when in multi-learner
+        # mode in a single RLlib training iteration. Leave this to `None` to
+        # run an entire epoch on the dataset during a single RLlib training
+        # iteration. For single-learner mode 1 is the only option.
+        dataset_num_iters_per_learner=1 if args.num_gpus == 0 else None,
+        # TODO (sven): Has this any influence in the connectors?
+        actions_in_input_normalized=True,
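The same block-count/concurrency pairing recurs in the MARWIL example below: reads are split into at least two blocks so that the two concurrent `map_batches` workers both have data to pull. A minimal Ray Data sketch (local path hypothetical):

```python
import ray

# Read a small offline dataset into an explicit number of blocks so that
# two concurrent `map_batches` workers each get at least one block.
ds = ray.data.read_parquet(
    "/tmp/pendulum-offline-data",  # hypothetical path
    override_num_blocks=2,
)
```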
19 changes: 15 additions & 4 deletions rllib/tuned_examples/marwil/cartpole_marwil.py
@@ -49,10 +49,21 @@
# as remote learners.
.offline_data(
input_=[data_path.as_posix()],
-        # Note, we want to have at least 2 data blocks to read from such that
-        # concurrency in `map_batches` works.
-        input_read_method_kwargs={"override_num_blocks": max(args.num_gpus, 2)},
-        prelearner_module_synch_period=20,
+        # The `kwargs` for the `input_read_method`. We override the
+        # number of blocks to pull at once b/c our dataset is small.
+        input_read_method_kwargs={"override_num_blocks": max(args.num_gpus * 2, 2)},
+        # The `kwargs` for the `map_batches` method in which our
+        # `OfflinePreLearner` is run. 2 data workers should be run
+        # concurrently.
+        map_batches_kwargs={"concurrency": 2, "num_cpus": 2},
+        # The `kwargs` for the `iter_batches` method. Due to the small
+        # dataset we choose only a single batch to prefetch.
+        iter_batches_kwargs={"prefetch_batches": 1},
+        # The number of iterations to be run per learner when in multi-learner
+        # mode in a single RLlib training iteration. Leave this to `None` to
+        # run an entire epoch on the dataset during a single RLlib training
+        # iteration. For single-learner mode 1 is the only option.
+        dataset_num_iters_per_learner=1 if args.num_gpus == 0 else None,
)
.training(
