Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RLlib] Add filters to connector pipeline #27864

Merged

Conversation

ArturNiederfahrenhorst
Copy link
Contributor

@ArturNiederfahrenhorst ArturNiederfahrenhorst commented Aug 15, 2022

Why are these changes needed?

In our efforts to include our diverse experience processing steps into the connector pipeline, this PR includes filters.
As long as connectors can be switched off, we have to support the old and new place for filters and this PR thus tried to find a path that enables both by still indexing the filters alt RolloutWorker().policy_map[<policy_id>].filter and updating them there, while instantiating them in the AgentConnectors of a policy.

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
… connector context

Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Copy link
Member

@gjoliver gjoliver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks a lot for tackling this biggest work item on my connector list.
a bunch of high level comments first. happy to discuss a bit. we should probably spend a little extra effort to get these stateful connectors done right.
can you point me to where we sync these stateful filter connectors?


def to_state_dict(self):
return MeanStdObservationFilterAgentConnector.__name__, {
"filter": self.filter,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we actually record the state of self.filter so it can be re-constructed without pickle?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


@staticmethod
def from_state_dict(ctx: ConnectorContext, params: List[Any]):
connector = MeanStdObservationFilterAgentConnector(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally, MeanStdObservationFilterAgentConnector should take demean, destd, clip, and a state dict as input. if state dict is not there, it initializes from fresh state, otherwise, resume from the existing state.
does this make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

assert all(ctx.observation_space.shape == connector.filter.shape)
return connector

def reset_state(self) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when do we usually need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now the reset_state() method is part of the SyncedFilterAgentConnector interface.
Synchronization is done via the old filter synchronization mechanism, which I would like to leave in place until we switch connectors on by default. After that, I would like to simply inline all the filter code and call the connector's reset_state method directly.

Until we have gotten that far, I can delete this method if you like.

# env_runner
if not self._is_training:
raise ValueError(
"Changes can only be applied to {} when trainin.".format(self.__name__)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait, why can't we update during inference as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked @kouroshHakha if you update mean-std-filters during deployment because I was wondering the same thing. He told me that it is best practice to stop after training.

"""Copies all state from other filter to self."""
# inline this as soon as we deprecate ordinary filter with non-connector
# env_runner
if not self._is_training:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the discussion in your first comment!

raise ValueError(
"{} can only be synced when trainin.".format(self.__name__)
)
return self.filter.sync(other)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other is a connector, is it better to do self.filter.sync(other.filter) here, so filter doesn't need to be aware of connector?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This method is so far not used, otherwise, this would have thrown an error. Sorry. Just realized that this is not developer API anymore and I should probably keep unused/untested code completely out of it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh ok, so the connectors are not synced at this point?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are synchronized, but through the old mechanism.
I think as long as we have to have both, filters in their old place and in connectors, it's best to keep the same mechanism.

MeanStdObservationFilterAgentConnector.__name__,
MeanStdObservationFilterAgentConnector,
)
register_connector(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add some comments here describing the difference between these 2 filter connectors?
thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

self.filters[policy_id] = get_filter(self.observation_filter, filter_shape)
if policy_config.get("enable_connectors"):
ctx = ConnectorContext.from_policy(policy)
connector = get_synced_filter_connector(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need to do this here?
can we just call get_synced_filter_connector() in get_agent_connectors_from_config()?
https://github.com/ray-project/ray/blob/master/rllib/connectors/util.py#L25

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any input to this question I had?
just wondering if we can keep things simpler and not bring connectors up to the rollout worker level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the end it's definitly cleaner to move this. But right now this code block still depends on self.observation_filter which is ugly to remove or sneak into get_agent_connectors_from_config(). I think we should leave this here for the moment. When connectors are switched on by default, we can refactor a little bit. This whole PR is designed in the way that the connectors solution and the old solution share lots of code and I would like to do another PR that not only remove the other solution, but also makes the connectors solution more elegant.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok get it.
I am ok with accessing policy.agent_connectors here to find the filter agent, and save a reference to its filter object. as long as we add some comments explaining it :)
but can we construct the filter agent in get_agent_connectors_from_config()? there is no reason to create the filter agent here and append it right?
we can also add a get(self, name: str) API to ConnectorPipeline so it's easier to check/find a specific connector.

the reason I am hoping to create all the connectors in a centralized place is that we actually print the complete connector setup in get_agent_connectors_from_config(). if we are gonna add things somewhere else, this message is not correct anymore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand. I've implemented this. Let's see if tests pass, I might have to tinker a little more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this as you requested!

Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Copy link
Member

@gjoliver gjoliver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

almost there! thanks a lot!

# place, we need to put filters into self.filters so that they get
# synchronized
filter_connectors = self.policy_map[policy_id].agent_connectors[
SyncedFilterAgentConnector
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the concurrent version of the filter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that has practical relevance but is just used for testing (ConcurrentMeanStdFilter is only used in test_rollout_worker and I can't think of a reason why it should be used elsewhere).
So my idea was rewrite that test to use the SyncedFilterAgentConnector instead of the ConcurrentMeanStdFilter after we make the switch and eliminate ConcurrentMeanStdFilter altogether.

-> Better get rid of ConcurrentMeanStdFilter because it's just used only for testing and instead test RolloutWorker as a ray actor. Does that make sense?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, can you add a Note or TODO here, saying that nobody should use ConcurrentMeanStdFilter at this point, since we will remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a deprecation warning

# place, we need to put filters into self.filters so that they get
# synchronized
filter_connectors = self.policy_map[name].agent_connectors[
SyncedFilterAgentConnector
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replied above.

@@ -1258,6 +1265,8 @@ def add_policy(
)
}

connectors_enabled = merged_config.get("enable_connectors", False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you move this down to right above where it's first used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


self.filters[policy_id] = get_filter(self.observation_filter, filter_shape)
if connectors_enabled and policy_id in self.policy_map:
create_connectors_for_policy(self.policy_map[policy_id], self.policy_config)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we shouldn't call create_connectors_for_policy() here, it's actually called right below, about 20 lines down.
maybe the high level flow should be:

if connector_enabled:
    create_connectors_for_policy(...)
...
setup_filters_if_necessary()

where setup_filters_if_necessary() will do:

def setup_filters_if_necessary(self):
    if connectors_enabled:
        self.filters[policy_id] = <try to see if we have a filter connector in the agent list>
    else:
        self.filters[policy_id] = <set up a new filter>

wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Like we discussed privately, we now try to create connectors for the added policy and fail with an assertion error if this policy already has connectors.

Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
…ntuitive order

Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Copy link
Member

@gjoliver gjoliver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks super close now.

# place, we need to put filters into self.filters so that they get
# synchronized
filter_connectors = self.policy_map[policy_id].agent_connectors[
SyncedFilterAgentConnector
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, can you add a Note or TODO here, saying that nobody should use ConcurrentMeanStdFilter at this point, since we will remove it?

# As long as the historic filter synchronization mechanism is in
# place, we need to put filters into self.filters so that they get
# synchronized
filter_connectors = self.policy_map[name].agent_connectors[
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will it be worth it to create a small local util function for this logic, so we don't duplicate it twice here and above?
also it will make the high level code look nicer:

if connectors_enabled:
    policy = self.policy_map[name]
    create_connectors_for_policy(policy)
    maybe_get_filters_for_syncing(policy)

hopefully the long-ish if-else block above will be simplified too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Copy link
Member

@gjoliver gjoliver left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks a ton for the updates.
tests also look good.
merging.

@gjoliver gjoliver merged commit e339298 into ray-project:master Oct 3, 2022
WeichenXu123 pushed a commit to WeichenXu123/ray that referenced this pull request Dec 19, 2022
* initial

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* initial

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* lint and comments

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* wip

* format

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* implements and uses filters, working for ppo cartpole with meanstd

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* get rid of synced custom filter abstraction

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* add meanstd filter connetor test and minor fixes

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* jun's comment

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* move observation space struct logic because information is already in connector context

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* fix docstrings

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* minor fixes

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* initial

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* fix for config=None in add_policy and connector=None

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* fix config name

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* filter connector state is now json serializable

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* jun's comments

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* initial

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* create connectors only in create_connectors_for_policy

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* initial

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* get filter connector by __get__item

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* remove observation filters

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* minor fixes

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* initial

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* revert spelling error

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* initial

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* Revert "Merge branch 'make_add_policy_config_explicit' into filterstoconnectors"

This reverts commit 06beebc, reversing
changes made to e637f4f.

* accomodate case in which config={} in add_policy

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* fix connectors enabled not no SyncedFilterAgentConnector case

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* initial

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* merge configs in add_policy

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* format

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* revert random cloudpickle linter error

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* small change to trigger CI

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* remove all random changes outside rllib that made it into this PR

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* remove random rst

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* fix deprecated is_training call

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* correct in_eval call

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* nit

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* jun's comment

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* use merged config to create connectors

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* Add meaningful assertion error and switch order of if/else block to intuitive order

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* better warning

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* jun's comments

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

* shorter function signature for helper fn

Signed-off-by: Artur Niederfahrenhorst <[email protected]>

Signed-off-by: Artur Niederfahrenhorst <[email protected]>
Signed-off-by: Weichen Xu <[email protected]>
@ArturNiederfahrenhorst ArturNiederfahrenhorst deleted the filterstoconnectors branch January 5, 2023 15:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants