Skip to content

Commit

Permalink
Fix OnDemandFeatureView type inference for array types
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Mirrington <[email protected]>
  • Loading branch information
alexmirrington committed Jul 7, 2024
1 parent de5b0eb commit 0ff73fb
Show file tree
Hide file tree
Showing 6 changed files with 555 additions and 30 deletions.
28 changes: 20 additions & 8 deletions sdk/python/feast/transformation/pandas_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,27 @@ def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
df = pd.DataFrame.from_dict(random_input)
output_df: pd.DataFrame = self.transform(df)

return [
Field(
name=f,
dtype=from_value_type(
python_type_to_feast_value_type(f, type_name=str(dt))
),
fields = []
for feature_name, feature_type in zip(output_df.columns, output_df.dtypes):
feature_value = output_df[feature_name].tolist()
if len(feature_value) <= 0:
raise TypeError(
f"Failed to infer type for feature '{feature_name}' with value "
+ f"'{feature_value}' since no items were returned by the UDF."
)
fields.append(
Field(
name=feature_name,
dtype=from_value_type(
python_type_to_feast_value_type(
feature_name,
value=feature_value[0],
type_name=str(feature_type),
)
),
)
)
for f, dt in zip(output_df.columns, output_df.dtypes)
]
return fields

def __eq__(self, other):
if not isinstance(other, PandasTransformation):
Expand Down
27 changes: 19 additions & 8 deletions sdk/python/feast/transformation/python_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,26 @@ def transform(self, input_dict: dict) -> dict:
def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
output_dict: dict[str, list[Any]] = self.transform(random_input)

return [
Field(
name=f,
dtype=from_value_type(
python_type_to_feast_value_type(f, type_name=type(dt[0]).__name__)
),
fields = []
for feature_name, feature_value in output_dict.items():
if len(feature_value) <= 0:
raise TypeError(
f"Failed to infer type for feature '{feature_name}' with value "
+ f"'{feature_value}' since no items were returned by the UDF."
)
fields.append(
Field(
name=feature_name,
dtype=from_value_type(
python_type_to_feast_value_type(
feature_name,
value=feature_value[0],
type_name=type(feature_value[0]).__name__,
)
),
)
)
for f, dt in output_dict.items()
]
return fields

def __eq__(self, other):
if not isinstance(other, PythonTransformation):
Expand Down
32 changes: 22 additions & 10 deletions sdk/python/feast/transformation/substrait_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,28 @@ def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]:
df = pd.DataFrame.from_dict(random_input)
output_df: pd.DataFrame = self.transform(df)

return [
Field(
name=f,
dtype=from_value_type(
python_type_to_feast_value_type(f, type_name=str(dt))
),
)
for f, dt in zip(output_df.columns, output_df.dtypes)
if f not in random_input
]
fields = []
for feature_name, feature_type in zip(output_df.columns, output_df.dtypes):
feature_value = output_df[feature_name].tolist()
if len(feature_value) <= 0:
raise TypeError(
f"Failed to infer type for feature '{feature_name}' with value "
+ f"'{feature_value}' since no items were returned by the UDF."
)
if feature_name not in random_input:
fields.append(
Field(
name=feature_name,
dtype=from_value_type(
python_type_to_feast_value_type(
feature_name,
value=feature_value[0],
type_name=str(feature_type),
)
),
)
)
return fields

def __eq__(self, other):
if not isinstance(other, SubstraitTransformation):
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def python_type_to_feast_value_type(
"uint16": ValueType.INT32,
"uint8": ValueType.INT32,
"int8": ValueType.INT32,
"bool_": ValueType.BOOL, # np.bool_
"bool": ValueType.BOOL,
"boolean": ValueType.BOOL,
"timedelta": ValueType.UNIX_TIMESTAMP,
Expand Down
254 changes: 252 additions & 2 deletions sdk/python/tests/unit/test_on_demand_pandas_transformation.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,31 @@
import os
import re
import tempfile
from datetime import datetime, timedelta

import pandas as pd
import pytest

from feast import Entity, FeatureStore, FeatureView, FileSource, RepoConfig
from feast import (
Entity,
FeatureStore,
FeatureView,
FileSource,
RepoConfig,
RequestSource,
)
from feast.driver_test_data import create_driver_hourly_stats_df
from feast.field import Field
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64
from feast.types import (
Array,
Bool,
Float32,
Float64,
Int64,
String,
)


def test_pandas_transformation():
Expand Down Expand Up @@ -91,3 +107,237 @@ def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame:
assert online_response["conv_rate_plus_acc"].equals(
online_response["conv_rate"] + online_response["acc_rate"]
)


def test_pandas_transformation_returning_all_data_types():
    """End-to-end check that a pandas-mode on-demand feature view can emit
    scalar features (int, float, str, bool) and array features (lists of
    int/float/str/bool), and that the values round-trip through the local
    SQLite online store with the correct Python types.
    """
    with tempfile.TemporaryDirectory() as data_dir:
        # Local feature store backed by a throwaway registry + SQLite online store.
        store = FeatureStore(
            config=RepoConfig(
                project="test_on_demand_python_transformation",
                registry=os.path.join(data_dir, "registry.db"),
                provider="local",
                entity_key_serialization_version=2,
                online_store=SqliteOnlineStoreConfig(
                    path=os.path.join(data_dir, "online.db")
                ),
            )
        )

        # Generate test data.
        end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
        start_date = end_date - timedelta(days=15)

        driver_entities = [1001, 1002, 1003, 1004, 1005]
        driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)
        driver_stats_path = os.path.join(data_dir, "driver_stats.parquet")
        driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True)

        driver = Entity(name="driver", join_keys=["driver_id"])

        driver_stats_source = FileSource(
            name="driver_hourly_stats_source",
            path=driver_stats_path,
            timestamp_field="event_timestamp",
            created_timestamp_column="created",
        )

        # Materialized (batch) feature view providing the scalar driver stats.
        driver_stats_fv = FeatureView(
            name="driver_hourly_stats",
            entities=[driver],
            ttl=timedelta(days=0),
            schema=[
                Field(name="conv_rate", dtype=Float32),
                Field(name="acc_rate", dtype=Float32),
                Field(name="avg_daily_trips", dtype=Int64),
            ],
            online=True,
            source=driver_stats_source,
        )

        # Request-time inputs carrying array-typed values into the ODFV.
        request_source = RequestSource(
            name="request_source",
            schema=[
                Field(name="avg_daily_trip_rank_thresholds", dtype=Array(Int64)),
                Field(name="avg_daily_trip_rank_names", dtype=Array(String)),
            ],
        )

        # Declared schema deliberately covers every scalar and array type under test.
        @on_demand_feature_view(
            sources=[request_source, driver_stats_fv],
            schema=[
                Field(name="highest_achieved_rank", dtype=String),
                Field(name="avg_daily_trips_plus_one", dtype=Int64),
                Field(name="conv_rate_plus_acc", dtype=Float64),
                Field(name="is_highest_rank", dtype=Bool),
                Field(name="achieved_ranks", dtype=Array(String)),
                Field(name="trips_until_next_rank_int", dtype=Array(Int64)),
                Field(name="trips_until_next_rank_float", dtype=Array(Float64)),
                Field(name="achieved_ranks_mask", dtype=Array(Bool)),
            ],
            mode="pandas",
        )
        def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame:
            df = pd.DataFrame()
            df["conv_rate_plus_acc"] = inputs["conv_rate"] + inputs["acc_rate"]
            df["avg_daily_trips_plus_one"] = inputs["avg_daily_trips"] + 1

            # Per-row list of trips still needed to reach each rank threshold
            # (clamped at 0 once a threshold is met).
            df["trips_until_next_rank_int"] = inputs[
                ["avg_daily_trips", "avg_daily_trip_rank_thresholds"]
            ].apply(
                lambda x: [max(threshold - x.iloc[0], 0) for threshold in x.iloc[1]],
                axis=1,
            )
            df["trips_until_next_rank_float"] = df["trips_until_next_rank_int"].map(
                lambda values: [float(value) for value in values]
            )
            # Boolean mask: True for each rank whose threshold is already met.
            df["achieved_ranks_mask"] = df["trips_until_next_rank_int"].map(
                lambda values: [value <= 0 for value in values]
            )

            temp = pd.concat(
                [df[["achieved_ranks_mask"]], inputs[["avg_daily_trip_rank_names"]]],
                axis=1,
            )
            df["achieved_ranks"] = temp.apply(
                lambda x: [
                    rank if achieved else "Locked"
                    for achieved, rank in zip(x.iloc[0], x.iloc[1])
                ],
                axis=1,
            )
            # Last unlocked rank name, or "None" when no rank is achieved.
            df["highest_achieved_rank"] = (
                df["achieved_ranks"]
                .map(
                    lambda ranks: str(
                        ([rank for rank in ranks if rank != "Locked"][-1:] or ["None"])[
                            0
                        ]
                    )
                )
                .astype("string")
            )
            df["is_highest_rank"] = df["achieved_ranks"].map(
                lambda ranks: ranks[-1] != "Locked"
            )
            return df

        store.apply([driver, driver_stats_source, driver_stats_fv, pandas_view])

        entity_rows = [
            {
                "driver_id": 1001,
                "avg_daily_trip_rank_thresholds": [100, 250, 500, 1000],
                "avg_daily_trip_rank_names": ["Bronze", "Silver", "Gold", "Platinum"],
            }
        ]
        store.write_to_online_store(
            feature_view_name="driver_hourly_stats", df=driver_df
        )

        online_response = store.get_online_features(
            entity_rows=entity_rows,
            features=[
                "driver_hourly_stats:conv_rate",
                "driver_hourly_stats:acc_rate",
                "driver_hourly_stats:avg_daily_trips",
                "pandas_view:avg_daily_trips_plus_one",
                "pandas_view:conv_rate_plus_acc",
                "pandas_view:trips_until_next_rank_int",
                "pandas_view:trips_until_next_rank_float",
                "pandas_view:achieved_ranks_mask",
                "pandas_view:achieved_ranks",
                "pandas_view:highest_achieved_rank",
                "pandas_view:is_highest_rank",
            ],
        ).to_df()
        # We use to_df here to ensure we use the pandas backend, but convert to a dict for comparisons
        result = online_response.to_dict(orient="records")[0]

        # Type assertions
        # Materialized view
        assert type(result["conv_rate"]) == float
        assert type(result["acc_rate"]) == float
        assert type(result["avg_daily_trips"]) == int
        # On-demand view
        assert type(result["avg_daily_trips_plus_one"]) == int
        assert type(result["conv_rate_plus_acc"]) == float
        assert type(result["highest_achieved_rank"]) == str
        assert type(result["is_highest_rank"]) == bool

        assert type(result["trips_until_next_rank_int"]) == list
        assert all([type(e) == int for e in result["trips_until_next_rank_int"]])

        assert type(result["trips_until_next_rank_float"]) == list
        assert all([type(e) == float for e in result["trips_until_next_rank_float"]])

        assert type(result["achieved_ranks"]) == list
        assert all([type(e) == str for e in result["achieved_ranks"]])

        assert type(result["achieved_ranks_mask"]) == list
        assert all([type(e) == bool for e in result["achieved_ranks_mask"]])

        # Value assertions
        # Recompute the expected feature values independently of the UDF.
        expected_trips_until_next_rank = [
            max(threshold - result["avg_daily_trips"], 0)
            for threshold in entity_rows[0]["avg_daily_trip_rank_thresholds"]
        ]
        expected_mask = [value <= 0 for value in expected_trips_until_next_rank]
        expected_ranks = [
            rank if achieved else "Locked"
            for achieved, rank in zip(
                expected_mask, entity_rows[0]["avg_daily_trip_rank_names"]
            )
        ]
        highest_rank = (
            [rank for rank in expected_ranks if rank != "Locked"][-1:] or ["None"]
        )[0]

        assert result["conv_rate_plus_acc"] == result["conv_rate"] + result["acc_rate"]
        assert result["avg_daily_trips_plus_one"] == result["avg_daily_trips"] + 1
        assert result["highest_achieved_rank"] == highest_rank
        assert result["is_highest_rank"] == (expected_ranks[-1] != "Locked")

        assert result["trips_until_next_rank_int"] == expected_trips_until_next_rank
        assert result["trips_until_next_rank_float"] == [
            float(value) for value in expected_trips_until_next_rank
        ]
        assert result["achieved_ranks_mask"] == expected_mask
        assert result["achieved_ranks"] == expected_ranks


