Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce an OnlineStore interface #1628

Merged
merged 13 commits into from
Jun 11, 2021
7 changes: 7 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ def __init__(self, offline_store_name: str, data_source_name: str):
)


class FeastOnlineStoreUnsupportedDataSource(Exception):
def __init__(self, online_store_name: str, data_source_name: str):
super().__init__(
f"Online Store '{online_store_name}' does not support data source '{data_source_name}'"
)


class FeastEntityDFMissingColumnsError(Exception):
def __init__(self, expected, missing):
super().__init__(
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ def get_online_features(
table, union_of_entity_keys, entity_name_to_join_key_map
)
read_rows = provider.online_read(
project=self.project,
config=self.config,
table=table,
entity_keys=entity_keys,
requested_features=requested_features,
Expand Down
192 changes: 20 additions & 172 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import itertools
from datetime import datetime
from multiprocessing.pool import ThreadPool
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import mmh3
import pandas
from tqdm import tqdm

from feast import FeatureTable, utils
from feast import FeatureTable
from feast.entity import Entity
from feast.errors import FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.offline_stores.helpers import get_offline_store_from_config
from feast.infra.online_stores.helpers import get_online_store_from_config
from feast.infra.provider import (
Provider,
RetrievalJob,
Expand All @@ -23,42 +19,17 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig

try:
from google.auth.exceptions import DefaultCredentialsError
from google.cloud import datastore
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError

raise FeastExtrasDependencyImportError("gcp", str(e))
from feast.repo_config import RepoConfig


class GcpProvider(Provider):
_gcp_project_id: Optional[str]
_namespace: Optional[str]

def __init__(self, config: RepoConfig):
assert isinstance(config.online_store, DatastoreOnlineStoreConfig)
self._gcp_project_id = config.online_store.project_id
self._namespace = config.online_store.namespace
self._write_concurrency = config.online_store.write_concurrency
self._write_batch_size = config.online_store.write_batch_size

assert config.offline_store is not None
self.repo_config = config
self.offline_store = get_offline_store_from_config(config.offline_store)

def _initialize_client(self):
try:
return datastore.Client(
project=self._gcp_project_id, namespace=self._namespace
)
except DefaultCredentialsError as e:
raise FeastProviderLoginError(
str(e)
+ '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your '
"local Google Cloud account "
)
self.online_store = get_online_store_from_config(config.online_store)

def update_infra(
self,
Expand All @@ -69,85 +40,43 @@ def update_infra(
entities_to_keep: Sequence[Entity],
partial: bool,
):

client = self._initialize_client()

for table in tables_to_keep:
key = client.key("Project", project, "Table", table.name)
entity = datastore.Entity(
key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
)
entity.update({"created_ts": datetime.utcnow()})
client.put(entity)

for table in tables_to_delete:
_delete_all_values(
client, client.key("Project", project, "Table", table.name)
)

# Delete the table metadata datastore entity
key = client.key("Project", project, "Table", table.name)
client.delete(key)
self.online_store.setup(
config=self.repo_config,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_keep=entities_to_keep,
entities_to_delete=entities_to_delete,
partial=partial,
)

def teardown_infra(
self,
project: str,
tables: Sequence[Union[FeatureTable, FeatureView]],
entities: Sequence[Entity],
) -> None:
client = self._initialize_client()

for table in tables:
_delete_all_values(
client, client.key("Project", project, "Table", table.name)
)

# Delete the table metadata datastore entity
key = client.key("Project", project, "Table", table.name)
client.delete(key)
self.online_store.teardown(self.repo_config, tables, entities)

def online_write_batch(
self,
project: str,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
client = self._initialize_client()

pool = ThreadPool(processes=self._write_concurrency)
pool.map(
lambda b: _write_minibatch(client, project, table, b, progress),
_to_minibatches(data, batch_size=self._write_batch_size),
)
self.online_store.online_write_batch(config, table, data, progress)

def online_read(
self,
project: str,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
entity_keys: List[EntityKeyProto],
requested_features: List[str] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
client = self._initialize_client()
result = self.online_store.online_read(config, table, entity_keys)

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for entity_key in entity_keys:
document_id = compute_datastore_entity_id(entity_key)
key = client.key(
"Project", project, "Table", table.name, "Row", document_id
)
value = client.get(key)
if value is not None:
res = {}
for feature_name, value_bin in value["values"].items():
val = ValueProto()
val.ParseFromString(value_bin)
res[feature_name] = val
result.append((value["event_ts"], res))
else:
result.append((None, None))
return result

def materialize_single_feature_view(
Expand Down Expand Up @@ -188,7 +117,7 @@ def materialize_single_feature_view(

with tqdm_builder(len(rows_to_write)) as pbar:
self.online_write_batch(
project, feature_view, rows_to_write, lambda x: pbar.update(x)
self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x)
)

def get_historical_features(
Expand All @@ -209,84 +138,3 @@ def get_historical_features(
project=project,
)
return job


ProtoBatch = Sequence[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
]


def _to_minibatches(data: ProtoBatch, batch_size) -> Iterator[ProtoBatch]:
"""
Split data into minibatches, making sure we stay under GCP datastore transaction size
limits.
"""
iterable = iter(data)

while True:
batch = list(itertools.islice(iterable, batch_size))
if len(batch) > 0:
yield batch
else:
break


def _write_minibatch(
client,
project: str,
table: Union[FeatureTable, FeatureView],
data: Sequence[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
):
entities = []
for entity_key, features, timestamp, created_ts in data:
document_id = compute_datastore_entity_id(entity_key)

key = client.key("Project", project, "Table", table.name, "Row", document_id,)

entity = datastore.Entity(
key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
)

entity.update(
dict(
key=entity_key.SerializeToString(),
values={k: v.SerializeToString() for k, v in features.items()},
event_ts=utils.make_tzaware(timestamp),
created_ts=(
utils.make_tzaware(created_ts) if created_ts is not None else None
),
)
)
entities.append(entity)
with client.transaction():
client.put_multi(entities)

if progress:
progress(len(entities))


def _delete_all_values(client, key) -> None:
"""
Delete all data under the key path in datastore.
"""
while True:
query = client.query(kind="Row", ancestor=key)
entities = list(query.fetch(limit=1000))
if not entities:
return

for entity in entities:
client.delete(entity.key)


def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
"""
Compute Datastore Entity id given Feast Entity Key.

Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to
do with the Entity concept we have in Feast.
"""
return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
Loading