Skip to content

Commit

Permalink
Add redis online provider - integration tests
Browse files Browse the repository at this point in the history
Signed-off-by: qooba <[email protected]>
  • Loading branch information
qooba committed May 6, 2021
1 parent f625b15 commit 6b33786
Showing 1 changed file with 38 additions and 29 deletions.
67 changes: 38 additions & 29 deletions sdk/python/feast/infra/redis_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,15 @@ def teardown_infra(
) -> None:
# according to the repos_operations.py we can delete the whole project
client = self._get_client()
keys = client.keys("*{project}:*")
client.unlink(*keys)

tables_join_keys = [[e for e in t.entities] for t in tables]
for table_join_keys in tables_join_keys:
redis_key_bin = _redis_key(
project, EntityKeyProto(join_keys=table_join_keys)
)
keys = client.keys(f"{redis_key_bin}*")
if keys:
client.unlink(*keys)

def online_write_batch(
self,
Expand Down Expand Up @@ -103,33 +110,35 @@ def online_read(

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []

if requested_features:
for entity_key in entity_keys:
redis_key_bin = _redis_key(project, entity_key)
hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features]
ts_key = f"_ts:{feature_view}"
hset_keys.append(ts_key)
values = client.hmget(redis_key_bin, hset_keys)
requested_features.append(ts_key)
res_val = dict(zip(requested_features, values))

res_ts = Timestamp()
ts_val = res_val.pop(ts_key)
if ts_val:
res_ts.ParseFromString(ts_val)

res = {}
for feature_name, val_bin in res_val.items():
val = ValueProto()
if val_bin:
val.ParseFromString(val_bin)
res[feature_name] = val

if not res:
result.append((None, None))
else:
timestamp = datetime.fromtimestamp(res_ts.seconds)
result.append((timestamp, res))
if not requested_features:
requested_features = [f.name for f in table.features]

for entity_key in entity_keys:
redis_key_bin = _redis_key(project, entity_key)
hset_keys = [_mmh3(f"{feature_view}:{k}") for k in requested_features]
ts_key = f"_ts:{feature_view}"
hset_keys.append(ts_key)
values = client.hmget(redis_key_bin, hset_keys)
requested_features.append(ts_key)
res_val = dict(zip(requested_features, values))

res_ts = Timestamp()
ts_val = res_val.pop(ts_key)
if ts_val:
res_ts.ParseFromString(ts_val)

res = {}
for feature_name, val_bin in res_val.items():
val = ValueProto()
if val_bin:
val.ParseFromString(val_bin)
res[feature_name] = val

if not res:
result.append((None, None))
else:
timestamp = datetime.fromtimestamp(res_ts.seconds)
result.append((timestamp, res))
return result

def materialize_single_feature_view(
Expand Down

0 comments on commit 6b33786

Please sign in to comment.