Skip to content

Commit

Permalink
fix: Schema update (#2509)
Browse files Browse the repository at this point in the history
* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* fix lint

Signed-off-by: Kevin Zhang <[email protected]>

* indent fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix lint

Signed-off-by: Kevin Zhang <[email protected]>

* Fix lint

Signed-off-by: Kevin Zhang <[email protected]>

* Fix go

Signed-off-by: Kevin Zhang <[email protected]>

* Fix tests

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Revert

Signed-off-by: Kevin Zhang <[email protected]>

* fixes

Signed-off-by: Kevin Zhang <[email protected]>

* Fix lint

Signed-off-by: Kevin Zhang <[email protected]>

* Fix and clean up

Signed-off-by: Kevin Zhang <[email protected]>

* Lint

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Patch fix, will change definitions in separate pr

Signed-off-by: Kevin Zhang <[email protected]>
  • Loading branch information
kevjumba authored Apr 13, 2022
1 parent 11a342f commit cf7bbc2
Show file tree
Hide file tree
Showing 18 changed files with 197 additions and 57 deletions.
8 changes: 4 additions & 4 deletions docs/getting-started/concepts/feature-view.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ from feast import Field, Float64, RequestSource
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
name="vals_to_add",
schema={
"val_to_add": ValueType.INT64,
"val_to_add_2": ValueType.INT64
}
schema=[
Field(name="val_to_add", dtype=PrimitiveFeastType.INT64),
Field(name="val_to_add_2": dtype=PrimitiveFeastType.INT64),
]
)

