[RLlib] AlgorithmConfig: Replace more occurrences of old config dicts; Make all Algorithms use the non-dict lookup for config properties. #30096

Merged · 14 commits · Nov 10, 2022
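The pattern this PR applies across the algorithms below: configuration values are read from the `AlgorithmConfig` object via attribute access instead of dict-style indexing. A minimal sketch of the two styles (using `PPOConfig` purely as an illustration; exact property availability depends on the Ray version):

from ray.rllib.algorithms.ppo import PPOConfig

config = PPOConfig().training(train_batch_size=4000)

# Old style: dict-like lookup on the config object.
batch_size = config["train_batch_size"]

# New style: plain attribute lookup, as rolled out in this PR.
batch_size = config.train_batch_size

Note that a few keys map to differently named properties, e.g. the `framework` key is read via `config.framework_str` in the `algorithm.py` hunk below.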
doc/source/rllib/rllib-algorithms.rst (6 changes: 3 additions & 3 deletions)
@@ -932,7 +932,7 @@ in the main Algorithm config and inheriting the `RE3UpdateCallbacks` as shown in
config["exploration_config"] = {
"type": "RE3",
# the dimensionality of the observation embedding vectors in latent space.
"embeds_dim": 128,
"embeds_dim": 128,
"rho": 0.1, # Beta decay factor, used for on-policy algorithm.
"k_nn": 50, # Number of neighbours to set for K-NN entropy estimation.
# Configuration for the encoder network, producing embedding vectors from observations.
Expand All @@ -943,11 +943,11 @@ in the main Algorithm config and inheriting the `RE3UpdateCallbacks` as shown in
"fcnet_activation": "relu",
},
# Hyperparameter to choose between exploration and exploitation. A higher value of beta adds
- # more importance to the intrinsic reward, as per the following equation
+ # more importance to the intrinsic reward, as per the following equation
# `reward = r + beta * intrinsic_reward`
"beta": 0.2,
# Schedule to use for beta decay, one of "constant" or "linear_decay".
"beta_schedule": 'constant',
"beta_schedule": 'constant',
# Specify, which exploration sub-type to use (usually, the algo's "default"
# exploration, e.g. EpsilonGreedy for DQN, StochasticSampling for PG/SAC).
"sub_exploration": {
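For reference, the snippet above still fills in `exploration_config` by dict assignment. A hedged sketch of the same RE3 setup expressed through the `AlgorithmConfig` builder API (the choice of `PPOConfig`, the environment, and the `.exploration()` call are illustrative assumptions; the keys are taken from the snippet):

from ray.rllib.algorithms.ppo import PPOConfig

config = (
    PPOConfig()
    .environment("Pendulum-v1")
    .framework("tf")
    .exploration(
        exploration_config={
            "type": "RE3",
            "embeds_dim": 128,            # observation-embedding dimensionality
            "beta": 0.2,                  # reward = r + beta * intrinsic_reward
            "beta_schedule": "constant",  # or "linear_decay"
            "sub_exploration": {"type": "StochasticSampling"},
        }
    )
)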
rllib/algorithms/a2c/a2c.py (10 changes: 5 additions & 5 deletions)
@@ -159,7 +159,7 @@ def setup(self, config: PartialAlgorithmConfigDict):
# Create a microbatch variable for collecting gradients on microbatches.
# These gradients will be accumulated on-the-fly and applied at once (once train
# batch size has been collected) to the model.
if self.config["microbatch_size"]:
if self.config.microbatch_size:
self._microbatches_grads = None
self._microbatches_counts = self._num_microbatches = 0

@@ -169,7 +169,7 @@ def training_step(self) -> ResultDict:
# W/o microbatching: Identical to Algorithm's default implementation.
# Only difference to a default Algorithm being the value function loss term
# and its value computations alongside each action.
if self.config["microbatch_size"] is None:
if self.config.microbatch_size is None:
return Algorithm.training_step(self)

# In microbatch mode, we want to compute gradients on experience
@@ -179,11 +179,11 @@ def training_step(self) -> ResultDict:
# used.
if self.config.count_steps_by == "agent_steps":
train_batch = synchronous_parallel_sample(
- worker_set=self.workers, max_agent_steps=self.config["microbatch_size"]
+ worker_set=self.workers, max_agent_steps=self.config.microbatch_size
)
else:
train_batch = synchronous_parallel_sample(
- worker_set=self.workers, max_env_steps=self.config["microbatch_size"]
+ worker_set=self.workers, max_env_steps=self.config.microbatch_size
)
self._counters[NUM_ENV_STEPS_SAMPLED] += train_batch.env_steps()
self._counters[NUM_AGENT_STEPS_SAMPLED] += train_batch.agent_steps()
@@ -204,7 +204,7 @@ def training_step(self) -> ResultDict:

# If `train_batch_size` reached: Accumulate gradients and apply.
num_microbatches = math.ceil(
self.config["train_batch_size"] / self.config["microbatch_size"]
self.config.train_batch_size / self.config.microbatch_size
)
if self._num_microbatches >= num_microbatches:
# Update counters.
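As the comments in `training_step` above describe, A2C's microbatch mode accumulates gradients from small sample batches and only applies them once a full train batch has been covered, i.e. after `ceil(train_batch_size / microbatch_size)` microbatches. A tiny standalone illustration of that arithmetic (the numbers are made up):

import math

train_batch_size = 500
microbatch_size = 96

# Gradients are applied only after this many microbatches have been accumulated.
num_microbatches = math.ceil(train_batch_size / microbatch_size)
print(num_microbatches)                    # 6
print(num_microbatches * microbatch_size)  # 576 env steps actually covered (>= 500)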
rllib/algorithms/algorithm.py (2 changes: 1 addition & 1 deletion)
@@ -480,7 +480,7 @@ def setup(self, config: AlgorithmConfig) -> None:

# Set Algorithm's seed after we have - if necessary - enabled
# tf eager-execution.
- update_global_seed_if_necessary(self.config["framework"], self.config["seed"])
+ update_global_seed_if_necessary(self.config.framework_str, self.config.seed)

self._record_usage(self.config)

rllib/algorithms/alpha_star/alpha_star.py (22 changes: 9 additions & 13 deletions)
@@ -344,18 +344,9 @@ def setup(self, config: AlphaStarConfig):
# one or more GPU nodes.
# - On each such node, also locate one replay buffer shard.

- # By default, set max_num_policies_to_train to the number of policy IDs
- # provided in the multiagent config.
- if self.config["max_num_policies_to_train"] is None:
- self.config["max_num_policies_to_train"] = len(
- self.workers.local_worker().get_policies_to_train()
- )

# Single CPU replay shard (co-located with GPUs so we can place the
# policies on the same machine(s)).
- num_gpus = (
- 0.01 if (self.config["num_gpus"] and not self.config["_fake_gpus"]) else 0
- )
+ num_gpus = 0.01 if (self.config.num_gpus and not self.config._fake_gpus) else 0
ReplayActor = ray.remote(
num_cpus=1,
num_gpus=num_gpus,
@@ -372,7 +363,12 @@ def setup(self, config: AlphaStarConfig):
# the initial first n learnable policies (found in the config).
distributed_learners = DistributedLearners(
config=self.config,
- max_num_policies_to_train=self.config["max_num_policies_to_train"],
+ # By default, set max_num_policies_to_train to the number of policy IDs
+ # provided in the multiagent config.
+ max_num_policies_to_train=(
+ self.config.max_num_policies_to_train
+ or len(self.workers.local_worker().get_policies_to_train())
+ ),
replay_actor_class=ReplayActor,
replay_actor_args=replay_actor_args,
)
@@ -406,15 +402,15 @@ def _set_policy_learners(worker):
max_remote_requests_in_flight_per_worker=self.config[
"max_requests_in_flight_per_sampler_worker"
],
ray_wait_timeout_s=self.config["timeout_s_sampler_manager"],
ray_wait_timeout_s=self.config.timeout_s_sampler_manager,
)
policy_actors = [policy_actor for _, policy_actor, _ in distributed_learners]
self._learner_worker_manager = AsyncRequestsManager(
workers=policy_actors,
max_remote_requests_in_flight_per_worker=self.config[
"max_requests_in_flight_per_learner_worker"
],
ray_wait_timeout_s=self.config["timeout_s_learner_manager"],
ray_wait_timeout_s=self.config.timeout_s_learner_manager,
)

@override(Algorithm)
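The replay shard set up above requests a tiny fractional GPU (`num_gpus=0.01`) purely so Ray schedules it onto a GPU node, co-located with the learner policies, without reserving a whole device. A minimal standalone sketch of that placement trick (the `ReplayShard` class is illustrative, not RLlib's actual `ReplayActor`; it needs a GPU node in the cluster to be scheduled):

import ray

@ray.remote(num_cpus=1, num_gpus=0.01)  # fractional GPU request: lands on a GPU node
class ReplayShard:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.batches = []

    def add(self, batch) -> None:
        self.batches.append(batch)

    def size(self) -> int:
        return len(self.batches)

ray.init()
shard = ReplayShard.remote(capacity=100_000)
shard.add.remote({"obs": [1, 2, 3]})
print(ray.get(shard.size.remote()))  # 1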
rllib/algorithms/alpha_star/distributed_learners.py (2 changes: 1 addition & 1 deletion)
@@ -47,7 +47,7 @@ def __init__(
actor's constructor.
"""
self.config = config
- self.num_gpus = self.config["num_gpus"]
+ self.num_gpus = self.config.num_gpus
self.max_num_policies_to_train = max_num_policies_to_train
self.replay_actor_class = replay_actor_class
self.replay_actor_args = replay_actor_args
rllib/algorithms/alpha_zero/alpha_zero.py (4 changes: 2 additions & 2 deletions)
@@ -364,9 +364,9 @@ def training_step(self) -> ResultDict:
else NUM_ENV_STEPS_SAMPLED
]

- if cur_ts > self.config["num_steps_sampled_before_learning_starts"]:
+ if cur_ts > self.config.num_steps_sampled_before_learning_starts:
train_batch = self.local_replay_buffer.sample(
self.config["train_batch_size"]
self.config.train_batch_size
)
else:
train_batch = None
rllib/algorithms/apex_dqn/apex_dqn.py (31 changes: 14 additions & 17 deletions)
@@ -348,13 +348,13 @@ def setup(self, config: PartialAlgorithmConfigDict):
-len(self.workers.remote_workers()) // 3 :
]

- num_replay_buffer_shards = self.config["optimizer"]["num_replay_buffer_shards"]
+ num_replay_buffer_shards = self.config.optimizer["num_replay_buffer_shards"]

# Create copy here so that we can modify without breaking other logic
- replay_actor_config = copy.deepcopy(self.config["replay_buffer_config"])
+ replay_actor_config = copy.deepcopy(self.config.replay_buffer_config)

replay_actor_config["capacity"] = (
self.config["replay_buffer_config"]["capacity"] // num_replay_buffer_shards
self.config.replay_buffer_config["capacity"] // num_replay_buffer_shards
)

ReplayActor = ray.remote(num_cpus=0)(replay_actor_config["type"])
@@ -386,14 +386,14 @@ def setup(self, config: PartialAlgorithmConfigDict):
max_remote_requests_in_flight_per_worker=self.config[
"max_requests_in_flight_per_replay_worker"
],
ray_wait_timeout_s=self.config["timeout_s_replay_manager"],
ray_wait_timeout_s=self.config.timeout_s_replay_manager,
)
self._sampling_actor_manager = AsyncRequestsManager(
self.workers.remote_workers(),
max_remote_requests_in_flight_per_worker=self.config[
"max_requests_in_flight_per_sampler_worker"
],
ray_wait_timeout_s=self.config["timeout_s_sampler_manager"],
ray_wait_timeout_s=self.config.timeout_s_sampler_manager,
)
self.learner_thread = LearnerThread(self.workers.local_worker())
self.learner_thread.start()
@@ -432,7 +432,7 @@ def training_step(self) -> ResultDict:
else NUM_ENV_STEPS_SAMPLED
]

- if cur_ts > self.config["num_steps_sampled_before_learning_starts"]:
+ if cur_ts > self.config.num_steps_sampled_before_learning_starts:
# trigger a sample from the replay actors and enqueue operation to the
# learner thread.
self.sample_from_replay_buffer_place_on_learner_queue_non_blocking(
@@ -497,7 +497,7 @@ def update_workers(self, _num_samples_ready: Dict[ActorHandle, int]) -> int:
Returns:
The number of remote workers whose weights were updated.
"""
- max_steps_weight_sync_delay = self.config["optimizer"]["max_weight_sync_delay"]
+ max_steps_weight_sync_delay = self.config.optimizer["max_weight_sync_delay"]
# Update our local copy of the weights if the learner thread has updated
# the learner worker's weights
policy_ids_updated = self.learner_thread.policy_ids_updated.copy()
@@ -566,17 +566,17 @@ def wait_on_replay_actors() -> List[Tuple[ActorHandle, SampleBatchType]]:
num_samples_collected = sum(num_worker_samples_collected.values())
self.curr_num_samples_collected += num_samples_collected
replay_sample_batches = wait_on_replay_actors()
- if self.curr_num_samples_collected >= self.config["train_batch_size"]:
- training_intensity = int(self.config["training_intensity"] or 1)
+ if self.curr_num_samples_collected >= self.config.train_batch_size:
+ training_intensity = int(self.config.training_intensity or 1)
num_requests_to_launch = (
- self.curr_num_samples_collected / self.config["train_batch_size"]
+ self.curr_num_samples_collected / self.config.train_batch_size
) * training_intensity
num_requests_to_launch = max(1, round(num_requests_to_launch))
self.curr_num_samples_collected = 0
for _ in range(num_requests_to_launch):
self._replay_actor_manager.call(
lambda actor, num_items: actor.sample(num_items),
- fn_args=[self.config["train_batch_size"]],
+ fn_args=[self.config.train_batch_size],
)
replay_sample_batches.extend(wait_on_replay_actors())

@@ -603,10 +603,7 @@ def update_replay_sample_priority(self) -> None:
env_steps,
agent_steps,
) = self.learner_thread.outqueue.get(timeout=0.001)
- if (
- self.config["replay_buffer_config"].get("prioritized_replay_alpha")
- > 0
- ):
+ if self.config.replay_buffer_config.get("prioritized_replay_alpha") > 0:
replay_actor.update_priorities.remote(priority_dict)
num_samples_trained_this_itr += env_steps
self.update_target_networks(env_steps)
@@ -627,7 +624,7 @@ def update_target_networks(self, num_new_trained_samples) -> None:
self._num_ts_trained_since_last_target_update += num_new_trained_samples
if (
self._num_ts_trained_since_last_target_update
- >= self.config["target_network_update_freq"]
+ >= self.config.target_network_update_freq
):
self._num_ts_trained_since_last_target_update = 0
with self._timers[TARGET_NET_UPDATE_TIMER]:
@@ -662,7 +659,7 @@ def on_worker_failures(
def _compile_iteration_results(self, *args, **kwargs):
result = super()._compile_iteration_results(*args, **kwargs)
replay_stats = ray.get(
- self._replay_actors[0].stats.remote(self.config["optimizer"].get("debug"))
+ self._replay_actors[0].stats.remote(self.config.optimizer.get("debug"))
)
exploration_infos_list = self.workers.foreach_policy_to_train(
lambda p, pid: {pid: p.get_exploration_state()}
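`update_target_networks` above simply counts the timesteps trained since the last sync and triggers a target-network update once that counter reaches `target_network_update_freq`. A simplified standalone sketch of that counter pattern (not the actual Ape-X implementation):

class TargetUpdateTracker:
    """Trigger a target-network sync every `update_freq` trained timesteps."""

    def __init__(self, update_freq: int):
        self.update_freq = update_freq
        self.ts_since_last_update = 0

    def on_trained(self, num_new_trained_samples: int) -> bool:
        self.ts_since_last_update += num_new_trained_samples
        if self.ts_since_last_update >= self.update_freq:
            self.ts_since_last_update = 0
            return True  # caller copies online weights into the target network here
        return False

tracker = TargetUpdateTracker(update_freq=500_000)
assert tracker.on_trained(300_000) is False
assert tracker.on_trained(300_000) is True  # 600k >= 500k -> sync and reset counter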
rllib/algorithms/appo/appo.py (6 changes: 3 additions & 3 deletions)
@@ -191,7 +191,7 @@ def __call__(self, fetches):
lambda p, _: p.update_target()
)
# Also update KL Coeff
if self.config["use_kl_loss"]:
if self.config.use_kl_loss:
self.update_kl(fetches)


@@ -231,7 +231,7 @@ def after_train_step(self, train_results: ResultDict) -> None:
]
last_update = self._counters[LAST_TARGET_UPDATE_TS]
target_update_freq = (
self.config["num_sgd_iter"] * self.config["minibatch_buffer_size"]
self.config.num_sgd_iter * self.config.minibatch_buffer_size
)
if cur_ts - last_update > target_update_freq:
self._counters[NUM_TARGET_UPDATES] += 1
@@ -243,7 +243,7 @@ def after_train_step(self, train_results: ResultDict) -> None:
)

# Also update the KL-coefficient for the APPO loss, if necessary.
if self.config["use_kl_loss"]:
if self.config.use_kl_loss:

def update(pi, pi_id):
assert LEARNER_STATS_KEY not in train_results, (