[RLlib] Cleanup ActorManager and WorkerSet: Make all mark_healthy/healthy_only method args True by default. #44993

Merged
41 changes: 14 additions & 27 deletions rllib/algorithms/algorithm.py
@@ -790,7 +790,6 @@ def setup(self, config: AlgorithmConfig) -> None:
)
self.workers.foreach_worker(
lambda w: w.set_is_policy_to_train(policies_to_train),
healthy_only=True,
)

# Sync the weights from the learner group to the rollout workers.
@@ -1255,10 +1254,9 @@ def _env_runner_remote(worker, num, round, iter):
func=functools.partial(
_env_runner_remote, num=_num, round=_round, iter=algo_iteration
),
healthy_only=True,
)
results = self.evaluation_workers.fetch_ready_async_reqs(
mark_healthy=True, return_obj_refs=False, timeout_seconds=0.01
return_obj_refs=False, timeout_seconds=0.01
)
for wid, (env_s, ag_s, metrics, iter) in results:
if iter != self.iteration:
@@ -1270,10 +1268,9 @@ def _env_runner_remote(worker, num, round, iter):
else:
self.evaluation_workers.foreach_worker_async(
func=lambda w: (w.sample(), w.get_metrics(), algo_iteration),
healthy_only=True,
)
results = self.evaluation_workers.fetch_ready_async_reqs(
mark_healthy=True, return_obj_refs=False, timeout_seconds=0.01
return_obj_refs=False, timeout_seconds=0.01
)
for wid, (batch, metrics, iter) in results:
if iter != self.iteration:
@@ -1398,10 +1395,9 @@ def _env_runner_remote(worker, num, round, iter):
func=functools.partial(
_env_runner_remote, num=_num, round=_round, iter=algo_iteration
),
healthy_only=True,
)
results = self.evaluation_workers.fetch_ready_async_reqs(
mark_healthy=True, return_obj_refs=False, timeout_seconds=0.01
return_obj_refs=False, timeout_seconds=0.01
)
# Make sure we properly time out if we have not received any results
# for more than `time_out` seconds.
@@ -1444,7 +1440,7 @@ def _env_runner_remote(worker, num, round, iter):
remote_worker_ids=selected_eval_worker_ids,
)
results = self.evaluation_workers.fetch_ready_async_reqs(
mark_healthy=True, return_obj_refs=False, timeout_seconds=0.01
return_obj_refs=False, timeout_seconds=0.01
)
# Make sure we properly time out if we have not received any results
# for more than `time_out` seconds.
@@ -1531,16 +1527,16 @@ def _env_runner_remote(worker, num, round, iter):
@OverrideToImplementCustomLogic
@DeveloperAPI
def restore_workers(self, workers: WorkerSet) -> None:
"""Try syncing previously failed and restarted workers with local, if necessary.
"""Try bringing back unhealthy EnvRunners and - if successful - sync with local.

Algorithms that use custom EnvRunners may override this method to
disable default, and create custom restoration logics. Note that "restoring"
disable the default, and create custom restoration logics. Note that "restoring"
does not include the actual restarting process, but merely what should happen
after such a restart of a (previously failed) worker.

Args:
workers: The WorkerSet to restore. This may be Rollout or Evaluation
workers.
workers: The WorkerSet to restore. This may be the training or the
evaluation WorkerSet.
"""
# If `workers` is None, or
# 1. `workers` (WorkerSet) does not have a local worker, and
@@ -1577,9 +1573,7 @@ def restore_workers(self, workers: WorkerSet) -> None:
remote_worker_ids=restored,
# Don't update the local_worker, b/c it's the one we are synching from.
local_worker=False,
timeout_seconds=self.config.worker_restore_timeout_s,
# Bring back actor after successful state syncing.
mark_healthy=True,
timeout_seconds=self.config.env_runner_restore_timeout_s,
Collaborator comment:

Does this timeout, and the restoration time in general, influence the sampling in any way (e.g., by blocking or slowing it down)? I ask because in that case `sample_timeout_s` could lead to no samples being returned by `synchronous_parallel_sample` and thus to errors.

)

# Fire the callback for re-created workers.
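The updated docstring above invites algorithms with custom EnvRunners to override `restore_workers()`. A minimal sketch of such an override, assuming a hypothetical `probe_unhealthy_workers()` helper on the WorkerSet; only `restore_workers`, `foreach_worker`, and its keyword arguments are taken from the diff above, the rest is illustrative:

```python
from ray.rllib.algorithms.algorithm import Algorithm
from ray.rllib.evaluation.worker_set import WorkerSet


class MyAlgo(Algorithm):
    def restore_workers(self, workers: WorkerSet) -> None:
        # Ping unhealthy EnvRunners; those that answer come back as healthy
        # (`mark_healthy` now defaults to True). The helper name below is an
        # assumption for illustration purposes.
        restored = workers.probe_unhealthy_workers()
        if not restored:
            return
        # Custom restoration: e.g., push only the current weights instead of
        # syncing the full local-worker state.
        weights = workers.local_worker().get_weights()
        workers.foreach_worker(
            lambda w: w.set_weights(weights),
            remote_worker_ids=restored,
            # Don't touch the local worker; it is the source of the weights.
            local_worker=False,
        )
```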
@@ -2279,7 +2273,7 @@ def fn(worker):
)

# Update all EnvRunner workers.
self.workers.foreach_worker(fn, local_worker=True, healthy_only=True)
self.workers.foreach_worker(fn, local_worker=True)

# Update each Learner's `policies_to_train` information, but only
# if the arg is explicitly provided here.
@@ -2292,11 +2286,7 @@ def fn(worker):

# Update the evaluation worker set's workers, if required.
if evaluation_workers and self.evaluation_workers is not None:
self.evaluation_workers.foreach_worker(
fn,
local_worker=True,
healthy_only=True,
)
self.evaluation_workers.foreach_worker(fn, local_worker=True)

@OldAPIStack
def export_policy_model(
@@ -2901,7 +2891,6 @@ def __setstate__(self, state) -> None:
self.workers.foreach_worker(
lambda w: w.set_state(ray.get(remote_state_ref)),
local_worker=False,
healthy_only=False,
)
if self.evaluation_workers:
# Avoid `state` being pickled into the remote function below.
@@ -2915,10 +2904,8 @@ def _setup_eval_worker(w):

# If evaluation workers are used, also restore the policies
# there in case they are used for evaluation purpose.
self.evaluation_workers.foreach_worker(
_setup_eval_worker,
healthy_only=False,
)
self.evaluation_workers.foreach_worker(_setup_eval_worker)

# If necessary, restore replay data as well.
if self.local_replay_buffer is not None:
# TODO: Experimental functionality: Restore contents of replay
@@ -3309,7 +3296,7 @@ def _run_one_training_iteration_and_evaluation_in_parallel_wo_thread(

# Collect the evaluation results.
eval_results = self.evaluation_workers.fetch_ready_async_reqs(
mark_healthy=True, return_obj_refs=False, timeout_seconds=time_out
return_obj_refs=False, timeout_seconds=time_out
)
for wid, (batch, metrics, iter) in eval_results:
# Skip results from an older iteration.
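Taken together, the algorithm.py changes let call sites drop the explicit `healthy_only=True` / `mark_healthy=True` arguments. A rough sketch of the resulting async-evaluation polling pattern, assuming an already-built `evaluation_workers` WorkerSet; method names and result format follow the diff above, but exact signatures may vary across Ray versions:

```python
def run_async_eval_round(evaluation_workers, algo_iteration):
    # Launch sampling on all currently healthy remote EnvRunners
    # (`healthy_only=True` is now the default and is no longer passed).
    evaluation_workers.foreach_worker_async(
        func=lambda w: (w.sample(), w.get_metrics(), algo_iteration),
    )

    # Collect finished requests; responding workers are re-marked healthy by
    # default (`mark_healthy=True` no longer needs to be spelled out).
    results = evaluation_workers.fetch_ready_async_reqs(
        return_obj_refs=False, timeout_seconds=0.01
    )

    batches = []
    for worker_id, (batch, metrics, iteration) in results:
        # Ignore stale results from a previous algorithm iteration.
        if iteration != algo_iteration:
            continue
        batches.append((worker_id, batch, metrics))
    return batches
```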
10 changes: 6 additions & 4 deletions rllib/algorithms/algorithm_config.py
@@ -505,7 +505,7 @@ def __init__(self, algo_class: Optional[type] = None):
self.delay_between_env_runner_restarts_s = 60.0
self.restart_failed_sub_environments = False
self.num_consecutive_env_runner_failures_tolerance = 100
self.env_runner_health_probe_timeout_s = 60
self.env_runner_health_probe_timeout_s = 30
self.env_runner_restore_timeout_s = 1800

# `self.rl_module()`
@@ -2790,9 +2790,11 @@ def fault_tolerance(
failures, the EnvRunner itself is NOT affected and won't throw any
errors as the flawed sub-environment is silently restarted under the
hood.
env_runner_health_probe_timeout_s: Max amount of time (in seconds) we should
spend waiting for health probe calls to finish. Health pings are very
cheap, so the default is 1 minute.
env_runner_health_probe_timeout_s: Max amount of time (in seconds) we should
spend waiting for EnvRunner health probe calls
(`EnvRunner.ping.remote()`) to respond. Health pings are very cheap;
however, we perform the health check via a blocking `ray.get()`, so the
default value should not be too large.

Collaborator comment:

Here as well: any influence on/by `sample_timeout_s`?
env_runner_restore_timeout_s: Max amount of time we should wait to restore
states on recovered EnvRunner actors. Default is 30 mins.

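For reference, a minimal sketch of setting the two timeouts touched by this file via `AlgorithmConfig.fault_tolerance()`; PPO is just an example algorithm, and only the two parameter names and defaults are taken from the diff above:

```python
from ray.rllib.algorithms.ppo import PPOConfig

config = PPOConfig().fault_tolerance(
    # Health pings are cheap, but they go through a blocking `ray.get()`,
    # hence the default lowered from 60s to 30s in this PR.
    env_runner_health_probe_timeout_s=30,
    # Max time to wait while restoring state on recovered EnvRunners.
    env_runner_restore_timeout_s=1800,
)
```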
6 changes: 1 addition & 5 deletions rllib/algorithms/impala/impala.py
@@ -755,7 +755,6 @@ def training_step(self) -> ResultDict:
if self._aggregator_actor_manager:
self._aggregator_actor_manager.probe_unhealthy_actors(
timeout_seconds=self.config.env_runner_health_probe_timeout_s,
mark_healthy=True,
)

if self.config.enable_rl_module_and_learner:
@@ -909,10 +908,7 @@ def get_samples_from_workers(
# local worker. Otherwise just return an empty list.
if self.workers.num_healthy_remote_workers() > 0:
# Perform asynchronous sampling on all (remote) rollout workers.
self.workers.foreach_worker_async(
lambda worker: worker.sample(),
healthy_only=True,
)
self.workers.foreach_worker_async(lambda worker: worker.sample())
sample_batches: List[
Tuple[int, ObjectRef]
] = self.workers.fetch_ready_async_reqs(
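The same new default applies to `probe_unhealthy_actors()`, as used for IMPALA's aggregator actor manager above. A small sketch of the call after this PR; the wrapper function itself is illustrative only:

```python
def probe_and_restore(actor_manager, probe_timeout_s=30.0):
    # Ping all currently-unhealthy actors; those that respond within the
    # timeout are automatically marked healthy again, since `mark_healthy`
    # now defaults to True and no longer needs to be passed.
    restored_actor_ids = actor_manager.probe_unhealthy_actors(
        timeout_seconds=probe_timeout_s,
    )
    return restored_actor_ids
```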
3 changes: 0 additions & 3 deletions rllib/algorithms/tests/test_worker_failures.py
@@ -307,9 +307,6 @@ def _do_test_failing_ignore(self, config: AlgorithmConfig, fail_eval: bool = Fal
"evaluation": True,
},
}

print(config)

algo = config.build()
algo.train()

Expand Down