[RLlib] Add "shuffle batch per epoch" option. #47458

Merged
36 changes: 36 additions & 0 deletions rllib/algorithms/algorithm_config.py
@@ -384,6 +384,13 @@ def __init__(self, algo_class: Optional[type] = None):
# Simple logic for now: If None, use `train_batch_size`.
self.train_batch_size_per_learner = None
self.train_batch_size = 32 # @OldAPIStack

# These settings have been adopted from the original PPO batch settings:
# num_sgd_iter, minibatch_size, and shuffle_sequences.
self.num_epochs = 1
self.minibatch_size = None
self.shuffle_batch_per_epoch = False

# TODO (sven): Unsolved problem with RLModules sometimes requiring settings from
# the main AlgorithmConfig. We should not require the user to provide those
# settings in both, the AlgorithmConfig (as property) AND the model config
@@ -2064,6 +2071,9 @@ def training(
grad_clip_by: Optional[str] = NotProvided,
train_batch_size: Optional[int] = NotProvided,
train_batch_size_per_learner: Optional[int] = NotProvided,
num_epochs: Optional[int] = NotProvided,
minibatch_size: Optional[int] = NotProvided,
shuffle_batch_per_epoch: Optional[bool] = NotProvided,
model: Optional[dict] = NotProvided,
optimizer: Optional[dict] = NotProvided,
max_requests_in_flight_per_sampler_worker: Optional[int] = NotProvided,
@@ -2073,6 +2083,8 @@
] = NotProvided,
add_default_connectors_to_learner_pipeline: Optional[bool] = NotProvided,
learner_config_dict: Optional[Dict[str, Any]] = NotProvided,
# Deprecated args.
num_sgd_iter=DEPRECATED_VALUE,
) -> "AlgorithmConfig":
"""Sets the training related configuration.

@@ -2122,6 +2134,15 @@ def training(
stack, this setting should no longer be used. Instead, use
`train_batch_size_per_learner` (in combination with
`num_learners`).
num_epochs: The number of complete passes over the entire train batch (per
Learner). Each pass might be further split into n minibatches (if
`minibatch_size` is provided).
minibatch_size: The size of the minibatches into which the train batch is
further split.
shuffle_batch_per_epoch: Whether to shuffle the train batch once per epoch.
If the train batch has a time rank (axis=1), shuffling only takes place
along the batch axis so as not to disturb any intact (episode)
trajectories.

Review thread on the `num_epochs` docstring:

Collaborator: Awesome! For Offline RL we might want to add here that an epoch might loop over the entire dataset?

Contributor Author: Will add!
model: Arguments passed into the policy model. See models/catalog.py for a
full list of the available model options.
TODO: Provide ModelConfig objects instead of dicts.
@@ -2168,6 +2189,14 @@ def training(
Returns:
This updated AlgorithmConfig object.
"""
if num_sgd_iter != DEPRECATED_VALUE:
deprecation_warning(
old="config.training(num_sgd_iter=..)",
new="config.training(num_epochs=..)",
error=False,
)
num_epochs = num_sgd_iter

if gamma is not NotProvided:
self.gamma = gamma
if lr is not NotProvided:
@@ -2185,6 +2214,13 @@ def training(
self.train_batch_size_per_learner = train_batch_size_per_learner
if train_batch_size is not NotProvided:
self.train_batch_size = train_batch_size
if num_epochs is not NotProvided:
self.num_epochs = num_epochs
if minibatch_size is not NotProvided:
self.minibatch_size = minibatch_size
if shuffle_batch_per_epoch is not NotProvided:
self.shuffle_batch_per_epoch = shuffle_batch_per_epoch

if model is not NotProvided:
self.model.update(model)
if (
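For orientation, a minimal usage sketch of the options added above. The `PPOConfig` entry point and the `environment()` call are standard RLlib and not part of this diff; the concrete values are illustrative only.

```python
from ray.rllib.algorithms.ppo import PPOConfig

config = (
    PPOConfig()
    .environment("CartPole-v1")
    .training(
        # New/renamed batch options from this PR:
        num_epochs=4,                  # complete passes over the train batch per update
        minibatch_size=128,            # each pass is split into minibatches of 128
        shuffle_batch_per_epoch=True,  # reshuffle (along the batch axis only) each epoch
    )
)

# The deprecated spelling still works, but emits a deprecation warning and is
# mapped onto `num_epochs` internally:
config.training(num_sgd_iter=4)
```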
32 changes: 16 additions & 16 deletions rllib/algorithms/appo/appo.py
@@ -98,22 +98,19 @@ def __init__(self, algo_class=None):
self.use_kl_loss = False
self.kl_coeff = 1.0
self.kl_target = 0.01
# TODO (sven): Activate once v-trace sequences in non-RNN batch are solved.
# If we switch this on right now, the shuffling would destroy the rollout
# sequences (non-zero-padded!) needed in the batch for v-trace.
# self.shuffle_batch_per_epoch = True

# Override some of IMPALAConfig's default values with APPO-specific values.
self.num_env_runners = 2
self.min_time_s_per_iteration = 10
self.num_gpus = 0
self.num_multi_gpu_tower_stacks = 1
self.minibatch_buffer_size = 1
self.num_sgd_iter = 1
self.target_network_update_freq = 1
self.replay_proportion = 0.0
self.replay_buffer_num_slots = 100
self.learner_queue_size = 16
self.learner_queue_timeout = 300
self.max_sample_requests_in_flight_per_worker = 2
self.broadcast_interval = 1

self.grad_clip = 40.0
# Note: Only when using enable_rl_module_and_learner=True can the clipping mode
# be configured by the user. On the old API stack, RLlib will always clip by
@@ -140,6 +137,12 @@ def __init__(self, algo_class=None):
# Add constructor kwargs here (if any).
}

self.num_gpus = 0 # @OldAPIStack
self.num_multi_gpu_tower_stacks = 1 # @OldAPIStack
self.minibatch_buffer_size = 1 # @OldAPIStack
self.replay_proportion = 0.0 # @OldAPIStack
self.replay_buffer_num_slots = 100 # @OldAPIStack

# __sphinx_doc_end__
# fmt: on

@@ -185,13 +188,10 @@ def training(
target_network_update_freq: The frequency to update the target policy and
tune the kl loss coefficients that are used during training. After
setting this parameter, the algorithm waits for at least
`target_network_update_freq * minibatch_size * num_sgd_iter` number of
samples to be trained on by the learner group before updating the target
networks and tuned the kl loss coefficients that are used during
training.
NOTE: This parameter is only applicable when using the Learner API
(enable_rl_module_and_learner=True).

`target_network_update_freq` number of environment samples to be trained
on before updating the target networks and tuning the KL loss
coefficients. NOTE: This parameter is only applicable when using the
Learner API (enable_rl_module_and_learner=True).

Returns:
This updated AlgorithmConfig object.
@@ -292,7 +292,7 @@ def training_step(self) -> ResultDict:

# Update the target network and the KL coefficient for the APPO-loss.
# The target network update frequency is calculated automatically by the product
# of `num_sgd_iter` setting (usually 1 for APPO) and `minibatch_buffer_size`.
# of `num_epochs` setting (usually 1 for APPO) and `minibatch_buffer_size`.
if self.config.enable_rl_module_and_learner:
if NUM_TARGET_UPDATES in train_results:
self._counters[NUM_TARGET_UPDATES] += train_results[NUM_TARGET_UPDATES]
@@ -309,7 +309,7 @@ def training_step(self) -> ResultDict:
)
]
target_update_freq = (
self.config.num_sgd_iter * self.config.minibatch_buffer_size
self.config.num_epochs * self.config.minibatch_buffer_size
)
if cur_ts - last_update > target_update_freq:
self._counters[NUM_TARGET_UPDATES] += 1
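As a standalone illustration of the batch-axis-only shuffling that `shuffle_batch_per_epoch` performs on time-ranked data, and of why APPO leaves it off for now (shuffling time steps, rather than whole rows, would break the contiguous sequences v-trace needs), here is a plain NumPy sketch, not RLlib code:

```python
import numpy as np

rng = np.random.default_rng(0)

# A train batch with a time rank: (batch=4, time=5, features=2).
batch = rng.normal(size=(4, 5, 2))

# Shuffle along the batch axis (axis=0) only; each row's time sequence
# (axis=1) stays intact.
perm = rng.permutation(batch.shape[0])
shuffled = batch[perm]

# Row i of `shuffled` is row perm[i] of `batch`, with its trajectory unbroken.
assert np.array_equal(shuffled[0], batch[perm[0]])
```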
6 changes: 3 additions & 3 deletions rllib/algorithms/appo/appo_learner.py
@@ -90,14 +90,14 @@ def after_gradient_based_update(self, *, timesteps: Dict[str, Any]) -> None:
# TODO (avnish) Using steps trained here instead of sampled ... I'm not sure
# why the other implementation uses sampled.
# The difference in steps sampled/trained is pretty
# much always going to be larger than self.config.num_sgd_iter *
# much always going to be larger than self.config.num_epochs *
# self.config.minibatch_buffer_size unless the number of steps collected
# is really small. The thing is that the default rollout fragment length
# is 50, so the minibatch buffer size * num_epochs is going to have
# to be 50 to even meet the threshold of having delayed target
# have to be 50 to even meet the threshold of having delayed target
# updates.
# We should instead have the target / kl threshold update be based off
# of the train_batch_size * some target update frequency * num_sgd_iter.
# of the train_batch_size * some target update frequency * num_epochs.

last_update_ts_key = (module_id, LAST_TARGET_UPDATE_TS)
if timestep - self.metrics.peek(
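A small worked example of the threshold this comment is concerned about, using the default values visible elsewhere in this diff (`num_epochs=1`, `minibatch_buffer_size=1`, `rollout_fragment_length=50`). Plain Python; the variable names are illustrative, not RLlib API:

```python
# Defaults as they appear in this PR:
num_epochs = 1
minibatch_buffer_size = 1
rollout_fragment_length = 50

# Threshold used by the old-stack APPO target-update logic:
target_update_freq = num_epochs * minibatch_buffer_size  # -> 1

# Every training iteration advances the trained-timestep counter by at least
# one rollout fragment (50 steps), so the delta since the last update exceeds
# the threshold of 1 on every iteration and the target network is updated each
# time. Delaying updates would require num_epochs * minibatch_buffer_size to
# reach roughly one fragment (50), which is the point the comment makes.
steps_per_iteration = rollout_fragment_length
assert steps_per_iteration > target_update_freq
```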
8 changes: 4 additions & 4 deletions rllib/algorithms/cql/cql.py
@@ -306,7 +306,7 @@ def _training_step_new_api_stack(self) -> ResultDict:
# Sampling from offline data.
with self.metrics.log_time((TIMERS, OFFLINE_SAMPLING_TIMER)):
# Return an iterator in case we are using remote learners.
batch = self.offline_data.sample(
batch_or_iterator = self.offline_data.sample(
num_samples=self.config.train_batch_size_per_learner,
num_shards=self.config.num_learners,
return_iterator=self.config.num_learners > 1,
@@ -315,9 +315,9 @@
# Updating the policy.
with self.metrics.log_time((TIMERS, LEARNER_UPDATE_TIMER)):
# TODO (simon, sven): Check, if we should execute directly s.th. like
# update_from_iterator.
learner_results = self.learner_group.update_from_batch(
batch,
# `LearnerGroup.update_from_iterator()`.
learner_results = self.learner_group._update(
batch=batch_or_iterator,
minibatch_size=self.config.train_batch_size_per_learner,
num_iters=self.config.dataset_num_iters_per_learner,
)
51 changes: 14 additions & 37 deletions rllib/algorithms/impala/impala.py
@@ -134,7 +134,6 @@ def __init__(self, algo_class=None):
self.vtrace_clip_pg_rho_threshold = 1.0
self.num_multi_gpu_tower_stacks = 1 # @OldAPIstack
self.minibatch_buffer_size = 1 # @OldAPIstack
self.num_sgd_iter = 1
self.replay_proportion = 0.0 # @OldAPIstack
self.replay_buffer_num_slots = 0 # @OldAPIstack
self.learner_queue_size = 3
@@ -168,10 +167,10 @@ def __init__(self, algo_class=None):
self._lr_vf = 0.0005 # @OldAPIstack

# Override some of AlgorithmConfig's default values with IMPALA-specific values.
self.num_learners = 1
self.rollout_fragment_length = 50
self.train_batch_size = 500 # @OldAPIstack
self.train_batch_size_per_learner = 500
self._minibatch_size = "auto"
self.num_env_runners = 2
self.num_gpus = 1 # @OldAPIstack
self.lr = 0.0005
@@ -200,8 +199,6 @@ def training(
num_gpu_loader_threads: Optional[int] = NotProvided,
num_multi_gpu_tower_stacks: Optional[int] = NotProvided,
minibatch_buffer_size: Optional[int] = NotProvided,
minibatch_size: Optional[Union[int, str]] = NotProvided,
num_sgd_iter: Optional[int] = NotProvided,
replay_proportion: Optional[float] = NotProvided,
replay_buffer_num_slots: Optional[int] = NotProvided,
learner_queue_size: Optional[int] = NotProvided,
@@ -252,15 +249,7 @@ def training(
- This enables us to preload data into these stacks while another stack
is performing gradient calculations.
minibatch_buffer_size: How many train batches should be retained for
minibatching. This conf only has an effect if `num_sgd_iter > 1`.
minibatch_size: The size of minibatches that are trained over during
each SGD iteration. If "auto", will use the same value as
`train_batch_size`.
Note that this setting only has an effect if
`enable_rl_module_and_learner=True` and it must be a multiple of
`rollout_fragment_length` or `sequence_length` and smaller than or equal
to `train_batch_size`.
num_sgd_iter: Number of passes to make over each train batch.
minibatching. This conf only has an effect if `num_epochs > 1`.
replay_proportion: Set >0 to enable experience replay. Saved samples will
be replayed with a p:1 proportion to new data samples.
replay_buffer_num_slots: Number of sample batches to store for replay.
@@ -330,8 +319,6 @@ def training(
self.num_multi_gpu_tower_stacks = num_multi_gpu_tower_stacks
if minibatch_buffer_size is not NotProvided:
self.minibatch_buffer_size = minibatch_buffer_size
if num_sgd_iter is not NotProvided:
self.num_sgd_iter = num_sgd_iter
if replay_proportion is not NotProvided:
self.replay_proportion = replay_proportion
if replay_buffer_num_slots is not NotProvided:
@@ -374,8 +361,6 @@ def training(
self._separate_vf_optimizer = _separate_vf_optimizer
if _lr_vf is not NotProvided:
self._lr_vf = _lr_vf
if minibatch_size is not NotProvided:
self._minibatch_size = minibatch_size

return self

@@ -452,14 +437,14 @@ def validate(self) -> None:
# Learner API specific checks.
if (
self.enable_rl_module_and_learner
and self._minibatch_size != "auto"
and self.minibatch_size is not None
and not (
(self.minibatch_size % self.rollout_fragment_length == 0)
and self.minibatch_size <= self.total_train_batch_size
)
):
raise ValueError(
f"`minibatch_size` ({self._minibatch_size}) must either be 'auto' "
f"`minibatch_size` ({self._minibatch_size}) must either be None "
"or a multiple of `rollout_fragment_length` "
f"({self.rollout_fragment_length}) while at the same time smaller "
"than or equal to `total_train_batch_size` "
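A quick numeric check of the constraint enforced above, using the IMPALA defaults that appear elsewhere in this diff (`rollout_fragment_length=50`, `train_batch_size_per_learner=500`). Plain Python; the helper function is illustrative, not RLlib API:

```python
rollout_fragment_length = 50
total_train_batch_size = 500

def minibatch_size_ok(minibatch_size):
    # None means "no further splitting" and always passes the check.
    if minibatch_size is None:
        return True
    return (
        minibatch_size % rollout_fragment_length == 0
        and minibatch_size <= total_train_batch_size
    )

assert minibatch_size_ok(None)
assert minibatch_size_ok(250)      # 5 whole rollout fragments, fits the batch
assert not minibatch_size_ok(120)  # not a multiple of 50
assert not minibatch_size_ok(600)  # larger than the train batch
```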
@@ -474,20 +459,6 @@ def replay_ratio(self) -> float:
"""
return (1 / self.replay_proportion) if self.replay_proportion > 0 else 0.0

@property
def minibatch_size(self):
# If 'auto', use the train_batch_size (meaning each SGD iter is a single pass
# through the entire train batch). Otherwise, use user provided setting.
return (
(
self.train_batch_size_per_learner
if self.enable_env_runner_and_connector_v2
else self.train_batch_size
)
if self._minibatch_size == "auto"
else self._minibatch_size
)

@override(AlgorithmConfig)
def get_default_learner_class(self):
if self.framework_str == "torch":
@@ -539,7 +510,7 @@ class IMPALA(Algorithm):
2. If enabled, the replay buffer stores and produces batches of size
`rollout_fragment_length * num_envs_per_env_runner`.
3. If enabled, the minibatch ring buffer stores and replays batches of
size `train_batch_size` up to `num_sgd_iter` times per batch.
size `train_batch_size` up to `num_epochs` times per batch.
4. The learner thread executes data parallel SGD across `num_gpus` GPUs
on batches of size `train_batch_size`.
"""
@@ -734,6 +705,9 @@ def training_step(self) -> ResultDict:
NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0
),
},
num_epochs=self.config.num_epochs,
minibatch_size=self.config.minibatch_size,
shuffle_batch_per_epoch=self.config.shuffle_batch_per_epoch,
)
else:
learner_results = self.learner_group.update_from_episodes(
Review thread on this `update_from_episodes()` call:

Collaborator: I wonder: isn't it possible to just turn over a ray.data.DataIterator to the learner via update_from_iterator and then iterate over the train batch (as a materialized dataset) in minibatch_size batches?

Collaborator: We could run all of this (in the new stack) through the PreLearner to prefetch and make the learner connector run.
@@ -745,6 +719,9 @@
NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0
),
},
num_epochs=self.config.num_epochs,
minibatch_size=self.config.minibatch_size,
shuffle_batch_per_epoch=self.config.shuffle_batch_per_epoch,
)
if not do_async_updates:
learner_results = [learner_results]
@@ -1292,7 +1269,7 @@ def _learn_on_processed_samples(self) -> ResultDict:
),
},
async_update=async_update,
num_iters=self.config.num_sgd_iter,
num_epochs=self.config.num_epochs,
minibatch_size=self.config.minibatch_size,
)
if not async_update:
@@ -1531,7 +1508,7 @@ def make_learner_thread(local_worker, config):
lr=config["lr"],
train_batch_size=config["train_batch_size"],
num_multi_gpu_tower_stacks=config["num_multi_gpu_tower_stacks"],
num_sgd_iter=config["num_sgd_iter"],
num_sgd_iter=config["num_epochs"],
learner_queue_size=config["learner_queue_size"],
learner_queue_timeout=config["learner_queue_timeout"],
num_data_load_threads=config["num_gpu_loader_threads"],
@@ -1540,7 +1517,7 @@
learner_thread = LearnerThread(
local_worker,
minibatch_buffer_size=config["minibatch_buffer_size"],
num_sgd_iter=config["num_sgd_iter"],
num_sgd_iter=config["num_epochs"],
learner_queue_size=config["learner_queue_size"],
learner_queue_timeout=config["learner_queue_timeout"],
)
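Finally, a hedged sketch of what dropping the `"auto"` sentinel means for user code. The `IMPALAConfig` import is standard RLlib and not part of this diff; that the new default `None` reproduces the old `"auto"` behavior (one full pass over the train batch per epoch) is inferred from the removed property above, not stated in the diff:

```python
from ray.rllib.algorithms.impala import IMPALAConfig

config = (
    IMPALAConfig()
    .training(
        train_batch_size_per_learner=500,
        # Before this PR: minibatch_size="auto".
        # Now: leave it as None (assumed to mean "one minibatch = the whole
        # train batch per epoch"), or set an explicit multiple of
        # rollout_fragment_length that is <= the train batch size.
        minibatch_size=None,
        num_epochs=1,
    )
)
```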