Skip to content

Commit

Permalink
pubsub/awssqs: Fix BeforeSend/As to enable changes to the sqs input m…
Browse files Browse the repository at this point in the history
…essage (#3201)
  • Loading branch information
szaher authored Jan 4, 2023
1 parent dfaf95a commit fe0a3d7
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
8 changes: 4 additions & 4 deletions pubsub/awssnssqs/awssnssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
// - Topic: (V1) *sns.SNS for OpenSNSTopic, *sqs.SQS for OpenSQSTopic; (V2) *snsv2.Client for OpenSNSTopicV2, *sqsv2.Client for OpenSQSTopicV2
// - Subscription: (V1) *sqs.SQS; (V2) *sqsv2.Client
// - Message: (V1) *sqs.Message; (V2) sqstypesv2.Message
// - Message.BeforeSend: (V1) *sns.PublishInput for OpenSNSTopic, *sqs.SendMessageBatchRequestEntry or *sqs.SendMessageInput(deprecated) for OpenSQSTopic; (V2) *snsv2.PublishInput for OpenSNSTopicV2, sqstypesv2.SendMessageBatchRequestEntry for OpenSQSTopicV2
// - Message.BeforeSend: (V1) *sns.PublishInput for OpenSNSTopic, *sqs.SendMessageBatchRequestEntry or *sqs.SendMessageInput(deprecated) for OpenSQSTopic; (V2) *snsv2.PublishInput for OpenSNSTopicV2, *sqstypesv2.SendMessageBatchRequestEntry for OpenSQSTopicV2
// - Message.AfterSend: (V1) *sns.PublishOutput for OpenSNSTopic, *sqs.SendMessageBatchResultEntry for OpenSQSTopic; (V2) *snsv2.PublishOutput for OpenSNSTopicV2, sqstypesv2.SendMessageBatchResultEntry for OpenSQSTopicV2
// - Error: (V1) awserr.Error, (V2) any error type returned by the service, notably smithy.APIError
package awssnssqs // import "gocloud.dev/pubsub/awssnssqs"
Expand Down Expand Up @@ -678,15 +678,14 @@ func (t *sqsTopic) SendBatch(ctx context.Context, dms []*driver.Message) error {
if len(attrs) == 0 {
attrs = nil
}
entry := sqstypesv2.SendMessageBatchRequestEntry{
entry := &sqstypesv2.SendMessageBatchRequestEntry{
Id: aws.String(strconv.Itoa(len(req.Entries))),
MessageAttributes: attrs,
MessageBody: aws.String(body),
}
req.Entries = append(req.Entries, entry)
if dm.BeforeSend != nil {
asFunc := func(i interface{}) bool {
if p, ok := i.(*sqstypesv2.SendMessageBatchRequestEntry); ok {
if p, ok := i.(**sqstypesv2.SendMessageBatchRequestEntry); ok {
*p = entry
return true
}
Expand All @@ -696,6 +695,7 @@ func (t *sqsTopic) SendBatch(ctx context.Context, dms []*driver.Message) error {
return err
}
}
req.Entries = append(req.Entries, *entry)
}
resp, err := t.clientV2.SendMessageBatch(ctx, req)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pubsub/awssnssqs/awssnssqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func (t awsAsTest) BeforeSend(as func(interface{}) bool) error {
}
case topicKindSQS:
if t.useV2 {
var entry sqstypesv2.SendMessageBatchRequestEntry
var entry *sqstypesv2.SendMessageBatchRequestEntry
if !as(&entry) {
return fmt.Errorf("cast failed for %T", &entry)
}
Expand Down

0 comments on commit fe0a3d7

Please sign in to comment.