Skip to content

Commit

Permalink
fix: Enable registry caching in SQL Registry (#3395)
Browse files Browse the repository at this point in the history
* fix: Enable registry caching in SQL Registry

Signed-off-by: Danny Chiao <[email protected]>

* docs

Signed-off-by: Danny Chiao <[email protected]>

* fix regular file registry docs too

Signed-off-by: Danny Chiao <[email protected]>

* fix new file lint

Signed-off-by: Danny Chiao <[email protected]>

* fix new file lint

Signed-off-by: Danny Chiao <[email protected]>

Signed-off-by: Danny Chiao <[email protected]>
  • Loading branch information
adchia authored Dec 15, 2022
1 parent 6bcf77c commit 2e57376
Show file tree
Hide file tree
Showing 5 changed files with 400 additions and 128 deletions.
20 changes: 18 additions & 2 deletions docs/getting-started/concepts/registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ a remote file registry, you need to create a GCS / S3 bucket that Feast can unde
```yaml
project: feast_demo_aws
provider: aws
registry: s3://[YOUR BUCKET YOU CREATED]/registry.pb
registry:
path: s3://[YOUR BUCKET YOU CREATED]/registry.pb
cache_ttl_seconds: 60
online_store: null
offline_store:
type: file
Expand All @@ -27,7 +29,9 @@ offline_store:
```yaml
project: feast_demo_gcp
provider: gcp
registry: gs://[YOUR BUCKET YOU CREATED]/registry.pb
registry:
path: gs://[YOUR BUCKET YOU CREATED]/registry.pb
cache_ttl_seconds: 60
online_store: null
offline_store:
type: file
Expand All @@ -43,6 +47,18 @@ multiple feature views or time ranges concurrently).
#### SQL Registry
Alternatively, a [SQL Registry](../../tutorials/using-scalable-registry.md) can be used for a more scalable registry.
The configuration roughly looks like:
```yaml
project: <your project name>
provider: <provider name>
online_store: redis
offline_store: file
registry:
registry_type: sql
path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast
cache_ttl_seconds: 60
```
This supports any SQLAlchemy-compatible database as a backend. The exact schema is defined in [sql.py](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/registry/sql.py)
### Updating the registry
Expand Down
1 change: 1 addition & 0 deletions docs/tutorials/using-scalable-registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ offline_store: file
registry:
registry_type: sql
path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast
cache_ttl_seconds: 60
```
Specifically, `registry_type` needs to be set to `sql` in the registry config block. Once it is, `path` should be the [Database URL](https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls) for the database to be used, as expected by SQLAlchemy. No additional commands are currently needed to configure this registry.
Expand Down
208 changes: 208 additions & 0 deletions sdk/python/feast/infra/registry/proto_registry_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
from typing import List

from feast.data_source import DataSource
from feast.entity import Entity
from feast.errors import (
DataSourceObjectNotFoundException,
EntityNotFoundException,
FeatureServiceNotFoundException,
FeatureViewNotFoundException,
OnDemandFeatureViewNotFoundException,
SavedDatasetNotFound,
ValidationReferenceNotFound,
)
from feast.feature_service import FeatureService
from feast.feature_view import FeatureView
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.request_feature_view import RequestFeatureView
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView


def get_feature_service(
    registry_proto: RegistryProto, name: str, project: str
) -> FeatureService:
    """Look up a feature service by name within a project.

    Args:
        registry_proto: Registry proto whose ``feature_services`` are searched.
        name: Name of the feature service to find.
        project: Project the feature service must belong to.

    Returns:
        The first matching feature service, converted from its proto.

    Raises:
        FeatureServiceNotFoundException: If no entry matches both the
            project and the name.
    """
    match = next(
        (
            candidate
            for candidate in registry_proto.feature_services
            if candidate.spec.project == project and candidate.spec.name == name
        ),
        None,
    )
    if match is None:
        raise FeatureServiceNotFoundException(name, project=project)
    return FeatureService.from_proto(match)


def get_feature_view(
    registry_proto: RegistryProto, name: str, project: str
) -> FeatureView:
    """Look up a feature view by name within a project.

    Args:
        registry_proto: Registry proto whose ``feature_views`` are searched.
        name: Name of the feature view to find.
        project: Project the feature view must belong to.

    Returns:
        The first matching feature view, converted from its proto.

    Raises:
        FeatureViewNotFoundException: If no entry matches both the name
            and the project.
    """
    for candidate in registry_proto.feature_views:
        spec = candidate.spec
        if spec.name != name:
            continue
        if spec.project != project:
            continue
        return FeatureView.from_proto(candidate)
    raise FeatureViewNotFoundException(name, project)


def get_stream_feature_view(
    registry_proto: RegistryProto, name: str, project: str
) -> StreamFeatureView:
    """Look up a stream feature view by name within a project.

    Args:
        registry_proto: Registry proto whose ``stream_feature_views`` are
            searched.
        name: Name of the stream feature view to find.
        project: Project the stream feature view must belong to.

    Returns:
        The first matching stream feature view, converted from its proto.

    Raises:
        FeatureViewNotFoundException: If no entry matches both the name
            and the project.
    """
    match = next(
        (
            candidate
            for candidate in registry_proto.stream_feature_views
            if candidate.spec.name == name and candidate.spec.project == project
        ),
        None,
    )
    if match is None:
        raise FeatureViewNotFoundException(name, project)
    return StreamFeatureView.from_proto(match)


def get_request_feature_view(
    registry_proto: RegistryProto, name: str, project: str
) -> RequestFeatureView:
    """Look up a request feature view by name within a project.

    The search runs over ``registry_proto.request_feature_views``, the same
    collection that ``list_request_feature_views`` reads. Scanning
    ``feature_views`` instead would never see request feature views and would
    hand a regular feature view proto to ``RequestFeatureView.from_proto``.

    Args:
        registry_proto: Registry proto whose ``request_feature_views`` are
            searched.
        name: Name of the request feature view to find.
        project: Project the request feature view must belong to.

    Returns:
        The first matching request feature view, converted from its proto.

    Raises:
        FeatureViewNotFoundException: If no entry matches both the name
            and the project.
    """
    for request_feature_view_proto in registry_proto.request_feature_views:
        if (
            request_feature_view_proto.spec.name == name
            and request_feature_view_proto.spec.project == project
        ):
            return RequestFeatureView.from_proto(request_feature_view_proto)
    raise FeatureViewNotFoundException(name, project)


def get_on_demand_feature_view(
    registry_proto: RegistryProto, name: str, project: str
) -> OnDemandFeatureView:
    """Look up an on-demand feature view by name within a project.

    Args:
        registry_proto: Registry proto whose ``on_demand_feature_views`` are
            searched.
        name: Name of the on-demand feature view to find.
        project: Project the on-demand feature view must belong to.

    Returns:
        The first matching on-demand feature view, converted from its proto.

    Raises:
        OnDemandFeatureViewNotFoundException: If no entry matches both the
            project and the name.
    """
    for candidate in registry_proto.on_demand_feature_views:
        spec = candidate.spec
        if spec.project != project:
            continue
        if spec.name != name:
            continue
        return OnDemandFeatureView.from_proto(candidate)
    raise OnDemandFeatureViewNotFoundException(name, project=project)


def get_data_source(
    registry_proto: RegistryProto, name: str, project: str
) -> DataSource:
    """Look up a data source by name within a project.

    Args:
        registry_proto: Registry proto whose ``data_sources`` are searched.
        name: Name of the data source to find.
        project: Project the data source must belong to.

    Returns:
        The first matching data source, converted from its proto.

    Raises:
        DataSourceObjectNotFoundException: If no entry matches both the
            project and the name.
    """
    match = next(
        (
            candidate
            for candidate in registry_proto.data_sources
            if candidate.project == project and candidate.name == name
        ),
        None,
    )
    if match is None:
        raise DataSourceObjectNotFoundException(name, project=project)
    return DataSource.from_proto(match)


def get_entity(registry_proto: RegistryProto, name: str, project: str) -> Entity:
    """Look up an entity by name within a project.

    Args:
        registry_proto: Registry proto whose ``entities`` are searched.
        name: Name of the entity to find.
        project: Project the entity must belong to.

    Returns:
        The first matching entity, converted from its proto.

    Raises:
        EntityNotFoundException: If no entry matches both the name and the
            project.
    """
    for candidate in registry_proto.entities:
        spec = candidate.spec
        if spec.name != name:
            continue
        if spec.project != project:
            continue
        return Entity.from_proto(candidate)
    raise EntityNotFoundException(name, project=project)


def get_saved_dataset(
    registry_proto: RegistryProto, name: str, project: str
) -> SavedDataset:
    """Look up a saved dataset by name within a project.

    Args:
        registry_proto: Registry proto whose ``saved_datasets`` are searched.
        name: Name of the saved dataset to find.
        project: Project the saved dataset must belong to.

    Returns:
        The first matching saved dataset, converted from its proto.

    Raises:
        SavedDatasetNotFound: If no entry matches both the name and the
            project.
    """
    match = next(
        (
            candidate
            for candidate in registry_proto.saved_datasets
            if candidate.spec.name == name and candidate.spec.project == project
        ),
        None,
    )
    if match is None:
        raise SavedDatasetNotFound(name, project=project)
    return SavedDataset.from_proto(match)


def get_validation_reference(
    registry_proto: RegistryProto, name: str, project: str
) -> ValidationReference:
    """Look up a validation reference by name within a project.

    Args:
        registry_proto: Registry proto whose ``validation_references`` are
            searched.
        name: Name of the validation reference to find.
        project: Project the validation reference must belong to.

    Returns:
        The first matching validation reference, converted from its proto.

    Raises:
        ValidationReferenceNotFound: If no entry matches both the name and
            the project.
    """
    for candidate in registry_proto.validation_references:
        if candidate.name != name:
            continue
        if candidate.project != project:
            continue
        return ValidationReference.from_proto(candidate)
    raise ValidationReferenceNotFound(name, project=project)


def list_feature_services(
    registry_proto: RegistryProto, project: str, allow_cache: bool = False
) -> List[FeatureService]:
    """Collect every feature service that belongs to a project.

    Args:
        registry_proto: Registry proto whose ``feature_services`` are scanned.
        project: Project to filter by.
        allow_cache: Accepted as part of the signature; this function does
            not read it.

    Returns:
        Feature services for ``project``, in registry order. Empty when none
        match.
    """
    return [
        FeatureService.from_proto(service_proto)
        for service_proto in registry_proto.feature_services
        if service_proto.spec.project == project
    ]


def list_feature_views(
    registry_proto: RegistryProto, project: str
) -> List[FeatureView]:
    """Collect every feature view that belongs to a project.

    Args:
        registry_proto: Registry proto whose ``feature_views`` are scanned.
        project: Project to filter by.

    Returns:
        Feature views for ``project``, in registry order. Empty when none
        match.
    """
    return [
        FeatureView.from_proto(view_proto)
        for view_proto in registry_proto.feature_views
        if view_proto.spec.project == project
    ]


def list_request_feature_views(
    registry_proto: RegistryProto, project: str
) -> List[RequestFeatureView]:
    """Collect every request feature view that belongs to a project.

    Args:
        registry_proto: Registry proto whose ``request_feature_views`` are
            scanned.
        project: Project to filter by.

    Returns:
        Request feature views for ``project``, in registry order. Empty when
        none match.
    """
    return [
        RequestFeatureView.from_proto(view_proto)
        for view_proto in registry_proto.request_feature_views
        if view_proto.spec.project == project
    ]


def list_stream_feature_views(
    registry_proto: RegistryProto, project: str
) -> List[StreamFeatureView]:
    """Collect every stream feature view that belongs to a project.

    Args:
        registry_proto: Registry proto whose ``stream_feature_views`` are
            scanned.
        project: Project to filter by.

    Returns:
        Stream feature views for ``project``, in registry order. Empty when
        none match.
    """
    return [
        StreamFeatureView.from_proto(view_proto)
        for view_proto in registry_proto.stream_feature_views
        if view_proto.spec.project == project
    ]


def list_on_demand_feature_views(
    registry_proto: RegistryProto, project: str
) -> List[OnDemandFeatureView]:
    """Collect every on-demand feature view that belongs to a project.

    Args:
        registry_proto: Registry proto whose ``on_demand_feature_views`` are
            scanned.
        project: Project to filter by.

    Returns:
        On-demand feature views for ``project``, in registry order. Empty
        when none match.
    """
    return [
        OnDemandFeatureView.from_proto(view_proto)
        for view_proto in registry_proto.on_demand_feature_views
        if view_proto.spec.project == project
    ]


def list_entities(registry_proto: RegistryProto, project: str) -> List[Entity]:
    """Collect every entity that belongs to a project.

    Args:
        registry_proto: Registry proto whose ``entities`` are scanned.
        project: Project to filter by.

    Returns:
        Entities for ``project``, in registry order. Empty when none match.
    """
    return [
        Entity.from_proto(candidate)
        for candidate in registry_proto.entities
        if candidate.spec.project == project
    ]


def list_data_sources(registry_proto: RegistryProto, project: str) -> List[DataSource]:
    """Collect every data source that belongs to a project.

    Args:
        registry_proto: Registry proto whose ``data_sources`` are scanned.
        project: Project to filter by.

    Returns:
        Data sources for ``project``, in registry order. Empty when none
        match.
    """
    return [
        DataSource.from_proto(source_proto)
        for source_proto in registry_proto.data_sources
        if source_proto.project == project
    ]


def list_saved_datasets(
    registry_proto: RegistryProto, project: str, allow_cache: bool = False
) -> List[SavedDataset]:
    """Collect every saved dataset that belongs to a project.

    Args:
        registry_proto: Registry proto whose ``saved_datasets`` are scanned.
        project: Project to filter by.
        allow_cache: Accepted as part of the signature; this function does
            not read it.

    Returns:
        Saved datasets for ``project``, in registry order. Empty when none
        match.
    """
    datasets: List[SavedDataset] = []
    for dataset_proto in registry_proto.saved_datasets:
        if dataset_proto.spec.project != project:
            continue
        datasets.append(SavedDataset.from_proto(dataset_proto))
    return datasets


def list_project_metadata(
    registry_proto: RegistryProto, project: str
) -> List[ProjectMetadata]:
    """Collect the project metadata entries recorded for a project.

    Args:
        registry_proto: Registry proto whose ``project_metadata`` is scanned.
        project: Project to filter by.

    Returns:
        Project metadata entries for ``project``, in registry order. Empty
        when none match.
    """
    metadata_entries: List[ProjectMetadata] = []
    for metadata_proto in registry_proto.project_metadata:
        if metadata_proto.project != project:
            continue
        metadata_entries.append(ProjectMetadata.from_proto(metadata_proto))
    return metadata_entries
Loading

0 comments on commit 2e57376

Please sign in to comment.