chore: Add feature repo for version 0.19 (#2659)
* Use `join_keys` instead of `join_key` when instantiating entity

Signed-off-by: Felix Wang <[email protected]>

* Switch from `batch_source` to `source`

Signed-off-by: Felix Wang <[email protected]>

* Clean up test

Signed-off-by: Felix Wang <[email protected]>

* Add example feature repo for version 0.19

Signed-off-by: Felix Wang <[email protected]>

* Add test for 0.19 example feature repo

Signed-off-by: Felix Wang <[email protected]>
felixwang9817 authored May 11, 2022
1 parent bb72b7c commit 6589f15
Showing 5 changed files with 106 additions and 41 deletions.
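The first two commit messages above describe a small repo-definition API migration. A minimal before/after sketch of what they amount to, assuming the imports used in the files below; the `driver_stats_source` definition and its path are illustrative, not part of this commit:

from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource, ValueType
from feast.types import Float32

# Illustrative data source; the path is a placeholder.
driver_stats_source = FileSource(
    path="driver_stats.parquet",
    event_timestamp_column="event_timestamp",
)

# Old style: a single `join_key` string and `batch_source`.
# driver = Entity(name="driver_id", value_type=ValueType.INT64, join_key="driver_id")
# view = FeatureView(..., batch_source=driver_stats_source, ...)

# New style used throughout this commit: a `join_keys` list and `source`.
driver = Entity(name="driver_id", value_type=ValueType.INT64, join_keys=["driver_id"])

driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    entities=["driver_id"],
    ttl=timedelta(days=1),
    schema=[Field(name="conv_rate", dtype=Float32)],
    online=True,
    source=driver_stats_source,
)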
2 changes: 1 addition & 1 deletion sdk/python/feast/entity.py
@@ -180,7 +180,7 @@ def from_proto(cls, entity_proto: EntityProto):
entity = cls(
name=entity_proto.spec.name,
value_type=ValueType(entity_proto.spec.value_type),
join_key=entity_proto.spec.join_key,
join_keys=[entity_proto.spec.join_key],
description=entity_proto.spec.description,
tags=entity_proto.spec.tags,
owner=entity_proto.spec.owner,
6 changes: 3 additions & 3 deletions sdk/python/tests/example_repos/example_feature_repo_2.py
@@ -9,7 +9,7 @@
created_timestamp_column="created",
)

driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id")


driver_hourly_stats_view = FeatureView(
@@ -22,7 +22,7 @@
Field(name="avg_daily_trips", dtype=Int64),
],
online=True,
batch_source=driver_hourly_stats,
source=driver_hourly_stats,
tags={},
)

@@ -43,6 +43,6 @@
Field(name="avg_ride_length", dtype=Float32),
],
online=True,
batch_source=global_daily_stats,
source=global_daily_stats,
tags={},
)
52 changes: 52 additions & 0 deletions sdk/python/tests/example_repos/example_feature_repo_version_0_19.py
@@ -0,0 +1,52 @@
from datetime import timedelta

from feast import Entity, Feature, FeatureView, FileSource, ValueType

driver_hourly_stats = FileSource(
path="%PARQUET_PATH%", # placeholder to be replaced by the test
event_timestamp_column="event_timestamp", # Changed to `timestamp_field` in 0.20
created_timestamp_column="created",
)

driver = Entity(
name="driver_id",
value_type=ValueType.INT64,
description="driver id",
join_key="driver_id", # Changed to `join_keys` in 0.20
)


driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
ttl=timedelta(days=1),
features=[ # Changed to `schema` in 0.20
Feature(name="conv_rate", dtype=ValueType.FLOAT), # Changed to `Field` in 0.20
Feature(name="acc_rate", dtype=ValueType.FLOAT),
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],
online=True,
batch_source=driver_hourly_stats, # Changed to `source` in 0.20
tags={},
)


global_daily_stats = FileSource(
path="%PARQUET_PATH_GLOBAL%", # placeholder to be replaced by the test
event_timestamp_column="event_timestamp", # Changed to `timestamp_field` in 0.20
created_timestamp_column="created",
)


global_stats_feature_view = FeatureView(
name="global_daily_stats",
entities=[],
ttl=timedelta(days=1),
features=[ # Changed to `schema` in 0.20
Feature(name="num_rides", dtype=ValueType.INT32), # Changed to `Field` in 0.20
Feature(name="avg_ride_length", dtype=ValueType.FLOAT),
],
online=True,
batch_source=global_daily_stats, # Changed to `source` in 0.20
tags={},
)
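For reference, the inline comments in this 0.19-style repo point at the names introduced in 0.20, which example_feature_repo_2.py above already uses. A sketch of the same driver feature view written the newer way; the 0.20 names are taken from those inline comments, and the `Field`/`feast.types` imports are assumed to match the other files in this diff:

from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource, ValueType
from feast.types import Float32, Int64

driver_hourly_stats = FileSource(
    path="%PARQUET_PATH%",  # same placeholder convention as above
    timestamp_field="event_timestamp",  # was `event_timestamp_column`
    created_timestamp_column="created",
)

driver = Entity(
    name="driver_id",
    value_type=ValueType.INT64,
    description="driver id",
    join_keys=["driver_id"],  # was `join_key="driver_id"`
)

driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    entities=["driver_id"],
    ttl=timedelta(days=1),
    schema=[  # was `features=[...]`
        Field(name="conv_rate", dtype=Float32),  # `Feature` -> `Field`
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    online=True,
    source=driver_hourly_stats,  # was `batch_source`
    tags={},
)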
@@ -29,6 +29,6 @@
Field(name="avg_daily_trips", dtype=Int64),
],
online=True,
batch_source=driver_hourly_stats,
source=driver_hourly_stats,
tags={},
)
85 changes: 49 additions & 36 deletions sdk/python/tests/integration/online_store/test_e2e_local.py
@@ -6,8 +6,11 @@
import pandas as pd
from pytz import utc

import feast.driver_test_data as driver_data
from feast import FeatureStore
from feast.driver_test_data import (
create_driver_hourly_stats_df,
create_global_daily_stats_df,
)
from feast.feature_store import FeatureStore
from tests.utils.cli_utils import CliRunner, get_example_repo


@@ -65,6 +68,37 @@ def _assert_online_features(
assert "global_daily_stats__avg_ride_length" in result


def _test_materialize_and_online_retrieval(
runner: CliRunner,
store: FeatureStore,
start_date: datetime,
end_date: datetime,
driver_df: pd.DataFrame,
):
assert store.repo_path is not None

# Test `feast materialize` and online retrieval.
r = runner.run(
[
"materialize",
start_date.isoformat(),
(end_date - timedelta(days=7)).isoformat(),
],
cwd=Path(store.repo_path),
)

assert r.returncode == 0
_assert_online_features(store, driver_df, end_date - timedelta(days=7))

# Test `feast materialize-incremental` and online retrieval.
r = runner.run(
["materialize-incremental", end_date.isoformat()], cwd=Path(store.repo_path),
)

assert r.returncode == 0
_assert_online_features(store, driver_df, end_date)


def test_e2e_local() -> None:
"""
A more comprehensive than "basic" test, using local provider.
@@ -74,71 +108,50 @@ def test_e2e_local() -> None:
3. Ingest some data to online store from parquet
4. Read from the online store to make sure it made it there.
"""

runner = CliRunner()
with tempfile.TemporaryDirectory() as data_dir:

# Generate some test data in parquet format.
# Generate test data.
end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
start_date = end_date - timedelta(days=15)

driver_entities = [1001, 1002, 1003, 1004, 1005]
driver_df = driver_data.create_driver_hourly_stats_df(
driver_entities, start_date, end_date
)
driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)
driver_stats_path = os.path.join(data_dir, "driver_stats.parquet")
driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True)

global_df = driver_data.create_global_daily_stats_df(start_date, end_date)
global_df = create_global_daily_stats_df(start_date, end_date)
global_stats_path = os.path.join(data_dir, "global_stats.parquet")
global_df.to_parquet(path=global_stats_path, allow_truncated_timestamps=True)

# Note that runner takes care of running apply/teardown for us here.
# We patch python code in example_feature_repo_2.py to set the path to Parquet files.
with runner.local_repo(
get_example_repo("example_feature_repo_2.py")
.replace("%PARQUET_PATH%", driver_stats_path)
.replace("%PARQUET_PATH_GLOBAL%", global_stats_path),
"file",
) as store:

assert store.repo_path is not None

# feast materialize
r = runner.run(
[
"materialize",
start_date.isoformat(),
(end_date - timedelta(days=7)).isoformat(),
],
cwd=Path(store.repo_path),
_test_materialize_and_online_retrieval(
runner, store, start_date, end_date, driver_df
)

assert r.returncode == 0

_assert_online_features(store, driver_df, end_date - timedelta(days=7))

# feast materialize-incremental
r = runner.run(
["materialize-incremental", end_date.isoformat()],
cwd=Path(store.repo_path),
with runner.local_repo(
get_example_repo("example_feature_repo_version_0_19.py")
.replace("%PARQUET_PATH%", driver_stats_path)
.replace("%PARQUET_PATH_GLOBAL%", global_stats_path),
"file",
) as store:
_test_materialize_and_online_retrieval(
runner, store, start_date, end_date, driver_df
)

assert r.returncode == 0

_assert_online_features(store, driver_df, end_date)

# Test a failure case when the parquet file doesn't include a join key
with runner.local_repo(
get_example_repo("example_feature_repo_with_entity_join_key.py").replace(
"%PARQUET_PATH%", driver_stats_path
),
"file",
) as store:

assert store.repo_path is not None

# feast materialize
returncode, output = runner.run_with_output(
[
"materialize",
