Skip to content

Commit

Permalink
feat: Refactor ODFV schema inference (#4076)
Browse files Browse the repository at this point in the history
* refactor odfv scheam inference

Signed-off-by: tokoko <[email protected]>

* bugfix odfv schema inference

Signed-off-by: tokoko <[email protected]>

* remove print statement

Signed-off-by: tokoko <[email protected]>

---------

Signed-off-by: tokoko <[email protected]>
  • Loading branch information
tokoko authored Apr 4, 2024
1 parent f3a9c64 commit c50a9ff
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 167 deletions.
217 changes: 52 additions & 165 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@
from feast.transformation.pandas_transformation import PandasTransformation
from feast.transformation.python_transformation import PythonTransformation
from feast.transformation.substrait_transformation import SubstraitTransformation
from feast.type_map import (
feast_value_type_to_pandas_type,
python_type_to_feast_value_type,
)
from feast.usage import log_exceptions
from feast.value_type import ValueType

Expand Down Expand Up @@ -490,69 +486,15 @@ def get_transformed_features(
)

def infer_features(self) -> None:
if self.mode in {"pandas", "substrait"}:
self._infer_features_df()
elif self.mode == "python":
self._infer_features_dict()
else:
raise Exception(
f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".'
)

def _infer_features_dict(self):
"""
Infers the set of features associated to this feature view from the input source.
Raises:
RegistryInferenceFailure: The set of features could not be inferred.
"""
rand_dict_value: Dict[str, Any] = {
"float": [1.0],
"int": [1],
"str": ["hello world"],
"bytes": [str.encode("hello world")],
"bool": [True],
"datetime64[ns]": [datetime.utcnow()],
}

feature_dict = {}
for feature_view_projection in self.source_feature_view_projections.values():
for feature in feature_view_projection.features:
dtype = feast_value_type_to_pandas_type(feature.dtype.to_value_type())
feature_dict[f"{feature_view_projection.name}__{feature.name}"] = (
rand_dict_value[dtype] if dtype in rand_dict_value else [None]
)
feature_dict[f"{feature.name}"] = (
rand_dict_value[dtype] if dtype in rand_dict_value else [None]
)
for request_data in self.source_request_sources.values():
for field in request_data.schema:
dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type())
feature_dict[f"{field.name}"] = (
rand_dict_value[dtype] if dtype in rand_dict_value else [None]
)

