Skip to content

Commit

Permalink
feat(awssqs): Support sending messages to SQS FIFO queues (#261)
Browse files Browse the repository at this point in the history
* Add optional MessageGroupId field to SQS to support FIFO queues

Signed-off-by: Thomas Sands <[email protected]>

* Add docs for fifo sqs

Signed-off-by: Thomas Sands <[email protected]>

---------

Signed-off-by: Thomas Sands <[email protected]>
  • Loading branch information
thomassandslyst authored Feb 1, 2024
1 parent 84b9f79 commit 85c76ec
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 2 deletions.
17 changes: 15 additions & 2 deletions docs/services/awssqs.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# AWS SQS
# AWS SQS

## Parameters

This notification service is capable of sending simple messages to AWS SQS queue.
This notification service is capable of sending simple messages to AWS SQS queue.

* `queue` - name of the queue you are intending to send messages to. Can be overridden with target destination annotation.
* `region` - region of the sqs queue can be provided via env variable AWS_DEFAULT_REGION
Expand Down Expand Up @@ -104,3 +104,16 @@ data:
- oncePer: obj.metadata.annotations["generation"]
```
## FIFO SQS Queues
FIFO queues require a [MessageGroupId](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html#SQS-SendMessage-request-MessageGroupId) to be sent along with every message, every message with a matching MessageGroupId will be processed one by one in order.
To send to a FIFO SQS Queue you must include a `messageGroupId` in the template such as in the example below:

```yaml
template.deployment-ready: |
message: |
Deployment {{.obj.metadata.name}} is ready!
messageGroupId: {{.obj.metadata.name}}-deployment
```
14 changes: 14 additions & 0 deletions pkg/services/awssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

type AwsSqsNotification struct {
MessageAttributes map[string]string `json:"messageAttributes"`
MessageGroupId string `json:"messageGroupId,omitempty"`
}

type AwsSqsOptions struct {
Expand Down Expand Up @@ -130,6 +131,11 @@ func (s awsSqsService) getCustomResolver(endpointRegion string) func(service, re
}

func (n *AwsSqsNotification) GetTemplater(name string, f texttemplate.FuncMap) (Templater, error) {
groupId, err := texttemplate.New(name).Funcs(f).Parse(n.MessageGroupId)
if err != nil {
return nil, err
}

return func(notification *Notification, vars map[string]interface{}) error {
if notification.AwsSqs == nil {
notification.AwsSqs = &AwsSqsNotification{}
Expand All @@ -142,6 +148,14 @@ func (n *AwsSqsNotification) GetTemplater(name string, f texttemplate.FuncMap) (
}
}

var groupIdBuff bytes.Buffer
if err := groupId.Execute(&groupIdBuff, vars); err != nil {
return err
}
if val := groupIdBuff.String(); val != "" {
notification.AwsSqs.MessageGroupId = val
}

return nil
}, nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/services/awssqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestGetTemplater_AwsSqs(t *testing.T) {
MessageAttributes: map[string]string{
"attributeKey": "{{.messageAttributeValue}}",
},
MessageGroupId: "{{.messageGroupId}}",
},
}

Expand All @@ -33,6 +34,7 @@ func TestGetTemplater_AwsSqs(t *testing.T) {
err = templater(&notification, map[string]interface{}{
"message": "abcdef",
"messageAttributeValue": "123456",
"messageGroupId": "a1b2c3",
})

if !assert.NoError(t, err) {
Expand All @@ -42,6 +44,7 @@ func TestGetTemplater_AwsSqs(t *testing.T) {
assert.Equal(t, map[string]string{
"attributeKey": "123456",
}, notification.AwsSqs.MessageAttributes)
assert.Equal(t, "a1b2c3", notification.AwsSqs.MessageGroupId)
}

func TestSend_AwsSqs(t *testing.T) {
Expand Down

0 comments on commit 85c76ec

Please sign in to comment.