diff --git a/cookbook/case_studies/feature_engineering/feast_integration/custom_provider/provider.py b/cookbook/case_studies/feature_engineering/feast_integration/custom_provider/provider.py index 9cb1997016..a11d53a87e 100644 --- a/cookbook/case_studies/feature_engineering/feast_integration/custom_provider/provider.py +++ b/cookbook/case_studies/feature_engineering/feast_integration/custom_provider/provider.py @@ -20,30 +20,6 @@ class FlyteCustomProvider(LocalProvider): def __init__(self, config: RepoConfig, repo_path): super().__init__(config) - def materialize_single_feature_view( - self, - config: RepoConfig, - feature_view: FeatureView, - start_date: datetime, - end_date: datetime, - registry: Registry, - project: str, - tqdm_builder: Callable[[int], tqdm], - ) -> None: - # materialize_single_feature_view loads the latest feature values for a specific feature value from the offline - # store into the online store. - # This method can be overridden to also launch custom batch ingestion jobs that loads the latest batch feature - # values into the online store. - # - - self._localize_feature_view(feature_view) - - # Replace the line below with your custom logic in order to launch your own batch ingestion job - super().materialize_single_feature_view( - config, feature_view, start_date, end_date, registry, project, tqdm_builder - ) - print("Launching custom batch jobs is pretty easy...") - def get_historical_features( self, config: RepoConfig, @@ -57,8 +33,21 @@ def get_historical_features( # get_historical_features returns a training dataframe from the offline store # We substitute the remote s3 file with a reference to a local file in each feature view being requested - for fv in feature_views: - self._localize_feature_view(fv) + for feature_view in feature_views: + if not isinstance(feature_view.batch_source, FileSource): + continue + # Copy parquet file to a local file + file_source: FileSource = feature_view.batch_source + random_local_path = FlyteContext.current_context().file_access.get_random_local_path(file_source.path) + FlyteContext.current_context().file_access.get_data( + file_source.path, + random_local_path, + is_multipart=True, + ) + feature_view.batch_source=FileSource( + path=random_local_path, + event_timestamp_column=file_source.event_timestamp_column, + ) return super().get_historical_features( config, @@ -74,17 +63,3 @@ def _localize_feature_view(self, feature_view: FeatureView): """ This function ensures that the `FeatureView` object points to files in the local disk """ - if not isinstance(feature_view.batch_source, FileSource): - return - # Copy parquet file to a local file - file_source: FileSource = feature_view.batch_source - random_local_path = FlyteContext.current_context().file_access.get_random_local_path(file_source.path) - FlyteContext.current_context().file_access.get_data( - file_source.path, - random_local_path, - is_multipart=True, - ) - feature_view.batch_source=FileSource( - path=random_local_path, - event_timestamp_column=file_source.event_timestamp_column, - )