Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Allow using entity's join_key in get_online_features #2420

Merged
merged 2 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,9 +1266,11 @@ def _get_online_features(
features=features, allow_cache=True, hide_dummy_entity=False
)

entity_name_to_join_key_map, entity_type_map = self._get_entity_maps(
requested_feature_views
)
(
entity_name_to_join_key_map,
entity_type_map,
join_keys_set,
) = self._get_entity_maps(requested_feature_views)

# Extract Sequence from RepeatedValue Protobuf.
entity_value_lists: Dict[str, Union[List[Any], List[Value]]] = {
Expand Down Expand Up @@ -1322,22 +1324,32 @@ def _get_online_features(
join_key_values: Dict[str, List[Value]] = {}
request_data_features: Dict[str, List[Value]] = {}
# Entity rows may be either entities or request data.
for entity_name, values in entity_proto_values.items():
for join_key_or_entity_name, values in entity_proto_values.items():
# Found request data
if (
entity_name in needed_request_data
or entity_name in needed_request_fv_features
join_key_or_entity_name in needed_request_data
or join_key_or_entity_name in needed_request_fv_features
):
if entity_name in needed_request_fv_features:
if join_key_or_entity_name in needed_request_fv_features:
# If the data was requested as a feature then
# make sure it appears in the result.
requested_result_row_names.add(entity_name)
request_data_features[entity_name] = values
requested_result_row_names.add(join_key_or_entity_name)
request_data_features[join_key_or_entity_name] = values
else:
try:
join_key = entity_name_to_join_key_map[entity_name]
except KeyError:
raise EntityNotFoundException(entity_name, self.project)
if join_key_or_entity_name in join_keys_set:
join_key = join_key_or_entity_name
else:
try:
join_key = entity_name_to_join_key_map[join_key_or_entity_name]
except KeyError:
raise EntityNotFoundException(
join_key_or_entity_name, self.project
)
else:
warnings.warn(
"Using entity name is deprecated. Use join_key instead."
)

# All join keys should be returned in the result.
requested_result_row_names.add(join_key)
join_key_values[join_key] = values
Expand Down Expand Up @@ -1422,7 +1434,9 @@ def _get_columnar_entity_values(
return res
return cast(Dict[str, List[Any]], columnar)

def _get_entity_maps(self, feature_views):
def _get_entity_maps(
self, feature_views
) -> Tuple[Dict[str, str], Dict[str, ValueType], Set[str]]:
entities = self._list_entities(allow_cache=True, hide_dummy_entity=False)
entity_name_to_join_key_map: Dict[str, str] = {}
entity_type_map: Dict[str, ValueType] = {}
Expand All @@ -1444,7 +1458,11 @@ def _get_entity_maps(self, feature_views):
)
entity_name_to_join_key_map[entity_name] = join_key
entity_type_map[join_key] = entity.value_type
return entity_name_to_join_key_map, entity_type_map
return (
entity_name_to_join_key_map,
entity_type_map,
set(entity_name_to_join_key_map.values()),
)

@staticmethod
def _get_table_entity_values(
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/tests/example_repos/example_feature_repo_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@

driver = Entity(
name="driver", # The name is derived from this argument, not object name.
join_key="driver_id",
value_type=ValueType.INT64,
description="driver id",
)

customer = Entity(
name="customer", # The name is derived from this argument, not object name.
join_key="customer_id",
value_type=ValueType.STRING,
)

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/integration/e2e/test_universal_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def check_offline_and_online_features(
# Check online store
response_dict = fs.get_online_features(
[f"{fv.name}:value"],
[{"driver": driver_id}],
[{"driver_id": driver_id}],
full_feature_names=full_feature_names,
).to_dict()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_online() -> None:
provider = store._get_provider()

driver_key = EntityKeyProto(
join_keys=["driver"], entity_values=[ValueProto(int64_val=1)]
join_keys=["driver_id"], entity_values=[ValueProto(int64_val=1)]
)
provider.online_write_batch(
config=store.config,
Expand All @@ -54,7 +54,7 @@ def test_online() -> None:
)

customer_key = EntityKeyProto(
join_keys=["customer"], entity_values=[ValueProto(string_val="5")]
join_keys=["customer_id"], entity_values=[ValueProto(string_val="5")]
)
provider.online_write_batch(
config=store.config,
Expand All @@ -75,7 +75,7 @@ def test_online() -> None:
)

customer_key = EntityKeyProto(
join_keys=["customer", "driver"],
join_keys=["customer_id", "driver_id"],
entity_values=[ValueProto(string_val="5"), ValueProto(int64_val=1)],
)
provider.online_write_batch(
Expand All @@ -100,15 +100,18 @@ def test_online() -> None:
"customer_profile:name",
"customer_driver_combined:trips",
],
entity_rows=[{"driver": 1, "customer": "5"}, {"driver": 1, "customer": 5}],
entity_rows=[
{"driver_id": 1, "customer_id": "5"},
{"driver_id": 1, "customer_id": 5},
],
full_feature_names=False,
).to_dict()

assert "lon" in result
assert "avg_orders_day" in result
assert "name" in result
assert result["driver"] == [1, 1]
assert result["customer"] == ["5", "5"]
assert result["driver_id"] == [1, 1]
assert result["customer_id"] == ["5", "5"]
assert result["lon"] == ["1.0", "1.0"]
assert result["avg_orders_day"] == [1.0, 1.0]
assert result["name"] == ["John", "John"]
Expand All @@ -117,7 +120,7 @@ def test_online() -> None:
# Ensure features are still in result when keys not found
result = store.get_online_features(
features=["customer_driver_combined:trips"],
entity_rows=[{"driver": 0, "customer": 0}],
entity_rows=[{"driver_id": 0, "customer_id": 0}],
full_feature_names=False,
).to_dict()

Expand All @@ -127,7 +130,7 @@ def test_online() -> None:
with pytest.raises(FeatureViewNotFoundException):
store.get_online_features(
features=["driver_locations_bad:lon"],
entity_rows=[{"driver": 1}],
entity_rows=[{"driver_id": 1}],
full_feature_names=False,
)

Expand All @@ -152,7 +155,7 @@ def test_online() -> None:
"customer_profile:name",
"customer_driver_combined:trips",
],
entity_rows=[{"driver": 1, "customer": 5}],
entity_rows=[{"driver_id": 1, "customer_id": 5}],
full_feature_names=False,
).to_dict()
assert result["lon"] == ["1.0"]
Expand All @@ -173,7 +176,7 @@ def test_online() -> None:
"customer_profile:name",
"customer_driver_combined:trips",
],
entity_rows=[{"driver": 1, "customer": 5}],
entity_rows=[{"driver_id": 1, "customer_id": 5}],
full_feature_names=False,
).to_dict()

