perf: Parallelize read calls by table and batch #4619
Conversation
Signed-off-by: Rob Howley <[email protected]>
```python
batches = []
entity_id_batches = []
while True:
    batch = list(itertools.islice(entity_ids_iter, batch_size))
    if not batch:
        break
    entity_id_batch = self._to_client_batch_get_payload(
        online_config, table_name, batch
    )
    batches.append(batch)
    entity_id_batches.append(entity_id_batch)
```
construct the batches of ids/entity_ids that we'll be looking up
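The chunking pattern above can be sketched in isolation. This is a minimal, runnable version of the `itertools.islice` loop; the real code feeds `entity_ids_iter` and a DynamoDB payload builder, which are replaced here by a plain range of ids:

```python
import itertools

def chunk(iterable, batch_size):
    """Yield successive lists of up to batch_size items from iterable."""
    it = iter(iterable)
    while True:
        # islice consumes the next batch_size items; an empty slice
        # means the iterator is exhausted.
        batch = list(itertools.islice(it, batch_size))
        if not batch:
            break
        yield batch

# 7 ids in batches of 3 -> [[1, 2, 3], [4, 5, 6], [7]]
print(list(chunk(range(1, 8), 3)))
```

Each yielded batch maps one-to-one onto a `batch_get_item` payload, which is why `batches` and `entity_id_batches` stay aligned by index.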
```python
response_batches = await asyncio.gather(
    *[
        client.batch_get_item(
            RequestItems=entity_id_batch,
        )
        for entity_id_batch in entity_id_batches
    ]
)
```
make those batch requests in parallel. note: `asyncio.gather` maintains order
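The ordering guarantee is what lets the results be zipped back against the input batches. A small sketch (with `asyncio.sleep` standing in for the network calls) shows that `gather` returns results in submission order, even when the awaitables complete out of order:

```python
import asyncio

async def fetch(i, delay):
    # Simulate a network call that completes after `delay` seconds.
    await asyncio.sleep(delay)
    return i

async def main():
    # The first task finishes last, yet gather returns results
    # in the order the awaitables were passed in.
    return await asyncio.gather(fetch(0, 0.03), fetch(1, 0.02), fetch(2, 0.01))

print(asyncio.run(main()))  # [0, 1, 2]
```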
```python
for batch, response in zip(batches, response_batches):
    result_batch = self._process_batch_get_response(
        table_name,
        response,
        entity_ids,
        batch,
        to_tbl_response=to_tbl_resp,
    )
    result_batches.append(result_batch)
```
format the responses into the final shape. we iterate through the list three times instead of once, but make up for it by issuing the batch requests asynchronously
```python
all_responses = await asyncio.gather(
    *[
        query_table(table, requested_features)
        for table, requested_features in grouped_refs
    ]
)

for (idxs, read_rows), (table, requested_features) in zip(
    all_responses, grouped_refs
):
```
when requesting features across multiple tables, we can parallelize the calls to each.
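The per-table fan-out can be sketched end to end. `query_table` here is a hypothetical stand-in for the PR's per-table read (in the PR it wraps the online store's batch read for one table); the table names and features below are made up for illustration:

```python
import asyncio

async def query_table(table, requested_features):
    # Stand-in for a per-table online-store read.
    await asyncio.sleep(0)  # simulated I/O
    return [f"{table}:{f}" for f in requested_features]

async def read_all(grouped_refs):
    # One coroutine per table; gather runs them concurrently and
    # returns responses aligned index-for-index with grouped_refs.
    responses = await asyncio.gather(
        *[query_table(t, feats) for t, feats in grouped_refs]
    )
    return list(zip(grouped_refs, responses))

grouped_refs = [("driver_stats", ["conv_rate"]), ("customer", ["age"])]
print(asyncio.run(read_all(grouped_refs)))
```

Because the responses come back in input order, the follow-up `zip(all_responses, grouped_refs)` loop can safely re-associate each response with its table.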
```diff
@@ -240,7 +240,7 @@ async def get_online_features_async(
         native_entity_values=True,
     )

-    for table, requested_features in grouped_refs:
+    async def query_table(table, requested_features):
```
can you add type hints?
lgtm
What this PR does / why we need it:
Improve the use of async batch calls to DynamoDB for `get_online_features_async`.
Which issue(s) this PR fixes:
Misc