Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: aggregator service and events #61

Merged
merged 2 commits into from
Nov 1, 2023
Merged

Conversation

vasco-santos
Copy link
Contributor

@vasco-santos vasco-santos commented Oct 30, 2023

This PR wires up aggregator service and its events. All old code from aggregator was removed given it is now in filecoin-api, so there is a huge diff of removed code. data folder, workflows folders and processor folder were dropped

Note that:

  • Given most bucket stores are quite identical, an abstraction layer for them was created so that we do not need to keep all the similar code and just focus on extending the differences
  • Created a InclusionProofStore where Proof bytes are stored. From filecoin-api interface standpoint, proof is available via inclusion-store, but we can't stick it in DynamoDB. While there are 2 PUT ops behind now (note that if the first fails to the proof bucket, there is no issue as nothing will happen with it)
  • Removed most of integration tests as they need to be rewritten as follow up

@seed-deploy seed-deploy bot temporarily deployed to pr61 October 31, 2023 10:49 Inactive
@vasco-santos vasco-santos marked this pull request as ready for review October 31, 2023 11:04
@vasco-santos vasco-santos changed the title feat: aggregator service feat: aggregator service and events Oct 31, 2023
@seed-deploy
Copy link

seed-deploy bot commented Oct 31, 2023

View stack outputs

Copy link
Member

@alanshaw alanshaw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think nothing blocking just some suggestions 🚀

.env.tpl Outdated
@@ -1,8 +1,8 @@
# These variables are only available in your SST code.

# uncomment to try out deploying the api under a custom domain.
# uncomment to try out deploying the `aggregator-api`` under a custom domain.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# uncomment to try out deploying the `aggregator-api`` under a custom domain.
# uncomment to try out deploying the `aggregator-api` under a custom domain.

package.json Outdated
Comment on lines 25 to 27
"@ucanto/client": "9.0.0",
"@ucanto/principal": "9.0.0",
"@ucanto/transport": "9.0.0",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing hats?

Suggested change
"@ucanto/client": "9.0.0",
"@ucanto/principal": "9.0.0",
"@ucanto/transport": "9.0.0",
"@ucanto/client": "^9.0.0",
"@ucanto/principal": "^9.0.0",
"@ucanto/transport": "^9.0.0",

Comment on lines +18 to +20
const encodedBytes = JSONencode(aggregateOfferMessage)
return {
MessageBody: toString(encodedBytes),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe dag-json has stringify and parse functions:

import * as dagJSON from '@ipld/dag-json'

const str = dagJSON.stringify(aggregateOfferMessage)

return {
MessageBody: toString(encodedBytes),
// FIFO Queue message group id
MessageGroupId: options.disableMessageGroupId ? undefined : aggregateOfferMessage.group
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd argue this is not part of the encoded message and is an option you'd sent when sending it...

Copy link
Contributor Author

@vasco-santos vasco-santos Nov 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will create an issue to do this in separate: #63

export const decodeMessage = (message) => {
const decodedBytes = fromString(message.MessageBody)
return JSONdecode(decodedBytes)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to be repeated a lot - would be nice to reuse something here?

// if one we should put back in queue
if (sqsEvent.Records.length === 1) {
return {
batchItemFailures: sqsEvent.Records.map(r => ({
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between this and just throwing and error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so that we can return 200 and account for metrics in a different way if we would like to. This is because it is possible to receive 1 item in the batch if not enough trhoughput

if (eventRawRecords.length !== 1) {
return {
statusCode: 400,
body: `Expected 1 DynamoDBStreamEvent per invocation but received ${eventRawRecords.length}`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
body: `Expected 1 DynamoDBStreamEvent per invocation but received ${eventRawRecords.length}`
body: `Expected 1 DynamoDB record per invocation but received ${eventRawRecords.length}`

Comment on lines +46 to +52
const { ok, error } = await aggregatorEvents.handleInclusionInsertToUpdateState(context, record)
if (error) {
return {
statusCode: 500,
body: error.message || 'failed to handle inclusion insert event'
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not throwing here will remove the stack from the logs, so while you get a nice message in the body, it might be harder to debug.

I'm using this helper (Irakli suggested) to make throwing a little easier:

https://github.com/web3-storage/w3infra/blob/8bdd528a284446aed93db024ee1323aabc122dbf/billing/functions/lib.js#L11-L22

The code above becomes:

Suggested change
const { ok, error } = await aggregatorEvents.handleInclusionInsertToUpdateState(context, record)
if (error) {
return {
statusCode: 500,
body: error.message || 'failed to handle inclusion insert event'
}
}
const ok = expect(
await aggregatorEvents.handleInclusionInsertToUpdateState(context, record),
'failed to handle inclusion insert event'
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, I will see how to adopt this.
One thing I would like, is to rely on status codes to infer metrics and this would make it difficult unless we explore this a bit further. Will leave in scope of this PR

if (sqsEvent.Records.length !== 1) {
return {
statusCode: 400,
body: `Expected 1 sqsEvent per invocation but received ${sqsEvent.Records.length}`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
body: `Expected 1 sqsEvent per invocation but received ${sqsEvent.Records.length}`
body: `Expected 1 SQS message per invocation but received ${sqsEvent.Records.length}`

if (sqsEvent.Records.length !== 1) {
return {
statusCode: 400,
body: `Expected 1 sqsEvent per invocation but received ${sqsEvent.Records.length}`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
body: `Expected 1 sqsEvent per invocation but received ${sqsEvent.Records.length}`
body: `Expected 1 SQS message per invocation but received ${sqsEvent.Records.length}`

@seed-deploy seed-deploy bot temporarily deployed to pr61 November 1, 2023 18:58 Inactive
@vasco-santos vasco-santos merged commit e50b866 into main Nov 1, 2023
3 checks passed
@vasco-santos vasco-santos deleted the feat/aggregator-service branch November 1, 2023 19:05
Copy link

sentry-io bot commented Nov 2, 2023

Suspect Issues

This pull request was deployed and Sentry observed the following issues:

  • ‼️ SyntaxError: Non-base32 character vcs-w3filecoin-Aggregator-Consumerbufferqueue03... View Issue
  • ‼️ Error: Unsupported type passed: bafyreifoblfxjiphxpq5jgmn3gjvol7ao4auhzj4kj6dfczph5gskpricq. Pass option... vcs-w3filecoin-Aggregator-Consumeraggregateoffe... View Issue
  • ‼️ HTTPError: HTTP Request failed prod-w3filecoin-Aggregato-Consumeraggregatorinc... View Issue

Did you find this useful? React with a 👍 or 👎

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants