Patch summary (free text before the first "diff --git" header).

* feast/entity.py: Entity.from_proto passes join_keys=[spec.join_key]
  instead of join_key=spec.join_key.
* tests/example_repos: FeatureView(batch_source=...) becomes
  FeatureView(source=...); a redundant trailing comma is dropped.
* tests/example_repos/example_feature_repo_version_0_19.py: new example repo
  that keeps the spellings its own comments mark as changed in 0.20
  (event_timestamp_column, join_key, features/Feature, batch_source).
* tests/integration/online_store/test_e2e_local.py: the materialize and
  materialize-incremental checks move into
  _test_materialize_and_online_retrieval, which test_e2e_local runs against
  both example_feature_repo_2.py and the 0.19-style repo.

diff --git a/sdk/python/feast/entity.py b/sdk/python/feast/entity.py
index 3aaf0f9b69..38c353857c 100644
--- a/sdk/python/feast/entity.py
+++ b/sdk/python/feast/entity.py
@@ -180,7 +180,7 @@ def from_proto(cls, entity_proto: EntityProto):
         entity = cls(
             name=entity_proto.spec.name,
             value_type=ValueType(entity_proto.spec.value_type),
-            join_key=entity_proto.spec.join_key,
+            join_keys=[entity_proto.spec.join_key],
             description=entity_proto.spec.description,
             tags=entity_proto.spec.tags,
             owner=entity_proto.spec.owner,
diff --git a/sdk/python/tests/example_repos/example_feature_repo_2.py b/sdk/python/tests/example_repos/example_feature_repo_2.py
index 1ca7cc3805..d4c7976418 100644
--- a/sdk/python/tests/example_repos/example_feature_repo_2.py
+++ b/sdk/python/tests/example_repos/example_feature_repo_2.py
@@ -9,7 +9,7 @@
     created_timestamp_column="created",
 )
 
-driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)
+driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id")
 
 
 driver_hourly_stats_view = FeatureView(
@@ -22,7 +22,7 @@
         Field(name="avg_daily_trips", dtype=Int64),
     ],
     online=True,
-    batch_source=driver_hourly_stats,
+    source=driver_hourly_stats,
     tags={},
 )
 
@@ -43,6 +43,6 @@
         Field(name="avg_ride_length", dtype=Float32),
     ],
     online=True,
-    batch_source=global_daily_stats,
+    source=global_daily_stats,
     tags={},
 )
diff --git a/sdk/python/tests/example_repos/example_feature_repo_version_0_19.py b/sdk/python/tests/example_repos/example_feature_repo_version_0_19.py
new file mode 100644
index 0000000000..e00a69b867
--- /dev/null
+++ b/sdk/python/tests/example_repos/example_feature_repo_version_0_19.py
@@ -0,0 +1,52 @@
+from datetime import timedelta
+
+from feast import Entity, Feature, FeatureView, FileSource, ValueType
+
+driver_hourly_stats = FileSource(
+    path="%PARQUET_PATH%",  # placeholder to be replaced by the test
+    event_timestamp_column="event_timestamp",  # Changed to `timestamp_field` in 0.20
+    created_timestamp_column="created",
+)
+
+driver = Entity(
+    name="driver_id",
+    value_type=ValueType.INT64,
+    description="driver id",
+    join_key="driver_id",  # Changed to `join_keys` in 0.20
+)
+
+
+driver_hourly_stats_view = FeatureView(
+    name="driver_hourly_stats",
+    entities=["driver_id"],
+    ttl=timedelta(days=1),
+    features=[  # Changed to `schema` in 0.20
+        Feature(name="conv_rate", dtype=ValueType.FLOAT),  # Changed to `Field` in 0.20
+        Feature(name="acc_rate", dtype=ValueType.FLOAT),
+        Feature(name="avg_daily_trips", dtype=ValueType.INT64),
+    ],
+    online=True,
+    batch_source=driver_hourly_stats,  # Changed to `source` in 0.20
+    tags={},
+)
+
+
+global_daily_stats = FileSource(
+    path="%PARQUET_PATH_GLOBAL%",  # placeholder to be replaced by the test
+    event_timestamp_column="event_timestamp",  # Changed to `timestamp_field` in 0.20
+    created_timestamp_column="created",
+)
+
+
+global_stats_feature_view = FeatureView(
+    name="global_daily_stats",
+    entities=[],
+    ttl=timedelta(days=1),
+    features=[  # Changed to `schema` in 0.20
+        Feature(name="num_rides", dtype=ValueType.INT32),  # Changed to `Field` in 0.20
+        Feature(name="avg_ride_length", dtype=ValueType.FLOAT),
+    ],
+    online=True,
+    batch_source=global_daily_stats,  # Changed to `source` in 0.20
+    tags={},
+)
diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py b/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py
index ba18cf84ba..5ba26d2573 100644
--- a/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py
+++ b/sdk/python/tests/example_repos/example_feature_repo_with_entity_join_key.py
@@ -29,6 +29,6 @@
         Field(name="avg_daily_trips", dtype=Int64),
     ],
     online=True,
