From 04c328488a4f378a21185a9ba08b7a98fab17104 Mon Sep 17 00:00:00 2001
From: antoine_galataud
Date: Wed, 19 May 2021 17:43:44 +0200
Subject: [PATCH 1/6] fix cur_lr not updated on local worker

---
 rllib/agents/dqn/apex.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/rllib/agents/dqn/apex.py b/rllib/agents/dqn/apex.py
index 47651868cfb3..17c6b9a961da 100644
--- a/rllib/agents/dqn/apex.py
+++ b/rllib/agents/dqn/apex.py
@@ -133,6 +133,8 @@ def __call__(self, item: Tuple[ActorHandle, SampleBatchType]):
                     self.weights = ray.put(
                         self.workers.local_worker().get_weights())
                 actor.set_weights.remote(self.weights, _get_global_vars())
+                # Also update global vars of the local worker.
+                self.workers.local_worker().set_global_vars(_get_global_vars())
                 self.steps_since_update[actor] = 0
         # Update metrics.
         metrics = _get_shared_metrics()

From da04f75ad1c44951330a8865aa09c624c3390d1f Mon Sep 17 00:00:00 2001
From: antoine_galataud
Date: Wed, 19 May 2021 17:44:02 +0200
Subject: [PATCH 2/6] refactor LearningRateSchedule mixin (placeholder removed)

---
 rllib/policy/tf_policy.py | 25 +++++++------------------
 1 file changed, 7 insertions(+), 18 deletions(-)

diff --git a/rllib/policy/tf_policy.py b/rllib/policy/tf_policy.py
index 0f22d7fe55cc..d239b7f246d1 100644
--- a/rllib/policy/tf_policy.py
+++ b/rllib/policy/tf_policy.py
@@ -946,31 +946,20 @@ class LearningRateSchedule:
 
     @DeveloperAPI
     def __init__(self, lr, lr_schedule):
-        self._lr_schedule = None
-        if lr_schedule is None:
-            self.cur_lr = tf1.get_variable(
-                "lr", initializer=lr, trainable=False)
-        else:
+        self.cur_lr = tf1.get_variable("lr", initializer=lr, trainable=False)
+        self._lr_schedule = lr_schedule
+        if self._lr_schedule is not None:
             self._lr_schedule = PiecewiseSchedule(
                 lr_schedule, outside_value=lr_schedule[-1][-1], framework=None)
-            self.cur_lr = tf1.get_variable(
-                "lr", initializer=self._lr_schedule.value(0), trainable=False)
-            if self.framework == "tf":
-                self._lr_placeholder = tf1.placeholder(
-                    dtype=tf.float32, name="lr")
-                self._lr_update = self.cur_lr.assign(
-                    self._lr_placeholder, read_value=False)
 
     @override(Policy)
     def on_global_var_update(self, global_vars):
-        super(LearningRateSchedule, self).on_global_var_update(global_vars)
+        super().on_global_var_update(global_vars)
         if self._lr_schedule is not None:
             new_val = self._lr_schedule.value(global_vars["timestep"])
-            if self.framework == "tf":
-                self._sess.run(
-                    self._lr_update, feed_dict={self._lr_placeholder: new_val})
-            else:
-                self.cur_lr.assign(new_val, read_value=False)
+            op_or_none = self.cur_lr.assign(new_val, read_value=False)
+            if self._sess is not None:
+                self._sess.run(op_or_none)
 
     @override(TFPolicy)
     def optimizer(self):

From c42b604704b585d1f0218760d54b32bb8132b394 Mon Sep 17 00:00:00 2001
From: antoine_galataud
Date: Wed, 19 May 2021 20:20:34 +0200
Subject: [PATCH 3/6] init lr with schedule when provided

---
 rllib/policy/tf_policy.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/rllib/policy/tf_policy.py b/rllib/policy/tf_policy.py
index d239b7f246d1..6e02b164465b 100644
--- a/rllib/policy/tf_policy.py
+++ b/rllib/policy/tf_policy.py
@@ -946,8 +946,9 @@ class LearningRateSchedule:
 
     @DeveloperAPI
     def __init__(self, lr, lr_schedule):
-        self.cur_lr = tf1.get_variable("lr", initializer=lr, trainable=False)
         self._lr_schedule = lr_schedule
+        init_val = lr if self._lr_schedule is None else self._lr_schedule[0][-1]
+        self.cur_lr = tf1.get_variable("lr", initializer=init_val, trainable=False)
         if self._lr_schedule is not None:
             self._lr_schedule = PiecewiseSchedule(
                 lr_schedule, outside_value=lr_schedule[-1][-1], framework=None)
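For context on patch 3: PiecewiseSchedule (used exactly as in the diff) returns
the first endpoint's value at timestep 0 and clamps to outside_value past the
last endpoint, so seeding the variable with lr_schedule[0][-1] matches the
original self._lr_schedule.value(0). A minimal standalone sketch; the endpoint
numbers are made up for illustration:

    from ray.rllib.utils.schedules import PiecewiseSchedule

    lr_schedule = [[0, 1e-3], [10000, 1e-5]]  # hypothetical endpoints
    schedule = PiecewiseSchedule(
        lr_schedule, outside_value=lr_schedule[-1][-1], framework=None)

    # Value at t=0 is exactly the first endpoint's value.
    assert schedule.value(0) == lr_schedule[0][-1]
    # Past the last endpoint, the schedule clamps to outside_value.
    assert schedule.value(10**6) == lr_schedule[-1][-1]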
From bf92eefececbf9bc1612ffacc7b7d57ce0e1bc14 Mon Sep 17 00:00:00 2001
From: antoine_galataud
Date: Wed, 19 May 2021 21:11:31 +0200
Subject: [PATCH 4/6] format

---
 rllib/policy/tf_policy.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/rllib/policy/tf_policy.py b/rllib/policy/tf_policy.py
index 6e02b164465b..61afa0c0ae9e 100644
--- a/rllib/policy/tf_policy.py
+++ b/rllib/policy/tf_policy.py
@@ -946,12 +946,17 @@ class LearningRateSchedule:
 
     @DeveloperAPI
     def __init__(self, lr, lr_schedule):
-        self._lr_schedule = lr_schedule
-        init_val = lr if self._lr_schedule is None else self._lr_schedule[0][-1]
-        self.cur_lr = tf1.get_variable("lr", initializer=init_val, trainable=False)
-        if self._lr_schedule is not None:
+        self._lr_schedule = None
+        if lr_schedule is not None:
             self._lr_schedule = PiecewiseSchedule(
                 lr_schedule, outside_value=lr_schedule[-1][-1], framework=None)
+            self.cur_lr = tf1.get_variable("lr",
+                                           initializer=lr_schedule[0][-1],
+                                           trainable=False)
+        else:
+            self.cur_lr = tf1.get_variable("lr",
+                                           initializer=lr,
+                                           trainable=False)
 
     @override(Policy)
     def on_global_var_update(self, global_vars):

From 8356fdf477da40acae8d9364107cd74296b6f3aa Mon Sep 17 00:00:00 2001
From: antoine_galataud
Date: Wed, 19 May 2021 22:07:20 +0200
Subject: [PATCH 5/6] format

---
 rllib/policy/tf_policy.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/rllib/policy/tf_policy.py b/rllib/policy/tf_policy.py
index 61afa0c0ae9e..318a5bd7c688 100644
--- a/rllib/policy/tf_policy.py
+++ b/rllib/policy/tf_policy.py
@@ -950,13 +950,11 @@ def __init__(self, lr, lr_schedule):
         if lr_schedule is not None:
             self._lr_schedule = PiecewiseSchedule(
                 lr_schedule, outside_value=lr_schedule[-1][-1], framework=None)
-            self.cur_lr = tf1.get_variable("lr",
-                                           initializer=lr_schedule[0][-1],
-                                           trainable=False)
+            self.cur_lr = tf1.get_variable(
+                "lr", initializer=lr_schedule[0][-1], trainable=False)
         else:
-            self.cur_lr = tf1.get_variable("lr",
-                                           initializer=lr,
-                                           trainable=False)
+            self.cur_lr = tf1.get_variable(
+                "lr", initializer=lr, trainable=False)
 
     @override(Policy)
     def on_global_var_update(self, global_vars):
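Patches 2-5 lean on the fact that, in TF1 graph mode,
Variable.assign(..., read_value=False) returns an op that can be sess.run()
directly, with no placeholder feed. Patch 6 below restores the
placeholder + feed_dict form; one practical difference is that the direct form
builds a fresh assign op (growing the graph) for each new Python value, while
the placeholder form reuses a single op. A minimal standalone sketch of both
forms, assuming plain TF1 graph mode outside RLlib (variable and placeholder
names here are illustrative, not RLlib's):

    import tensorflow as tf

    tf1 = tf.compat.v1
    tf1.disable_eager_execution()  # emulate RLlib's static-graph "tf" mode

    cur_lr = tf1.get_variable("lr", initializer=1e-3, trainable=False)

    # Direct form (what op_or_none relies on): assign() only *builds* an op
    # here; each call with a new Python value adds another op to the graph.
    direct_update = cur_lr.assign(5e-4, read_value=False)

    # Placeholder form (what patch 6 restores): one assign op, reused for
    # every new value fed through the placeholder.
    lr_placeholder = tf1.placeholder(dtype=tf.float32, name="lr_in")
    feed_update = cur_lr.assign(lr_placeholder, read_value=False)

    with tf1.Session() as sess:
        sess.run(tf1.global_variables_initializer())
        sess.run(direct_update)
        print(sess.run(cur_lr))  # -> 0.0005
        sess.run(feed_update, feed_dict={lr_placeholder: 1e-4})
        print(sess.run(cur_lr))  # -> 0.0001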
From 81f7bb212ed0983795d11bb879ee1db168c343c7 Mon Sep 17 00:00:00 2001
From: antoine_galataud
Date: Fri, 4 Jun 2021 09:31:34 +0200
Subject: [PATCH 6/6] revert change

---
 rllib/policy/tf_policy.py | 47 +++++++++++++++++++++++++++++++-----------
 1 file changed, 30 insertions(+), 17 deletions(-)

diff --git a/rllib/policy/tf_policy.py b/rllib/policy/tf_policy.py
index 0f22d7fe55cc..83b264a5d814 100644
--- a/rllib/policy/tf_policy.py
+++ b/rllib/policy/tf_policy.py
@@ -15,7 +15,7 @@
 from ray.rllib.utils.annotations import override, DeveloperAPI
 from ray.rllib.utils.debug import summarize
 from ray.rllib.utils.framework import try_import_tf, get_variable
-from ray.rllib.utils.schedules import ConstantSchedule, PiecewiseSchedule
+from ray.rllib.utils.schedules import PiecewiseSchedule
 from ray.rllib.utils.tf_run_builder import TFRunBuilder
 from ray.rllib.utils.typing import ModelGradients, TensorType, \
     TrainerConfigDict
@@ -971,6 +971,7 @@ def on_global_var_update(self, global_vars):
                     self._lr_update, feed_dict={self._lr_placeholder: new_val})
             else:
                 self.cur_lr.assign(new_val, read_value=False)
+                self._optimizer.learning_rate.assign(self.cur_lr)
 
     @override(TFPolicy)
     def optimizer(self):
@@ -983,35 +984,47 @@ class EntropyCoeffSchedule:
 
     @DeveloperAPI
     def __init__(self, entropy_coeff, entropy_coeff_schedule):
-        self.entropy_coeff = get_variable(
-            entropy_coeff,
-            framework="tf",
-            tf_name="entropy_coeff",
-            trainable=False)
-
+        self._entropy_coeff_schedule = None
         if entropy_coeff_schedule is None:
-            self.entropy_coeff_schedule = ConstantSchedule(
-                entropy_coeff, framework=None)
+            self.entropy_coeff = get_variable(
+                entropy_coeff,
+                framework="tf",
+                tf_name="entropy_coeff",
+                trainable=False)
         else:
             # Allows for custom schedule similar to lr_schedule format
             if isinstance(entropy_coeff_schedule, list):
-                self.entropy_coeff_schedule = PiecewiseSchedule(
+                self._entropy_coeff_schedule = PiecewiseSchedule(
                     entropy_coeff_schedule,
                     outside_value=entropy_coeff_schedule[-1][-1],
                     framework=None)
             else:
                 # Implements previous version but enforces outside_value
-                self.entropy_coeff_schedule = PiecewiseSchedule(
+                self._entropy_coeff_schedule = PiecewiseSchedule(
                     [[0, entropy_coeff], [entropy_coeff_schedule, 0.0]],
                     outside_value=0.0,
                     framework=None)
+
+            self.entropy_coeff = get_variable(
+                self._entropy_coeff_schedule.value(0),
+                framework="tf",
+                tf_name="entropy_coeff",
+                trainable=False)
+            if self.framework == "tf":
+                self._entropy_coeff_placeholder = tf1.placeholder(
+                    dtype=tf.float32, name="entropy_coeff")
+                self._entropy_coeff_update = self.entropy_coeff.assign(
+                    self._entropy_coeff_placeholder, read_value=False)
 
     @override(Policy)
     def on_global_var_update(self, global_vars):
         super(EntropyCoeffSchedule, self).on_global_var_update(global_vars)
-        op_or_none = self.entropy_coeff.assign(
-            self.entropy_coeff_schedule.value(global_vars["timestep"]),
-            read_value=False,  # return tf op (None in eager mode).
-        )
-        if self._sess is not None:
-            self._sess.run(op_or_none)
+        if self._entropy_coeff_schedule is not None:
+            new_val = self._entropy_coeff_schedule.value(
+                global_vars["timestep"])
+            if self.framework == "tf":
+                self._sess.run(
+                    self._entropy_coeff_update,
+                    feed_dict={self._entropy_coeff_placeholder: new_val})
+            else:
+                self.entropy_coeff.assign(new_val, read_value=False)
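Patch 6 keeps the scalar fallback for entropy_coeff_schedule. A standalone
sketch (coefficient and horizon values are hypothetical) of what that branch
computes: a bare number N is read as "anneal linearly from entropy_coeff to
0.0 over N timesteps, then hold 0.0 via outside_value":

    from ray.rllib.utils.schedules import PiecewiseSchedule

    entropy_coeff, entropy_coeff_schedule = 0.01, 20000  # hypothetical values
    sched = PiecewiseSchedule(
        [[0, entropy_coeff], [entropy_coeff_schedule, 0.0]],
        outside_value=0.0,
        framework=None)

    print(sched.value(0))      # 0.01   -- full coefficient at the start
    print(sched.value(10000))  # 0.005  -- linear interpolation halfway
    print(sched.value(50000))  # 0.0    -- outside_value past the last endpoint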