Implementing initial on demand transforms for historical retrieval to_df #1824

Merged · 2 commits · Sep 2, 2021
49 changes: 41 additions & 8 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -3,7 +3,7 @@
 from typing import Dict, List, Optional, Union

 import numpy as np
-import pandas
+import pandas as pd
 import pyarrow
 from pydantic import StrictStr
 from pydantic.typing import Literal
@@ -19,6 +19,7 @@
 from feast.feature_view import FeatureView
 from feast.infra.offline_stores import offline_utils
 from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
+from feast.on_demand_feature_view import OnDemandFeatureView
 from feast.registry import Registry
 from feast.repo_config import FeastConfigBaseModel, RepoConfig

@@ -87,14 +88,21 @@ def pull_latest_from_table_or_query(
         WHERE _feast_row = 1
         """

-        return BigQueryRetrievalJob(query=query, client=client, config=config)
+        # When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized
+        return BigQueryRetrievalJob(
+            query=query,
+            client=client,
+            config=config,
+            full_feature_names=False,
+            on_demand_feature_views=None,
+        )

     @staticmethod
     def get_historical_features(
         config: RepoConfig,
         feature_views: List[FeatureView],
         feature_refs: List[str],
-        entity_df: Union[pandas.DataFrame, str],
+        entity_df: Union[pd.DataFrame, str],
         registry: Registry,
         project: str,
         full_feature_names: bool = False,
@@ -140,16 +148,41 @@ def get_historical_features(
             full_feature_names=full_feature_names,
         )

-        return BigQueryRetrievalJob(query=query, client=client, config=config)
+        return BigQueryRetrievalJob(
+            query=query,
+            client=client,
+            config=config,
+            full_feature_names=full_feature_names,
+            on_demand_feature_views=registry.list_on_demand_feature_views(
+                project, allow_cache=True
+            ),
+        )


 class BigQueryRetrievalJob(RetrievalJob):
-    def __init__(self, query, client, config):
+    def __init__(
+        self,
+        query: str,
+        client: bigquery.Client,
+        config: RepoConfig,
+        full_feature_names: bool,
+        on_demand_feature_views: Optional[List[OnDemandFeatureView]],
+    ):
         self.query = query
         self.client = client
         self.config = config
+        self._full_feature_names = full_feature_names
+        self._on_demand_feature_views = on_demand_feature_views
+
+    @property
+    def full_feature_names(self) -> bool:
+        return self._full_feature_names
+
+    @property
+    def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
+        return self._on_demand_feature_views

-    def to_df(self):
+    def to_df_internal(self) -> pd.DataFrame:
         # TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df()
         df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True)
         return df
@@ -266,7 +299,7 @@ def _get_table_reference_for_new_entity(


 def _upload_entity_df_and_get_entity_schema(
-    client: Client, table_name: str, entity_df: Union[pandas.DataFrame, str],
+    client: Client, table_name: str, entity_df: Union[pd.DataFrame, str],
 ) -> Dict[str, np.dtype]:
     """Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table"""

@@ -278,7 +311,7 @@ def _upload_entity_df_and_get_entity_schema(
             client.query(f"SELECT * FROM {table_name} LIMIT 1").result().to_dataframe()
         )
         entity_schema = dict(zip(limited_entity_df.columns, limited_entity_df.dtypes))
-    elif isinstance(entity_df, pandas.DataFrame):
+    elif isinstance(entity_df, pd.DataFrame):
         # Drop the index so that we dont have unnecessary columns
         entity_df.reset_index(drop=True, inplace=True)
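To make the effect of this change concrete, here is a minimal usage sketch, assuming the feature_refs-based API Feast exposed at the time of this PR. The entity column, feature view, and on demand feature view names ("driver_id", "driver_hourly_stats", "transformed_conv_rate") are invented for illustration and do not come from this PR.

    import pandas as pd

    from feast import FeatureStore

    store = FeatureStore(repo_path=".")

    entity_df = pd.DataFrame(
        {
            "driver_id": [1001, 1002],  # hypothetical entity key
            "event_timestamp": pd.to_datetime(["2021-08-30", "2021-08-31"]),
        }
    )

    job = store.get_historical_features(
        entity_df=entity_df,
        feature_refs=[
            "driver_hourly_stats:conv_rate",  # regular feature view ref ("view:feature")
            "transformed_conv_rate",          # on demand feature view ref (no colon)
        ],
    )

    # to_df() now runs the offline query via to_df_internal() and then joins in
    # each on demand feature view's transformed columns.
    df = job.to_df()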
36 changes: 31 additions & 5 deletions sdk/python/feast/infra/offline_stores/file.py
@@ -6,7 +6,7 @@
 import pytz
 from pydantic.typing import Literal

-from feast import FileSource
+from feast import FileSource, OnDemandFeatureView
 from feast.data_source import DataSource
 from feast.errors import FeastJoinKeysDuringMaterialization
 from feast.feature_view import FeatureView
@@ -30,13 +30,28 @@ class FileOfflineStoreConfig(FeastConfigBaseModel):


 class FileRetrievalJob(RetrievalJob):
-    def __init__(self, evaluation_function: Callable):
+    def __init__(
+        self,
+        evaluation_function: Callable,
+        full_feature_names: bool,
+        on_demand_feature_views: Optional[List[OnDemandFeatureView]],
+    ):
         """Initialize a lazy historical retrieval job"""

         # The evaluation function executes a stored procedure to compute a historical retrieval.
         self.evaluation_function = evaluation_function
+        self._full_feature_names = full_feature_names
+        self._on_demand_feature_views = on_demand_feature_views

-    def to_df(self):
+    @property
+    def full_feature_names(self) -> bool:
+        return self._full_feature_names
+
+    @property
+    def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
+        return self._on_demand_feature_views
+
+    def to_df_internal(self) -> pd.DataFrame:
         # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
         df = self.evaluation_function()
         return df
@@ -224,7 +239,13 @@ def evaluate_historical_retrieval():

             return entity_df_with_features

-        job = FileRetrievalJob(evaluation_function=evaluate_historical_retrieval)
+        job = FileRetrievalJob(
+            evaluation_function=evaluate_historical_retrieval,
+            full_feature_names=full_feature_names,
+            on_demand_feature_views=registry.list_on_demand_feature_views(
+                project, allow_cache=True
+            ),
+        )
         return job

     @staticmethod
@@ -284,4 +305,9 @@ def evaluate_offline_job():
             )
             return last_values_df[columns_to_extract]

-        return FileRetrievalJob(evaluation_function=evaluate_offline_job)
+        # When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized
+        return FileRetrievalJob(
+            evaluation_function=evaluate_offline_job,
+            full_feature_names=False,
+            on_demand_feature_views=None,
+        )
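FileRetrievalJob takes a zero-argument callable rather than a dataframe, so nothing is computed until to_df() is called. A small standalone sketch of that lazy-evaluation pattern; the class and function names here are illustrative, not Feast code:

    from typing import Callable

    import pandas as pd

    class LazyRetrievalSketch:
        """Toy stand-in for FileRetrievalJob's lazy evaluation."""

        def __init__(self, evaluation_function: Callable[[], pd.DataFrame]):
            # Store the computation; nothing runs yet.
            self.evaluation_function = evaluation_function

        def to_df_internal(self) -> pd.DataFrame:
            # The dataframe is only built when a caller actually asks for it.
            return self.evaluation_function()

    def evaluate() -> pd.DataFrame:
        print("evaluating now")
        return pd.DataFrame({"conv_rate": [0.1, 0.2]})

    job = LazyRetrievalSketch(evaluate)  # prints nothing
    df = job.to_df_internal()            # prints "evaluating now"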
24 changes: 24 additions & 0 deletions sdk/python/feast/infra/offline_stores/offline_store.py
@@ -20,18 +20,42 @@

 from feast.data_source import DataSource
 from feast.feature_view import FeatureView
+from feast.on_demand_feature_view import OnDemandFeatureView
 from feast.registry import Registry
 from feast.repo_config import RepoConfig


 class RetrievalJob(ABC):
     """RetrievalJob is used to manage the execution of a historical feature retrieval"""

+    @property
+    @abstractmethod
+    def full_feature_names(self) -> bool:
+        pass
+
+    @property
+    @abstractmethod
+    def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
+        pass
Comment on lines +31 to +39

Member: Why are we changing the API for the base class here and adding these fields? Are they necessary?

Member: NVM, I see why they're needed now. While I think this is okay for a first pass I want to see if we can get away from adding these additional fields on to the job. I think they are pretty inelegant for a user to have to implement themselves.
+    def to_df(self) -> pd.DataFrame:
+        """Return dataset as Pandas DataFrame synchronously including on demand transforms"""
+        features_df = self.to_df_internal()
+        if self.on_demand_feature_views is None:
+            return features_df
+
+        for odfv in self.on_demand_feature_views:
+            features_df = features_df.join(
+                odfv.get_transformed_features_df(self.full_feature_names, features_df)
+            )
+        return features_df
+
+    @abstractmethod
+    def to_df_internal(self) -> pd.DataFrame:
+        """Return dataset as Pandas DataFrame synchronously"""
+        pass
+
+    # TODO(adchia): implement ODFV for to_arrow method
     @abstractmethod
     def to_arrow(self) -> pyarrow.Table:
         """Return dataset as pyarrow Table synchronously"""
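The review comment above notes that implementers now have to supply these fields themselves. A hedged sketch of the minimum a concrete job provides under the new contract: the two properties, to_df_internal(), and the pre-existing abstract to_arrow(); the in-memory store here is invented for illustration, and the inherited to_df() supplies the on demand transform step.

    from typing import List, Optional

    import pandas as pd
    import pyarrow

    from feast.infra.offline_stores.offline_store import RetrievalJob
    from feast.on_demand_feature_view import OnDemandFeatureView

    class InMemoryRetrievalJob(RetrievalJob):
        """Illustrative job that 'retrieves' a dataframe it already holds."""

        def __init__(
            self,
            frame: pd.DataFrame,
            full_feature_names: bool,
            on_demand_feature_views: Optional[List[OnDemandFeatureView]],
        ):
            self._frame = frame
            self._full_feature_names = full_feature_names
            self._on_demand_feature_views = on_demand_feature_views

        @property
        def full_feature_names(self) -> bool:
            return self._full_feature_names

        @property
        def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
            return self._on_demand_feature_views

        def to_df_internal(self) -> pd.DataFrame:
            # The inherited to_df() calls this, then joins on demand transforms.
            return self._frame

        def to_arrow(self) -> pyarrow.Table:
            return pyarrow.Table.from_pandas(self._frame)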
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/offline_utils.py
@@ -148,7 +148,7 @@ def build_point_in_time_query(
     entity_df_event_timestamp_col: str,
     query_template: str,
     full_feature_names: bool = False,
-):
+) -> str:
     """Build point-in-time query between each feature view table and the entity dataframe for Bigquery and Redshift"""
     template = Environment(loader=BaseLoader()).from_string(source=query_template)
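As the line shown above suggests, build_point_in_time_query renders a SQL template with Jinja2 and returns the resulting string, which the new "-> str" annotation now makes explicit. A minimal standalone sketch of that rendering step; the template and its variables are toy stand-ins, not Feast's real point-in-time join template:

    from jinja2 import BaseLoader, Environment

    query_template = """
    SELECT *
    FROM {{ left_table_query_string }}
    WHERE event_timestamp <= '{{ max_timestamp }}'
    """

    template = Environment(loader=BaseLoader()).from_string(source=query_template)

    # render() substitutes the template variables and returns the final SQL string.
    query: str = template.render(
        left_table_query_string="entity_df_table",
        max_timestamp="2021-09-02 00:00:00",
    )
    print(query)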
32 changes: 29 additions & 3 deletions sdk/python/feast/infra/offline_stores/redshift.py
@@ -9,7 +9,7 @@
 from pydantic import StrictStr
 from pydantic.typing import Literal

-from feast import RedshiftSource
+from feast import OnDemandFeatureView, RedshiftSource
 from feast.data_source import DataSource
 from feast.errors import InvalidEntityType
 from feast.feature_view import FeatureView
@@ -90,11 +90,14 @@ def pull_latest_from_table_or_query(
         )
         WHERE _feast_row = 1
         """
+        # When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized
         return RedshiftRetrievalJob(
             query=query,
             redshift_client=redshift_client,
             s3_resource=s3_resource,
             config=config,
+            full_feature_names=False,
+            on_demand_feature_views=None,
         )

     @staticmethod
@@ -164,6 +167,10 @@ def query_generator() -> Iterator[str]:
             redshift_client=redshift_client,
             s3_resource=s3_resource,
             config=config,
+            full_feature_names=full_feature_names,
+            on_demand_feature_views=registry.list_on_demand_feature_views(
+                project=project, allow_cache=True
+            ),
             drop_columns=["entity_timestamp"]
             + [
                 f"{feature_view.name}__entity_row_unique_id"
@@ -179,6 +186,8 @@ def __init__(
         redshift_client,
         s3_resource,
         config: RepoConfig,
+        full_feature_names: bool,
+        on_demand_feature_views: Optional[List[OnDemandFeatureView]],
         drop_columns: Optional[List[str]] = None,
     ):
         """Initialize RedshiftRetrievalJob object.
@@ -188,6 +197,8 @@ def __init__(
             redshift_client: boto3 redshift-data client
             s3_resource: boto3 s3 resource object
             config: Feast repo config
+            full_feature_names: Whether to add the feature view prefixes to the feature names
+            on_demand_feature_views: A list of on demand transforms to apply at retrieval time
             drop_columns: Optionally a list of columns to drop before unloading to S3.
                 This is a convenient field, since "SELECT ... EXCEPT col" isn't supported in Redshift.
         """
@@ -209,9 +220,19 @@ def query_generator() -> Iterator[str]:
             + "/unload/"
             + str(uuid.uuid4())
         )
+        self._full_feature_names = full_feature_names
+        self._on_demand_feature_views = on_demand_feature_views
         self._drop_columns = drop_columns

-    def to_df(self) -> pd.DataFrame:
+    @property
+    def full_feature_names(self) -> bool:
+        return self._full_feature_names
+
+    @property
+    def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
+        return self._on_demand_feature_views
+
+    def to_df_internal(self) -> pd.DataFrame:
         with self._query_generator() as query:
             return aws_utils.unload_redshift_query_to_df(
                 self._redshift_client,
@@ -304,7 +325,7 @@ def _upload_entity_df_and_get_entity_schema(
             f"CREATE TABLE {table_name} AS ({entity_df})",
         )
         limited_entity_df = RedshiftRetrievalJob(
-            f"SELECT * FROM {table_name} LIMIT 1", redshift_client, s3_resource, config
+            f"SELECT * FROM {table_name} LIMIT 1",
+            redshift_client,
+            s3_resource,
+            config,
+            full_feature_names=False,
+            on_demand_feature_views=None,
         ).to_df()
         return dict(zip(limited_entity_df.columns, limited_entity_df.dtypes))
     else:
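The docstring above says full_feature_names controls whether feature view prefixes are added to feature names. A small illustration of the resulting column naming, assuming Feast's double-underscore prefix convention; the view and feature names are invented:

    import pandas as pd

    feature_view_name = "driver_hourly_stats"  # hypothetical feature view
    df = pd.DataFrame({"driver_id": [1001], "conv_rate": [0.25]})

    full_feature_names = True
    if full_feature_names:
        # Prefix the feature column with "<feature_view>__".
        df = df.rename(columns={"conv_rate": f"{feature_view_name}__conv_rate"})

    print(list(df.columns))  # ['driver_id', 'driver_hourly_stats__conv_rate']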
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/provider.py
@@ -177,6 +177,9 @@ def _get_requested_feature_views_to_features_dict(
     feature_views_to_feature_map: Dict[FeatureView, List[str]] = {}

     for ref in feature_refs:
+        if ":" not in ref:
+            # ODFV
+            continue
         ref_parts = ref.split(":")
         feature_view_from_ref = ref_parts[0]
         feature_from_ref = ref_parts[1]
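The new guard relies on the ref format: regular refs are "<feature_view>:<feature>", while on demand feature view refs carry no colon, so they are skipped when grouping features by feature view. A quick sketch of that parsing, with invented ref names:

    feature_refs = [
        "driver_hourly_stats:conv_rate",  # regular feature view ref
        "transformed_conv_rate",          # on demand feature view ref, no ":"
    ]

    for ref in feature_refs:
        if ":" not in ref:
            # On demand feature views are resolved by the transform path instead.
            continue
        feature_view_from_ref, feature_from_ref = ref.split(":")
        print(feature_view_from_ref, feature_from_ref)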