From 373e624abb8779b8a60d30aa08d25414d987bb1b Mon Sep 17 00:00:00 2001 From: Tornike Gurgenidze Date: Wed, 7 Feb 2024 03:26:32 +0400 Subject: [PATCH] feat: Add gRPC Registry Server (#3924) --- protos/feast/registry/RegistryServer.proto | 230 ++++++++++++++++++ sdk/python/feast/cli.py | 21 +- sdk/python/feast/constants.py | 3 + sdk/python/feast/feature_store.py | 7 + .../feast/infra/registry/base_registry.py | 4 +- sdk/python/feast/infra/registry/registry.py | 8 +- sdk/python/feast/registry_server.py | 202 +++++++++++++++ sdk/python/tests/unit/test_registry_server.py | 60 +++++ setup.py | 2 +- 9 files changed, 532 insertions(+), 5 deletions(-) create mode 100644 protos/feast/registry/RegistryServer.proto create mode 100644 sdk/python/feast/registry_server.py create mode 100644 sdk/python/tests/unit/test_registry_server.py diff --git a/protos/feast/registry/RegistryServer.proto b/protos/feast/registry/RegistryServer.proto new file mode 100644 index 0000000000..3e7773e89a --- /dev/null +++ b/protos/feast/registry/RegistryServer.proto @@ -0,0 +1,230 @@ +syntax = "proto3"; + +package feast.registry; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/empty.proto"; +import "feast/core/Registry.proto"; +import "feast/core/Entity.proto"; +import "feast/core/DataSource.proto"; +import "feast/core/FeatureView.proto"; +import "feast/core/RequestFeatureView.proto"; +import "feast/core/StreamFeatureView.proto"; +import "feast/core/OnDemandFeatureView.proto"; +import "feast/core/FeatureService.proto"; +import "feast/core/SavedDataset.proto"; +import "feast/core/ValidationProfile.proto"; +import "feast/core/InfraObject.proto"; + +service RegistryServer{ + // Entity RPCs + rpc GetEntity (GetEntityRequest) returns (feast.core.Entity) {} + rpc ListEntities (ListEntitiesRequest) returns (ListEntitiesResponse) {} + + // DataSource RPCs + rpc GetDataSource (GetDataSourceRequest) returns (feast.core.DataSource) {} + rpc ListDataSources (ListDataSourcesRequest) returns (ListDataSourcesResponse) {} + + // FeatureView RPCs + rpc GetFeatureView (GetFeatureViewRequest) returns (feast.core.FeatureView) {} + rpc ListFeatureViews (ListFeatureViewsRequest) returns (ListFeatureViewsResponse) {} + + // RequestFeatureView RPCs + rpc GetRequestFeatureView (GetRequestFeatureViewRequest) returns (feast.core.RequestFeatureView) {} + rpc ListRequestFeatureViews (ListRequestFeatureViewsRequest) returns (ListRequestFeatureViewsResponse) {} + + // StreamFeatureView RPCs + rpc GetStreamFeatureView (GetStreamFeatureViewRequest) returns (feast.core.StreamFeatureView) {} + rpc ListStreamFeatureViews (ListStreamFeatureViewsRequest) returns (ListStreamFeatureViewsResponse) {} + + // OnDemandFeatureView RPCs + rpc GetOnDemandFeatureView (GetOnDemandFeatureViewRequest) returns (feast.core.OnDemandFeatureView) {} + rpc ListOnDemandFeatureViews (ListOnDemandFeatureViewsRequest) returns (ListOnDemandFeatureViewsResponse) {} + + // FeatureService RPCs + rpc GetFeatureService (GetFeatureServiceRequest) returns (feast.core.FeatureService) {} + rpc ListFeatureServices (ListFeatureServicesRequest) returns (ListFeatureServicesResponse) {} + + // SavedDataset RPCs + rpc GetSavedDataset (GetSavedDatasetRequest) returns (feast.core.SavedDataset) {} + rpc ListSavedDatasets (ListSavedDatasetsRequest) returns (ListSavedDatasetsResponse) {} + + // ValidationReference RPCs + rpc GetValidationReference (GetValidationReferenceRequest) returns (feast.core.ValidationReference) {} + rpc ListValidationReferences (ListValidationReferencesRequest) returns (ListValidationReferencesResponse) {} + + rpc ListProjectMetadata (ListProjectMetadataRequest) returns (ListProjectMetadataResponse) {} + rpc GetInfra (GetInfraRequest) returns (feast.core.Infra) {} + rpc Refresh (RefreshRequest) returns (google.protobuf.Empty) {} + rpc Proto (google.protobuf.Empty) returns (feast.core.Registry) {} + +} + +message RefreshRequest { + string project = 1; +} + +message GetInfraRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListProjectMetadataRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListProjectMetadataResponse { + repeated feast.core.ProjectMetadata project_metadata = 1; +} + +message GetEntityRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListEntitiesRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListEntitiesResponse { + repeated feast.core.Entity entities = 1; +} + +// DataSources + +message GetDataSourceRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListDataSourcesRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListDataSourcesResponse { + repeated feast.core.DataSource data_sources = 1; +} + +// FeatureViews + +message GetFeatureViewRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListFeatureViewsRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListFeatureViewsResponse { + repeated feast.core.FeatureView feature_views = 1; +} + +// RequestFeatureView + +message GetRequestFeatureViewRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListRequestFeatureViewsRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListRequestFeatureViewsResponse { + repeated feast.core.RequestFeatureView request_feature_views = 1; +} + +// StreamFeatureView + +message GetStreamFeatureViewRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListStreamFeatureViewsRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListStreamFeatureViewsResponse { + repeated feast.core.StreamFeatureView stream_feature_views = 1; +} + +// OnDemandFeatureView + +message GetOnDemandFeatureViewRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListOnDemandFeatureViewsRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListOnDemandFeatureViewsResponse { + repeated feast.core.OnDemandFeatureView on_demand_feature_views = 1; +} + +// FeatureServices + +message GetFeatureServiceRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListFeatureServicesRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListFeatureServicesResponse { + repeated feast.core.FeatureService feature_services = 1; +} + +// SavedDataset + +message GetSavedDatasetRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListSavedDatasetsRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListSavedDatasetsResponse { + repeated feast.core.SavedDataset saved_datasets = 1; +} + +// ValidationReference + +message GetValidationReferenceRequest { + string name = 1; + string project = 2; + bool allow_cache = 3; +} + +message ListValidationReferencesRequest { + string project = 1; + bool allow_cache = 2; +} + +message ListValidationReferencesResponse { + repeated feast.core.ValidationReference validation_references = 1; +} diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 2eb2c27bcb..985c44b821 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -25,7 +25,10 @@ from pygments import formatters, highlight, lexers from feast import utils -from feast.constants import DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT +from feast.constants import ( + DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT, + DEFAULT_REGISTRY_SERVER_PORT, +) from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError from feast.feature_view import FeatureView from feast.infra.contrib.grpc_server import get_grpc_server @@ -753,6 +756,22 @@ def serve_transformations_command(ctx: click.Context, port: int): store.serve_transformations(port) +@cli.command("serve_registry") +@click.option( + "--port", + "-p", + type=click.INT, + default=DEFAULT_REGISTRY_SERVER_PORT, + help="Specify a port for the server", +) +@click.pass_context +def serve_registry_command(ctx: click.Context, port: int): + """Start a registry server locally on a given port.""" + store = create_feature_store(ctx) + + store.serve_registry(port) + + @cli.command("validate") @click.option( "--feature-service", diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 574d79f416..c022ecba55 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -44,5 +44,8 @@ # Default FTS port DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT = 6569 +# Default registry server port +DEFAULT_REGISTRY_SERVER_PORT = 6570 + # Environment variable for feature server docker image tag DOCKER_IMAGE_TAG_ENV_NAME: str = "FEAST_SERVER_DOCKER_IMAGE_TAG" diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index d3f98f8032..4a53672b2e 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2278,6 +2278,13 @@ def serve_ui( root_path=root_path, ) + @log_exceptions_and_usage + def serve_registry(self, port: int) -> None: + """Start registry server locally on a given port.""" + from feast import registry_server + + registry_server.start_server(self, port) + @log_exceptions_and_usage def serve_transformations(self, port: int) -> None: """Start the feature transformation server locally on a given port.""" diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index 14b098bb12..8928a5800d 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -329,7 +329,9 @@ def list_feature_views( # request feature view operations @abstractmethod - def get_request_feature_view(self, name: str, project: str) -> RequestFeatureView: + def get_request_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> RequestFeatureView: """ Retrieves a request feature view. diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index 1a72cbb4a5..fc7be75e0d 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -528,8 +528,12 @@ def list_feature_views( ) return proto_registry_utils.list_feature_views(registry_proto, project) - def get_request_feature_view(self, name: str, project: str): - registry_proto = self._get_registry_proto(project=project, allow_cache=False) + def get_request_feature_view( + self, name: str, project: str, allow_cache: bool = False + ): + registry_proto = self._get_registry_proto( + project=project, allow_cache=allow_cache + ) return proto_registry_utils.get_request_feature_view( registry_proto, name, project ) diff --git a/sdk/python/feast/registry_server.py b/sdk/python/feast/registry_server.py new file mode 100644 index 0000000000..221715480e --- /dev/null +++ b/sdk/python/feast/registry_server.py @@ -0,0 +1,202 @@ +from concurrent import futures + +import grpc +from google.protobuf.empty_pb2 import Empty + +from feast import FeatureStore +from feast.protos.feast.registry import RegistryServer_pb2, RegistryServer_pb2_grpc + + +class RegistryServer(RegistryServer_pb2_grpc.RegistryServerServicer): + def __init__(self, store: FeatureStore) -> None: + super().__init__() + self.proxied_registry = store.registry + + def GetEntity(self, request: RegistryServer_pb2.GetEntityRequest, context): + return self.proxied_registry.get_entity( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListEntities(self, request, context): + return RegistryServer_pb2.ListEntitiesResponse( + entities=[ + entity.to_proto() + for entity in self.proxied_registry.list_entities( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetDataSource(self, request: RegistryServer_pb2.GetDataSourceRequest, context): + return self.proxied_registry.get_data_source( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListDataSources(self, request, context): + return RegistryServer_pb2.ListDataSourcesResponse( + data_sources=[ + data_source.to_proto() + for data_source in self.proxied_registry.list_data_sources( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetFeatureView( + self, request: RegistryServer_pb2.GetFeatureViewRequest, context + ): + return self.proxied_registry.get_feature_view( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListFeatureViews(self, request, context): + return RegistryServer_pb2.ListFeatureViewsResponse( + feature_views=[ + feature_view.to_proto() + for feature_view in self.proxied_registry.list_feature_views( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetRequestFeatureView( + self, request: RegistryServer_pb2.GetRequestFeatureViewRequest, context + ): + return self.proxied_registry.get_request_feature_view( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListRequestFeatureViews(self, request, context): + return RegistryServer_pb2.ListRequestFeatureViewsResponse( + request_feature_views=[ + request_feature_view.to_proto() + for request_feature_view in self.proxied_registry.list_request_feature_views( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetStreamFeatureView( + self, request: RegistryServer_pb2.GetStreamFeatureViewRequest, context + ): + return self.proxied_registry.get_stream_feature_view( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListStreamFeatureViews(self, request, context): + return RegistryServer_pb2.ListStreamFeatureViewsResponse( + stream_feature_views=[ + stream_feature_view.to_proto() + for stream_feature_view in self.proxied_registry.list_stream_feature_views( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetOnDemandFeatureView( + self, request: RegistryServer_pb2.GetOnDemandFeatureViewRequest, context + ): + return self.proxied_registry.get_on_demand_feature_view( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListOnDemandFeatureViews(self, request, context): + return RegistryServer_pb2.ListOnDemandFeatureViewsResponse( + on_demand_feature_views=[ + on_demand_feature_view.to_proto() + for on_demand_feature_view in self.proxied_registry.list_on_demand_feature_views( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetFeatureService( + self, request: RegistryServer_pb2.GetFeatureServiceRequest, context + ): + return self.proxied_registry.get_feature_service( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListFeatureServices( + self, request: RegistryServer_pb2.ListFeatureServicesRequest, context + ): + return RegistryServer_pb2.ListFeatureServicesResponse( + feature_services=[ + feature_service.to_proto() + for feature_service in self.proxied_registry.list_feature_services( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetSavedDataset( + self, request: RegistryServer_pb2.GetSavedDatasetRequest, context + ): + return self.proxied_registry.get_saved_dataset( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListSavedDatasets( + self, request: RegistryServer_pb2.ListSavedDatasetsRequest, context + ): + return RegistryServer_pb2.ListSavedDatasetsResponse( + saved_datasets=[ + saved_dataset.to_proto() + for saved_dataset in self.proxied_registry.list_saved_datasets( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetValidationReference( + self, request: RegistryServer_pb2.GetValidationReferenceRequest, context + ): + return self.proxied_registry.get_validation_reference( + name=request.name, project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def ListValidationReferences( + self, request: RegistryServer_pb2.ListValidationReferencesRequest, context + ): + return RegistryServer_pb2.ListValidationReferencesResponse( + validation_references=[ + validation_reference.to_proto() + for validation_reference in self.proxied_registry.list_validation_references( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def ListProjectMetadata( + self, request: RegistryServer_pb2.ListProjectMetadataRequest, context + ): + return RegistryServer_pb2.ListProjectMetadataResponse( + project_metadata=[ + project_metadata.to_proto() + for project_metadata in self.proxied_registry.list_project_metadata( + project=request.project, allow_cache=request.allow_cache + ) + ] + ) + + def GetInfra(self, request: RegistryServer_pb2.GetInfraRequest, context): + return self.proxied_registry.get_infra( + project=request.project, allow_cache=request.allow_cache + ).to_proto() + + def Refresh(self, request, context): + self.proxied_registry.refresh(request.project) + return Empty() + + def Proto(self, request, context): + return self.proxied_registry.proto() + + +def start_server(store: FeatureStore, port: int): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + RegistryServer_pb2_grpc.add_RegistryServerServicer_to_server( + RegistryServer(store), server + ) + server.add_insecure_port(f"[::]:{port}") + server.start() + server.wait_for_termination() diff --git a/sdk/python/tests/unit/test_registry_server.py b/sdk/python/tests/unit/test_registry_server.py new file mode 100644 index 0000000000..734bbfe19b --- /dev/null +++ b/sdk/python/tests/unit/test_registry_server.py @@ -0,0 +1,60 @@ +import assertpy +import grpc_testing +import pytest +from google.protobuf.empty_pb2 import Empty + +from feast import Entity, FeatureStore +from feast.protos.feast.registry import RegistryServer_pb2 +from feast.registry_server import RegistryServer + + +def call_registry_server(server, method: str, request=None): + service = RegistryServer_pb2.DESCRIPTOR.services_by_name["RegistryServer"] + rpc = server.invoke_unary_unary( + service.methods_by_name[method], (), request if request else Empty(), None + ) + + return rpc.termination() + + +@pytest.fixture +def registry_server(environment): + store: FeatureStore = environment.feature_store + + servicer = RegistryServer(store=store) + + return grpc_testing.server_from_dictionary( + {RegistryServer_pb2.DESCRIPTOR.services_by_name["RegistryServer"]: servicer}, + grpc_testing.strict_real_time(), + ) + + +def test_registry_server_get_entity(environment, registry_server): + store: FeatureStore = environment.feature_store + entity = Entity(name="driver", join_keys=["driver_id"]) + store.apply(entity) + + expected = store.get_entity(entity.name) + + get_entity_request = RegistryServer_pb2.GetEntityRequest( + name=entity.name, project=store.project, allow_cache=False + ) + response, trailing_metadata, code, details = call_registry_server( + registry_server, "GetEntity", get_entity_request + ) + response_entity = Entity.from_proto(response) + + assertpy.assert_that(response_entity).is_equal_to(expected) + + +def test_registry_server_proto(environment, registry_server): + store: FeatureStore = environment.feature_store + entity = Entity(name="driver", join_keys=["driver_id"]) + store.apply(entity) + + expected = store.registry.proto() + response, trailing_metadata, code, details = call_registry_server( + registry_server, "Proto" + ) + + assertpy.assert_that(response).is_equal_to(expected) diff --git a/setup.py b/setup.py index 4905a7697d..29b8dc5a68 100644 --- a/setup.py +++ b/setup.py @@ -234,7 +234,7 @@ else: use_scm_version = None -PROTO_SUBDIRS = ["core", "serving", "types", "storage"] +PROTO_SUBDIRS = ["core", "registry", "serving", "types", "storage"] PYTHON_CODE_PREFIX = "sdk/python"