From 3ef331df81fd9db318f6a7ecd9930e06a761aeb0 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 17 Sep 2021 11:49:11 -0700 Subject: [PATCH] Refactor providers to remove duplicate implementations (#1876) * Refactor providers to remove duplicate implementations Signed-off-by: Achal Shah * Refactor Signed-off-by: Achal Shah * refactor Signed-off-by: Achal Shah * refactor Signed-off-by: Achal Shah * fix imports Signed-off-by: Achal Shah * Dynamic import for passthru Signed-off-by: Achal Shah * Dynamic import for passthru Signed-off-by: Achal Shah * remove init files Signed-off-by: Achal Shah --- sdk/python/feast/infra/aws.py | 145 +---------------- sdk/python/feast/infra/gcp.py | 147 +---------------- sdk/python/feast/infra/local.py | 142 +---------------- .../feast/infra/passthrough_provider.py | 149 ++++++++++++++++++ sdk/python/feast/infra/provider.py | 14 +- 5 files changed, 174 insertions(+), 423 deletions(-) create mode 100644 sdk/python/feast/infra/passthrough_provider.py diff --git a/sdk/python/feast/infra/aws.py b/sdk/python/feast/infra/aws.py index f93553bf9f..0f4f2e0738 100644 --- a/sdk/python/feast/infra/aws.py +++ b/sdk/python/feast/infra/aws.py @@ -3,152 +3,21 @@ from datetime import datetime from pathlib import Path from tempfile import TemporaryFile -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union from urllib.parse import urlparse -import pandas -from tqdm import tqdm - -from feast import FeatureTable -from feast.entity import Entity from feast.errors import S3RegistryBucketForbiddenAccess, S3RegistryBucketNotExist -from feast.feature_view import FeatureView -from feast.infra.offline_stores.offline_utils import get_offline_store_from_config -from feast.infra.online_stores.helpers import get_online_store_from_config -from feast.infra.provider import ( - Provider, - RetrievalJob, - _convert_arrow_to_proto, - _get_column_names, - _run_field_mapping, -) +from feast.infra.passthrough_provider import PassthroughProvider from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.registry import Registry from feast.registry_store import RegistryStore -from feast.repo_config import RegistryConfig, RepoConfig - - -class AwsProvider(Provider): - def __init__(self, config: RepoConfig): - self.repo_config = config - self.offline_store = get_offline_store_from_config(config.offline_store) - self.online_store = get_online_store_from_config(config.online_store) - - def update_infra( - self, - project: str, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, - ): - self.online_store.update( - config=self.repo_config, - tables_to_delete=tables_to_delete, - tables_to_keep=tables_to_keep, - entities_to_keep=entities_to_keep, - entities_to_delete=entities_to_delete, - partial=partial, - ) - - def teardown_infra( - self, - project: str, - tables: Sequence[Union[FeatureTable, FeatureView]], - entities: Sequence[Entity], - ) -> None: - self.online_store.teardown(self.repo_config, tables, entities) - - def online_write_batch( - self, - config: RepoConfig, - table: Union[FeatureTable, FeatureView], - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], 
Any]], - ) -> None: - self.online_store.online_write_batch(config, table, data, progress) - - def online_read( - self, - config: RepoConfig, - table: Union[FeatureTable, FeatureView], - entity_keys: List[EntityKeyProto], - requested_features: List[str] = None, - ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - result = self.online_store.online_read(config, table, entity_keys) +from feast.repo_config import RegistryConfig - return result - def materialize_single_feature_view( - self, - config: RepoConfig, - feature_view: FeatureView, - start_date: datetime, - end_date: datetime, - registry: Registry, - project: str, - tqdm_builder: Callable[[int], tqdm], - ) -> None: - entities = [] - for entity_name in feature_view.entities: - entities.append(registry.get_entity(entity_name, project)) +class AwsProvider(PassthroughProvider): + """ + This class only exists for backwards compatibility. + """ - ( - join_key_columns, - feature_name_columns, - event_timestamp_column, - created_timestamp_column, - ) = _get_column_names(feature_view, entities) - - offline_job = self.offline_store.pull_latest_from_table_or_query( - config=config, - data_source=feature_view.batch_source, - join_key_columns=join_key_columns, - feature_name_columns=feature_name_columns, - event_timestamp_column=event_timestamp_column, - created_timestamp_column=created_timestamp_column, - start_date=start_date, - end_date=end_date, - ) - - table = offline_job.to_arrow() - - if feature_view.batch_source.field_mapping is not None: - table = _run_field_mapping(table, feature_view.batch_source.field_mapping) - - join_keys = [entity.join_key for entity in entities] - rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) - - with tqdm_builder(len(rows_to_write)) as pbar: - self.online_write_batch( - self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x) - ) - - def get_historical_features( - self, - config: RepoConfig, - feature_views: List[FeatureView], - feature_refs: List[str], - entity_df: Union[pandas.DataFrame, str], - registry: Registry, - project: str, - full_feature_names: bool, - ) -> RetrievalJob: - job = self.offline_store.get_historical_features( - config=config, - feature_views=feature_views, - feature_refs=feature_refs, - entity_df=entity_df, - registry=registry, - project=project, - full_feature_names=full_feature_names, - ) - return job + pass class S3RegistryStore(RegistryStore): diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index c57450c876..1dd1eefe2d 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -2,153 +2,20 @@ from datetime import datetime from pathlib import Path from tempfile import TemporaryFile -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union from urllib.parse import urlparse -import pandas -from tqdm import tqdm - -from feast import FeatureTable -from feast.entity import Entity -from feast.feature_view import FeatureView -from feast.infra.offline_stores.offline_utils import get_offline_store_from_config -from feast.infra.online_stores.helpers import get_online_store_from_config -from feast.infra.provider import ( - Provider, - RetrievalJob, - _convert_arrow_to_proto, - _get_column_names, - _run_field_mapping, -) +from feast.infra.passthrough_provider import PassthroughProvider from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from 
feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.registry import Registry from feast.registry_store import RegistryStore -from feast.repo_config import RegistryConfig, RepoConfig - - -class GcpProvider(Provider): - _gcp_project_id: Optional[str] - _namespace: Optional[str] - - def __init__(self, config: RepoConfig): - self.repo_config = config - self.offline_store = get_offline_store_from_config(config.offline_store) - self.online_store = get_online_store_from_config(config.online_store) - - def update_infra( - self, - project: str, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, - ): - self.online_store.update( - config=self.repo_config, - tables_to_delete=tables_to_delete, - tables_to_keep=tables_to_keep, - entities_to_keep=entities_to_keep, - entities_to_delete=entities_to_delete, - partial=partial, - ) - - def teardown_infra( - self, - project: str, - tables: Sequence[Union[FeatureTable, FeatureView]], - entities: Sequence[Entity], - ) -> None: - self.online_store.teardown(self.repo_config, tables, entities) - - def online_write_batch( - self, - config: RepoConfig, - table: Union[FeatureTable, FeatureView], - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], - ) -> None: - self.online_store.online_write_batch(config, table, data, progress) - - def online_read( - self, - config: RepoConfig, - table: Union[FeatureTable, FeatureView], - entity_keys: List[EntityKeyProto], - requested_features: List[str] = None, - ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - result = self.online_store.online_read(config, table, entity_keys) +from feast.repo_config import RegistryConfig - return result - def materialize_single_feature_view( - self, - config: RepoConfig, - feature_view: FeatureView, - start_date: datetime, - end_date: datetime, - registry: Registry, - project: str, - tqdm_builder: Callable[[int], tqdm], - ) -> None: - entities = [] - for entity_name in feature_view.entities: - entities.append(registry.get_entity(entity_name, project)) +class GcpProvider(PassthroughProvider): + """ + This class only exists for backwards compatibility. 
+ """ - ( - join_key_columns, - feature_name_columns, - event_timestamp_column, - created_timestamp_column, - ) = _get_column_names(feature_view, entities) - - offline_job = self.offline_store.pull_latest_from_table_or_query( - config=config, - data_source=feature_view.batch_source, - join_key_columns=join_key_columns, - feature_name_columns=feature_name_columns, - event_timestamp_column=event_timestamp_column, - created_timestamp_column=created_timestamp_column, - start_date=start_date, - end_date=end_date, - ) - table = offline_job.to_arrow() - - if feature_view.batch_source.field_mapping is not None: - table = _run_field_mapping(table, feature_view.batch_source.field_mapping) - - join_keys = [entity.join_key for entity in entities] - rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) - - with tqdm_builder(len(rows_to_write)) as pbar: - self.online_write_batch( - self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x) - ) - - def get_historical_features( - self, - config: RepoConfig, - feature_views: List[FeatureView], - feature_refs: List[str], - entity_df: Union[pandas.DataFrame, str], - registry: Registry, - project: str, - full_feature_names: bool, - ) -> RetrievalJob: - job = self.offline_store.get_historical_features( - config=config, - feature_views=feature_views, - feature_refs=feature_refs, - entity_df=entity_df, - registry=registry, - project=project, - full_feature_names=full_feature_names, - ) - return job + pass class GCSRegistryStore(RegistryStore): diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index 7304e262b1..6ac3ce7259 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -1,150 +1,24 @@ import uuid from datetime import datetime from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import Union -import pandas as pd import pytz -from tqdm import tqdm from feast import FeatureTable -from feast.entity import Entity from feast.feature_view import FeatureView -from feast.infra.offline_stores.offline_utils import get_offline_store_from_config -from feast.infra.online_stores.helpers import get_online_store_from_config -from feast.infra.provider import ( - Provider, - RetrievalJob, - _convert_arrow_to_proto, - _get_column_names, - _run_field_mapping, -) +from feast.infra.passthrough_provider import PassthroughProvider from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto -from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto -from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.registry import Registry from feast.registry_store import RegistryStore -from feast.repo_config import RegistryConfig, RepoConfig +from feast.repo_config import RegistryConfig -class LocalProvider(Provider): - def __init__(self, config: RepoConfig): - assert config is not None - self.config = config - self.offline_store = get_offline_store_from_config(config.offline_store) - self.online_store = get_online_store_from_config(config.online_store) +class LocalProvider(PassthroughProvider): + """ + This class only exists for backwards compatibility. 
+ """ - def update_infra( - self, - project: str, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, - ): - self.online_store.update( - self.config, - tables_to_delete, - tables_to_keep, - entities_to_delete, - entities_to_keep, - partial, - ) - - def teardown_infra( - self, - project: str, - tables: Sequence[Union[FeatureTable, FeatureView]], - entities: Sequence[Entity], - ) -> None: - self.online_store.teardown(self.config, tables, entities) - - def online_write_batch( - self, - config: RepoConfig, - table: Union[FeatureTable, FeatureView], - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], - ) -> None: - self.online_store.online_write_batch(config, table, data, progress) - - def online_read( - self, - config: RepoConfig, - table: Union[FeatureTable, FeatureView], - entity_keys: List[EntityKeyProto], - requested_features: List[str] = None, - ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - result = self.online_store.online_read(config, table, entity_keys) - - return result - - def materialize_single_feature_view( - self, - config: RepoConfig, - feature_view: FeatureView, - start_date: datetime, - end_date: datetime, - registry: Registry, - project: str, - tqdm_builder: Callable[[int], tqdm], - ) -> None: - entities = [] - for entity_name in feature_view.entities: - entities.append(registry.get_entity(entity_name, project)) - - ( - join_key_columns, - feature_name_columns, - event_timestamp_column, - created_timestamp_column, - ) = _get_column_names(feature_view, entities) - - offline_job = self.offline_store.pull_latest_from_table_or_query( - data_source=feature_view.batch_source, - join_key_columns=join_key_columns, - feature_name_columns=feature_name_columns, - event_timestamp_column=event_timestamp_column, - created_timestamp_column=created_timestamp_column, - start_date=start_date, - end_date=end_date, - config=config, - ) - table = offline_job.to_arrow() - - if feature_view.batch_source.field_mapping is not None: - table = _run_field_mapping(table, feature_view.batch_source.field_mapping) - - join_keys = [entity.join_key for entity in entities] - rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) - - with tqdm_builder(len(rows_to_write)) as pbar: - self.online_write_batch( - self.config, feature_view, rows_to_write, lambda x: pbar.update(x) - ) - - def get_historical_features( - self, - config: RepoConfig, - feature_views: List[FeatureView], - feature_refs: List[str], - entity_df: Union[pd.DataFrame, str], - registry: Registry, - project: str, - full_feature_names: bool, - ) -> RetrievalJob: - return self.offline_store.get_historical_features( - config=config, - feature_views=feature_views, - feature_refs=feature_refs, - entity_df=entity_df, - registry=registry, - project=project, - full_feature_names=full_feature_names, - ) + pass def _table_id(project: str, table: Union[FeatureTable, FeatureView]) -> str: diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py new file mode 100644 index 0000000000..3e4c0a3485 --- /dev/null +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -0,0 +1,149 @@ +from datetime import datetime +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union + +import pandas +from tqdm 
import tqdm + +from feast.entity import Entity +from feast.feature_table import FeatureTable +from feast.feature_view import FeatureView +from feast.infra.offline_stores.offline_store import RetrievalJob +from feast.infra.offline_stores.offline_utils import get_offline_store_from_config +from feast.infra.online_stores.helpers import get_online_store_from_config +from feast.infra.provider import ( + Provider, + _convert_arrow_to_proto, + _get_column_names, + _run_field_mapping, +) +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.registry import Registry +from feast.repo_config import RepoConfig + + +class PassthroughProvider(Provider): + """ + The Passthrough provider delegates all operations to the underlying online and offline stores. + """ + + def __init__(self, config: RepoConfig): + super().__init__(config) + + self.repo_config = config + self.offline_store = get_offline_store_from_config(config.offline_store) + self.online_store = get_online_store_from_config(config.online_store) + + def update_infra( + self, + project: str, + tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], + tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + self.online_store.update( + config=self.repo_config, + tables_to_delete=tables_to_delete, + tables_to_keep=tables_to_keep, + entities_to_keep=entities_to_keep, + entities_to_delete=entities_to_delete, + partial=partial, + ) + + def teardown_infra( + self, + project: str, + tables: Sequence[Union[FeatureTable, FeatureView]], + entities: Sequence[Entity], + ) -> None: + self.online_store.teardown(self.repo_config, tables, entities) + + def online_write_batch( + self, + config: RepoConfig, + table: Union[FeatureTable, FeatureView], + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + self.online_store.online_write_batch(config, table, data, progress) + + def online_read( + self, + config: RepoConfig, + table: Union[FeatureTable, FeatureView], + entity_keys: List[EntityKeyProto], + requested_features: List[str] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + result = self.online_store.online_read(config, table, entity_keys) + + return result + + def materialize_single_feature_view( + self, + config: RepoConfig, + feature_view: FeatureView, + start_date: datetime, + end_date: datetime, + registry: Registry, + project: str, + tqdm_builder: Callable[[int], tqdm], + ) -> None: + entities = [] + for entity_name in feature_view.entities: + entities.append(registry.get_entity(entity_name, project)) + + ( + join_key_columns, + feature_name_columns, + event_timestamp_column, + created_timestamp_column, + ) = _get_column_names(feature_view, entities) + + offline_job = self.offline_store.pull_latest_from_table_or_query( + config=config, + data_source=feature_view.batch_source, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + event_timestamp_column=event_timestamp_column, + created_timestamp_column=created_timestamp_column, + start_date=start_date, + end_date=end_date, + ) + + table = offline_job.to_arrow() + + if feature_view.batch_source.field_mapping is not None: + table = _run_field_mapping(table, feature_view.batch_source.field_mapping) + + join_keys = [entity.join_key for 
entity in entities] + rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys) + + with tqdm_builder(len(rows_to_write)) as pbar: + self.online_write_batch( + self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x) + ) + + def get_historical_features( + self, + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: List[str], + entity_df: Union[pandas.DataFrame, str], + registry: Registry, + project: str, + full_feature_names: bool, + ) -> RetrievalJob: + job = self.offline_store.get_historical_features( + config=config, + feature_views=feature_views, + feature_refs=feature_refs, + entity_df=entity_df, + registry=registry, + project=project, + full_feature_names=full_feature_names, + ) + return job diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 54c2ee94fb..6147f19b9a 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -146,18 +146,10 @@ def online_read( def get_provider(config: RepoConfig, repo_path: Path) -> Provider: if "." not in config.provider: - if config.provider == "gcp": - from feast.infra.gcp import GcpProvider + if config.provider in {"gcp", "aws", "local"}: + from feast.infra.passthrough_provider import PassthroughProvider - return GcpProvider(config) - elif config.provider == "aws": - from feast.infra.aws import AwsProvider - - return AwsProvider(config) - elif config.provider == "local": - from feast.infra.local import LocalProvider - - return LocalProvider(config) + return PassthroughProvider(config) else: raise errors.FeastProviderNotImplementedError(config.provider) else:
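
Note for reviewers: after this change, `get_provider` returns a `PassthroughProvider` for all three built-in provider names; `AwsProvider`, `GcpProvider`, and `LocalProvider` remain importable only as empty subclasses so existing imports and custom providers that extend them keep working. Below is a minimal sketch of the new resolution path — it is not part of this patch, and the registry path and project name are illustrative:

    from pathlib import Path

    from feast.infra.passthrough_provider import PassthroughProvider
    from feast.infra.provider import get_provider
    from feast.repo_config import RepoConfig

    # Any of the built-in provider names ("local", "gcp", "aws") now resolve
    # to the same PassthroughProvider class; only the online/offline stores
    # configured in RepoConfig differ between them.
    config = RepoConfig(
        registry="data/registry.db",  # illustrative registry path
        project="demo",               # illustrative project name
        provider="local",
    )
    provider = get_provider(config, Path("."))
    assert isinstance(provider, PassthroughProvider)

One behavioral caveat worth flagging: code that checks `isinstance(provider, LocalProvider)` against the object returned by `get_provider` will now get `False`, since the returned instance is of the parent class rather than the legacy subclass; the subclasses exist only so that user code constructing or extending them directly keeps working.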