diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 83f205d31e05..7829fd41148c 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -26,6 +26,7 @@ Union, ) from ray.rllib.offline.offline_evaluator import OfflineEvaluator +from ray.rllib.offline.offline_evaluation_utils import remove_time_dim import tree import ray @@ -52,7 +53,7 @@ from ray.rllib.execution.parallel_requests import AsyncRequestsManager from ray.rllib.execution.rollout_ops import synchronous_parallel_sample from ray.rllib.execution.train_ops import multi_gpu_train_one_step, train_one_step -from ray.rllib.offline import get_offline_io_resource_bundles +from ray.rllib.offline import get_dataset_and_shards from ray.rllib.offline.estimators import ( OffPolicyEstimator, ImportanceSampling, @@ -592,7 +593,8 @@ def setup(self, config: AlgorithmConfig) -> None: # Evaluation WorkerSet setup. # User would like to setup a separate evaluation worker set. - if self.config.evaluation_num_workers > 0 or self.config.evaluation_interval: + # Note: We skip workerset creation if we need to do offline evaluation + if self._should_create_evaluation_rollout_workers(self.evaluation_config): _, env_creator = self._get_env_id_and_creator( self.evaluation_config.env, self.evaluation_config ) @@ -620,6 +622,26 @@ def setup(self, config: AlgorithmConfig) -> None: ) self._evaluation_weights_seq_number = 0 + self.evaluation_dataset = None + if ( + self.evaluation_config.off_policy_estimation_methods + and not self.evaluation_config.ope_split_batch_by_episode + ): + # the num worker is set to 0 to avoid creating shards. The dataset will not + # be repartioned to num_workers blocks. + logger.info("Creating evaluation dataset ...") + ds, _ = get_dataset_and_shards(self.evaluation_config, num_workers=0) + + # Dataset should be in form of one episode per row. in case of bandits each + # row is just one time step. To make the computation more efficient later + # we remove the time dimension here. + parallelism = self.evaluation_config.evaluation_num_workers or 1 + batch_size = max(ds.count() // parallelism, 1) + self.evaluation_dataset = ds.map_batches( + remove_time_dim, batch_size=batch_size + ) + logger.info("Evaluation dataset created") + self.reward_estimators: Dict[str, OffPolicyEstimator] = {} ope_types = { "is": ImportanceSampling, @@ -654,7 +676,7 @@ def setup(self, config: AlgorithmConfig) -> None: raise ValueError( f"Unknown off_policy_estimation type: {method_type}! Must be " "either a class path or a sub-class of ray.rllib." - "offline.estimators.off_policy_estimator::OffPolicyEstimator" + "offline.offline_evaluator::OfflineEvaluator" ) # Run `on_algorithm_init` callback after initialization is done. @@ -803,6 +825,9 @@ def evaluate( # Call the `_before_evaluate` hook. self._before_evaluate() + if self.evaluation_dataset is not None: + return {"evaluation": self._run_offline_evaluation()} + # Sync weights to the evaluation WorkerSet. if self.evaluation_workers is not None: self.evaluation_workers.sync_weights( @@ -1976,50 +2001,62 @@ def default_resource_request( # workers to determine their CPU/GPU resource needs. # Convenience config handles. - default_config = cls.get_default_config() - # TODO: Have to make this work for now for AlgorithmConfigs (returned by - # get_default_config(). Use only AlgorithmConfigs once all Algorithms - # return an AlgorothmConfig from their get_default_config() method. 
- if not isinstance(default_config, dict): - default_config = default_config.to_dict() - cf = dict(default_config, **config) - eval_cf = cf["evaluation_config"] or {} + cf = cls.get_default_config().update_from_dict(config) + cf.validate() + cf.freeze() + + # get evaluation config + eval_cf = cf.get_evaluation_config_object() + eval_cf.validate() + eval_cf.freeze() + # resources for local worker local_worker = { - "CPU": cf["num_cpus_for_driver"], - "GPU": 0 if cf["_fake_gpus"] else cf["num_gpus"], + "CPU": cf.num_cpus_for_local_worker, + "GPU": 0 if cf._fake_gpus else cf.num_gpus, } + + bundles = [local_worker] + + # resources for rollout env samplers rollout_workers = [ { - "CPU": cf["num_cpus_per_worker"], - "GPU": cf["num_gpus_per_worker"], - **cf["custom_resources_per_worker"], + "CPU": cf.num_cpus_per_worker, + "GPU": cf.num_gpus_per_worker, + **cf.custom_resources_per_worker, } - for _ in range(cf["num_workers"]) + for _ in range(cf.num_rollout_workers) ] - bundles = [local_worker] + rollout_workers - - if cf["evaluation_interval"]: + # resources for evaluation env samplers or datasets (if any) + if cls._should_create_evaluation_rollout_workers(eval_cf): # Evaluation workers. # Note: The local eval worker is located on the driver CPU. - bundles += [ + evaluation_bundle = [ { - "CPU": eval_cf.get( - "num_cpus_per_worker", cf["num_cpus_per_worker"] - ), - "GPU": eval_cf.get( - "num_gpus_per_worker", cf["num_gpus_per_worker"] - ), - **eval_cf.get( - "custom_resources_per_worker", cf["custom_resources_per_worker"] - ), + "CPU": eval_cf.num_cpus_per_worker, + "GPU": eval_cf.num_gpus_per_worker, + **eval_cf.custom_resources_per_worker, } - for _ in range(cf["evaluation_num_workers"]) + for _ in range(eval_cf.evaluation_num_workers) ] - - # In case our I/O reader/writer requires conmpute resources. - bundles += get_offline_io_resource_bundles(cf) + else: + # resources for offline dataset readers during evaluation + # Note (Kourosh): we should not claim extra workers for + # training on the offline dataset, since rollout workers have already + # claimed it. + # Another Note (Kourosh): dataset reader will not use placement groups so + # whatever we specify here won't matter because dataset won't even use it. + # Disclaimer: using ray dataset in tune may cause deadlock when multiple + # tune trials get scheduled on the same node and do not leave any spare + # resources for dataset operations. The workaround is to limit the + # max_concurrent trials so that some spare cpus are left for dataset + # operations. This behavior should get fixed by the dataset team. more info + # found here: + # https://docs.ray.io/en/master/data/dataset-internals.html#datasets-tune + evaluation_bundle = [] + + bundles += rollout_workers + evaluation_bundle # Return PlacementGroupFactory containing all needed resources # (already properly defined as device bundles). @@ -2632,6 +2669,7 @@ def _run_one_evaluation( Returns: The results dict from the evaluation call. """ + eval_results = { "evaluation": { "episode_reward_max": np.nan, @@ -2715,6 +2753,46 @@ def _run_one_training_iteration_and_evaluation_in_parallel( return results, train_iter_ctx + def _run_offline_evaluation(self): + """Runs offline evaluation via `OfflineEvaluator.estimate_on_dataset()` API. + + This method will be used when `evaluation_dataset` is provided. + Note: This will only work if the policy is a single agent policy. + + Returns: + The results dict from the offline evaluation call. 
+ """ + assert len(self.workers.local_worker().policy_map) == 1 + + parallelism = self.evaluation_config.evaluation_num_workers or 1 + offline_eval_results = {"off_policy_estimator": {}} + for evaluator_name, offline_evaluator in self.reward_estimators.items(): + offline_eval_results["off_policy_estimator"][ + evaluator_name + ] = offline_evaluator.estimate_on_dataset( + self.evaluation_dataset, + n_parallelism=parallelism, + ) + return offline_eval_results + + @classmethod + def _should_create_evaluation_rollout_workers(cls, eval_config: "AlgorithmConfig"): + """Determines whether we need to create evaluation workers. + + Returns False if we need to run offline evaluation + (with ope.estimate_on_dastaset API) or when local worker is to be used for + evaluation. Note: We only use estimate_on_dataset API with bandits for now. + That is when ope_split_batch_by_episode is False. TODO: In future we will do + the same for episodic RL OPE. + """ + run_offline_evaluation = ( + eval_config.get("off_policy_estimation_methods") + and not eval_config.ope_split_batch_by_episode + ) + return not run_offline_evaluation and ( + eval_config.evaluation_num_workers > 0 or eval_config.evaluation_interval + ) + @staticmethod def _automatic_evaluation_duration_fn( unit, num_eval_workers, eval_cfg, train_future, num_units_done diff --git a/rllib/algorithms/algorithm_config.py b/rllib/algorithms/algorithm_config.py index 97eb731bb6b4..b30679cdbe93 100644 --- a/rllib/algorithms/algorithm_config.py +++ b/rllib/algorithms/algorithm_config.py @@ -647,7 +647,10 @@ def validate(self) -> None: from ray.rllib.policy.dynamic_tf_policy import DynamicTFPolicy from ray.rllib.policy.torch_policy import TorchPolicy - default_policy_cls = self.algo_class.get_default_policy_class(self) + default_policy_cls = None + if self.algo_class: + default_policy_cls = self.algo_class.get_default_policy_class(self) + policies = self.policies policy_specs = ( [ @@ -680,6 +683,29 @@ def validate(self) -> None: f"config.framework({self.framework_str})!" ) + if self.input_ == "sampler" and self.off_policy_estimation_methods: + raise ValueError( + "Off-policy estimation methods can only be used if the input is a " + "dataset. We currently do not support applying off_policy_esitmation " + "method on a sampler input." + ) + + if self.input_ == "dataset": + # if we need to read a ray dataset set the parallelism and + # num_cpus_per_read_task from rollout worker settings + self.input_config["num_cpus_per_read_task"] = self.num_cpus_per_worker + if self.in_evaluation: + # If using dataset for evaluation, the parallelism gets set to + # evaluation_num_workers for backward compatibility and num_cpus gets + # set to num_cpus_per_worker from rollout worker. User only needs to + # set evaluation_num_workers. + self.input_config["parallelism"] = self.evaluation_num_workers or 1 + else: + # If using dataset for training, the parallelism and num_cpus gets set + # based on rollout worker parameters. This is for backwards + # compatibility for now. User only needs to set num_rollout_workers. + self.input_config["parallelism"] = self.num_rollout_workers or 1 + def build( self, env: Optional[Union[str, EnvType]] = None, @@ -1460,8 +1486,10 @@ def offline_data( - A callable that takes an `IOContext` object as only arg and returns a ray.rllib.offline.InputReader. - A string key that indexes a callable with tune.registry.register_input - input_config: Arguments accessible from the IOContext for configuring custom - input. 
+ input_config: Arguments that describe the settings for reading the input. + If input is `sample`, this will be environment configuation, e.g. + `env_name` and `env_config`, etc. See `EnvContext` for more info. + If the input is `dataset`, this will be e.g. `format`, `path`. actions_in_input_normalized: True, if the actions in a given offline "input" are already normalized (between -1.0 and 1.0). This is usually the case when the offline file has been generated by another RLlib algorithm @@ -1497,6 +1525,25 @@ def offline_data( if input_ is not NotProvided: self.input_ = input_ if input_config is not NotProvided: + if not isinstance(input_config, dict): + raise ValueError( + f"input_config must be a dict, got {type(input_config)}." + ) + # TODO (Kourosh) Once we use a complete sepration between rollout worker + # and input dataset reader we can remove this. + # For now Error out if user attempts to set these parameters. + msg = "{} should not be set in the input_config. RLlib will use {} instead." + if input_config.get("num_cpus_per_read_task") is not None: + raise ValueError( + msg.format("num_cpus_per_read_task", "num_cpus_per_worker") + ) + if input_config.get("parallelism") is not None: + if self.in_evaluation: + raise ValueError( + msg.format("parallelism", "evaluation_num_workers") + ) + else: + raise ValueError(msg.format("parallelism", "num_rollout_workers")) self.input_config = input_config if actions_in_input_normalized is not NotProvided: self.actions_in_input_normalized = actions_in_input_normalized diff --git a/rllib/algorithms/bc/bc.py b/rllib/algorithms/bc/bc.py index 9523c2c9266c..905c2813fc80 100644 --- a/rllib/algorithms/bc/bc.py +++ b/rllib/algorithms/bc/bc.py @@ -56,11 +56,6 @@ def __init__(self, algo_class=None): # __sphinx_doc_end__ # fmt: on - # TODO: Remove this when the off_policy_estimation_methods - # default config is removed from MARWIL - # No off-policy estimation. - self.off_policy_estimation_methods = {} - @override(MARWILConfig) def validate(self) -> None: super().validate() diff --git a/rllib/algorithms/ddpg/ddpg.py b/rllib/algorithms/ddpg/ddpg.py index 6325890add71..fe3cbb07cc1e 100644 --- a/rllib/algorithms/ddpg/ddpg.py +++ b/rllib/algorithms/ddpg/ddpg.py @@ -270,12 +270,6 @@ def validate(self) -> None: f"Try setting config.rollouts(rollout_fragment_length={self.n_step})." ) - if self.model["custom_model"]: - raise ValueError( - "Try setting config.training(use_state_preprocessor=True) " - "since a custom model was specified." 
- ) - if self.grad_clip is not None and self.grad_clip <= 0.0: raise ValueError("`grad_clip` value must be > 0.0!") diff --git a/rllib/algorithms/marwil/marwil.py b/rllib/algorithms/marwil/marwil.py index 15bf4530855f..edc0e00fa57c 100644 --- a/rllib/algorithms/marwil/marwil.py +++ b/rllib/algorithms/marwil/marwil.py @@ -9,13 +9,9 @@ multi_gpu_train_one_step, train_one_step, ) -from ray.rllib.offline.estimators import ImportanceSampling, WeightedImportanceSampling from ray.rllib.policy.policy import Policy from ray.rllib.utils.annotations import override -from ray.rllib.utils.deprecation import ( - Deprecated, - deprecation_warning, -) +from ray.rllib.utils.deprecation import Deprecated, deprecation_warning from ray.rllib.utils.metrics import ( NUM_AGENT_STEPS_SAMPLED, NUM_ENV_STEPS_SAMPLED, @@ -100,13 +96,6 @@ def __init__(self, algo_class=None): self.train_batch_size = 2000 # __sphinx_doc_end__ # fmt: on - - # TODO: Delete this and change off_policy_estimation_methods to {} - # Also remove the same section from BC - self.off_policy_estimation_methods = { - "is": {"type": ImportanceSampling}, - "wis": {"type": WeightedImportanceSampling}, - } self._set_off_policy_estimation_methods = False @override(AlgorithmConfig) @@ -169,7 +158,6 @@ def evaluation( **kwargs, ) -> "MARWILConfig": """Sets the evaluation related configuration. - Returns: This updated AlgorithmConfig object. """ @@ -190,9 +178,9 @@ def build( ) -> "Algorithm": if not self._set_off_policy_estimation_methods: deprecation_warning( - old="MARWIL currently uses off_policy_estimation_methods: " - f"{self.off_policy_estimation_methods} by default. This will" - "change to off_policy_estimation_methods: {} in a future release." + old="MARWIL used to have off_policy_estimation_methods " + "is and wis by default. This has" + "changed to off_policy_estimation_methods: \{\}." "If you want to use an off-policy estimator, specify it in" ".evaluation(off_policy_estimation_methods=...)", error=False, diff --git a/rllib/algorithms/marwil/tests/test_marwil.py b/rllib/algorithms/marwil/tests/test_marwil.py index 66f0c19f2a9b..908c3c4f98e4 100644 --- a/rllib/algorithms/marwil/tests/test_marwil.py +++ b/rllib/algorithms/marwil/tests/test_marwil.py @@ -24,7 +24,7 @@ class TestMARWIL(unittest.TestCase): @classmethod def setUpClass(cls): - ray.init(num_cpus=4) + ray.init() @classmethod def tearDownClass(cls): diff --git a/rllib/algorithms/tests/test_algorithm.py b/rllib/algorithms/tests/test_algorithm.py index ef34e68d8efe..5140f4913360 100644 --- a/rllib/algorithms/tests/test_algorithm.py +++ b/rllib/algorithms/tests/test_algorithm.py @@ -20,7 +20,7 @@ class TestAlgorithm(unittest.TestCase): @classmethod def setUpClass(cls): - ray.init(num_cpus=6) + ray.init() @classmethod def tearDownClass(cls): diff --git a/rllib/offline/dataset_reader.py b/rllib/offline/dataset_reader.py index b15f15684157..0f8e5188717b 100644 --- a/rllib/offline/dataset_reader.py +++ b/rllib/offline/dataset_reader.py @@ -14,19 +14,9 @@ from ray.rllib.utils.annotations import override, PublicAPI from ray.rllib.utils.typing import SampleBatchType, AlgorithmConfigDict -logger = logging.getLogger(__name__) - DEFAULT_NUM_CPUS_PER_TASK = 0.5 - -# TODO: @avnishn what is the use of this function anymore? 
-def _get_resource_bundles(config: AlgorithmConfigDict): - input_config = config.get("input_config", {}) - parallelism = input_config.get("parallelism", config.get("num_workers", 1)) - cpus_per_task = input_config.get( - "num_cpus_per_read_task", DEFAULT_NUM_CPUS_PER_TASK - ) - return [{"CPU": math.ceil(parallelism * cpus_per_task)}] +logger = logging.getLogger(__name__) def _unzip_this_path(fpath: Path, extract_path: str): @@ -135,9 +125,8 @@ def get_dataset_and_shards( # check if at least loader_fn or format + path is specified. if not (format and paths) and not loader_fn: raise ValueError( - f"If using a loader_fn: {loader_fn} that constructs a dataset, " - "neither format: {format} and paths: {paths} must not be specified. If " - "format and paths are specified, a loader_fn must not be specified." + "Must specify either a `loader_fn` or a `format` and `path` in " + "`input_config`." ) # check paths to be a str or list[str] if not None @@ -150,6 +139,8 @@ def get_dataset_and_shards( raise ValueError("Paths must be a path string or a list of path strings.") paths = _unzip_if_needed(paths, format) + # TODO (Kourosh): num_workers is not necessary since we can use parallelism for + # everything. Having two parameters is confusing here. Remove num_workers later. parallelism = input_config.get("parallelism", num_workers or 1) cpus_per_task = input_config.get( "num_cpus_per_read_task", DEFAULT_NUM_CPUS_PER_TASK diff --git a/rllib/offline/estimators/direct_method.py b/rllib/offline/estimators/direct_method.py index 32bcd605f376..798955de15f8 100644 --- a/rllib/offline/estimators/direct_method.py +++ b/rllib/offline/estimators/direct_method.py @@ -1,6 +1,13 @@ import logging from typing import Dict, Any, Optional, List +import math +import numpy as np + +from ray.data import Dataset + from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator +from ray.rllib.offline.offline_evaluation_utils import compute_q_and_v_values +from ray.rllib.offline.offline_evaluator import OfflineEvaluator from ray.rllib.offline.estimators.fqe_torch_model import FQETorchModel from ray.rllib.policy import Policy from ray.rllib.policy.sample_batch import convert_ma_batch_to_sample_batch @@ -8,7 +15,6 @@ from ray.rllib.utils.annotations import DeveloperAPI, override from ray.rllib.utils.typing import SampleBatchType from ray.rllib.utils.numpy import convert_to_numpy -import numpy as np logger = logging.getLogger() @@ -116,3 +122,51 @@ def train(self, batch: SampleBatchType) -> Dict[str, Any]: batch = convert_ma_batch_to_sample_batch(batch) losses = self.model.train(batch) return {"loss": np.mean(losses)} + + @override(OfflineEvaluator) + def estimate_on_dataset( + self, dataset: Dataset, *, n_parallelism: int = ... + ) -> Dict[str, Any]: + """Calculates the Direct Method estimate on the given dataset. + + Note: This estimate works for only discrete action spaces for now. + + Args: + dataset: Dataset to compute the estimate on. Each record in dataset should + include the following columns: `obs`, `actions`, `action_prob` and + `rewards`. The `obs` on each row shoud be a vector of D dimensions. + n_parallelism: The number of parallel workers to use. + + Returns: + Dictionary with the following keys: + v_target: The estimated value of the target policy. + v_behavior: The estimated value of the behavior policy. + v_gain: The estimated gain of the target policy over the behavior + policy. + v_std: The standard deviation of the estimated value of the target. 
+ """ + # compute v_values + batch_size = max(dataset.count() // n_parallelism, 1) + updated_ds = dataset.map_batches( + compute_q_and_v_values, + batch_size=batch_size, + fn_kwargs={ + "model_class": self.model.__class__, + "model_state": self.model.get_state(), + "compute_q_values": False, + }, + ) + + v_behavior = updated_ds.mean("rewards") + v_target = updated_ds.mean("v_values") + v_gain_mean = v_target / v_behavior + v_gain_ste = ( + updated_ds.std("v_values") / v_behavior / math.sqrt(dataset.count()) + ) + + return { + "v_behavior": v_behavior, + "v_target": v_target, + "v_gain_mean": v_gain_mean, + "v_gain_ste": v_gain_ste, + } diff --git a/rllib/offline/estimators/doubly_robust.py b/rllib/offline/estimators/doubly_robust.py index b615d035b875..53f6387df5d1 100644 --- a/rllib/offline/estimators/doubly_robust.py +++ b/rllib/offline/estimators/doubly_robust.py @@ -1,7 +1,12 @@ import logging import numpy as np +import math +import pandas as pd from typing import Dict, Any, Optional, List + +from ray.data import Dataset + from ray.rllib.policy import Policy from ray.rllib.policy.sample_batch import SampleBatch, convert_ma_batch_to_sample_batch from ray.rllib.utils.annotations import DeveloperAPI, override @@ -10,6 +15,11 @@ from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator from ray.rllib.offline.estimators.fqe_torch_model import FQETorchModel +from ray.rllib.offline.offline_evaluator import OfflineEvaluator +from ray.rllib.offline.offline_evaluation_utils import ( + compute_is_weights, + compute_q_and_v_values, +) logger = logging.getLogger() @@ -46,6 +56,7 @@ def __init__( policy: Policy, gamma: float, epsilon_greedy: float = 0.0, + normalize_weights: bool = True, q_model_config: Optional[Dict] = None, ): """Initializes a Doubly Robust OPE Estimator. @@ -56,6 +67,9 @@ def __init__( epsilon_greedy: The probability by which we act acording to a fully random policy during deployment. With 1-epsilon_greedy we act according the target policy. + normalize_weights: If True, the inverse propensity scores are normalized to + their sum across the entire dataset. The effect of this is similar to + weighted importance sampling compared to standard importance sampling. q_model_config: Arguments to specify the Q-model. Must specify a `type` key pointing to the Q-model class. 
This Q-model is trained in the train() method and is used @@ -67,11 +81,14 @@ def __init__( super().__init__(policy, gamma, epsilon_greedy) q_model_config = q_model_config or {} - model_cls = q_model_config.pop("type", FQETorchModel) + q_model_config["gamma"] = gamma + + self._model_cls = q_model_config.pop("type", FQETorchModel) + self._model_configs = q_model_config + self._normalize_weights = normalize_weights - self.model = model_cls( + self.model = self._model_cls( policy=policy, - gamma=gamma, **q_model_config, ) assert hasattr( @@ -88,6 +105,8 @@ def estimate_on_single_episode(self, episode: SampleBatch) -> Dict[str, Any]: rewards, old_prob = episode["rewards"], episode["action_prob"] new_prob = self.compute_action_probs(episode) + weight = new_prob / old_prob + v_behavior = 0.0 v_target = 0.0 q_values = self.model.estimate_q(episode) @@ -98,7 +117,7 @@ def estimate_on_single_episode(self, episode: SampleBatch) -> Dict[str, Any]: for t in reversed(range(episode.count)): v_behavior = rewards[t] + self.gamma * v_behavior - v_target = v_values[t] + (new_prob[t] / old_prob[t]) * ( + v_target = v_values[t] + weight[t] * ( rewards[t] + self.gamma * v_target - q_values[t] ) v_target = v_target.item() @@ -145,3 +164,87 @@ def train(self, batch: SampleBatchType) -> Dict[str, Any]: batch = convert_ma_batch_to_sample_batch(batch) losses = self.model.train(batch) return {"loss": np.mean(losses)} + + @override(OfflineEvaluator) + def estimate_on_dataset( + self, dataset: Dataset, *, n_parallelism: int = ... + ) -> Dict[str, Any]: + """Estimates the policy value using the Doubly Robust estimator. + + The doubly robust estimator uses normalization of importance sampling weights + (aka. propensity ratios) to the average of the importance weights across the + entire dataset. This is done to reduce the variance of the estimate (similar to + weighted importance sampling). You can disable this by setting + `normalize_weights=False` in the constructor. + + Note: This estimate works for only discrete action spaces for now. + + Args: + dataset: Dataset to compute the estimate on. Each record in dataset should + include the following columns: `obs`, `actions`, `action_prob` and + `rewards`. The `obs` on each row shoud be a vector of D dimensions. + n_parallelism: Number of parallelism to use for the computation. + + Returns: + A dict with the following keys: + v_target: The estimated value of the target policy. + v_behavior: The estimated value of the behavior policy. + v_gain: The estimated gain of the target policy over the behavior + policy. + v_std: The standard deviation of the estimated value of the target. 
+ """ + + # step 1: compute the weights and weighted rewards + batch_size = max(dataset.count() // n_parallelism, 1) + updated_ds = dataset.map_batches( + compute_is_weights, + batch_size=batch_size, + fn_kwargs={ + "policy_state": self.policy.get_state(), + "estimator_class": self.__class__, + }, + ) + + # step 2: compute q_values and v_values + batch_size = max(updated_ds.count() // n_parallelism, 1) + updated_ds = updated_ds.map_batches( + compute_q_and_v_values, + batch_size=batch_size, + fn_kwargs={ + "model_class": self.model.__class__, + "model_state": self.model.get_state(), + }, + ) + + # step 3: compute the v_target + def compute_v_target(batch: pd.DataFrame, normalizer: float = 1.0): + weights = batch["weights"] / normalizer + batch["v_target"] = batch["v_values"] + weights * ( + batch["rewards"] - batch["q_values"] + ) + batch["v_behavior"] = batch["rewards"] + return batch + + normalizer = updated_ds.mean("weights") if self._normalize_weights else 1.0 + updated_ds = updated_ds.map_batches( + compute_v_target, + batch_size=batch_size, + fn_kwargs={"normalizer": normalizer}, + ) + + v_behavior = updated_ds.mean("v_behavior") + v_target = updated_ds.mean("v_target") + v_gain_mean = v_target / v_behavior + v_gain_ste = ( + updated_ds.std("v_target") + / normalizer + / v_behavior + / math.sqrt(dataset.count()) + ) + + return { + "v_behavior": v_behavior, + "v_target": v_target, + "v_gain_mean": v_gain_mean, + "v_gain_ste": v_gain_ste, + } diff --git a/rllib/offline/estimators/fqe_torch_model.py b/rllib/offline/estimators/fqe_torch_model.py index 9a21fac6dc91..fa15613e93d7 100644 --- a/rllib/offline/estimators/fqe_torch_model.py +++ b/rllib/offline/estimators/fqe_torch_model.py @@ -1,3 +1,4 @@ +from typing import Dict, Any from ray.rllib.models.utils import get_initializer from ray.rllib.policy import Policy @@ -25,7 +26,7 @@ def __init__( self, policy: Policy, gamma: float, - model: ModelConfigDict = None, + model_config: ModelConfigDict = None, n_iters: int = 1, lr: float = 1e-3, min_loss_threshold: float = 1e-4, @@ -37,7 +38,7 @@ def __init__( Args: policy: Policy to evaluate. gamma: Discount factor of the environment. - model: The ModelConfigDict for self.q_model, defaults to: + model_config: The ModelConfigDict for self.q_model, defaults to: { "fcnet_hiddens": [8, 8], "fcnet_activation": "relu", @@ -59,19 +60,20 @@ def __init__( self.observation_space = policy.observation_space self.action_space = policy.action_space - if model is None: - model = { + if model_config is None: + model_config = { "fcnet_hiddens": [32, 32, 32], "fcnet_activation": "relu", "vf_share_layers": True, } + self.model_config = model_config self.device = self.policy.device self.q_model: TorchModelV2 = ModelCatalog.get_model_v2( self.observation_space, self.action_space, self.action_space.n, - model, + model_config, framework="torch", name="TorchQModel", ).to(self.device) @@ -80,7 +82,7 @@ def __init__( self.observation_space, self.action_space, self.action_space.n, - model, + model_config, framework="torch", name="TargetTorchQModel", ).to(self.device) @@ -244,3 +246,52 @@ def _compute_action_probs(self, obs: TensorType) -> TensorType: ), "FQE only supports Categorical or MultiCategorical distributions!" 
action_probs = action_dist.dist.probs return action_probs + + def get_state(self) -> Dict[str, Any]: + """Returns the current state of the FQE Model.""" + return { + "policy_state": self.policy.get_state(), + "model_config": self.model_config, + "n_iters": self.n_iters, + "lr": self.lr, + "min_loss_threshold": self.min_loss_threshold, + "clip_grad_norm": self.clip_grad_norm, + "minibatch_size": self.minibatch_size, + "polyak_coef": self.polyak_coef, + "gamma": self.gamma, + "q_model_state": self.q_model.state_dict(), + "target_q_model_state": self.target_q_model.state_dict(), + } + + def set_state(self, state: Dict[str, Any]) -> None: + """Sets the current state of the FQE Model. + Args: + state: A state dict returned by `get_state()`. + """ + self.n_iters = state["n_iters"] + self.lr = state["lr"] + self.min_loss_threshold = state["min_loss_threshold"] + self.clip_grad_norm = state["clip_grad_norm"] + self.minibatch_size = state["minibatch_size"] + self.polyak_coef = state["polyak_coef"] + self.gamma = state["gamma"] + self.policy.set_state(state["policy_state"]) + self.q_model.load_state_dict(state["q_model_state"]) + self.target_q_model.load_state_dict(state["target_q_model_state"]) + + @classmethod + def from_state(cls, state: Dict[str, Any]) -> "FQETorchModel": + """Creates a FQE Model from a state dict. + + Args: + state: A state dict returned by `get_state`. + + Returns: + An instance of the FQETorchModel. + """ + policy = Policy.from_state(state["policy_state"]) + model = cls( + policy=policy, gamma=state["gamma"], model_config=state["model_config"] + ) + model.set_state(state) + return model diff --git a/rllib/offline/estimators/importance_sampling.py b/rllib/offline/estimators/importance_sampling.py index b2a779379450..500cf9e147e4 100644 --- a/rllib/offline/estimators/importance_sampling.py +++ b/rllib/offline/estimators/importance_sampling.py @@ -1,7 +1,13 @@ -from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator +from typing import Dict, List, Any +import math + +from ray.data import Dataset + from ray.rllib.utils.annotations import override, DeveloperAPI +from ray.rllib.offline.offline_evaluator import OfflineEvaluator +from ray.rllib.offline.offline_evaluation_utils import compute_is_weights +from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator from ray.rllib.policy.sample_batch import SampleBatch -from typing import Dict, List @DeveloperAPI @@ -65,3 +71,49 @@ def estimate_on_single_step_samples( estimates_per_epsiode["v_target"] = v_target return estimates_per_epsiode + + @override(OfflineEvaluator) + def estimate_on_dataset( + self, dataset: Dataset, *, n_parallelism: int = ... + ) -> Dict[str, Any]: + """Computes the Importance sampling estimate on the given dataset. + + Note: This estimate works for both continuous and discrete action spaces. + + Args: + dataset: Dataset to compute the estimate on. Each record in dataset should + include the following columns: `obs`, `actions`, `action_prob` and + `rewards`. The `obs` on each row shoud be a vector of D dimensions. + n_parallelism: The number of parallel workers to use. + + Returns: + A dictionary containing the following keys: + v_target: The estimated value of the target policy. + v_behavior: The estimated value of the behavior policy. + v_gain_mean: The mean of the gain of the target policy over the + behavior policy. + v_gain_ste: The standard error of the gain of the target policy over + the behavior policy. 
+ """ + batch_size = max(dataset.count() // n_parallelism, 1) + updated_ds = dataset.map_batches( + compute_is_weights, + batch_size=batch_size, + fn_kwargs={ + "policy_state": self.policy.get_state(), + "estimator_class": self.__class__, + }, + ) + v_target = updated_ds.mean("weighted_rewards") + v_behavior = updated_ds.mean("rewards") + v_gain_mean = v_target / v_behavior + v_gain_ste = ( + updated_ds.std("weighted_rewards") / v_behavior / math.sqrt(dataset.count()) + ) + + return { + "v_target": v_target, + "v_behavior": v_behavior, + "v_gain_mean": v_gain_mean, + "v_gain_ste": v_gain_ste, + } diff --git a/rllib/offline/estimators/tests/test_dm_learning.py b/rllib/offline/estimators/tests/test_dm_learning.py index a7b7da94722f..a193e84e89a4 100644 --- a/rllib/offline/estimators/tests/test_dm_learning.py +++ b/rllib/offline/estimators/tests/test_dm_learning.py @@ -40,7 +40,7 @@ def setUpClass(cls): "n_iters": 800, "minibatch_size": 64, "polyak_coef": 1.0, - "model": { + "model_config": { "fcnet_hiddens": [32, 32, 32], "activation": "relu", }, diff --git a/rllib/offline/estimators/tests/test_dr_learning.py b/rllib/offline/estimators/tests/test_dr_learning.py index 6db32be89521..da79ccdcefa7 100644 --- a/rllib/offline/estimators/tests/test_dr_learning.py +++ b/rllib/offline/estimators/tests/test_dr_learning.py @@ -40,7 +40,7 @@ def setUpClass(cls): "n_iters": 800, "minibatch_size": 64, "polyak_coef": 1.0, - "model": { + "model_config": { "fcnet_hiddens": [32, 32, 32], "activation": "relu", }, diff --git a/rllib/offline/estimators/tests/test_ope.py b/rllib/offline/estimators/tests/test_ope.py index 8d3fd2f8f796..9cebb3457358 100644 --- a/rllib/offline/estimators/tests/test_ope.py +++ b/rllib/offline/estimators/tests/test_ope.py @@ -1,9 +1,14 @@ +from typing import TYPE_CHECKING, Tuple + import copy +import gym +import numpy as np import os -import unittest +import pandas as pd from pathlib import Path +import unittest -import numpy as np +import ray from ray.data import read_json from ray.rllib.algorithms.dqn import DQNConfig from ray.rllib.examples.env.cliff_walking_wall_env import CliffWalkingWallEnv @@ -21,7 +26,9 @@ from ray.rllib.utils.numpy import convert_to_numpy from ray.rllib.utils.test_utils import check -import ray +if TYPE_CHECKING: + from ray.rllib.policy import Policy + torch, _ = try_import_torch() @@ -35,12 +42,71 @@ } +def compute_expected_is_or_wis_estimator( + df: pd.DataFrame, policy: "Policy", num_actions: int, is_wis: bool = False +) -> Tuple[float, float]: + """Computes the expected IS or WIS estimator for the given policy and data. + + The policy is assumed to be deterministic over some discrete action space. i.e. the + output of a policy has probablity 1.0 over the action it chooses. + + Args: + df: The data to compute the estimator for. + policy: The policy to compute the estimator for. + num_actions: The number of actions in the action space. + is_wis: Whether to compute the IS or WIS estimator. + + Returns: + A tuple of the estimator value and the standard error of the estimator. 
+ """ + sample_batch = {SampleBatch.OBS: np.vstack(df[SampleBatch.OBS].values)} + + actions, _, extra_outs = policy.compute_actions_from_input_dict( + sample_batch, explore=False + ) + + logged_actions = df[SampleBatch.ACTIONS].astype(int) + ips_gain = ( + num_actions + * sum(df[SampleBatch.REWARDS] * (1.0 * (actions == logged_actions).values)) + / df[SampleBatch.REWARDS].sum() + ) + avg_ips_weight = ( + num_actions * sum((1.0 * (actions == logged_actions).values)) / len(actions) + ) + + if is_wis: + gain = float(ips_gain / avg_ips_weight) + else: + gain = float(ips_gain) + + ips_gain_vec = ( + num_actions + * df[SampleBatch.REWARDS] + * (1.0 * (actions == logged_actions)).values + / df[SampleBatch.REWARDS].mean() + ) + + if is_wis: + se = float( + np.std(ips_gain_vec / avg_ips_weight) + / np.sqrt(len(ips_gain_vec / avg_ips_weight)) + ) + else: + se = float(np.std(ips_gain_vec) / np.sqrt(len(ips_gain_vec))) + + return gain, se + + class TestOPE(unittest.TestCase): """Compilation tests for using OPE both standalone and in an RLlib Algorithm""" @classmethod def setUpClass(cls): ray.init() + seed = 42 + np.random.seed(seed) + rllib_dir = Path(__file__).parent.parent.parent.parent train_data = os.path.join(rllib_dir, "tests/data/cartpole/small.json") @@ -49,7 +115,7 @@ def setUpClass(cls): n_episodes = 3 cls.q_model_config = {"n_iters": 160} - config = ( + cls.config_dqn_on_cartpole = ( DQNConfig() .environment(env=env_name) .framework("torch") @@ -72,7 +138,44 @@ def setUpClass(cls): ) .resources(num_gpus=int(os.environ.get("RLLIB_NUM_GPUS", 0))) ) - cls.algo = config.build() + + num_rollout_workers = 4 + dsize = num_rollout_workers * 1024 + feature_dim = 64 + action_dim = 8 + + data = { + SampleBatch.OBS: np.random.randn(dsize, 1, feature_dim), + SampleBatch.ACTIONS: np.random.randint(0, action_dim, dsize).reshape(-1, 1), + SampleBatch.REWARDS: np.random.rand(dsize).reshape(-1, 1), + SampleBatch.ACTION_PROB: 1 / action_dim * np.ones((dsize, 1)), + } + cls.train_df = pd.DataFrame({k: list(v) for k, v in data.items()}) + cls.train_df["type"] = "SampleBatch" + + train_ds = ray.data.from_pandas(cls.train_df).repartition(num_rollout_workers) + + cls.dqn_on_fake_ds = ( + DQNConfig() + .environment( + observation_space=gym.spaces.Box(-1, 1, (feature_dim,)), + action_space=gym.spaces.Discrete(action_dim), + ) + .rollouts(num_rollout_workers=num_rollout_workers) + .framework("torch") + # .rollouts(num_rollout_workers=num_rollout_workers) + .offline_data( + input_="dataset", + input_config={"loader_fn": lambda: train_ds}, + ) + .evaluation( + evaluation_num_workers=num_rollout_workers, + ope_split_batch_by_episode=False, + ) + # make the policy deterministic + .training(categorical_distribution_temperature=1e-20) + .debugging(seed=seed) + ) # Read n_episodes of data, assuming that one line is one episode reader = DatasetReader(read_json(train_data)) @@ -85,30 +188,32 @@ def setUpClass(cls): def tearDownClass(cls): ray.shutdown() - def test_is_and_wis_standalone(self): + def test_is_and_wis_estimate(self): ope_classes = [ ImportanceSampling, WeightedImportanceSampling, ] + algo = self.config_dqn_on_cartpole.build() for class_module in ope_classes: estimator = class_module( - policy=self.algo.get_policy(), + policy=algo.get_policy(), gamma=self.gamma, ) estimates = estimator.estimate(self.batch) self.assertEqual(set(estimates.keys()), ESTIMATOR_OUTPUTS) check(estimates["v_gain"], estimates["v_target"] / estimates["v_behavior"]) - def test_dm_and_dr_standalone(self): + def test_dm_and_dr_estimate(self): 
ope_classes = [ DirectMethod, DoublyRobust, ] + algo = self.config_dqn_on_cartpole.build() for class_module in ope_classes: estimator = class_module( - policy=self.algo.get_policy(), + policy=algo.get_policy(), gamma=self.gamma, q_model_config=self.q_model_config, ) @@ -118,18 +223,57 @@ def test_dm_and_dr_standalone(self): self.assertEqual(set(estimates.keys()), ESTIMATOR_OUTPUTS) check(estimates["v_gain"], estimates["v_target"] / estimates["v_behavior"]) - def test_ope_in_algo(self): + def test_ope_estimate_algo(self): # Test OPE in DQN, during training as well as by calling evaluate() - results = self.algo.train() + algo = self.config_dqn_on_cartpole.build() + results = algo.train() ope_results = results["evaluation"]["off_policy_estimator"] # Check that key exists AND is not {} self.assertEqual(set(ope_results.keys()), {"is", "wis", "dm_fqe", "dr_fqe"}) # Check algo.evaluate() manually as well - results = self.algo.evaluate() + results = algo.evaluate() ope_results = results["evaluation"]["off_policy_estimator"] self.assertEqual(set(ope_results.keys()), {"is", "wis", "dm_fqe", "dr_fqe"}) + def test_is_wis_on_estimate_on_dataset(self): + """Test that the IS and WIS estimators work. + + First we compute the estimates with RLlib's algorithm and then compare the + results to the estimates that are manually computed on raw data frame version + of the dataset to check correctness. + """ + config = self.dqn_on_fake_ds.copy() + config = config.evaluation( + off_policy_estimation_methods={ + "is": {"type": ImportanceSampling}, + "wis": {"type": WeightedImportanceSampling}, + }, + ) + num_actions = config.action_space.n + algo = config.build() + + evaluated_results = algo.evaluate() + ope_results = evaluated_results["evaluation"]["off_policy_estimator"] + policy = algo.get_policy() + + wis_gain, wis_ste = compute_expected_is_or_wis_estimator( + self.train_df, policy, num_actions=num_actions, is_wis=True + ) + + is_gain, is_ste = compute_expected_is_or_wis_estimator( + self.train_df, policy, num_actions=num_actions, is_wis=False + ) + + check(wis_gain, ope_results["wis"]["v_gain_mean"]) + check(wis_ste, ope_results["wis"]["v_gain_ste"]) + check(is_gain, ope_results["is"]["v_gain_mean"]) + check(is_ste, ope_results["is"]["v_gain_ste"]) + + def test_dr_on_estimate_on_dataset(self): + # TODO (Kourosh): How can we unittest this without querying into the model? + pass + class TestFQE(unittest.TestCase): """Compilation and learning tests for the Fitted-Q Evaluation model""" @@ -238,7 +382,7 @@ def test_fqe_optimal_convergence(self): q_model_config = { "polyak_coef": 1.0, - "model": { + "model_config": { "fcnet_hiddens": [], "activation": "linear", }, diff --git a/rllib/offline/estimators/tests/test_ope_math.py b/rllib/offline/estimators/tests/test_ope_math.py index 1115868aff35..0b3e2667764a 100644 --- a/rllib/offline/estimators/tests/test_ope_math.py +++ b/rllib/offline/estimators/tests/test_ope_math.py @@ -128,8 +128,11 @@ def tearDownClass(cls): ray.shutdown() def test_is_and_wis_math(self): - """Tests that the importance sampling and weighted importance sampling - methods are correct based on the math.""" + """Tests that the importance sampling methods. + + It checks whether is and wis methods outputs are consistent when + split_batch_by_episode is True or False (RL vs. Bandits) + """ ope_classes = [ ImportanceSampling, @@ -171,9 +174,11 @@ def test_is_and_wis_math(self): ) def test_dm_dr_math(self): - """Tests that the Direct Method and Doubly Robust methods are correct in terms - of RL vs. 
bandits. This does not check if v_gain > 1.0 because it needs a real - target policy to train on.""" + """Tests the Direct Method and Doubly Robust methods. + + It checks whether DM and DR methods outputs are consistent when + split_batch_by_episode is True or False (RL vs. Bandits) + """ ope_classes = [ DirectMethod, diff --git a/rllib/offline/estimators/weighted_importance_sampling.py b/rllib/offline/estimators/weighted_importance_sampling.py index 08789c9bb3fb..2bd5d566e525 100644 --- a/rllib/offline/estimators/weighted_importance_sampling.py +++ b/rllib/offline/estimators/weighted_importance_sampling.py @@ -1,7 +1,12 @@ from typing import Dict, Any, List import numpy as np +import math +from ray.data import Dataset + +from ray.rllib.offline.offline_evaluator import OfflineEvaluator from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator +from ray.rllib.offline.offline_evaluation_utils import compute_is_weights from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.policy import Policy from ray.rllib.utils.annotations import override, DeveloperAPI @@ -76,6 +81,9 @@ def estimate_on_single_step_samples( estimates_per_epsiode["v_behavior"] = v_behavior estimates_per_epsiode["v_target"] = v_target + estimates_per_epsiode["weights"] = weights + estimates_per_epsiode["new_prob"] = new_prob + estimates_per_epsiode["old_prob"] = old_prob return estimates_per_epsiode @@ -118,3 +126,53 @@ def peek_on_single_episode(self, episode: SampleBatch) -> None: f"Make sure dataset contains only unique episodes with unique ids." ) self.p[eps_id] = episode_p + + @override(OfflineEvaluator) + def estimate_on_dataset( + self, dataset: Dataset, *, n_parallelism: int = ... + ) -> Dict[str, Any]: + """Computes the weighted importance sampling estimate on a dataset. + + Note: This estimate works for both continuous and discrete action spaces. + + Args: + dataset: Dataset to compute the estimate on. Each record in dataset should + include the following columns: `obs`, `actions`, `action_prob` and + `rewards`. The `obs` on each row shoud be a vector of D dimensions. + n_parallelism: Number of parallel workers to use for the computation. + + Returns: + Dictionary with the following keys: + v_target: The weighted importance sampling estimate. + v_behavior: The behavior policy estimate. + v_gain_mean: The mean of the gain of the target policy over the + behavior policy. + v_gain_ste: The standard error of the gain of the target policy over + the behavior policy. 
+ """ + # compute the weights and weighted rewards + batch_size = max(dataset.count() // n_parallelism, 1) + updated_ds = dataset.map_batches( + compute_is_weights, + batch_size=batch_size, + fn_kwargs={ + "policy_state": self.policy.get_state(), + "estimator_class": self.__class__, + }, + ) + v_target = updated_ds.mean("weighted_rewards") / updated_ds.mean("weights") + v_behavior = updated_ds.mean("rewards") + v_gain_mean = v_target / v_behavior + v_gain_ste = ( + updated_ds.std("weighted_rewards") + / updated_ds.mean("weights") + / v_behavior + / math.sqrt(dataset.count()) + ) + + return { + "v_target": v_target, + "v_behavior": v_behavior, + "v_gain_mean": v_gain_mean, + "v_gain_ste": v_gain_ste, + } diff --git a/rllib/offline/feature_importance.py b/rllib/offline/feature_importance.py index f9b5ceec931c..5ffac614eb71 100644 --- a/rllib/offline/feature_importance.py +++ b/rllib/offline/feature_importance.py @@ -1,21 +1,120 @@ +import copy +import numpy as np +import pandas as pd from typing import Callable, Dict, Any + +import ray +from ray.data import Dataset + from ray.rllib.policy import Policy -from ray.rllib.policy.sample_batch import convert_ma_batch_to_sample_batch -from ray.rllib.utils.annotations import override, DeveloperAPI +from ray.rllib.policy.sample_batch import SampleBatch, convert_ma_batch_to_sample_batch +from ray.rllib.utils.annotations import override, DeveloperAPI, ExperimentalAPI from ray.rllib.utils.typing import SampleBatchType from ray.rllib.offline.offline_evaluator import OfflineEvaluator -import numpy as np -import copy - @DeveloperAPI -def perturb_fn(batch: np.ndarray, index: int): +def _perturb_fn(batch: np.ndarray, index: int): # shuffle the indexth column features random_inds = np.random.permutation(batch.shape[0]) batch[:, index] = batch[random_inds, index] +@ExperimentalAPI +def _perturb_df(batch: pd.DataFrame, index: int): + obs_batch = np.vstack(batch["obs"].values) + _perturb_fn(obs_batch, index) + batch["perturbed_obs"] = list(obs_batch) + return batch + + +def _compute_actions( + batch: pd.DataFrame, + policy_state: Dict[str, Any], + input_key: str = "", + output_key: str = "", +): + """A custom local function to do batch prediction of a policy. + + Given the policy state the action predictions are computed as a function of + `input_key` and stored in the `output_key` column. + + Args: + batch: A sub-batch from the dataset. + policy_state: The state of the policy to use for the prediction. + input_key: The key to use for the input to the policy. If not given, the + default is SampleBatch.OBS. + output_key: The key to use for the output of the policy. If not given, the + default is "predicted_actions". + + Returns: + The modified batch with the predicted actions added as a column. + """ + if not input_key: + input_key = SampleBatch.OBS + + policy = Policy.from_state(policy_state) + sample_batch = SampleBatch( + { + SampleBatch.OBS: np.vstack(batch[input_key].values), + } + ) + actions, _, _ = policy.compute_actions_from_input_dict(sample_batch, explore=False) + + if not output_key: + output_key = "predicted_actions" + batch[output_key] = actions + + return batch + + +@ray.remote +def get_feature_importance_on_index( + dataset: ray.data.Dataset, + *, + index: int, + perturb_fn: Callable[[pd.DataFrame, int], None], + batch_size: int, + policy_state: Dict[str, Any], +): + """A remote function to compute the feature importance of a given index. + + Args: + dataset: The dataset to use for the computation. 
The dataset should have `obs` + and `actions` columns. Each record should be flat d-dimensional array. + index: The index of the feature to compute the importance for. + perturb_fn: The function to use for perturbing the dataset at the given index. + batch_size: The batch size to use for the computation. + policy_state: The state of the policy to use for the computation. + + Returns: + The modified dataset that contains a `delta` column which is the absolute + difference between the expected output and the output due to the perturbation. + """ + perturbed_ds = dataset.map_batches( + perturb_fn, batch_size=batch_size, fn_kwargs={"index": index} + ) + perturbed_actions = perturbed_ds.map_batches( + _compute_actions, + batch_size=batch_size, + fn_kwargs={ + "output_key": "perturbed_actions", + "input_key": "perturbed_obs", + "policy_state": policy_state, + }, + ) + + def delta_fn(batch): + # take the abs difference between columns 'ref_actions` and `perturbed_actions` + # and store it in `diff` + batch["delta"] = np.abs(batch["ref_actions"] - batch["perturbed_actions"]) + return batch + + delta = perturbed_actions.map_batches(delta_fn, batch_size=batch_size) + + return delta + + @DeveloperAPI class FeatureImportance(OfflineEvaluator): @override(OfflineEvaluator) @@ -23,7 +122,8 @@ def __init__( self, policy: Policy, repeat: int = 1, - perturb_fn: Callable[[np.ndarray, int], None] = perturb_fn, + limit_fraction: float = 1.0, + perturb_fn: Callable[[pd.DataFrame, int], pd.DataFrame] = _perturb_df, ): """Feature importance in a model inspection technique that can be used for any fitted predictor when the data is tablular. @@ -47,7 +147,8 @@ def __init__( { "feature_importance": { "type": FeatureImportance, - "repeat": 10 + "repeat": 10, + "limit_fraction": 0.1, } } ) @@ -61,13 +162,17 @@ def __init__( policy: the policy to use for feature importance. repeat: number of times to repeat the perturbation. perturb_fn: function to perturb the features. By default reshuffle the - features within the batch. + features within the batch. + limit_fraction: fraction of the dataset to use for feature importance + This is only used in estimate_on_dataset when the dataset is too large + to compute feature importance on. """ super().__init__(policy) self.repeat = repeat self.perturb_fn = perturb_fn + self.limit_fraction = limit_fraction - def estimate(self, batch: SampleBatchType, **kwargs) -> Dict[str, Any]: + def estimate(self, batch: SampleBatchType) -> Dict[str, Any]: """Estimate the feature importance of the policy. Given a batch of tabular observations, the importance of each feature is @@ -90,11 +195,10 @@ def estimate(self, batch: SampleBatchType, **kwargs) -> Dict[str, Any]: for r in range(self.repeat): for i in range(n_features): copy_obs_batch = copy.deepcopy(obs_batch) - perturb_fn(copy_obs_batch, index=i) + _perturb_fn(copy_obs_batch, index=i) perturbed_actions, _, _ = self.policy.compute_actions( copy_obs_batch, explore=False ) - importance[r, i] = np.mean(np.abs(perturbed_actions - ref_actions)) # take an average across repeats @@ -102,3 +206,72 @@ def estimate(self, batch: SampleBatchType, **kwargs) -> Dict[str, Any]: metrics = {f"feature_{i}": importance[i] for i in range(len(importance))} return metrics + + @override(OfflineEvaluator) + def estimate_on_dataset( + self, dataset: Dataset, *, n_parallelism: int = ... + ) -> Dict[str, Any]: + """Estimate the feature importance of the policy given a dataset. 
+ + For each feature in the dataset, the importance is computed by applying + perturbations to each feature and computing the difference between the + perturbed prediction and the reference prediction. The importance + computation for each feature and each perturbation is repeated `self.repeat` + times. If dataset is large the user can initialize the estimator with a + `limit_fraction` to limit the dataset to a fraction of the original dataset. + + The dataset should include a column named `obs` where each row is a vector of D + dimensions. The importance is computed for each dimension of the vector. + + Note (Implementation detail): The computation across features are distributed + with ray workers since each feature is independent of each other. + + Args: + dataset: the dataset to use for feature importance. + n_parallelism: number of parallel workers to use for feature importance. + + Returns: + A dict mapping each feature index string to its importance. + """ + + policy_state = self.policy.get_state() + # step 1: limit the dataset to a few first rows + ds = dataset.limit(int(self.limit_fraction * dataset.count())) + + # step 2: compute the reference actions + bsize = max(1, ds.count() // n_parallelism) + actions_ds = ds.map_batches( + _compute_actions, + batch_size=bsize, + fn_kwargs={ + "output_key": "ref_actions", + "policy_state": policy_state, + }, + ) + + # step 3: compute the feature importance + n_features = ds.take(1)[0][SampleBatch.OBS].shape[-1] + importance = np.zeros((self.repeat, n_features)) + for r in range(self.repeat): + # shuffle the entire dataset + shuffled_ds = actions_ds.random_shuffle() + bsize_per_task = max(1, (shuffled_ds.count() * n_features) // n_parallelism) + + # for each index perturb the dataset and compute the feat importance score + remote_fns = [ + get_feature_importance_on_index.remote( + dataset=shuffled_ds, + index=i, + perturb_fn=self.perturb_fn, + bsize=bsize_per_task, + policy_state=policy_state, + ) + for i in range(n_features) + ] + ds_w_fi_scores = ray.get(remote_fns) + importance[r] = np.array([d.mean("delta") for d in ds_w_fi_scores]) + + importance = importance.mean(0) + metrics = {f"feature_{i}": importance[i] for i in range(len(importance))} + + return metrics diff --git a/rllib/offline/offline_evaluation_utils.py b/rllib/offline/offline_evaluation_utils.py new file mode 100644 index 000000000000..220c784cbaaa --- /dev/null +++ b/rllib/offline/offline_evaluation_utils.py @@ -0,0 +1,131 @@ +import numpy as np +import pandas as pd +from typing import Any, Dict, Type, TYPE_CHECKING + +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.policy import Policy +from ray.rllib.utils.numpy import convert_to_numpy +from ray.rllib.utils.annotations import DeveloperAPI + +if TYPE_CHECKING: + from ray.rllib.offline.estimators.fqe_torch_model import FQETorchModel + from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator + + +@DeveloperAPI +def compute_q_and_v_values( + batch: pd.DataFrame, + model_class: Type["FQETorchModel"], + model_state: Dict[str, Any], + compute_q_values: bool = True, +) -> pd.DataFrame: + """Computes the Q and V values for the given batch of samples. + + This function is to be used with map_batches() to perform a batch prediction on a + dataset of records with `obs` and `actions` columns. + + Args: + batch: A sub-batch from the dataset. + model_class: The model class to use for the prediction. 
This class should be a + sub-class of FQEModel that implements the estimate_q() and estimate_v() + methods. + model_state: The state of the model to use for the prediction. + compute_q_values: Whether to compute the Q values or not. If False, only the V + is computed and returned. + + Returns: + The modified batch with the Q and V values added as columns. + """ + model = model_class.from_state(model_state) + + sample_batch = SampleBatch( + { + SampleBatch.OBS: np.vstack(batch[SampleBatch.OBS]), + SampleBatch.ACTIONS: np.vstack(batch[SampleBatch.ACTIONS]).squeeze(-1), + } + ) + + v_values = model.estimate_v(sample_batch) + v_values = convert_to_numpy(v_values) + batch["v_values"] = v_values + + if compute_q_values: + q_values = model.estimate_q(sample_batch) + q_values = convert_to_numpy(q_values) + batch["q_values"] = q_values + + return batch + + +@DeveloperAPI +def compute_is_weights( + batch: pd.DataFrame, + policy_state: Dict[str, Any], + estimator_class: Type["OffPolicyEstimator"], +) -> pd.DataFrame: + """Computes the importance sampling weights for the given batch of samples. + + For a lot of off-policy estimators, the importance sampling weights are computed as + the propensity score ratio between the new and old policies + (i.e. new_pi(act|obs) / old_pi(act|obs)). This function is to be used with + map_batches() to perform a batch prediction on a dataset of records with `obs`, + `actions`, `action_prob` and `rewards` columns. + + Args: + batch: A sub-batch from the dataset. + policy_state: The state of the policy to use for the prediction. + estimator_class: The estimator class to use for the prediction. This class + + Returns: + The modified batch with the importance sampling weights, weighted rewards, new + and old propensities added as columns. + """ + policy = policy = Policy.from_state(policy_state) + estimator = estimator_class(policy=policy, gamma=0, epsilon_greedy=0) + sample_batch = SampleBatch( + { + SampleBatch.OBS: np.vstack(batch["obs"].values), + SampleBatch.ACTIONS: np.vstack(batch["actions"].values).squeeze(-1), + SampleBatch.ACTION_PROB: np.vstack(batch["action_prob"].values).squeeze(-1), + SampleBatch.REWARDS: np.vstack(batch["rewards"].values).squeeze(-1), + } + ) + new_prob = estimator.compute_action_probs(sample_batch) + old_prob = sample_batch[SampleBatch.ACTION_PROB] + rewards = sample_batch[SampleBatch.REWARDS] + weights = new_prob / old_prob + weighted_rewards = weights * rewards + + batch["weights"] = weights + batch["weighted_rewards"] = weighted_rewards + batch["new_prob"] = new_prob + batch["old_prob"] = old_prob + + return batch + + +@DeveloperAPI +def remove_time_dim(batch: pd.DataFrame) -> pd.DataFrame: + """Removes the time dimension from the given sub-batch of the dataset. + + If each row in a dataset has a time dimension ([T, D]), and T=1, this function will + remove the T dimension to convert each row to of shape [D]. If T > 1, the row is + left unchanged. This function is to be used with map_batches(). + + Args: + batch: The batch to remove the time dimension from. 
+ Returns: + The modified batch with the time dimension removed (when applicable) + """ + BATCHED_KEYS = { + SampleBatch.OBS, + SampleBatch.ACTIONS, + SampleBatch.ACTION_PROB, + SampleBatch.REWARDS, + SampleBatch.NEXT_OBS, + SampleBatch.DONES, + } + for k in batch.columns: + if k in BATCHED_KEYS: + batch[k] = batch[k].apply(lambda x: x[0] if len(x) == 1 else x) + return batch diff --git a/rllib/offline/offline_evaluator.py b/rllib/offline/offline_evaluator.py index 40df8d91295f..60b87ff1296d 100644 --- a/rllib/offline/offline_evaluator.py +++ b/rllib/offline/offline_evaluator.py @@ -1,15 +1,19 @@ +import abc +import os import logging from typing import Dict, Any +from ray.data import Dataset + from ray.rllib.policy import Policy -from ray.rllib.utils.annotations import DeveloperAPI +from ray.rllib.utils.annotations import DeveloperAPI, ExperimentalAPI from ray.rllib.utils.typing import SampleBatchType logger = logging.getLogger(__name__) @DeveloperAPI -class OfflineEvaluator: +class OfflineEvaluator(abc.ABC): """Interface for an offline evaluator of a policy""" @DeveloperAPI @@ -22,6 +26,7 @@ def __init__(self, policy: Policy, **kwargs): """ self.policy = policy + @abc.abstractmethod @DeveloperAPI def estimate(self, batch: SampleBatchType, **kwargs) -> Dict[str, Any]: """Returns the evaluation results for the given batch of episodes. @@ -49,3 +54,24 @@ def train(self, batch: SampleBatchType, **kwargs) -> Dict[str, Any]: Any optional metrics to return from the evaluator """ return {} + + @ExperimentalAPI + def estimate_on_dataset( + self, + dataset: Dataset, + *, + n_parallelism: int = os.cpu_count(), + ) -> Dict[str, Any]: + + """Calculates the estimate of the metrics based on the given offline dataset. + + Typically, the dataset is passed through only once via n_parallel tasks in + mini-batches to improve the run-time of metric estimation. + + Args: + dataset: The ray dataset object to do offline evaluation on. + n_parallelism: The number of parallelism to use for the computation. + + Returns: + Dict[str, Any]: A dictionary of the estimated values. + """ diff --git a/rllib/offline/resource.py b/rllib/offline/resource.py index 5b36e65245d8..57572db63fbe 100644 --- a/rllib/offline/resource.py +++ b/rllib/offline/resource.py @@ -1,18 +1,30 @@ -from ray.rllib.offline.dataset_reader import ( - _get_resource_bundles as dataset_reader_get_resource_bundles, -) +from typing import Dict, List, TYPE_CHECKING from ray.rllib.utils.annotations import PublicAPI -from ray.rllib.utils.typing import PartialAlgorithmConfigDict -from typing import Dict, List + +if TYPE_CHECKING: + from ray.rllib.algorithms.algorithm_config import AlgorithmConfig + +DEFAULT_NUM_CPUS_PER_TASK = 0.5 @PublicAPI def get_offline_io_resource_bundles( - config: PartialAlgorithmConfigDict, + config: "AlgorithmConfig", ) -> List[Dict[str, float]]: # DatasetReader is the only offline I/O component today that # requires compute resources. - if config["input"] == "dataset": - return dataset_reader_get_resource_bundles(config["input_config"]) + if config.input_ == "dataset": + input_config = config.input_config + # TODO (Kourosh): parallelism is use for reading the dataset, which defaults to + # num_workers. This logic here relies on the information that dataset reader + # will have the same logic. So to remove the information leakage, inside + # Algorithm config, we should set parallelism to num_workers if not specified + # and only deal with parallelism here or in dataset_reader.py. same thing is + # true with cpus_per_task. 
+ parallelism = input_config.get("parallelism", config.get("num_workers", 1)) + cpus_per_task = input_config.get( + "num_cpus_per_read_task", DEFAULT_NUM_CPUS_PER_TASK + ) + return [{"CPU": cpus_per_task} for _ in range(parallelism)] else: return [] diff --git a/rllib/offline/tests/test_feature_importance.py b/rllib/offline/tests/test_feature_importance.py index 3a13abd1ff53..c73e73916b10 100644 --- a/rllib/offline/tests/test_feature_importance.py +++ b/rllib/offline/tests/test_feature_importance.py @@ -27,6 +27,10 @@ def test_feat_importance_cartpole(self): # check if the estimate is positive assert all(val > 0 for val in estimate.values()) + def test_feat_importance_estimate_on_dataset(self): + # TODO (Kourosh): add a test for this + pass + if __name__ == "__main__": import pytest
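For reviewers, below is a minimal end-to-end sketch of the dataset-based off-policy evaluation path this patch wires up (AlgorithmConfig -> evaluate() -> OfflineEvaluator.estimate_on_dataset()). It is modeled on the new TestOPE.test_is_wis_on_estimate_on_dataset added above; the toy data frame, observation/action spaces, and worker counts are illustrative assumptions and not taken from the diff, while the config calls, the expected dataset columns (obs, actions, action_prob, rewards) and the returned result keys mirror the changes in this patch.

# Minimal sketch (assumptions noted above) of dataset-based OPE with this patch.
import gym
import numpy as np
import pandas as pd

import ray
from ray.rllib.algorithms.dqn import DQNConfig
from ray.rllib.offline.estimators import ImportanceSampling, WeightedImportanceSampling

ray.init()

n, obs_dim, n_actions = 256, 8, 4
# One time step per row ([T=1, D] observations); remove_time_dim() strips the T=1 axis.
df = pd.DataFrame(
    {
        "obs": list(np.random.randn(n, 1, obs_dim)),
        "actions": list(np.random.randint(0, n_actions, (n, 1))),
        "action_prob": list(np.full((n, 1), 1.0 / n_actions)),
        "rewards": list(np.random.rand(n, 1)),
        "type": ["SampleBatch"] * n,
    }
)
train_ds = ray.data.from_pandas(df)

config = (
    DQNConfig()
    .environment(
        observation_space=gym.spaces.Box(-1.0, 1.0, (obs_dim,)),
        action_space=gym.spaces.Discrete(n_actions),
    )
    .framework("torch")
    .offline_data(input_="dataset", input_config={"loader_fn": lambda: train_ds})
    .evaluation(
        evaluation_num_workers=2,
        # Bandit-style (single-step) data: with episode splitting disabled,
        # evaluate() skips evaluation rollout workers and instead runs
        # estimate_on_dataset() on the evaluation dataset built in setup().
        ope_split_batch_by_episode=False,
        off_policy_estimation_methods={
            "is": {"type": ImportanceSampling},
            "wis": {"type": WeightedImportanceSampling},
        },
    )
)
algo = config.build()

results = algo.evaluate()
# Each estimator reports v_behavior, v_target, v_gain_mean and v_gain_ste.
print(results["evaluation"]["off_policy_estimator"]["wis"]["v_gain_mean"])

# The estimators can also be driven standalone on a ray Dataset
# (gamma is unused by IS on single-step data and is set to 0.0 here):
estimator = ImportanceSampling(policy=algo.get_policy(), gamma=0.0)
print(estimator.estimate_on_dataset(train_ds, n_parallelism=2))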