Skip to content

Commit

Permalink
Remove unnecessary override in custom provider
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario committed Sep 16, 2021
1 parent 116903d commit 2808ba0
Showing 1 changed file with 15 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,6 @@ class FlyteCustomProvider(LocalProvider):
def __init__(self, config: RepoConfig, repo_path):
# Only `config` is forwarded to the parent initializer; `repo_path` is accepted but not used here.
super().__init__(config)

def materialize_single_feature_view(
self,
config: RepoConfig,
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
registry: Registry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
# materialize_single_feature_view loads the latest feature values for a specific feature view from the offline
# store into the online store.
# This method can be overridden to also launch custom batch ingestion jobs that load the latest batch feature
# values into the online store.
#
# All arguments are forwarded unchanged to the parent implementation below.

# Localize the feature view first (see `_localize_feature_view`) so that the parent implementation
# works against the localized batch source.
self._localize_feature_view(feature_view)

# Replace the line below with your custom logic in order to launch your own batch ingestion job
super().materialize_single_feature_view(
config, feature_view, start_date, end_date, registry, project, tqdm_builder
)
print("Launching custom batch jobs is pretty easy...")

def get_historical_features(
self,
config: RepoConfig,
Expand All @@ -57,8 +33,21 @@ def get_historical_features(
# get_historical_features returns a training dataframe from the offline store

# We substitute the remote s3 file with a reference to a local file in each feature view being requested
for fv in feature_views:
self._localize_feature_view(fv)
for feature_view in feature_views:
if not isinstance(feature_view.batch_source, FileSource):
continue
# Copy parquet file to a local file
file_source: FileSource = feature_view.batch_source
random_local_path = FlyteContext.current_context().file_access.get_random_local_path(file_source.path)
FlyteContext.current_context().file_access.get_data(
file_source.path,
random_local_path,
is_multipart=True,
)
feature_view.batch_source=FileSource(
path=random_local_path,
event_timestamp_column=file_source.event_timestamp_column,
)

return super().get_historical_features(
config,
Expand All @@ -74,17 +63,3 @@ def _localize_feature_view(self, feature_view: FeatureView):
"""
This function ensures that the `FeatureView` object points to files in the local disk
"""
if not isinstance(feature_view.batch_source, FileSource):
return
# Copy parquet file to a local file
file_source: FileSource = feature_view.batch_source
random_local_path = FlyteContext.current_context().file_access.get_random_local_path(file_source.path)
FlyteContext.current_context().file_access.get_data(
file_source.path,
random_local_path,
is_multipart=True,
)
feature_view.batch_source=FileSource(
path=random_local_path,
event_timestamp_column=file_source.event_timestamp_column,
)

0 comments on commit 2808ba0

Please sign in to comment.