Skip to content

Commit

Permalink
Fix feast apply bugs
Browse files Browse the repository at this point in the history
Signed-off-by: Tsotne Tabidze <[email protected]>
  • Loading branch information
Tsotne Tabidze committed Aug 1, 2021
1 parent 651d066 commit c2e0ec1
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 26 deletions.
10 changes: 6 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@

from feast import utils
from feast.entity import Entity
from feast.errors import FeatureNameCollisionError, FeatureViewNotFoundException
from feast.errors import (
EntityNotFoundException,
FeatureNameCollisionError,
FeatureViewNotFoundException,
)
from feast.feature_service import FeatureService
from feast.feature_table import FeatureTable
from feast.feature_view import FeatureView
Expand Down Expand Up @@ -654,9 +658,7 @@ def get_online_features(
try:
join_key = entity_name_to_join_key_map[entity_name]
except KeyError:
raise Exception(
f"Entity {entity_name} does not exist in project {self.project}"
)
raise EntityNotFoundException(entity_name, self.project)
join_key_row[join_key] = entity_value
join_key_rows.append(join_key_row)

Expand Down
28 changes: 19 additions & 9 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,13 @@ def __init__(self, registry_path: str, repo_path: Path, cache_ttl: timedelta):
self.cached_registry_proto_ttl = cache_ttl

def _initialize_registry(self):
"""Explicitly initializes the registry with an empty proto."""
registry_proto = RegistryProto()
registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION
self._registry_store.update_registry_proto(registry_proto)
"""Explicitly initializes the registry with an empty proto if it doesn't exist."""
try:
self._get_registry_proto()
except FileNotFoundError:
registry_proto = RegistryProto()
registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION
self._registry_store.update_registry_proto(registry_proto)

def apply_entity(self, entity: Entity, project: str, commit: bool = True):
"""
Expand Down Expand Up @@ -409,22 +412,29 @@ def get_feature_view(self, name: str, project: str) -> FeatureView:
return FeatureView.from_proto(feature_view_proto)
raise FeatureViewNotFoundException(name, project)

def delete_feature_service(self, name: str, project: str):
def delete_feature_service(self, name: str, project: str, commit: bool = True):
"""
Deletes a feature service or raises an exception if not found.
Args:
name: Name of feature service
project: Feast project that this feature service belongs to
commit: Whether the change should be persisted immediately
"""
registry_proto = self._get_registry_proto()
for idx, feature_service_proto in enumerate(registry_proto.feature_services):
self._prepare_registry_for_changes()
assert self.cached_registry_proto

for idx, feature_service_proto in enumerate(
self.cached_registry_proto.feature_services
):
if (
feature_service_proto.spec.name == name
and feature_service_proto.spec.project == project
):
del registry_proto.feature_services[idx]
return feature_service_proto
del self.cached_registry_proto.feature_services[idx]
if commit:
self.commit()
return
raise FeatureServiceNotFoundException(name, project)

def delete_feature_table(self, name: str, project: str, commit: bool = True):
Expand Down
36 changes: 23 additions & 13 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,24 @@ def parse_repo(repo_root: Path) -> ParsedRepo:
return res


def apply_feature_services(registry: Registry, project: str, repo: ParsedRepo):
def apply_feature_services(
registry: Registry,
project: str,
repo: ParsedRepo,
existing_feature_services: List[FeatureService],
):
from colorama import Fore, Style

# Determine which feature services should be deleted.
existing_feature_services = registry.list_feature_services(project)
for feature_service in repo.feature_services:
if feature_service in existing_feature_services:
existing_feature_services.remove(feature_service)

# The remaining features services in the list should be deleted.
for feature_service_to_delete in existing_feature_services:
registry.delete_feature_service(feature_service_to_delete.name, project)
registry.delete_feature_service(
feature_service_to_delete.name, project, commit=False
)
click.echo(
f"Deleted feature service {Style.BRIGHT + Fore.GREEN}{feature_service_to_delete.name}{Style.RESET_ALL} "
f"from registry"
Expand Down Expand Up @@ -192,6 +198,16 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
if registry_view.name not in repo_table_names:
views_to_delete.append(registry_view)

entities_to_delete: List[Entity] = []
repo_entities_names = set([e.name for e in repo.entities])
for registry_entity in registry.list_entities(project=project):
if registry_entity.name not in repo_entities_names:
entities_to_delete.append(registry_entity)

entities_to_keep: List[Entity] = repo.entities

existing_feature_services = registry.list_feature_services(project)

sys.dont_write_bytecode = False
for entity in repo.entities:
registry.apply_entity(entity, project=project, commit=False)
Expand Down Expand Up @@ -228,9 +244,8 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
click.echo(
f"Registered feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}"
)
registry.commit()

apply_feature_services(registry, project, repo)
apply_feature_services(registry, project, repo, existing_feature_services)

infra_provider = get_provider(repo_config, repo_path)

Expand All @@ -242,14 +257,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
all_to_keep.extend(repo.feature_tables)
all_to_keep.extend(repo.feature_views)

entities_to_delete: List[Entity] = []
repo_entities_names = set([e.name for e in repo.entities])
for registry_entity in registry.list_entities(project=project):
if registry_entity.name not in repo_entities_names:
entities_to_delete.append(registry_entity)

entities_to_keep: List[Entity] = repo.entities

for name in [view.name for view in repo.feature_tables] + [
table.name for table in repo.feature_views
]:
Expand All @@ -272,6 +279,9 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
partial=False,
)

# Commit the update to the registry only after successful infra update
registry.commit()


@log_exceptions_and_usage
def teardown(repo_config: RepoConfig, repo_path: Path):
Expand Down

0 comments on commit c2e0ec1

Please sign in to comment.