[RLlib] Fix ope speed #28834

Merged (17 commits) on Sep 29, 2022
9 changes: 8 additions & 1 deletion rllib/BUILD
@@ -1814,7 +1814,7 @@ py_test(
name = "test_feature_importance",
tags = ["team:rllib", "offline", "torch_only"],
size = "medium",
srcs = ["offline/estimators/tests/test_feature_importance.py"]
srcs = ["offline/tests/test_feature_importance.py"]
)

py_test(
@@ -1833,6 +1833,13 @@ py_test(
data = ["tests/data/cartpole/small.json"],
)

py_test(
name = "test_ope_math",
tags = ["team:rllib", "offline"],
size = "small",
srcs = ["offline/estimators/tests/test_ope_math.py"]
)

py_test(
name = "test_dm_learning",
tags = ["team:rllib", "offline"],
29 changes: 23 additions & 6 deletions rllib/algorithms/algorithm.py
@@ -25,6 +25,7 @@
Type,
Union,
)
import tree

import ray
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
@@ -910,6 +911,8 @@ def duration_fn(num_units_done):
_agent_steps if self._by_agent_steps else _env_steps
)
if self.reward_estimators:
# TODO: (kourosh) This approach will cause an OOM issue when
# the dataset gets huge (should be ok for now).
all_batches.extend(batches)

agent_steps_this_iter += _agent_steps
@@ -933,13 +936,27 @@
# TODO: Remove this key at some point. Here for backward compatibility.
metrics["timesteps_this_iter"] = env_steps_this_iter

if self.reward_estimators:
# Compute off-policy estimates
# Compute off-policy estimates
estimates = defaultdict(list)
# for each batch run the estimator's fwd pass
for name, estimator in self.reward_estimators.items():
for batch in all_batches:
estimate_result = estimator.estimate(
batch,
split_batch_by_episode=self.config[
"ope_split_batch_by_episode"
],
)
estimates[name].append(estimate_result)

# collate estimates from all batches
if estimates:
metrics["off_policy_estimator"] = {}
total_batch = concat_samples(all_batches)
Contributor Author: We shouldn't concatenate and then split by episode here. For bandits we don't even need split_by_episode(), since each row is already one episode. For RL, each batch already ends at an episode boundary, so we gain nothing by concatenating all the batches together.

for name, estimator in self.reward_estimators.items():
estimates = estimator.estimate(total_batch)
metrics["off_policy_estimator"][name] = estimates
for name, estimate_list in estimates.items():
avg_estimate = tree.map_structure(
lambda *x: np.mean(x, axis=0), *estimate_list
)
metrics["off_policy_estimator"][name] = avg_estimate

# Evaluation does not run for every step.
# Save evaluation metrics on trainer, so it can be attached to
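As a side note on the collation logic above: the per-batch averaging can be reproduced in isolation. A minimal sketch with made-up numbers, assuming each estimator returns a flat dict of scalar metrics per batch (only the tree.map_structure / np.mean pattern is taken from the diff):

import numpy as np
import tree  # dm_tree, the same module imported at the top of algorithm.py

# Hypothetical results of estimator.estimate() on two evaluation batches.
per_batch_estimates = [
    {"v_behavior": 1.0, "v_target": 1.2, "v_gain": 1.20},
    {"v_behavior": 0.8, "v_target": 1.0, "v_gain": 1.25},
]

# Leaf-wise average across batches, matching the collation in the new code.
avg_estimate = tree.map_structure(
    lambda *x: np.mean(x, axis=0), *per_batch_estimates
)
# -> {'v_behavior': 0.9, 'v_target': 1.1, 'v_gain': 1.225}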
13 changes: 11 additions & 2 deletions rllib/algorithms/algorithm_config.py
@@ -180,6 +180,7 @@ def __init__(self, algo_class=None):
self.evaluation_parallel_to_training = False
self.evaluation_config = {}
self.off_policy_estimation_methods = {}
self.ope_split_batch_by_episode = True
self.evaluation_num_workers = 0
self.custom_evaluation_function = None
self.always_attach_evaluation_results = False
Expand Down Expand Up @@ -827,6 +828,7 @@ def evaluation(
Union["AlgorithmConfig", PartialAlgorithmConfigDict]
] = None,
off_policy_estimation_methods: Optional[Dict] = None,
ope_split_batch_by_episode: Optional[bool] = None,
evaluation_num_workers: Optional[int] = None,
custom_evaluation_function: Optional[Callable] = None,
always_attach_evaluation_results: Optional[bool] = None,
Expand Down Expand Up @@ -880,6 +882,11 @@ def evaluation(
You can also add additional config arguments to be passed to the
OffPolicyEstimator in the dict, e.g.
{"qreg_dr": {"type": DoublyRobust, "q_model_type": "qreg", "k": 5}}
ope_split_batch_by_episode: Whether to use SampleBatch.split_by_episode() to
split the input batch into episodes before estimating the OPE metrics. For
bandits, set this to False to speed up OPE evaluation; splitting is
unnecessary there because each record is already a single timestep (one
episode). The default is True.
evaluation_num_workers: Number of parallel workers to use for evaluation.
Note that this is set to zero by default, which means evaluation will
be run in the algorithm process (only if evaluation_interval is not
@@ -925,10 +932,12 @@
self.evaluation_num_workers = evaluation_num_workers
if custom_evaluation_function is not None:
self.custom_evaluation_function = custom_evaluation_function
if always_attach_evaluation_results:
if always_attach_evaluation_results is not None:
Contributor Author: I noticed that these were buggy: previously, if always_attach_evaluation_results was explicitly set to False while the default was True, this call would not have overridden it.

Contributor Author: You have to explicitly check that these variables are not None; otherwise an explicit False would never get assigned.

Member: 👍

self.always_attach_evaluation_results = always_attach_evaluation_results
if enable_async_evaluation:
if enable_async_evaluation is not None:
self.enable_async_evaluation = enable_async_evaluation
if ope_split_batch_by_episode is not None:
self.ope_split_batch_by_episode = ope_split_batch_by_episode

return self

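To make the new flag concrete, here is a rough configuration sketch for a bandit-style offline dataset; the algorithm class and the input path are placeholders and not part of this PR:

from ray.rllib.algorithms.dqn import DQNConfig
from ray.rllib.offline.estimators import ImportanceSampling, WeightedImportanceSampling

config = (
    DQNConfig()
    .offline_data(input_="path/to/bandit_logs.json")  # placeholder path
    .evaluation(
        off_policy_estimation_methods={
            "is": {"type": ImportanceSampling},
            "wis": {"type": WeightedImportanceSampling},
        },
        # Bandit records are single-timestep episodes, so splitting the batch
        # by episode only adds overhead; disabling it takes the vectorized
        # single-step estimation path introduced in this PR.
        ope_split_batch_by_episode=False,
    )
)
algo = config.build()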
3 changes: 3 additions & 0 deletions rllib/offline/__init__.py
@@ -9,6 +9,8 @@
from ray.rllib.offline.output_writer import OutputWriter, NoopOutput
from ray.rllib.offline.resource import get_offline_io_resource_bundles
from ray.rllib.offline.shuffled_input import ShuffledInput
from ray.rllib.offline.feature_importance import FeatureImportance
Contributor Author: Moving feature importance out of its previous place because it doesn't really fit the definition of off-policy evaluation in the literature. It is now a subclass of OfflineEvaluator, of which OffPolicyEstimator is also a subclass.

Member: This should be a separate PR.

Contributor Author: If this is not done here, it will break the feature_importance code.



__all__ = [
"IOContext",
@@ -24,4 +26,5 @@
"DatasetWriter",
"get_dataset_and_shards",
"get_offline_io_resource_bundles",
"FeatureImportance",
]
2 changes: 0 additions & 2 deletions rllib/offline/estimators/__init__.py
@@ -5,13 +5,11 @@
from ray.rllib.offline.estimators.direct_method import DirectMethod
from ray.rllib.offline.estimators.doubly_robust import DoublyRobust
from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator
from ray.rllib.offline.estimators.feature_importance import FeatureImportance

__all__ = [
"OffPolicyEstimator",
"ImportanceSampling",
"WeightedImportanceSampling",
"DirectMethod",
"DoublyRobust",
"FeatureImportance",
]
77 changes: 34 additions & 43 deletions rllib/offline/estimators/direct_method.py
@@ -1,8 +1,9 @@
import logging
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, List
from ray.rllib.offline.estimators.off_policy_estimator import OffPolicyEstimator
from ray.rllib.offline.estimators.fqe_torch_model import FQETorchModel
from ray.rllib.policy import Policy
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.utils.annotations import DeveloperAPI, override
from ray.rllib.utils.typing import SampleBatchType
from ray.rllib.utils.numpy import convert_to_numpy
@@ -62,50 +63,40 @@ def __init__(
), "self.model must implement `estimate_v`!"

@override(OffPolicyEstimator)
def estimate(self, batch: SampleBatchType) -> Dict[str, Any]:
"""Compute off-policy estimates.
def estimate_on_single_episode(self, episode: SampleBatch) -> Dict[str, Any]:
estimates_per_epsiode = {}
rewards = episode["rewards"]

Args:
batch: The SampleBatch to run off-policy estimation on
v_behavior = 0.0
for t in range(episode.count):
v_behavior += rewards[t] * self.gamma ** t

Returns:
A dict consists of the following metrics:
- v_behavior: The discounted return averaged over episodes in the batch
- v_behavior_std: The standard deviation corresponding to v_behavior
- v_target: The estimated discounted return for `self.policy`,
averaged over episodes in the batch
- v_target_std: The standard deviation corresponding to v_target
- v_gain: v_target / max(v_behavior, 1e-8)
- v_delta: The difference between v_target and v_behavior.
"""
batch = self.convert_ma_batch_to_sample_batch(batch)
self.check_action_prob_in_batch(batch)
estimates_per_epsiode = {"v_behavior": [], "v_target": []}
# Calculate Direct Method OPE estimates
for episode in batch.split_by_episode():
rewards = episode["rewards"]
v_behavior = 0.0
v_target = 0.0
for t in range(episode.count):
v_behavior += rewards[t] * self.gamma ** t

init_step = episode[0:1]
v_target = self.model.estimate_v(init_step)
v_target = convert_to_numpy(v_target).item()

estimates_per_epsiode["v_behavior"].append(v_behavior)
estimates_per_epsiode["v_target"].append(v_target)

estimates = {
"v_behavior": np.mean(estimates_per_epsiode["v_behavior"]),
"v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]),
"v_target": np.mean(estimates_per_epsiode["v_target"]),
"v_target_std": np.std(estimates_per_epsiode["v_target"]),
}
estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8)
estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"]

return estimates
v_target = self._compute_v_target(episode[:1])

estimates_per_epsiode["v_behavior"] = v_behavior
estimates_per_epsiode["v_target"] = v_target

return estimates_per_epsiode

@override(OffPolicyEstimator)
def estimate_on_single_step_samples(
self, batch: SampleBatch
) -> Dict[str, List[float]]:
estimates_per_epsiode = {}
rewards = batch["rewards"]

v_behavior = rewards
v_target = self._compute_v_target(batch)

estimates_per_epsiode["v_behavior"] = v_behavior
estimates_per_epsiode["v_target"] = v_target

return estimates_per_epsiode

def _compute_v_target(self, init_step):
v_target = self.model.estimate_v(init_step)
v_target = convert_to_numpy(v_target)
return v_target

@override(OffPolicyEstimator)
def train(self, batch: SampleBatchType) -> Dict[str, Any]:
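The dispatch between these two hooks lives in the OffPolicyEstimator base class and is not part of this hunk, so the following is only a sketch of the intended pattern. The helper name run_ope_estimate and the exact collation are assumptions; estimate_on_single_episode, estimate_on_single_step_samples, convert_ma_batch_to_sample_batch, and check_action_prob_in_batch are taken from the diff:

from typing import Any, Dict

import numpy as np


def run_ope_estimate(estimator, batch, split_batch_by_episode=True) -> Dict[str, Any]:
    # Normalize the input batch exactly as the removed estimate() methods did.
    batch = estimator.convert_ma_batch_to_sample_batch(batch)
    estimator.check_action_prob_in_batch(batch)

    if split_batch_by_episode:
        # RL case: one scalar estimate per episode, gathered in a Python loop.
        per_episode = {"v_behavior": [], "v_target": []}
        for episode in batch.split_by_episode():
            result = estimator.estimate_on_single_episode(episode)
            per_episode["v_behavior"].append(result["v_behavior"])
            per_episode["v_target"].append(result["v_target"])
    else:
        # Bandit case: every row is already a one-step episode, so a single
        # vectorized call replaces the per-episode loop (the speed-up this
        # PR is after).
        per_episode = estimator.estimate_on_single_step_samples(batch)

    v_behavior = float(np.mean(per_episode["v_behavior"]))
    v_target = float(np.mean(per_episode["v_target"]))
    return {
        "v_behavior": v_behavior,
        "v_behavior_std": float(np.std(per_episode["v_behavior"])),
        "v_target": v_target,
        "v_target_std": float(np.std(per_episode["v_target"])),
        "v_gain": v_target / max(v_behavior, 1e-8),
        "v_delta": v_target - v_behavior,
    }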
110 changes: 54 additions & 56 deletions rllib/offline/estimators/doubly_robust.py
@@ -1,9 +1,11 @@
import logging
from typing import Dict, Any, Optional
import numpy as np

from typing import Dict, Any, Optional, List
from ray.rllib.policy import Policy
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.utils.annotations import DeveloperAPI, override
from ray.rllib.utils.typing import SampleBatchType
import numpy as np
from ray.rllib.utils.numpy import convert_to_numpy
from ray.rllib.utils.policy import compute_log_likelihoods_from_input_dict

@@ -77,61 +79,57 @@ def __init__(
), "self.model must implement `estimate_q`!"

@override(OffPolicyEstimator)
def estimate(self, batch: SampleBatchType) -> Dict[str, Any]:
"""Compute off-policy estimates.
def estimate_on_single_episode(self, episode: SampleBatch) -> Dict[str, Any]:
estimates_per_epsiode = {}

rewards, old_prob = episode["rewards"], episode["action_prob"]
log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, episode)
new_prob = np.exp(convert_to_numpy(log_likelihoods))

v_behavior = 0.0
v_target = 0.0
q_values = self.model.estimate_q(episode)
q_values = convert_to_numpy(q_values)
v_values = self.model.estimate_v(episode)
v_values = convert_to_numpy(v_values)
assert q_values.shape == v_values.shape == (episode.count,)

for t in reversed(range(episode.count)):
v_behavior = rewards[t] + self.gamma * v_behavior
v_target = v_values[t] + (new_prob[t] / old_prob[t]) * (
rewards[t] + self.gamma * v_target - q_values[t]
)
v_target = v_target.item()

Args:
batch: The SampleBatch to run off-policy estimation on
estimates_per_epsiode["v_behavior"] = v_behavior
estimates_per_epsiode["v_target"] = v_target

Returns:
A dict consists of the following metrics:
- v_behavior: The discounted return averaged over episodes in the batch
- v_behavior_std: The standard deviation corresponding to v_behavior
- v_target: The estimated discounted return for `self.policy`,
averaged over episodes in the batch
- v_target_std: The standard deviation corresponding to v_target
- v_gain: v_target / max(v_behavior, 1e-8)'
- v_delta: The difference between v_target and v_behavior.
"""
batch = self.convert_ma_batch_to_sample_batch(batch)
self.check_action_prob_in_batch(batch)
estimates_per_epsiode = {"v_behavior": [], "v_target": []}
# Calculate doubly robust OPE estimates
for episode in batch.split_by_episode():
rewards, old_prob = episode["rewards"], episode["action_prob"]
log_likelihoods = compute_log_likelihoods_from_input_dict(
self.policy, episode
)
new_prob = np.exp(convert_to_numpy(log_likelihoods))

v_behavior = 0.0
v_target = 0.0
q_values = self.model.estimate_q(episode)
q_values = convert_to_numpy(q_values)
v_values = self.model.estimate_v(episode)
v_values = convert_to_numpy(v_values)
assert q_values.shape == v_values.shape == (episode.count,)

for t in reversed(range(episode.count)):
v_behavior = rewards[t] + self.gamma * v_behavior
v_target = v_values[t] + (new_prob[t] / old_prob[t]) * (
rewards[t] + self.gamma * v_target - q_values[t]
)
v_target = v_target.item()

estimates_per_epsiode["v_behavior"].append(v_behavior)
estimates_per_epsiode["v_target"].append(v_target)

estimates = {
"v_behavior": np.mean(estimates_per_epsiode["v_behavior"]),
"v_behavior_std": np.std(estimates_per_epsiode["v_behavior"]),
"v_target": np.mean(estimates_per_epsiode["v_target"]),
"v_target_std": np.std(estimates_per_epsiode["v_target"]),
}
estimates["v_gain"] = estimates["v_target"] / max(estimates["v_behavior"], 1e-8)
estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"]

return estimates
return estimates_per_epsiode

@override(OffPolicyEstimator)
def estimate_on_single_step_samples(
self, batch: SampleBatch
) -> Dict[str, List[float]]:
estimates_per_epsiode = {}

rewards, old_prob = batch["rewards"], batch["action_prob"]
log_likelihoods = compute_log_likelihoods_from_input_dict(self.policy, batch)
new_prob = np.exp(convert_to_numpy(log_likelihoods))

q_values = self.model.estimate_q(batch)
q_values = convert_to_numpy(q_values)
v_values = self.model.estimate_v(batch)
v_values = convert_to_numpy(v_values)

v_behavior = rewards

weight = new_prob / old_prob
v_target = v_values + weight * (rewards - q_values)

estimates_per_epsiode["v_behavior"] = v_behavior
estimates_per_epsiode["v_target"] = v_target

return estimates_per_epsiode

@override(OffPolicyEstimator)
def train(self, batch: SampleBatchType) -> Dict[str, Any]:
@@ -141,7 +139,7 @@ def train(self, batch: SampleBatchType) -> Dict[str, Any]:
batch: A SampleBatch or MultiAgentbatch to train on

Returns:
A dict with key "loss" and value as the mean training loss.
A dict with key "loss" and value as the mean training loss.
"""
batch = self.convert_ma_batch_to_sample_batch(batch)
losses = self.model.train(batch)
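Finally, the vectorized single-step doubly-robust estimate above can be checked by hand on toy data; every number below is invented purely for illustration:

import numpy as np

# Toy single-step (bandit) batch, mirroring estimate_on_single_step_samples().
rewards = np.array([1.0, 0.0, 1.0])    # logged rewards
old_prob = np.array([0.5, 0.25, 0.5])  # behavior policy action probabilities
new_prob = np.array([0.8, 0.10, 0.6])  # target policy action probabilities
q_values = np.array([0.7, 0.2, 0.9])   # model estimate of Q(s, a) for the logged action
v_values = np.array([0.6, 0.3, 0.8])   # model estimate of V(s) under the target policy

weight = new_prob / old_prob
v_behavior = rewards                                 # per-step behavior value is just the reward
v_target = v_values + weight * (rewards - q_values)  # per-step DR correction

print(v_behavior.mean(), v_target.mean())  # scalar summaries, as averaged by the caller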