From d68327a97198d02fef25e5b8e27a17c93bc3e82c Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 27 Sep 2022 16:31:06 -0700 Subject: [PATCH 01/16] 1. Introduced new abstraction: OfflineEvaluator that is the parent of OPE and feature importance 2. introduced estimate_multi_step vs. estimate_single_step Signed-off-by: Kourosh Hakhamaneshi --- rllib/offline/__init__.py | 3 + rllib/offline/estimators/__init__.py | 2 - rllib/offline/estimators/direct_method.py | 73 +++++----- rllib/offline/estimators/doubly_robust.py | 105 +++++++------- .../offline/estimators/feature_importance.py | 109 ++------------- .../offline/estimators/importance_sampling.py | 104 +++++++------- .../estimators/off_policy_estimator.py | 100 ++++++++++++-- .../weighted_importance_sampling.py | 130 ++++++++---------- rllib/offline/feature_importance.py | 102 ++++++++++++++ rllib/offline/offline_evaluator.py | 51 +++++++ .../tests/test_feature_importance.py | 2 +- 11 files changed, 450 insertions(+), 331 deletions(-) create mode 100644 rllib/offline/feature_importance.py create mode 100644 rllib/offline/offline_evaluator.py rename rllib/offline/{estimators => }/tests/test_feature_importance.py (91%) diff --git a/rllib/offline/__init__.py b/rllib/offline/__init__.py index 33f917d66af9..cc4a0d9bb05d 100644 --- a/rllib/offline/__init__.py +++ b/rllib/offline/__init__.py @@ -9,6 +9,8 @@ from ray.rllib.offline.output_writer import OutputWriter, NoopOutput from ray.rllib.offline.resource import get_offline_io_resource_bundles from ray.rllib.offline.shuffled_input import ShuffledInput +from ray.rllib.offline.feature_importance import FeatureImportance + __all__ = [ "IOContext", @@ -24,4 +26,5 @@ "DatasetWriter", "get_dataset_and_shards", "get_offline_io_resource_bundles", + "FeatureImportance", ] diff --git a/rllib/offline/estimators/__init__.py b/rllib/offline/estimators/__init__.py index c0884063d793..74131faf3eb6 100644 --- a/rllib/offline/estimators/__init__.py +++ b/rllib/offline/estimators/__init__.py @@ -5,7 +5,6 @@ from ray.rllib.offline.estimators.direct_method import DirectMethod from ray.rllib.offline.estimators.doubly_robust import DoublyRobust from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator -from ray.rllib.offline.estimators.feature_importance import FeatureImportance __all__ = [ "OffPolicyEstimator", @@ -13,5 +12,4 @@ "WeightedImportanceSampling", "DirectMethod", "DoublyRobust", - "FeatureImportance", ] diff --git a/rllib/offline/estimators/direct_method.py b/rllib/offline/estimators/direct_method.py index 7e1b376d211b..62a7c4c0931b 100644 --- a/rllib/offline/estimators/direct_method.py +++ b/rllib/offline/estimators/direct_method.py @@ -3,6 +3,7 @@ from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator from ray.rllib.offline.estimators.fqe_torch_model import FQETorchModel from ray.rllib.policy import Policy +from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.utils.annotations import DeveloperAPI, override from ray.rllib.utils.typing import SampleBatchType from ray.rllib.utils.numpy import convert_to_numpy @@ -62,50 +63,38 @@ def __init__( ), "self.model must implement `estimate_v`!" @override(OffPolicyEstimator) - def estimate(self, batch: SampleBatchType) -> Dict[str, Any]: - """Compute off-policy estimates. + def estimate_multi_step(self, episode: SampleBatch) -> Dict[str, float]: + estimates_per_epsiode = {"v_behavior": None, "v_target": None} + rewards = episode["rewards"] - Args: - batch: The SampleBatch to run off-policy estimation on + v_behavior = 0.0 + for t in range(episode.count): + v_behavior += rewards[t] * self.gamma ** t - Returns: - A dict consists of the following metrics: - - v_behavior: The discounted return averaged over episodes in the batch - - v_behavior_std: The standard deviation corresponding to v_behavior - - v_target: The estimated discounted return for `self.policy`, - averaged over episodes in the batch - - v_target_std: The standard deviation corresponding to v_target - - v_gain: v_target / max(v_behavior, 1e-8) - - v_delta: The difference between v_target and v_behavior. - """ - batch = self.convert_ma_batch_to_sample_batch(batch) - self.check_action_prob_in_batch(batch) - estimates_per_epsiode = {"v_behavior": [], "v_target": []} - # Calculate Direct Method OPE estimates - for episode in batch.split_by_episode(): - rewards = episode["rewards"] - v_behavior = 0.0 - v_target = 0.0 - for t in range(episode.count): - v_behavior += rewards[t] * self.gamma ** t - - init_step = episode[0:1] - v_target = self.model.estimate_v(init_step) - v_target = convert_to_numpy(v_target).item() - - estimates_per_epsiode["v_behavior"].append(v_behavior) - estimates_per_epsiode["v_target"].append(v_target) - - estimates = { - "v_behavior": np.mean(estimates_per_epsiode["v_behavior"]), - "v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]), - "v_target": np.mean(estimates_per_epsiode["v_target"]), - "v_target_std": np.std(estimates_per_epsiode["v_target"]), - } - estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8) - estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"] - - return estimates + v_target = self._compute_v_target(episode[:1]) + + estimates_per_epsiode["v_behavior"] = v_behavior + estimates_per_epsiode["v_target"] = v_target + + return estimates_per_epsiode + + @override(OffPolicyEstimator) + def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: + estimates_per_epsiode = {"v_behavior": None, "v_target": None} + rewards = batch["rewards"] + + v_behavior = np.mean(rewards) + v_target = self._compute_v_target(batch) + + estimates_per_epsiode["v_behavior"] = v_behavior + estimates_per_epsiode["v_target"] = v_target + + return estimates_per_epsiode + + def _compute_v_target(self, init_step): + v_target = self.model.estimate_v(init_step) + v_target = convert_to_numpy(v_target).mean() + return v_target @override(OffPolicyEstimator) def train(self, batch: SampleBatchType) -> Dict[str, Any]: diff --git a/rllib/offline/estimators/doubly_robust.py b/rllib/offline/estimators/doubly_robust.py index 8f1b90c7d849..8a249a3d1033 100644 --- a/rllib/offline/estimators/doubly_robust.py +++ b/rllib/offline/estimators/doubly_robust.py @@ -77,61 +77,60 @@ def __init__( ), "self.model must implement `estimate_q`!" @override(OffPolicyEstimator) - def estimate(self, batch: SampleBatchType) -> Dict[str, Any]: - """Compute off-policy estimates. + def estimate_multi_step(self, episode: SampleBatchType) -> Dict[str, float]: + estimates_per_epsiode = {"v_behavior": None, "v_target": None} + + rewards, old_prob = episode["rewards"], episode["action_prob"] + log_likelihoods = compute_log_likelihoods_from_input_dict( + self.policy, episode + ) + new_prob = np.exp(convert_to_numpy(log_likelihoods)) + + v_behavior = 0.0 + v_target = 0.0 + q_values = self.model.estimate_q(episode) + q_values = convert_to_numpy(q_values) + v_values = self.model.estimate_v(episode) + v_values = convert_to_numpy(v_values) + assert q_values.shape == v_values.shape == (episode.count,) + + for t in reversed(range(episode.count)): + v_behavior = rewards[t] + self.gamma * v_behavior + v_target = v_values[t] + (new_prob[t] / old_prob[t]) * ( + rewards[t] + self.gamma * v_target - q_values[t] + ) + v_target = v_target.item() - Args: - batch: The SampleBatch to run off-policy estimation on + estimates_per_epsiode["v_behavior"] = v_behavior + estimates_per_epsiode["v_target"] = v_target - Returns: - A dict consists of the following metrics: - - v_behavior: The discounted return averaged over episodes in the batch - - v_behavior_std: The standard deviation corresponding to v_behavior - - v_target: The estimated discounted return for `self.policy`, - averaged over episodes in the batch - - v_target_std: The standard deviation corresponding to v_target - - v_gain: v_target / max(v_behavior, 1e-8)' - - v_delta: The difference between v_target and v_behavior. - """ - batch = self.convert_ma_batch_to_sample_batch(batch) - self.check_action_prob_in_batch(batch) - estimates_per_epsiode = {"v_behavior": [], "v_target": []} - # Calculate doubly robust OPE estimates - for episode in batch.split_by_episode(): - rewards, old_prob = episode["rewards"], episode["action_prob"] - log_likelihoods = compute_log_likelihoods_from_input_dict( - self.policy, episode - ) - new_prob = np.exp(convert_to_numpy(log_likelihoods)) - - v_behavior = 0.0 - v_target = 0.0 - q_values = self.model.estimate_q(episode) - q_values = convert_to_numpy(q_values) - v_values = self.model.estimate_v(episode) - v_values = convert_to_numpy(v_values) - assert q_values.shape == v_values.shape == (episode.count,) - - for t in reversed(range(episode.count)): - v_behavior = rewards[t] + self.gamma * v_behavior - v_target = v_values[t] + (new_prob[t] / old_prob[t]) * ( - rewards[t] + self.gamma * v_target - q_values[t] - ) - v_target = v_target.item() - - estimates_per_epsiode["v_behavior"].append(v_behavior) - estimates_per_epsiode["v_target"].append(v_target) - - estimates = { - "v_behavior": np.mean(estimates_per_epsiode["v_behavior"]), - "v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]), - "v_target": np.mean(estimates_per_epsiode["v_target"]), - "v_target_std": np.std(estimates_per_epsiode["v_target"]), - } - estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8) - estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"] - - return estimates + return estimates_per_epsiode + + + @override(OffPolicyEstimator) + def estimate_single_step(self, batch: SampleBatchType) -> Dict[str, float]: + estimates_per_epsiode = {"v_behavior": None, "v_target": None} + + rewards, old_prob = batch["rewards"], batch["action_prob"] + log_likelihoods = compute_log_likelihoods_from_input_dict( + self.policy, batch + ) + new_prob = np.exp(convert_to_numpy(log_likelihoods)) + + q_values = self.model.estimate_q(batch) + q_values = convert_to_numpy(q_values) + v_values = self.model.estimate_v(batch) + v_values = convert_to_numpy(v_values) + + v_behavior = np.mean(rewards) + + weight = new_prob / old_prob + v_target = np.mean(v_values + weight * (rewards - q_values)) + + estimates_per_epsiode["v_behavior"] = v_behavior + estimates_per_epsiode["v_target"] = v_target + + return estimates_per_epsiode @override(OffPolicyEstimator) def train(self, batch: SampleBatchType) -> Dict[str, Any]: diff --git a/rllib/offline/estimators/feature_importance.py b/rllib/offline/estimators/feature_importance.py index dcc6e242f62f..07034d4dfb35 100644 --- a/rllib/offline/estimators/feature_importance.py +++ b/rllib/offline/estimators/feature_importance.py @@ -1,104 +1,11 @@ -# TODO (@Kourosh) move this to a better location and consolidate the parent class with -# OPE +from ray.rllib.offline.feature_importance import FeatureImportance -from typing import Callable, Dict, Any -from ray.rllib.policy import Policy -from ray.rllib.utils.typing import SampleBatchType -from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator +__all__ = ["FeatureImportance"] -import numpy as np -import copy +from ray.rllib.utils.deprecation import deprecation_warning - -def perturb_fn(batch: np.ndarray, index: int): - # shuffle the indexth column features - random_inds = np.random.permutation(batch.shape[0]) - batch[:, index] = batch[random_inds, index] - - -class FeatureImportance(OffPolicyEstimator): - def __init__( - self, - policy: Policy, - gamma: float, - repeat: int = 1, - perturb_fn: Callable[[np.ndarray, int], None] = perturb_fn, - ): - """Feature importance in a model inspection technique that can be used for any - fitted predictor when the data is tablular. - - This implementation is also known as permutation importance that is defined to - be the variation of the model's prediction when a single feature value is - randomly shuffled. In RLlib it is implemented as a custom OffPolicyEstimator - which is used to evaluate RLlib policies without performing environment - interactions. - - Example usage: In the example below the feature importance module is used to - evaluate the policy and the each feature's importance is computed after each - training iteration. The permutation are repeated `self.repeat` times and the - results are averages across repeats. - - ```python - config = ( - AlgorithmConfig() - .offline_data( - off_policy_estimation_methods= - { - "feature_importance": { - "type": FeatureImportance, - "repeat": 10 - } - } - ) - ) - - algorithm = DQN(config=config) - results = algorithm.train() - ``` - - Args: - policy: the policy to use for feature importance. - repeat: number of times to repeat the perturbation. - gamma: dummy discount factor to be passed to the super class. - perturb_fn: function to perturb the features. By default reshuffle the - features within the batch. - """ - super().__init__(policy, gamma) - self.repeat = repeat - self.perturb_fn = perturb_fn - - def estimate(self, batch: SampleBatchType) -> Dict[str, Any]: - """Estimate the feature importance of the policy. - - Given a batch of tabular observations, the importance of each feature is - computed by perturbing each feature and computing the difference between the - perturbed policy and the reference policy. The importance is computed for each - feature and each perturbation is repeated `self.repeat` times. - - Args: - batch: the batch of data to use for feature importance. - - Returns: - A dict mapping each feature index string to its importance. - """ - - obs_batch = batch["obs"] - n_features = obs_batch.shape[-1] - importance = np.zeros((self.repeat, n_features)) - - ref_actions, _, _ = self.policy.compute_actions(obs_batch, explore=False) - for r in range(self.repeat): - for i in range(n_features): - copy_obs_batch = copy.deepcopy(obs_batch) - perturb_fn(copy_obs_batch, index=i) - perturbed_actions, _, _ = self.policy.compute_actions( - copy_obs_batch, explore=False - ) - - importance[r, i] = np.mean(np.abs(perturbed_actions - ref_actions)) - - # take an average across repeats - importance = importance.mean(0) - metrics = {f"feature_{i}": importance[i] for i in range(len(importance))} - - return metrics +deprecation_warning( + "ray.rllib.offline.estimators.feature_importance.FeatureImportance", + "ray.rllib.offline.feature_importance.FeatureImportance", + error=False +) \ No newline at end of file diff --git a/rllib/offline/estimators/importance_sampling.py b/rllib/offline/estimators/importance_sampling.py index 256ca8102bd9..bf405f8463f0 100644 --- a/rllib/offline/estimators/importance_sampling.py +++ b/rllib/offline/estimators/importance_sampling.py @@ -1,5 +1,6 @@ from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator from ray.rllib.utils.annotations import override, DeveloperAPI +from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.utils.typing import SampleBatchType from ray.rllib.utils.numpy import convert_to_numpy from ray.rllib.utils.policy import compute_log_likelihoods_from_input_dict @@ -24,58 +25,51 @@ class ImportanceSampling(OffPolicyEstimator): For more information refer to https://arxiv.org/pdf/1911.06854.pdf""" @override(OffPolicyEstimator) - def estimate(self, batch: SampleBatchType) -> Dict[str, Any]: - """Compute off-policy estimates. - - Args: - batch: The SampleBatch to run off-policy estimation on - - Returns: - A dict consists of the following metrics: - - v_behavior: The discounted return averaged over episodes in the batch - - v_behavior_std: The standard deviation corresponding to v_behavior - - v_target: The estimated discounted return for `self.policy`, - averaged over episodes in the batch - - v_target_std: The standard deviation corresponding to v_target - - v_gain: v_target / max(v_behavior, 1e-8) - - v_delta: The difference between v_target and v_behavior. - """ - batch = self.convert_ma_batch_to_sample_batch(batch) - self.check_action_prob_in_batch(batch) - estimates_per_epsiode = {"v_behavior": [], "v_target": []} - for episode in batch.split_by_episode(): - rewards, old_prob = episode["rewards"], episode["action_prob"] - log_likelihoods = compute_log_likelihoods_from_input_dict( - self.policy, episode - ) - new_prob = np.exp(convert_to_numpy(log_likelihoods)) - - # calculate importance ratios - p = [] - for t in range(episode.count): - if t == 0: - pt_prev = 1.0 - else: - pt_prev = p[t - 1] - p.append(pt_prev * new_prob[t] / old_prob[t]) - - # calculate stepwise IS estimate - v_behavior = 0.0 - v_target = 0.0 - for t in range(episode.count): - v_behavior += rewards[t] * self.gamma ** t - v_target += p[t] * rewards[t] * self.gamma ** t - - estimates_per_epsiode["v_behavior"].append(v_behavior) - estimates_per_epsiode["v_target"].append(v_target) - - estimates = { - "v_behavior": np.mean(estimates_per_epsiode["v_behavior"]), - "v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]), - "v_target": np.mean(estimates_per_epsiode["v_target"]), - "v_target_std": np.std(estimates_per_epsiode["v_target"]), - } - estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8) - estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"] - - return estimates + def estimate_multi_step(self, episode: SampleBatch) -> Dict[str, float]: + estimates_per_epsiode = {"v_behavior": None, "v_target": None} + + rewards, old_prob = episode["rewards"], episode["action_prob"] + log_likelihoods = compute_log_likelihoods_from_input_dict( + self.policy, episode + ) + new_prob = np.exp(convert_to_numpy(log_likelihoods)) + + # calculate importance ratios + p = [] + for t in range(episode.count): + if t == 0: + pt_prev = 1.0 + else: + pt_prev = p[t - 1] + p.append(pt_prev * new_prob[t] / old_prob[t]) + + # calculate stepwise IS estimate + v_behavior = 0.0 + v_target = 0.0 + for t in range(episode.count): + v_behavior += rewards[t] * self.gamma ** t + v_target += p[t] * rewards[t] * self.gamma ** t + + estimates_per_epsiode["v_behavior"] = v_behavior + estimates_per_epsiode["v_target"] = v_target + + return estimates_per_epsiode + + @override(OffPolicyEstimator) + def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: + estimates_per_epsiode = {"v_behavior": None, "v_target": None} + + rewards, old_prob = batch["rewards"], batch["action_prob"] + log_likelihoods = compute_log_likelihoods_from_input_dict( + self.policy, batch + ) + new_prob = np.exp(convert_to_numpy(log_likelihoods)) + + weights = new_prob / old_prob + v_behavior = np.mean(rewards) + v_target = np.mean(weights * rewards) + + estimates_per_epsiode["v_behavior"] = v_behavior + estimates_per_epsiode["v_target"] = v_target + + return estimates_per_epsiode diff --git a/rllib/offline/estimators/off_policy_estimator.py b/rllib/offline/estimators/off_policy_estimator.py index 5a20e8b0d95b..feba8b0ca2d7 100644 --- a/rllib/offline/estimators/off_policy_estimator.py +++ b/rllib/offline/estimators/off_policy_estimator.py @@ -1,3 +1,6 @@ +import numpy as np +import tree + import logging from ray.rllib.policy.sample_batch import ( MultiAgentBatch, @@ -6,21 +9,22 @@ ) from ray.rllib.policy import Policy from ray.rllib.utils.policy import compute_log_likelihoods_from_input_dict -from ray.rllib.utils.annotations import DeveloperAPI +from ray.rllib.utils.annotations import DeveloperAPI, is_overridden from ray.rllib.utils.deprecation import Deprecated from ray.rllib.utils.numpy import convert_to_numpy from ray.rllib.utils.typing import TensorType, SampleBatchType +from ray.rllib.offline.offline_evaluator import OfflineEvaluator from typing import Dict, Any -logger = logging.getLogger(__name__) +logger = logging.getLogger(__name__) @DeveloperAPI -class OffPolicyEstimator: - """Interface for an off policy reward estimator.""" +class OffPolicyEstimator(OfflineEvaluator): + """Interface for an off policy estimator for counterfactual evaluation.""" @DeveloperAPI - def __init__(self, policy: Policy, gamma: float): + def __init__(self, policy: Policy, gamma: float = 0.0): """Initializes an OffPolicyEstimator instance. Args: @@ -31,18 +35,98 @@ def __init__(self, policy: Policy, gamma: float): self.gamma = gamma @DeveloperAPI - def estimate(self, batch: SampleBatchType) -> Dict[str, Any]: - """Returns off-policy estimates for the given batch of episodes. + def estimate_multi_step(self, episode: SampleBatch, **kwargs) -> Dict[str, Any]: + """Returns off-policy estimates for the given one episode. + + Args: + batch: The episode to calculate the off-policy estimates (OPE) on. The + episode must be a sample batch type that contains the fields "obs", + "actions", and "action_prob". + + Returns: + The off-policy estimates (OPE) calculated on the given episode. The returned + dict can be any arbitrary mapping of strings to metrics. + """ + raise NotImplementedError + + @DeveloperAPI + def estimate_single_step(self, batch: SampleBatch, **kwargs) -> Dict[str, Any]: + """Returns off-policy estimates for the batch of single timesteps. This is + highly optimized for bandits assuming each episode is a single timestep. + + Args: + batch: The episode to calculate the off-policy estimates (OPE) on. The + episode must be a sample batch type that contains the fields "obs", + "actions", and "action_prob". + + Returns: + The off-policy estimates (OPE) calculated on the given episode. The returned + dict can be any arbitrary mapping of strings to metrics. + """ + raise NotImplementedError + + @DeveloperAPI + def estimate(self, + batch: SampleBatchType, + split_by_episode: bool = True + ) -> Dict[str, Any]: + """Compute off-policy estimates. Args: batch: The batch to calculate the off-policy estimates (OPE) on. The batch must contain the fields "obs", "actions", and "action_prob". + split_by_episode: Whether to split the batch by episode. Returns: The off-policy estimates (OPE) calculated on the given batch. The returned dict can be any arbitrary mapping of strings to metrics. + The dict consists of the following metrics: + - v_behavior: The discounted return averaged over episodes in the batch + - v_behavior_std: The standard deviation corresponding to v_behavior + - v_target: The estimated discounted return for `self.policy`, + averaged over episodes in the batch + - v_target_std: The standard deviation corresponding to v_target + - v_gain: v_target / max(v_behavior, 1e-8) + - v_delta: The difference between v_target and v_behavior. """ - raise NotImplementedError + batch = self.convert_ma_batch_to_sample_batch(batch) + self.check_action_prob_in_batch(batch) + estimates_per_epsiode = [] + if split_by_episode: + for episode in batch.split_by_episode(): + assert len(set(episode[SampleBatch.EPS_ID])) == 1, ( + "The episode must contain only one episode id. For some reason " + "the split_by_episode() method could not successfully split " + "the batch by episodes. Each row in the dataset should be " + "one episode. Check your evaluation dataset for errors." + ) + estimate_step_results = self.estimate_multi_step(episode) + estimates_per_epsiode.append(estimate_step_results) + else: + if is_overridden(self.estimate_single_step): + estimates_per_epsiode.append(self.estimate_single_step(batch)) + else: + raise NotImplementedError( + "The method estimate_single_step is not implemented. " + "Please override the method estimate_single_step or set " + "split_by_episode to True." + ) + + # turn a list of identical dicts into a dict of lists + estimates_per_epsiode = tree.map_structure( + lambda *x: list(x), *estimates_per_epsiode + ) + + estimates = { + "v_behavior": np.mean(estimates_per_epsiode["v_behavior"]), + "v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]), + "v_target": np.mean(estimates_per_epsiode["v_target"]), + "v_target_std": np.std(estimates_per_epsiode["v_target"]), + } + estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8) + estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"] + + return estimates @DeveloperAPI def convert_ma_batch_to_sample_batch(self, batch: SampleBatchType) -> SampleBatch: diff --git a/rllib/offline/estimators/weighted_importance_sampling.py b/rllib/offline/estimators/weighted_importance_sampling.py index 6b12cc718c45..3fb4a19faccc 100644 --- a/rllib/offline/estimators/weighted_importance_sampling.py +++ b/rllib/offline/estimators/weighted_importance_sampling.py @@ -1,13 +1,14 @@ + +from typing import Dict +import numpy as np + from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator +from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.utils.policy import compute_log_likelihoods_from_input_dict from ray.rllib.policy import Policy from ray.rllib.utils.annotations import override, DeveloperAPI from ray.rllib.utils.numpy import convert_to_numpy -from ray.rllib.utils.typing import SampleBatchType -import numpy as np -from typing import Dict, Any - - +import warnings @DeveloperAPI class WeightedImportanceSampling(OffPolicyEstimator): """The step-wise WIS estimator. @@ -34,67 +35,58 @@ def __init__(self, policy: Policy, gamma: float): self.filter_counts = [] @override(OffPolicyEstimator) - def estimate(self, batch: SampleBatchType) -> Dict[str, Any]: - """Compute off-policy estimates. - - Args: - batch: The SampleBatch to run off-policy estimation on - - Returns: - A dict consists of the following metrics: - - v_behavior: The discounted return averaged over episodes in the batch - - v_behavior_std: The standard deviation corresponding to v_behavior - - v_target: The estimated discounted return for `self.policy`, - averaged over episodes in the batch - - v_target_std: The standard deviation corresponding to v_target - - v_gain: v_target / max(v_behavior, 1e-8), averaged over episodes - - v_gain_std: The standard deviation corresponding to v_gain - - v_delta: The difference between v_target and v_behavior. - """ - batch = self.convert_ma_batch_to_sample_batch(batch) - self.check_action_prob_in_batch(batch) - estimates_per_epsiode = {"v_behavior": [], "v_target": []} - for episode in batch.split_by_episode(): - rewards, old_prob = episode["rewards"], episode["action_prob"] - log_likelihoods = compute_log_likelihoods_from_input_dict( - self.policy, episode - ) - new_prob = np.exp(convert_to_numpy(log_likelihoods)) - - # calculate importance ratios - p = [] - for t in range(episode.count): - if t == 0: - pt_prev = 1.0 - else: - pt_prev = p[t - 1] - p.append(pt_prev * new_prob[t] / old_prob[t]) - for t, v in enumerate(p): - if t >= len(self.filter_values): - self.filter_values.append(v) - self.filter_counts.append(1.0) - else: - self.filter_values[t] += v - self.filter_counts[t] += 1.0 - - # calculate stepwise weighted IS estimate - v_behavior = 0.0 - v_target = 0.0 - for t in range(episode.count): - v_behavior += rewards[t] * self.gamma ** t - w_t = self.filter_values[t] / self.filter_counts[t] - v_target += p[t] / w_t * rewards[t] * self.gamma ** t - - estimates_per_epsiode["v_behavior"].append(v_behavior) - estimates_per_epsiode["v_target"].append(v_target) - - estimates = { - "v_behavior": np.mean(estimates_per_epsiode["v_behavior"]), - "v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]), - "v_target": np.mean(estimates_per_epsiode["v_target"]), - "v_target_std": np.std(estimates_per_epsiode["v_target"]), - } - estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8) - estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"] - - return estimates + def estimate_multi_step(self, episode: SampleBatch) -> Dict[str, float]: + estimates_per_epsiode = {"v_behavior": None, "v_target": None} + rewards, old_prob = episode["rewards"], episode["action_prob"] + log_likelihoods = compute_log_likelihoods_from_input_dict( + self.policy, episode + ) + new_prob = np.exp(convert_to_numpy(log_likelihoods)) + + # calculate importance ratios + p = [] + for t in range(episode.count): + if t == 0: + pt_prev = 1.0 + else: + pt_prev = p[t - 1] + p.append(pt_prev * new_prob[t] / old_prob[t]) + for t, v in enumerate(p): + if t >= len(self.filter_values): + self.filter_values.append(v) + self.filter_counts.append(1.0) + else: + self.filter_values[t] += v + self.filter_counts[t] += 1.0 + + # calculate stepwise weighted IS estimate + v_behavior = 0.0 + v_target = 0.0 + + for t in range(episode.count): + v_behavior += rewards[t] * self.gamma ** t + w_t = self.filter_values[t] / self.filter_counts[t] + v_target += p[t] / w_t * rewards[t] * self.gamma ** t + + estimates_per_epsiode["v_behavior"] = v_behavior + estimates_per_epsiode["v_target"] = v_target + + return estimates_per_epsiode + + @override(OffPolicyEstimator) + def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: + estimates_per_epsiode = {"v_behavior": None, "v_target": None} + rewards, old_prob = batch["rewards"], batch["action_prob"] + log_likelihoods = compute_log_likelihoods_from_input_dict( + self.policy, batch + ) + new_prob = np.exp(convert_to_numpy(log_likelihoods)) + + weights = new_prob / old_prob + v_behavior = np.mean(rewards) + v_target = np.mean(weights * rewards) / np.mean(weights) + + estimates_per_epsiode["v_behavior"] = v_behavior + estimates_per_epsiode["v_target"] = v_target + + return estimates_per_epsiode diff --git a/rllib/offline/feature_importance.py b/rllib/offline/feature_importance.py new file mode 100644 index 000000000000..534339c9d8ae --- /dev/null +++ b/rllib/offline/feature_importance.py @@ -0,0 +1,102 @@ +from typing import Callable, Dict, Any +from ray.rllib.policy import Policy +from ray.rllib.utils.annotations import override, DeveloperAPI +from ray.rllib.utils.typing import SampleBatchType +from ray.rllib.offline.offline_evaluator import OfflineEvaluator + +import numpy as np +import copy + + +def perturb_fn(batch: np.ndarray, index: int): + # shuffle the indexth column features + random_inds = np.random.permutation(batch.shape[0]) + batch[:, index] = batch[random_inds, index] + +@DeveloperAPI +class FeatureImportance(OfflineEvaluator): + + @override(OfflineEvaluator) + def __init__( + self, + policy: Policy, + repeat: int = 1, + perturb_fn: Callable[[np.ndarray, int], None] = perturb_fn, + ): + """Feature importance in a model inspection technique that can be used for any + fitted predictor when the data is tablular. + + This implementation is also known as permutation importance that is defined to + be the variation of the model's prediction when a single feature value is + randomly shuffled. In RLlib it is implemented as a custom OffPolicyEstimator + which is used to evaluate RLlib policies without performing environment + interactions. + + Example usage: In the example below the feature importance module is used to + evaluate the policy and the each feature's importance is computed after each + training iteration. The permutation are repeated `self.repeat` times and the + results are averages across repeats. + + ```python + config = ( + AlgorithmConfig() + .offline_data( + off_policy_estimation_methods= + { + "feature_importance": { + "type": FeatureImportance, + "repeat": 10 + } + } + ) + ) + + algorithm = DQN(config=config) + results = algorithm.train() + ``` + + Args: + policy: the policy to use for feature importance. + repeat: number of times to repeat the perturbation. + perturb_fn: function to perturb the features. By default reshuffle the + features within the batch. + """ + super().__init__(policy) + self.repeat = repeat + self.perturb_fn = perturb_fn + + def estimate(self, batch: SampleBatchType, **kwargs) -> Dict[str, Any]: + """Estimate the feature importance of the policy. + + Given a batch of tabular observations, the importance of each feature is + computed by perturbing each feature and computing the difference between the + perturbed policy and the reference policy. The importance is computed for each + feature and each perturbation is repeated `self.repeat` times. + + Args: + batch: the batch of data to use for feature importance. + + Returns: + A dict mapping each feature index string to its importance. + """ + + obs_batch = batch["obs"] + n_features = obs_batch.shape[-1] + importance = np.zeros((self.repeat, n_features)) + + ref_actions, _, _ = self.policy.compute_actions(obs_batch, explore=False) + for r in range(self.repeat): + for i in range(n_features): + copy_obs_batch = copy.deepcopy(obs_batch) + perturb_fn(copy_obs_batch, index=i) + perturbed_actions, _, _ = self.policy.compute_actions( + copy_obs_batch, explore=False + ) + + importance[r, i] = np.mean(np.abs(perturbed_actions - ref_actions)) + + # take an average across repeats + importance = importance.mean(0) + metrics = {f"feature_{i}": importance[i] for i in range(len(importance))} + + return metrics diff --git a/rllib/offline/offline_evaluator.py b/rllib/offline/offline_evaluator.py new file mode 100644 index 000000000000..28a64a154672 --- /dev/null +++ b/rllib/offline/offline_evaluator.py @@ -0,0 +1,51 @@ +import logging +from ray.rllib.policy import Policy +from ray.rllib.utils.annotations import DeveloperAPI +from ray.rllib.utils.typing import SampleBatchType +from typing import Dict, Any + +logger = logging.getLogger(__name__) + +@DeveloperAPI +class OfflineEvaluator: + """Interface for an offline evaluator of a policy""" + + @DeveloperAPI + def __init__(self, policy: Policy, **kwargs): + """Initializes an OffPolicyEstimator instance. + + Args: + policy: Policy to evaluate. + kwargs: forward compatibility placeholder. + """ + self.policy = policy + + + @DeveloperAPI + def estimate(self, batch: SampleBatchType, **kwargs) -> Dict[str, Any]: + """Returns the evaluation results for the given batch of episodes. + + Args: + batch: The batch to evaluate. + kwargs: forward compatibility placeholder. + + Returns: + The evaluation done on the given batch. The returned + dict can be any arbitrary mapping of strings to metrics. + """ + raise NotImplementedError + + + @DeveloperAPI + def train(self, batch: SampleBatchType, **kwargs) -> Dict[str, Any]: + """Sometimes you need to train a model inside an evaluator. This method + abstracts the training process. + + Args: + batch: SampleBatch to train on + kwargs: forward compatibility placeholder. + + Returns: + Any optional metrics to return from the evaluator + """ + return {} diff --git a/rllib/offline/estimators/tests/test_feature_importance.py b/rllib/offline/tests/test_feature_importance.py similarity index 91% rename from rllib/offline/estimators/tests/test_feature_importance.py rename to rllib/offline/tests/test_feature_importance.py index 5a53cba05170..14b3d60752cf 100644 --- a/rllib/offline/estimators/tests/test_feature_importance.py +++ b/rllib/offline/tests/test_feature_importance.py @@ -20,7 +20,7 @@ def test_feat_importance_cartpole(self): sample_batch = synchronous_parallel_sample(worker_set=runner.workers) for repeat in [1, 10]: - evaluator = FeatureImportance(policy=policy, gamma=0.0, repeat=repeat) + evaluator = FeatureImportance(policy=policy, repeat=repeat) estimate = evaluator.estimate(sample_batch) From c9f82c718707f1f8609591031226923bdcfed6c1 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 27 Sep 2022 16:33:39 -0700 Subject: [PATCH 02/16] algorithm ope evaluation is now able to skip split_by_episode Signed-off-by: Kourosh Hakhamaneshi --- rllib/algorithms/algorithm.py | 22 ++++++++++++++++++---- rllib/algorithms/algorithm_config.py | 13 +++++++++++-- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index c205e267f804..6c9754984e86 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -25,6 +25,7 @@ Type, Union, ) +import tree import ray from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag @@ -910,6 +911,8 @@ def duration_fn(num_units_done): _agent_steps if self._by_agent_steps else _env_steps ) if self.reward_estimators: + # TODO: (kourosh) This approach will cause an OOM issue when + # the dataset gets huge all_batches.extend(batches) agent_steps_this_iter += _agent_steps @@ -936,10 +939,21 @@ def duration_fn(num_units_done): if self.reward_estimators: # Compute off-policy estimates metrics["off_policy_estimator"] = {} - total_batch = concat_samples(all_batches) - for name, estimator in self.reward_estimators.items(): - estimates = estimator.estimate(total_batch) - metrics["off_policy_estimator"][name] = estimates + estimates = defaultdict(list) + # for each batch run the estimator's fwd pass + for batch in all_batches: + for name, estimator in self.reward_estimators.items(): + estimate_result = estimator.estimate( + batch, + split_by_episode=self.config['ope_split_by_episode'] + ) + estimates[name].append(estimate_result) + # collate estimates from all batches + for name, estimate_list in estimates.items(): + avg_estimate = tree.map_structure( + lambda *x: np.mean(x, axis=0), *estimate_list + ) + metrics["off_policy_estimator"][name] = avg_estimate # Evaluation does not run for every step. # Save evaluation metrics on trainer, so it can be attached to diff --git a/rllib/algorithms/algorithm_config.py b/rllib/algorithms/algorithm_config.py index 46e8fb13b1a6..d61b465e68b8 100644 --- a/rllib/algorithms/algorithm_config.py +++ b/rllib/algorithms/algorithm_config.py @@ -179,6 +179,7 @@ def __init__(self, algo_class=None): self.evaluation_parallel_to_training = False self.evaluation_config = {} self.off_policy_estimation_methods = {} + self.ope_split_by_episode = True self.evaluation_num_workers = 0 self.custom_evaluation_function = None self.always_attach_evaluation_results = False @@ -826,6 +827,7 @@ def evaluation( Union["AlgorithmConfig", PartialAlgorithmConfigDict] ] = None, off_policy_estimation_methods: Optional[Dict] = None, + ope_split_by_episode: Optional[bool] = None, evaluation_num_workers: Optional[int] = None, custom_evaluation_function: Optional[Callable] = None, always_attach_evaluation_results: Optional[bool] = None, @@ -879,6 +881,11 @@ def evaluation( You can also add additional config arguments to be passed to the OffPolicyEstimator in the dict, e.g. {"qreg_dr": {"type": DoublyRobust, "q_model_type": "qreg", "k": 5}} + ope_split_by_episode: Whether to use SampleBatch.split_by_episode() to + split the input batch to episodes before estimating the ope metrics. In + case of bandits you should make this False to see improvements in ope + evaluation speed. In case of bandits, it is ok to not split by episode, + since each record is one timestep already. The default is True. evaluation_num_workers: Number of parallel workers to use for evaluation. Note that this is set to zero by default, which means evaluation will be run in the algorithm process (only if evaluation_interval is not @@ -924,10 +931,12 @@ def evaluation( self.evaluation_num_workers = evaluation_num_workers if custom_evaluation_function is not None: self.custom_evaluation_function = custom_evaluation_function - if always_attach_evaluation_results: + if always_attach_evaluation_results is not None: self.always_attach_evaluation_results = always_attach_evaluation_results - if enable_async_evaluation: + if enable_async_evaluation is not None: self.enable_async_evaluation = enable_async_evaluation + if ope_split_by_episode is not None: + self.ope_split_by_episode = ope_split_by_episode return self From 601e92b4f01317cf94b6ec11cd2b885bd5984a83 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 27 Sep 2022 16:35:24 -0700 Subject: [PATCH 03/16] lint Signed-off-by: Kourosh Hakhamaneshi --- rllib/algorithms/algorithm.py | 5 ++--- rllib/algorithms/algorithm_config.py | 8 ++++---- rllib/offline/estimators/doubly_robust.py | 13 ++++--------- rllib/offline/estimators/feature_importance.py | 6 +++--- rllib/offline/estimators/importance_sampling.py | 10 +++------- rllib/offline/estimators/off_policy_estimator.py | 16 ++++++++-------- .../estimators/weighted_importance_sampling.py | 11 ++++------- rllib/offline/feature_importance.py | 2 +- rllib/offline/offline_evaluator.py | 7 +++---- 9 files changed, 32 insertions(+), 46 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 6c9754984e86..1be127357bce 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -911,7 +911,7 @@ def duration_fn(num_units_done): _agent_steps if self._by_agent_steps else _env_steps ) if self.reward_estimators: - # TODO: (kourosh) This approach will cause an OOM issue when + # TODO: (kourosh) This approach will cause an OOM issue when # the dataset gets huge all_batches.extend(batches) @@ -944,8 +944,7 @@ def duration_fn(num_units_done): for batch in all_batches: for name, estimator in self.reward_estimators.items(): estimate_result = estimator.estimate( - batch, - split_by_episode=self.config['ope_split_by_episode'] + batch, split_by_episode=self.config["ope_split_by_episode"] ) estimates[name].append(estimate_result) # collate estimates from all batches diff --git a/rllib/algorithms/algorithm_config.py b/rllib/algorithms/algorithm_config.py index d61b465e68b8..717bc90c0925 100644 --- a/rllib/algorithms/algorithm_config.py +++ b/rllib/algorithms/algorithm_config.py @@ -881,10 +881,10 @@ def evaluation( You can also add additional config arguments to be passed to the OffPolicyEstimator in the dict, e.g. {"qreg_dr": {"type": DoublyRobust, "q_model_type": "qreg", "k": 5}} - ope_split_by_episode: Whether to use SampleBatch.split_by_episode() to - split the input batch to episodes before estimating the ope metrics. In - case of bandits you should make this False to see improvements in ope - evaluation speed. In case of bandits, it is ok to not split by episode, + ope_split_by_episode: Whether to use SampleBatch.split_by_episode() to + split the input batch to episodes before estimating the ope metrics. In + case of bandits you should make this False to see improvements in ope + evaluation speed. In case of bandits, it is ok to not split by episode, since each record is one timestep already. The default is True. evaluation_num_workers: Number of parallel workers to use for evaluation. Note that this is set to zero by default, which means evaluation will diff --git a/rllib/offline/estimators/doubly_robust.py b/rllib/offline/estimators/doubly_robust.py index 8a249a3d1033..dc685defd789 100644 --- a/rllib/offline/estimators/doubly_robust.py +++ b/rllib/offline/estimators/doubly_robust.py @@ -79,11 +79,9 @@ def __init__( @override(OffPolicyEstimator) def estimate_multi_step(self, episode: SampleBatchType) -> Dict[str, float]: estimates_per_epsiode = {"v_behavior": None, "v_target": None} - + rewards, old_prob = episode["rewards"], episode["action_prob"] - log_likelihoods = compute_log_likelihoods_from_input_dict( - self.policy, episode - ) + log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, episode) new_prob = np.exp(convert_to_numpy(log_likelihoods)) v_behavior = 0.0 @@ -106,15 +104,12 @@ def estimate_multi_step(self, episode: SampleBatchType) -> Dict[str, float]: return estimates_per_epsiode - @override(OffPolicyEstimator) def estimate_single_step(self, batch: SampleBatchType) -> Dict[str, float]: estimates_per_epsiode = {"v_behavior": None, "v_target": None} - + rewards, old_prob = batch["rewards"], batch["action_prob"] - log_likelihoods = compute_log_likelihoods_from_input_dict( - self.policy, batch - ) + log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, batch) new_prob = np.exp(convert_to_numpy(log_likelihoods)) q_values = self.model.estimate_q(batch) diff --git a/rllib/offline/estimators/feature_importance.py b/rllib/offline/estimators/feature_importance.py index 07034d4dfb35..dbe5013fb6e2 100644 --- a/rllib/offline/estimators/feature_importance.py +++ b/rllib/offline/estimators/feature_importance.py @@ -6,6 +6,6 @@ deprecation_warning( "ray.rllib.offline.estimators.feature_importance.FeatureImportance", - "ray.rllib.offline.feature_importance.FeatureImportance", - error=False -) \ No newline at end of file + "ray.rllib.offline.feature_importance.FeatureImportance", + error=False, +) diff --git a/rllib/offline/estimators/importance_sampling.py b/rllib/offline/estimators/importance_sampling.py index bf405f8463f0..1ca5fbe4d920 100644 --- a/rllib/offline/estimators/importance_sampling.py +++ b/rllib/offline/estimators/importance_sampling.py @@ -29,9 +29,7 @@ def estimate_multi_step(self, episode: SampleBatch) -> Dict[str, float]: estimates_per_epsiode = {"v_behavior": None, "v_target": None} rewards, old_prob = episode["rewards"], episode["action_prob"] - log_likelihoods = compute_log_likelihoods_from_input_dict( - self.policy, episode - ) + log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, episode) new_prob = np.exp(convert_to_numpy(log_likelihoods)) # calculate importance ratios @@ -60,15 +58,13 @@ def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: estimates_per_epsiode = {"v_behavior": None, "v_target": None} rewards, old_prob = batch["rewards"], batch["action_prob"] - log_likelihoods = compute_log_likelihoods_from_input_dict( - self.policy, batch - ) + log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, batch) new_prob = np.exp(convert_to_numpy(log_likelihoods)) weights = new_prob / old_prob v_behavior = np.mean(rewards) v_target = np.mean(weights * rewards) - + estimates_per_epsiode["v_behavior"] = v_behavior estimates_per_epsiode["v_target"] = v_target diff --git a/rllib/offline/estimators/off_policy_estimator.py b/rllib/offline/estimators/off_policy_estimator.py index feba8b0ca2d7..6e56ba1c174b 100644 --- a/rllib/offline/estimators/off_policy_estimator.py +++ b/rllib/offline/estimators/off_policy_estimator.py @@ -19,6 +19,7 @@ logger = logging.getLogger(__name__) + @DeveloperAPI class OffPolicyEstimator(OfflineEvaluator): """Interface for an off policy estimator for counterfactual evaluation.""" @@ -36,11 +37,11 @@ def __init__(self, policy: Policy, gamma: float = 0.0): @DeveloperAPI def estimate_multi_step(self, episode: SampleBatch, **kwargs) -> Dict[str, Any]: - """Returns off-policy estimates for the given one episode. + """Returns off-policy estimates for the given one episode. Args: batch: The episode to calculate the off-policy estimates (OPE) on. The - episode must be a sample batch type that contains the fields "obs", + episode must be a sample batch type that contains the fields "obs", "actions", and "action_prob". Returns: @@ -52,11 +53,11 @@ def estimate_multi_step(self, episode: SampleBatch, **kwargs) -> Dict[str, Any]: @DeveloperAPI def estimate_single_step(self, batch: SampleBatch, **kwargs) -> Dict[str, Any]: """Returns off-policy estimates for the batch of single timesteps. This is - highly optimized for bandits assuming each episode is a single timestep. + highly optimized for bandits assuming each episode is a single timestep. Args: batch: The episode to calculate the off-policy estimates (OPE) on. The - episode must be a sample batch type that contains the fields "obs", + episode must be a sample batch type that contains the fields "obs", "actions", and "action_prob". Returns: @@ -66,9 +67,8 @@ def estimate_single_step(self, batch: SampleBatch, **kwargs) -> Dict[str, Any]: raise NotImplementedError @DeveloperAPI - def estimate(self, - batch: SampleBatchType, - split_by_episode: bool = True + def estimate( + self, batch: SampleBatchType, split_by_episode: bool = True ) -> Dict[str, Any]: """Compute off-policy estimates. @@ -92,7 +92,7 @@ def estimate(self, batch = self.convert_ma_batch_to_sample_batch(batch) self.check_action_prob_in_batch(batch) estimates_per_epsiode = [] - if split_by_episode: + if split_by_episode: for episode in batch.split_by_episode(): assert len(set(episode[SampleBatch.EPS_ID])) == 1, ( "The episode must contain only one episode id. For some reason " diff --git a/rllib/offline/estimators/weighted_importance_sampling.py b/rllib/offline/estimators/weighted_importance_sampling.py index 3fb4a19faccc..30bfd25b3dc7 100644 --- a/rllib/offline/estimators/weighted_importance_sampling.py +++ b/rllib/offline/estimators/weighted_importance_sampling.py @@ -1,4 +1,3 @@ - from typing import Dict import numpy as np @@ -9,6 +8,8 @@ from ray.rllib.utils.annotations import override, DeveloperAPI from ray.rllib.utils.numpy import convert_to_numpy import warnings + + @DeveloperAPI class WeightedImportanceSampling(OffPolicyEstimator): """The step-wise WIS estimator. @@ -38,9 +39,7 @@ def __init__(self, policy: Policy, gamma: float): def estimate_multi_step(self, episode: SampleBatch) -> Dict[str, float]: estimates_per_epsiode = {"v_behavior": None, "v_target": None} rewards, old_prob = episode["rewards"], episode["action_prob"] - log_likelihoods = compute_log_likelihoods_from_input_dict( - self.policy, episode - ) + log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, episode) new_prob = np.exp(convert_to_numpy(log_likelihoods)) # calculate importance ratios @@ -77,9 +76,7 @@ def estimate_multi_step(self, episode: SampleBatch) -> Dict[str, float]: def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: estimates_per_epsiode = {"v_behavior": None, "v_target": None} rewards, old_prob = batch["rewards"], batch["action_prob"] - log_likelihoods = compute_log_likelihoods_from_input_dict( - self.policy, batch - ) + log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, batch) new_prob = np.exp(convert_to_numpy(log_likelihoods)) weights = new_prob / old_prob diff --git a/rllib/offline/feature_importance.py b/rllib/offline/feature_importance.py index 534339c9d8ae..ad5f7cc19a70 100644 --- a/rllib/offline/feature_importance.py +++ b/rllib/offline/feature_importance.py @@ -13,9 +13,9 @@ def perturb_fn(batch: np.ndarray, index: int): random_inds = np.random.permutation(batch.shape[0]) batch[:, index] = batch[random_inds, index] + @DeveloperAPI class FeatureImportance(OfflineEvaluator): - @override(OfflineEvaluator) def __init__( self, diff --git a/rllib/offline/offline_evaluator.py b/rllib/offline/offline_evaluator.py index 28a64a154672..2af668bf17e2 100644 --- a/rllib/offline/offline_evaluator.py +++ b/rllib/offline/offline_evaluator.py @@ -6,6 +6,7 @@ logger = logging.getLogger(__name__) + @DeveloperAPI class OfflineEvaluator: """Interface for an offline evaluator of a policy""" @@ -20,14 +21,13 @@ def __init__(self, policy: Policy, **kwargs): """ self.policy = policy - @DeveloperAPI def estimate(self, batch: SampleBatchType, **kwargs) -> Dict[str, Any]: """Returns the evaluation results for the given batch of episodes. Args: batch: The batch to evaluate. - kwargs: forward compatibility placeholder. + kwargs: forward compatibility placeholder. Returns: The evaluation done on the given batch. The returned @@ -35,10 +35,9 @@ def estimate(self, batch: SampleBatchType, **kwargs) -> Dict[str, Any]: """ raise NotImplementedError - @DeveloperAPI def train(self, batch: SampleBatchType, **kwargs) -> Dict[str, Any]: - """Sometimes you need to train a model inside an evaluator. This method + """Sometimes you need to train a model inside an evaluator. This method abstracts the training process. Args: From 676babd664b3e01317bd1628d629217e563dd657 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 27 Sep 2022 16:36:13 -0700 Subject: [PATCH 04/16] lint Signed-off-by: Kourosh Hakhamaneshi --- rllib/offline/estimators/importance_sampling.py | 3 +-- rllib/offline/estimators/weighted_importance_sampling.py | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/rllib/offline/estimators/importance_sampling.py b/rllib/offline/estimators/importance_sampling.py index 1ca5fbe4d920..b7a73e87b8b4 100644 --- a/rllib/offline/estimators/importance_sampling.py +++ b/rllib/offline/estimators/importance_sampling.py @@ -1,10 +1,9 @@ from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator from ray.rllib.utils.annotations import override, DeveloperAPI from ray.rllib.policy.sample_batch import SampleBatch -from ray.rllib.utils.typing import SampleBatchType from ray.rllib.utils.numpy import convert_to_numpy from ray.rllib.utils.policy import compute_log_likelihoods_from_input_dict -from typing import Dict, Any +from typing import Dict import numpy as np diff --git a/rllib/offline/estimators/weighted_importance_sampling.py b/rllib/offline/estimators/weighted_importance_sampling.py index 30bfd25b3dc7..dc249409cf0f 100644 --- a/rllib/offline/estimators/weighted_importance_sampling.py +++ b/rllib/offline/estimators/weighted_importance_sampling.py @@ -7,7 +7,6 @@ from ray.rllib.policy import Policy from ray.rllib.utils.annotations import override, DeveloperAPI from ray.rllib.utils.numpy import convert_to_numpy -import warnings @DeveloperAPI From 5e4d9e0a9977950f242dc2572744043d69872598 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 27 Sep 2022 17:09:52 -0700 Subject: [PATCH 05/16] fixed some unittests Signed-off-by: Kourosh Hakhamaneshi --- rllib/BUILD | 2 +- rllib/algorithms/algorithm.py | 2 +- rllib/offline/estimators/off_policy_estimator.py | 15 +++++++++++---- rllib/offline/feature_importance.py | 1 + 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/rllib/BUILD b/rllib/BUILD index 1de1c9347f6f..9fe9b1e0a66b 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -1814,7 +1814,7 @@ py_test( name = "test_feature_importance", tags = ["team:rllib", "offline", "torch_only"], size = "medium", - srcs = ["offline/estimators/tests/test_feature_importance.py"] + srcs = ["offline/tests/test_feature_importance.py"] ) py_test( diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 55db3fa1df9d..5dc6fdebb4b7 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -912,7 +912,7 @@ def duration_fn(num_units_done): ) if self.reward_estimators: # TODO: (kourosh) This approach will cause an OOM issue when - # the dataset gets huge + # the dataset gets huge (should be ok for now). all_batches.extend(batches) agent_steps_this_iter += _agent_steps diff --git a/rllib/offline/estimators/off_policy_estimator.py b/rllib/offline/estimators/off_policy_estimator.py index 44c4b5eb82b9..c1353c7dab3d 100644 --- a/rllib/offline/estimators/off_policy_estimator.py +++ b/rllib/offline/estimators/off_policy_estimator.py @@ -9,7 +9,11 @@ ) from ray.rllib.policy import Policy from ray.rllib.utils.policy import compute_log_likelihoods_from_input_dict -from ray.rllib.utils.annotations import DeveloperAPI, is_overridden +from ray.rllib.utils.annotations import ( + DeveloperAPI, + is_overridden, + OverrideToImplementCustomLogic, +) from ray.rllib.utils.deprecation import Deprecated from ray.rllib.utils.numpy import convert_to_numpy from ray.rllib.utils.typing import TensorType, SampleBatchType @@ -36,13 +40,15 @@ def __init__(self, policy: Policy, gamma: float = 0.0): self.gamma = gamma @DeveloperAPI + @OverrideToImplementCustomLogic def estimate_multi_step(self, episode: SampleBatch, **kwargs) -> Dict[str, Any]: """Returns off-policy estimates for the given one episode. Args: batch: The episode to calculate the off-policy estimates (OPE) on. The episode must be a sample batch type that contains the fields "obs", - "actions", and "action_prob". + "actions", and "action_prob" and it needs to represent a + complete trajectory. Returns: The off-policy estimates (OPE) calculated on the given episode. The returned @@ -51,13 +57,14 @@ def estimate_multi_step(self, episode: SampleBatch, **kwargs) -> Dict[str, Any]: raise NotImplementedError @DeveloperAPI + @OverrideToImplementCustomLogic def estimate_single_step(self, batch: SampleBatch, **kwargs) -> Dict[str, Any]: """Returns off-policy estimates for the batch of single timesteps. This is highly optimized for bandits assuming each episode is a single timestep. Args: - batch: The episode to calculate the off-policy estimates (OPE) on. The - episode must be a sample batch type that contains the fields "obs", + batch: The batch to calculate the off-policy estimates (OPE) on. The + batch must be a sample batch type that contains the fields "obs", "actions", and "action_prob". Returns: diff --git a/rllib/offline/feature_importance.py b/rllib/offline/feature_importance.py index ad5f7cc19a70..d4cb808cdff0 100644 --- a/rllib/offline/feature_importance.py +++ b/rllib/offline/feature_importance.py @@ -8,6 +8,7 @@ import copy +@DeveloperAPI def perturb_fn(batch: np.ndarray, index: int): # shuffle the indexth column features random_inds = np.random.permutation(batch.shape[0]) From e4e53f63e3bed7e5f9e64cc018b1e6cdf28ee1ce Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Wed, 28 Sep 2022 10:21:49 -0700 Subject: [PATCH 06/16] wip Signed-off-by: Kourosh Hakhamaneshi --- rllib/algorithms/algorithm.py | 23 ++--- rllib/algorithms/algorithm_config.py | 10 +- .../offline/estimators/importance_sampling.py | 4 +- .../estimators/off_policy_estimator.py | 29 +++--- rllib/offline/estimators/tests/test_ope.py | 93 +++++++++++++++++++ .../weighted_importance_sampling.py | 4 +- 6 files changed, 131 insertions(+), 32 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 5dc6fdebb4b7..f05712f2fd26 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -936,18 +936,19 @@ def duration_fn(num_units_done): # TODO: Remove this key at some point. Here for backward compatibility. metrics["timesteps_this_iter"] = env_steps_this_iter - if self.reward_estimators: - # Compute off-policy estimates - metrics["off_policy_estimator"] = {} - estimates = defaultdict(list) - # for each batch run the estimator's fwd pass + # Compute off-policy estimates + estimates = defaultdict(list) + # for each batch run the estimator's fwd pass + for name, estimator in self.reward_estimators.items(): for batch in all_batches: - for name, estimator in self.reward_estimators.items(): - estimate_result = estimator.estimate( - batch, split_by_episode=self.config["ope_split_by_episode"] - ) - estimates[name].append(estimate_result) - # collate estimates from all batches + estimate_result = estimator.estimate( + batch, split_batch_by_episode=self.config["ope_split_batch_by_episode"] + ) + estimates[name].append(estimate_result) + + # collate estimates from all batches + if estimates: + metrics["off_policy_estimator"] = {} for name, estimate_list in estimates.items(): avg_estimate = tree.map_structure( lambda *x: np.mean(x, axis=0), *estimate_list diff --git a/rllib/algorithms/algorithm_config.py b/rllib/algorithms/algorithm_config.py index 2b857651c384..429aefa93243 100644 --- a/rllib/algorithms/algorithm_config.py +++ b/rllib/algorithms/algorithm_config.py @@ -180,7 +180,7 @@ def __init__(self, algo_class=None): self.evaluation_parallel_to_training = False self.evaluation_config = {} self.off_policy_estimation_methods = {} - self.ope_split_by_episode = True + self.ope_split_batch_by_episode = True self.evaluation_num_workers = 0 self.custom_evaluation_function = None self.always_attach_evaluation_results = False @@ -828,7 +828,7 @@ def evaluation( Union["AlgorithmConfig", PartialAlgorithmConfigDict] ] = None, off_policy_estimation_methods: Optional[Dict] = None, - ope_split_by_episode: Optional[bool] = None, + ope_split_batch_by_episode: Optional[bool] = None, evaluation_num_workers: Optional[int] = None, custom_evaluation_function: Optional[Callable] = None, always_attach_evaluation_results: Optional[bool] = None, @@ -882,7 +882,7 @@ def evaluation( You can also add additional config arguments to be passed to the OffPolicyEstimator in the dict, e.g. {"qreg_dr": {"type": DoublyRobust, "q_model_type": "qreg", "k": 5}} - ope_split_by_episode: Whether to use SampleBatch.split_by_episode() to + ope_split_batch_by_episode: Whether to use SampleBatch.split_by_episode() to split the input batch to episodes before estimating the ope metrics. In case of bandits you should make this False to see improvements in ope evaluation speed. In case of bandits, it is ok to not split by episode, @@ -936,8 +936,8 @@ def evaluation( self.always_attach_evaluation_results = always_attach_evaluation_results if enable_async_evaluation is not None: self.enable_async_evaluation = enable_async_evaluation - if ope_split_by_episode is not None: - self.ope_split_by_episode = ope_split_by_episode + if ope_split_batch_by_episode is not None: + self.ope_split_batch_by_episode = ope_split_batch_by_episode return self diff --git a/rllib/offline/estimators/importance_sampling.py b/rllib/offline/estimators/importance_sampling.py index b7a73e87b8b4..c7b63e37db4f 100644 --- a/rllib/offline/estimators/importance_sampling.py +++ b/rllib/offline/estimators/importance_sampling.py @@ -61,8 +61,8 @@ def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: new_prob = np.exp(convert_to_numpy(log_likelihoods)) weights = new_prob / old_prob - v_behavior = np.mean(rewards) - v_target = np.mean(weights * rewards) + v_behavior = rewards + v_target = weights * rewards estimates_per_epsiode["v_behavior"] = v_behavior estimates_per_epsiode["v_target"] = v_target diff --git a/rllib/offline/estimators/off_policy_estimator.py b/rllib/offline/estimators/off_policy_estimator.py index c1353c7dab3d..dac13397e771 100644 --- a/rllib/offline/estimators/off_policy_estimator.py +++ b/rllib/offline/estimators/off_policy_estimator.py @@ -18,7 +18,7 @@ from ray.rllib.utils.numpy import convert_to_numpy from ray.rllib.utils.typing import TensorType, SampleBatchType from ray.rllib.offline.offline_evaluator import OfflineEvaluator -from typing import Dict, Any +from typing import Dict, Any, Sequence logger = logging.getLogger(__name__) @@ -58,7 +58,10 @@ def estimate_multi_step(self, episode: SampleBatch, **kwargs) -> Dict[str, Any]: @DeveloperAPI @OverrideToImplementCustomLogic - def estimate_single_step(self, batch: SampleBatch, **kwargs) -> Dict[str, Any]: + def estimate_single_step(self, + batch: SampleBatch, + **kwargs + ) -> Dict[str, Sequence[float]]: """Returns off-policy estimates for the batch of single timesteps. This is highly optimized for bandits assuming each episode is a single timestep. @@ -69,20 +72,21 @@ def estimate_single_step(self, batch: SampleBatch, **kwargs) -> Dict[str, Any]: Returns: The off-policy estimates (OPE) calculated on the given episode. The returned - dict can be any arbitrary mapping of strings to metrics. + dict can be any arbitrary mapping of strings to a list of floats capturing + the values per each record. """ raise NotImplementedError @DeveloperAPI def estimate( - self, batch: SampleBatchType, split_by_episode: bool = True + self, batch: SampleBatchType, split_batch_by_episode: bool = True ) -> Dict[str, Any]: """Compute off-policy estimates. Args: batch: The batch to calculate the off-policy estimates (OPE) on. The batch must contain the fields "obs", "actions", and "action_prob". - split_by_episode: Whether to split the batch by episode. + split_batch_by_episode: Whether to split the batch by episode. Returns: The off-policy estimates (OPE) calculated on the given batch. The returned @@ -99,7 +103,7 @@ def estimate( batch = self.convert_ma_batch_to_sample_batch(batch) self.check_action_prob_in_batch(batch) estimates_per_epsiode = [] - if split_by_episode: + if split_batch_by_episode: for episode in batch.split_by_episode(): assert len(set(episode[SampleBatch.EPS_ID])) == 1, ( "The episode must contain only one episode id. For some reason " @@ -109,9 +113,15 @@ def estimate( ) estimate_step_results = self.estimate_multi_step(episode) estimates_per_epsiode.append(estimate_step_results) + + # turn a list of identical dicts into a dict of lists + estimates_per_epsiode = tree.map_structure( + lambda *x: list(x), *estimates_per_epsiode + ) else: if is_overridden(self.estimate_single_step): - estimates_per_epsiode.append(self.estimate_single_step(batch)) + # the returned dict is a mapping of strings to a list of floats + estimates_per_epsiode = self.estimate_single_step(batch) else: raise NotImplementedError( "The method estimate_single_step is not implemented. " @@ -119,11 +129,6 @@ def estimate( "split_by_episode to True." ) - # turn a list of identical dicts into a dict of lists - estimates_per_epsiode = tree.map_structure( - lambda *x: list(x), *estimates_per_epsiode - ) - estimates = { "v_behavior": np.mean(estimates_per_epsiode["v_behavior"]), "v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]), diff --git a/rllib/offline/estimators/tests/test_ope.py b/rllib/offline/estimators/tests/test_ope.py index 53c43b27e34c..3fe4a1ff3169 100644 --- a/rllib/offline/estimators/tests/test_ope.py +++ b/rllib/offline/estimators/tests/test_ope.py @@ -2,6 +2,7 @@ import os import unittest from pathlib import Path +import time import numpy as np from ray.data import read_json @@ -35,6 +36,98 @@ } +class FakePolicy: + """A fake policy used in test ope math to emulate a target policy that is better and worse than the random behavioral policy. + + In case of an improved policy, we assign 50 percent higher probs to those actions that attained a higher reward and 50 percent lower probs to those actions that attained a lower reward. We do the reverse in case of a worse policy. + """ + + def __init__(self, sample_batch, improved=True): + self.sample_batch = sample_batch + self.improved = improved + self.config = {} + + def compute_log_likelihoods( + self, + actions, + obs_batch, + state_batches, + prev_action_batch, + prev_reward_batch, + actions_normalized, + ): + inds = obs_batch[:, 0] + old_probs = self.sample_batch[SampleBatch.ACTION_PROB][inds] + old_rewards = self.sample_batch[SampleBatch.REWARDS][inds] + + if self.improved: + # assign 50 percent higher prob to those that gave a good reward and 50 percent lower prob to those that gave a bad reward + # rewards are 1 or 2 in this case + new_probs = ( + (old_rewards == 2) * 1.5 * old_probs + + (old_rewards == 1) * 0.5 * old_probs + ) + else: + new_probs = ( + (old_rewards == 2) * 0.5 * old_probs + + (old_rewards == 1) * 1.5 * old_probs + ) + + return np.log(new_probs) + +class TestOPEMath(unittest.TestCase): + """Tests some sanity checks that should pass based on the math of ope methods.""" + + @classmethod + def setUpClass(cls): + ray.init() + + bsize = 1024 + action_dim = 2 + cls.sample_batch = SampleBatch({ + SampleBatch.OBS: np.arange(bsize).reshape(-1, 1), + SampleBatch.ACTIONS: np.random.randint(0, action_dim, size=bsize), + SampleBatch.REWARDS: np.random.randint(1, 3, size=bsize), # rewards are 1 or 2 + SampleBatch.DONES: np.ones(bsize), + SampleBatch.EPS_ID: np.arange(bsize), + SampleBatch.ACTION_PROB: np.ones(bsize)/action_dim, + }) + + cls.good_target_policy = FakePolicy(cls.sample_batch, improved=True) + cls.bad_target_policy = FakePolicy(cls.sample_batch, improved=False) + + + @classmethod + def tearDownClass(cls): + ray.shutdown() + + + def test_is_math(self): + estimator_good = ImportanceSampling(self.good_target_policy, gamma=0) + s = time.time() + estimate_1 = estimator_good.estimate( + self.sample_batch, split_batch_by_episode=True, + ) + dt1 = time.time() - s + s = time.time() + estimate_2 = estimator_good.estimate( + self.sample_batch, split_batch_by_episode=False + ) + dt2 = time.time() - s + + breakpoint() + # check if the v_gain is larger than 1 + self.assertGreater(estimate_2["v_gain"], 1) + + # check that the estimates are the same + check(estimate_1, estimate_2) + + self.assertGreater(dt1, dt2, + f"in bandits split_by_episode = False should improve " + f"performance, dt2={dt2}, dt1={dt1}" + ) + + class TestOPE(unittest.TestCase): """Compilation tests for using OPE both standalone and in an RLlib Algorithm""" diff --git a/rllib/offline/estimators/weighted_importance_sampling.py b/rllib/offline/estimators/weighted_importance_sampling.py index dc249409cf0f..72c6c04bc127 100644 --- a/rllib/offline/estimators/weighted_importance_sampling.py +++ b/rllib/offline/estimators/weighted_importance_sampling.py @@ -79,8 +79,8 @@ def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: new_prob = np.exp(convert_to_numpy(log_likelihoods)) weights = new_prob / old_prob - v_behavior = np.mean(rewards) - v_target = np.mean(weights * rewards) / np.mean(weights) + v_behavior = rewards + v_target = weights * rewards / np.mean(weights) estimates_per_epsiode["v_behavior"] = v_behavior estimates_per_epsiode["v_target"] = v_target From 34cd60260d156919e1a59f42c3913441af0a8919 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Wed, 28 Sep 2022 12:14:40 -0700 Subject: [PATCH 07/16] wip Signed-off-by: Kourosh Hakhamaneshi --- rllib/offline/estimators/direct_method.py | 2 +- rllib/offline/estimators/doubly_robust.py | 2 +- .../offline/estimators/importance_sampling.py | 2 +- .../estimators/off_policy_estimator.py | 4 +- rllib/offline/estimators/tests/test_ope.py | 52 +++--- .../weighted_importance_sampling.py | 154 +++++++++++++++--- 6 files changed, 164 insertions(+), 52 deletions(-) diff --git a/rllib/offline/estimators/direct_method.py b/rllib/offline/estimators/direct_method.py index 62a7c4c0931b..ccffeda1498c 100644 --- a/rllib/offline/estimators/direct_method.py +++ b/rllib/offline/estimators/direct_method.py @@ -63,7 +63,7 @@ def __init__( ), "self.model must implement `estimate_v`!" @override(OffPolicyEstimator) - def estimate_multi_step(self, episode: SampleBatch) -> Dict[str, float]: + def estimate_on_episode(self, episode: SampleBatch) -> Dict[str, float]: estimates_per_epsiode = {"v_behavior": None, "v_target": None} rewards = episode["rewards"] diff --git a/rllib/offline/estimators/doubly_robust.py b/rllib/offline/estimators/doubly_robust.py index dc685defd789..6d3e613249d3 100644 --- a/rllib/offline/estimators/doubly_robust.py +++ b/rllib/offline/estimators/doubly_robust.py @@ -77,7 +77,7 @@ def __init__( ), "self.model must implement `estimate_q`!" @override(OffPolicyEstimator) - def estimate_multi_step(self, episode: SampleBatchType) -> Dict[str, float]: + def estimate_on_episode(self, episode: SampleBatchType) -> Dict[str, float]: estimates_per_epsiode = {"v_behavior": None, "v_target": None} rewards, old_prob = episode["rewards"], episode["action_prob"] diff --git a/rllib/offline/estimators/importance_sampling.py b/rllib/offline/estimators/importance_sampling.py index c7b63e37db4f..af9e09ad29a3 100644 --- a/rllib/offline/estimators/importance_sampling.py +++ b/rllib/offline/estimators/importance_sampling.py @@ -24,7 +24,7 @@ class ImportanceSampling(OffPolicyEstimator): For more information refer to https://arxiv.org/pdf/1911.06854.pdf""" @override(OffPolicyEstimator) - def estimate_multi_step(self, episode: SampleBatch) -> Dict[str, float]: + def estimate_on_episode(self, episode: SampleBatch) -> Dict[str, float]: estimates_per_epsiode = {"v_behavior": None, "v_target": None} rewards, old_prob = episode["rewards"], episode["action_prob"] diff --git a/rllib/offline/estimators/off_policy_estimator.py b/rllib/offline/estimators/off_policy_estimator.py index dac13397e771..165901bb9d3f 100644 --- a/rllib/offline/estimators/off_policy_estimator.py +++ b/rllib/offline/estimators/off_policy_estimator.py @@ -41,7 +41,7 @@ def __init__(self, policy: Policy, gamma: float = 0.0): @DeveloperAPI @OverrideToImplementCustomLogic - def estimate_multi_step(self, episode: SampleBatch, **kwargs) -> Dict[str, Any]: + def estimate_on_episode(self, episode: SampleBatch, **kwargs) -> Dict[str, Any]: """Returns off-policy estimates for the given one episode. Args: @@ -111,7 +111,7 @@ def estimate( "the batch by episodes. Each row in the dataset should be " "one episode. Check your evaluation dataset for errors." ) - estimate_step_results = self.estimate_multi_step(episode) + estimate_step_results = self.estimate_on_episode(episode) estimates_per_epsiode.append(estimate_step_results) # turn a list of identical dicts into a dict of lists diff --git a/rllib/offline/estimators/tests/test_ope.py b/rllib/offline/estimators/tests/test_ope.py index 3fe4a1ff3169..7a1cb516cf69 100644 --- a/rllib/offline/estimators/tests/test_ope.py +++ b/rllib/offline/estimators/tests/test_ope.py @@ -96,36 +96,44 @@ def setUpClass(cls): cls.good_target_policy = FakePolicy(cls.sample_batch, improved=True) cls.bad_target_policy = FakePolicy(cls.sample_batch, improved=False) - @classmethod def tearDownClass(cls): ray.shutdown() + def test_is_and_wis_math(self): + """Tests that the importance sampling and weighted importance sampling + methods are correct based on the math.""" - def test_is_math(self): - estimator_good = ImportanceSampling(self.good_target_policy, gamma=0) - s = time.time() - estimate_1 = estimator_good.estimate( - self.sample_batch, split_batch_by_episode=True, - ) - dt1 = time.time() - s - s = time.time() - estimate_2 = estimator_good.estimate( - self.sample_batch, split_batch_by_episode=False - ) - dt2 = time.time() - s + ope_classes = [ + ImportanceSampling, + WeightedImportanceSampling, + ] - breakpoint() - # check if the v_gain is larger than 1 - self.assertGreater(estimate_2["v_gain"], 1) + for class_module in ope_classes: + estimator_good = class_module(self.good_target_policy, gamma=0) - # check that the estimates are the same - check(estimate_1, estimate_2) + s = time.time() + estimate_1 = estimator_good.estimate( + self.sample_batch, split_batch_by_episode=True, + ) + dt1 = time.time() - s - self.assertGreater(dt1, dt2, - f"in bandits split_by_episode = False should improve " - f"performance, dt2={dt2}, dt1={dt1}" - ) + s = time.time() + estimate_2 = estimator_good.estimate( + self.sample_batch, split_batch_by_episode=False + ) + dt2 = time.time() - s + + # check if the v_gain is larger than 1 + self.assertGreater(estimate_2["v_gain"], 1) + + # check that the estimates are the same + check(estimate_1, estimate_2) + + self.assertGreater(dt1, dt2, + f"in bandits split_by_episode = False should improve " + f"performance, dt2={dt2}, dt1={dt1}" + ) class TestOPE(unittest.TestCase): diff --git a/rllib/offline/estimators/weighted_importance_sampling.py b/rllib/offline/estimators/weighted_importance_sampling.py index 72c6c04bc127..e82425df10fd 100644 --- a/rllib/offline/estimators/weighted_importance_sampling.py +++ b/rllib/offline/estimators/weighted_importance_sampling.py @@ -1,8 +1,9 @@ -from typing import Dict +from typing import Dict, Any import numpy as np +import tree from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator -from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.policy.sample_batch import SampleBatch, SampleBatchType from ray.rllib.utils.policy import compute_log_likelihoods_from_input_dict from ray.rllib.policy import Policy from ray.rllib.utils.annotations import override, DeveloperAPI @@ -31,40 +32,49 @@ class WeightedImportanceSampling(OffPolicyEstimator): @override(OffPolicyEstimator) def __init__(self, policy: Policy, gamma: float): super().__init__(policy, gamma) - self.filter_values = [] + self.filter_values = [] # map from time to cummulative propensity values + # map from time to number of episodes that reached this time self.filter_counts = [] + self.p = {} # map from eps id to mapping from time to propensity values @override(OffPolicyEstimator) - def estimate_multi_step(self, episode: SampleBatch) -> Dict[str, float]: + def estimate_on_episode(self, episode: SampleBatch) -> Dict[str, float]: estimates_per_epsiode = {"v_behavior": None, "v_target": None} rewards, old_prob = episode["rewards"], episode["action_prob"] - log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, episode) - new_prob = np.exp(convert_to_numpy(log_likelihoods)) - - # calculate importance ratios - p = [] - for t in range(episode.count): - if t == 0: - pt_prev = 1.0 - else: - pt_prev = p[t - 1] - p.append(pt_prev * new_prob[t] / old_prob[t]) - for t, v in enumerate(p): - if t >= len(self.filter_values): - self.filter_values.append(v) - self.filter_counts.append(1.0) - else: - self.filter_values[t] += v - self.filter_counts[t] += 1.0 - + # log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, episode) + # new_prob = np.exp(convert_to_numpy(log_likelihoods)) + + # # calculate importance ratios + # p = [] + # for t in range(episode.count): + # if t == 0: + # pt_prev = 1.0 + # else: + # pt_prev = p[t - 1] + # p.append(pt_prev * new_prob[t] / old_prob[t]) + + # for t, v in enumerate(p): + # if t >= len(self.filter_values): + # self.filter_values.append(v) + # self.filter_counts.append(1.0) + # else: + # self.filter_values[t] += v + # self.filter_counts[t] += 1.0 + + + + eps_id = episode[SampleBatch.EPS_ID][0] + if eps_id not in self.p: + raise ValueError(f"Episode {eps_id} not passed through the fit function") + # calculate stepwise weighted IS estimate v_behavior = 0.0 v_target = 0.0 - + episode_p = self.p[eps_id] for t in range(episode.count): v_behavior += rewards[t] * self.gamma ** t w_t = self.filter_values[t] / self.filter_counts[t] - v_target += p[t] / w_t * rewards[t] * self.gamma ** t + v_target += episode_p[t] / w_t * rewards[t] * self.gamma ** t estimates_per_epsiode["v_behavior"] = v_behavior estimates_per_epsiode["v_target"] = v_target @@ -86,3 +96,97 @@ def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: estimates_per_epsiode["v_target"] = v_target return estimates_per_epsiode + + def _reset_stats(self): + self.filter_values = [] + self.filter_counts = [] + self.p = {} + + def _fit_on_episode(self, episode: SampleBatch) -> None: + old_prob = episode["action_prob"] + log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, episode) + new_prob = np.exp(convert_to_numpy(log_likelihoods)) + + # calculate importance ratios + episode_p = [] + for t in range(episode.count): + if t == 0: + pt_prev = 1.0 + else: + pt_prev = episode_p[t - 1] + episode_p.append(pt_prev * new_prob[t] / old_prob[t]) + + for t, p_t in enumerate(episode_p): + if t >= len(self.filter_values): + self.filter_values.append(p_t) + self.filter_counts.append(1.0) + else: + self.filter_values[t] += p_t + self.filter_counts[t] += 1.0 + + eps_id = episode[SampleBatch.EPS_ID][0] + if eps_id in self.p: + raise ValueError( + f"Episode {eps_id} already paseed through the fit function" + ) + self.p[eps_id] = episode_p + + @DeveloperAPI + def estimate( + self, batch: SampleBatchType, split_batch_by_episode: bool = True + ) -> Dict[str, Any]: + """Compute off-policy estimates. + + Args: + batch: The batch to calculate the off-policy estimates (OPE) on. The + batch must contain the fields "obs", "actions", and "action_prob". + split_batch_by_episode: Whether to split the batch by episode. + + Returns: + The off-policy estimates (OPE) calculated on the given batch. The returned + dict can be any arbitrary mapping of strings to metrics. + The dict consists of the following metrics: + - v_behavior: The discounted return averaged over episodes in the batch + - v_behavior_std: The standard deviation corresponding to v_behavior + - v_target: The estimated discounted return for `self.policy`, + averaged over episodes in the batch + - v_target_std: The standard deviation corresponding to v_target + - v_gain: v_target / max(v_behavior, 1e-8) + - v_delta: The difference between v_target and v_behavior. + """ + batch = self.convert_ma_batch_to_sample_batch(batch) + self.check_action_prob_in_batch(batch) + estimates_per_epsiode = [] + if split_batch_by_episode: + self._reset_stats() + all_episodes = batch.split_by_episode() + for episode in all_episodes: + assert len(set(episode[SampleBatch.EPS_ID])) == 1, ( + "The episode must contain only one episode id. For some reason " + "the split_by_episode() method could not successfully split " + "the batch by episodes. Each row in the dataset should be " + "one episode. Check your evaluation dataset for errors." + ) + self._fit_on_episode(episode) + + for episode in all_episodes: + estimate_step_results = self.estimate_on_episode(episode) + estimates_per_epsiode.append(estimate_step_results) + + # turn a list of identical dicts into a dict of lists + estimates_per_epsiode = tree.map_structure( + lambda *x: list(x), *estimates_per_epsiode + ) + else: + estimates_per_epsiode = self.estimate_single_step(batch) + + estimates = { + "v_behavior": np.mean(estimates_per_epsiode["v_behavior"]), + "v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]), + "v_target": np.mean(estimates_per_epsiode["v_target"]), + "v_target_std": np.std(estimates_per_epsiode["v_target"]), + } + estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8) + estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"] + + return estimates From ddf29105dda9aef311d9a462014e5d57d1065b48 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Wed, 28 Sep 2022 14:12:51 -0700 Subject: [PATCH 08/16] fixed dm and dr variance issues Signed-off-by: Kourosh Hakhamaneshi --- rllib/offline/estimators/direct_method.py | 8 +- rllib/offline/estimators/doubly_robust.py | 4 +- rllib/offline/estimators/tests/test_ope.py | 102 --------- .../offline/estimators/tests/test_ope_math.py | 200 ++++++++++++++++++ 4 files changed, 206 insertions(+), 108 deletions(-) create mode 100644 rllib/offline/estimators/tests/test_ope_math.py diff --git a/rllib/offline/estimators/direct_method.py b/rllib/offline/estimators/direct_method.py index ccffeda1498c..985dc724a786 100644 --- a/rllib/offline/estimators/direct_method.py +++ b/rllib/offline/estimators/direct_method.py @@ -64,7 +64,7 @@ def __init__( @override(OffPolicyEstimator) def estimate_on_episode(self, episode: SampleBatch) -> Dict[str, float]: - estimates_per_epsiode = {"v_behavior": None, "v_target": None} + estimates_per_epsiode = {} rewards = episode["rewards"] v_behavior = 0.0 @@ -80,10 +80,10 @@ def estimate_on_episode(self, episode: SampleBatch) -> Dict[str, float]: @override(OffPolicyEstimator) def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: - estimates_per_epsiode = {"v_behavior": None, "v_target": None} + estimates_per_epsiode = {} rewards = batch["rewards"] - v_behavior = np.mean(rewards) + v_behavior = rewards v_target = self._compute_v_target(batch) estimates_per_epsiode["v_behavior"] = v_behavior @@ -93,7 +93,7 @@ def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: def _compute_v_target(self, init_step): v_target = self.model.estimate_v(init_step) - v_target = convert_to_numpy(v_target).mean() + v_target = convert_to_numpy(v_target) return v_target @override(OffPolicyEstimator) diff --git a/rllib/offline/estimators/doubly_robust.py b/rllib/offline/estimators/doubly_robust.py index 6d3e613249d3..3bd6f14904d9 100644 --- a/rllib/offline/estimators/doubly_robust.py +++ b/rllib/offline/estimators/doubly_robust.py @@ -117,10 +117,10 @@ def estimate_single_step(self, batch: SampleBatchType) -> Dict[str, float]: v_values = self.model.estimate_v(batch) v_values = convert_to_numpy(v_values) - v_behavior = np.mean(rewards) + v_behavior = rewards weight = new_prob / old_prob - v_target = np.mean(v_values + weight * (rewards - q_values)) + v_target = v_values + weight * (rewards - q_values) estimates_per_epsiode["v_behavior"] = v_behavior estimates_per_epsiode["v_target"] = v_target diff --git a/rllib/offline/estimators/tests/test_ope.py b/rllib/offline/estimators/tests/test_ope.py index 7a1cb516cf69..4d6ef2382d21 100644 --- a/rllib/offline/estimators/tests/test_ope.py +++ b/rllib/offline/estimators/tests/test_ope.py @@ -2,7 +2,6 @@ import os import unittest from pathlib import Path -import time import numpy as np from ray.data import read_json @@ -35,107 +34,6 @@ "v_delta", } - -class FakePolicy: - """A fake policy used in test ope math to emulate a target policy that is better and worse than the random behavioral policy. - - In case of an improved policy, we assign 50 percent higher probs to those actions that attained a higher reward and 50 percent lower probs to those actions that attained a lower reward. We do the reverse in case of a worse policy. - """ - - def __init__(self, sample_batch, improved=True): - self.sample_batch = sample_batch - self.improved = improved - self.config = {} - - def compute_log_likelihoods( - self, - actions, - obs_batch, - state_batches, - prev_action_batch, - prev_reward_batch, - actions_normalized, - ): - inds = obs_batch[:, 0] - old_probs = self.sample_batch[SampleBatch.ACTION_PROB][inds] - old_rewards = self.sample_batch[SampleBatch.REWARDS][inds] - - if self.improved: - # assign 50 percent higher prob to those that gave a good reward and 50 percent lower prob to those that gave a bad reward - # rewards are 1 or 2 in this case - new_probs = ( - (old_rewards == 2) * 1.5 * old_probs + - (old_rewards == 1) * 0.5 * old_probs - ) - else: - new_probs = ( - (old_rewards == 2) * 0.5 * old_probs + - (old_rewards == 1) * 1.5 * old_probs - ) - - return np.log(new_probs) - -class TestOPEMath(unittest.TestCase): - """Tests some sanity checks that should pass based on the math of ope methods.""" - - @classmethod - def setUpClass(cls): - ray.init() - - bsize = 1024 - action_dim = 2 - cls.sample_batch = SampleBatch({ - SampleBatch.OBS: np.arange(bsize).reshape(-1, 1), - SampleBatch.ACTIONS: np.random.randint(0, action_dim, size=bsize), - SampleBatch.REWARDS: np.random.randint(1, 3, size=bsize), # rewards are 1 or 2 - SampleBatch.DONES: np.ones(bsize), - SampleBatch.EPS_ID: np.arange(bsize), - SampleBatch.ACTION_PROB: np.ones(bsize)/action_dim, - }) - - cls.good_target_policy = FakePolicy(cls.sample_batch, improved=True) - cls.bad_target_policy = FakePolicy(cls.sample_batch, improved=False) - - @classmethod - def tearDownClass(cls): - ray.shutdown() - - def test_is_and_wis_math(self): - """Tests that the importance sampling and weighted importance sampling - methods are correct based on the math.""" - - ope_classes = [ - ImportanceSampling, - WeightedImportanceSampling, - ] - - for class_module in ope_classes: - estimator_good = class_module(self.good_target_policy, gamma=0) - - s = time.time() - estimate_1 = estimator_good.estimate( - self.sample_batch, split_batch_by_episode=True, - ) - dt1 = time.time() - s - - s = time.time() - estimate_2 = estimator_good.estimate( - self.sample_batch, split_batch_by_episode=False - ) - dt2 = time.time() - s - - # check if the v_gain is larger than 1 - self.assertGreater(estimate_2["v_gain"], 1) - - # check that the estimates are the same - check(estimate_1, estimate_2) - - self.assertGreater(dt1, dt2, - f"in bandits split_by_episode = False should improve " - f"performance, dt2={dt2}, dt1={dt1}" - ) - - class TestOPE(unittest.TestCase): """Compilation tests for using OPE both standalone and in an RLlib Algorithm""" diff --git a/rllib/offline/estimators/tests/test_ope_math.py b/rllib/offline/estimators/tests/test_ope_math.py new file mode 100644 index 000000000000..5ca87c24393e --- /dev/null +++ b/rllib/offline/estimators/tests/test_ope_math.py @@ -0,0 +1,200 @@ +import unittest +import time +import gym +import torch + +import numpy as np +from ray.rllib.offline.estimators import ( + DirectMethod, + DoublyRobust, + ImportanceSampling, + WeightedImportanceSampling, +) +from ray.rllib.models.torch.torch_action_dist import TorchCategorical +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.policy.torch_policy_v2 import TorchPolicyV2 +from ray.rllib.utils.test_utils import check + +import ray + + +class FakePolicy(TorchPolicyV2): + """A fake policy used in test ope math to emulate a target policy that is better and worse than the random behavioral policy. + + In case of an improved policy, we assign higher probs to those actions that attained a higher reward and lower probs to those actions that attained a lower reward. We do the reverse in case of a worse policy. + """ + + def __init__(self, observation_space, action_space, sample_batch, improved=True): + self.sample_batch = sample_batch + self.improved = improved + self.config = {} + + # things that are needed for FQE Torch Model + self.model = ... + self.observation_space = observation_space + self.action_space = action_space + self.device = 'cpu' + + def action_distribution_fn( + self, model, obs_batch=None, **kwargs + ): + # used in DM and DR (FQE torch model to be precise) + dist_class = TorchCategorical + + inds = obs_batch[SampleBatch.OBS][:, 0] + + old_rewards = self.sample_batch[SampleBatch.REWARDS][inds] + old_actions = self.sample_batch[SampleBatch.ACTIONS][inds] + + dist_inputs = torch.ones( + (len(inds), self.action_space.n), dtype=torch.float32 + ) + + # add 0.5 to the action that gave a good reward (2) and subtract 0.5 from the + # action that gave a bad reward (1) + # to acheive this I can just subtract 1.5 from old_reward + delta = (old_rewards - 1.5) + if not self.improved: + # reverse the logic for a worse policy + delta = -delta + dist_inputs[torch.arange(len(inds)), old_actions] = ( + dist_inputs[torch.arange(len(inds)), old_actions] + delta + ).float() + + return dist_inputs, dist_class, None + + + def compute_log_likelihoods( + self, + actions, + obs_batch, + *args, + **kwargs, + ): + # used in IS and WIS + inds = obs_batch[:, 0] + old_probs = self.sample_batch[SampleBatch.ACTION_PROB][inds] + old_rewards = self.sample_batch[SampleBatch.REWARDS][inds] + + if self.improved: + # assign 50 percent higher prob to those that gave a good reward and 50 percent lower prob to those that gave a bad reward + # rewards are 1 or 2 in this case + new_probs = ( + (old_rewards == 2) * 1.5 * old_probs + + (old_rewards == 1) * 0.5 * old_probs + ) + else: + new_probs = ( + (old_rewards == 2) * 0.5 * old_probs + + (old_rewards == 1) * 1.5 * old_probs + ) + + return np.log(new_probs) + +class TestOPEMath(unittest.TestCase): + """Tests some sanity checks that should pass based on the math of ope methods.""" + + @classmethod + def setUpClass(cls): + ray.init() + + bsize = 1024 + action_dim = 2 + observation_space = gym.spaces.Box(-float('inf'), float('inf'), (1, )) + action_space = gym.spaces.Discrete(action_dim) + cls.sample_batch = SampleBatch({ + SampleBatch.OBS: np.arange(bsize).reshape(-1, 1), + SampleBatch.NEXT_OBS: np.arange(bsize).reshape(-1, 1) + 1, + SampleBatch.ACTIONS: np.random.randint(0, action_dim, size=bsize), + SampleBatch.REWARDS: np.random.randint(1, 3, size=bsize), # rewards are 1 or 2 + SampleBatch.DONES: np.ones(bsize), + SampleBatch.EPS_ID: np.arange(bsize), + SampleBatch.ACTION_PROB: np.ones(bsize)/action_dim, + }) + + + cls.policies = { + "good": FakePolicy( + observation_space, action_space, cls.sample_batch, improved=True + ), + "bad": FakePolicy( + observation_space, action_space, cls.sample_batch, improved=False + ) + } + + @classmethod + def tearDownClass(cls): + ray.shutdown() + + def test_is_and_wis_math(self): + """Tests that the importance sampling and weighted importance sampling + methods are correct based on the math.""" + + ope_classes = [ + ImportanceSampling, + WeightedImportanceSampling, + ] + + for class_module in ope_classes: + for policy_tag in ["good", "bad"]: + target_policy = self.policies[policy_tag] + estimator_good = class_module(target_policy, gamma=0) + + s = time.time() + estimate_1 = estimator_good.estimate( + self.sample_batch, split_batch_by_episode=True, + ) + dt1 = time.time() - s + + s = time.time() + estimate_2 = estimator_good.estimate( + self.sample_batch, split_batch_by_episode=False + ) + dt2 = time.time() - s + + if policy_tag == "good": + # check if the v_gain is larger than 1 + self.assertGreater(estimate_1["v_gain"], 1) + else: + self.assertLess(estimate_1["v_gain"], 1) + + # check that the estimates are the same for bandit vs RL + check(estimate_1, estimate_2) + + self.assertGreater(dt1, dt2, + f"in bandits split_by_episode = False should improve " + f"performance, dt_wo_split={dt2}, dt_with_split={dt1}" + ) + + + def test_dm_dr_math(self): + """Tests that the Direct Method and Doubly Robust methods are correct in terms of RL vs. bandits.""" + + ope_classes = [ + DirectMethod, + DoublyRobust, + ] + + for class_module in ope_classes: + target_policy = self.policies["good"] + estimator_good = class_module(target_policy, gamma=0) + + s = time.time() + estimate_1 = estimator_good.estimate( + self.sample_batch, split_batch_by_episode=True, + ) + dt1 = time.time() - s + + s = time.time() + estimate_2 = estimator_good.estimate( + self.sample_batch, split_batch_by_episode=False + ) + dt2 = time.time() - s + + # check that the estimates are the same for bandit vs RL + check(estimate_1, estimate_2) + + self.assertGreater(dt1, dt2, + f"in bandits split_by_episode = False should improve " + f"performance, dt_wo_split={dt2}, dt_with_split={dt1}" + ) From 240e2be86d1c079ac916fd362c8f0d19e5e8f0bf Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Wed, 28 Sep 2022 14:15:20 -0700 Subject: [PATCH 09/16] lint Signed-off-by: Kourosh Hakhamaneshi --- rllib/BUILD | 7 ++ rllib/algorithms/algorithm.py | 5 +- .../estimators/off_policy_estimator.py | 7 +- rllib/offline/estimators/tests/test_ope.py | 1 + .../offline/estimators/tests/test_ope_math.py | 99 ++++++++++--------- .../weighted_importance_sampling.py | 29 +----- 6 files changed, 72 insertions(+), 76 deletions(-) diff --git a/rllib/BUILD b/rllib/BUILD index 9fe9b1e0a66b..8faa11908f6f 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -1833,6 +1833,13 @@ py_test( data = ["tests/data/cartpole/small.json"], ) +py_test( + name = "test_ope_math", + tags = ["team:rllib", "offline"], + size = "small", + srcs = ["offline/estimators/tests/test_ope_math.py"] +) + py_test( name = "test_dm_learning", tags = ["team:rllib", "offline"], diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index f05712f2fd26..02a4fd895480 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -942,7 +942,10 @@ def duration_fn(num_units_done): for name, estimator in self.reward_estimators.items(): for batch in all_batches: estimate_result = estimator.estimate( - batch, split_batch_by_episode=self.config["ope_split_batch_by_episode"] + batch, + split_batch_by_episode=self.config[ + "ope_split_batch_by_episode" + ], ) estimates[name].append(estimate_result) diff --git a/rllib/offline/estimators/off_policy_estimator.py b/rllib/offline/estimators/off_policy_estimator.py index 165901bb9d3f..1a1c4d32b9a0 100644 --- a/rllib/offline/estimators/off_policy_estimator.py +++ b/rllib/offline/estimators/off_policy_estimator.py @@ -58,9 +58,8 @@ def estimate_on_episode(self, episode: SampleBatch, **kwargs) -> Dict[str, Any]: @DeveloperAPI @OverrideToImplementCustomLogic - def estimate_single_step(self, - batch: SampleBatch, - **kwargs + def estimate_single_step( + self, batch: SampleBatch, **kwargs ) -> Dict[str, Sequence[float]]: """Returns off-policy estimates for the batch of single timesteps. This is highly optimized for bandits assuming each episode is a single timestep. @@ -72,7 +71,7 @@ def estimate_single_step(self, Returns: The off-policy estimates (OPE) calculated on the given episode. The returned - dict can be any arbitrary mapping of strings to a list of floats capturing + dict can be any arbitrary mapping of strings to a list of floats capturing the values per each record. """ raise NotImplementedError diff --git a/rllib/offline/estimators/tests/test_ope.py b/rllib/offline/estimators/tests/test_ope.py index 4d6ef2382d21..53c43b27e34c 100644 --- a/rllib/offline/estimators/tests/test_ope.py +++ b/rllib/offline/estimators/tests/test_ope.py @@ -34,6 +34,7 @@ "v_delta", } + class TestOPE(unittest.TestCase): """Compilation tests for using OPE both standalone and in an RLlib Algorithm""" diff --git a/rllib/offline/estimators/tests/test_ope_math.py b/rllib/offline/estimators/tests/test_ope_math.py index 5ca87c24393e..451e68b758f4 100644 --- a/rllib/offline/estimators/tests/test_ope_math.py +++ b/rllib/offline/estimators/tests/test_ope_math.py @@ -19,9 +19,12 @@ class FakePolicy(TorchPolicyV2): - """A fake policy used in test ope math to emulate a target policy that is better and worse than the random behavioral policy. + """A fake policy used in test ope math to emulate a target policy that is better + and worse than the random behavioral policy. - In case of an improved policy, we assign higher probs to those actions that attained a higher reward and lower probs to those actions that attained a lower reward. We do the reverse in case of a worse policy. + In case of an improved policy, we assign higher probs to those actions that + attained a higher reward and lower probs to those actions that attained a lower + reward. We do the reverse in case of a worse policy. """ def __init__(self, observation_space, action_space, sample_batch, improved=True): @@ -33,27 +36,23 @@ def __init__(self, observation_space, action_space, sample_batch, improved=True) self.model = ... self.observation_space = observation_space self.action_space = action_space - self.device = 'cpu' + self.device = "cpu" - def action_distribution_fn( - self, model, obs_batch=None, **kwargs - ): + def action_distribution_fn(self, model, obs_batch=None, **kwargs): # used in DM and DR (FQE torch model to be precise) dist_class = TorchCategorical inds = obs_batch[SampleBatch.OBS][:, 0] - + old_rewards = self.sample_batch[SampleBatch.REWARDS][inds] old_actions = self.sample_batch[SampleBatch.ACTIONS][inds] - dist_inputs = torch.ones( - (len(inds), self.action_space.n), dtype=torch.float32 - ) + dist_inputs = torch.ones((len(inds), self.action_space.n), dtype=torch.float32) - # add 0.5 to the action that gave a good reward (2) and subtract 0.5 from the + # add 0.5 to the action that gave a good reward (2) and subtract 0.5 from the # action that gave a bad reward (1) # to acheive this I can just subtract 1.5 from old_reward - delta = (old_rewards - 1.5) + delta = old_rewards - 1.5 if not self.improved: # reverse the logic for a worse policy delta = -delta @@ -63,34 +62,33 @@ def action_distribution_fn( return dist_inputs, dist_class, None - def compute_log_likelihoods( self, actions, obs_batch, *args, **kwargs, - ): + ): # used in IS and WIS inds = obs_batch[:, 0] old_probs = self.sample_batch[SampleBatch.ACTION_PROB][inds] old_rewards = self.sample_batch[SampleBatch.REWARDS][inds] if self.improved: - # assign 50 percent higher prob to those that gave a good reward and 50 percent lower prob to those that gave a bad reward + # assign 50 percent higher prob to those that gave a good reward and 50 + # percent lower prob to those that gave a bad reward # rewards are 1 or 2 in this case - new_probs = ( - (old_rewards == 2) * 1.5 * old_probs + - (old_rewards == 1) * 0.5 * old_probs - ) + new_probs = (old_rewards == 2) * 1.5 * old_probs + ( + old_rewards == 1 + ) * 0.5 * old_probs else: - new_probs = ( - (old_rewards == 2) * 0.5 * old_probs + - (old_rewards == 1) * 1.5 * old_probs - ) + new_probs = (old_rewards == 2) * 0.5 * old_probs + ( + old_rewards == 1 + ) * 1.5 * old_probs return np.log(new_probs) + class TestOPEMath(unittest.TestCase): """Tests some sanity checks that should pass based on the math of ope methods.""" @@ -100,18 +98,21 @@ def setUpClass(cls): bsize = 1024 action_dim = 2 - observation_space = gym.spaces.Box(-float('inf'), float('inf'), (1, )) + observation_space = gym.spaces.Box(-float("inf"), float("inf"), (1,)) action_space = gym.spaces.Discrete(action_dim) - cls.sample_batch = SampleBatch({ - SampleBatch.OBS: np.arange(bsize).reshape(-1, 1), - SampleBatch.NEXT_OBS: np.arange(bsize).reshape(-1, 1) + 1, - SampleBatch.ACTIONS: np.random.randint(0, action_dim, size=bsize), - SampleBatch.REWARDS: np.random.randint(1, 3, size=bsize), # rewards are 1 or 2 - SampleBatch.DONES: np.ones(bsize), - SampleBatch.EPS_ID: np.arange(bsize), - SampleBatch.ACTION_PROB: np.ones(bsize)/action_dim, - }) - + cls.sample_batch = SampleBatch( + { + SampleBatch.OBS: np.arange(bsize).reshape(-1, 1), + SampleBatch.NEXT_OBS: np.arange(bsize).reshape(-1, 1) + 1, + SampleBatch.ACTIONS: np.random.randint(0, action_dim, size=bsize), + SampleBatch.REWARDS: np.random.randint( + 1, 3, size=bsize + ), # rewards are 1 or 2 + SampleBatch.DONES: np.ones(bsize), + SampleBatch.EPS_ID: np.arange(bsize), + SampleBatch.ACTION_PROB: np.ones(bsize) / action_dim, + } + ) cls.policies = { "good": FakePolicy( @@ -119,7 +120,7 @@ def setUpClass(cls): ), "bad": FakePolicy( observation_space, action_space, cls.sample_batch, improved=False - ) + ), } @classmethod @@ -127,7 +128,7 @@ def tearDownClass(cls): ray.shutdown() def test_is_and_wis_math(self): - """Tests that the importance sampling and weighted importance sampling + """Tests that the importance sampling and weighted importance sampling methods are correct based on the math.""" ope_classes = [ @@ -142,7 +143,8 @@ def test_is_and_wis_math(self): s = time.time() estimate_1 = estimator_good.estimate( - self.sample_batch, split_batch_by_episode=True, + self.sample_batch, + split_batch_by_episode=True, ) dt1 = time.time() - s @@ -151,7 +153,7 @@ def test_is_and_wis_math(self): self.sample_batch, split_batch_by_episode=False ) dt2 = time.time() - s - + if policy_tag == "good": # check if the v_gain is larger than 1 self.assertGreater(estimate_1["v_gain"], 1) @@ -161,14 +163,16 @@ def test_is_and_wis_math(self): # check that the estimates are the same for bandit vs RL check(estimate_1, estimate_2) - self.assertGreater(dt1, dt2, + self.assertGreater( + dt1, + dt2, f"in bandits split_by_episode = False should improve " - f"performance, dt_wo_split={dt2}, dt_with_split={dt1}" + f"performance, dt_wo_split={dt2}, dt_with_split={dt1}", ) - def test_dm_dr_math(self): - """Tests that the Direct Method and Doubly Robust methods are correct in terms of RL vs. bandits.""" + """Tests that the Direct Method and Doubly Robust methods are correct in terms + of RL vs. bandits.""" ope_classes = [ DirectMethod, @@ -181,7 +185,8 @@ def test_dm_dr_math(self): s = time.time() estimate_1 = estimator_good.estimate( - self.sample_batch, split_batch_by_episode=True, + self.sample_batch, + split_batch_by_episode=True, ) dt1 = time.time() - s @@ -190,11 +195,13 @@ def test_dm_dr_math(self): self.sample_batch, split_batch_by_episode=False ) dt2 = time.time() - s - + # check that the estimates are the same for bandit vs RL check(estimate_1, estimate_2) - self.assertGreater(dt1, dt2, + self.assertGreater( + dt1, + dt2, f"in bandits split_by_episode = False should improve " - f"performance, dt_wo_split={dt2}, dt_with_split={dt1}" + f"performance, dt_wo_split={dt2}, dt_with_split={dt1}", ) diff --git a/rllib/offline/estimators/weighted_importance_sampling.py b/rllib/offline/estimators/weighted_importance_sampling.py index e82425df10fd..3b74a8f349d9 100644 --- a/rllib/offline/estimators/weighted_importance_sampling.py +++ b/rllib/offline/estimators/weighted_importance_sampling.py @@ -32,41 +32,20 @@ class WeightedImportanceSampling(OffPolicyEstimator): @override(OffPolicyEstimator) def __init__(self, policy: Policy, gamma: float): super().__init__(policy, gamma) - self.filter_values = [] # map from time to cummulative propensity values + self.filter_values = [] # map from time to cummulative propensity values # map from time to number of episodes that reached this time self.filter_counts = [] - self.p = {} # map from eps id to mapping from time to propensity values + self.p = {} # map from eps id to mapping from time to propensity values @override(OffPolicyEstimator) def estimate_on_episode(self, episode: SampleBatch) -> Dict[str, float]: estimates_per_epsiode = {"v_behavior": None, "v_target": None} - rewards, old_prob = episode["rewards"], episode["action_prob"] - # log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, episode) - # new_prob = np.exp(convert_to_numpy(log_likelihoods)) - - # # calculate importance ratios - # p = [] - # for t in range(episode.count): - # if t == 0: - # pt_prev = 1.0 - # else: - # pt_prev = p[t - 1] - # p.append(pt_prev * new_prob[t] / old_prob[t]) - - # for t, v in enumerate(p): - # if t >= len(self.filter_values): - # self.filter_values.append(v) - # self.filter_counts.append(1.0) - # else: - # self.filter_values[t] += v - # self.filter_counts[t] += 1.0 - - + rewards = episode["rewards"] eps_id = episode[SampleBatch.EPS_ID][0] if eps_id not in self.p: raise ValueError(f"Episode {eps_id} not passed through the fit function") - + # calculate stepwise weighted IS estimate v_behavior = 0.0 v_target = 0.0 From 33401da759ccd1be3fc51299b98e0e647243c4e2 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Wed, 28 Sep 2022 16:02:02 -0700 Subject: [PATCH 10/16] cleaned up the inheritance Signed-off-by: Kourosh Hakhamaneshi --- rllib/offline/estimators/direct_method.py | 8 +- rllib/offline/estimators/doubly_robust.py | 18 +-- .../offline/estimators/importance_sampling.py | 12 +- .../estimators/off_policy_estimator.py | 88 ++++++++++---- .../weighted_importance_sampling.py | 107 +++++------------- 5 files changed, 115 insertions(+), 118 deletions(-) diff --git a/rllib/offline/estimators/direct_method.py b/rllib/offline/estimators/direct_method.py index 985dc724a786..8c0def3e48b3 100644 --- a/rllib/offline/estimators/direct_method.py +++ b/rllib/offline/estimators/direct_method.py @@ -1,5 +1,5 @@ import logging -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, List from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator from ray.rllib.offline.estimators.fqe_torch_model import FQETorchModel from ray.rllib.policy import Policy @@ -63,7 +63,7 @@ def __init__( ), "self.model must implement `estimate_v`!" @override(OffPolicyEstimator) - def estimate_on_episode(self, episode: SampleBatch) -> Dict[str, float]: + def estimate_on_single_episode(self, episode: SampleBatch) -> Dict[str, Any]: estimates_per_epsiode = {} rewards = episode["rewards"] @@ -79,7 +79,9 @@ def estimate_on_episode(self, episode: SampleBatch) -> Dict[str, float]: return estimates_per_epsiode @override(OffPolicyEstimator) - def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: + def estimate_on_single_step_samples( + self, batch: SampleBatch + ) -> Dict[str, List[float]]: estimates_per_epsiode = {} rewards = batch["rewards"] diff --git a/rllib/offline/estimators/doubly_robust.py b/rllib/offline/estimators/doubly_robust.py index 3bd6f14904d9..6a0e0ffcc926 100644 --- a/rllib/offline/estimators/doubly_robust.py +++ b/rllib/offline/estimators/doubly_robust.py @@ -1,9 +1,11 @@ import logging -from typing import Dict, Any, Optional +import numpy as np + +from typing import Dict, Any, Optional, List from ray.rllib.policy import Policy +from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.utils.annotations import DeveloperAPI, override from ray.rllib.utils.typing import SampleBatchType -import numpy as np from ray.rllib.utils.numpy import convert_to_numpy from ray.rllib.utils.policy import compute_log_likelihoods_from_input_dict @@ -77,8 +79,8 @@ def __init__( ), "self.model must implement `estimate_q`!" @override(OffPolicyEstimator) - def estimate_on_episode(self, episode: SampleBatchType) -> Dict[str, float]: - estimates_per_epsiode = {"v_behavior": None, "v_target": None} + def estimate_on_single_episode(self, episode: SampleBatch) -> Dict[str, Any]: + estimates_per_epsiode = {} rewards, old_prob = episode["rewards"], episode["action_prob"] log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, episode) @@ -105,8 +107,10 @@ def estimate_on_episode(self, episode: SampleBatchType) -> Dict[str, float]: return estimates_per_epsiode @override(OffPolicyEstimator) - def estimate_single_step(self, batch: SampleBatchType) -> Dict[str, float]: - estimates_per_epsiode = {"v_behavior": None, "v_target": None} + def estimate_on_single_step_samples( + self, batch: SampleBatch + ) -> Dict[str, List[float]]: + estimates_per_epsiode = {} rewards, old_prob = batch["rewards"], batch["action_prob"] log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, batch) @@ -135,7 +139,7 @@ def train(self, batch: SampleBatchType) -> Dict[str, Any]: batch: A SampleBatch or MultiAgentbatch to train on Returns: - A dict with key "loss" and value as the mean training loss. + A dict with key "loss" and value as the mean training loss. """ batch = self.convert_ma_batch_to_sample_batch(batch) losses = self.model.train(batch) diff --git a/rllib/offline/estimators/importance_sampling.py b/rllib/offline/estimators/importance_sampling.py index af9e09ad29a3..162d35a41dea 100644 --- a/rllib/offline/estimators/importance_sampling.py +++ b/rllib/offline/estimators/importance_sampling.py @@ -3,7 +3,7 @@ from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.utils.numpy import convert_to_numpy from ray.rllib.utils.policy import compute_log_likelihoods_from_input_dict -from typing import Dict +from typing import Dict, List import numpy as np @@ -24,8 +24,8 @@ class ImportanceSampling(OffPolicyEstimator): For more information refer to https://arxiv.org/pdf/1911.06854.pdf""" @override(OffPolicyEstimator) - def estimate_on_episode(self, episode: SampleBatch) -> Dict[str, float]: - estimates_per_epsiode = {"v_behavior": None, "v_target": None} + def estimate_on_single_episode(self, episode: SampleBatch) -> Dict[str, float]: + estimates_per_epsiode = {} rewards, old_prob = episode["rewards"], episode["action_prob"] log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, episode) @@ -53,8 +53,10 @@ def estimate_on_episode(self, episode: SampleBatch) -> Dict[str, float]: return estimates_per_epsiode @override(OffPolicyEstimator) - def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: - estimates_per_epsiode = {"v_behavior": None, "v_target": None} + def estimate_on_single_step_samples( + self, batch: SampleBatch + ) -> Dict[str, List[float]]: + estimates_per_epsiode = {} rewards, old_prob = batch["rewards"], batch["action_prob"] log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, batch) diff --git a/rllib/offline/estimators/off_policy_estimator.py b/rllib/offline/estimators/off_policy_estimator.py index 1a1c4d32b9a0..8f297855cf23 100644 --- a/rllib/offline/estimators/off_policy_estimator.py +++ b/rllib/offline/estimators/off_policy_estimator.py @@ -1,5 +1,6 @@ import numpy as np import tree +from typing import Dict, Any, List import logging from ray.rllib.policy.sample_batch import ( @@ -11,15 +12,12 @@ from ray.rllib.utils.policy import compute_log_likelihoods_from_input_dict from ray.rllib.utils.annotations import ( DeveloperAPI, - is_overridden, OverrideToImplementCustomLogic, ) from ray.rllib.utils.deprecation import Deprecated from ray.rllib.utils.numpy import convert_to_numpy from ray.rllib.utils.typing import TensorType, SampleBatchType from ray.rllib.offline.offline_evaluator import OfflineEvaluator -from typing import Dict, Any, Sequence - logger = logging.getLogger(__name__) @@ -40,8 +38,7 @@ def __init__(self, policy: Policy, gamma: float = 0.0): self.gamma = gamma @DeveloperAPI - @OverrideToImplementCustomLogic - def estimate_on_episode(self, episode: SampleBatch, **kwargs) -> Dict[str, Any]: + def estimate_on_single_episode(self, episode: SampleBatch) -> Dict[str, Any]: """Returns off-policy estimates for the given one episode. Args: @@ -57,10 +54,10 @@ def estimate_on_episode(self, episode: SampleBatch, **kwargs) -> Dict[str, Any]: raise NotImplementedError @DeveloperAPI - @OverrideToImplementCustomLogic - def estimate_single_step( - self, batch: SampleBatch, **kwargs - ) -> Dict[str, Sequence[float]]: + def estimate_on_single_step_samples( + self, + batch: SampleBatch, + ) -> Dict[str, List[float]]: """Returns off-policy estimates for the batch of single timesteps. This is highly optimized for bandits assuming each episode is a single timestep. @@ -70,12 +67,58 @@ def estimate_single_step( "actions", and "action_prob". Returns: - The off-policy estimates (OPE) calculated on the given episode. The returned - dict can be any arbitrary mapping of strings to a list of floats capturing - the values per each record. + The off-policy estimates (OPE) calculated on the given batch of single time + step samples. The returned dict can be any arbitrary mapping of strings to + a list of floats capturing the values per each record. """ raise NotImplementedError + def on_before_split_batch_by_episode( + self, sample_batch: SampleBatch + ) -> SampleBatch: + """Called before the batch is split by episode. You can perform any + preprocessing on the batch that you want here. + e.g. adding done flags to the batch, or reseting some stats that you want to + track per episode later during estimation, .etc. + + Args: + sample_batch: The batch to split by episode. This contains multiple + episodes. + + Returns: + The modified batch before calling split_by_episode(). + """ + return sample_batch + + @OverrideToImplementCustomLogic + def on_after_split_batch_by_episode( + self, all_episodes: List[SampleBatch] + ) -> List[SampleBatch]: + """Called after the batch is split by episode. You can perform any + postprocessing on each episode that you want here. + e.g. computing advantage per episode, .etc. + + Args: + all_episodes: The list of episodes in the original batch. Each element is a + sample batch type that is a single episode. + """ + + return all_episodes + + @OverrideToImplementCustomLogic + def peak_on_single_episode(self, episode: SampleBatch) -> None: + """This is called on each episode before it is passed to + estimate_on_single_episode(). Using this method, you can get a peak at the + entire validation dataset before runnining the estimation. For examlpe if you + need to perform any normalizations of any sorts on the dataset, you can compute + the normalization parameters here. + + Args: + episode: The episode that is split from the original batch. This is a + sample batch type that is a single episode. + """ + pass + @DeveloperAPI def estimate( self, batch: SampleBatchType, split_batch_by_episode: bool = True @@ -103,14 +146,20 @@ def estimate( self.check_action_prob_in_batch(batch) estimates_per_epsiode = [] if split_batch_by_episode: - for episode in batch.split_by_episode(): + batch = self.on_before_split_batch_by_episode(batch) + all_episodes = batch.split_by_episode() + all_episodes = self.on_after_split_batch_by_episode(all_episodes) + for episode in all_episodes: assert len(set(episode[SampleBatch.EPS_ID])) == 1, ( "The episode must contain only one episode id. For some reason " "the split_by_episode() method could not successfully split " "the batch by episodes. Each row in the dataset should be " "one episode. Check your evaluation dataset for errors." ) - estimate_step_results = self.estimate_on_episode(episode) + self.peak_on_single_episode(episode) + + for episode in all_episodes: + estimate_step_results = self.estimate_on_single_episode(episode) estimates_per_epsiode.append(estimate_step_results) # turn a list of identical dicts into a dict of lists @@ -118,15 +167,8 @@ def estimate( lambda *x: list(x), *estimates_per_epsiode ) else: - if is_overridden(self.estimate_single_step): - # the returned dict is a mapping of strings to a list of floats - estimates_per_epsiode = self.estimate_single_step(batch) - else: - raise NotImplementedError( - "The method estimate_single_step is not implemented. " - "Please override the method estimate_single_step or set " - "split_by_episode to True." - ) + # the returned dict is a mapping of strings to a list of floats + estimates_per_epsiode = self.estimate_on_single_step_samples(batch) estimates = { "v_behavior": np.mean(estimates_per_epsiode["v_behavior"]), diff --git a/rllib/offline/estimators/weighted_importance_sampling.py b/rllib/offline/estimators/weighted_importance_sampling.py index 3b74a8f349d9..5f368747c011 100644 --- a/rllib/offline/estimators/weighted_importance_sampling.py +++ b/rllib/offline/estimators/weighted_importance_sampling.py @@ -1,9 +1,8 @@ -from typing import Dict, Any +from typing import Dict, Any, List import numpy as np -import tree from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator -from ray.rllib.policy.sample_batch import SampleBatch, SampleBatchType +from ray.rllib.policy.sample_batch import SampleBatch from ray.rllib.utils.policy import compute_log_likelihoods_from_input_dict from ray.rllib.policy import Policy from ray.rllib.utils.annotations import override, DeveloperAPI @@ -32,14 +31,16 @@ class WeightedImportanceSampling(OffPolicyEstimator): @override(OffPolicyEstimator) def __init__(self, policy: Policy, gamma: float): super().__init__(policy, gamma) - self.filter_values = [] # map from time to cummulative propensity values + # map from time to cummulative propensity values + self.cummulative_ips_values = [] # map from time to number of episodes that reached this time - self.filter_counts = [] - self.p = {} # map from eps id to mapping from time to propensity values + self.episode_timestep_count = [] + # map from eps id to mapping from time to propensity values + self.p = {} @override(OffPolicyEstimator) - def estimate_on_episode(self, episode: SampleBatch) -> Dict[str, float]: - estimates_per_epsiode = {"v_behavior": None, "v_target": None} + def estimate_on_single_episode(self, episode: SampleBatch) -> Dict[str, Any]: + estimates_per_epsiode = {} rewards = episode["rewards"] eps_id = episode[SampleBatch.EPS_ID][0] @@ -52,7 +53,7 @@ def estimate_on_episode(self, episode: SampleBatch) -> Dict[str, float]: episode_p = self.p[eps_id] for t in range(episode.count): v_behavior += rewards[t] * self.gamma ** t - w_t = self.filter_values[t] / self.filter_counts[t] + w_t = self.cummulative_ips_values[t] / self.episode_timestep_count[t] v_target += episode_p[t] / w_t * rewards[t] * self.gamma ** t estimates_per_epsiode["v_behavior"] = v_behavior @@ -61,8 +62,10 @@ def estimate_on_episode(self, episode: SampleBatch) -> Dict[str, float]: return estimates_per_epsiode @override(OffPolicyEstimator) - def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: - estimates_per_epsiode = {"v_behavior": None, "v_target": None} + def estimate_on_single_step_samples( + self, batch: SampleBatch + ) -> Dict[str, List[float]]: + estimates_per_epsiode = {} rewards, old_prob = batch["rewards"], batch["action_prob"] log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, batch) new_prob = np.exp(convert_to_numpy(log_likelihoods)) @@ -76,12 +79,16 @@ def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]: return estimates_per_epsiode - def _reset_stats(self): - self.filter_values = [] - self.filter_counts = [] + def on_before_split_batch_by_episode( + self, sample_batch: SampleBatch + ) -> SampleBatch: + self.cummulative_ips_values = [] + self.episode_timestep_count = [] self.p = {} - def _fit_on_episode(self, episode: SampleBatch) -> None: + return sample_batch + + def peak_on_single_episode(self, episode: SampleBatch) -> None: old_prob = episode["action_prob"] log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, episode) new_prob = np.exp(convert_to_numpy(log_likelihoods)) @@ -96,12 +103,12 @@ def _fit_on_episode(self, episode: SampleBatch) -> None: episode_p.append(pt_prev * new_prob[t] / old_prob[t]) for t, p_t in enumerate(episode_p): - if t >= len(self.filter_values): - self.filter_values.append(p_t) - self.filter_counts.append(1.0) + if t >= len(self.cummulative_ips_values): + self.cummulative_ips_values.append(p_t) + self.episode_timestep_count.append(1.0) else: - self.filter_values[t] += p_t - self.filter_counts[t] += 1.0 + self.cummulative_ips_values[t] += p_t + self.episode_timestep_count[t] += 1.0 eps_id = episode[SampleBatch.EPS_ID][0] if eps_id in self.p: @@ -109,63 +116,3 @@ def _fit_on_episode(self, episode: SampleBatch) -> None: f"Episode {eps_id} already paseed through the fit function" ) self.p[eps_id] = episode_p - - @DeveloperAPI - def estimate( - self, batch: SampleBatchType, split_batch_by_episode: bool = True - ) -> Dict[str, Any]: - """Compute off-policy estimates. - - Args: - batch: The batch to calculate the off-policy estimates (OPE) on. The - batch must contain the fields "obs", "actions", and "action_prob". - split_batch_by_episode: Whether to split the batch by episode. - - Returns: - The off-policy estimates (OPE) calculated on the given batch. The returned - dict can be any arbitrary mapping of strings to metrics. - The dict consists of the following metrics: - - v_behavior: The discounted return averaged over episodes in the batch - - v_behavior_std: The standard deviation corresponding to v_behavior - - v_target: The estimated discounted return for `self.policy`, - averaged over episodes in the batch - - v_target_std: The standard deviation corresponding to v_target - - v_gain: v_target / max(v_behavior, 1e-8) - - v_delta: The difference between v_target and v_behavior. - """ - batch = self.convert_ma_batch_to_sample_batch(batch) - self.check_action_prob_in_batch(batch) - estimates_per_epsiode = [] - if split_batch_by_episode: - self._reset_stats() - all_episodes = batch.split_by_episode() - for episode in all_episodes: - assert len(set(episode[SampleBatch.EPS_ID])) == 1, ( - "The episode must contain only one episode id. For some reason " - "the split_by_episode() method could not successfully split " - "the batch by episodes. Each row in the dataset should be " - "one episode. Check your evaluation dataset for errors." - ) - self._fit_on_episode(episode) - - for episode in all_episodes: - estimate_step_results = self.estimate_on_episode(episode) - estimates_per_epsiode.append(estimate_step_results) - - # turn a list of identical dicts into a dict of lists - estimates_per_epsiode = tree.map_structure( - lambda *x: list(x), *estimates_per_epsiode - ) - else: - estimates_per_epsiode = self.estimate_single_step(batch) - - estimates = { - "v_behavior": np.mean(estimates_per_epsiode["v_behavior"]), - "v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]), - "v_target": np.mean(estimates_per_epsiode["v_target"]), - "v_target_std": np.std(estimates_per_epsiode["v_target"]), - } - estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8) - estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"] - - return estimates From e340b254addd9430aaa19d66a5d8ac4bd4e6b1d1 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Wed, 28 Sep 2022 16:03:31 -0700 Subject: [PATCH 11/16] lint Signed-off-by: Kourosh Hakhamaneshi --- rllib/offline/estimators/tests/test_ope_math.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rllib/offline/estimators/tests/test_ope_math.py b/rllib/offline/estimators/tests/test_ope_math.py index 451e68b758f4..3aad004c603b 100644 --- a/rllib/offline/estimators/tests/test_ope_math.py +++ b/rllib/offline/estimators/tests/test_ope_math.py @@ -172,7 +172,8 @@ def test_is_and_wis_math(self): def test_dm_dr_math(self): """Tests that the Direct Method and Doubly Robust methods are correct in terms - of RL vs. bandits.""" + of RL vs. bandits. This does not check if v_gain > 1.0 because it needs a real + target policy to train on.""" ope_classes = [ DirectMethod, From 80bb48bbf8f2a03cb3e72c38bc6b2a94d3b852e4 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Wed, 28 Sep 2022 16:03:52 -0700 Subject: [PATCH 12/16] lint Signed-off-by: Kourosh Hakhamaneshi --- rllib/offline/estimators/tests/test_ope_math.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rllib/offline/estimators/tests/test_ope_math.py b/rllib/offline/estimators/tests/test_ope_math.py index 3aad004c603b..0833ade7fb51 100644 --- a/rllib/offline/estimators/tests/test_ope_math.py +++ b/rllib/offline/estimators/tests/test_ope_math.py @@ -172,7 +172,7 @@ def test_is_and_wis_math(self): def test_dm_dr_math(self): """Tests that the Direct Method and Doubly Robust methods are correct in terms - of RL vs. bandits. This does not check if v_gain > 1.0 because it needs a real + of RL vs. bandits. This does not check if v_gain > 1.0 because it needs a real target policy to train on.""" ope_classes = [ From b1db2ec201ac9e0dadad1c0d4e055f7cd09f4605 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Wed, 28 Sep 2022 16:59:23 -0700 Subject: [PATCH 13/16] fixed test Signed-off-by: Kourosh Hakhamaneshi --- rllib/offline/estimators/tests/test_ope_math.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/rllib/offline/estimators/tests/test_ope_math.py b/rllib/offline/estimators/tests/test_ope_math.py index 0833ade7fb51..2df2a1e627ca 100644 --- a/rllib/offline/estimators/tests/test_ope_math.py +++ b/rllib/offline/estimators/tests/test_ope_math.py @@ -206,3 +206,10 @@ def test_dm_dr_math(self): f"in bandits split_by_episode = False should improve " f"performance, dt_wo_split={dt2}, dt_with_split={dt1}", ) + + +if __name__ == "__main__": + import pytest + import sys + + sys.exit(pytest.main(["-v", __file__])) From b576b839f9336f0a2ee516934cc8e4baae22b08f Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Wed, 28 Sep 2022 17:02:28 -0700 Subject: [PATCH 14/16] nit Signed-off-by: Kourosh Hakhamaneshi --- rllib/offline/estimators/tests/test_ope_math.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rllib/offline/estimators/tests/test_ope_math.py b/rllib/offline/estimators/tests/test_ope_math.py index 2df2a1e627ca..1115868aff35 100644 --- a/rllib/offline/estimators/tests/test_ope_math.py +++ b/rllib/offline/estimators/tests/test_ope_math.py @@ -139,17 +139,17 @@ def test_is_and_wis_math(self): for class_module in ope_classes: for policy_tag in ["good", "bad"]: target_policy = self.policies[policy_tag] - estimator_good = class_module(target_policy, gamma=0) + estimator = class_module(target_policy, gamma=0) s = time.time() - estimate_1 = estimator_good.estimate( + estimate_1 = estimator.estimate( self.sample_batch, split_batch_by_episode=True, ) dt1 = time.time() - s s = time.time() - estimate_2 = estimator_good.estimate( + estimate_2 = estimator.estimate( self.sample_batch, split_batch_by_episode=False ) dt2 = time.time() - s @@ -182,17 +182,17 @@ def test_dm_dr_math(self): for class_module in ope_classes: target_policy = self.policies["good"] - estimator_good = class_module(target_policy, gamma=0) + estimator = class_module(target_policy, gamma=0) s = time.time() - estimate_1 = estimator_good.estimate( + estimate_1 = estimator.estimate( self.sample_batch, split_batch_by_episode=True, ) dt1 = time.time() - s s = time.time() - estimate_2 = estimator_good.estimate( + estimate_2 = estimator.estimate( self.sample_batch, split_batch_by_episode=False ) dt2 = time.time() - s From 7d59e37460a8f2661af559972a2264bdd3284fc6 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Wed, 28 Sep 2022 17:04:36 -0700 Subject: [PATCH 15/16] fixed nits Signed-off-by: Kourosh Hakhamaneshi --- rllib/offline/estimators/off_policy_estimator.py | 6 +++--- rllib/offline/estimators/weighted_importance_sampling.py | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/rllib/offline/estimators/off_policy_estimator.py b/rllib/offline/estimators/off_policy_estimator.py index 8f297855cf23..bffe520fcab2 100644 --- a/rllib/offline/estimators/off_policy_estimator.py +++ b/rllib/offline/estimators/off_policy_estimator.py @@ -106,9 +106,9 @@ def on_after_split_batch_by_episode( return all_episodes @OverrideToImplementCustomLogic - def peak_on_single_episode(self, episode: SampleBatch) -> None: + def peek_on_single_episode(self, episode: SampleBatch) -> None: """This is called on each episode before it is passed to - estimate_on_single_episode(). Using this method, you can get a peak at the + estimate_on_single_episode(). Using this method, you can get a peek at the entire validation dataset before runnining the estimation. For examlpe if you need to perform any normalizations of any sorts on the dataset, you can compute the normalization parameters here. @@ -156,7 +156,7 @@ def estimate( "the batch by episodes. Each row in the dataset should be " "one episode. Check your evaluation dataset for errors." ) - self.peak_on_single_episode(episode) + self.peek_on_single_episode(episode) for episode in all_episodes: estimate_step_results = self.estimate_on_single_episode(episode) diff --git a/rllib/offline/estimators/weighted_importance_sampling.py b/rllib/offline/estimators/weighted_importance_sampling.py index 5f368747c011..ae508f98fa2e 100644 --- a/rllib/offline/estimators/weighted_importance_sampling.py +++ b/rllib/offline/estimators/weighted_importance_sampling.py @@ -79,6 +79,7 @@ def estimate_on_single_step_samples( return estimates_per_epsiode + @override(OffPolicyEstimator) def on_before_split_batch_by_episode( self, sample_batch: SampleBatch ) -> SampleBatch: @@ -88,7 +89,8 @@ def on_before_split_batch_by_episode( return sample_batch - def peak_on_single_episode(self, episode: SampleBatch) -> None: + @override(OffPolicyEstimator) + def peek_on_single_episode(self, episode: SampleBatch) -> None: old_prob = episode["action_prob"] log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, episode) new_prob = np.exp(convert_to_numpy(log_likelihoods)) From 0c3d09d55551958babe5a77cabead0b56bd24a77 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Wed, 28 Sep 2022 17:11:37 -0700 Subject: [PATCH 16/16] fixed the typos Signed-off-by: Kourosh Hakhamaneshi --- rllib/offline/estimators/weighted_importance_sampling.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rllib/offline/estimators/weighted_importance_sampling.py b/rllib/offline/estimators/weighted_importance_sampling.py index ae508f98fa2e..5854e2b48aa4 100644 --- a/rllib/offline/estimators/weighted_importance_sampling.py +++ b/rllib/offline/estimators/weighted_importance_sampling.py @@ -45,7 +45,10 @@ def estimate_on_single_episode(self, episode: SampleBatch) -> Dict[str, Any]: eps_id = episode[SampleBatch.EPS_ID][0] if eps_id not in self.p: - raise ValueError(f"Episode {eps_id} not passed through the fit function") + raise ValueError( + f"Cannot find target weight for episode {eps_id}. " + f"Did it go though the peek_on_single_episode() function?" + ) # calculate stepwise weighted IS estimate v_behavior = 0.0 @@ -115,6 +118,7 @@ def peek_on_single_episode(self, episode: SampleBatch) -> None: eps_id = episode[SampleBatch.EPS_ID][0] if eps_id in self.p: raise ValueError( - f"Episode {eps_id} already paseed through the fit function" + f"eps_id {eps_id} was already passed to the peek function." + f"Make sure dataset contains only unique episodes with unique ids." ) self.p[eps_id] = episode_p