From 7b4ba4e65cc5379b57b49f382ebf8c3e1a2a8230 Mon Sep 17 00:00:00 2001 From: Thomas Sands Date: Thu, 25 Jan 2024 13:58:17 +0000 Subject: [PATCH 1/2] Add optional MessageGroupId field to SQS to support FIFO queues Signed-off-by: Thomas Sands --- pkg/services/awssqs.go | 14 ++++++++++++++ pkg/services/awssqs_test.go | 3 +++ 2 files changed, 17 insertions(+) diff --git a/pkg/services/awssqs.go b/pkg/services/awssqs.go index 995fb7ed..026f9e2e 100644 --- a/pkg/services/awssqs.go +++ b/pkg/services/awssqs.go @@ -16,6 +16,7 @@ import ( type AwsSqsNotification struct { MessageAttributes map[string]string `json:"messageAttributes"` + MessageGroupId string `json:"messageGroupId,omitempty"` } type AwsSqsOptions struct { @@ -130,6 +131,11 @@ func (s awsSqsService) getCustomResolver(endpointRegion string) func(service, re } func (n *AwsSqsNotification) GetTemplater(name string, f texttemplate.FuncMap) (Templater, error) { + groupId, err := texttemplate.New(name).Funcs(f).Parse(n.MessageGroupId) + if err != nil { + return nil, err + } + return func(notification *Notification, vars map[string]interface{}) error { if notification.AwsSqs == nil { notification.AwsSqs = &AwsSqsNotification{} @@ -142,6 +148,14 @@ func (n *AwsSqsNotification) GetTemplater(name string, f texttemplate.FuncMap) ( } } + var groupIdBuff bytes.Buffer + if err := groupId.Execute(&groupIdBuff, vars); err != nil { + return err + } + if val := groupIdBuff.String(); val != "" { + notification.AwsSqs.MessageGroupId = val + } + return nil }, nil } diff --git a/pkg/services/awssqs_test.go b/pkg/services/awssqs_test.go index 97b67c75..95e5708e 100644 --- a/pkg/services/awssqs_test.go +++ b/pkg/services/awssqs_test.go @@ -20,6 +20,7 @@ func TestGetTemplater_AwsSqs(t *testing.T) { MessageAttributes: map[string]string{ "attributeKey": "{{.messageAttributeValue}}", }, + MessageGroupId: "{{.messageGroupId}}", }, } @@ -33,6 +34,7 @@ func TestGetTemplater_AwsSqs(t *testing.T) { err = templater(¬ification, map[string]interface{}{ "message": "abcdef", "messageAttributeValue": "123456", + "messageGroupId": "a1b2c3", }) if !assert.NoError(t, err) { @@ -42,6 +44,7 @@ func TestGetTemplater_AwsSqs(t *testing.T) { assert.Equal(t, map[string]string{ "attributeKey": "123456", }, notification.AwsSqs.MessageAttributes) + assert.Equal(t, "a1b2c3", notification.AwsSqs.MessageGroupId) } func TestSend_AwsSqs(t *testing.T) { From b16c2586a6a383da8cc7c6ae48474ee627336ae2 Mon Sep 17 00:00:00 2001 From: Thomas Sands Date: Tue, 30 Jan 2024 15:35:20 +0000 Subject: [PATCH 2/2] Add docs for fifo sqs Signed-off-by: Thomas Sands --- docs/services/awssqs.md | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/docs/services/awssqs.md b/docs/services/awssqs.md index 6b744f47..74126893 100644 --- a/docs/services/awssqs.md +++ b/docs/services/awssqs.md @@ -1,8 +1,8 @@ -# AWS SQS +# AWS SQS ## Parameters -This notification service is capable of sending simple messages to AWS SQS queue. +This notification service is capable of sending simple messages to AWS SQS queue. * `queue` - name of the queue you are intending to send messages to. Can be overridden with target destination annotation. * `region` - region of the sqs queue can be provided via env variable AWS_DEFAULT_REGION @@ -104,3 +104,16 @@ data: - oncePer: obj.metadata.annotations["generation"] ``` + +## FIFO SQS Queues + +FIFO queues require a [MessageGroupId](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html#SQS-SendMessage-request-MessageGroupId) to be sent along with every message, every message with a matching MessageGroupId will be processed one by one in order. + +To send to a FIFO SQS Queue you must include a `messageGroupId` in the template such as in the example below: + +```yaml +template.deployment-ready: | + message: | + Deployment {{.obj.metadata.name}} is ready! + messageGroupId: {{.obj.metadata.name}}-deployment +```