
[RLlib] Fix ope speed #28834

Merged: 17 commits merged into ray-project:master on Sep 29, 2022
Conversation

kouroshHakha (author):

Why are these changes needed?

The OPE abstraction is updated to separate multi-timestep OPE (RL) from single-timestep OPE (bandits), which makes OPE on bandits faster.
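A minimal sketch of the resulting dispatch, assuming the method names settled on later in this review (estimate_on_single_episode / estimate_on_single_step_samples) and RLlib's SampleBatch.split_by_episode(); this is an illustration, not the exact source:

from typing import Any, Dict, List

import numpy as np

from ray.rllib.policy.sample_batch import SampleBatch


class OffPolicyEstimator:
    def estimate(
        self, batch: SampleBatch, split_by_episode: bool = True
    ) -> Dict[str, Any]:
        if split_by_episode:
            # RL: score each episode in the batch separately.
            estimates: List[Dict[str, float]] = [
                self.estimate_on_single_episode(episode)
                for episode in batch.split_by_episode()
            ]
        else:
            # Bandits: every row is already a complete one-step episode,
            # so the (slow) episode split is skipped entirely.
            estimates = [self.estimate_on_single_step_samples(batch)]
        # Average each key (e.g. v_behavior, v_target) across estimates.
        return {k: float(np.mean([e[k] for e in estimates])) for k in estimates[0]}

    def estimate_on_single_episode(self, episode: SampleBatch) -> Dict[str, float]:
        raise NotImplementedError

    def estimate_on_single_step_samples(self, batch: SampleBatch) -> Dict[str, float]:
        raise NotImplementedError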

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

… OPE and feature importance

2. introduced estimate_multi_step vs. estimate_single_step

@@ -936,10 +939,20 @@ def duration_fn(num_units_done):
if self.reward_estimators:
    # Compute off-policy estimates
    metrics["off_policy_estimator"] = {}
    total_batch = concat_samples(all_batches)
kouroshHakha (author):

We shouldn't concatenate and then split by episode here. In the bandit case we don't even need to split by episode, since each row is already one episode. In the RL case, each batch already ends at an episode boundary, so we gain nothing by concatenating all batches together.

for batch in all_batches:
    for name, estimator in self.reward_estimators.items():
        estimate_result = estimator.estimate(
            batch, split_by_episode=self.config["ope_split_by_episode"]
kouroshHakha (author):

Added this optional parameter, which defaults to True in the algorithm config. This allows bandit users to gain a speedup by setting it to False.
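For example, a bandit user can opt out of the episode split via the evaluation config. A sketch using the final parameter name agreed on below (ope_split_batch_by_episode) and RLlib's stock ImportanceSampling estimator; the surrounding config calls are illustrative:

from ray.rllib.algorithms.bandit import BanditLinUCBConfig
from ray.rllib.offline.estimators import ImportanceSampling

config = BanditLinUCBConfig().evaluation(
    off_policy_estimation_methods={
        "is": {"type": ImportanceSampling},
    },
    # Bandit rows are already complete one-step episodes, so the
    # per-episode split is pure overhead; skipping it buys the speedup.
    ope_split_batch_by_episode=False,
)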

gjoliver (member):

should we name this split_batch_by_episode?
Also a random suggestion: I think we can reduce a level of nesting, since the outermost if self.reward_estimators is not necessary:

estimates = defaultdict(list)
for name, estimator in self.reward_estimators.items():
    for batch in all_batches:
        estimate_result = estimator.estimate(...)
        estimates[name].append(estimate_result)

if estimates:
    metrics["off_policy_estimator"] = {}
for name, estimate_list in estimates.items():
    avg_estimate = tree....
    metrics["off_policy_estimator"][name] = avg_estimate

just to make the code look a bit nicer.
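Filling in the elided pieces, the suggestion could be realized as below. A sketch: the summarize_ope_metrics wrapper is mine, and tree.map_structure with a mean reduction is a guess at the elided tree.... call (RLlib uses the dm-tree package, imported as tree):

from collections import defaultdict

import numpy as np
import tree  # dm-tree


def summarize_ope_metrics(reward_estimators, all_batches, split_by_episode, metrics):
    estimates = defaultdict(list)
    for name, estimator in reward_estimators.items():
        for batch in all_batches:
            estimates[name].append(
                estimator.estimate(batch, split_by_episode=split_by_episode)
            )

    if estimates:
        metrics["off_policy_estimator"] = {}
        for name, estimate_list in estimates.items():
            # Average each leaf (v_behavior, v_target, ...) across batches.
            metrics["off_policy_estimator"][name] = tree.map_structure(
                lambda *values: np.mean(values, axis=0), *estimate_list
            )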

kouroshHakha (author):

done.

kouroshHakha (author):

For the name: I think ope should be part of it, since it's an evaluation() variable and users need to know from the name that this is for OPE only. So ope_split_batch_by_episode is the name I'd put here; it's more verbose, but I think that's ok?

gjoliver (member):

yeah, makes sense, ope_split_batch_by_episode sounds much better.

kouroshHakha (author):

done

@@ -925,10 +932,12 @@ def evaluation(
self.evaluation_num_workers = evaluation_num_workers
if custom_evaluation_function is not None:
    self.custom_evaluation_function = custom_evaluation_function
if always_attach_evaluation_results:
if always_attach_evaluation_results is not None:
kouroshHakha (author):

I noticed that these are buggy: previously, if always_attach_evaluation_results was set to False (its default being True), this call would not have overridden it.

kouroshHakha (author):

You have to explicitly check that these variables are not None; otherwise False would never get assigned.
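A minimal before/after illustration of the truthiness bug, mirroring the diff above:

# Buggy: a caller passing always_attach_evaluation_results=False makes
# the condition falsy, so the default (True) silently survives.
if always_attach_evaluation_results:
    self.always_attach_evaluation_results = always_attach_evaluation_results

# Fixed: skip the assignment only when the argument was not provided,
# so an explicit False now overrides the default.
if always_attach_evaluation_results is not None:
    self.always_attach_evaluation_results = always_attach_evaluation_results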

gjoliver (member):

👍

@@ -9,6 +9,8 @@
from ray.rllib.offline.output_writer import OutputWriter, NoopOutput
from ray.rllib.offline.resource import get_offline_io_resource_bundles
from ray.rllib.offline.shuffled_input import ShuffledInput
from ray.rllib.offline.feature_importance import FeatureImportance
kouroshHakha (author):

Moving feature importance out of its previous place because it doesn't really fit the definition of off-policy evaluation in the literature. It is now a subclass of OfflineEvaluator, of which OffPolicyEstimator is also a subclass.
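Roughly, the new hierarchy looks like this (a sketch; the docstrings and the abstract signature are illustrative):

from abc import ABC, abstractmethod
from typing import Any, Dict

from ray.rllib.policy.sample_batch import SampleBatch


class OfflineEvaluator(ABC):
    """Base for anything that scores a policy from offline data."""

    @abstractmethod
    def estimate(self, batch: SampleBatch, **kwargs) -> Dict[str, Any]:
        ...


class OffPolicyEstimator(OfflineEvaluator):
    """OPE in the literature sense (IS, DM, DR, ...)."""


class FeatureImportance(OfflineEvaluator):
    """Not OPE: perturbs input features and measures the effect on actions."""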

gjoliver (member):

this should be a separate PR

kouroshHakha (author):

If this is not done here, it will break the feature_importance code.

@@ -62,50 +63,38 @@ def __init__(
), "self.model must implement `estimate_v`!"

@override(OffPolicyEstimator)
def estimate(self, batch: SampleBatchType) -> Dict[str, Any]:
"""Compute off-policy estimates.
def estimate_multi_step(self, episode: SampleBatch) -> Dict[str, float]:
kouroshHakha (author):

I am now separating episodic RL logic from bandit logic, which will be easier to maintain and debug. It also focuses on our current use case, which is bandits.

gjoliver (member) left a review:

cool, only a few minor issues.
More significantly, I want to raise the soul-searching question: should we add some unit tests for these mathy estimate_xxx() functions, now that they are nicely separated?
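For what it's worth, such a test can be hand-computable. A self-contained sketch (is_estimate is a hypothetical stand-in for the real estimator, written only to pin down the single-step importance-sampling arithmetic):

import unittest

import numpy as np


def is_estimate(rewards, behavior_probs, target_probs):
    """Ordinary importance sampling on single-step (bandit) samples."""
    weights = target_probs / behavior_probs
    return {
        "v_behavior": float(np.mean(rewards)),
        "v_target": float(np.mean(weights * rewards)),
    }


class TestSingleStepIS(unittest.TestCase):
    def test_two_armed_bandit(self):
        # Behavior policy picks each of two arms uniformly; arm A pays 1.0,
        # arm B pays 0.0. Logged actions: A, B, A, B.
        rewards = np.array([1.0, 0.0, 1.0, 0.0])
        behavior_probs = np.full(4, 0.5)
        # Target policy always picks arm A -> prob 1.0 where A was logged.
        target_probs = np.array([1.0, 0.0, 1.0, 0.0])

        est = is_estimate(rewards, behavior_probs, target_probs)
        self.assertAlmostEqual(est["v_behavior"], 0.5)  # mean logged reward
        self.assertAlmostEqual(est["v_target"], 1.0)  # true value of "always A"


if __name__ == "__main__":
    unittest.main()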



@override(OffPolicyEstimator)
def estimate_single_step(self, batch: SampleBatch) -> Dict[str, float]:
    estimates_per_epsiode = {"v_behavior": None, "v_target": None}
gjoliver (member):

this is nice. rename this variable to something else since we are not dealing with an episode here?

kouroshHakha (author):

What should I name it? It's a batch of single time steps.

gjoliver (member):

estimate_per_sample?

kouroshHakha (author):

how about estimate_on_single_episode vs. estimate_on_single_step_samples?

gjoliver (member):

👍

kouroshHakha (author):

done


@override(OffPolicyEstimator)
def estimate_single_step(self, batch: SampleBatchType) -> Dict[str, float]:
    estimates_per_epsiode = {"v_behavior": None, "v_target": None}
gjoliver (member):

same here.

estimates["v_delta"] = estimates["v_target"] - estimates["v_behavior"]

return estimates
def estimate_multi_step(self, episode: SampleBatch) -> Dict[str, float]:
gjoliver (member):

is this a good time to add some unit tests for these single/multi-step util functions?
We want to double-check the math next, right?

kouroshHakha (author):

done.

if is_overridden(self.estimate_single_step):
    estimates_per_epsiode.append(self.estimate_single_step(batch))
else:
    raise NotImplementedError(
gjoliver (member):

why do you have to do this? If it's not overridden, the default implementation will just raise the error?

kouroshHakha (author):

Good point. I don't know what the best way to do this is. I wanted to give a more informative error message, because users can still go down the multi_step path by setting ope_split_by_episode=True.

gjoliver (member):

so why not just raise this specific message in estimate_single_step() right above?

kouroshHakha (author):

Because that function does not need to know how it's called, e.g. based on ope_split_by_episode?

kouroshHakha (author):

Another way would be to try and catch the error and change the error message.

gjoliver (member):

I see. Huh, ok.
But however we want to do this, should we do the same for the self.estimate_multi_step() call above as well?

kouroshHakha (author):

estimate_multi_step() always works; the other one only works on bandits. An estimator down the line may implement one, the other, or both, and should raise NotImplementedError for whatever it doesn't support. The error message should tell the user: if you actually wanted estimate_multi_step() but split_by_episode is False and estimate_single_step is not implemented, you should set it to True. I'll think of a better way.

gjoliver (member):

wait, I think we are mixing some things up here.
If you look at the two API calls above, they are both NotImplemented.
This makes me feel like the knowledge that multi-step will always work comes from the children implementations we already have, not from the perspective of an abstract API class. Like, if someone is reading the OffPolicyEstimator class by itself, they will get curious why we catch in one place but not the other.
Either it's fine to have the knowledge leak from the children classes into the API here, in which case we should write a more specific error message for the APIs, or we keep the API class pure and make no assumptions about the children implementations.
Not trying to nit-pick; you get my idea, just trying to tell the story from a reader of our code.
thanks man.

kouroshHakha (author):

oh you are right. I agree with you on the leakage problem. We don't want this info to leak from the children into the base abstract class. I'll remove the override check then.

These nitpicks are actually important imo. Shoot more :)
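Concretely, the agreed change is roughly the following (a before/after sketch based on the diff above; the error text in the "before" branch is illustrative):

# Before: the base class inspects child overrides, leaking knowledge upward.
if is_overridden(self.estimate_single_step):
    estimates_per_episode.append(self.estimate_single_step(batch))
else:
    raise NotImplementedError(
        "estimate_single_step() is not implemented; set "
        "ope_split_by_episode=True to use the multi-step path instead."
    )

# After: call unconditionally. A subclass that does not support the
# single-step path raises NotImplementedError from its own stub.
estimates_per_episode.append(self.estimate_single_step(batch))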

gjoliver (member) left a review:

nice man. a few nits left, and also lint is failing.

for class_module in ope_classes:
    for policy_tag in ["good", "bad"]:
        target_policy = self.policies[policy_tag]
        estimator_good = class_module(target_policy, gamma=0)
gjoliver (member):

got confused for a second. Seems this should just be estimator, since it can be good or bad depending on policy_tag.

kouroshHakha (author):

oh good catch. misnomer :)

return all_episodes

@OverrideToImplementCustomLogic
def peak_on_single_episode(self, episode: SampleBatch) -> None:
gjoliver (member):

typo, peek_on_single_episode?

kouroshHakha (author):

fixed.


eps_id = episode[SampleBatch.EPS_ID][0]
if eps_id not in self.p:
    raise ValueError(f"Episode {eps_id} not passed through the fit function")
gjoliver (member):

Update the error message? It's not fit anymore. Maybe say:
"Cannot find target weight for episode {eps_id}. Did it go through the peek_on_single_episode() function?"

kouroshHakha (author):

done

eps_id = episode[SampleBatch.EPS_ID][0]
if eps_id in self.p:
    raise ValueError(
        f"Episode {eps_id} already paseed through the fit function"
gjoliver (member):

same here

kouroshHakha (author):

fixed.

gjoliver (member) left a review:

cool, cool! let's merge this after tests pass.

kouroshHakha (author):

@gjoliver Let's merge?

@gjoliver gjoliver merged commit e6c995d into ray-project:master Sep 29, 2022
gjoliver (member):

done. I like this change a lot.

WeichenXu123 pushed a commit to WeichenXu123/ray that referenced this pull request Dec 19, 2022
* 1. Introduced new abstraction: OfflineEvaluator that is the parent of OPE and feature importance
  2. introduced estimate_multi_step vs. estimate_single_step
* algorithm ope evaluation is now able to skip split_by_episode
* lint
* lint
* fixed some unittests
* wip
* wip
* fixed dm and dr variance issues
* lint
* cleaned up the inheritance
* lint
* lint
* fixed test
* nit
* fixed nits
* fixed the typos

Signed-off-by: Kourosh Hakhamaneshi <[email protected]>
Signed-off-by: Weichen Xu <[email protected]>