Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Update on demand feature view api #2587

Merged
10 changes: 5 additions & 5 deletions docs/tutorials/validating-historical-features.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pyarrow.parquet.write_table(entities_2019_table, "entities.parquet")
import pyarrow.parquet
import pandas as pd

from feast import FeatureView, Entity, FeatureStore, Field
from feast import FeatureView, Entity, FeatureStore, Field, BatchFeatureView
from feast.types import Float64, Int64
from feast.value_type import ValueType
from feast.data_format import ParquetFormat
Expand All @@ -134,7 +134,7 @@ taxi_entity = Entity(name='taxi', join_keys=['taxi_id'])


```python
trips_stats_fv = FeatureView(
trips_stats_fv = BatchFeatureView(
name='trip_stats',
entities=['taxi'],
features=[
Expand All @@ -160,9 +160,9 @@ trips_stats_fv = FeatureView(
Field("avg_trip_seconds", Float64),
Field("earned_per_hour", Float64),
],
sources={
"stats": trips_stats_fv
}
sources=[
trips_stats_fv,
]
)
def on_demand_stats(inp):
out = pd.DataFrame()
Expand Down
12 changes: 6 additions & 6 deletions examples/java-demo/feature_repo/driver_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
from google.protobuf.duration_pb2 import Duration
from feast.field import Field

from feast import Entity, Feature, FeatureView, FileSource, ValueType
from feast import Entity, Feature, BatchFeatureView, FileSource, ValueType

driver_hourly_stats = FileSource(
path="data/driver_stats_with_string.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)
driver_hourly_stats_view = FeatureView(
driver_hourly_stats_view = BatchFeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
ttl=Duration(seconds=86400000),
Expand Down Expand Up @@ -43,10 +43,10 @@
# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
inputs={
"driver_hourly_stats": driver_hourly_stats_view,
"vals_to_add": input_request,
},
inputs=[
driver_hourly_stats_view,
input_request,
],
schema=[
Field(name="conv_rate_plus_val1", dtype=Float64),
Field(name="conv_rate_plus_val2", dtype=Float64),
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/feast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from feast.infra.offline_stores.redshift_source import RedshiftSource
from feast.infra.offline_stores.snowflake_source import SnowflakeSource

from .batch_feature_view import BatchFeatureView
from .data_source import (
KafkaSource,
KinesisSource,
Expand All @@ -23,6 +24,7 @@
from .on_demand_feature_view import OnDemandFeatureView
from .repo_config import RepoConfig
from .request_feature_view import RequestFeatureView
from .stream_feature_view import StreamFeatureView
from .value_type import ValueType

logging.basicConfig(
Expand All @@ -38,6 +40,7 @@
pass

__all__ = [
"BatchFeatureView",
"Entity",
"KafkaSource",
"KinesisSource",
Expand All @@ -49,6 +52,7 @@
"OnDemandFeatureView",
"RepoConfig",
"SourceType",
"StreamFeatureView",
"ValueType",
"BigQuerySource",
"FileSource",
Expand Down
3 changes: 2 additions & 1 deletion sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
else feature_view_proto.spec.ttl.ToTimedelta()
),
source=batch_source,
stream_source=stream_source,
)
if stream_source:
feature_view.stream_source = stream_source

# FeatureViewProjections are not saved in the FeatureView proto.
# Create the default projection.
Expand Down
150 changes: 110 additions & 40 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pandas as pd

from feast.base_feature_view import BaseFeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import RequestSource
from feast.errors import RegistryInferenceFailure, SpecifiedFeaturesNotPresentError
from feast.feature import Feature
Expand All @@ -25,6 +26,7 @@
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
)
from feast.stream_feature_view import StreamFeatureView
from feast.type_map import (
feast_value_type_to_pandas_type,
python_type_to_feast_value_type,
Expand Down Expand Up @@ -67,13 +69,20 @@ class OnDemandFeatureView(BaseFeatureView):
owner: str

@log_exceptions
def __init__(
def __init__( # noqa: C901
achals marked this conversation as resolved.
Show resolved Hide resolved
self,
*args,
name: Optional[str] = None,
features: Optional[List[Feature]] = None,
sources: Optional[
Dict[str, Union[FeatureView, FeatureViewProjection, RequestSource]]
List[
Union[
BatchFeatureView,
StreamFeatureView,
RequestSource,
FeatureViewProjection,
]
]
] = None,
udf: Optional[MethodType] = None,
inputs: Optional[
Expand All @@ -92,11 +101,11 @@ def __init__(
features (deprecated): The list of features in the output of the on demand
feature view, after the transformation has been applied.
sources (optional): A map from input source names to the actual input sources,
which may be feature views, feature view projections, or request data sources.
which may be feature views, or request data sources.
These sources serve as inputs to the udf, which will refer to them by name.
udf (optional): The user defined transformation function, which must take pandas
dataframes as inputs.
inputs (optional): A map from input source names to the actual input sources,
inputs (optional): (Deprecated) A map from input source names to the actual input sources,
which may be feature views, feature view projections, or request data sources.
These sources serve as inputs to the udf, which will refer to them by name.
schema (optional): The list of features in the output of the on demand feature
Expand All @@ -123,8 +132,7 @@ def __init__(
),
DeprecationWarning,
)

_sources = sources or inputs
_sources = sources or []
if inputs and sources:
raise ValueError("At most one of `sources` or `inputs` can be specified.")
elif inputs:
Expand All @@ -135,7 +143,17 @@ def __init__(
),
DeprecationWarning,
)

for _, source in inputs.items():
if isinstance(source, FeatureView):
_sources.append(feature_view_to_batch_feature_view(source))
elif isinstance(source, RequestSource) or isinstance(
source, FeatureViewProjection
):
_sources.append(source)
else:
raise ValueError(
"input can only accept FeatureView, FeatureViewProjection, or RequestSource"
)
_udf = udf

if args:
Expand Down Expand Up @@ -169,7 +187,18 @@ def __init__(
DeprecationWarning,
)
if len(args) >= 3:
_sources = args[2]
_inputs = args[2]
for _, source in _inputs.items():
if isinstance(source, FeatureView):
_sources.append(feature_view_to_batch_feature_view(source))
elif isinstance(source, RequestSource) or isinstance(
source, FeatureViewProjection
):
_sources.append(source)
else:
raise ValueError(
"input can only accept FeatureView, FeatureViewProjection, or RequestSource"
)
warnings.warn(
(
"The `inputs` parameter is being deprecated. Please use `sources` instead. "
Expand All @@ -195,18 +224,17 @@ def __init__(
tags=tags,
owner=owner,
)

assert _sources is not None
self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {}
self.source_request_sources: Dict[str, RequestSource] = {}
for source_name, odfv_source in _sources.items():
for odfv_source in _sources:
if isinstance(odfv_source, RequestSource):
self.source_request_sources[source_name] = odfv_source
self.source_request_sources[odfv_source.name] = odfv_source
elif isinstance(odfv_source, FeatureViewProjection):
self.source_feature_view_projections[source_name] = odfv_source
self.source_feature_view_projections[odfv_source.name] = odfv_source
else:
self.source_feature_view_projections[
source_name
odfv_source.name
] = odfv_source.projection

if _udf is None:
Expand All @@ -219,12 +247,12 @@ def proto_class(self) -> Type[OnDemandFeatureViewProto]:
return OnDemandFeatureViewProto

def __copy__(self):

fv = OnDemandFeatureView(
name=self.name,
schema=self.features,
sources=dict(
**self.source_feature_view_projections, **self.source_request_sources,
),
sources=list(self.source_feature_view_projections.values())
+ list(self.source_request_sources.values()),
udf=self.udf,
description=self.description,
tags=self.tags,
Expand Down Expand Up @@ -302,22 +330,21 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
Returns:
A OnDemandFeatureView object based on the on-demand feature view protobuf.
"""
sources = {}
for (
source_name,
on_demand_source,
) in on_demand_feature_view_proto.spec.sources.items():
sources = []
for (_, on_demand_source,) in on_demand_feature_view_proto.spec.sources.items():
if on_demand_source.WhichOneof("source") == "feature_view":
sources[source_name] = FeatureView.from_proto(
on_demand_source.feature_view
).projection
sources.append(
FeatureView.from_proto(on_demand_source.feature_view).projection
)
elif on_demand_source.WhichOneof("source") == "feature_view_projection":
sources[source_name] = FeatureViewProjection.from_proto(
on_demand_source.feature_view_projection
sources.append(
FeatureViewProjection.from_proto(
on_demand_source.feature_view_projection
)
)
else:
sources[source_name] = RequestSource.from_proto(
on_demand_source.request_data_source
sources.append(
RequestSource.from_proto(on_demand_source.request_data_source)
)
on_demand_feature_view_obj = cls(
name=on_demand_feature_view_proto.spec.name,
Expand Down Expand Up @@ -476,7 +503,16 @@ def get_requested_odfvs(feature_refs, project, registry):
def on_demand_feature_view(
*args,
features: Optional[List[Feature]] = None,
sources: Optional[Dict[str, Union[FeatureView, RequestSource]]] = None,
sources: Optional[
List[
Union[
BatchFeatureView,
StreamFeatureView,
RequestSource,
FeatureViewProjection,
]
]
] = None,
inputs: Optional[Dict[str, Union[FeatureView, RequestSource]]] = None,
schema: Optional[List[Field]] = None,
description: str = "",
Expand All @@ -490,7 +526,7 @@ def on_demand_feature_view(
features (deprecated): The list of features in the output of the on demand
feature view, after the transformation has been applied.
sources (optional): A map from input source names to the actual input sources,
which may be feature views, feature view projections, or request data sources.
which may be feature views, or request data sources.
These sources serve as inputs to the udf, which will refer to them by name.
inputs (optional): A map from input source names to the actual input sources,
which may be feature views, feature view projections, or request data sources.
Expand All @@ -517,8 +553,7 @@ def on_demand_feature_view(
),
DeprecationWarning,
)

_sources = sources or inputs
_sources = sources or []
if inputs and sources:
raise ValueError("At most one of `sources` or `inputs` can be specified.")
elif inputs:
Expand All @@ -529,6 +564,17 @@ def on_demand_feature_view(
),
DeprecationWarning,
)
for _, source in inputs.items():
if isinstance(source, FeatureView):
_sources.append(feature_view_to_batch_feature_view(source))
elif isinstance(source, RequestSource) or isinstance(
source, FeatureViewProjection
):
_sources.append(source)
else:
raise ValueError(
"input can only accept FeatureView, FeatureViewProjection, or RequestSource"
)

if args:
warnings.warn(
Expand Down Expand Up @@ -559,14 +605,25 @@ def on_demand_feature_view(
DeprecationWarning,
)
if len(args) >= 2:
_sources = args[1]
warnings.warn(
(
"The `inputs` parameter is being deprecated. Please use `sources` instead. "
"Feast 0.21 and onwards will not support the `inputs` parameter."
),
DeprecationWarning,
)
_inputs = args[1]
for _, source in _inputs.items():
if isinstance(source, FeatureView):
_sources.append(feature_view_to_batch_feature_view(source))
elif isinstance(source, RequestSource) or isinstance(
source, FeatureViewProjection
):
_sources.append(source)
else:
raise ValueError(
"input can only accept FeatureView, FeatureViewProjection, or RequestSource"
)
warnings.warn(
(
"The `inputs` parameter is being deprecated. Please use `sources` instead. "
"Feast 0.21 and onwards will not support the `inputs` parameter."
),
DeprecationWarning,
)

if not _sources:
raise ValueError("The `sources` parameter must be specified.")
Expand All @@ -587,3 +644,16 @@ def decorator(user_function):
return on_demand_feature_view_obj

return decorator


def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView:
return BatchFeatureView(
name=fv.name,
entities=fv.entities,
ttl=fv.ttl,
tags=fv.tags,
online=fv.online,
owner=fv.owner,
schema=fv.schema,
source=fv.source,
)
Loading