def test_invalid_pandas_transformation_raises_type_error_on_apply():
    """A pandas-mode on-demand feature view whose UDF returns an empty column
    must fail type inference with a TypeError at ``store.apply`` time.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        repo_config = RepoConfig(
            project="test_on_demand_python_transformation",
            registry=os.path.join(tmp_dir, "registry.db"),
            provider="local",
            entity_key_serialization_version=2,
            online_store=SqliteOnlineStoreConfig(
                path=os.path.join(tmp_dir, "online.db")
            ),
        )
        feature_store = FeatureStore(config=repo_config)

        request_source = RequestSource(
            name="request_source",
            schema=[
                Field(name="driver_name", dtype=String),
            ],
        )

        @on_demand_feature_view(
            sources=[request_source],
            schema=[Field(name="driver_name_lower", dtype=String)],
            mode="pandas",
        )
        def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame:
            # Returns no rows, so there is no sample value to infer a type from.
            return pd.DataFrame({"driver_name_lower": []})

        expected_error = (
            "Failed to infer type for feature 'driver_name_lower' with value '[]' since no items were returned by the UDF."
        )
        with pytest.raises(TypeError, match=re.escape(expected_error)):
            feature_store.apply([request_source, pandas_view])
Loading

0 comments on commit 0ff73fb

Please sign in to comment.