-    batch_source=driver_hourly_stats,
+    source=driver_hourly_stats,
     tags={},
 )
diff --git a/sdk/python/tests/integration/online_store/test_e2e_local.py b/sdk/python/tests/integration/online_store/test_e2e_local.py
index c1aa10900a..59ea0777a4 100644
--- a/sdk/python/tests/integration/online_store/test_e2e_local.py
+++ b/sdk/python/tests/integration/online_store/test_e2e_local.py
@@ -6,8 +6,11 @@
 import pandas as pd
 from pytz import utc
 
-import feast.driver_test_data as driver_data
-from feast import FeatureStore
+from feast.driver_test_data import (
+    create_driver_hourly_stats_df,
+    create_global_daily_stats_df,
+)
+from feast.feature_store import FeatureStore
 from tests.utils.cli_utils import CliRunner, get_example_repo
 
 
@@ -65,6 +68,37 @@ def _assert_online_features(
     assert "global_daily_stats__avg_ride_length" in result
 
 
+def _test_materialize_and_online_retrieval(
+    runner: CliRunner,
+    store: FeatureStore,
+    start_date: datetime,
+    end_date: datetime,
+    driver_df: pd.DataFrame,
+):
+    assert store.repo_path is not None
+
+    # Test `feast materialize` and online retrieval.
+    r = runner.run(
+        [
+            "materialize",
+            start_date.isoformat(),
+            (end_date - timedelta(days=7)).isoformat(),
+        ],
+        cwd=Path(store.repo_path),
+    )
+
+    assert r.returncode == 0
+    _assert_online_features(store, driver_df, end_date - timedelta(days=7))
+
+    # Test `feast materialize-incremental` and online retrieval.
+    r = runner.run(
+        ["materialize-incremental", end_date.isoformat()], cwd=Path(store.repo_path),
+    )
+
+    assert r.returncode == 0
+    _assert_online_features(store, driver_df, end_date)
+
+
 def test_e2e_local() -> None:
     """
     A more comprehensive than "basic" test, using local provider.
@@ -74,60 +108,41 @@ def test_e2e_local() -> None:
     3. Ingest some data to online store from parquet
     4. Read from the online store to make sure it made it there.
     """
-
     runner = CliRunner()
     with tempfile.TemporaryDirectory() as data_dir:
-
-        # Generate some test data in parquet format.
+        # Generate test data.
         end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
         start_date = end_date - timedelta(days=15)
 
         driver_entities = [1001, 1002, 1003, 1004, 1005]
-        driver_df = driver_data.create_driver_hourly_stats_df(
-            driver_entities, start_date, end_date
-        )
+        driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)
         driver_stats_path = os.path.join(data_dir, "driver_stats.parquet")
         driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True)
 
-        global_df = driver_data.create_global_daily_stats_df(start_date, end_date)
+        global_df = create_global_daily_stats_df(start_date, end_date)
         global_stats_path = os.path.join(data_dir, "global_stats.parquet")
         global_df.to_parquet(path=global_stats_path, allow_truncated_timestamps=True)
 
-        # Note that runner takes care of running apply/teardown for us here.
-        # We patch python code in example_feature_repo_2.py to set the path to Parquet files.
         with runner.local_repo(
             get_example_repo("example_feature_repo_2.py")
             .replace("%PARQUET_PATH%", driver_stats_path)
             .replace("%PARQUET_PATH_GLOBAL%", global_stats_path),
             "file",
         ) as store:
-
-            assert store.repo_path is not None
-
-            # feast materialize
-            r = runner.run(
-                [
-                    "materialize",
-                    start_date.isoformat(),
-                    (end_date - timedelta(days=7)).isoformat(),
-                ],
-                cwd=Path(store.repo_path),
+            _test_materialize_and_online_retrieval(
+                runner, store, start_date, end_date, driver_df
             )
 
-            assert r.returncode == 0
-
-            _assert_online_features(store, driver_df, end_date - timedelta(days=7))
-
-            # feast materialize-incremental
-            r = runner.run(
-                ["materialize-incremental", end_date.isoformat()],
-                cwd=Path(store.repo_path),
+        with runner.local_repo(
+            get_example_repo("example_feature_repo_version_0_19.py")
+            .replace("%PARQUET_PATH%", driver_stats_path)
+            .replace("%PARQUET_PATH_GLOBAL%", global_stats_path),
+            "file",
+        ) as store:
+            _test_materialize_and_online_retrieval(
+                runner, store, start_date, end_date, driver_df
             )
 
-            assert r.returncode == 0
-
-            _assert_online_features(store, driver_df, end_date)
-
         # Test a failure case when the parquet file doesn't include a join key
         with runner.local_repo(
             get_example_repo("example_feature_repo_with_entity_join_key.py").replace(
@@ -135,10 +150,8 @@ def test_e2e_local() -> None:
             ),
             "file",
         ) as store:
-
             assert store.repo_path is not None
 
-            # feast materialize
             returncode, output = runner.run_with_output(
                 [
                     "materialize",