From 81abbed4563e9adadb47f3513a18a8bbdd9ec22a Mon Sep 17 00:00:00 2001 From: sven1977 Date: Fri, 26 Nov 2021 16:21:38 +0100 Subject: [PATCH 1/9] wip. --- rllib/agents/trainer.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 4e1149e94f1d..618057e9d8fd 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -271,8 +271,8 @@ # === Evaluation Settings === # Evaluate with every `evaluation_interval` training iterations. # The evaluation stats will be reported under the "evaluation" metric key. - # Note that evaluation is currently not parallelized, and that for Ape-X - # metrics are already only reported for the lowest epsilon workers. + # Note that for Ape-X metrics are already only reported for the lowest + # epsilon workers. "evaluation_interval": None, # Number of episodes to run in total per evaluation period. # If using multiple evaluation workers (evaluation_num_workers > 1), @@ -310,9 +310,9 @@ "evaluation_num_workers": 0, # Customize the evaluation method. This must be a function of signature # (trainer: Trainer, eval_workers: WorkerSet) -> metrics: dict. See the - # Trainer.evaluate() method to see the default implementation. The - # trainer guarantees all eval workers have the latest policy state before - # this function is called. + # Trainer.evaluate() method to see the default implementation. + # The Trainer guarantees all eval workers have the latest policy state + # before this function is called. "custom_eval_function": None, # === Advanced Rollout Settings === From e95eb05c4cb784da7de521dd57154163415d5567 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Mon, 29 Nov 2021 11:47:54 +0100 Subject: [PATCH 2/9] wip. --- rllib/BUILD | 12 +- rllib/agents/trainer.py | 214 ++++++++++++------ rllib/evaluation/worker_set.py | 88 ++++--- .../parallel_evaluation_and_training.py | 98 ++++++-- 4 files changed, 292 insertions(+), 120 deletions(-) diff --git a/rllib/BUILD b/rllib/BUILD index 45f48898b9ac..2395c41f4b77 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -2328,7 +2328,7 @@ py_test( tags = ["team:ml", "examples", "examples_P"], size = "medium", srcs = ["examples/parallel_evaluation_and_training.py"], - args = ["--as-test", "--stop-reward=50.0", "--num-cpus=6", "--evaluation-num-episodes=13"] + args = ["--as-test", "--stop-reward=50.0", "--num-cpus=6", "--evaluation-duration=13"] ) py_test( @@ -2337,25 +2337,25 @@ py_test( tags = ["team:ml", "examples", "examples_P"], size = "medium", srcs = ["examples/parallel_evaluation_and_training.py"], - args = ["--as-test", "--stop-reward=50.0", "--num-cpus=6", "--evaluation-num-episodes=auto"] + args = ["--as-test", "--stop-reward=50.0", "--num-cpus=6", "--evaluation-duration=auto"] ) py_test( - name = "examples/parallel_evaluation_and_training_11_episodes_tf2", + name = "examples/parallel_evaluation_and_training_211_ts_tf2", main = "examples/parallel_evaluation_and_training.py", tags = ["team:ml", "examples", "examples_P"], size = "medium", srcs = ["examples/parallel_evaluation_and_training.py"], - args = ["--as-test", "--framework=tf2", "--stop-reward=30.0", "--num-cpus=6", "--evaluation-num-episodes=11"] + args = ["--as-test", "--framework=tf2", "--stop-reward=30.0", "--num-cpus=6", "--evaluation-duration=211", "--evaluation-duration-unit=timesteps"] ) py_test( - name = "examples/parallel_evaluation_and_training_14_episodes_torch", + name = "examples/parallel_evaluation_and_training_auto_ts_torch", main = 
"examples/parallel_evaluation_and_training.py", tags = ["team:ml", "examples", "examples_P"], size = "medium", srcs = ["examples/parallel_evaluation_and_training.py"], - args = ["--as-test", "--framework=torch", "--stop-reward=30.0", "--num-cpus=6", "--evaluation-num-episodes=14"] + args = ["--as-test", "--framework=torch", "--stop-reward=30.0", "--num-cpus=6", "--evaluation-duration=auto", "--evaluation-duration-unit=timesteps"] ) py_test( diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 618057e9d8fd..aa92be624431 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -272,16 +272,22 @@ # Evaluate with every `evaluation_interval` training iterations. # The evaluation stats will be reported under the "evaluation" metric key. # Note that for Ape-X metrics are already only reported for the lowest - # epsilon workers. + # epsilon workers (least random workers). + # Set to None (or 0) for no evaluation. "evaluation_interval": None, - # Number of episodes to run in total per evaluation period. + # Duration for which to run evaluation each `evaluation_interval`. + # The unit for the duration can be set via `evaluation_duration_unit` to + # either "episodes" (default) or "timesteps". # If using multiple evaluation workers (evaluation_num_workers > 1), - # episodes will be split amongst these. - # If "auto": - # - evaluation_parallel_to_training=True: Will run as many episodes as the - # training step takes. - # - evaluation_parallel_to_training=False: Error. - "evaluation_num_episodes": 10, + # the load to run will be split amongst these. + # If the value is "auto": + # - For `evaluation_parallel_to_training=True`: Will run as many + # episodes/timesteps that fit into the (parallel) training step. + # - For `evaluation_parallel_to_training=False`: Error. + "evaluation_duration": 10, + # The unit, with which to count the evaluation duration. Either "episodes" + # (default) or "timesteps". + "evaluation_duration_unit": "episodes", # Whether to run evaluation in parallel to a Trainer.train() call # using threading. Default=False. # E.g. evaluation_interval=2 -> For every other training iteration, @@ -520,6 +526,9 @@ # Whether to write episode stats and videos to the agent log dir. This is # typically located in ~/ray_results. "monitor": DEPRECATED_VALUE, + # Replaced by `evaluation_duration=10` and + # `evaluation_duration_unit=episodes`. + "evaluation_num_episodes": DEPRECATED_VALUE, } # __sphinx_doc_end__ # yapf: enable @@ -774,19 +783,40 @@ def env_creator_from_classpath(env_context): # Assert that user has not unset "in_evaluation". assert "in_evaluation" not in extra_config or \ extra_config["in_evaluation"] is True + # Merge user-provided eval config with the base config. evaluation_config = merge_dicts(self.config, extra_config) # Validate evaluation config. self.validate_config(evaluation_config) - # Switch on complete_episode rollouts (evaluations are - # always done on n complete episodes) and set the - # `in_evaluation` flag. Also, make sure our rollout fragments - # are short so we don't have more than one episode in one rollout. - evaluation_config.update({ - "batch_mode": "complete_episodes", - "rollout_fragment_length": 1, - "in_evaluation": True, - }) + + # Set the `in_evaluation` flag. + evaluation_config["in_evaluation"] = True + + # Evaluation duration unit: episodes. + # Switch on `complete_episode` rollouts. Also, make sure + # rollout fragments are short so we never have more than one + # episode in one rollout. 
+ if evaluation_config["evaluation_duration_unit"] == "episodes": + evaluation_config.update({ + "batch_mode": "complete_episodes", + "rollout_fragment_length": 1, + }) + # Evaluation duration unit: timesteps. + # Set `batch_mode=truncate_episodes` and set + # `rollout_fragment_length` such that desired steps are divided + # equally amongst workers or - in auto duration mode - set it + # to a reasonable small number. + else: + evaluation_config.update({ + "batch_mode": "truncate_episodes", + "rollout_fragment_length": 10 + if self.config["evaluation_duration"] == "auto" else + self.config["evaluation_duration"] // + (self.config["evaluation_num_workers"] or 1), + }) + logger.debug("using evaluation_config: {}".format(extra_config)) + self.config["evaluation_config"] = evaluation_config + # Create a separate evaluation worker set for evaluation. # If evaluation_num_workers=0, use the evaluation set's local # worker for evaluation, otherwise, use its remote workers @@ -796,7 +826,10 @@ def env_creator_from_classpath(env_context): validate_env=None, policy_class=self.get_default_policy_class(self.config), config=evaluation_config, - num_workers=self.config["evaluation_num_workers"]) + num_workers=self.config["evaluation_num_workers"], + # Don't even create a local worker if num_workers > 0. + local_worker=False, + ) # TODO: Deprecated: In your sub-classes of Trainer, override `setup()` # directly and call super().setup() from within it if you would like the @@ -878,8 +911,9 @@ def step_attempt(self) -> ResultDict: """Attempts a single training step, including evaluation, if required. Override this method in your Trainer sub-classes if you would like to - keep the n attempts (catch worker failures) or override `step()` - directly if you would like to handle worker failures yourself. + keep the n step-attempts logic (catch worker failures) in place or + override `step()` directly if you would like to handle worker + failures yourself. Returns: The results dict with stats/infos on sampling, training, @@ -909,16 +943,29 @@ def step_attempt(self) -> ResultDict: with concurrent.futures.ThreadPoolExecutor() as executor: train_future = executor.submit( lambda: next(self.train_exec_impl)) - if self.config["evaluation_num_episodes"] == "auto": - - # Run at least one `evaluate()` (num_episodes_done - # must be > 0), even if the training is very fast. - def episodes_left_fn(num_episodes_done): - if num_episodes_done > 0 and \ + # Automatically determine duration of the evaluation. + if self.config["evaluation_duration"] == "auto": + unit = self.config["evaluation_duration_unit"] + + # Run at least one `evaluate()`, even if the training + # is very fast. + def episodes_left_fn(remaining_duration): + # Training is done and we already ran at least one + # evaluation -> Nothing left to run. + if remaining_duration > 0 and \ train_future.done(): return 0 - else: + # Count by episodes. -> Run n more + # (n=num eval workers). + elif unit == "episodes": return self.config["evaluation_num_workers"] + # Count by ts. -> Run n*m more + # (n=num eval workers; m=rollout fragment length). 
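# (Editor's illustration, not part of the patch; a worked example under the
#  hypothetical settings evaluation_num_workers=2 and evaluation_duration="auto":
#  the evaluation rollout_fragment_length is set to 10 above, so this timestep
#  branch asks for 2 * 10 = 20 further timesteps each time it is polled, until
#  the parallel train step has finished.)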
+ else: + return self.config[ + "evaluation_num_workers"] * \ + self.config["evaluation_config"][ + "rollout_fragment_length"] evaluation_metrics = self.evaluate( episodes_left_fn=episodes_left_fn) @@ -954,19 +1001,29 @@ def fn(env, env_context, task_fn): return step_results @PublicAPI - def evaluate(self, episodes_left_fn: Optional[Callable[[int], int]] = None - ) -> dict: + def evaluate( + self, + episodes_left_fn=None, # deprecated + duration_fn: Optional[Callable[[int], int]] = None, + ) -> dict: """Evaluates current policy under `evaluation_config` settings. Note that this default implementation does not do anything beyond merging evaluation_config with the normal trainer config. Args: - episodes_left_fn: An optional callable taking the already run + duration_fn: An optional callable taking the already run num episodes as only arg and returning the number of episodes left to run. It's used to find out whether evaluation should continue. """ + if episodes_left_fn is not None: + deprecation_warning( + old="Trainer.evaluate(episodes_left_fn)", + new="Trainer.evaluate(duration_fn)", + error=False) + duration_fn = episodes_left_fn + # In case we are evaluating (in a thread) parallel to training, # we may have to re-enable eager mode here (gets disabled in the # thread). @@ -991,22 +1048,24 @@ def evaluate(self, episodes_left_fn: Optional[Callable[[int], int]] = None raise ValueError("Custom eval function must return " "dict of metrics, got {}.".format(metrics)) else: - # How many episodes do we need to run? - # In "auto" mode (only for parallel eval + training): Run one - # episode per eval worker. - num_episodes = self.config["evaluation_num_episodes"] if \ - self.config["evaluation_num_episodes"] != "auto" else \ - (self.config["evaluation_num_workers"] or 1) + # How many episodes/timesteps do we need to run? + # In "auto" mode (only for parallel eval + training): Run as long + # as training lasts. + unit = self.config["evaluation_duration_unit"] + duration = self.config["evaluation_duration"] if \ + self.config["evaluation_duration"] != "auto" else \ + (self.config["evaluation_num_workers"] or 1) * \ + (1 if unit == "episodes" else + self.config["evaluation_config"]["rollout_fragment_length"]) # Default done-function returns True, whenever num episodes # have been completed. - if episodes_left_fn is None: + if duration_fn is None: - def episodes_left_fn(num_episodes_done): - return num_episodes - num_episodes_done + def duration_fn(num_units_done): + return duration - num_units_done - logger.info( - f"Evaluating current policy for {num_episodes} episodes.") + logger.info(f"Evaluating current policy for {duration} {unit}.") metrics = None # No evaluation worker set -> @@ -1014,7 +1073,12 @@ def episodes_left_fn(num_episodes_done): # local worker not having an env. if self.evaluation_workers is None: try: - for _ in range(num_episodes): + # If unit=episodes -> Run n times `sample()` (each sample + # produces exactly 1 episode). + # If unit=ts -> Run 1 `sample()` b/c the + # `rollout_fragment_length` is exactly the desired ts. + iters = duration if unit == "episodes" else 1 + for _ in range(iters): self.workers.local_worker().sample() metrics = collect_metrics(self.workers.local_worker()) except ValueError as e: @@ -1034,33 +1098,37 @@ def episodes_left_fn(num_episodes_done): # Evaluation worker set only has local worker. 
elif self.config["evaluation_num_workers"] == 0: - for _ in range(num_episodes): + # TODO: explain + iters = duration if unit == "episodes" else 1 + for _ in range(iters): self.evaluation_workers.local_worker().sample() # Evaluation worker set has n remote workers. else: # How many episodes have we run (across all eval workers)? - num_episodes_done = 0 + num_units_done = 0 round_ = 0 while True: - episodes_left_to_do = episodes_left_fn(num_episodes_done) - if episodes_left_to_do <= 0: + units_left_to_do = duration_fn(num_units_done) + if units_left_to_do <= 0: break round_ += 1 batches = ray.get([ w.sample.remote() for i, w in enumerate( self.evaluation_workers.remote_workers()) - if i < episodes_left_to_do + if i < units_left_to_do ]) - # Per our config for the evaluation workers - # (`rollout_fragment_length=1` and - # `batch_mode=complete_episode`), we know that we'll have - # exactly one episode per returned batch. - num_episodes_done += len(batches) - logger.info( - f"Ran round {round_} of parallel evaluation " - f"({num_episodes_done}/{num_episodes} episodes done)") + # 1 episode per returned batch. + if unit == "episodes": + num_units_done += len(batches) + # n timesteps per returned batch. + else: + num_units_done += sum(len(b) for b in batches) + + logger.info(f"Ran round {round_} of parallel evaluation " + f"({num_units_done}/{duration} {unit} done)") + if metrics is None: metrics = collect_metrics( self.evaluation_workers.local_worker(), @@ -1679,6 +1747,7 @@ def _make_workers( policy_class: Type[Policy], config: TrainerConfigDict, num_workers: int, + local_worker: bool = True, ) -> WorkerSet: """Default factory method for a WorkerSet running under this Trainer. @@ -1698,6 +1767,9 @@ def _make_workers( config: The Trainer's config. num_workers: Number of remote rollout workers to create. 0 for local only. + local_worker: Whether to create a local (non @ray.remote) worker + in the returned set as well (default: True). If `num_workers` + is 0, always create a local worker. Returns: The created WorkerSet. @@ -1708,7 +1780,9 @@ def _make_workers( policy_class=policy_class, trainer_config=config, num_workers=num_workers, - logdir=self.logdir) + local_worker=local_worker, + logdir=self.logdir, + ) def _sync_filters_if_needed(self, workers: WorkerSet): if self.config.get("observation_filter", "NoFilter") != "NoFilter": @@ -1985,6 +2059,18 @@ def validate_config(self, config: PartialTrainerConfigDict) -> None: "Got {}".format(config["multiagent"]["count_steps_by"])) # Evaluation settings. + + # Deprecated setting: `evaluation_num_episodes`. + if config["evaluation_num_episodes"] != DEPRECATED_VALUE: + deprecation_warning( + old="evaluation_num_episodes", + new="`evaluation_duration` and `evaluation_duration_unit=" + "episodes`", + error=False) + config["evaluation_duration"] = config["evaluation_num_episodes"] + config["evaluation_duration_unit"] = "episodes" + config["evaluation_num_episodes"] = DEPRECATED_VALUE + # If `evaluation_num_workers` > 0, warn if `evaluation_interval` is # None (also set `evaluation_interval` to 1). if config["evaluation_num_workers"] > 0 and \ @@ -2008,18 +2094,18 @@ def validate_config(self, config: PartialTrainerConfigDict) -> None: "`evaluation_parallel_to_training` to False.") config["evaluation_parallel_to_training"] = False - # If `evaluation_num_episodes=auto`, error if + # If `evaluation_duration=auto`, error if # `evaluation_parallel_to_training=False`. 
- if config["evaluation_num_episodes"] == "auto": + if config["evaluation_duration"] == "auto": if not config["evaluation_parallel_to_training"]: raise ValueError( - "`evaluation_num_episodes=auto` not supported for " + "`evaluation_duration=auto` not supported for " "`evaluation_parallel_to_training=False`!") # Make sure, it's an int otherwise. - elif not isinstance(config["evaluation_num_episodes"], int): - raise ValueError( - "`evaluation_num_episodes` ({}) must be an int and " - ">0!".format(config["evaluation_num_episodes"])) + elif not isinstance(config["evaluation_duration"], int) or \ + config["evaluation_duration"] <= 0: + raise ValueError("`evaluation_duration` ({}) must be an int and " + ">0!".format(config["evaluation_duration"])) @ExperimentalAPI @staticmethod @@ -2284,7 +2370,7 @@ def _is_multi_agent(self): def __repr__(self): return type(self).__name__ - @Deprecated(new="Trainer.evaluate()", error=False) + @Deprecated(new="Trainer.evaluate()", error=True) def _evaluate(self) -> dict: return self.evaluate() diff --git a/rllib/evaluation/worker_set.py b/rllib/evaluation/worker_set.py index b38c17509c0e..4b6a74b6cab8 100644 --- a/rllib/evaluation/worker_set.py +++ b/rllib/evaluation/worker_set.py @@ -34,15 +34,18 @@ class WorkerSet: Where n may be 0. """ - def __init__(self, - *, - env_creator: Optional[Callable[[EnvContext], EnvType]] = None, - validate_env: Optional[Callable[[EnvType], None]] = None, - policy_class: Optional[Type[Policy]] = None, - trainer_config: Optional[TrainerConfigDict] = None, - num_workers: int = 0, - logdir: Optional[str] = None, - _setup: bool = True): + def __init__( + self, + *, + env_creator: Optional[Callable[[EnvContext], EnvType]] = None, + validate_env: Optional[Callable[[EnvType], None]] = None, + policy_class: Optional[Type[Policy]] = None, + trainer_config: Optional[TrainerConfigDict] = None, + num_workers: int = 0, + local_worker: bool = True, + logdir: Optional[str] = None, + _setup: bool = True, + ): """Initializes a WorkerSet instance. Args: @@ -55,6 +58,9 @@ def __init__(self, trainer_config: Optional dict that extends the common config of the Trainer class. num_workers: Number of remote rollout workers to create. + local_worker: Whether to create a local (non @ray.remote) worker + in the returned set as well (default: True). If `num_workers` + is 0, always create a local worker. logdir: Optional logging directory for workers. _setup: Whether to setup workers. This is only for testing. """ @@ -69,18 +75,25 @@ def __init__(self, self._logdir = logdir if _setup: + # Force a local worker if num_workers == 0 (no remote workers). + # Otherwise, this WorkerSet would be empty. + self._local_worker = None + if num_workers == 0: + local_worker = True + self._local_config = merge_dicts( trainer_config, {"tf_session_args": trainer_config["local_tf_session_args"]}) - # Create a number of remote workers. + # Create a number of @ray.remote workers. self._remote_workers = [] self.add_workers(num_workers) + # Create a local worker, if needed. # If num_workers > 0 and we don't have an env on the local worker, # get the observation- and action spaces for each policy from # the first remote worker (which does have an env). - if self._remote_workers and \ + if local_worker and self._remote_workers and \ not trainer_config.get("create_env_on_driver") and \ (not trainer_config.get("observation_space") or not trainer_config.get("action_space")): @@ -106,17 +119,17 @@ def __init__(self, else: spaces = None - # Always create a local worker. 
- self._local_worker = self._make_worker( - cls=RolloutWorker, - env_creator=env_creator, - validate_env=validate_env, - policy_cls=self._policy_class, - worker_index=0, - num_workers=num_workers, - config=self._local_config, - spaces=spaces, - ) + if local_worker: + self._local_worker = self._make_worker( + cls=RolloutWorker, + env_creator=env_creator, + validate_env=validate_env, + policy_cls=self._policy_class, + worker_index=0, + num_workers=num_workers, + config=self._local_config, + spaces=spaces, + ) def local_worker(self) -> RolloutWorker: """Returns the local rollout worker.""" @@ -197,7 +210,9 @@ def foreach_worker(self, func: Callable[[RolloutWorker], T]) -> List[T]: Returns: The list of return values of all calls to `func([worker])`. """ - local_result = [func(self.local_worker())] + local_result = [] + if self._local_worker: + local_result = [func(self.local_worker())] remote_results = ray.get( [w.apply.remote(func) for w in self.remote_workers()]) return local_result + remote_results @@ -219,8 +234,10 @@ def foreach_worker_with_index( The first entry in this list are the results of the local worker, followed by all remote workers' results. """ + local_result = [] # Local worker: Index=0. - local_result = [func(self.local_worker(), 0)] + if self._local_worker: + local_result = [func(self.local_worker(), 0)] # Remote workers: Index > 0. remote_results = ray.get([ w.apply.remote(func, i + 1) @@ -247,7 +264,9 @@ def foreach_policy(self, func: Callable[[Policy, PolicyID], T]) -> List[T]: The local workers' results are first, followed by all remote workers' results """ - results = self.local_worker().foreach_policy(func) + results = [] + if self._local_worker: + results = self.local_worker().foreach_policy(func) ray_gets = [] for worker in self.remote_workers(): ray_gets.append( @@ -260,7 +279,10 @@ def foreach_policy(self, func: Callable[[Policy, PolicyID], T]) -> List[T]: @DeveloperAPI def trainable_policies(self) -> List[PolicyID]: """Returns the list of trainable policy ids.""" - return self.local_worker().policies_to_train + if self._local_worker: + return self._local_worker.policies_to_train + else: + raise NotImplementedError @DeveloperAPI def foreach_trainable_policy( @@ -275,7 +297,9 @@ def foreach_trainable_policy( List[any]: The list of n return values of all `func([trainable policy], [ID])`-calls. """ - results = self.local_worker().foreach_trainable_policy(func) + results = [] + if self._local_worker: + results = self.local_worker().foreach_trainable_policy(func) ray_gets = [] for worker in self.remote_workers(): ray_gets.append( @@ -303,7 +327,9 @@ def foreach_env(self, func: Callable[[EnvType], List[T]]) -> List[List[T]]: Returns: The list (workers) of lists (sub environments) of results. """ - local_results = [self.local_worker().foreach_env(func)] + local_results = [] + if self._local_worker: + local_results = [self.local_worker().foreach_env(func)] ray_gets = [] for worker in self.remote_workers(): ray_gets.append(worker.foreach_env.remote(func)) @@ -329,7 +355,11 @@ def foreach_env_with_context( The list (1 item per workers) of lists (1 item per sub-environment) of results. 
""" - local_results = [self.local_worker().foreach_env_with_context(func)] + local_results = [] + if self._local_worker: + local_results = [ + self.local_worker().foreach_env_with_context(func) + ] ray_gets = [] for worker in self.remote_workers(): ray_gets.append(worker.foreach_env_with_context.remote(func)) diff --git a/rllib/examples/parallel_evaluation_and_training.py b/rllib/examples/parallel_evaluation_and_training.py index 05e81ee3a28f..23544f2d3c1f 100644 --- a/rllib/examples/parallel_evaluation_and_training.py +++ b/rllib/examples/parallel_evaluation_and_training.py @@ -7,11 +7,30 @@ parser = argparse.ArgumentParser() parser.add_argument( - "--evaluation-num-episodes", + "--evaluation-duration", type=lambda v: v if v == "auto" else int(v), default=13, - help="Number of evaluation episodes to run each iteration. " + help="Number of evaluation episodes/timesteps to run each iteration. " "If 'auto', will run as many as possible during train pass.") +parser.add_argument( + "--evaluation-duration-unit", + type=str, + default="episodes", + choices=["episodes", "timesteps"], + help="The unit in which to measure the duration (`episodes` or" + "`timesteps`).") +parser.add_argument( + "--evaluation-num-workers", + type=int, + default=2, + help="The number of evaluation workers to setup. " + "0 for a single local evaluation worker. Note that for values >0, no" + "local evaluation worker will be created (b/c not needed).") +parser.add_argument( + "--evaluation-interval", + type=int, + default=2, + help="Every how many train iterations should we run an evaluation loop?") parser.add_argument( "--run", @@ -51,25 +70,60 @@ class AssertNumEvalEpisodesCallback(DefaultCallbacks): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # Keep track of the number of timesteps that have not been reported + # yet in the evaluation metrics as they belong to an ongoing episode. + self.ts_not_reported_yet = 0 + def on_train_result(self, *, trainer, result, **kwargs): - # Make sure we always run exactly n evaluation episodes, + # Make sure we always run exactly the given evaluation duration, # no matter what the other settings are (such as # `evaluation_num_workers` or `evaluation_parallel_to_training`). if "evaluation" in result: hist_stats = result["evaluation"]["hist_stats"] - num_episodes_done = len(hist_stats["episode_lengths"]) - # Compare number of entries in episode_lengths (this is the - # number of episodes actually run) with desired number of - # episodes from the config. - if isinstance(trainer.config["evaluation_num_episodes"], int): - assert num_episodes_done == \ - trainer.config["evaluation_num_episodes"] + # We count in episodes. + if trainer.config["evaluation_duration_unit"] == "episodes": + num_episodes_done = len(hist_stats["episode_lengths"]) + # Compare number of entries in episode_lengths (this is the + # number of episodes actually run) with desired number of + # episodes from the config. + if isinstance(trainer.config["evaluation_duration"], int): + assert num_episodes_done == \ + trainer.config["evaluation_duration"] + # If auto-episodes: Expect at least as many episode as workers + # (each worker's `sample()` is at least called once). + else: + assert trainer.config["evaluation_duration"] == "auto" + assert num_episodes_done >= \ + trainer.config["evaluation_num_workers"] + print("Number of run evaluation episodes: " + f"{num_episodes_done} (ok)!") + # We count in timesteps. 
else: - assert trainer.config["evaluation_num_episodes"] == "auto" - assert num_episodes_done >= \ - trainer.config["evaluation_num_workers"] - print("Number of run evaluation episodes: " - f"{num_episodes_done} (ok)!") + num_timesteps_reported = sum(hist_stats["episode_lengths"]) + num_timesteps_done = trainer.config["evaluation_duration"] + if num_timesteps_done != "auto": + delta = num_timesteps_done - num_timesteps_reported + # Expected. + if delta >= 0: + self.ts_not_reported_yet += delta + else: + new_ts_not_reported_yet = \ + num_timesteps_done - \ + (num_timesteps_reported - self.overhang) + assert new_ts_not_reported_yet >= 0 + self.ts_not_reported_yet = new_ts_not_reported_yet + # If auto-timesteps: Expect roughly the same number of + # timesteps as were done by the normal workers + # (train_batch_size). + else: + batch_size = trainer.config["train_batch_size"] + assert abs(batch_size - num_timesteps_reported) <= 500 + print("Number of run evaluation timesteps: " + f"{num_timesteps_reported} (ok)!") + + print(f"R={result['evaluation']['episode_reward_mean']}") if __name__ == "__main__": @@ -93,16 +147,18 @@ def on_train_result(self, *, trainer, result, **kwargs): "evaluation_parallel_to_training": True, # Use two evaluation workers. Must be >0, otherwise, # evaluation will run on a local worker and block (no parallelism). - "evaluation_num_workers": 2, + "evaluation_num_workers": args.evaluation_num_workers, # Evaluate every other training iteration (together # with every other call to Trainer.train()). - "evaluation_interval": 2, - # Run for n episodes (properly distribute load amongst all eval - # workers). The longer it takes to evaluate, the more - # sense it makes to use `evaluation_parallel_to_training=True`. + "evaluation_interval": args.evaluation_interval, + # Run for n episodes/timesteps (properly distribute load amongst + # all eval workers). The longer it takes to evaluate, the more sense + # it makes to use `evaluation_parallel_to_training=True`. # Use "auto" to run evaluation for roughly as long as the training # step takes. - "evaluation_num_episodes": args.evaluation_num_episodes, + "evaluation_duration": args.evaluation_duration, + # "episodes" or "timesteps". + "evaluation_duration_unit": args.evaluation_duration_unit, # Use a custom callback that asserts that we are running the # configured exact number of episodes per evaluation OR - in auto From dc0f032cf67f5eeec353a7478ab22af3f874f6be Mon Sep 17 00:00:00 2001 From: sven1977 Date: Wed, 1 Dec 2021 09:10:58 +0100 Subject: [PATCH 3/9] wip --- rllib/examples/parallel_evaluation_and_training.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rllib/examples/parallel_evaluation_and_training.py b/rllib/examples/parallel_evaluation_and_training.py index 23544f2d3c1f..a4ff2da4dbd4 100644 --- a/rllib/examples/parallel_evaluation_and_training.py +++ b/rllib/examples/parallel_evaluation_and_training.py @@ -119,7 +119,8 @@ def on_train_result(self, *, trainer, result, **kwargs): # (train_batch_size). else: batch_size = trainer.config["train_batch_size"] - assert abs(batch_size - num_timesteps_reported) <= 500 + assert abs(batch_size - num_timesteps_reported) <= 500, \ + (batch_size, num_timesteps_reported) print("Number of run evaluation timesteps: " f"{num_timesteps_reported} (ok)!") From 5b1ad15cf6e669e260850def09327fba4b0e6513 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Wed, 1 Dec 2021 12:31:01 +0100 Subject: [PATCH 4/9] wip. 
--- rllib/BUILD | 6 +-- rllib/agents/trainer.py | 39 ++++++++++++------- .../parallel_evaluation_and_training.py | 36 +++++------------ 3 files changed, 38 insertions(+), 43 deletions(-) diff --git a/rllib/BUILD b/rllib/BUILD index 060887c78838..76806c1c67cd 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -2339,7 +2339,7 @@ py_test( ) py_test( - name = "examples/parallel_evaluation_and_training_auto_num_episodes_tf", + name = "examples/parallel_evaluation_and_training_auto_episodes_tf", main = "examples/parallel_evaluation_and_training.py", tags = ["team:ml", "examples", "examples_P"], size = "medium", @@ -2353,7 +2353,7 @@ py_test( tags = ["team:ml", "examples", "examples_P"], size = "medium", srcs = ["examples/parallel_evaluation_and_training.py"], - args = ["--as-test", "--framework=tf2", "--stop-reward=30.0", "--num-cpus=6", "--evaluation-duration=211", "--evaluation-duration-unit=timesteps"] + args = ["--as-test", "--framework=tf2", "--stop-reward=30.0", "--num-cpus=6", "--evaluation-num-workers=3", "--evaluation-duration=211", "--evaluation-duration-unit=timesteps"] ) py_test( @@ -2362,7 +2362,7 @@ py_test( tags = ["team:ml", "examples", "examples_P"], size = "medium", srcs = ["examples/parallel_evaluation_and_training.py"], - args = ["--as-test", "--framework=torch", "--stop-reward=30.0", "--num-cpus=6", "--evaluation-duration=auto", "--evaluation-duration-unit=timesteps"] + args = ["--as-test", "--framework=torch", "--stop-reward=30.0", "--num-cpus=6", "--evaluation-num-workers=3", "--evaluation-duration=auto", "--evaluation-duration-unit=timesteps"] ) py_test( diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 5469651519fd..2332adb1ba7e 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -4,6 +4,7 @@ import functools import gym import logging +import math import numpy as np import os import pickle @@ -809,14 +810,15 @@ def env_creator_from_classpath(env_context): # Set `batch_mode=truncate_episodes` and set # `rollout_fragment_length` such that desired steps are divided # equally amongst workers or - in auto duration mode - set it - # to a reasonable small number. + # to a reasonable small number (10). else: evaluation_config.update({ "batch_mode": "truncate_episodes", "rollout_fragment_length": 10 - if self.config["evaluation_duration"] == "auto" else - self.config["evaluation_duration"] // - (self.config["evaluation_num_workers"] or 1), + if self.config["evaluation_duration"] == "auto" else int( + math.ceil( + self.config["evaluation_duration"] / + (self.config["evaluation_num_workers"] or 1))), }) logger.debug("using evaluation_config: {}".format(extra_config)) @@ -954,7 +956,7 @@ def step_attempt(self) -> ResultDict: # Run at least one `evaluate()`, even if the training # is very fast. - def episodes_left_fn(remaining_duration): + def duration_fn(remaining_duration): # Training is done and we already ran at least one # evaluation -> Nothing left to run. if remaining_duration > 0 and \ @@ -973,7 +975,7 @@ def episodes_left_fn(remaining_duration): "rollout_fragment_length"] evaluation_metrics = self.evaluate( - episodes_left_fn=episodes_left_fn) + duration_fn=duration_fn) else: evaluation_metrics = self.evaluate() # Collect the training results from the future. @@ -1057,11 +1059,14 @@ def evaluate( # In "auto" mode (only for parallel eval + training): Run as long # as training lasts. 
unit = self.config["evaluation_duration_unit"] + eval_cfg = self.config["evaluation_config"] + rollout = eval_cfg["rollout_fragment_length"] + num_envs = eval_cfg["num_envs_per_worker"] duration = self.config["evaluation_duration"] if \ self.config["evaluation_duration"] != "auto" else \ (self.config["evaluation_num_workers"] or 1) * \ - (1 if unit == "episodes" else - self.config["evaluation_config"]["rollout_fragment_length"]) + (1 if unit == "episodes" else rollout) + num_ts_run = 0 # Default done-function returns True, whenever num episodes # have been completed. @@ -1084,7 +1089,7 @@ def duration_fn(num_units_done): # `rollout_fragment_length` is exactly the desired ts. iters = duration if unit == "episodes" else 1 for _ in range(iters): - self.workers.local_worker().sample() + num_ts_run += len(self.workers.local_worker().sample()) metrics = collect_metrics(self.workers.local_worker()) except ValueError as e: if "RolloutWorker has no `input_reader` object" in \ @@ -1103,10 +1108,14 @@ def duration_fn(num_units_done): # Evaluation worker set only has local worker. elif self.config["evaluation_num_workers"] == 0: - # TODO: explain + # If unit=episodes -> Run n times `sample()` (each sample + # produces exactly 1 episode). + # If unit=ts -> Run 1 `sample()` b/c the + # `rollout_fragment_length` is exactly the desired ts. iters = duration if unit == "episodes" else 1 for _ in range(iters): - self.evaluation_workers.local_worker().sample() + num_ts_run += len( + self.evaluation_workers.local_worker().sample()) # Evaluation worker set has n remote workers. else: @@ -1122,14 +1131,17 @@ def duration_fn(num_units_done): batches = ray.get([ w.sample.remote() for i, w in enumerate( self.evaluation_workers.remote_workers()) - if i < units_left_to_do + if i * (1 if unit == "episodes" else rollout * + num_envs) < units_left_to_do ]) # 1 episode per returned batch. if unit == "episodes": num_units_done += len(batches) # n timesteps per returned batch. else: - num_units_done += sum(len(b) for b in batches) + ts = sum(len(b) for b in batches) + num_ts_run += ts + num_units_done += ts logger.info(f"Ran round {round_} of parallel evaluation " f"({num_units_done}/{duration} {unit} done)") @@ -1138,6 +1150,7 @@ def duration_fn(num_units_done): metrics = collect_metrics( self.evaluation_workers.local_worker(), self.evaluation_workers.remote_workers()) + metrics["timesteps_this_iter"] = num_ts_run return {"evaluation": metrics} @DeveloperAPI diff --git a/rllib/examples/parallel_evaluation_and_training.py b/rllib/examples/parallel_evaluation_and_training.py index a4ff2da4dbd4..af51e7a8dbc4 100644 --- a/rllib/examples/parallel_evaluation_and_training.py +++ b/rllib/examples/parallel_evaluation_and_training.py @@ -69,13 +69,7 @@ help="Init Ray in local mode for easier debugging.") -class AssertNumEvalEpisodesCallback(DefaultCallbacks): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - # Keep track of the number of timesteps that have not been reported - # yet in the evaluation metrics as they belong to an ongoing episode. - self.ts_not_reported_yet = 0 - +class AssertEvalCallback(DefaultCallbacks): def on_train_result(self, *, trainer, result, **kwargs): # Make sure we always run exactly the given evaluation duration, # no matter what the other settings are (such as @@ -101,26 +95,14 @@ def on_train_result(self, *, trainer, result, **kwargs): f"{num_episodes_done} (ok)!") # We count in timesteps. 
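# (Editor's worked example, not part of the patch, using the values from the
#  updated BUILD tests - evaluation_num_workers=3, evaluation_duration=211,
#  evaluation_duration_unit="timesteps": setup computes an evaluation
#  rollout_fragment_length of ceil(211 / 3) = 71, so a single parallel round of
#  three sample() calls reports 3 * 71 = 213 timesteps. The timestep branch
#  below therefore only asserts a small overshoot, abs(delta) < 20.)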
else: - num_timesteps_reported = sum(hist_stats["episode_lengths"]) - num_timesteps_done = trainer.config["evaluation_duration"] - if num_timesteps_done != "auto": - delta = num_timesteps_done - num_timesteps_reported + num_timesteps_reported = result["evaluation"][ + "timesteps_this_iter"] + num_timesteps_wanted = trainer.config["evaluation_duration"] + if num_timesteps_wanted != "auto": + delta = num_timesteps_wanted - num_timesteps_reported # Expected. - if delta >= 0: - self.ts_not_reported_yet += delta - else: - new_ts_not_reported_yet = \ - num_timesteps_done - \ - (num_timesteps_reported - self.overhang) - assert new_ts_not_reported_yet >= 0 - self.ts_not_reported_yet = new_ts_not_reported_yet - # If auto-timesteps: Expect roughly the same number of - # timesteps as were done by the normal workers - # (train_batch_size). - else: - batch_size = trainer.config["train_batch_size"] - assert abs(batch_size - num_timesteps_reported) <= 500, \ - (batch_size, num_timesteps_reported) + assert abs(delta) < 20, \ + (delta, num_timesteps_wanted, num_timesteps_reported) print("Number of run evaluation timesteps: " f"{num_timesteps_reported} (ok)!") @@ -164,7 +146,7 @@ def on_train_result(self, *, trainer, result, **kwargs): # Use a custom callback that asserts that we are running the # configured exact number of episodes per evaluation OR - in auto # mode - run at least as many episodes as we have eval workers. - "callbacks": AssertNumEvalEpisodesCallback, + "callbacks": AssertEvalCallback, } stop = { From 313294b20f019c4112dc90401a843adc334bafff Mon Sep 17 00:00:00 2001 From: sven1977 Date: Wed, 1 Dec 2021 13:47:39 +0100 Subject: [PATCH 5/9] wip. --- doc/source/rllib-training.rst | 40 +++++++++++++++---- rllib/agents/trainer.py | 9 ++++- .../parallel_evaluation_and_training.py | 2 +- 3 files changed, 40 insertions(+), 11 deletions(-) diff --git a/doc/source/rllib-training.rst b/doc/source/rllib-training.rst index 78990256586a..fc9dab681972 100644 --- a/doc/source/rllib-training.rst +++ b/doc/source/rllib-training.rst @@ -729,14 +729,28 @@ Customized Evaluation During Training RLlib will report online training rewards, however in some cases you may want to compute rewards with different settings (e.g., with exploration turned off, or on a specific set -of environment configurations). You can evaluate policies during training by setting -the ``evaluation_interval`` config, and optionally also ``evaluation_num_episodes``, -``evaluation_config``, ``evaluation_num_workers``, and ``custom_eval_function`` -(see `trainer.py `__ for further documentation). - -By default, exploration is left as-is within ``evaluation_config``. -However, you can switch off any exploration behavior for the evaluation workers -via: +of environment configurations). You can activate evaluating policies during training by setting +the ``evaluation_interval`` to an int value (> 0) indicating every how many training calls +an "evaluation step" is run. +One such "evaluation step" runs over ``evaluation_duration`` episodes or timesteps, depending +on the ``evaluation_duration_unit`` setting, which can be either "episodes" (default) or "timesteps". + +Normally, the evaluation step is run after the respective train step. For example, for +``evaluation_interval=2``, the sequence of steps is: ``train, train, eval, train, train, eval, ...``. +For ``evaluation_interval=1``, the sequence is: ``train, eval, train, eval, ...``. +Before each evaluation step, weights from the main model are synchronized to all evaluation workers. 
+However, it is possible to run evaluation parallel to training via the ``evaluation_parallel_to_training=True`` +config flag. In this case, both steps (train and eval) are run at the same time via threading. +This can speed up the evaluation process significantly, but leads to a 1-iteration delay between reported +training results and evaluation results (the evaluation results are "behind" as they use +slightly outdated model weights). + +When in ``evaluation_parallel_to_training=True`` mode, a special setting: ``evaluation_duration=auto`` +can be used that causes the evaluation step to take roughly as long as the train step. + +The config key ``evaluation_config`` allows you to override any config keys only for +the evaluation workers. For example, to switch off exploration in the evaluation steps, +do: .. code-block:: python @@ -752,6 +766,16 @@ via: policy, even if this is a stochastic one. Setting "explore=False" above will result in the evaluation workers not using this stochastic policy. +Parallelism for the evaluation step is determined via the ``evaluation_num_workers`` +setting. Set this to higher values if you want the desired eval episodes or timesteps to +run as much in parallel as possible. For example, if your ``evaluation_duration=10`` (``evaluation_duration_unit=episodes``) +and ``evaluation_num_workers=10``, each eval worker only has to run 1 episode in each eval step. + +In case you would like to completely customize the evaluation step, set ``custom_eval_function`` in your +config to a callable taking the Trainer object and a WorkerSet object (the evaluation WorkerSet) +and returning a metrics dict. See `trainer.py `__ +for further documentation. + There is an end to end example of how to set up custom online evaluation in `custom_eval.py `__. Note that if you only want to eval your policy at the end of training, you can set ``evaluation_interval: N``, where ``N`` is the number of training iterations before stopping. Below are some examples of how the custom evaluation metrics are reported nested under the ``evaluation`` key of normal training results: diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 2332adb1ba7e..3d3b0f972116 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -671,6 +671,10 @@ def default_logger_creator(config): logger_creator = default_logger_creator + # Evaluation WorkerSet and metrics last returned by `self.evaluate()`. + self.evaluation_workers = None + self.evaluation_metrics = {} + super().__init__(config, logger_creator, remote_checkpoint_dir, sync_function_tpl) @@ -779,8 +783,6 @@ def env_creator_from_classpath(env_context): self.workers, self.config, **self._kwargs_for_execution_plan()) # Evaluation WorkerSet setup. - self.evaluation_workers = None - self.evaluation_metrics = {} # User would like to setup a separate evaluation worker set. 
if self.config.get("evaluation_num_workers", 0) > 0 or \ self.config.get("evaluation_interval"): @@ -1151,6 +1153,9 @@ def duration_fn(num_units_done): self.evaluation_workers.local_worker(), self.evaluation_workers.remote_workers()) metrics["timesteps_this_iter"] = num_ts_run + + self.evaluation_metrics = metrics + return {"evaluation": metrics} @DeveloperAPI diff --git a/rllib/examples/parallel_evaluation_and_training.py b/rllib/examples/parallel_evaluation_and_training.py index af51e7a8dbc4..cdf75345ad65 100644 --- a/rllib/examples/parallel_evaluation_and_training.py +++ b/rllib/examples/parallel_evaluation_and_training.py @@ -100,7 +100,7 @@ def on_train_result(self, *, trainer, result, **kwargs): num_timesteps_wanted = trainer.config["evaluation_duration"] if num_timesteps_wanted != "auto": delta = num_timesteps_wanted - num_timesteps_reported - # Expected. + # Expect roughly the same (desired // num-eval-workers). assert abs(delta) < 20, \ (delta, num_timesteps_wanted, num_timesteps_reported) print("Number of run evaluation timesteps: " From 199cacc381f0cc010a8063e4445976d98a65b705 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Fri, 3 Dec 2021 11:32:16 +0100 Subject: [PATCH 6/9] wip --- doc/source/rllib-training.rst | 79 ++++++++++++++----- rllib/agents/cql/tests/test_cql.py | 2 +- rllib/agents/ddpg/ddpg.py | 2 +- rllib/agents/marwil/tests/test_bc.py | 2 +- rllib/agents/marwil/tests/test_marwil.py | 4 +- rllib/agents/qmix/qmix.py | 2 +- rllib/agents/tests/test_trainer.py | 8 +- rllib/agents/trainer.py | 54 +++++++------ rllib/contrib/maddpg/maddpg.py | 2 +- rllib/evaluate.py | 6 +- rllib/examples/custom_eval.py | 2 +- rllib/examples/custom_input_api.py | 2 +- rllib/examples/env_rendering_and_recording.py | 2 +- rllib/examples/offline_rl.py | 2 +- .../ddpg/pendulum-apex-ddpg.yaml | 2 +- 15 files changed, 109 insertions(+), 62 deletions(-) diff --git a/doc/source/rllib-training.rst b/doc/source/rllib-training.rst index fc9dab681972..fa6a6c2b08b6 100644 --- a/doc/source/rllib-training.rst +++ b/doc/source/rllib-training.rst @@ -729,33 +729,75 @@ Customized Evaluation During Training RLlib will report online training rewards, however in some cases you may want to compute rewards with different settings (e.g., with exploration turned off, or on a specific set -of environment configurations). You can activate evaluating policies during training by setting -the ``evaluation_interval`` to an int value (> 0) indicating every how many training calls -an "evaluation step" is run. -One such "evaluation step" runs over ``evaluation_duration`` episodes or timesteps, depending +of environment configurations). You can activate evaluating policies during training (``Trainer.train()``) by setting +the ``evaluation_interval`` to an int value (> 0) indicating every how many ``Trainer.train()`` +calls an "evaluation step" is run: + +.. code-block:: python + + # Run one evaluation step on every 3rd `Trainer.train()` call. + { + "evaluation_interval": 3, + } + + +One such evaluation step runs over ``evaluation_duration`` episodes or timesteps, depending on the ``evaluation_duration_unit`` setting, which can be either "episodes" (default) or "timesteps". -Normally, the evaluation step is run after the respective train step. For example, for + +.. code-block:: python + + # Every time we do run an evaluation step, run it for exactly 10 episodes. + { + "evaluation_duration": 10, + "evaluation_duration_unit": "episodes", + } + # Every time we do run an evaluation step, run it for close to 200 timesteps. 
+ { + "evaluation_duration": 200, + "evaluation_duration_unit": "timesteps", + } + + +Before each evaluation step, weights from the main model are synchronized to all evaluation workers. + +Normally, the evaluation step is run right after the respective train step. For example, for ``evaluation_interval=2``, the sequence of steps is: ``train, train, eval, train, train, eval, ...``. For ``evaluation_interval=1``, the sequence is: ``train, eval, train, eval, ...``. -Before each evaluation step, weights from the main model are synchronized to all evaluation workers. -However, it is possible to run evaluation parallel to training via the ``evaluation_parallel_to_training=True`` -config flag. In this case, both steps (train and eval) are run at the same time via threading. + +However, it is possible to run evaluation in parallel to training via the ``evaluation_parallel_to_training=True`` +config setting. In this case, both steps (train and eval) are run at the same time via threading. This can speed up the evaluation process significantly, but leads to a 1-iteration delay between reported -training results and evaluation results (the evaluation results are "behind" as they use -slightly outdated model weights). +training results and evaluation results (the evaluation results are behind b/c they use slightly outdated +model weights). + +When running with the ``evaluation_parallel_to_training=True`` setting, a special "auto" value +is supported for ``evaluation_duration``. This can be used to make the evaluation step take +roughly as long as the train step: + +.. code-block:: python + + # Run eval and train at the same time via threading and make sure they roughly + # take the same time, such that the next `Trainer.train()` call can execute + # immediately and not have to wait for a still ongoing (e.g. very long episode) + # evaluation step: + { + "evaluation_interval": 1, + "evaluation_parallel_to_training": True, + "evaluation_duration": "auto", # automatically end evaluation when train step has finished + "evaluation_duration_unit": "timesteps", # <- more fine grained than "episodes" + } -When in ``evaluation_parallel_to_training=True`` mode, a special setting: ``evaluation_duration=auto`` -can be used that causes the evaluation step to take roughly as long as the train step. -The config key ``evaluation_config`` allows you to override any config keys only for +The ``evaluation_config`` key allows you to override any config settings for the evaluation workers. For example, to switch off exploration in the evaluation steps, do: .. code-block:: python # Switching off exploration behavior for evaluation workers - # (see rllib/agents/trainer.py) + # (see rllib/agents/trainer.py). Use any keys in this sub-dict that are + # also supported in the main Trainer config. "evaluation_config": { "explore": False } @@ -767,11 +809,12 @@ do: will result in the evaluation workers not using this stochastic policy. Parallelism for the evaluation step is determined via the ``evaluation_num_workers`` -setting. Set this to higher values if you want the desired eval episodes or timesteps to -run as much in parallel as possible. For example, if your ``evaluation_duration=10`` (``evaluation_duration_unit=episodes``) -and ``evaluation_num_workers=10``, each eval worker only has to run 1 episode in each eval step. +setting. Set this to larger values if you want the desired evaluation episodes or timesteps to +run as much in parallel as possible. 
For example, if your ``evaluation_duration=10``, +``evaluation_duration_unit=episodes``, and ``evaluation_num_workers=10``, each eval worker +only has to run 1 episode in each eval step. -In case you would like to completely customize the evaluation step, set ``custom_eval_function`` in your +In case you would like to entirely customize the evaluation step, set ``custom_eval_function`` in your config to a callable taking the Trainer object and a WorkerSet object (the evaluation WorkerSet) and returning a metrics dict. See `trainer.py `__ for further documentation. diff --git a/rllib/agents/cql/tests/test_cql.py b/rllib/agents/cql/tests/test_cql.py index 87c744ca7f6c..0d877841545c 100644 --- a/rllib/agents/cql/tests/test_cql.py +++ b/rllib/agents/cql/tests/test_cql.py @@ -58,7 +58,7 @@ def test_cql_compilation(self): config["input_evaluation"] = ["is"] config["evaluation_interval"] = 2 - config["evaluation_num_episodes"] = 10 + config["evaluation_duration"] = 10 config["evaluation_config"]["input"] = "sampler" config["evaluation_parallel_to_training"] = False config["evaluation_num_workers"] = 2 diff --git a/rllib/agents/ddpg/ddpg.py b/rllib/agents/ddpg/ddpg.py index 8381ff525127..52e1b66a4086 100644 --- a/rllib/agents/ddpg/ddpg.py +++ b/rllib/agents/ddpg/ddpg.py @@ -38,7 +38,7 @@ # metrics are already only reported for the lowest epsilon workers. "evaluation_interval": None, # Number of episodes to run per evaluation period. - "evaluation_num_episodes": 10, + "evaluation_duration": 10, # === Model === # Apply a state preprocessor with spec given by the "model" config option diff --git a/rllib/agents/marwil/tests/test_bc.py b/rllib/agents/marwil/tests/test_bc.py index d6ac23489783..acfd4282cef5 100644 --- a/rllib/agents/marwil/tests/test_bc.py +++ b/rllib/agents/marwil/tests/test_bc.py @@ -37,7 +37,7 @@ def test_bc_compilation_and_learning_from_offline_file(self): config["evaluation_interval"] = 3 config["evaluation_num_workers"] = 1 - config["evaluation_num_episodes"] = 5 + config["evaluation_duration"] = 5 config["evaluation_parallel_to_training"] = True # Evaluate on actual environment. config["evaluation_config"] = {"input": "sampler"} diff --git a/rllib/agents/marwil/tests/test_marwil.py b/rllib/agents/marwil/tests/test_marwil.py index 63f7f5dc6332..cdf57d4241a6 100644 --- a/rllib/agents/marwil/tests/test_marwil.py +++ b/rllib/agents/marwil/tests/test_marwil.py @@ -43,7 +43,7 @@ def test_marwil_compilation_and_learning_from_offline_file(self): config["num_workers"] = 2 config["evaluation_num_workers"] = 1 config["evaluation_interval"] = 3 - config["evaluation_num_episodes"] = 5 + config["evaluation_duration"] = 5 config["evaluation_parallel_to_training"] = True # Evaluate on actual environment. config["evaluation_config"] = {"input": "sampler"} @@ -100,7 +100,7 @@ def test_marwil_cont_actions_from_offline_file(self): config["num_workers"] = 1 config["evaluation_num_workers"] = 1 config["evaluation_interval"] = 3 - config["evaluation_num_episodes"] = 5 + config["evaluation_duration"] = 5 config["evaluation_parallel_to_training"] = True # Evaluate on actual environment. config["evaluation_config"] = {"input": "sampler"} diff --git a/rllib/agents/qmix/qmix.py b/rllib/agents/qmix/qmix.py index f1b27ccd576b..68d8332e2778 100644 --- a/rllib/agents/qmix/qmix.py +++ b/rllib/agents/qmix/qmix.py @@ -44,7 +44,7 @@ # metrics are already only reported for the lowest epsilon workers. "evaluation_interval": None, # Number of episodes to run per evaluation period. 
- "evaluation_num_episodes": 10, + "evaluation_duration": 10, # Switch to greedy actions in evaluation workers. "evaluation_config": { "explore": False, diff --git a/rllib/agents/tests/test_trainer.py b/rllib/agents/tests/test_trainer.py index e15c837f48fa..479d7cae1d90 100644 --- a/rllib/agents/tests/test_trainer.py +++ b/rllib/agents/tests/test_trainer.py @@ -12,7 +12,7 @@ from ray.rllib.agents.trainer import COMMON_CONFIG from ray.rllib.examples.env.multi_agent import MultiAgentCartPole from ray.rllib.examples.parallel_evaluation_and_training import \ - AssertNumEvalEpisodesCallback + AssertEvalCallback from ray.rllib.utils.metrics.learner_info import LEARNER_INFO from ray.rllib.utils.test_utils import framework_iterator @@ -131,13 +131,13 @@ def test_evaluation_option(self): config.update({ "env": "CartPole-v0", "evaluation_interval": 2, - "evaluation_num_episodes": 2, + "evaluation_duration": 2, "evaluation_config": { "gamma": 0.98, }, # Use a custom callback that asserts that we are running the # configured exact number of episodes per evaluation. - "callbacks": AssertNumEvalEpisodesCallback, + "callbacks": AssertEvalCallback, }) for _ in framework_iterator(config, frameworks=("tf", "torch")): @@ -169,7 +169,7 @@ def test_evaluation_wo_evaluation_worker_set(self): "evaluation_interval": None, # Use a custom callback that asserts that we are running the # configured exact number of episodes per evaluation. - "callbacks": AssertNumEvalEpisodesCallback, + "callbacks": AssertEvalCallback, }) for _ in framework_iterator(frameworks=("tf", "torch")): # Setup trainer w/o evaluation worker set and still call diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 3d3b0f972116..3288a45464be 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -809,10 +809,13 @@ def env_creator_from_classpath(env_context): "rollout_fragment_length": 1, }) # Evaluation duration unit: timesteps. - # Set `batch_mode=truncate_episodes` and set - # `rollout_fragment_length` such that desired steps are divided - # equally amongst workers or - in auto duration mode - set it - # to a reasonable small number (10). + # - Set `batch_mode=truncate_episodes` so we don't perform rollouts + # strictly along episode borders. + # Set `rollout_fragment_length` such that desired steps are divided + # equally amongst workers or - in "auto" duration mode - set it + # to a reasonably small number (10), such that a single `sample()` + # call doesn't take too much time so we can stop evaluation as soon + # as possible after the train step is completed. else: evaluation_config.update({ "batch_mode": "truncate_episodes", @@ -929,6 +932,24 @@ def step_attempt(self) -> ResultDict: and - if required - evaluation. """ + def auto_duration_fn(unit, num_eval_workers, eval_cfg, num_units_done): + # Training is done and we already ran at least one + # evaluation -> Nothing left to run. + if num_units_done > 0 and \ + train_future.done(): + return 0 + # Count by episodes. -> Run n more + # (n=num eval workers). + elif unit == "episodes": + return num_eval_workers + # Count by timesteps. -> Run n*m*p more + # (n=num eval workers; m=rollout fragment length; + # p=num-envs-per-worker). + else: + return num_eval_workers * \ + eval_cfg["rollout_fragment_length"] * \ + eval_cfg["num_envs_per_worker"] + # self._iteration gets incremented after this function returns, # meaning that e. g. the first time this function is called, # self._iteration will be 0. 
@@ -956,28 +977,11 @@ def step_attempt(self) -> ResultDict: if self.config["evaluation_duration"] == "auto": unit = self.config["evaluation_duration_unit"] - # Run at least one `evaluate()`, even if the training - # is very fast. - def duration_fn(remaining_duration): - # Training is done and we already ran at least one - # evaluation -> Nothing left to run. - if remaining_duration > 0 and \ - train_future.done(): - return 0 - # Count by episodes. -> Run n more - # (n=num eval workers). - elif unit == "episodes": - return self.config["evaluation_num_workers"] - # Count by ts. -> Run n*m more - # (n=num eval workers; m=rollout fragment length). - else: - return self.config[ - "evaluation_num_workers"] * \ - self.config["evaluation_config"][ - "rollout_fragment_length"] - evaluation_metrics = self.evaluate( - duration_fn=duration_fn) + duration_fn=functools.partial( + auto_duration_fn, unit, self.config[ + "evaluation_num_workers"], self.config[ + "evaluation_config"])) else: evaluation_metrics = self.evaluate() # Collect the training results from the future. diff --git a/rllib/contrib/maddpg/maddpg.py b/rllib/contrib/maddpg/maddpg.py index 50b9936ef590..b866fc1c421c 100644 --- a/rllib/contrib/maddpg/maddpg.py +++ b/rllib/contrib/maddpg/maddpg.py @@ -37,7 +37,7 @@ # Evaluation interval "evaluation_interval": None, # Number of episodes to run per evaluation period. - "evaluation_num_episodes": 10, + "evaluation_duration": 10, # === Model === # Apply a state preprocessor with spec given by the "model" config option diff --git a/rllib/evaluate.py b/rllib/evaluate.py index 884f1264887c..f1d7c54c8b42 100755 --- a/rllib/evaluate.py +++ b/rllib/evaluate.py @@ -309,8 +309,8 @@ def run(args, parser): # Make sure we have evaluation workers. if not config.get("evaluation_num_workers"): config["evaluation_num_workers"] = config.get("num_workers", 0) - if not config.get("evaluation_num_episodes"): - config["evaluation_num_episodes"] = 1 + if not config.get("evaluation_duration"): + config["evaluation_duration"] = 1 # Hard-override this as it raises a warning by Trainer otherwise. # Makes no sense anyways, to have it set to None as we don't call # `Trainer.train()` here. @@ -401,7 +401,7 @@ def rollout(agent, saver.begin_rollout() eval_result = agent.evaluate()["evaluation"] # Increase timestep and episode counters. - eps = agent.config["evaluation_num_episodes"] + eps = agent.config["evaluation_duration"] episodes += eps steps += eps * eval_result["episode_len_mean"] # Print out results and continue. diff --git a/rllib/examples/custom_eval.py b/rllib/examples/custom_eval.py index 12c191c34502..db4e777324b6 100644 --- a/rllib/examples/custom_eval.py +++ b/rllib/examples/custom_eval.py @@ -181,7 +181,7 @@ def custom_eval_function(trainer, eval_workers): "evaluation_interval": 1, # Run 10 episodes each time evaluation runs. - "evaluation_num_episodes": 10, + "evaluation_duration": 10, # Override the env config for evaluation. 
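A hedged configuration sketch of the ``evaluation_duration="auto"`` path that the ``step_attempt()`` change above wires up; the environment and worker counts are arbitrary examples:

    config = {
        "env": "CartPole-v0",
        "evaluation_interval": 1,
        # Run evaluation in a background thread while train() runs ...
        "evaluation_parallel_to_training": True,
        # ... and keep evaluating for exactly as long as the train step takes;
        # the bound `auto_duration_fn` decides after each round whether to
        # schedule more sampling.
        "evaluation_duration": "auto",
        "evaluation_duration_unit": "timesteps",
        "evaluation_num_workers": 2,
    }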
"evaluation_config": { diff --git a/rllib/examples/custom_input_api.py b/rllib/examples/custom_input_api.py index 5ac560277d61..e7f71ee153e0 100644 --- a/rllib/examples/custom_input_api.py +++ b/rllib/examples/custom_input_api.py @@ -101,7 +101,7 @@ def input_creator(ioctx: IOContext) -> InputReader: "metrics_smoothing_episodes": 5, "evaluation_interval": 1, "evaluation_num_workers": 2, - "evaluation_num_episodes": 10, + "evaluation_duration": 10, "evaluation_parallel_to_training": True, "evaluation_config": { "input": "sampler", diff --git a/rllib/examples/env_rendering_and_recording.py b/rllib/examples/env_rendering_and_recording.py index 5ac1f08daaa6..23bb9f961b2b 100644 --- a/rllib/examples/env_rendering_and_recording.py +++ b/rllib/examples/env_rendering_and_recording.py @@ -110,7 +110,7 @@ def render(self, mode="rgb"): # Evaluate once per training iteration. "evaluation_interval": 1, # Run evaluation on (at least) two episodes - "evaluation_num_episodes": 2, + "evaluation_duration": 2, # ... using one evaluation worker (setting this to 0 will cause # evaluation to run on the local evaluation worker, blocking # training until evaluation is done). diff --git a/rllib/examples/offline_rl.py b/rllib/examples/offline_rl.py index 02d3a109af9d..5fc366d8d264 100644 --- a/rllib/examples/offline_rl.py +++ b/rllib/examples/offline_rl.py @@ -69,7 +69,7 @@ # Set up evaluation. config["evaluation_num_workers"] = 1 config["evaluation_interval"] = 1 - config["evaluation_num_episodes"] = 10 + config["evaluation_duration"] = 10 # This should be False b/c iterations are very long and this would # cause evaluation to lag one iter behind training. config["evaluation_parallel_to_training"] = False diff --git a/rllib/tuned_examples/ddpg/pendulum-apex-ddpg.yaml b/rllib/tuned_examples/ddpg/pendulum-apex-ddpg.yaml index 1846ed4c724d..8bad4a662473 100644 --- a/rllib/tuned_examples/ddpg/pendulum-apex-ddpg.yaml +++ b/rllib/tuned_examples/ddpg/pendulum-apex-ddpg.yaml @@ -14,4 +14,4 @@ pendulum-apex-ddpg: target_network_update_freq: 50000 tau: 1.0 evaluation_interval: 5 - evaluation_num_episodes: 10 + evaluation_duration: 10 From 58319222f3368bd23fc149b06d4cd2abea8feef3 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Fri, 3 Dec 2021 19:15:51 +0100 Subject: [PATCH 7/9] fix --- rllib/agents/trainer.py | 77 +++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 37 deletions(-) diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 3288a45464be..34432c5b346c 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -784,27 +784,34 @@ def env_creator_from_classpath(env_context): # Evaluation WorkerSet setup. # User would like to setup a separate evaluation worker set. + + # Update with evaluation settings: + user_eval_config = copy.deepcopy(self.config["evaluation_config"]) + # Merge user-provided eval config with the base config. This makes sure + # the eval config is always complete, no matter whether we have eval + # workers or perform evaluation on the (non-eval) local worker. + eval_config = merge_dicts(self.config, user_eval_config) + self.config["evaluation_config"] = eval_config + if self.config.get("evaluation_num_workers", 0) > 0 or \ self.config.get("evaluation_interval"): - # Update env_config with evaluation settings: - extra_config = copy.deepcopy(self.config["evaluation_config"]) + logger.debug(f"Using evaluation_config: {user_eval_config}.") + # Assert that user has not unset "in_evaluation". 
- assert "in_evaluation" not in extra_config or \ - extra_config["in_evaluation"] is True - # Merge user-provided eval config with the base config. - evaluation_config = merge_dicts(self.config, extra_config) + assert "in_evaluation" not in eval_config or \ + eval_config["in_evaluation"] is True # Validate evaluation config. - self.validate_config(evaluation_config) + self.validate_config(eval_config) # Set the `in_evaluation` flag. - evaluation_config["in_evaluation"] = True + eval_config["in_evaluation"] = True # Evaluation duration unit: episodes. # Switch on `complete_episode` rollouts. Also, make sure # rollout fragments are short so we never have more than one # episode in one rollout. - if evaluation_config["evaluation_duration_unit"] == "episodes": - evaluation_config.update({ + if eval_config["evaluation_duration_unit"] == "episodes": + eval_config.update({ "batch_mode": "complete_episodes", "rollout_fragment_length": 1, }) @@ -817,7 +824,7 @@ def env_creator_from_classpath(env_context): # call doesn't take too much time so we can stop evaluation as soon # as possible after the train step is completed. else: - evaluation_config.update({ + eval_config.update({ "batch_mode": "truncate_episodes", "rollout_fragment_length": 10 if self.config["evaluation_duration"] == "auto" else int( @@ -826,8 +833,7 @@ def env_creator_from_classpath(env_context): (self.config["evaluation_num_workers"] or 1))), }) - logger.debug("using evaluation_config: {}".format(extra_config)) - self.config["evaluation_config"] = evaluation_config + self.config["evaluation_config"] = eval_config # Create a separate evaluation worker set for evaluation. # If evaluation_num_workers=0, use the evaluation set's local @@ -837,7 +843,7 @@ def env_creator_from_classpath(env_context): env_creator=self.env_creator, validate_env=None, policy_class=self.get_default_policy_class(self.config), - config=evaluation_config, + config=eval_config, num_workers=self.config["evaluation_num_workers"], # Don't even create a local worker if num_workers > 0. local_worker=False, @@ -1061,6 +1067,18 @@ def evaluate( raise ValueError("Custom eval function must return " "dict of metrics, got {}.".format(metrics)) else: + if self.evaluation_workers is None and \ + self.workers.local_worker().input_reader is None: + raise ValueError( + "Cannot evaluate w/o an evaluation worker set in " + "the Trainer or w/o an env on the local worker!\n" + "Try one of the following:\n1) Set " + "`evaluation_interval` >= 0 to force creating a " + "separate evaluation worker set.\n2) Set " + "`create_env_on_driver=True` to force the local " + "(non-eval) worker to have an environment to " + "evaluate on.") + # How many episodes/timesteps do we need to run? # In "auto" mode (only for parallel eval + training): Run as long # as training lasts. @@ -1088,29 +1106,14 @@ def duration_fn(num_units_done): # Do evaluation using the local worker. Expect error due to the # local worker not having an env. if self.evaluation_workers is None: - try: - # If unit=episodes -> Run n times `sample()` (each sample - # produces exactly 1 episode). - # If unit=ts -> Run 1 `sample()` b/c the - # `rollout_fragment_length` is exactly the desired ts. 
- iters = duration if unit == "episodes" else 1 - for _ in range(iters): - num_ts_run += len(self.workers.local_worker().sample()) - metrics = collect_metrics(self.workers.local_worker()) - except ValueError as e: - if "RolloutWorker has no `input_reader` object" in \ - e.args[0]: - raise ValueError( - "Cannot evaluate w/o an evaluation worker set in " - "the Trainer or w/o an env on the local worker!\n" - "Try one of the following:\n1) Set " - "`evaluation_interval` >= 0 to force creating a " - "separate evaluation worker set.\n2) Set " - "`create_env_on_driver=True` to force the local " - "(non-eval) worker to have an environment to " - "evaluate on.") - else: - raise e + # If unit=episodes -> Run n times `sample()` (each sample + # produces exactly 1 episode). + # If unit=ts -> Run 1 `sample()` b/c the + # `rollout_fragment_length` is exactly the desired ts. + iters = duration if unit == "episodes" else 1 + for _ in range(iters): + num_ts_run += len(self.workers.local_worker().sample()) + metrics = collect_metrics(self.workers.local_worker()) # Evaluation worker set only has local worker. elif self.config["evaluation_num_workers"] == 0: From e31b546bf3841fa8d8ba60a314d553fab5760847 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Sat, 4 Dec 2021 12:16:07 +0100 Subject: [PATCH 8/9] fix --- rllib/agents/trainer.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rllib/agents/trainer.py b/rllib/agents/trainer.py index 759fec069389..849a36e0a7ef 100644 --- a/rllib/agents/trainer.py +++ b/rllib/agents/trainer.py @@ -787,6 +787,11 @@ def env_creator_from_classpath(env_context): # Update with evaluation settings: user_eval_config = copy.deepcopy(self.config["evaluation_config"]) + + # Assert that user has not unset "in_evaluation". + assert "in_evaluation" not in user_eval_config or \ + user_eval_config["in_evaluation"] is True + # Merge user-provided eval config with the base config. This makes sure # the eval config is always complete, no matter whether we have eval # workers or perform evaluation on the (non-eval) local worker. @@ -797,9 +802,6 @@ def env_creator_from_classpath(env_context): self.config.get("evaluation_interval"): logger.debug(f"Using evaluation_config: {user_eval_config}.") - # Assert that user has not unset "in_evaluation". - assert "in_evaluation" not in eval_config or \ - eval_config["in_evaluation"] is True # Validate evaluation config. self.validate_config(eval_config) From e6b6b3b0cd3ce0e3a8763b2769005338e16c1062 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Sat, 4 Dec 2021 12:22:07 +0100 Subject: [PATCH 9/9] fix --- rllib/tests/test_rllib_train_and_evaluate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rllib/tests/test_rllib_train_and_evaluate.py b/rllib/tests/test_rllib_train_and_evaluate.py index fb28257c1032..2f1961714865 100644 --- a/rllib/tests/test_rllib_train_and_evaluate.py +++ b/rllib/tests/test_rllib_train_and_evaluate.py @@ -194,7 +194,7 @@ def policy_fn(agent_id, episode, **kwargs): # Test rolling out n steps. result = os.popen( - "python {}/rollout.py --run={} " + "python {}/evaluate.py --run={} " "--steps=400 " "--out=\"{}/rollouts_n_steps.pkl\" --no-render \"{}\"".format( rllib_dir, algo, tmp_dir, last_checkpoint)).read()[:-1]
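For the local-worker fallback simplified in the last trainer.py hunk, the number of ``sample()`` calls follows directly from the duration unit; a minimal sketch of that rule (illustrative only):

    def num_sample_calls(unit, duration):
        # unit == "episodes": each sample() yields exactly one complete
        # episode (batch_mode="complete_episodes", rollout_fragment_length=1).
        # unit == "timesteps": rollout_fragment_length already equals the
        # requested number of timesteps, so one sample() call is enough.
        return duration if unit == "episodes" else 1

    assert num_sample_calls("episodes", 10) == 10
    assert num_sample_calls("timesteps", 211) == 1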