diff --git a/rllib/BUILD b/rllib/BUILD index 2a5e4f0f0e69..4ae15d490537 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -81,7 +81,7 @@ py_test( ) py_test( - name = "learning_cartpole_a2c_fake_gpus", + name = "learning_tests_cartpole_a2c_fake_gpus", main = "tests/run_regression_tests.py", tags = ["team:ml", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete", "fake_gpus"], size = "large", @@ -126,15 +126,22 @@ py_test( # APPO py_test( - name = "learning_tests_cartpole_appo", + name = "learning_tests_cartpole_appo_no_vtrace", main = "tests/run_regression_tests.py", tags = ["team:ml", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete"], size = "large", srcs = ["tests/run_regression_tests.py"], - data = [ - "tuned_examples/ppo/cartpole-appo.yaml", - "tuned_examples/ppo/cartpole-appo-vtrace.yaml" - ], + data = ["tuned_examples/ppo/cartpole-appo.yaml"], + args = ["--yaml-dir=tuned_examples/ppo"] +) + +py_test( + name = "learning_tests_cartpole_appo_vtrace", + main = "tests/run_regression_tests.py", + tags = ["team:ml", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete"], + size = "large", + srcs = ["tests/run_regression_tests.py"], + data = ["tuned_examples/ppo/cartpole-appo-vtrace.yaml"], args = ["--yaml-dir=tuned_examples/ppo"] ) @@ -151,7 +158,7 @@ py_test( ) py_test( - name = "learning_frozenlake_appo", + name = "learning_tests_frozenlake_appo", main = "tests/run_regression_tests.py", tags = ["team:ml", "learning_tests", "learning_tests_discrete"], size = "large", @@ -161,7 +168,7 @@ py_test( ) py_test( - name = "learning_cartpole_appo_fake_gpus", + name = "learning_tests_cartpole_appo_fake_gpus", main = "tests/run_regression_tests.py", tags = ["team:ml", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete", "fake_gpus"], size = "large", @@ -208,7 +215,7 @@ py_test( ) py_test( - name = "learning_pendulum_ddpg_fake_gpus", + name = "learning_tests_pendulum_ddpg_fake_gpus", main = "tests/run_regression_tests.py", tags = ["team:ml", "learning_tests", "learning_tests_pendulum", "learning_tests_continuous", "fake_gpus"], size = "large", @@ -263,7 +270,7 @@ py_test( ) py_test( - name = "learning_cartpole_dqn_fake_gpus", + name = "learning_tests_cartpole_dqn_fake_gpus", main = "tests/run_regression_tests.py", tags = ["team:ml", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete", "fake_gpus"], size = "large", @@ -286,7 +293,7 @@ py_test( ) py_test( - name = "learning_cartpole_simpleq_fake_gpus", + name = "learning_tests_cartpole_simpleq_fake_gpus", main = "tests/run_regression_tests.py", tags = ["team:ml", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete", "fake_gpus"], size = "medium", @@ -318,7 +325,7 @@ py_test( ) py_test( - name = "learning_cartpole_impala_fake_gpus", + name = "learning_tests_cartpole_impala_fake_gpus", main = "tests/run_regression_tests.py", tags = ["team:ml", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete", "fake_gpus"], size = "large", @@ -352,7 +359,7 @@ py_test( ) py_test( - name = "learning_cartpole_pg_fake_gpus", + name = "learning_tests_cartpole_pg_fake_gpus", main = "tests/run_regression_tests.py", tags = ["team:ml", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete", "fake_gpus"], size = "large", @@ -403,7 +410,7 @@ py_test( ) py_test( - name = "learning_cartpole_ppo_fake_gpus", + name = "learning_tests_cartpole_ppo_fake_gpus", main = "tests/run_regression_tests.py", tags = ["team:ml", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete", "fake_gpus"], size = "large", @@ -455,7 +462,7 @@ py_test( ) py_test( - name = "learning_stateless_cartpole_r2d2_fake_gpus", + name = "learning_tests_stateless_cartpole_r2d2_fake_gpus", main = "tests/run_regression_tests.py", tags = ["team:ml", "learning_tests", "learning_tests_cartpole", "fake_gpus"], size = "large", @@ -506,7 +513,7 @@ py_test( ) py_test( - name = "learning_pendulum_sac_fake_gpus", + name = "learning_tests_pendulum_sac_fake_gpus", main = "tests/run_regression_tests.py", tags = ["team:ml", "learning_tests", "learning_tests_pendulum", "learning_tests_continuous", "fake_gpus"], size = "large", @@ -845,7 +852,7 @@ py_test( "--env", "Pendulum-v1", "--run", "APEX_DDPG", "--stop", "'{\"training_iteration\": 1}'", - "--config", "'{\"framework\": \"tf\", \"num_workers\": 2, \"optimizer\": {\"num_replay_buffer_shards\": 1}, \"learning_starts\": 100, \"min_iter_time_s\": 1, \"batch_mode\": \"complete_episodes\"}'", + "--config", "'{\"framework\": \"tf\", \"num_workers\": 2, \"optimizer\": {\"num_replay_buffer_shards\": 1}, \"learning_starts\": 100, \"min_time_s_per_reporting\": 1, \"batch_mode\": \"complete_episodes\"}'", "--ray-num-cpus", "4", ] ) @@ -928,7 +935,7 @@ py_test( "--env", "CartPole-v0", "--run", "IMPALA", "--stop", "'{\"training_iteration\": 1}'", - "--config", "'{\"framework\": \"tf\", \"num_gpus\": 0, \"num_workers\": 2, \"min_iter_time_s\": 1, \"num_multi_gpu_tower_stacks\": 2, \"replay_buffer_num_slots\": 100, \"replay_proportion\": 1.0}'", + "--config", "'{\"framework\": \"tf\", \"num_gpus\": 0, \"num_workers\": 2, \"min_time_s_per_reporting\": 1, \"num_multi_gpu_tower_stacks\": 2, \"replay_buffer_num_slots\": 100, \"replay_proportion\": 1.0}'", "--ray-num-cpus", "4", ] ) @@ -942,7 +949,7 @@ py_test( "--env", "CartPole-v0", "--run", "IMPALA", "--stop", "'{\"training_iteration\": 1}'", - "--config", "'{\"framework\": \"tf\", \"num_gpus\": 0, \"num_workers\": 2, \"min_iter_time_s\": 1, \"num_multi_gpu_tower_stacks\": 2, \"replay_buffer_num_slots\": 100, \"replay_proportion\": 1.0, \"model\": {\"use_lstm\": true}}'", + "--config", "'{\"framework\": \"tf\", \"num_gpus\": 0, \"num_workers\": 2, \"min_time_s_per_reporting\": 1, \"num_multi_gpu_tower_stacks\": 2, \"replay_buffer_num_slots\": 100, \"replay_proportion\": 1.0, \"model\": {\"use_lstm\": true}}'", "--ray-num-cpus", "4", ] ) diff --git a/rllib/agents/a3c/a2c.py b/rllib/agents/a3c/a2c.py index ff9eb15989db..5932b345e23e 100644 --- a/rllib/agents/a3c/a2c.py +++ b/rllib/agents/a3c/a2c.py @@ -17,7 +17,7 @@ A3C_CONFIG, { "rollout_fragment_length": 20, - "min_iter_time_s": 10, + "min_time_s_per_reporting": 10, "sample_async": False, # A2C supports microbatching, in which we accumulate gradients over diff --git a/rllib/agents/a3c/a3c.py b/rllib/agents/a3c/a3c.py index e7a48e01f08c..22b48c82a8d1 100644 --- a/rllib/agents/a3c/a3c.py +++ b/rllib/agents/a3c/a3c.py @@ -39,8 +39,8 @@ "entropy_coeff": 0.01, # Entropy coefficient schedule "entropy_coeff_schedule": None, - # Min time per iteration - "min_iter_time_s": 5, + # Min time per reporting + "min_time_s_per_reporting": 5, # Workers sample async. Note that this increases the effective # rollout_fragment_length by up to 5x due to async buffering of batches. "sample_async": True, diff --git a/rllib/agents/a3c/tests/test_a2c.py b/rllib/agents/a3c/tests/test_a2c.py index 7d2547a5249b..6fb1e89ef4d6 100644 --- a/rllib/agents/a3c/tests/test_a2c.py +++ b/rllib/agents/a3c/tests/test_a2c.py @@ -35,7 +35,7 @@ def test_a2c_compilation(self): trainer.stop() def test_a2c_exec_impl(ray_start_regular): - config = {"min_iter_time_s": 0} + config = {"min_time_s_per_reporting": 0} for _ in framework_iterator(config): trainer = a3c.A2CTrainer(env="CartPole-v0", config=config) results = trainer.train() @@ -46,7 +46,7 @@ def test_a2c_exec_impl(ray_start_regular): def test_a2c_exec_impl_microbatch(ray_start_regular): config = { - "min_iter_time_s": 0, + "min_time_s_per_reporting": 0, "microbatch_size": 10, } for _ in framework_iterator(config): diff --git a/rllib/agents/a3c/tests/test_a3c.py b/rllib/agents/a3c/tests/test_a3c.py index e6b9f7a229f2..c4519d6de7f8 100644 --- a/rllib/agents/a3c/tests/test_a3c.py +++ b/rllib/agents/a3c/tests/test_a3c.py @@ -51,7 +51,7 @@ def test_a3c_entropy_coeff_schedule(self): config["timesteps_per_iteration"] = 20 # 0 metrics reporting delay, this makes sure timestep, # which entropy coeff depends on, is updated after each worker rollout. - config["min_iter_time_s"] = 0 + config["min_time_s_per_reporting"] = 0 # Initial lr, doesn't really matter because of the schedule below. config["entropy_coeff"] = 0.01 schedule = [ diff --git a/rllib/agents/ars/ars.py b/rllib/agents/ars/ars.py index 840c6459e0c1..e5b507883b63 100644 --- a/rllib/agents/ars/ars.py +++ b/rllib/agents/ars/ars.py @@ -228,30 +228,44 @@ def validate_config(self, config: TrainerConfigDict) -> None: "`NoFilter` for ARS!") @override(Trainer) - def _init(self, config, env_creator): - self.validate_config(config) - env_context = EnvContext(config["env_config"] or {}, worker_index=0) - env = env_creator(env_context) + def setup(self, config): + # Setup our config: Merge the user-supplied config (which could + # be a partial config dict with the class' default). + self.config = self.merge_trainer_configs( + self.get_default_config(), config, self._allow_unknown_configs) - self._policy_class = get_policy_class(config) + # Validate our config dict. + self.validate_config(self.config) + + # Generate `self.env_creator` callable to create an env instance. + self.env_creator = self._get_env_creator_from_env_id(self._env_id) + # Generate the local env. + env_context = EnvContext( + self.config["env_config"] or {}, worker_index=0) + env = self.env_creator(env_context) + + self.callbacks = self.config["callbacks"]() + + self._policy_class = get_policy_class(self.config) self.policy = self._policy_class(env.observation_space, - env.action_space, config) - self.optimizer = optimizers.SGD(self.policy, config["sgd_stepsize"]) + env.action_space, self.config) + self.optimizer = optimizers.SGD(self.policy, + self.config["sgd_stepsize"]) - self.rollouts_used = config["rollouts_used"] - self.num_rollouts = config["num_rollouts"] - self.report_length = config["report_length"] + self.rollouts_used = self.config["rollouts_used"] + self.num_rollouts = self.config["num_rollouts"] + self.report_length = self.config["report_length"] # Create the shared noise table. logger.info("Creating shared noise table.") - noise_id = create_shared_noise.remote(config["noise_size"]) + noise_id = create_shared_noise.remote(self.config["noise_size"]) self.noise = SharedNoiseTable(ray.get(noise_id)) # Create the actors. logger.info("Creating actors.") self.workers = [ - Worker.remote(config, env_creator, noise_id, idx + 1) - for idx in range(config["num_workers"]) + Worker.remote(self.config, self.env_creator, noise_id, idx + 1) + for idx in range(self.config["num_workers"]) ] self.episodes_so_far = 0 @@ -375,7 +389,7 @@ def compute_single_action(self, observation, *args, **kwargs): return action[0], [], {} return action[0] - @Deprecated(new="compute_single_action", error=False) + @Deprecated(new="compute_single_action", error=True) def compute_action(self, observation, *args, **kwargs): return self.compute_single_action(observation, *args, **kwargs) diff --git a/rllib/agents/ars/tests/test_ars.py b/rllib/agents/ars/tests/test_ars.py index a78353de44ac..31a3e0210cfd 100644 --- a/rllib/agents/ars/tests/test_ars.py +++ b/rllib/agents/ars/tests/test_ars.py @@ -22,7 +22,8 @@ def test_ars_compilation(self): config["model"]["fcnet_hiddens"] = [10] config["model"]["fcnet_activation"] = None config["noise_size"] = 2500000 - # Test eval workers ("normal" Trainer eval WorkerSet, unusual for ARS). + # Test eval workers ("normal" WorkerSet, unlike ARS' list of + # RolloutWorkers used for collecting train batches). config["evaluation_interval"] = 1 config["evaluation_num_workers"] = 1 diff --git a/rllib/agents/ddpg/apex.py b/rllib/agents/ddpg/apex.py index 8669c4e90d86..73aada3c8852 100644 --- a/rllib/agents/ddpg/apex.py +++ b/rllib/agents/ddpg/apex.py @@ -38,7 +38,7 @@ "target_network_update_freq": 500000, "timesteps_per_iteration": 25000, "worker_side_prioritization": True, - "min_iter_time_s": 30, + "min_time_s_per_reporting": 30, }, _allow_unknown_configs=True, ) diff --git a/rllib/agents/ddpg/ddpg.py b/rllib/agents/ddpg/ddpg.py index 7bd079683f44..e21370ca21bb 100644 --- a/rllib/agents/ddpg/ddpg.py +++ b/rllib/agents/ddpg/ddpg.py @@ -171,8 +171,8 @@ "num_workers": 0, # Whether to compute priorities on workers. "worker_side_prioritization": False, - # Prevent iterations from going lower than this time span - "min_iter_time_s": 1, + # Prevent reporting frequency from going lower than this time span. + "min_time_s_per_reporting": 1, }) # __sphinx_doc_end__ # yapf: enable diff --git a/rllib/agents/ddpg/tests/test_apex_ddpg.py b/rllib/agents/ddpg/tests/test_apex_ddpg.py index f40799392021..bea1e174d4c9 100644 --- a/rllib/agents/ddpg/tests/test_apex_ddpg.py +++ b/rllib/agents/ddpg/tests/test_apex_ddpg.py @@ -20,7 +20,7 @@ def test_apex_ddpg_compilation_and_per_worker_epsilon_values(self): config["num_workers"] = 2 config["prioritized_replay"] = True config["timesteps_per_iteration"] = 100 - config["min_iter_time_s"] = 1 + config["min_time_s_per_reporting"] = 1 config["learning_starts"] = 0 config["optimizer"]["num_replay_buffer_shards"] = 1 num_iterations = 1 diff --git a/rllib/agents/ddpg/tests/test_ddpg.py b/rllib/agents/ddpg/tests/test_ddpg.py index 74333300196e..7489c4ffb557 100644 --- a/rllib/agents/ddpg/tests/test_ddpg.py +++ b/rllib/agents/ddpg/tests/test_ddpg.py @@ -154,7 +154,7 @@ def test_ddpg_loss_function(self): config["actor_hiddens"] = [10] config["critic_hiddens"] = [10] # Make sure, timing differences do not affect trainer.train(). - config["min_iter_time_s"] = 0 + config["min_time_s_per_reporting"] = 0 config["timesteps_per_iteration"] = 100 map_ = { diff --git a/rllib/agents/dqn/apex.py b/rllib/agents/dqn/apex.py index d9d1ab3f536b..883230f4a973 100644 --- a/rllib/agents/dqn/apex.py +++ b/rllib/agents/dqn/apex.py @@ -78,7 +78,7 @@ "timesteps_per_iteration": 25000, "exploration_config": {"type": "PerWorkerEpsilonGreedy"}, "worker_side_prioritization": True, - "min_iter_time_s": 30, + "min_time_s_per_reporting": 30, # If set, this will fix the ratio of replayed from a buffer and learned # on timesteps to sampled from an environment and stored in the replay # buffer timesteps. Otherwise, replay will proceed as fast as possible. diff --git a/rllib/agents/dqn/simple_q.py b/rllib/agents/dqn/simple_q.py index ba99f1c28a50..69bb7ef17791 100644 --- a/rllib/agents/dqn/simple_q.py +++ b/rllib/agents/dqn/simple_q.py @@ -103,8 +103,8 @@ # to increase if your environment is particularly slow to sample, or if # you"re using the Async or Ape-X optimizers. "num_workers": 0, - # Prevent iterations from going lower than this time span. - "min_iter_time_s": 1, + # Prevent reporting frequency from going lower than this time span. + "min_time_s_per_reporting": 1, }) # __sphinx_doc_end__ # yapf: enable diff --git a/rllib/agents/dqn/tests/test_apex_dqn.py b/rllib/agents/dqn/tests/test_apex_dqn.py index 1f4f3f749915..b73bc397fc39 100644 --- a/rllib/agents/dqn/tests/test_apex_dqn.py +++ b/rllib/agents/dqn/tests/test_apex_dqn.py @@ -24,7 +24,7 @@ def test_apex_zero_workers(self): config["learning_starts"] = 1000 config["prioritized_replay"] = True config["timesteps_per_iteration"] = 100 - config["min_iter_time_s"] = 1 + config["min_time_s_per_reporting"] = 1 config["optimizer"]["num_replay_buffer_shards"] = 1 for _ in framework_iterator(config): trainer = apex.ApexTrainer(config=config, env="CartPole-v0") @@ -41,7 +41,7 @@ def test_apex_dqn_compilation_and_per_worker_epsilon_values(self): config["learning_starts"] = 1000 config["prioritized_replay"] = True config["timesteps_per_iteration"] = 100 - config["min_iter_time_s"] = 1 + config["min_time_s_per_reporting"] = 1 config["optimizer"]["num_replay_buffer_shards"] = 1 for _ in framework_iterator(config, with_eager_tracing=True): @@ -81,7 +81,7 @@ def test_apex_lr_schedule(self): config["timesteps_per_iteration"] = 10 # 0 metrics reporting delay, this makes sure timestep, # which lr depends on, is updated after each worker rollout. - config["min_iter_time_s"] = 0 + config["min_time_s_per_reporting"] = 0 config["optimizer"]["num_replay_buffer_shards"] = 1 # This makes sure learning schedule is checked every 10 timesteps. config["optimizer"]["max_weight_sync_delay"] = 10 diff --git a/rllib/agents/es/es.py b/rllib/agents/es/es.py index 30fd1403146f..b4553772a0bb 100644 --- a/rllib/agents/es/es.py +++ b/rllib/agents/es/es.py @@ -228,28 +228,42 @@ def validate_config(self, config: TrainerConfigDict) -> None: "`NoFilter` for ES!") @override(Trainer) - def _init(self, config, env_creator): - self.validate_config(config) - env_context = EnvContext(config["env_config"] or {}, worker_index=0) - env = env_creator(env_context) - self._policy_class = get_policy_class(config) + def setup(self, config): + # Setup our config: Merge the user-supplied config (which could + # be a partial config dict with the class' default). + self.config = self.merge_trainer_configs( + self.get_default_config(), config, self._allow_unknown_configs) + + # Call super's validation method. + self.validate_config(self.config) + + # Generate `self.env_creator` callable to create an env instance. + self.env_creator = self._get_env_creator_from_env_id(self._env_id) + # Generate the local env. + env_context = EnvContext( + self.config["env_config"] or {}, worker_index=0) + env = self.env_creator(env_context) + + self.callbacks = self.config["callbacks"]() + + self._policy_class = get_policy_class(self.config) self.policy = self._policy_class( obs_space=env.observation_space, action_space=env.action_space, - config=config) - self.optimizer = optimizers.Adam(self.policy, config["stepsize"]) - self.report_length = config["report_length"] + config=self.config) + self.optimizer = optimizers.Adam(self.policy, self.config["stepsize"]) + self.report_length = self.config["report_length"] # Create the shared noise table. logger.info("Creating shared noise table.") - noise_id = create_shared_noise.remote(config["noise_size"]) + noise_id = create_shared_noise.remote(self.config["noise_size"]) self.noise = SharedNoiseTable(ray.get(noise_id)) # Create the actors. logger.info("Creating actors.") - self._workers = [ - Worker.remote(config, {}, env_creator, noise_id, idx + 1) - for idx in range(config["num_workers"]) + self.workers = [ + Worker.remote(self.config, {}, self.env_creator, noise_id, idx + 1) + for idx in range(self.config["num_workers"]) ] self.episodes_so_far = 0 @@ -333,7 +347,7 @@ def step_attempt(self): # Now sync the filters FilterManager.synchronize({ DEFAULT_POLICY_ID: self.policy.observation_filter - }, self._workers) + }, self.workers) info = { "weights_norm": np.square(theta).sum(), @@ -375,7 +389,7 @@ def _sync_weights_to_workers(self, *, worker_set=None, workers=None): @override(Trainer) def cleanup(self): # workaround for https://github.com/ray-project/ray/issues/1516 - for w in self._workers: + for w in self.workers: w.__ray_terminate__.remote() def _collect_results(self, theta_id, min_episodes, min_timesteps): @@ -386,7 +400,7 @@ def _collect_results(self, theta_id, min_episodes, min_timesteps): "Collected {} episodes {} timesteps so far this iter".format( num_episodes, num_timesteps)) rollout_ids = [ - worker.do_rollouts.remote(theta_id) for worker in self._workers + worker.do_rollouts.remote(theta_id) for worker in self.workers ] # Get the results of the rollouts. for result in ray.get(rollout_ids): @@ -413,4 +427,4 @@ def __setstate__(self, state): self.policy.observation_filter = state["filter"] FilterManager.synchronize({ DEFAULT_POLICY_ID: self.policy.observation_filter - }, self._workers) + }, self.workers) diff --git a/rllib/agents/impala/impala.py b/rllib/agents/impala/impala.py index 12f3b7959114..e0d9c56fbe6e 100644 --- a/rllib/agents/impala/impala.py +++ b/rllib/agents/impala/impala.py @@ -50,7 +50,7 @@ # "rollout_fragment_length": 50, "train_batch_size": 500, - "min_iter_time_s": 10, + "min_time_s_per_reporting": 10, "num_workers": 2, # Number of GPUs the learner should use. "num_gpus": 1, diff --git a/rllib/agents/impala/tests/test_impala.py b/rllib/agents/impala/tests/test_impala.py index 1af4ab95e3dc..a9d863785856 100644 --- a/rllib/agents/impala/tests/test_impala.py +++ b/rllib/agents/impala/tests/test_impala.py @@ -56,10 +56,10 @@ def test_impala_lr_schedule(self): config = impala.DEFAULT_CONFIG.copy() config["num_gpus"] = 0 # Test whether we correctly ignore the "lr" setting. - # The first lr should be 0.0005. + # The first lr should be 0.05. config["lr"] = 0.1 config["lr_schedule"] = [ - [0, 0.0005], + [0, 0.05], [10000, 0.000001], ] config["num_gpus"] = 0 # Do not use any (fake) GPUs. @@ -69,18 +69,27 @@ def get_lr(result): return result["info"][LEARNER_INFO][DEFAULT_POLICY_ID][ LEARNER_STATS_KEY]["cur_lr"] - for fw in framework_iterator(config, frameworks=("tf", "torch")): + for fw in framework_iterator(config): trainer = impala.ImpalaTrainer(config=config) policy = trainer.get_policy() try: if fw == "tf": - check(policy.get_session().run(policy.cur_lr), 0.0005) + check(policy.get_session().run(policy.cur_lr), 0.05) else: - check(policy.cur_lr, 0.0005) + check(policy.cur_lr, 0.05) r1 = trainer.train() r2 = trainer.train() - assert get_lr(r2) < get_lr(r1), (r1, r2) + r3 = trainer.train() + # Due to the asynch'ness of IMPALA, learner-stats metrics + # could be delayed by one iteration. Do 3 train() calls here + # and measure guaranteed decrease in lr between 1st and 3rd. + lr1 = get_lr(r1) + lr2 = get_lr(r2) + lr3 = get_lr(r3) + assert lr2 <= lr1, (lr1, lr2) + assert lr3 <= lr2, (lr2, lr3) + assert lr3 < lr1, (lr1, lr3) finally: trainer.stop() diff --git a/rllib/agents/mock.py b/rllib/agents/mock.py index 664d2a0f8c5b..5cce10df4ff8 100644 --- a/rllib/agents/mock.py +++ b/rllib/agents/mock.py @@ -27,10 +27,22 @@ def get_default_config(cls) -> TrainerConfigDict: def default_resource_request(cls, config): return None - def _init(self, config, env_creator): + @override(Trainer) + def setup(self, config): + # Setup our config: Merge the user-supplied config (which could + # be a partial config dict with the class' default). + self.config = self.merge_trainer_configs( + self.get_default_config(), config, self._allow_unknown_configs) + self.config["env"] = self._env_id + + self.validate_config(self.config) + self.callbacks = self.config["callbacks"]() + + # Add needed properties. self.info = None self.restored = False + @override(Trainer) def step(self): if self.config["mock_error"] and self.iteration == 1 \ and (self.config["persistent_error"] or not self.restored): @@ -45,19 +57,23 @@ def step(self): result.update({tune_result.SHOULD_CHECKPOINT: True}) return result + @override(Trainer) def save_checkpoint(self, checkpoint_dir): path = os.path.join(checkpoint_dir, "mock_agent.pkl") with open(path, "wb") as f: pickle.dump(self.info, f) return path + @override(Trainer) def load_checkpoint(self, checkpoint_path): with open(checkpoint_path, "rb") as f: info = pickle.load(f) self.info = info self.restored = True + @override(Trainer) def _register_if_needed(self, env_object, config): + # No env to register. pass def set_info(self, info): diff --git a/rllib/agents/ppo/appo.py b/rllib/agents/ppo/appo.py index e030de383916..2cfab4ff54f4 100644 --- a/rllib/agents/ppo/appo.py +++ b/rllib/agents/ppo/appo.py @@ -54,7 +54,7 @@ # == IMPALA optimizer params (see documentation in impala.py) == "rollout_fragment_length": 50, "train_batch_size": 500, - "min_iter_time_s": 10, + "min_time_s_per_reporting": 10, "num_workers": 2, "num_gpus": 0, "num_multi_gpu_tower_stacks": 1, @@ -132,5 +132,7 @@ def get_default_policy_class(self, config: PartialTrainerConfigDict) -> \ from ray.rllib.agents.ppo.appo_torch_policy import \ AsyncPPOTorchPolicy return AsyncPPOTorchPolicy - else: + elif config["framework"] == "tf": return AsyncPPOTFPolicy + elif config["framework"] in ["tf2", "tfe"]: + return AsyncPPOTFPolicy.as_eager() diff --git a/rllib/agents/ppo/tests/test_appo.py b/rllib/agents/ppo/tests/test_appo.py index 3ae6ae0339c3..53da984d72fa 100644 --- a/rllib/agents/ppo/tests/test_appo.py +++ b/rllib/agents/ppo/tests/test_appo.py @@ -81,7 +81,7 @@ def test_appo_entropy_coeff_schedule(self): config["timesteps_per_iteration"] = 20 # 0 metrics reporting delay, this makes sure timestep, # which entropy coeff depends on, is updated after each worker rollout. - config["min_iter_time_s"] = 0 + config["min_time_s_per_reporting"] = 0 # Initial lr, doesn't really matter because of the schedule below. config["entropy_coeff"] = 0.01 schedule = [ diff --git a/rllib/agents/qmix/qmix.py b/rllib/agents/qmix/qmix.py index 9eff9ca5b49b..3501823f5f47 100644 --- a/rllib/agents/qmix/qmix.py +++ b/rllib/agents/qmix/qmix.py @@ -92,8 +92,8 @@ "num_workers": 0, # Whether to compute priorities on workers. "worker_side_prioritization": False, - # Prevent iterations from going lower than this time span - "min_iter_time_s": 1, + # Prevent reporting frequency from going lower than this time span. + "min_time_s_per_reporting": 1, # === Model === "model": { diff --git a/rllib/agents/sac/sac.py b/rllib/agents/sac/sac.py index b6f5b0e7ab06..7c36dab63d40 100644 --- a/rllib/agents/sac/sac.py +++ b/rllib/agents/sac/sac.py @@ -156,8 +156,8 @@ "num_cpus_per_worker": 1, # Whether to compute priorities on workers. "worker_side_prioritization": False, - # Prevent iterations from going lower than this time span. - "min_iter_time_s": 1, + # Prevent reporting frequency from going lower than this time span. + "min_time_s_per_reporting": 1, # Whether the loss should be calculated deterministically (w/o the # stochastic action sampling step). True only useful for cont. actions and diff --git a/rllib/agents/sac/tests/test_sac.py b/rllib/agents/sac/tests/test_sac.py index 83293cb95bfb..9e67e03d2941 100644 --- a/rllib/agents/sac/tests/test_sac.py +++ b/rllib/agents/sac/tests/test_sac.py @@ -157,7 +157,7 @@ def test_sac_loss_function(self): config["Q_model"]["fcnet_hiddens"] = [10] config["policy_model"]["fcnet_hiddens"] = [10] # Make sure, timing differences do not affect trainer.train(). - config["min_iter_time_s"] = 0 + config["min_time_s_per_reporting"] = 0 # Test SAC with Simplex action space. config["env_config"] = {"simplex_actions": True} diff --git a/rllib/agents/slateq/slateq.py b/rllib/agents/slateq/slateq.py index 9ce28d146def..82bf7107d406 100644 --- a/rllib/agents/slateq/slateq.py +++ b/rllib/agents/slateq/slateq.py @@ -124,8 +124,8 @@ "num_workers": 0, # Whether to compute priorities on workers. "worker_side_prioritization": False, - # Prevent iterations from going lower than this time span - "min_iter_time_s": 1, + # Prevent reporting frequency from going lower than this time span. + "min_time_s_per_reporting": 1, # === SlateQ specific options === # Learning method used by the slateq policy. Choose from: RANDOM, diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 180ab36b505c..e5e36b098ea5 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -11,8 +11,8 @@ import pickle import tempfile import time -from typing import Callable, DefaultDict, Dict, List, Optional, Set, Tuple, \ - Type, Union +from typing import Callable, Container, DefaultDict, Dict, List, Optional, \ + Set, Tuple, Type, Union import ray from ray.actor import ActorHandle @@ -53,9 +53,9 @@ from ray.rllib.utils.metrics.learner_info import LEARNER_INFO from ray.rllib.utils.pre_checks.multi_agent import check_multi_agent from ray.rllib.utils.spaces import space_utils -from ray.rllib.utils.typing import AgentID, EnvInfoDict, EnvType, EpisodeID, \ - PartialTrainerConfigDict, PolicyID, ResultDict, TensorStructType, \ - TensorType, TrainerConfigDict +from ray.rllib.utils.typing import AgentID, EnvCreator, EnvInfoDict, EnvType, \ + EpisodeID, PartialTrainerConfigDict, PolicyID, PolicyState, ResultDict, \ + TensorStructType, TensorType, TrainerConfigDict from ray.tune.logger import Logger, UnifiedLogger from ray.tune.registry import ENV_CREATOR, register_env, _global_registry from ray.tune.resources import Resources @@ -674,12 +674,17 @@ class directly. Note that this arg can also be specified via # COMMON_CONFIG in self.setup(). config = config or {} - # Trainers allow env ids to be passed directly to the constructor. - self._env_id = self._register_if_needed( + # Convert `env` provided in config into a string: + # - If `env` is a string: `self._env_id` = `env`. + # - If `env` is a class: `self._env_id` = `env.__name__` -> Already + # register it with a auto-generated env creator. + # - If `env` is None: `self._env_id` is None. + self._env_id: Optional[str] = self._register_if_needed( env or config.get("env"), config) + # The env creator callable, taking an EnvContext (config dict) # as arg and returning an RLlib supported Env type (e.g. a gym.Env). - self.env_creator: Callable[[EnvContext], EnvType] = None + self.env_creator: EnvCreator = None # Placeholder for a local replay buffer instance. self.local_replay_buffer = None @@ -753,38 +758,14 @@ def setup(self, config: PartialTrainerConfigDict): # be a partial config dict with the class' default). self.config = self.merge_trainer_configs( self.get_default_config(), config, self._allow_unknown_configs) + self.config["env"] = self._env_id # Validate the framework settings in config. self.validate_framework(self.config) - # Setup the "env creator" callable. - env = self._env_id - if env: - self.config["env"] = env - - # An already registered env. - if _global_registry.contains(ENV_CREATOR, env): - self.env_creator = _global_registry.get(ENV_CREATOR, env) - - # A class path specifier. - elif "." in env: - - def env_creator_from_classpath(env_context): - try: - env_obj = from_config(env, env_context) - except ValueError: - raise EnvError( - ERR_MSG_INVALID_ENV_DESCRIPTOR.format(env)) - return env_obj - - self.env_creator = env_creator_from_classpath - # Try gym/PyBullet/Vizdoom. - else: - self.env_creator = functools.partial( - gym_env_creator, env_descriptor=env) - # No env -> Env creator always returns None. - else: - self.env_creator = lambda env_config: None + # Setup the self.env_creator callable (to be passed + # e.g. to RolloutWorkers' c'tors). + self.env_creator = self._get_env_creator_from_env_id(self._env_id) # Set Trainer's seed after we have - if necessary - enabled # tf eager-execution. @@ -792,11 +773,6 @@ def env_creator_from_classpath(env_context): self.config["seed"]) self.validate_config(self.config) - if not callable(self.config["callbacks"]): - raise ValueError( - "`callbacks` must be a callable method that " - "returns a subclass of DefaultCallbacks, got {}".format( - self.config["callbacks"])) self.callbacks = self.config["callbacks"]() log_level = self.config.get("log_level") if log_level in ["WARN", "ERROR"]: @@ -817,14 +793,14 @@ def env_creator_from_classpath(env_context): self.remote_requests_in_flight: \ DefaultDict[ActorHandle, Set[ray.ObjectRef]] = defaultdict(set) + self.workers: Optional[WorkerSet] = None + self.train_exec_impl = None + # Deprecated way of implementing Trainer sub-classes (or "templates" - # via the soon-to-be deprecated `build_trainer` utility function). + # via the `build_trainer` utility function). # Instead, sub-classes should override the Trainable's `setup()` # method and call super().setup() from within that override at some # point. - self.workers: Optional[WorkerSet] = None - self.train_exec_impl = None - # Old design: Override `Trainer._init` (or use `build_trainer()`, which # will do this for you). try: @@ -864,8 +840,9 @@ def env_creator_from_classpath(env_context): self.workers, self.config, **self._kwargs_for_execution_plan()) - # Now that workers have been created, update our policy - # specs in the config[multiagent] dict with the correct spaces. + # Now that workers have been created, update our policies + # dict in config[multiagent] (with the correct original/ + # unpreprocessed spaces). self.config["multiagent"]["policies"] = \ self.workers.local_worker().policy_dict @@ -944,7 +921,7 @@ def env_creator_from_classpath(env_context): # If you don't need the env/workers/config/etc.. setup for you by super, # simply do not call super().setup() from your overridden method. def _init(self, config: TrainerConfigDict, - env_creator: Callable[[EnvContext], EnvType]) -> None: + env_creator: EnvCreator) -> None: raise NotImplementedError @ExperimentalAPI @@ -1694,55 +1671,73 @@ def add_policy( observation_space: Optional[gym.spaces.Space] = None, action_space: Optional[gym.spaces.Space] = None, config: Optional[PartialTrainerConfigDict] = None, + policy_state: Optional[PolicyState] = None, policy_mapping_fn: Optional[Callable[[AgentID, EpisodeID], PolicyID]] = None, - policies_to_train: Optional[List[PolicyID]] = None, + policies_to_train: Optional[Container[PolicyID]] = None, evaluation_workers: bool = True, + workers: Optional[List[Union[RolloutWorker, ActorHandle]]] = None, ) -> Policy: """Adds a new policy to this Trainer. Args: - policy_id (PolicyID): ID of the policy to add. - policy_cls (Type[Policy]): The Policy class to use for + policy_id: ID of the policy to add. + policy_cls: The Policy class to use for constructing the new Policy. - observation_space (Optional[gym.spaces.Space]): The observation - space of the policy to add. - action_space (Optional[gym.spaces.Space]): The action space - of the policy to add. - config (Optional[PartialTrainerConfigDict]): The config overrides - for the policy to add. - policy_mapping_fn (Optional[Callable[[AgentID], PolicyID]]): An - optional (updated) policy mapping function to use from here on. - Note that already ongoing episodes will not change their - mapping but will use the old mapping till the end of the - episode. - policies_to_train (Optional[List[PolicyID]]): An optional list of - policy IDs to be trained. If None, will keep the existing list - in place. Policies, whose IDs are not in the list will not be - updated. - evaluation_workers (bool): Whether to add the new policy also + observation_space: The observation space of the policy to add. + If None, try to infer this space from the environment. + action_space: The action space of the policy to add. + If None, try to infer this space from the environment. + config: The config overrides for the policy to add. + policy_state: Optional state dict to apply to the new + policy instance, right after its construction. + policy_mapping_fn: An optional (updated) policy mapping function + to use from here on. Note that already ongoing episodes will + not change their mapping but will use the old mapping till + the end of the episode. + policies_to_train: An optional list/set of policy IDs to be + trained. If None, will keep the existing list in place. + Policies, whose IDs are not in the list will not be updated. + evaluation_workers: Whether to add the new policy also to the evaluation WorkerSet. + workers: A list of RolloutWorker/ActorHandles (remote + RolloutWorkers) to add this policy to. If defined, will only + add the given policy to these workers. Returns: The newly added policy (the copy that got added to the local worker). """ + kwargs = dict( + policy_id=policy_id, + policy_cls=policy_cls, + observation_space=observation_space, + action_space=action_space, + config=config, + policy_state=policy_state, + policy_mapping_fn=policy_mapping_fn, + policies_to_train=list(policies_to_train), + ) + def fn(worker: RolloutWorker): # `foreach_worker` function: Adds the policy the the worker (and # maybe changes its policy_mapping_fn - if provided here). - worker.add_policy( - policy_id=policy_id, - policy_cls=policy_cls, - observation_space=observation_space, - action_space=action_space, - config=config, - policy_mapping_fn=policy_mapping_fn, - policies_to_train=policies_to_train, - ) + worker.add_policy(**kwargs) - # Run foreach_worker fn on all workers (incl. evaluation workers). - self.workers.foreach_worker(fn) + if workers is not None: + ray_gets = [] + for worker in workers: + if isinstance(worker, ActorHandle): + ray_gets.append(worker.add_policy.remote(**kwargs)) + else: + fn(worker) + ray.get(ray_gets) + else: + # Run foreach_worker fn on all workers. + self.workers.foreach_worker(fn) + + # Update evaluation workers, if necessary. if evaluation_workers and self.evaluation_workers is not None: self.evaluation_workers.foreach_worker(fn) @@ -1761,18 +1756,16 @@ def remove_policy( """Removes a new policy from this Trainer. Args: - policy_id (Optional[PolicyID]): ID of the policy to be removed. - policy_mapping_fn (Optional[Callable[[AgentID], PolicyID]]): An - optional (updated) policy mapping function to use from here on. - Note that already ongoing episodes will not change their - mapping but will use the old mapping till the end of the - episode. - policies_to_train (Optional[List[PolicyID]]): An optional list of - policy IDs to be trained. If None, will keep the existing list - in place. Policies, whose IDs are not in the list will not be - updated. - evaluation_workers (bool): Whether to also remove the policy from - the evaluation WorkerSet. + policy_id: ID of the policy to be removed. + policy_mapping_fn: An optional (updated) policy mapping function + to use from here on. Note that already ongoing episodes will + not change their mapping but will use the old mapping till + the end of the episode. + policies_to_train: An optional list of policy IDs to be trained. + If None, will keep the existing list in place. Policies, + whose IDs are not in the list will not be updated. + evaluation_workers: Whether to also remove the policy from the + evaluation WorkerSet. """ def fn(worker): @@ -1878,12 +1871,8 @@ def log_result(self, result: ResultDict) -> None: @override(Trainable) def cleanup(self) -> None: # Stop all workers. - workers = getattr(self, "workers", None) - if workers: - workers.stop() - # Stop all optimizers. - if hasattr(self, "optimizer") and self.optimizer: - self.optimizer.stop() + if hasattr(self, "workers"): + self.workers.stop() @classmethod @override(Trainable) @@ -1932,11 +1921,46 @@ def _before_evaluate(self): """Pre-evaluation callback.""" pass + def _get_env_creator_from_env_id( + self, env_id: Optional[str] = None) -> EnvCreator: + """Returns an env creator callable, given an `env_id` (e.g. "CartPole-v0"). + + Args: + env_id: An already tune registered env ID, a known gym env name, + or None (if no env is used). + + Returns: + """ + if env_id: + # An already registered env. + if _global_registry.contains(ENV_CREATOR, env_id): + return _global_registry.get(ENV_CREATOR, env_id) + + # A class path specifier. + elif "." in env_id: + + def env_creator_from_classpath(env_context): + try: + env_obj = from_config(env_id, env_context) + except ValueError: + raise EnvError( + ERR_MSG_INVALID_ENV_DESCRIPTOR.format(env_id)) + return env_obj + + return env_creator_from_classpath + # Try gym/PyBullet/Vizdoom. + else: + return functools.partial( + gym_env_creator, env_descriptor=env_id) + # No env -> Env creator always returns None. + else: + return lambda env_config: None + @DeveloperAPI def _make_workers( self, *, - env_creator: Callable[[EnvContext], EnvType], + env_creator: EnvCreator, validate_env: Optional[Callable[[EnvType, EnvContext], None]], policy_class: Type[Policy], config: TrainerConfigDict, @@ -2151,10 +2175,14 @@ def validate_config(self, config: TrainerConfigDict) -> None: if config.get("record_env") == "": config["record_env"] = True - # DefaultCallbacks if callbacks - for whatever reason - set to - # None. + # Use DefaultCallbacks class, if callbacks is None. if config["callbacks"] is None: config["callbacks"] = DefaultCallbacks + # Check, whether given `callbacks` is a callable. + if not callable(config["callbacks"]): + raise ValueError("`callbacks` must be a callable method that " + "returns a subclass of DefaultCallbacks, got " + f"{config['callbacks']}!") # Multi-GPU settings. simple_optim_setting = config.get("simple_optimizer", DEPRECATED_VALUE) @@ -2262,14 +2290,12 @@ def validate_config(self, config: TrainerConfigDict) -> None: config["metrics_num_episodes_for_smoothing"] = \ config["metrics_smoothing_episodes"] if config["min_iter_time_s"] != DEPRECATED_VALUE: - # TODO: Warn once all algos use the `training_iteration` method. - # deprecation_warning( - # old="min_iter_time_s", - # new="min_time_s_per_reporting", - # error=False, - # ) - config["min_time_s_per_reporting"] = \ - config["min_iter_time_s"] + deprecation_warning( + old="min_iter_time_s", + new="min_time_s_per_reporting", + error=False, + ) + config["min_time_s_per_reporting"] = config["min_iter_time_s"] if config["collect_metrics_timeout"] != DEPRECATED_VALUE: # TODO: Warn once all algos use the `training_iteration` method. @@ -2465,8 +2491,6 @@ def __getstate__(self) -> dict: state = {} if hasattr(self, "workers"): state["worker"] = self.workers.local_worker().save() - if hasattr(self, "optimizer") and hasattr(self.optimizer, "save"): - state["optimizer"] = self.optimizer.save() # TODO: Experimental functionality: Store contents of replay buffer # to checkpoint, only if user has configured this. if self.local_replay_buffer is not None and \ @@ -2481,14 +2505,11 @@ def __getstate__(self) -> dict: return state def __setstate__(self, state: dict): - if "worker" in state and hasattr(self, "workers"): + if hasattr(self, "workers") and "worker" in state: self.workers.local_worker().restore(state["worker"]) remote_state = ray.put(state["worker"]) for r in self.workers.remote_workers(): r.restore.remote(remote_state) - # Restore optimizer data, if necessary. - if "optimizer" in state and hasattr(self, "optimizer"): - self.optimizer.restore(state["optimizer"]) # If necessary, restore replay data as well. if self.local_replay_buffer is not None: # TODO: Experimental functionality: Restore contents of replay @@ -2796,12 +2817,3 @@ def _try_recover(self): def _validate_config(config, trainer_or_none): assert trainer_or_none is not None return trainer_or_none.validate_config(config) - - # TODO: `self.optimizer` is no longer created in Trainer -> - # Deprecate this method. - @Deprecated(error=False) - def collect_metrics(self, selected_workers=None): - return self.optimizer.collect_metrics( - self.config["metrics_episode_collection_timeout_s"], - min_history=self.config["metrics_num_episodes_for_smoothing"], - selected_workers=selected_workers) diff --git a/rllib/agents/trainer_template.py b/rllib/agents/trainer_template.py index cca4e2036f7d..93133e3175fc 100644 --- a/rllib/agents/trainer_template.py +++ b/rllib/agents/trainer_template.py @@ -8,7 +8,7 @@ from ray.rllib.utils import add_mixins from ray.rllib.utils.annotations import override from ray.rllib.utils.deprecation import Deprecated -from ray.rllib.utils.typing import EnvConfigDict, EnvType, \ +from ray.rllib.utils.typing import EnvCreator, EnvType, \ PartialTrainerConfigDict, ResultDict, TrainerConfigDict from ray.tune.logger import Logger @@ -113,8 +113,7 @@ def setup(self, config: PartialTrainerConfigDict): override_all_subkeys_if_type_changes Trainer.setup(self, config) - def _init(self, config: TrainerConfigDict, - env_creator: Callable[[EnvConfigDict], EnvType]): + def _init(self, config: TrainerConfigDict, env_creator: EnvCreator): # No `get_policy_class` function. if get_policy_class is None: diff --git a/rllib/contrib/maddpg/maddpg.py b/rllib/contrib/maddpg/maddpg.py index 2cb9ceac086c..414e68940b88 100644 --- a/rllib/contrib/maddpg/maddpg.py +++ b/rllib/contrib/maddpg/maddpg.py @@ -120,7 +120,7 @@ # you're using the Async or Ape-X optimizers. "num_workers": 1, # Prevent iterations from going lower than this time span - "min_iter_time_s": 0, + "min_time_s_per_reporting": 0, }) # __sphinx_doc_end__ # yapf: enable diff --git a/rllib/env/multi_agent_env.py b/rllib/env/multi_agent_env.py index fc45ecc548e0..ab458bcef85a 100644 --- a/rllib/env/multi_agent_env.py +++ b/rllib/env/multi_agent_env.py @@ -3,11 +3,10 @@ from typing import Callable, Dict, List, Tuple, Type, Optional, Union, Set from ray.rllib.env.base_env import BaseEnv -from ray.rllib.env.env_context import EnvContext from ray.rllib.utils.annotations import ExperimentalAPI, override, PublicAPI, \ DeveloperAPI -from ray.rllib.utils.typing import AgentID, EnvID, EnvType, MultiAgentDict, \ - MultiEnvDict +from ray.rllib.utils.typing import AgentID, EnvCreator, EnvID, EnvType, \ + MultiAgentDict, MultiEnvDict # If the obs space is Dict type, look for the global state under this key. ENV_STATE = "state" @@ -322,9 +321,8 @@ def _check_if_space_maps_agent_id_to_sub_space(self) -> bool: return obs_space_check and action_space_check -def make_multi_agent( - env_name_or_creator: Union[str, Callable[[EnvContext], EnvType]], -) -> Type["MultiAgentEnv"]: +def make_multi_agent(env_name_or_creator: Union[str, EnvCreator], + ) -> Type["MultiAgentEnv"]: """Convenience wrapper for any single-agent env to be converted into MA. Allows you to convert a simple (single-agent) `gym.Env` class diff --git a/rllib/evaluation/rollout_worker.py b/rllib/evaluation/rollout_worker.py index 9592a8525da2..a9688a5b54b5 100644 --- a/rllib/evaluation/rollout_worker.py +++ b/rllib/evaluation/rollout_worker.py @@ -44,10 +44,10 @@ from ray.rllib.utils.sgd import do_minibatch_sgd from ray.rllib.utils.tf_utils import get_gpu_devices as get_tf_gpu_devices from ray.rllib.utils.tf_run_builder import TFRunBuilder -from ray.rllib.utils.typing import AgentID, EnvConfigDict, EnvType, \ - ModelConfigDict, ModelGradients, ModelWeights, \ +from ray.rllib.utils.typing import AgentID, EnvConfigDict, EnvCreator, \ + EnvType, ModelConfigDict, ModelGradients, ModelWeights, \ MultiAgentPolicyConfigDict, PartialTrainerConfigDict, PolicyID, \ - SampleBatchType, T + PolicyState, SampleBatchType, T from ray.util.debug import log_once, disable_log_once_globally, \ enable_periodic_logging from ray.util.iter import ParallelIteratorWorker @@ -181,7 +181,7 @@ def as_remote(cls, def __init__( self, *, - env_creator: Callable[[EnvContext], EnvType], + env_creator: EnvCreator, validate_env: Optional[Callable[[EnvType, EnvContext], None]] = None, policy_spec: Optional[Union[type, Dict[PolicyID, @@ -421,7 +421,7 @@ def gen_rollouts(): # If provided, set it here. self.set_policy_mapping_fn(policy_mapping_fn) - self.env_creator: Callable[[EnvContext], EnvType] = env_creator + self.env_creator: EnvCreator = env_creator self.rollout_fragment_length: int = rollout_fragment_length * num_envs self.count_steps_by: str = count_steps_by self.batch_mode: str = batch_mode @@ -1084,6 +1084,7 @@ def add_policy( observation_space: Optional[Space] = None, action_space: Optional[Space] = None, config: Optional[PartialTrainerConfigDict] = None, + policy_state: Optional[PolicyState] = None, policy_mapping_fn: Optional[Callable[[AgentID, "Episode"], PolicyID]] = None, policies_to_train: Optional[List[PolicyID]] = None, @@ -1097,8 +1098,8 @@ def add_policy( observation_space: The observation space of the policy to add. action_space: The action space of the policy to add. config: The config overrides for the policy to add. - policy_config: The base config of the Trainer object owning this - RolloutWorker. + policy_state: Optional state dict to apply to the new + policy instance, right after its construction. policy_mapping_fn: An optional (updated) policy mapping function to use from here on. Note that already ongoing episodes will not change their mapping but will use the old mapping till @@ -1109,9 +1110,13 @@ def add_policy( Returns: The newly added policy. + + Raises: + KeyError: If the given `policy_id` already exists in this worker's + PolicyMap. """ if policy_id in self.policy_map: - raise ValueError(f"Policy ID '{policy_id}' already in policy map!") + raise KeyError(f"Policy ID '{policy_id}' already in policy map!") policy_dict_to_add = _determine_spaces_for_multi_agent_dict( { policy_id: PolicySpec(policy_cls, observation_space, @@ -1127,6 +1132,9 @@ def add_policy( self.policy_config, seed=self.policy_config.get("seed")) new_policy = self.policy_map[policy_id] + # Set the state of the newly created policy. + if policy_state: + new_policy.set_state(policy_state) self.filters[policy_id] = get_filter( self.observation_filter, new_policy.observation_space.shape) diff --git a/rllib/evaluation/worker_set.py b/rllib/evaluation/worker_set.py index ddcd21c4126d..e2315716b36e 100644 --- a/rllib/evaluation/worker_set.py +++ b/rllib/evaluation/worker_set.py @@ -16,7 +16,8 @@ from ray.rllib.utils.annotations import DeveloperAPI from ray.rllib.utils.framework import try_import_tf from ray.rllib.utils.from_config import from_config -from ray.rllib.utils.typing import EnvType, PolicyID, TrainerConfigDict +from ray.rllib.utils.typing import EnvCreator, EnvType, PolicyID, \ + TrainerConfigDict from ray.tune.registry import registry_contains_input, registry_get_input tf1, tf, tfv = try_import_tf() @@ -37,7 +38,7 @@ class WorkerSet: def __init__( self, *, - env_creator: Optional[Callable[[EnvContext], EnvType]] = None, + env_creator: Optional[EnvCreator] = None, validate_env: Optional[Callable[[EnvType], None]] = None, policy_class: Optional[Type[Policy]] = None, trainer_config: Optional[TrainerConfigDict] = None, @@ -401,7 +402,7 @@ def _make_worker( self, *, cls: Callable, - env_creator: Callable[[EnvContext], EnvType], + env_creator: EnvCreator, validate_env: Optional[Callable[[EnvType], None]], policy_cls: Type[Policy], worker_index: int, diff --git a/rllib/examples/sumo_env_local.py b/rllib/examples/sumo_env_local.py index 7e2eb65d35d0..e185496e14e3 100644 --- a/rllib/examples/sumo_env_local.py +++ b/rllib/examples/sumo_env_local.py @@ -84,7 +84,7 @@ config["lambda"] = 0.95 config["log_level"] = "WARN" config["lr"] = 0.001 - config["min_iter_time_s"] = 5 + config["min_time_s_per_reporting"] = 5 config["num_gpus"] = int(os.environ.get("RLLIB_NUM_GPUS", "0")) config["num_workers"] = args.num_workers config["rollout_fragment_length"] = 200 diff --git a/rllib/execution/metric_ops.py b/rllib/execution/metric_ops.py index 4fd2e28f3596..fb72c8ba4b79 100644 --- a/rllib/execution/metric_ops.py +++ b/rllib/execution/metric_ops.py @@ -1,4 +1,4 @@ -from typing import Any, List, Dict +from typing import Any, Dict, List, Optional import time from ray.actor import ActorHandle @@ -43,7 +43,7 @@ def StandardMetricsReporting( output_op = train_op \ .filter(OncePerTimestepsElapsed(config["timesteps_per_iteration"], by_steps_trained=by_steps_trained)) \ - .filter(OncePerTimeInterval(config["min_iter_time_s"])) \ + .filter(OncePerTimeInterval(config["min_time_s_per_reporting"])) \ .for_each(CollectMetrics( workers, min_history=config["metrics_num_episodes_for_smoothing"], @@ -147,16 +147,19 @@ class OncePerTimeInterval: 5.00001 # will be greater than 5 seconds """ - def __init__(self, delay: int): - self.delay = delay - self.last_called = 0 + def __init__(self, delay: Optional[float] = None): + self.delay = delay or 0.0 + self.last_returned_true = 0 def __call__(self, item: Any) -> bool: + # No minimum time to wait for -> Return True. if self.delay <= 0.0: return True + # Return True, if time since last returned=True is larger than + # `self.delay`. now = time.time() - if now - self.last_called > self.delay: - self.last_called = now + if now - self.last_returned_true > self.delay: + self.last_returned_true = now return True return False diff --git a/rllib/tests/test_checkpoint_restore.py b/rllib/tests/test_checkpoint_restore.py index 9b242b3bed79..a49c428e78bf 100644 --- a/rllib/tests/test_checkpoint_restore.py +++ b/rllib/tests/test_checkpoint_restore.py @@ -11,7 +11,7 @@ def get_mean_action(alg, obs): out = [] for _ in range(2000): - out.append(float(alg.compute_action(obs))) + out.append(float(alg.compute_single_action(obs))) return np.mean(out) @@ -24,7 +24,7 @@ def get_mean_action(alg, obs): "explore": False, "observation_filter": "MeanStdFilter", "num_workers": 2, - "min_iter_time_s": 1, + "min_time_s_per_reporting": 1, "optimizer": { "num_replay_buffer_shards": 1, }, @@ -75,7 +75,7 @@ def ckpt_restore_test(alg_name, if replay_buffer: config["store_buffer_in_checkpoints"] = True - frameworks = (["tfe"] if tfe else []) + ["torch", "tf"] + frameworks = (["tf2"] if tfe else []) + ["torch", "tf"] for fw in framework_iterator(config, frameworks=frameworks): for use_object_store in ([False, True] if object_store else [False]): print("use_object_store={}".format(use_object_store)) diff --git a/rllib/tests/test_eager_support.py b/rllib/tests/test_eager_support.py index 1038cebe9427..85f2e01e6629 100644 --- a/rllib/tests/test_eager_support.py +++ b/rllib/tests/test_eager_support.py @@ -110,7 +110,7 @@ def test_apex_dqn(self): "num_workers": 2, "learning_starts": 0, "num_gpus": 0, - "min_iter_time_s": 1, + "min_time_s_per_reporting": 1, "timesteps_per_iteration": 100, "optimizer": { "num_replay_buffer_shards": 1, diff --git a/rllib/tests/test_exec_api.py b/rllib/tests/test_exec_api.py index 11339f08640b..520ca31effb7 100644 --- a/rllib/tests/test_exec_api.py +++ b/rllib/tests/test_exec_api.py @@ -24,7 +24,7 @@ def test_exec_plan_stats(ray_start_regular): trainer = A2CTrainer( env="CartPole-v0", config={ - "min_iter_time_s": 0, + "min_time_s_per_reporting": 0, "framework": fw, }) result = trainer.train() @@ -45,7 +45,7 @@ def test_exec_plan_save_restore(ray_start_regular): trainer = A2CTrainer( env="CartPole-v0", config={ - "min_iter_time_s": 0, + "min_time_s_per_reporting": 0, "framework": fw, }) res1 = trainer.train() diff --git a/rllib/tests/test_execution.py b/rllib/tests/test_execution.py index 9b879d047685..b6ae489193a2 100644 --- a/rllib/tests/test_execution.py +++ b/rllib/tests/test_execution.py @@ -104,7 +104,7 @@ def test_metrics(ray_start_regular_shared): a = from_range(10, repeat=True).gather_sync() b = StandardMetricsReporting( a, workers, { - "min_iter_time_s": 2.5, + "min_time_s_per_reporting": 2.5, "timesteps_per_iteration": 0, "metrics_num_episodes_for_smoothing": 10, "metrics_episode_collection_timeout_s": 10, diff --git a/rllib/tests/test_export.py b/rllib/tests/test_export.py index 0cf56d5a04dd..021f0f1e6154 100644 --- a/rllib/tests/test_export.py +++ b/rllib/tests/test_export.py @@ -20,7 +20,7 @@ "explore": False, "observation_filter": "MeanStdFilter", "num_workers": 2, - "min_iter_time_s": 1, + "min_time_s_per_reporting": 1, "optimizer": { "num_replay_buffer_shards": 1, }, diff --git a/rllib/tests/test_ignore_worker_failure.py b/rllib/tests/test_ignore_worker_failure.py index 050fd5e83037..ed83118da392 100644 --- a/rllib/tests/test_ignore_worker_failure.py +++ b/rllib/tests/test_ignore_worker_failure.py @@ -101,7 +101,7 @@ def test_async_replay(self): "APEX", { "timesteps_per_iteration": 1000, "num_gpus": 0, - "min_iter_time_s": 1, + "min_time_s_per_reporting": 1, "explore": False, "learning_starts": 1000, "target_network_update_freq": 100, diff --git a/rllib/tests/test_reproducibility.py b/rllib/tests/test_reproducibility.py index a457a437af41..00c6546f1b7b 100644 --- a/rllib/tests/test_reproducibility.py +++ b/rllib/tests/test_reproducibility.py @@ -34,7 +34,7 @@ def env_creator(env_config): register_env("PickLargest", env_creator) config = { "seed": 666 if trial in [0, 1] else 999, - "min_iter_time_s": 0, + "min_time_s_per_reporting": 0, "timesteps_per_iteration": 100, "framework": fw, } diff --git a/rllib/tests/test_rllib_train_and_evaluate.py b/rllib/tests/test_rllib_train_and_evaluate.py index 2f1961714865..a4b9dfa883d9 100644 --- a/rllib/tests/test_rllib_train_and_evaluate.py +++ b/rllib/tests/test_rllib_train_and_evaluate.py @@ -30,14 +30,15 @@ def evaluate_test(algo, env="CartPole-v0", test_episode_rollout=False): rllib_dir = str(Path(__file__).parent.parent.absolute()) print("RLlib dir = {}\nexists={}".format(rllib_dir, os.path.exists(rllib_dir))) - os.system("python {}/train.py --local-dir={} --run={} " - "--checkpoint-freq=1 ".format(rllib_dir, tmp_dir, algo) + - "--config='{" + "\"num_workers\": 1, \"num_gpus\": 0{}{}". - format(fw_, extra_config) + - ", \"timesteps_per_iteration\": 5,\"min_iter_time_s\": 0.1, " - "\"model\": {\"fcnet_hiddens\": [10]}" - "}' --stop='{\"training_iteration\": 1}'" + - " --env={} --no-ray-ui".format(env)) + os.system( + "python {}/train.py --local-dir={} --run={} " + "--checkpoint-freq=1 ".format(rllib_dir, tmp_dir, algo) + + "--config='{" + "\"num_workers\": 1, \"num_gpus\": 0{}{}".format( + fw_, extra_config) + + ", \"timesteps_per_iteration\": 5,\"min_time_s_per_reporting\": 0.1, " + "\"model\": {\"fcnet_hiddens\": [10]}" + "}' --stop='{\"training_iteration\": 1}'" + + " --env={} --no-ray-ui".format(env)) checkpoint_path = os.popen("ls {}/default/*/checkpoint_000001/" "checkpoint-1".format(tmp_dir)).read()[:-1] diff --git a/rllib/tests/test_supported_multi_agent.py b/rllib/tests/test_supported_multi_agent.py index 2c114cec4d02..cf6e1749f79a 100644 --- a/rllib/tests/test_supported_multi_agent.py +++ b/rllib/tests/test_supported_multi_agent.py @@ -99,7 +99,7 @@ def test_apex_multiagent(self): "timesteps_per_iteration": 100, "num_gpus": 0, "buffer_size": 1000, - "min_iter_time_s": 1, + "min_time_s_per_reporting": 1, "learning_starts": 10, "target_network_update_freq": 100, "optimizer": { @@ -114,7 +114,7 @@ def test_apex_ddpg_multiagent(self): "timesteps_per_iteration": 100, "buffer_size": 1000, "num_gpus": 0, - "min_iter_time_s": 1, + "min_time_s_per_reporting": 1, "learning_starts": 10, "target_network_update_freq": 100, "use_state_preprocessor": True, diff --git a/rllib/tuned_examples/dqn/cartpole-apex.yaml b/rllib/tuned_examples/dqn/cartpole-apex.yaml index 2c776aae2841..00c8c1a02e1d 100644 --- a/rllib/tuned_examples/dqn/cartpole-apex.yaml +++ b/rllib/tuned_examples/dqn/cartpole-apex.yaml @@ -21,7 +21,7 @@ cartpole-apex-dqn: num_gpus: 0 - min_iter_time_s: 5 + min_time_s_per_reporting: 5 target_network_update_freq: 500 learning_starts: 1000 timesteps_per_iteration: 1000 diff --git a/rllib/utils/typing.py b/rllib/utils/typing.py index c413ec4cbbd4..4f758c67b214 100644 --- a/rllib/utils/typing.py +++ b/rllib/utils/typing.py @@ -1,30 +1,36 @@ import gym -from typing import Any, Dict, List, Tuple, Union, TypeVar, \ - TYPE_CHECKING +from typing import Any, Callable, Dict, List, Optional, Tuple, Union, \ + TypeVar, TYPE_CHECKING if TYPE_CHECKING: - from ray.rllib.utils import try_import_tf, try_import_torch - _, tf, _ = try_import_tf() - torch, _ = try_import_torch() + from ray.rllib.env.env_context import EnvContext from ray.rllib.policy.policy import PolicySpec from ray.rllib.policy.sample_batch import SampleBatch, MultiAgentBatch from ray.rllib.policy.view_requirement import ViewRequirement + from ray.rllib.utils import try_import_tf, try_import_torch + _, tf, _ = try_import_tf() + torch, _ = try_import_torch() + +# Represents a generic tensor type. +# This could be an np.ndarray, tf.Tensor, or a torch.Tensor. +TensorType = Any + +# Either a plain tensor, or a dict or tuple of tensors (or StructTensors). +TensorStructType = Union[TensorType, dict, tuple] + +# A shape of a tensor. +TensorShape = Union[Tuple[int], List[int]] # Represents a fully filled out config of a Trainer class. # Note: Policy config dicts are usually the same as TrainerConfigDict, but # parts of it may sometimes be altered in e.g. a multi-agent setup, # where we have >1 Policies in the same Trainer. - TrainerConfigDict = dict # A trainer config dict that only has overrides. It needs to be combined with # the default trainer config to be used. PartialTrainerConfigDict = dict -# Represents the env_config sub-dict of the trainer config that is passed to -# the env constructor. -EnvConfigDict = dict - # Represents the model config sub-dict of the trainer config that is passed to # the model catalog. ModelConfigDict = dict @@ -33,10 +39,24 @@ # need a config dict with a "type" key, a class path (str), or a type directly. FromConfigSpec = Union[Dict[str, Any], type, str] +# Represents the env_config sub-dict of the trainer config that is passed to +# the env constructor. +EnvConfigDict = dict + +# Represents an environment id. These could be: +# - An int index for a sub-env within a vectorized env. +# - An external env ID (str), which changes(!) each episode. +EnvID = Union[int, str] + # Represents a BaseEnv, MultiAgentEnv, ExternalEnv, ExternalMultiAgentEnv, # VectorEnv, gym.Env, or ActorHandle. EnvType = Any +# A callable, taking a EnvContext object +# (config dict + properties: `worker_index`, `vector_index`, `num_workers`, +# and `remote`) and returning an env object (or None if no env is used). +EnvCreator = Callable[["EnvContext"], Optional[EnvType]] + # Represents a generic identifier for an agent (e.g., "agent1"). AgentID = Any @@ -46,10 +66,9 @@ # Type of the config["multiagent"]["policies"] dict for multi-agent training. MultiAgentPolicyConfigDict = Dict[PolicyID, "PolicySpec"] -# Represents an environment id. These could be: -# - An int index for a sub-env within a vectorized env. -# - An external env ID (str), which changes(!) each episode. -EnvID = Union[int, str] +# State dict of a Policy, mapping strings (e.g. "weights") to some state +# data (TensorStructType). +PolicyState = Dict[str, TensorStructType] # Represents an episode id. EpisodeID = int @@ -98,10 +117,6 @@ # policy id. LearnerStatsDict = dict -# Represents a generic tensor type. -# This could be an np.ndarray, tf.Tensor, or a torch.Tensor. -TensorType = Any - # List of grads+var tuples (tf) or list of gradient tensors (torch) # representing model gradients and returned by compute_gradients(). ModelGradients = Union[List[Tuple[TensorType, TensorType]], List[TensorType]] @@ -115,12 +130,6 @@ # Some kind of sample batch. SampleBatchType = Union["SampleBatch", "MultiAgentBatch"] -# Either a plain tensor, or a dict or tuple of tensors (or StructTensors). -TensorStructType = Union[TensorType, dict, tuple] - -# A shape of a tensor. -TensorShape = Union[Tuple[int], List[int]] - # A (possibly nested) space struct: Either a gym.spaces.Space or a # (possibly nested) dict|tuple of gym.space.Spaces. SpaceStruct = Union[gym.spaces.Space, dict, tuple]