
feat: Added Remote offline server and client using arrow flight server #3

Closed
wants to merge 41 commits into from

Conversation

@redhatHameed redhatHameed commented May 9, 2024

What this PR does / why we need it:

This PR anticipates the changes needed to implement an offline store using an Arrow Flight server for data transfer.

Please note that we're using a fork of the upstream repo under the RHEcosystemAppEng organization to facilitate the collaboration among the team's developers: once we agree with the community members on the final approach, we'll complete the missing functionalities and send a PR to the community repo.

Which issue(s) this PR fixes:

Partially addresses #4032

Notes

The PR includes a working example that the reviewers can run, with a remote offline store server and the related client.
This example is only meant for explanatory purposes, and will not be part of the final PR.

The example includes a README file with some details on the proposed parameter transfer protocol, which explains how we decided to transfer the parameters of the OfflineStore APIs to the remote server. Again, this will not be committed as-is to the upstream repo but we'll create a dedicated section in the user guide.

f"grpc://{config.offline_store.host}:{config.offline_store.port}"
)
# Put API parameters
self._put_parameters(feature_refs, entity_df)

I'm not 100% sure, but I think this should also go in _to_arrow_internal method. That's the behavior in other offline stores. Nothing is actually happening until the user calls one of the "action" methods on RetrievalJob.

Author

Can you clarify? What I understand is that you're asking to move self._put_parameters(feature_refs, entity_df) into the _to_arrow_internal function.


yes, take the bigquery engine for example. It's a bit more complicated, but note that _upload_entity_df (which essentially does the same thing as self._put_parameters(feature_refs, entity_df)) is not called during the creation of the RetrievalJob; it's actually part of the query_generator function and is called from _to_arrow_internal/_to_df_internal.


Agree, we can move the parameter exchange a bit later in the flow. Here we only wanted to create the flight at request time, but there's actually no reason for it; it can surely be postponed.
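For illustration, a minimal sketch of the deferred flow discussed above (the class and method names mirror the PR, but the Flight calls are stubbed out, so this is not the actual client code):

```python
class RemoteRetrievalJob:
    """Sketch: no network activity happens until an "action" method runs."""

    def __init__(self, feature_refs, entity_df):
        # Only record the inputs; no Flight calls yet.
        self.feature_refs = feature_refs
        self.entity_df = entity_df
        self.calls = []  # stand-in for observable side effects

    def _put_parameters(self):
        # In the real client this would call do_put on the Flight client.
        self.calls.append("put_parameters")

    def _to_arrow_internal(self):
        # Parameters are sent lazily, right before the data is fetched.
        self._put_parameters()
        self.calls.append("do_get")
        return "arrow_table"  # placeholder for the retrieved table

    def to_arrow(self):
        return self._to_arrow_internal()
```

Constructing the job leaves `calls` empty; only `to_arrow()` triggers the parameter transfer followed by the fetch.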

writer.write_table(entity_df_table)
writer.close()

features_array = pa.array(feature_refs)

I don't think we should be doing multiple do_puts here; do_put should be called for the entity dataset only. I would treat features as a parameter and try to pass it along either in the metadata of the previous do_put call, or, if that proves too hard, I think we will have to encode all the parameters in the FlightDescriptor itself (it could contain a uuid and all params, some binary encoding of ours, or maybe a proto message). wdyt?

Author

Yes, I think we can pass feature_refs as a JSON string in the entity_df_table metadata, something like this:

features_json = json.dumps(feature_refs)
entity_df_table = pa.Table.from_pandas(entity_df)
writer, _ = self.client.do_put(
    historical_flight_descriptor,
    entity_df_table.schema.with_metadata(
        {
            "command": self.command,
            "api": "get_historical_features",
            "param": "entity_df",
            "features": features_json,
        }
    ),
)


Sure, that looks better. I still anticipate some problems with this choice, though. For example, there is an option to pass the entity dataset as a query rather than a pandas dataframe (which I'm not sure we'd like to support, frankly). In that case, we probably wouldn't make any do_put calls whatsoever, as there's no data to be passed to the server. That's why putting all of this into a flight descriptor seems more appropriate to me. Having said that, we can try it like this first and revisit the choice later if necessary...


We can try to minimize the do_put calls, that's a good catch.
For parameters that have a double nature like entity_df: Union[pd.DataFrame, str], or those that are of primitive type like project: str, full_feature_names: bool = False, we can try to encode them in the descriptor as you said, probably together with an identifier of the actual API like get_historical_features.
Before trying the proto message option (and the related overhead), I would try to encode them somehow.


agreed, proto would be overkill for now (although probably where we will end up down the line 😄). Just some dict -> json -> base64 or something like that will do.
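The dict -> json -> base64 round trip could be sketched like this (the helper names and the exact payload keys are assumptions, not the PR's code; the encoded bytes would feed FlightDescriptor.for_command on the client and be decoded symmetrically on the server):

```python
import base64
import json


def encode_api_params(api: str, params: dict) -> bytes:
    """Encode an OfflineStore API call as dict -> JSON -> base64 bytes."""
    payload = {"api": api, **params}
    return base64.b64encode(json.dumps(payload).encode("utf-8"))


def decode_api_params(command: bytes) -> dict:
    """Server-side inverse: base64 -> JSON -> dict."""
    return json.loads(base64.b64decode(command).decode("utf-8"))
```

This keeps primitive and "double nature" parameters (like full_feature_names or an entity_df passed as a SQL string) in a single descriptor without extra do_put calls.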

tuple(descriptor.path or tuple()),
)

# TODO: since we cannot anticipate here the call to get_historical_features call, what data should we return?

This is a good point, I think we will have to extend RetrievalJob interface to expose schema of the result set in some way.


Is this really relevant for our use case, BTW? Our protocol uses get_flight_info only to retrieve the ticket before invoking do_get; it's not the regular use case the Arrow Flight protocol was initially designed for: there is no existing data set from which we can extract the schema and metadata until we invoke the API on the store, and our client code is probably not interested in inspecting the descriptor.


yup, probably not relevant right now if our client implementation can do without. Might be more important if we manage to polish API to a degree when it becomes usable even without feast client.

P.S. I wouldn't say it's not the regular use case for that particular reason. True, dataset isn't ready yet, but that can be said for any sql query to any database, right? Actually it's probably pretty trivial to extract schema without running the actual retrieval job, we know the schema for entity df and also know data types for all features.


yep, schema is resolvable by means of the registry for the given features. we can use -1 for others requiring to run the query:

total_bytes: The size in bytes of the data in this flight, or -1 if unknown.
total_records: The total record count of this flight, or -1 if unknown.


dmartinol commented May 10, 2024

some of the suggested changes are available in PR #4


tokoko commented May 10, 2024

I'll answer here to keep the discussion contained 😄 If you're encoding the whole thing anyway, wouldn't this be simpler? It would avoid do_put metadata entirely.

api_info = {
    "command": command,
    "api": "get_historical_features",
    "features": json.dumps(self.feature_refs),
}

command_descriptor = fl.FlightDescriptor.for_command(json.dumps(api_info))


tokoko commented May 10, 2024

@dmartinol you also don't need to call json.dumps twice, the outer one will be sufficient.
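To make the single-dumps point concrete (the command id and feature name below are made up): one json.dumps over the whole dict is enough, since nesting a pre-serialized string would force a second json.loads on the server.

```python
import json

api_info = {
    "command": "c-1234",                     # hypothetical command id
    "api": "get_historical_features",
    "features": ["driver_stats:conv_rate"],  # plain list, not json.dumps(...)
}
# Client side: one dumps; the bytes would feed FlightDescriptor.for_command.
command = json.dumps(api_info).encode("utf-8")
# Server side: one loads recovers the feature list directly.
decoded = json.loads(command)
assert decoded["features"] == ["driver_stats:conv_rate"]
```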


dmartinol commented May 10, 2024

I'll answer here to keep the discussion contained 😄 If you're encoding the whole thing anyway, wouldn't this be simpler? It would avoid do_put metadata entirely.

I tried the same before, with the goal of removing the put step, but translating the df with to_json raised some errors at read_json time, mainly because it seemed to lose the original data types and get_historical_features rejected it.
I can retry; of course it would help to encode everything in the same descriptor.

Anyway I updated PR #4 with latest proposals

self.store = store

@classmethod
def descriptor_to_key(self, descriptor):


Do we really need this to be a class method rather than an instance method? Or at least it should go in a util file.

Author

Just followed the implementation approach from Arrow Flight Repo - here

@tmihalac May 15, 2024

Just followed the implementation approach from Arrow Flight Repo - here

I'm just saying I don't like all those non-object-oriented patterns that are currently going on in the code.


+1 for changing it to @staticmethod or extracting as a separate function
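One way the suggested refactor could look (a sketch: OfflineServer here is a stub, and the fake descriptor below stands in for a pyarrow FlightDescriptor, mimicking only the attributes the key derivation reads):

```python
from types import SimpleNamespace


class OfflineServer:
    @staticmethod
    def descriptor_to_key(descriptor):
        # No instance or class state is used, so a staticmethod (or a free
        # function in a util module) is a better fit than a classmethod.
        return (
            descriptor.descriptor_type.value,
            descriptor.command,
            tuple(descriptor.path or tuple()),
        )


# Fake descriptor exposing the attributes used above:
fake = SimpleNamespace(
    descriptor_type=SimpleNamespace(value=2), command=b"cmd", path=None
)
key = OfflineServer.descriptor_to_key(fake)  # (2, b"cmd", ())
```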

@redhatHameed redhatHameed added the enhancement New feature or request label May 16, 2024
dmartinol
dmartinol previously approved these changes May 17, 2024

tokoko commented May 17, 2024

@dmartinol hey, you can also reuse some of the existing offline store tests by creating a custom DataSourceCreator and configuring AVAILABLE_OFFLINE_STORES here.


dmartinol commented May 20, 2024

@dmartinol hey, you can also reuse some of the existing offline store tests by creating a custom DataSourceCreator and configuring AVAILABLE_OFFLINE_STORES here.

You mean to develop an integration test, right?
I reviewed the existing examples and noticed that the remote offline store is somewhat different. It’s not just another data source; it requires two independent FeatureStores operating in server and client roles, both connected to the same registry. This might require changes to the existing framework to integrate these components (e.g., similar to the feature_server implementation)
Please let me know if my understanding is correct.


tokoko commented May 20, 2024

@dmartinol sure, it's a bit of an unusual case because of an additional server-side feature store, but other configs also require external resources here and there. For example, s3+duckdb config brings up minio in a container during startup. I was thinking we could do something similar here (ideally in a container, but might be hard to copy current code in there). Our version of DataSourceCreator would:

  1. create data sources (locally or in minio)
  2. create a server-side feature store with one of offline store implementations (my vote to duckdb 😄)
    (The steps above can probably be reused from an existing creator)
  3. start offline feature server in the background (linux process or a testcontainer similar to minio)
  4. populate create_offline_store_config with a remote offline store type.

I can give it a try later today and give you some draft version if you're fine with that...

@dmartinol

@tokoko sure, any help is appreciated.
I was trying the separate-process option, running OfflineServer the same way we serve the python feature server, but I'm missing the connection with feature_server_endpoint; I'm not sure how it is used.

BTW: what about using the regular FileDataSourceCreator as the underlying implementation? Any reasons for using DuckDb instead?


tokoko commented May 20, 2024

@dmartinol We could do it that way (similar to feature server) but we would be missing out on easily reusing offline store tests in that case. The reason they decided to treat feature server as a special case was probably because they weren't planning to put feature server behind "normal" feast APIs. Once remote-http PR is merged, we can probably rethink that approach as well...

There's not too much difference between the file and duckdb implementations. The file offline store (despite its confusing name) is actually a dask implementation (see here). It's a bit slower than duckdb, but that shouldn't matter one way or the other for our purposes yet. (One difference is that I recently added to_remote_storage support to duckdb, which is still missing for dask afaik.)


tokoko commented May 20, 2024

Turns out you're right, it's more complicated than I thought initially; in particular, the registry configuration for the server-side feature store is impossible to acquire right now, since a DataSourceCreator is not supposed to know anything about the other configured components. I still think it's worth making the effort... I'll look into changing the test setup so that DataSourceCreator has a setup method that is called with an already existing registry configuration; then we will be able to move the flight server setup from __init__ to the new setup method. wdyt?


tokoko commented May 21, 2024

@dmartinol Forgive me if this is unnecessary 😄 After the changes in 4210, you should be able to do something like this:

class RemoteDataSourceCreator(FileDataSourceCreator):
    def setup(self, registry: RegistryConfig):
        parent_offline_config = super().create_offline_store_config()

        fs = FeatureStore(
            config=RepoConfig(
                project=self.project_name,
                provider='local',
                offline_store=parent_offline_config,
                registry=registry,
                entity_key_serialization_version=2
            )
        )
        # fs.serve_offline in a subprocess

    def create_offline_store_config(self) -> FeastConfigBaseModel:
        return RemoteOfflineStoreConfig(...) 

    def teardown(self):
        super().teardown()
        # do some cleanup if necessary

@dmartinol

Thanks @tokoko for your prompt reaction! I will give it a try tomorrow and let you know!

  • @lokeshrangineni to check if a similar solution fits the remote-http online store

@dmartinol

@dmartinol Forgive me if this is unnecessary 😄 After the changes in 4210, you should be able to do something like this:

class RemoteDataSourceCreator(FileDataSourceCreator):
...

@tokoko sorry for the delayed reply, but I faced some issues with this implementation.
First of all, thanks for suggesting these complex, ready-to-use integration tests: their complexity revealed errors that we did not hit with our basic examples!

  • I had to use multiprocess.Process instead of multiprocessing.Process, as the latter raised can't pickle _thread.lock objects. This means updating the project dependencies, unless you see another way to do the same (I'm on macOS, in case it matters).
  • I realized that we had missed some of the fields in the API, like full_feature_names, so I had to introduce this parameter in the ArrowFlight command descriptor.
  • Similarly, while some basic tests work fine, those requiring entity mappings failed, the reason being that we had also missed the feature_views field in the offline store implementation. Since this can't be serialized to JSON, I decided to put only the names in the command and use registry.get_feature_view to retrieve each FeatureView on the server. It took me some time to realize that this method, which takes the view definition from the protos, adds unexpected dummy entities to entityless views, which breaks the actual query (it raises KeyError: "['__dummy_id'] not in index"). I'm trying to clean up the instance and remove the dummies from the entities and entity_columns fields, but now I'm stuck with a new error while running test_historical_features_main:
pyarrow.lib.ArrowInvalid: Could not find feature view from reference origin:temperature. Detail: Python exception: ValueError. gRPC client debug context: UNKNOWN:Error received from peer ipv4:0.0.0.0:50215 {grpc_message:"Could not find feature view from reference origin:temperature. Detail: Python exception: ValueError", grpc_status:3, created_time:"2024-05-23T19:23:41.660071+02:00"}. Client context: IOError: Server never sent a data message. Detail: Internal

Any hints?
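For reference, the dummy-entity cleanup described above could look roughly like this (FakeFeatureView is a stand-in, not feast's class; matching on the "__dummy" prefix is an assumption based on the KeyError reported):

```python
# Stand-in for feast's FeatureView; real entity_columns hold Field objects,
# but plain strings keep the sketch dependency-free.
class FakeFeatureView:
    def __init__(self, name, entities, entity_columns):
        self.name = name
        self.entities = entities
        self.entity_columns = entity_columns


def cleanup_dummies(fv):
    # FeatureView.from_proto adds a dummy entity to entityless views; strip
    # anything with the dummy prefix before building the server-side query,
    # otherwise the join raises KeyError on '__dummy_id'.
    fv.entities = [e for e in fv.entities if not e.startswith("__dummy")]
    fv.entity_columns = [
        c for c in fv.entity_columns if not c.startswith("__dummy")
    ]
    return fv
```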


tokoko commented May 23, 2024

  • It's a ci dependency, so should be fine for now. We can always try to rewrite it later...
  • I forgot about feature_views param as well, definitely not ideal 😄 We should probably rethink these interfaces later on. The same about entityless case handling, far from intuitive, I agree.
  • Not sure how much I can help with that error tbh. origin refers to this alias given to the location feature view right here. Aliases allow a feature service to query from the same feature view multiple times with different entities. Not sure what could lead to that error, though. Are you handling FeatureServices the same way? passing a string and retrieving the object on the server side?

@dmartinol

  • Aliases allow a feature service to query from the same feature view multiple times with different entities. Not sure what could lead to that error, though. Are you handling FeatureServices the same way? passing a string and retrieving the object on the server side?

I see your point and I found the problem (not the fix).
In the client layer, the feature store API has features: Union[List[str], FeatureService] which is then translated to the params passed to the OfflineStore API:

        feature_views: List[FeatureView],
        feature_refs: List[str],

The remote offline store cannot rebuild the original feature store API call, so it transfers the input params to the arrow server as-is. In the server we invoke the offline store API, bypassing the feature store (it's not in the current PR, but a recent change to be committed).

The problem, as you said, is that when there are feature services, the feature_views are extended to have aliases, so when we retrieve them by name we miss the aliases and the server-side query fails 😞
Maybe we need to transfer the feature views to the arrow server using an ad-hoc JSON serializer or a pyarrow writer. WDYT?


tokoko commented May 23, 2024

Oh... right, got it. Trying to pass the FV objects could be a workaround, but definitely not a long-term solution. If the server accepted arbitrary objects from the client instead of retrieving them from the registry, there would be no way to access additional info like auth/permissions in a trusted way. We shouldn't waste time on that, I think. Looks like we'll have to redesign the OfflineStore API sooner rather than later; it should accept both FeatureServices and FeatureViews/Features as simple python objects.

P.S. I don't think there will be many tests testing for FeatureServices with aliases, let's simply disable them for remote offline store for the time being.


dmartinol commented May 27, 2024

@tokoko I reviewed the server implementation to pass feature view names and disabled ITs using feature services. It will be merged to this PR after internal review.
A couple of remarks:

  • registry.get_feature_view is not returning sub-classes of FeatureView, so tests using StreamFeatureView failed. I've added an extended lookup method in the OfflineServer class for now, but probably it should be moved into the registry itself, WDYT?
  • Similarly, I've added a cleanup method to remove dummy IDs returned by FeatureView.from_proto but I assume the same logic should go into this method, unless it's breaking any other assumptions.
  • Not sure about the way to identify the remote offline store in the IT code; for now it's an isinstance(environment.data_source_creator, RemoteOfflineStoreDataSourceCreator) check. Is there any better alternative?
  • Many ITs fail because they invoke APIs of the OfflineStore interface that are currently not implemented. By the end of this PR @tmihalac will add the missing implementations and we will be able to validate the entire flow

[IMO the first 2 points deserve their own GH issue]

#9


tokoko commented May 27, 2024

  • I think I already fixed the first bug as part of this PR
  • Not sure about the dummy IDs, I'll have to look into this
  • Do you need identification only to skip some tests? pytest's -k expressions might be a better alternative, although a bit harder to get right.

redhatHameed and others added 12 commits June 10, 2024 14:57
Signed-off-by: Abdul Hameed <[email protected]>
Signed-off-by: Theodor Mihalache <[email protected]>
use feature_view_names to transfer feature views and remove dummies
@redhatHameed redhatHameed force-pushed the remote_offline branch 10 times, most recently from 2e1879a to 08c2616 Compare June 12, 2024 20:07