[RLlib] By-pass Evaluation workers when doing OPE #30135

Merged 37 commits on Nov 16, 2022
Commits (37)
04e3838
wip: working on wis
kouroshHakha Nov 8, 2022
3dd5c17
wip: added is
kouroshHakha Nov 8, 2022
678ab3a
wip: parallelized feature_importance using ray core and added estimat…
kouroshHakha Nov 9, 2022
e283531
updated DR
kouroshHakha Nov 9, 2022
550c0eb
wip added dm
kouroshHakha Nov 9, 2022
732aeb9
lint and resource allocation
kouroshHakha Nov 10, 2022
6b5ebde
parallel to trianing done
kouroshHakha Nov 10, 2022
24b8c53
fixing some unittests
kouroshHakha Nov 10, 2022
7c36c71
fixed marwil tests
kouroshHakha Nov 10, 2022
c61a49d
fixed unittest
kouroshHakha Nov 10, 2022
073a4fd
for episodic RL on OPE fall back to the previous method
kouroshHakha Nov 10, 2022
3f66599
lint
kouroshHakha Nov 10, 2022
4e5ec05
added docstring for algorithm.py
kouroshHakha Nov 10, 2022
a0ca2e3
docstrings
kouroshHakha Nov 10, 2022
41f62b8
updated feature_imporance docstrings and code to remove remove_t_dim …
kouroshHakha Nov 10, 2022
d320f05
updated docstring feature_importance
kouroshHakha Nov 10, 2022
42a676a
docstrings for is
kouroshHakha Nov 10, 2022
bb797c2
docstrings
kouroshHakha Nov 10, 2022
ae2e376
fqe docstrings
kouroshHakha Nov 10, 2022
916a4ef
docstrings
kouroshHakha Nov 10, 2022
c797f0a
unittested is and wis
kouroshHakha Nov 11, 2022
cdc1b67
minor fix on dm
kouroshHakha Nov 14, 2022
4c497c7
1. fixed MARWIL bug 2. fixed algo_class being empty in AlgorithmConfi…
kouroshHakha Nov 14, 2022
e0ebf5f
remove some old mysterious ddpg validation check which does not make …
kouroshHakha Nov 14, 2022
877f7b2
Merge branch 'master' into remove-evalworkers-in-ope
kouroshHakha Nov 14, 2022
18db127
Merge branch 'master' into remove-evalworkers-in-ope
kouroshHakha Nov 14, 2022
194fde8
Merge branch 'remove-evalworkers-in-ope' of github.com:kouroshHakha/r…
kouroshHakha Nov 14, 2022
275be35
removed resource pre-allocation for ray dataset per each tune trial
kouroshHakha Nov 14, 2022
0b7bbd8
fixed some previous review points
kouroshHakha Nov 14, 2022
e72e8c4
lint
kouroshHakha Nov 14, 2022
4828f03
added ste
kouroshHakha Nov 14, 2022
72e2bd6
lint
kouroshHakha Nov 14, 2022
7d93827
added some more comments to explain the resource allocation problem w…
kouroshHakha Nov 14, 2022
8641256
fixed ci
kouroshHakha Nov 14, 2022
5fbb9c8
fixed doc
kouroshHakha Nov 15, 2022
43bacc4
addressed jun's comments
kouroshHakha Nov 15, 2022
eb82c84
raise value errors
kouroshHakha Nov 15, 2022
146 changes: 112 additions & 34 deletions rllib/algorithms/algorithm.py
@@ -26,6 +26,7 @@
Union,
)
from ray.rllib.offline.offline_evaluator import OfflineEvaluator
from ray.rllib.offline.offline_evaluation_utils import remove_time_dim
import tree

import ray
@@ -52,7 +53,7 @@
from ray.rllib.execution.parallel_requests import AsyncRequestsManager
from ray.rllib.execution.rollout_ops import synchronous_parallel_sample
from ray.rllib.execution.train_ops import multi_gpu_train_one_step, train_one_step
from ray.rllib.offline import get_offline_io_resource_bundles
from ray.rllib.offline import get_dataset_and_shards
from ray.rllib.offline.estimators import (
OffPolicyEstimator,
ImportanceSampling,
@@ -592,7 +593,8 @@ def setup(self, config: AlgorithmConfig) -> None:

# Evaluation WorkerSet setup.
# User would like to setup a separate evaluation worker set.
if self.config.evaluation_num_workers > 0 or self.config.evaluation_interval:
# Note: We skip workerset creation if we need to do offline evaluation
if self._should_create_evaluation_rollout_workers(self.evaluation_config):
_, env_creator = self._get_env_id_and_creator(
self.evaluation_config.env, self.evaluation_config
)
@@ -620,6 +622,26 @@ def setup(self, config: AlgorithmConfig) -> None:
)
self._evaluation_weights_seq_number = 0

self.evaluation_dataset = None
if (
self.evaluation_config.off_policy_estimation_methods
and not self.evaluation_config.ope_split_batch_by_episode
Member:
I have lost track a little bit. What if there are off_policy_estimation_methods and ope_split_batch_by_episode is True? How is that mode handled?
Kind of feel like we need a single function def _setup_evaluation(self), and in there, a simple list of the different online/offline evaluation types that we may have, like:

def _setup_evaluation(self):
    if (
        self.evaluation_config.off_policy_estimation_methods and
        not self.evaluation_config.ope_split_batch_by_episode
    ):
        self._setup_offline_bandit_eval()  # DS based offline bandit data eval.
    elif self.config.evaluation_num_workers > 0:
        assert self.config.evaluation_config.env, ...
        self._setup_evaluation_workers()  # Online eval.
    else:
        # What do we do here?

Contributor Author:
If OPE is specified with split_batch_by_episode=True (the episodic case), we fall back to the old behavior, i.e. use evaluation workers that each have a shard of the dataset. It is not clear how I could use Ray Dataset map_batches to do processing on an episode level. In the future we can do that consolidation, but until then let's just keep the old behavior.

Member:
OK, but how do I tell that this is the behavior here?
Like, we are going to get to this if statement of off_policy_estimation_methods and not ope_split_batch_by_episode; if it doesn't match, where do we fall to? How do I know the evaluation workers are guaranteed to be there?
To be clear, I am not saying the behavior is bad, I am just saying maybe we can write the code in a way that makes this flow clearer. Wdyt?

Contributor Author:
Yep. Let me explain what happens:
If this condition is true, self.evaluation_dataset gets set to a dataset; otherwise it stays None. In self.evaluate(), if self.evaluation_dataset is None we fall back to the old behavior, which is using evaluation_workers. Now the question is: have we actually created evaluation_workers when they were necessary? The answer is yes. It's in the setup() function, where we call self._should_create_evaluation_rollout_workers(self.evaluation_config). How do you think this should be rewritten so that it is clearer?

Member:
OK, fine for now.
Personally I would feel a lot safer if this whole thing were a single if...else...; then I'd know all cases are covered.
Right now creation of self.evaluation_workers is conditioned on self._should_create_evaluation_rollout_workers(), and creation of evaluation_dataset is conditioned on off_policy_estimation_methods and not ope_split_batch_by_episode.
It's just hard to tell whether these two are mutually exclusive or not. You get the idea.
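For reference, here is a condensed sketch of the flow this thread is describing, paraphrased from the diff below rather than copied verbatim:

def setup(self, config):
    # Rollout-based evaluation workers are only created when we are NOT
    # doing dataset-based offline evaluation
    # (see _should_create_evaluation_rollout_workers()).
    if self._should_create_evaluation_rollout_workers(self.evaluation_config):
        self.evaluation_workers = ...  # online eval, or episodic OPE via workers

    self.evaluation_dataset = None
    if (
        self.evaluation_config.off_policy_estimation_methods
        and not self.evaluation_config.ope_split_batch_by_episode
    ):
        # Bandit-style OPE: read the offline data as a Ray Dataset instead.
        self.evaluation_dataset = ...

def evaluate(self):
    if self.evaluation_dataset is not None:
        # New path: OPE runs directly on the dataset; no eval workers involved.
        return {"evaluation": self._run_offline_evaluation()}
    # Old path: sync weights to self.evaluation_workers and sample as before.
    ...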

):
# the num worker is set to 0 to avoid creating shards. The dataset will not
# be repartitioned into num_workers blocks.
logger.info("Creating evaluation dataset ...")
ds, _ = get_dataset_and_shards(self.evaluation_config, num_workers=0)

# Dataset should be in form of one episode per row. in case of bandits each
# row is just one time step. To make the computation more efficient later
# we remove the time dimension here.
parallelism = self.evaluation_config.evaluation_num_workers or 1
Member:
We do have a parallelism parameter in evaluation_config. Don't we want to use it in place of evaluation_num_workers?
It reads a little weird because on one hand we say we are not going to create evaluation workers, while on the other hand evaluation_num_workers actually plays a critical role here.

Contributor Author:
The current users (you know who :)) get parallelism by defining the number of evaluation workers. I would love to actually change that at some point. To avoid further confusion I am getting rid of parallelism as a user-facing knob. If you look in validate(), I am overriding the parallelism in evaluation_config.
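In other words, evaluation_num_workers stays the single user-facing knob and the dataset path simply reuses it as its parallelism. Illustrative snippet (not part of the diff):

config = config.evaluation(
    evaluation_interval=1,
    evaluation_num_workers=4,
)
# With evaluation rollout workers: 4 eval RolloutWorkers get created.
# With dataset-based OPE: the dataset is processed with parallelism=4
# (validate() copies this value into input_config["parallelism"], and
# estimate_on_dataset() is called with n_parallelism=4).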

batch_size = max(ds.count() // parallelism, 1)
self.evaluation_dataset = ds.map_batches(
remove_time_dim, batch_size=batch_size
)
logger.info("Evaluation dataset created")

self.reward_estimators: Dict[str, OffPolicyEstimator] = {}
ope_types = {
"is": ImportanceSampling,
@@ -654,7 +676,7 @@ def setup(self, config: AlgorithmConfig) -> None:
raise ValueError(
f"Unknown off_policy_estimation type: {method_type}! Must be "
"either a class path or a sub-class of ray.rllib."
"offline.estimators.off_policy_estimator::OffPolicyEstimator"
"offline.offline_evaluator::OfflineEvaluator"
)

# Run `on_algorithm_init` callback after initialization is done.
@@ -803,6 +825,9 @@ def evaluate(
# Call the `_before_evaluate` hook.
self._before_evaluate()

if self.evaluation_dataset is not None:
return {"evaluation": self._run_offline_evaluation()}

# Sync weights to the evaluation WorkerSet.
if self.evaluation_workers is not None:
self.evaluation_workers.sync_weights(
@@ -1976,50 +2001,62 @@ def default_resource_request(
# workers to determine their CPU/GPU resource needs.

# Convenience config handles.
default_config = cls.get_default_config()
# TODO: Have to make this work for now for AlgorithmConfigs (returned by
# get_default_config(). Use only AlgorithmConfigs once all Algorithms
# return an AlgorothmConfig from their get_default_config() method.
if not isinstance(default_config, dict):
default_config = default_config.to_dict()
cf = dict(default_config, **config)
eval_cf = cf["evaluation_config"] or {}
cf = cls.get_default_config().update_from_dict(config)
cf.validate()
cf.freeze()

# get evaluation config
eval_cf = cf.get_evaluation_config_object()
eval_cf.validate()
eval_cf.freeze()

# resources for local worker
local_worker = {
"CPU": cf["num_cpus_for_driver"],
"GPU": 0 if cf["_fake_gpus"] else cf["num_gpus"],
"CPU": cf.num_cpus_for_local_worker,
"GPU": 0 if cf._fake_gpus else cf.num_gpus,
}

bundles = [local_worker]

# resources for rollout env samplers
rollout_workers = [
{
"CPU": cf["num_cpus_per_worker"],
"GPU": cf["num_gpus_per_worker"],
**cf["custom_resources_per_worker"],
"CPU": cf.num_cpus_per_worker,
"GPU": cf.num_gpus_per_worker,
**cf.custom_resources_per_worker,
}
for _ in range(cf["num_workers"])
for _ in range(cf.num_rollout_workers)
]

bundles = [local_worker] + rollout_workers

if cf["evaluation_interval"]:
# resources for evaluation env samplers or datasets (if any)
if cls._should_create_evaluation_rollout_workers(eval_cf):
# Evaluation workers.
# Note: The local eval worker is located on the driver CPU.
bundles += [
evaluation_bundle = [
{
"CPU": eval_cf.get(
"num_cpus_per_worker", cf["num_cpus_per_worker"]
),
"GPU": eval_cf.get(
"num_gpus_per_worker", cf["num_gpus_per_worker"]
),
**eval_cf.get(
"custom_resources_per_worker", cf["custom_resources_per_worker"]
),
"CPU": eval_cf.num_cpus_per_worker,
"GPU": eval_cf.num_gpus_per_worker,
**eval_cf.custom_resources_per_worker,
}
for _ in range(cf["evaluation_num_workers"])
for _ in range(eval_cf.evaluation_num_workers)
]

# In case our I/O reader/writer requires compute resources.
bundles += get_offline_io_resource_bundles(cf)
else:
# resources for offline dataset readers during evaluation
# Note (Kourosh): we should not claim extra workers for
# training on the offline dataset, since rollout workers have already
# claimed it.
# Another Note (Kourosh): dataset reader will not use placement groups so
# whatever we specify here won't matter because dataset won't even use it.
# Disclaimer: using ray dataset in tune may cause deadlock when multiple
# tune trials get scheduled on the same node and do not leave any spare
# resources for dataset operations. The workaround is to limit the
# max_concurrent trials so that some spare cpus are left for dataset
# operations. This behavior should get fixed by the dataset team. more info
# found here:
# https://docs.ray.io/en/master/data/dataset-internals.html#datasets-tune
evaluation_bundle = []

bundles += rollout_workers + evaluation_bundle

# Return PlacementGroupFactory containing all needed resources
# (already properly defined as device bundles).
@@ -2632,6 +2669,7 @@ def _run_one_evaluation(
Returns:
The results dict from the evaluation call.
"""

eval_results = {
"evaluation": {
"episode_reward_max": np.nan,
@@ -2715,6 +2753,46 @@ def _run_one_training_iteration_and_evaluation_in_parallel(

return results, train_iter_ctx

def _run_offline_evaluation(self):
"""Runs offline evaluation via `OfflineEvaluator.estimate_on_dataset()` API.

This method will be used when `evaluation_dataset` is provided.
Note: This will only work if the policy is a single agent policy.

Returns:
The results dict from the offline evaluation call.
"""
assert len(self.workers.local_worker().policy_map) == 1

parallelism = self.evaluation_config.evaluation_num_workers or 1
offline_eval_results = {"off_policy_estimator": {}}
for evaluator_name, offline_evaluator in self.reward_estimators.items():
offline_eval_results["off_policy_estimator"][
evaluator_name
] = offline_evaluator.estimate_on_dataset(
self.evaluation_dataset,
n_parallelism=parallelism,
)
return offline_eval_results

@classmethod
def _should_create_evaluation_rollout_workers(cls, eval_config: "AlgorithmConfig"):
"""Determines whether we need to create evaluation workers.

Returns False if we need to run offline evaluation
(with ope.estimate_on_dataset API) or when local worker is to be used for
evaluation. Note: We only use estimate_on_dataset API with bandits for now.
That is when ope_split_batch_by_episode is False. TODO: In future we will do
the same for episodic RL OPE.
"""
run_offline_evaluation = (
eval_config.get("off_policy_estimation_methods")
and not eval_config.ope_split_batch_by_episode
)
return not run_offline_evaluation and (
eval_config.evaluation_num_workers > 0 or eval_config.evaluation_interval
)

@staticmethod
def _automatic_evaluation_duration_fn(
unit, num_eval_workers, eval_cfg, train_future, num_units_done
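Putting the algorithm.py changes together, here is a minimal usage sketch of the new code path. The dataset path, file format, and estimator picks are illustrative assumptions, not taken from this diff:

from ray.rllib.algorithms.bc import BCConfig
from ray.rllib.offline.estimators import (
    ImportanceSampling,
    WeightedImportanceSampling,
)

config = (
    BCConfig()
    .offline_data(
        input_="dataset",
        # Hypothetical path and keys; see the input_config docstring change below.
        input_config={"format": "json", "paths": "/tmp/bandit_data.json"},
    )
    .evaluation(
        evaluation_interval=1,
        evaluation_num_workers=2,          # reused as dataset parallelism
        ope_split_batch_by_episode=False,  # bandit data: one time step per row
        off_policy_estimation_methods={
            "is": {"type": ImportanceSampling},
            "wis": {"type": WeightedImportanceSampling},
        },
    )
)

algo = config.build()
results = algo.evaluate()
# -> {"evaluation": {"off_policy_estimator": {"is": {...}, "wis": {...}}}}

When such a run is launched through Tune, the resource note in default_resource_request above applies: cap the number of concurrent trials (for example via the Tuner's max_concurrent_trials setting) so that some CPUs stay free for Ray Dataset operations.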
53 changes: 50 additions & 3 deletions rllib/algorithms/algorithm_config.py
@@ -647,7 +647,10 @@ def validate(self) -> None:
from ray.rllib.policy.dynamic_tf_policy import DynamicTFPolicy
from ray.rllib.policy.torch_policy import TorchPolicy

default_policy_cls = self.algo_class.get_default_policy_class(self)
default_policy_cls = None
if self.algo_class:
default_policy_cls = self.algo_class.get_default_policy_class(self)

policies = self.policies
policy_specs = (
[
@@ -680,6 +683,29 @@ def validate(self) -> None:
f"config.framework({self.framework_str})!"
)

if self.input_ == "sampler" and self.off_policy_estimation_methods:
raise ValueError(
"Off-policy estimation methods can only be used if the input is a "
"dataset. We currently do not support applying off_policy_esitmation "
"method on a sampler input."
)

if self.input_ == "dataset":
# if we need to read a ray dataset set the parallelism and
# num_cpus_per_read_task from rollout worker settings
self.input_config["num_cpus_per_read_task"] = self.num_cpus_per_worker
if self.in_evaluation:
# If using dataset for evaluation, the parallelism gets set to
# evaluation_num_workers for backward compatibility and num_cpus gets
# set to num_cpus_per_worker from rollout worker. User only needs to
# set evaluation_num_workers.
self.input_config["parallelism"] = self.evaluation_num_workers or 1
Member:
OK, so we should simply use "parallelism" in our code?

Contributor Author:
In code, yes. From the user's side it's evaluation_num_workers.

else:
# If using dataset for training, the parallelism and num_cpus gets set
# based on rollout worker parameters. This is for backwards
# compatibility for now. User only needs to set num_rollout_workers.
self.input_config["parallelism"] = self.num_rollout_workers or 1

def build(
self,
env: Optional[Union[str, EnvType]] = None,
@@ -1460,8 +1486,10 @@ def offline_data(
- A callable that takes an `IOContext` object as only arg and returns a
ray.rllib.offline.InputReader.
- A string key that indexes a callable with tune.registry.register_input
input_config: Arguments accessible from the IOContext for configuring custom
input.
input_config: Arguments that describe the settings for reading the input.
If the input is `sampler`, this will be environment configuration, e.g.
`env_name` and `env_config`, etc. See `EnvContext` for more info.
If the input is `dataset`, this will be e.g. `format`, `path`.
actions_in_input_normalized: True, if the actions in a given offline "input"
are already normalized (between -1.0 and 1.0). This is usually the case
when the offline file has been generated by another RLlib algorithm
@@ -1497,6 +1525,25 @@ def offline_data(
if input_ is not NotProvided:
self.input_ = input_
if input_config is not NotProvided:
if not isinstance(input_config, dict):
raise ValueError(
f"input_config must be a dict, got {type(input_config)}."
)
# TODO (Kourosh) Once we use a complete separation between rollout worker
# and input dataset reader we can remove this.
# For now, error out if the user attempts to set these parameters.
msg = "{} should not be set in the input_config. RLlib will use {} instead."
if input_config.get("num_cpus_per_read_task") is not None:
raise ValueError(
msg.format("num_cpus_per_read_task", "num_cpus_per_worker")
)
if input_config.get("parallelism") is not None:
if self.in_evaluation:
raise ValueError(
msg.format("parallelism", "evaluation_num_workers")
)
else:
raise ValueError(msg.format("parallelism", "num_rollout_workers"))
self.input_config = input_config
if actions_in_input_normalized is not NotProvided:
self.actions_in_input_normalized = actions_in_input_normalized
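The new offline_data() validation can be illustrated with a hypothetical misuse (algorithm, path, and keys are placeholders):

from ray.rllib.algorithms.bc import BCConfig

# "parallelism" (and "num_cpus_per_read_task") may no longer be set directly;
# RLlib now derives them from num_rollout_workers / evaluation_num_workers.
config = BCConfig().offline_data(
    input_="dataset",
    input_config={"format": "json", "paths": "/tmp/data.json", "parallelism": 8},
)
# ValueError: parallelism should not be set in the input_config.
# RLlib will use num_rollout_workers instead.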
5 changes: 0 additions & 5 deletions rllib/algorithms/bc/bc.py
@@ -56,11 +56,6 @@ def __init__(self, algo_class=None):
# __sphinx_doc_end__
# fmt: on

# TODO: Remove this when the off_policy_estimation_methods
# default config is removed from MARWIL
# No off-policy estimation.
self.off_policy_estimation_methods = {}

@override(MARWILConfig)
def validate(self) -> None:
super().validate()
6 changes: 0 additions & 6 deletions rllib/algorithms/ddpg/ddpg.py
@@ -270,12 +270,6 @@ def validate(self) -> None:
f"Try setting config.rollouts(rollout_fragment_length={self.n_step})."
)

if self.model["custom_model"]:
Contributor Author:
Apparently we never called this validate() method before when custom_model was provided.

raise ValueError(
"Try setting config.training(use_state_preprocessor=True) "
"since a custom model was specified."
)

if self.grad_clip is not None and self.grad_clip <= 0.0:
raise ValueError("`grad_clip` value must be > 0.0!")

20 changes: 4 additions & 16 deletions rllib/algorithms/marwil/marwil.py
@@ -9,13 +9,9 @@
multi_gpu_train_one_step,
train_one_step,
)
from ray.rllib.offline.estimators import ImportanceSampling, WeightedImportanceSampling
from ray.rllib.policy.policy import Policy
from ray.rllib.utils.annotations import override
from ray.rllib.utils.deprecation import (
Deprecated,
deprecation_warning,
)
from ray.rllib.utils.deprecation import Deprecated, deprecation_warning
from ray.rllib.utils.metrics import (
NUM_AGENT_STEPS_SAMPLED,
NUM_ENV_STEPS_SAMPLED,
@@ -100,13 +96,6 @@ def __init__(self, algo_class=None):
self.train_batch_size = 2000
# __sphinx_doc_end__
# fmt: on

# TODO: Delete this and change off_policy_estimation_methods to {}
# Also remove the same section from BC
self.off_policy_estimation_methods = {
"is": {"type": ImportanceSampling},
"wis": {"type": WeightedImportanceSampling},
}
Member:
Why did we hardcode this before??
Also, not seeing the same changes for bc.py?

Member:
ping

Contributor Author:
I have no idea :) This is probably from before Rohan started on this project.

self._set_off_policy_estimation_methods = False

@override(AlgorithmConfig)
@@ -169,7 +158,6 @@ def evaluation(
**kwargs,
) -> "MARWILConfig":
"""Sets the evaluation related configuration.

Returns:
This updated AlgorithmConfig object.
"""
@@ -190,9 +178,9 @@ def build(
) -> "Algorithm":
if not self._set_off_policy_estimation_methods:
deprecation_warning(
old="MARWIL currently uses off_policy_estimation_methods: "
f"{self.off_policy_estimation_methods} by default. This will"
"change to off_policy_estimation_methods: {} in a future release."
old="MARWIL used to have off_policy_estimation_methods "
"is and wis by default. This has"
"changed to off_policy_estimation_methods: \{\}."
"If you want to use an off-policy estimator, specify it in"
".evaluation(off_policy_estimation_methods=...)",
error=False,
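With the hard-coded MARWIL defaults removed, users who still want IS/WIS estimates now opt in explicitly, along these lines (illustrative):

from ray.rllib.algorithms.marwil import MARWILConfig
from ray.rllib.offline.estimators import (
    ImportanceSampling,
    WeightedImportanceSampling,
)

config = MARWILConfig().evaluation(
    off_policy_estimation_methods={
        "is": {"type": ImportanceSampling},
        "wis": {"type": WeightedImportanceSampling},
    },
)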
2 changes: 1 addition & 1 deletion rllib/algorithms/marwil/tests/test_marwil.py
@@ -24,7 +24,7 @@
class TestMARWIL(unittest.TestCase):
@classmethod
def setUpClass(cls):
ray.init(num_cpus=4)
ray.init()

@classmethod
def tearDownClass(cls):
2 changes: 1 addition & 1 deletion rllib/algorithms/tests/test_algorithm.py
@@ -20,7 +20,7 @@
class TestAlgorithm(unittest.TestCase):
@classmethod
def setUpClass(cls):
ray.init(num_cpus=6)
ray.init()

@classmethod
def tearDownClass(cls):