Skip to content

Commit

Permalink
Reuse same setup and assertion
Browse files Browse the repository at this point in the history
  • Loading branch information
breno-costa committed May 15, 2024
1 parent a9f4643 commit c5779b7
Showing 1 changed file with 29 additions and 62 deletions.
91 changes: 29 additions & 62 deletions sdk/python/tests/integration/online_store/test_universal_online.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import requests
from botocore.exceptions import BotoCoreError

from feast import FeatureStore
from feast.entity import Entity
from feast.errors import FeatureNameCollisionError
from feast.feature_service import FeatureService
Expand Down Expand Up @@ -401,19 +402,13 @@ def test_online_retrieval_with_shared_batch_source(environment, universal_data_s
)


@pytest.mark.integration
@pytest.mark.universal_online_stores
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))
def test_online_retrieval_with_event_timestamps(
environment, universal_data_sources, full_feature_names
):
fs = environment.feature_store
def setup_feature_store_universal_feature_views(environment, universal_data_sources) -> FeatureStore:
fs: FeatureStore = environment.feature_store
entities, datasets, data_sources = universal_data_sources
feature_views = construct_universal_feature_views(data_sources)

fs.apply([driver(), feature_views.driver, feature_views.global_fv])

# fake data to ingest into Online Store
data = {
"driver_id": [1, 2],
"conv_rate": [0.5, 0.3],
Expand All @@ -430,18 +425,11 @@ def test_online_retrieval_with_event_timestamps(
}
df_ingest = pd.DataFrame(data)

# directly ingest data into the Online Store
fs.write_to_online_store("driver_stats", df_ingest)
return fs

response = fs.get_online_features(
features=[
"driver_stats:avg_daily_trips",
"driver_stats:acc_rate",
"driver_stats:conv_rate",
],
entity_rows=[{"driver_id": 1}, {"driver_id": 2}],
)
df = response.to_df(True)

def assert_feature_store_universal_feature_views_response(df: pd.DataFrame):
assertpy.assert_that(len(df)).is_equal_to(2)
assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1)
assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2)
Expand All @@ -466,33 +454,32 @@ def test_online_retrieval_with_event_timestamps(


@pytest.mark.integration
@pytest.mark.universal_online_stores(only=["redis"])
def test_async_online_retrieval_with_event_timestamps(
environment, universal_data_sources
@pytest.mark.universal_online_stores
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))
def test_online_retrieval_with_event_timestamps(
environment, universal_data_sources, full_feature_names
):
fs = environment.feature_store
entities, datasets, data_sources = universal_data_sources
feature_views = construct_universal_feature_views(data_sources)

fs.apply([driver(), feature_views.driver, feature_views.global_fv])
fs = setup_feature_store_universal_feature_views(fs, universal_data_sources)

data = {
"driver_id": [1, 2],
"conv_rate": [0.5, 0.3],
"acc_rate": [0.6, 0.4],
"avg_daily_trips": [4, 5],
"event_timestamp": [
pd.to_datetime(1646263500, utc=True, unit="s"),
pd.to_datetime(1646263600, utc=True, unit="s"),
],
"created": [
pd.to_datetime(1646263500, unit="s"),
pd.to_datetime(1646263600, unit="s"),
response = fs.get_online_features(
features=[
"driver_stats:avg_daily_trips",
"driver_stats:acc_rate",
"driver_stats:conv_rate",
],
}
df_ingest = pd.DataFrame(data)
entity_rows=[{"driver_id": 1}, {"driver_id": 2}],
)
df = response.to_df(True)

fs.write_to_online_store("driver_stats", df_ingest)
assert_feature_store_universal_feature_views_response(df)


@pytest.mark.integration
@pytest.mark.universal_online_stores(only=["redis"])
def test_async_online_retrieval_with_event_timestamps(
environment, universal_data_sources
):
fs = setup_feature_store_universal_feature_views(fs, universal_data_sources)

response = asyncio.run(
fs.get_online_features_async(
Expand All @@ -506,27 +493,7 @@ def test_async_online_retrieval_with_event_timestamps(
)
df = response.to_df(True)

assertpy.assert_that(len(df)).is_equal_to(2)
assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1)
assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2)
assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(
1646263500
)
assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(
1646263600
)
assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(
1646263500
)
assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(
1646263600
)
assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(
1646263500
)
assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(
1646263600
)
assert_feature_store_universal_feature_views_response(df)


@pytest.mark.integration
Expand Down

0 comments on commit c5779b7

Please sign in to comment.