output_dict: Dict[str, List[Any]] = self.feature_transformation.transform(
feature_dict
inferred_features = self.feature_transformation.infer_features(
self._construct_random_input()
)
inferred_features = []
for f, dt in output_dict.items():
inferred_features.append(
Field(
name=f,
dtype=from_value_type(
python_type_to_feast_value_type(
f, type_name=type(dt[0]).__name__
)
),
)
)

if self.features:
missing_features = []
for specified_features in self.features:
if specified_features not in inferred_features:
missing_features.append(specified_features)
for specified_feature in self.features:
if specified_feature not in inferred_features:
missing_features.append(specified_feature)
if missing_features:
raise SpecifiedFeaturesNotPresentError(
missing_features, inferred_features, self.name
Expand All @@ -566,66 +508,42 @@ def _infer_features_dict(self):
f"Could not infer Features for the feature view '{self.name}'.",
)

def _infer_features_df(self) -> None:
"""
Infers the set of features associated to this feature view from the input source.
Raises:
RegistryInferenceFailure: The set of features could not be inferred.
"""
rand_df_value: Dict[str, Any] = {
"float": 1.0,
"int": 1,
"str": "hello world",
"bytes": str.encode("hello world"),
"bool": True,
"datetime64[ns]": datetime.utcnow(),
def _construct_random_input(self) -> Dict[str, List[Any]]:
rand_dict_value: Dict[ValueType, List[Any]] = {
ValueType.BYTES: [str.encode("hello world")],
ValueType.STRING: ["hello world"],
ValueType.INT32: [1],
ValueType.INT64: [1],
ValueType.DOUBLE: [1.0],
ValueType.FLOAT: [1.0],
ValueType.BOOL: [True],
ValueType.UNIX_TIMESTAMP: [datetime.utcnow()],
ValueType.BYTES_LIST: [[str.encode("hello world")]],
ValueType.STRING_LIST: [["hello world"]],
ValueType.INT32_LIST: [[1]],
ValueType.INT64_LIST: [[1]],
ValueType.DOUBLE_LIST: [[1.0]],
ValueType.FLOAT_LIST: [[1.0]],
ValueType.BOOL_LIST: [[True]],
ValueType.UNIX_TIMESTAMP_LIST: [[datetime.utcnow()]],
}

df = pd.DataFrame()
feature_dict = {}
for feature_view_projection in self.source_feature_view_projections.values():
for feature in feature_view_projection.features:
dtype = feast_value_type_to_pandas_type(feature.dtype.to_value_type())
df[f"{feature_view_projection.name}__{feature.name}"] = pd.Series(
dtype=dtype
feature_dict[f"{feature_view_projection.name}__{feature.name}"] = (
rand_dict_value.get(feature.dtype.to_value_type(), [None])
)
feature_dict[f"{feature.name}"] = rand_dict_value.get(
feature.dtype.to_value_type(), [None]
)
sample_val = rand_df_value[dtype] if dtype in rand_df_value else None
df[f"{feature.name}"] = pd.Series(data=sample_val, dtype=dtype)
for request_data in self.source_request_sources.values():
for field in request_data.schema:
dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type())
sample_val = rand_df_value[dtype] if dtype in rand_df_value else None
df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype)

output_df: pd.DataFrame = self.feature_transformation.transform(df)
inferred_features = []
for f, dt in zip(output_df.columns, output_df.dtypes):
inferred_features.append(
Field(
name=f,
dtype=from_value_type(
python_type_to_feast_value_type(f, type_name=str(dt))
),
feature_dict[f"{field.name}"] = rand_dict_value.get(
field.dtype.to_value_type(), [None]
)
)

if self.features:
missing_features = []
for specified_features in self.features:
if specified_features not in inferred_features:
missing_features.append(specified_features)
if missing_features:
raise SpecifiedFeaturesNotPresentError(
missing_features, inferred_features, self.name
)
else:
self.features = inferred_features

if not self.features:
raise RegistryInferenceFailure(
"OnDemandFeatureView",
f"Could not infer Features for the feature view '{self.name}'.",
)
return feature_dict

@staticmethod
def get_requested_odfvs(
Expand Down Expand Up @@ -682,59 +600,28 @@ def mainify(obj) -> None:

def decorator(user_function):
return_annotation = inspect.signature(user_function).return_annotation
if (
return_annotation
and return_annotation.__module__ == "ibis.expr.types.relations"
and return_annotation.__name__ == "Table"
):
import ibis
import ibis.expr.datatypes as dt
from ibis_substrait.compiler.core import SubstraitCompiler

compiler = SubstraitCompiler()

input_fields: Field = []

for s in sources:
if isinstance(s, FeatureView):
fields = s.projection.features
else:
fields = s.features

input_fields.extend(
[
(
f.name,
dt.dtype(
feast_value_type_to_pandas_type(f.dtype.to_value_type())
),
)
for f in fields
]
udf_string = dill.source.getsource(user_function)
mainify(user_function)
if mode == "pandas":
if return_annotation not in (inspect._empty, pd.DataFrame):
raise TypeError(
f"return signature for {user_function} is {return_annotation} but should be pd.DataFrame"
)
transformation = PandasTransformation(user_function, udf_string)
elif mode == "python":
if return_annotation not in (inspect._empty, Dict[str, Any]):
raise TypeError(
f"return signature for {user_function} is {return_annotation} but should be Dict[str, Any]"
)
transformation = PythonTransformation(user_function, udf_string)
elif mode == "substrait":
from ibis.expr.types.relations import Table

expr = user_function(ibis.table(input_fields, "t"))

transformation = SubstraitTransformation(
substrait_plan=compiler.compile(expr).SerializeToString()
)
else:
udf_string = dill.source.getsource(user_function)
mainify(user_function)
if mode == "pandas":
if return_annotation not in (inspect._empty, pd.DataFrame):
raise TypeError(
f"return signature for {user_function} is {return_annotation} but should be pd.DataFrame"
)
transformation = PandasTransformation(user_function, udf_string)
elif mode == "python":
if return_annotation not in (inspect._empty, Dict[str, Any]):
raise TypeError(
f"return signature for {user_function} is {return_annotation} but should be Dict[str, Any]"
)
transformation = PythonTransformation(user_function, udf_string)
elif mode == "substrait":
pass
if return_annotation not in (inspect._empty, Table):
raise TypeError(
f"return signature for {user_function} is {return_annotation} but should be ibis.expr.types.relations.Table"
)
transformation = SubstraitTransformation.from_ibis(user_function, sources)

on_demand_feature_view_obj = OnDemandFeatureView(
name=user_function.__name__,
Expand Down
20 changes: 20 additions & 0 deletions sdk/python/feast/transformation/pandas_transformation.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from types import FunctionType
from typing import Any, Dict, List

import dill
import pandas as pd

from feast.field import Field, from_value_type
from feast.protos.feast.core.Transformation_pb2 import (
UserDefinedFunctionV2 as UserDefinedFunctionProto,
)
from feast.type_map import (
python_type_to_feast_value_type,
)


class PandasTransformation:
Expand Down Expand Up @@ -33,6 +38,21 @@ def transform(self, input_df: pd.DataFrame) -> pd.DataFrame:
)
return output_df

def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]:
df = pd.DataFrame.from_dict(random_input)

output_df: pd.DataFrame = self.transform(df)

return [
Field(
name=f,
dtype=from_value_type(
python_type_to_feast_value_type(f, type_name=str(dt))
),
)
for f, dt in zip(output_df.columns, output_df.dtypes)
]

def __eq__(self, other):
if not isinstance(other, PandasTransformation):
raise TypeError(
Expand Down
19 changes: 18 additions & 1 deletion sdk/python/feast/transformation/python_transformation.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from types import FunctionType
from typing import Dict
from typing import Any, Dict, List

import dill

from feast.field import Field, from_value_type
from feast.protos.feast.core.Transformation_pb2 import (
UserDefinedFunctionV2 as UserDefinedFunctionProto,
)
from feast.type_map import (
python_type_to_feast_value_type,
)


class PythonTransformation:
Expand Down Expand Up @@ -33,6 +37,19 @@ def transform(self, input_dict: Dict) -> Dict:
)
return {**input_dict, **output_dict}

def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]:
output_dict: Dict[str, List[Any]] = self.transform(random_input)

return [
Field(
name=f,
dtype=from_value_type(
python_type_to_feast_value_type(f, type_name=type(dt[0]).__name__)
),
)
for f, dt in output_dict.items()
]

def __eq__(self, other):
if not isinstance(other, PythonTransformation):
raise TypeError(
Expand Down
Loading

0 comments on commit c50a9ff

Please sign in to comment.