Skip to content

Commit

Permalink
Add optional MessageGroupId field to SQS to support FIFO queues
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Sands <[email protected]>
  • Loading branch information
thomassandslyst committed Jan 25, 2024
1 parent c02dc5f commit c142340
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pkg/services/awssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

type AwsSqsNotification struct {
MessageAttributes map[string]string `json:"messageAttributes"`
MessageGroupId string `json:"messageGroupId,omitempty"`
}

type AwsSqsOptions struct {
Expand Down Expand Up @@ -125,6 +126,11 @@ func (s awsSqsService) setOptions() []func(*config.LoadOptions) error {
}

func (n *AwsSqsNotification) GetTemplater(name string, f texttemplate.FuncMap) (Templater, error) {
groupId, err := texttemplate.New(name).Funcs(f).Parse(n.MessageGroupId)
if err != nil {
return nil

Check failure on line 131 in pkg/services/awssqs.go

View workflow job for this annotation

GitHub Actions / Lint Go code

not enough return values

Check failure on line 131 in pkg/services/awssqs.go

View workflow job for this annotation

GitHub Actions / Lint Go code

not enough return values

Check failure on line 131 in pkg/services/awssqs.go

View workflow job for this annotation

GitHub Actions / Lint Go code

not enough return values
}

return func(notification *Notification, vars map[string]interface{}) error {
if notification.AwsSqs == nil {
notification.AwsSqs = &AwsSqsNotification{}
Expand All @@ -137,6 +143,14 @@ func (n *AwsSqsNotification) GetTemplater(name string, f texttemplate.FuncMap) (
}
}

var groupIdBuff bytes.Buffer
if err := groupId.Execute(&groupIdBuff, vars); err != nil {
return err
}
if val := groupIdBuff.String(); val != "" {
notification.AwsSqs.MessageGroupId = val
}

return nil
}, nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/services/awssqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestGetTemplater_AwsSqs(t *testing.T) {
MessageAttributes: map[string]string{
"attributeKey": "{{.messageAttributeValue}}",
},
MessageGroupId: "{{.messageGroupId}}",
},
}

Expand All @@ -33,6 +34,7 @@ func TestGetTemplater_AwsSqs(t *testing.T) {
err = templater(&notification, map[string]interface{}{
"message": "abcdef",
"messageAttributeValue": "123456",
"messageGroupId": "a1b2c3",
})

if !assert.NoError(t, err) {
Expand All @@ -42,6 +44,7 @@ func TestGetTemplater_AwsSqs(t *testing.T) {
assert.Equal(t, map[string]string{
"attributeKey": "123456",
}, notification.AwsSqs.MessageAttributes)
assert.Equal(t, "a1b2c3", notification.AwsSqs.MessageGroupId)
}

func TestSend_AwsSqs(t *testing.T) {
Expand Down

0 comments on commit c142340

Please sign in to comment.