# Use the input data and feature view features to create new features
Expand Down
2 changes: 1 addition & 1 deletion docs/how-to-guides/adding-a-new-offline-store.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ Finally, the custom data source class can be use in the feature repo to define a
```python
pdriver_hourly_stats = CustomFileDataSource(
path="feature_repo/data/driver_stats.parquet",
event_timestamp_column="event_timestamp",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
Expand Down
11 changes: 9 additions & 2 deletions examples/java-demo/feature_repo/driver_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from feast.request_feature_view import RequestFeatureView
from feast.types import Float32, Float64, Int64, String
from google.protobuf.duration_pb2 import Duration
from feast.field import Field

from feast import Entity, Feature, FeatureView, FileSource, ValueType

Expand Down Expand Up @@ -33,7 +34,10 @@
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
name="vals_to_add",
schema={"val_to_add": ValueType.INT64, "val_to_add_2": ValueType.INT64},
schema=[
Field(name="val_to_add", dtype=Int64),
Field(name="val_to_add_2", dtype=Int64),
],
)

# Define an on demand feature view which can generate new features based on
Expand All @@ -59,6 +63,9 @@ def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
driver_age_request_fv = RequestFeatureView(
name="driver_age",
request_data_source=RequestSource(
name="driver_age", schema={"driver_age": ValueType.INT64,}
name="driver_age",
schema=[
Field(name="driver_age", dtype=Int64),
],
),
)
Binary file not shown.
4 changes: 2 additions & 2 deletions go/internal/feast/model/ondemandfeatureview.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func (fs *OnDemandFeatureView) ProjectWithFeatures(featureNames []string) (*OnDe
func (fs *OnDemandFeatureView) GetRequestDataSchema() map[string]types.ValueType_Enum {
schema := make(map[string]types.ValueType_Enum)
for _, requestDataSource := range fs.SourceRequestDataSources {
for fieldName, fieldValueType := range requestDataSource.Schema {
schema[fieldName] = fieldValueType
for _, featureSpec := range requestDataSource.Schema {
schema[featureSpec.Name] = featureSpec.ValueType
}
}
return schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from feast.types import Float32, Float64, Int64
from feast.value_type import ValueType
from google.protobuf.duration_pb2 import Duration

from feast import FileSource

file_path = "driver_stats.parquet"
Expand Down
6 changes: 5 additions & 1 deletion protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ option java_package = "feast.proto.core";

import "feast/core/DataFormat.proto";
import "feast/types/Value.proto";
import "feast/core/Feature.proto";

// Defines a Data Source that can be used source Feature data
// Next available id: 28
Expand Down Expand Up @@ -212,7 +213,10 @@ message DataSource {
message RequestDataOptions {
reserved 1;
// Mapping of feature name to type
map<string, feast.types.ValueType.Enum> schema = 2;
map<string, feast.types.ValueType.Enum> deprecated_schema = 2;

repeated FeatureSpecV2 schema = 3;

}

// Defines options for DataSource that supports pushing data to it. This allows data to be pushed to
Expand Down
2 changes: 1 addition & 1 deletion protos/feast/types/Field.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/types";

message Field {
string name = 1;
feast.types.Value value = 2;
feast.types.ValueType.Enum value = 2;
}
113 changes: 93 additions & 20 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
import enum
import warnings
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, Optional, Tuple
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

from google.protobuf.json_format import MessageToJson

from feast import type_map
from feast.data_format import StreamFormat
from feast.field import Field
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import RepoConfig, get_data_source_class_from_type
from feast.types import VALUE_TYPES_TO_FEAST_TYPES
from feast.value_type import ValueType


Expand Down Expand Up @@ -449,27 +451,45 @@ class RequestSource(DataSource):
Args:
name: Name of the request data source
schema: Schema mapping from the input feature name to a ValueType
schema Union[Dict[str, ValueType], List[Field]]: Schema mapping from the input feature name to a ValueType
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the request data source, typically the email of the primary
maintainer.
"""

name: str
schema: Dict[str, ValueType]
schema: List[Field]

def __init__(
self,
name: str,
schema: Dict[str, ValueType],
schema: Union[Dict[str, ValueType], List[Field]],
description: Optional[str] = "",
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
):
"""Creates a RequestSource object."""
super().__init__(name=name, description=description, tags=tags, owner=owner)
self.schema = schema
if isinstance(schema, Dict):
warnings.warn(
"Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. "
"Please use List[Field] instead for the schema",
DeprecationWarning,
)
schemaList = []
for key, valueType in schema.items():
schemaList.append(
Field(name=key, dtype=VALUE_TYPES_TO_FEAST_TYPES[valueType])
)
self.schema = schemaList
elif isinstance(schema, List):
self.schema = schema
else:
raise Exception(
"Schema type must be either dictionary or list, not "
+ str(type(schema))
)

def validate(self, config: RepoConfig):
pass
Expand All @@ -479,33 +499,86 @@ def get_table_column_names_and_types(
) -> Iterable[Tuple[str, str]]:
pass

def __eq__(self, other):
if not isinstance(other, RequestSource):
raise TypeError(
"Comparisons should only involve RequestSource class objects."
)
if (
self.name != other.name
or self.description != other.description
or self.owner != other.owner
or self.tags != other.tags
):
return False
if isinstance(self.schema, List) and isinstance(other.schema, List):
for field1, field2 in zip(self.schema, other.schema):
if field1 != field2:
return False
return True
else:
return False

def __hash__(self):
return super().__hash__()

@staticmethod
def from_proto(data_source: DataSourceProto):

deprecated_schema = data_source.request_data_options.deprecated_schema
schema_pb = data_source.request_data_options.schema
schema = {}
for key, val in schema_pb.items():
schema[key] = ValueType(val)
return RequestSource(
name=data_source.name,
schema=schema,
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
)

if deprecated_schema and not schema_pb:
warnings.warn(
"Schema in RequestSource is changing type. The schema data type Dict[str, ValueType] is being deprecated in Feast 0.23. "
"Please use List[Field] instead for the schema",
DeprecationWarning,
)
dict_schema = {}
for key, val in deprecated_schema.items():
dict_schema[key] = ValueType(val)
return RequestSource(
name=data_source.name,
schema=dict_schema,
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
)
else:
list_schema = []
for field_proto in schema_pb:
list_schema.append(Field.from_proto(field_proto))

return RequestSource(
name=data_source.name,
schema=list_schema,
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
)

def to_proto(self) -> DataSourceProto:
schema_pb = {}
for key, value in self.schema.items():
schema_pb[key] = value.value
options = DataSourceProto.RequestDataOptions(schema=schema_pb)

schema_pb = []

if isinstance(self.schema, Dict):
for key, value in self.schema.items():
schema_pb.append(
Field(
name=key, dtype=VALUE_TYPES_TO_FEAST_TYPES[value.value]
).to_proto()
)
else:
for field in self.schema:
schema_pb.append(field.to_proto())
data_source_proto = DataSourceProto(
name=self.name,
type=DataSourceProto.REQUEST_SOURCE,
request_data_options=options,
description=self.description,
tags=self.tags,
owner=self.owner,
)
data_source_proto.request_data_options.schema.extend(schema_pb)

return data_source_proto

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def from_proto(cls, field_proto: FieldProto):
field_proto: FieldProto protobuf object
"""
value_type = ValueType(field_proto.value_type)
return cls(name=field_proto.name, dtype=from_value_type(value_type))
return cls(name=field_proto.name, dtype=from_value_type(value_type=value_type))

@classmethod
def from_feature(cls, feature: Feature):
Expand Down
18 changes: 14 additions & 4 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,17 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
def get_request_data_schema(self) -> Dict[str, ValueType]:
schema: Dict[str, ValueType] = {}
for request_source in self.source_request_sources.values():
schema.update(request_source.schema)
if isinstance(request_source.schema, List):
new_schema = {}
for field in request_source.schema:
new_schema[field.name] = field.dtype.to_value_type()
schema.update(new_schema)
elif isinstance(request_source.schema, Dict):
schema.update(request_source.schema)
else:
raise Exception(
f"Request source schema is not correct type: ${str(type(request_source.schema))}"
)
return schema

def get_transformed_features_df(
Expand Down Expand Up @@ -409,9 +419,9 @@ def infer_features(self):
)
df[f"{feature.name}"] = pd.Series(dtype=dtype)
for request_data in self.source_request_sources.values():
for feature_name, feature_type in request_data.schema.items():
dtype = feast_value_type_to_pandas_type(feature_type)
df[f"{feature_name}"] = pd.Series(dtype=dtype)
for field in request_data.schema:
dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type())
df[f"{field.name}"] = pd.Series(dtype=dtype)
output_df: pd.DataFrame = self.udf.__call__(df)
inferred_features = []
for f, dt in zip(output_df.columns, output_df.dtypes):
Expand Down
15 changes: 10 additions & 5 deletions sdk/python/feast/request_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from feast.base_feature_view import BaseFeatureView
from feast.data_source import RequestSource
from feast.feature_view_projection import FeatureViewProjection
from feast.field import Field, from_value_type
from feast.field import Field
from feast.protos.feast.core.RequestFeatureView_pb2 import (
RequestFeatureView as RequestFeatureViewProto,
)
Expand Down Expand Up @@ -63,12 +63,17 @@ def __init__(
DeprecationWarning,
)

if isinstance(request_data_source.schema, Dict):
new_features = [
Field(name=name, dtype=dtype)
for name, dtype in request_data_source.schema.items()
]
else:
new_features = request_data_source.schema

super().__init__(
name=name,
features=[
Field(name=name, dtype=from_value_type(value_type))
for name, value_type in request_data_source.schema.items()
],
features=new_features,
description=description,
tags=tags,
owner=owner,
Expand Down
9 changes: 9 additions & 0 deletions sdk/python/feast/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ def to_value_type(self) -> ValueType:
def __str__(self):
return PRIMITIVE_FEAST_TYPES_TO_STRING[self.name]

def __eq__(self, other):
if isinstance(other, PrimitiveFeastType):
return self.value == other.value
else:
return False

def __hash__(self):
return hash((PRIMITIVE_FEAST_TYPES_TO_STRING[self.name]))


Invalid = PrimitiveFeastType.INVALID
Bytes = PrimitiveFeastType.BYTES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ def similarity_feature_view(


def create_conv_rate_request_source():
return RequestSource(name="conv_rate_input", schema={"val_to_add": ValueType.INT32})
return RequestSource(
name="conv_rate_input", schema=[Field(name="val_to_add", dtype=Int32)],
)


def create_similarity_request_source():
Expand Down
Loading

0 comments on commit cf7bbc2

Please sign in to comment.