diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 71ac6a1b10..9d1c360cb3 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -34,10 +34,6 @@ from feast.transformation.pandas_transformation import PandasTransformation from feast.transformation.python_transformation import PythonTransformation from feast.transformation.substrait_transformation import SubstraitTransformation -from feast.type_map import ( - feast_value_type_to_pandas_type, - python_type_to_feast_value_type, -) from feast.usage import log_exceptions from feast.value_type import ValueType @@ -490,69 +486,15 @@ def get_transformed_features( ) def infer_features(self) -> None: - if self.mode in {"pandas", "substrait"}: - self._infer_features_df() - elif self.mode == "python": - self._infer_features_dict() - else: - raise Exception( - f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".' - ) - - def _infer_features_dict(self): - """ - Infers the set of features associated to this feature view from the input source. - - Raises: - RegistryInferenceFailure: The set of features could not be inferred. - """ - rand_dict_value: Dict[str, Any] = { - "float": [1.0], - "int": [1], - "str": ["hello world"], - "bytes": [str.encode("hello world")], - "bool": [True], - "datetime64[ns]": [datetime.utcnow()], - } - - feature_dict = {} - for feature_view_projection in self.source_feature_view_projections.values(): - for feature in feature_view_projection.features: - dtype = feast_value_type_to_pandas_type(feature.dtype.to_value_type()) - feature_dict[f"{feature_view_projection.name}__{feature.name}"] = ( - rand_dict_value[dtype] if dtype in rand_dict_value else [None] - ) - feature_dict[f"{feature.name}"] = ( - rand_dict_value[dtype] if dtype in rand_dict_value else [None] - ) - for request_data in self.source_request_sources.values(): - for field in request_data.schema: - dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type()) - feature_dict[f"{field.name}"] = ( - rand_dict_value[dtype] if dtype in rand_dict_value else [None] - ) - - output_dict: Dict[str, List[Any]] = self.feature_transformation.transform( - feature_dict + inferred_features = self.feature_transformation.infer_features( + self._construct_random_input() ) - inferred_features = [] - for f, dt in output_dict.items(): - inferred_features.append( - Field( - name=f, - dtype=from_value_type( - python_type_to_feast_value_type( - f, type_name=type(dt[0]).__name__ - ) - ), - ) - ) if self.features: missing_features = [] - for specified_features in self.features: - if specified_features not in inferred_features: - missing_features.append(specified_features) + for specified_feature in self.features: + if specified_feature not in inferred_features: + missing_features.append(specified_feature) if missing_features: raise SpecifiedFeaturesNotPresentError( missing_features, inferred_features, self.name @@ -566,66 +508,42 @@ def _infer_features_dict(self): f"Could not infer Features for the feature view '{self.name}'.", ) - def _infer_features_df(self) -> None: - """ - Infers the set of features associated to this feature view from the input source. - - Raises: - RegistryInferenceFailure: The set of features could not be inferred. - """ - rand_df_value: Dict[str, Any] = { - "float": 1.0, - "int": 1, - "str": "hello world", - "bytes": str.encode("hello world"), - "bool": True, - "datetime64[ns]": datetime.utcnow(), + def _construct_random_input(self) -> Dict[str, List[Any]]: + rand_dict_value: Dict[ValueType, List[Any]] = { + ValueType.BYTES: [str.encode("hello world")], + ValueType.STRING: ["hello world"], + ValueType.INT32: [1], + ValueType.INT64: [1], + ValueType.DOUBLE: [1.0], + ValueType.FLOAT: [1.0], + ValueType.BOOL: [True], + ValueType.UNIX_TIMESTAMP: [datetime.utcnow()], + ValueType.BYTES_LIST: [[str.encode("hello world")]], + ValueType.STRING_LIST: [["hello world"]], + ValueType.INT32_LIST: [[1]], + ValueType.INT64_LIST: [[1]], + ValueType.DOUBLE_LIST: [[1.0]], + ValueType.FLOAT_LIST: [[1.0]], + ValueType.BOOL_LIST: [[True]], + ValueType.UNIX_TIMESTAMP_LIST: [[datetime.utcnow()]], } - df = pd.DataFrame() + feature_dict = {} for feature_view_projection in self.source_feature_view_projections.values(): for feature in feature_view_projection.features: - dtype = feast_value_type_to_pandas_type(feature.dtype.to_value_type()) - df[f"{feature_view_projection.name}__{feature.name}"] = pd.Series( - dtype=dtype + feature_dict[f"{feature_view_projection.name}__{feature.name}"] = ( + rand_dict_value.get(feature.dtype.to_value_type(), [None]) + ) + feature_dict[f"{feature.name}"] = rand_dict_value.get( + feature.dtype.to_value_type(), [None] ) - sample_val = rand_df_value[dtype] if dtype in rand_df_value else None - df[f"{feature.name}"] = pd.Series(data=sample_val, dtype=dtype) for request_data in self.source_request_sources.values(): for field in request_data.schema: - dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type()) - sample_val = rand_df_value[dtype] if dtype in rand_df_value else None - df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype) - - output_df: pd.DataFrame = self.feature_transformation.transform(df) - inferred_features = [] - for f, dt in zip(output_df.columns, output_df.dtypes): - inferred_features.append( - Field( - name=f, - dtype=from_value_type( - python_type_to_feast_value_type(f, type_name=str(dt)) - ), + feature_dict[f"{field.name}"] = rand_dict_value.get( + field.dtype.to_value_type(), [None] ) - ) - if self.features: - missing_features = [] - for specified_features in self.features: - if specified_features not in inferred_features: - missing_features.append(specified_features) - if missing_features: - raise SpecifiedFeaturesNotPresentError( - missing_features, inferred_features, self.name - ) - else: - self.features = inferred_features - - if not self.features: - raise RegistryInferenceFailure( - "OnDemandFeatureView", - f"Could not infer Features for the feature view '{self.name}'.", - ) + return feature_dict @staticmethod def get_requested_odfvs( @@ -682,59 +600,28 @@ def mainify(obj) -> None: def decorator(user_function): return_annotation = inspect.signature(user_function).return_annotation - if ( - return_annotation - and return_annotation.__module__ == "ibis.expr.types.relations" - and return_annotation.__name__ == "Table" - ): - import ibis - import ibis.expr.datatypes as dt - from ibis_substrait.compiler.core import SubstraitCompiler - - compiler = SubstraitCompiler() - - input_fields: Field = [] - - for s in sources: - if isinstance(s, FeatureView): - fields = s.projection.features - else: - fields = s.features - - input_fields.extend( - [ - ( - f.name, - dt.dtype( - feast_value_type_to_pandas_type(f.dtype.to_value_type()) - ), - ) - for f in fields - ] + udf_string = dill.source.getsource(user_function) + mainify(user_function) + if mode == "pandas": + if return_annotation not in (inspect._empty, pd.DataFrame): + raise TypeError( + f"return signature for {user_function} is {return_annotation} but should be pd.DataFrame" ) + transformation = PandasTransformation(user_function, udf_string) + elif mode == "python": + if return_annotation not in (inspect._empty, Dict[str, Any]): + raise TypeError( + f"return signature for {user_function} is {return_annotation} but should be Dict[str, Any]" + ) + transformation = PythonTransformation(user_function, udf_string) + elif mode == "substrait": + from ibis.expr.types.relations import Table - expr = user_function(ibis.table(input_fields, "t")) - - transformation = SubstraitTransformation( - substrait_plan=compiler.compile(expr).SerializeToString() - ) - else: - udf_string = dill.source.getsource(user_function) - mainify(user_function) - if mode == "pandas": - if return_annotation not in (inspect._empty, pd.DataFrame): - raise TypeError( - f"return signature for {user_function} is {return_annotation} but should be pd.DataFrame" - ) - transformation = PandasTransformation(user_function, udf_string) - elif mode == "python": - if return_annotation not in (inspect._empty, Dict[str, Any]): - raise TypeError( - f"return signature for {user_function} is {return_annotation} but should be Dict[str, Any]" - ) - transformation = PythonTransformation(user_function, udf_string) - elif mode == "substrait": - pass + if return_annotation not in (inspect._empty, Table): + raise TypeError( + f"return signature for {user_function} is {return_annotation} but should be ibis.expr.types.relations.Table" + ) + transformation = SubstraitTransformation.from_ibis(user_function, sources) on_demand_feature_view_obj = OnDemandFeatureView( name=user_function.__name__, diff --git a/sdk/python/feast/transformation/pandas_transformation.py b/sdk/python/feast/transformation/pandas_transformation.py index 1838a882f2..d48055c694 100644 --- a/sdk/python/feast/transformation/pandas_transformation.py +++ b/sdk/python/feast/transformation/pandas_transformation.py @@ -1,11 +1,16 @@ from types import FunctionType +from typing import Any, Dict, List import dill import pandas as pd +from feast.field import Field, from_value_type from feast.protos.feast.core.Transformation_pb2 import ( UserDefinedFunctionV2 as UserDefinedFunctionProto, ) +from feast.type_map import ( + python_type_to_feast_value_type, +) class PandasTransformation: @@ -33,6 +38,21 @@ def transform(self, input_df: pd.DataFrame) -> pd.DataFrame: ) return output_df + def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]: + df = pd.DataFrame.from_dict(random_input) + + output_df: pd.DataFrame = self.transform(df) + + return [ + Field( + name=f, + dtype=from_value_type( + python_type_to_feast_value_type(f, type_name=str(dt)) + ), + ) + for f, dt in zip(output_df.columns, output_df.dtypes) + ] + def __eq__(self, other): if not isinstance(other, PandasTransformation): raise TypeError( diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py index 9519f23c05..9f5fac6675 100644 --- a/sdk/python/feast/transformation/python_transformation.py +++ b/sdk/python/feast/transformation/python_transformation.py @@ -1,11 +1,15 @@ from types import FunctionType -from typing import Dict +from typing import Any, Dict, List import dill +from feast.field import Field, from_value_type from feast.protos.feast.core.Transformation_pb2 import ( UserDefinedFunctionV2 as UserDefinedFunctionProto, ) +from feast.type_map import ( + python_type_to_feast_value_type, +) class PythonTransformation: @@ -33,6 +37,19 @@ def transform(self, input_dict: Dict) -> Dict: ) return {**input_dict, **output_dict} + def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]: + output_dict: Dict[str, List[Any]] = self.transform(random_input) + + return [ + Field( + name=f, + dtype=from_value_type( + python_type_to_feast_value_type(f, type_name=type(dt[0]).__name__) + ), + ) + for f, dt in output_dict.items() + ] + def __eq__(self, other): if not isinstance(other, PythonTransformation): raise TypeError( diff --git a/sdk/python/feast/transformation/substrait_transformation.py b/sdk/python/feast/transformation/substrait_transformation.py index b3dbe7a4b4..48b708ac70 100644 --- a/sdk/python/feast/transformation/substrait_transformation.py +++ b/sdk/python/feast/transformation/substrait_transformation.py @@ -1,10 +1,18 @@ +from typing import Any, Dict, List + import pandas as pd import pyarrow import pyarrow.substrait as substrait # type: ignore # noqa +from feast.feature_view import FeatureView +from feast.field import Field, from_value_type from feast.protos.feast.core.Transformation_pb2 import ( SubstraitTransformationV2 as SubstraitTransformationProto, ) +from feast.type_map import ( + feast_value_type_to_pandas_type, + python_type_to_feast_value_type, +) class SubstraitTransformation: @@ -26,6 +34,20 @@ def table_provider(names, schema: pyarrow.Schema): ).read_all() return table.to_pandas() + def infer_features(self, random_input: Dict[str, List[Any]]) -> List[Field]: + df = pd.DataFrame.from_dict(random_input) + output_df: pd.DataFrame = self.transform(df) + + return [ + Field( + name=f, + dtype=from_value_type( + python_type_to_feast_value_type(f, type_name=str(dt)) + ), + ) + for f, dt in zip(output_df.columns, output_df.dtypes) + ] + def __eq__(self, other): if not isinstance(other, SubstraitTransformation): raise TypeError( @@ -48,3 +70,34 @@ def from_proto( return SubstraitTransformation( substrait_plan=substrait_transformation_proto.substrait_plan ) + + @classmethod + def from_ibis(cls, user_function, sources): + import ibis + import ibis.expr.datatypes as dt + from ibis_substrait.compiler.core import SubstraitCompiler + + compiler = SubstraitCompiler() + + input_fields = [] + + for s in sources: + fields = s.projection.features if isinstance(s, FeatureView) else s.features + + input_fields.extend( + [ + ( + f.name, + dt.dtype( + feast_value_type_to_pandas_type(f.dtype.to_value_type()) + ), + ) + for f in fields + ] + ) + + expr = user_function(ibis.table(input_fields, "t")) + + return SubstraitTransformation( + substrait_plan=compiler.compile(expr).SerializeToString() + ) diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 55d2ed8425..48f6e27b8a 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -127,7 +127,7 @@ def create_similarity_request_source(): return RequestSource( name="similarity_input", schema=[ - Field(name="vector_doube", dtype=Array(Float64)), + Field(name="vector_double", dtype=Array(Float64)), Field(name="vector_float", dtype=Array(Float32)), ], )