From c142340d4458936695385cc176ec4c16b1b83df9 Mon Sep 17 00:00:00 2001 From: Thomas Sands Date: Thu, 25 Jan 2024 13:58:17 +0000 Subject: [PATCH] Add optional MessageGroupId field to SQS to support FIFO queues Signed-off-by: Thomas Sands --- pkg/services/awssqs.go | 14 ++++++++++++++ pkg/services/awssqs_test.go | 3 +++ 2 files changed, 17 insertions(+) diff --git a/pkg/services/awssqs.go b/pkg/services/awssqs.go index d9f22348..1ad31429 100644 --- a/pkg/services/awssqs.go +++ b/pkg/services/awssqs.go @@ -16,6 +16,7 @@ import ( type AwsSqsNotification struct { MessageAttributes map[string]string `json:"messageAttributes"` + MessageGroupId string `json:"messageGroupId,omitempty"` } type AwsSqsOptions struct { @@ -125,6 +126,11 @@ func (s awsSqsService) setOptions() []func(*config.LoadOptions) error { } func (n *AwsSqsNotification) GetTemplater(name string, f texttemplate.FuncMap) (Templater, error) { + groupId, err := texttemplate.New(name).Funcs(f).Parse(n.MessageGroupId) + if err != nil { + return nil + } + return func(notification *Notification, vars map[string]interface{}) error { if notification.AwsSqs == nil { notification.AwsSqs = &AwsSqsNotification{} @@ -137,6 +143,14 @@ func (n *AwsSqsNotification) GetTemplater(name string, f texttemplate.FuncMap) ( } } + var groupIdBuff bytes.Buffer + if err := groupId.Execute(&groupIdBuff, vars); err != nil { + return err + } + if val := groupIdBuff.String(); val != "" { + notification.AwsSqs.MessageGroupId = val + } + return nil }, nil } diff --git a/pkg/services/awssqs_test.go b/pkg/services/awssqs_test.go index 0bfe5645..8a20dabe 100644 --- a/pkg/services/awssqs_test.go +++ b/pkg/services/awssqs_test.go @@ -20,6 +20,7 @@ func TestGetTemplater_AwsSqs(t *testing.T) { MessageAttributes: map[string]string{ "attributeKey": "{{.messageAttributeValue}}", }, + MessageGroupId: "{{.messageGroupId}}", }, } @@ -33,6 +34,7 @@ func TestGetTemplater_AwsSqs(t *testing.T) { err = templater(¬ification, map[string]interface{}{ "message": "abcdef", "messageAttributeValue": "123456", + "messageGroupId": "a1b2c3", }) if !assert.NoError(t, err) { @@ -42,6 +44,7 @@ func TestGetTemplater_AwsSqs(t *testing.T) { assert.Equal(t, map[string]string{ "attributeKey": "123456", }, notification.AwsSqs.MessageAttributes) + assert.Equal(t, "a1b2c3", notification.AwsSqs.MessageGroupId) } func TestSend_AwsSqs(t *testing.T) {