-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] FIFO queue experimental changes #281
Conversation
lib/message.js
Outdated
// Only FIFO queue messages will have this property. | ||
const messageGroupId = sqsMessage.MessageGroupId; | ||
const envSubject = messageGroupId || snsMessage.Subject; | ||
if (envSubject) this.env.Subject = envSubject; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no guarantee that the SQS Message body will be JSON.parse
able on L32 above, or that it will have a .Message
property on L39.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. So I'm thinking now that the parsed body of the sqsMessage
is the snsMessage
because we make that happen, so when the FIFO SQS message is sent directly (instead of an indirect SQS message via SNS) we won't have that body. I'm having trouble seeing how the SNS & SQS hookup works, but I'll try running with that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The SNS message is a JSON object { Subject: 'string', Message: 'string' }
. When an SNS topic is subscribed to an SQS queue, messages sent to the SNS are forwarded to the SQS queue, which means stringifying the SNS object object, and supplying it as the .Body
attribute of the larger object that represents the SQS message. That SQS message object with the SNS data stringified in the .Body
is what watchbot receives from the queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Ah, so that subscription must happen through the CloudFormation template that gets generated. I think I'm seeing now where that probably happens:
Lines 239 to 249 in 3975071
Resources[prefixed('Topic')] = { | |
Type: 'AWS::SNS::Topic', | |
Properties: { | |
Subscription: [ | |
{ | |
Endpoint: cf.getAtt(prefixed('Queue'), 'Arn'), | |
Protocol: 'sqs' | |
} | |
] | |
} | |
}; |
This seems to be working in a sense. However, I'm not currently seeing the message group behavior I'd expect. I set my worker's task to take 5 seconds, then fired off a series of messages in groups a1, a2, b1, a3, c1, b2. I expected these to end up processing in an order like a1, b1, c1, a2, b2, a3; but instead they all waited for each other, as though they were in the same group, and processed in the order that they were triggered. @rclark do you know of anything still left in this branch that would cause the queue to only allow one at a time, regardless of group? It's also possible that my quick test isn't quite right. |
Do you know how many containers you had running at the time of your test? If there was only 1, then this would make sense, since 1 container can only process 1 message at a time. It can take watchbot up to 5 or 10 minutes to react to changes in the queue size and scale up the number of containers it is running. For your next test, you should either (a) make your jobs take considerably longer (like 5 minutes each), or (b) set the |
@rclark That was it, thanks. Added a minSize of 10 and then I saw the behavior that I expected. |
Merging this info #279 to continue work. |
Merges into #279. I created a new branch so I could experiment with this for a little while without messing up the current
fifo
branch.cc @mapbox/frontend-platform @rclark