Expand All @@ -188,7 +191,7 @@ def test_online() -> None:
"customer_profile:name",
"customer_driver_combined:trips",
],
entity_rows=[{"driver": 1, "customer": 5}],
entity_rows=[{"driver_id": 1, "customer_id": 5}],
full_feature_names=False,
).to_dict()
assert result["lon"] == ["1.0"]
Expand All @@ -214,7 +217,7 @@ def test_online() -> None:
"customer_profile:name",
"customer_driver_combined:trips",
],
entity_rows=[{"driver": 1, "customer": 5}],
entity_rows=[{"driver_id": 1, "customer_id": 5}],
full_feature_names=False,
).to_dict()
assert result["lon"] == ["1.0"]
Expand All @@ -234,7 +237,7 @@ def test_online() -> None:
"customer_profile:name",
"customer_driver_combined:trips",
],
entity_rows=[{"driver": 1, "customer": 5}],
entity_rows=[{"driver_id": 1, "customer_id": 5}],
full_feature_names=False,
).to_dict()
assert result["lon"] == ["1.0"]
Expand Down Expand Up @@ -284,7 +287,7 @@ def test_online_to_df():
3 3.0 0.3
"""
driver_key = EntityKeyProto(
join_keys=["driver"], entity_values=[ValueProto(int64_val=d)]
join_keys=["driver_id"], entity_values=[ValueProto(int64_val=d)]
)
provider.online_write_batch(
config=store.config,
Expand All @@ -311,7 +314,7 @@ def test_online_to_df():
6 6.0 foo6 60
"""
customer_key = EntityKeyProto(
join_keys=["customer"], entity_values=[ValueProto(string_val=str(c))]
join_keys=["customer_id"], entity_values=[ValueProto(string_val=str(c))]
)
provider.online_write_batch(
config=store.config,
Expand Down Expand Up @@ -340,7 +343,7 @@ def test_online_to_df():
6 3 18
"""
combo_keys = EntityKeyProto(
join_keys=["customer", "driver"],
join_keys=["customer_id", "driver_id"],
entity_values=[ValueProto(string_val=str(c)), ValueProto(int64_val=d)],
)
provider.online_write_batch(
Expand Down Expand Up @@ -369,7 +372,7 @@ def test_online_to_df():
],
# Reverse the row order
entity_rows=[
{"driver": d, "customer": c}
{"driver_id": d, "customer_id": c}
for (d, c) in zip(reversed(driver_ids), reversed(customer_ids))
],
).to_df()
Expand All @@ -381,8 +384,8 @@ def test_online_to_df():
1 4 1.0 0.1 4.0 foo4 40 4
"""
df_dict = {
"driver": driver_ids,
"customer": [str(c) for c in customer_ids],
"driver_id": driver_ids,
"customer_id": [str(c) for c in customer_ids],
"lon": [str(d * lon_multiply) for d in driver_ids],
"lat": [d * lat_multiply for d in driver_ids],
"avg_orders_day": [c * avg_order_day_multiply for c in customer_ids],
Expand All @@ -392,8 +395,8 @@ def test_online_to_df():
}
# Requested column order
ordered_column = [
"driver",
"customer",
"driver_id",
"customer_id",
"lon",
"lat",
"avg_orders_day",
Expand Down
Loading