From 9f5893f6b61b58e73be8973ba66a01733c517ed5 Mon Sep 17 00:00:00 2001 From: Jiro Kugiya Date: Mon, 27 May 2024 14:06:08 +0900 Subject: [PATCH 1/3] test: Reproduces failiing with system attribute names --- build.sbt | 2 +- .../sqs/aws/AmazonJavaSdkNewTestSuite.scala | 23 +++++++++++++ .../rest/sqs/client/AwsSdkV1SqsClient.scala | 33 +++++++++++++++++-- .../rest/sqs/client/AwsSdkV2SqsClient.scala | 6 +++- .../elasticmq/rest/sqs/client/SqsClient.scala | 2 ++ 5 files changed, 62 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 2d2c15d8..806ab8c8 100644 --- a/build.sbt +++ b/build.sbt @@ -39,7 +39,7 @@ val scalatest = "org.scalatest" %% "scalatest" % "3.2.18" val awaitility = "org.awaitility" % "awaitility-scala" % "4.2.1" val amazonJavaSdkSqs = "com.amazonaws" % "aws-java-sdk-sqs" % "1.12.699" exclude ("commons-logging", "commons-logging") -val amazonJavaV2SdkSqs = "software.amazon.awssdk" % "sqs" % "2.25.30" +val amazonJavaV2SdkSqs = "software.amazon.awssdk" % "sqs" % "2.25.60" val pekkoVersion = "1.0.2" val pekkoHttpVersion = "1.0.1" diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala index 2168742a..4b088552 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/aws/AmazonJavaSdkNewTestSuite.scala @@ -363,6 +363,29 @@ abstract class AmazonJavaSdkNewTestSuite result should contain theSameElementsAs Set(queue1Url, queue2Url, queue4Url) } + test("should receive system all attributes") { + // given + val queueUrl = testClient.createQueue( + "testQueue2.fifo", + Map(FifoQueueAttributeName -> "true", ContentBasedDeduplicationAttributeName -> "false") + ) + testClient.sendMessage( + queueUrl, + "test123", + messageGroupId = Option("gp1"), + messageDeduplicationId = Option("dup1") + ) + + // when + val messages = testClient.receiveMessage(queueUrl, systemAttributes = List("All")) + + // then + messages should have size 1 + val messageAttributes = messages.head.attributes + messageAttributes(MessageDeduplicationId) shouldBe "dup1" + messageAttributes(MessageGroupId) shouldBe "gp1" + } + private def doTestSendAndReceiveMessageWithAttributes( content: String, messageAttributes: Map[String, MessageAttribute] = Map.empty, diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala index d4f28f26..0be9bf1f 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala @@ -1,7 +1,30 @@ package org.elasticmq.rest.sqs.client import com.amazonaws.services.sqs.AmazonSQS -import com.amazonaws.services.sqs.model.{BatchResultErrorEntry, CancelMessageMoveTaskRequest, ChangeMessageVisibilityBatchRequest, ChangeMessageVisibilityBatchRequestEntry, CreateQueueRequest, DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry, GetQueueAttributesRequest, GetQueueUrlRequest, ListDeadLetterSourceQueuesRequest, ListMessageMoveTasksRequest, MessageAttributeValue, MessageSystemAttributeValue, PurgeQueueRequest, QueueDoesNotExistException, ReceiveMessageRequest, ResourceNotFoundException, SendMessageBatchRequest, SendMessageBatchRequestEntry, SendMessageRequest, StartMessageMoveTaskRequest, UnsupportedOperationException} +import com.amazonaws.services.sqs.model.{ + BatchResultErrorEntry, + CancelMessageMoveTaskRequest, + ChangeMessageVisibilityBatchRequest, + ChangeMessageVisibilityBatchRequestEntry, + CreateQueueRequest, + DeleteMessageBatchRequest, + DeleteMessageBatchRequestEntry, + GetQueueAttributesRequest, + GetQueueUrlRequest, + ListDeadLetterSourceQueuesRequest, + ListMessageMoveTasksRequest, + MessageAttributeValue, + MessageSystemAttributeValue, + PurgeQueueRequest, + QueueDoesNotExistException, + ReceiveMessageRequest, + ResourceNotFoundException, + SendMessageBatchRequest, + SendMessageBatchRequestEntry, + SendMessageRequest, + StartMessageMoveTaskRequest, + UnsupportedOperationException +} import org.elasticmq._ import java.nio.ByteBuffer @@ -49,6 +72,8 @@ class AwsSdkV1SqsClient(client: AmazonSQS) extends SqsClient { queueUrl: QueueUrl, messageBody: String, messageAttributes: Map[String, MessageAttribute] = Map.empty, + messageGroupId: Option[String] = None, + messageDeduplicationId: Option[String] = None, awsTraceHeader: Option[String] = None ): Either[SqsClientError, Unit] = interceptErrors { client.sendMessage( @@ -59,6 +84,8 @@ class AwsSdkV1SqsClient(client: AmazonSQS) extends SqsClient { mapAwsTraceHeader(awsTraceHeader) ) .withMessageAttributes(mapMessageAttributes(messageAttributes)) + .withMessageGroupId(messageGroupId.orNull) + .withMessageDeduplicationId(messageDeduplicationId.orNull) ) } @@ -190,7 +217,9 @@ class AwsSdkV1SqsClient(client: AmazonSQS) extends SqsClient { private def mapAwsTraceHeader(awsTraceHeader: Option[MessageMoveTaskStatus]) = { awsTraceHeader - .map(header => Map("AWSTraceHeader" -> new MessageSystemAttributeValue().withStringValue(header).withDataType("String")).asJava) + .map(header => + Map("AWSTraceHeader" -> new MessageSystemAttributeValue().withStringValue(header).withDataType("String")).asJava + ) .orNull } diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala index fbb5aa17..2cbbf54d 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala @@ -41,6 +41,8 @@ class AwsSdkV2SqsClient(client: software.amazon.awssdk.services.sqs.SqsClient) e queueUrl: QueueUrl, messageBody: String, messageAttributes: Map[String, MessageAttribute] = Map.empty, + messageGroupId: Option[String] = None, + messageDeduplicationId: Option[String] = None, awsTraceHeader: Option[String] = None ): Either[SqsClientError, Unit] = interceptErrors { client.sendMessage( @@ -50,6 +52,8 @@ class AwsSdkV2SqsClient(client: software.amazon.awssdk.services.sqs.SqsClient) e .messageBody(messageBody) .messageSystemAttributes(mapAwsTraceHeader(awsTraceHeader)) .messageAttributes(mapMessageAttributes(messageAttributes)) + .messageGroupId(messageGroupId.orNull) + .messageDeduplicationId(messageDeduplicationId.orNull) .build() ) } @@ -181,7 +185,7 @@ class AwsSdkV2SqsClient(client: software.amazon.awssdk.services.sqs.SqsClient) e ReceiveMessageRequest .builder() .queueUrl(queueUrl) - .attributeNamesWithStrings(systemAttributes.asJava) + .messageSystemAttributeNames(systemAttributes.map(SdkMessageSystemAttributeName.fromValue).asJava) .messageAttributeNames(messageAttributes.asJava) .maxNumberOfMessages(maxNumberOfMessages.map(Int.box).orNull) .build() diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala index 51c2843b..c2d4b5c4 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala @@ -18,6 +18,8 @@ trait SqsClient { queueUrl: QueueUrl, messageBody: String, messageAttributes: Map[String, MessageAttribute] = Map.empty, + messageGroupId: Option[String] = None, + messageDeduplicationId: Option[String] = None, awsTraceHeader: Option[String] = None ): Either[SqsClientError, Unit] From cc53c22f833dd56c47faa3cd548479aeda87169e Mon Sep 17 00:00:00 2001 From: Jiro Kugiya Date: Mon, 27 May 2024 15:18:48 +0900 Subject: [PATCH 2/3] fix: Handles MessageSystemAttributeNames --- .../rest/sqs/ReceiveMessageDirectives.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala index 4c6bf62e..c5d45da7 100644 --- a/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala +++ b/rest/rest-sqs/src/main/scala/org/elasticmq/rest/sqs/ReceiveMessageDirectives.scala @@ -94,11 +94,12 @@ trait ReceiveMessageDirectives { ) val attributeNames = requestParameters.AttributeNames.getOrElse(List.empty) + val systemAttributeNames = requestParameters.MessageSystemAttributeNames.getOrElse(List.empty) def calculateAttributeValues(msg: MessageData): List[(String, String)] = { import AttributeValuesCalculator.Rule possiblyEmptyAttributeValuesCalculator.calculate[String]( - attributeNames, + attributeNames ++ systemAttributeNames, Rule(SenderIdAttribute, () => Some("127.0.0.1")), Rule(SentTimestampAttribute, () => Some(msg.created.toInstant.toEpochMilli.toString)), Rule(ApproximateReceiveCountAttribute, () => Some(msg.statistics.approximateReceiveCount.toString)), @@ -158,6 +159,7 @@ trait ReceiveMessageDirectives { case class ReceiveMessageActionRequest( AttributeNames: Option[List[String]], + MessageSystemAttributeNames: Option[List[String]], MaxNumberOfMessages: Option[Int], MessageAttributeNames: Option[List[String]], QueueUrl: String, @@ -169,6 +171,7 @@ trait ReceiveMessageDirectives { object ReceiveMessageActionRequest { def apply( AttributeNames: Option[List[String]], + MessageSystemAttributeNames: Option[List[String]], MaxNumberOfMessages: Option[Int], MessageAttributeNames: Option[List[String]], QueueUrl: String, @@ -179,6 +182,9 @@ trait ReceiveMessageDirectives { new ReceiveMessageActionRequest( AttributeNames = AttributeNames.map(atr => if (atr.contains("All")) MessageReadeableAttributeNames.AllAttributeNames else atr), + MessageSystemAttributeNames = MessageSystemAttributeNames.map(atr => + if (atr.contains("All")) MessageReadeableAttributeNames.AllAttributeNames else atr + ), MaxNumberOfMessages = MaxNumberOfMessages, MessageAttributeNames = MessageAttributeNames, QueueUrl = QueueUrl, @@ -188,7 +194,7 @@ trait ReceiveMessageDirectives { ) } - implicit val requestJsonFormat: RootJsonFormat[ReceiveMessageActionRequest] = jsonFormat7( + implicit val requestJsonFormat: RootJsonFormat[ReceiveMessageActionRequest] = jsonFormat8( ReceiveMessageActionRequest.apply ) @@ -196,6 +202,7 @@ trait ReceiveMessageDirectives { new FlatParamsReader[ReceiveMessageActionRequest] { override def read(params: Map[String, String]): ReceiveMessageActionRequest = { val attributeNames = attributeNamesReader.read(params, MessageReadeableAttributeNames.AllAttributeNames) + val systemAttributeNames = attributeNamesReader.read(params, MessageReadeableAttributeNames.AllAttributeNames) val maxNumberOfMessages = params.get(MessageReadeableAttributeNames.MaxNumberOfMessagesAttribute).map(_.toInt) val messageAttributeNames = getMessageAttributeNames(params).toList val queueUrl = requiredParameter(params)(QueueUrlParameter) @@ -204,6 +211,7 @@ trait ReceiveMessageDirectives { val waitTimeSeconds = params.get(MessageReadeableAttributeNames.WaitTimeSecondsAttribute).map(_.toLong) ReceiveMessageActionRequest( Some(attributeNames), + Some(systemAttributeNames), maxNumberOfMessages, Some(messageAttributeNames), queueUrl, From 98197b6da395e5ec579404214dfaec20c6bb3058 Mon Sep 17 00:00:00 2001 From: Jiro Kugiya Date: Mon, 27 May 2024 15:45:22 +0900 Subject: [PATCH 3/3] fix: corrects parameter orders --- .../org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala | 4 ++-- .../org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala | 4 ++-- .../test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala index 0be9bf1f..8e76f2d4 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV1SqsClient.scala @@ -72,9 +72,9 @@ class AwsSdkV1SqsClient(client: AmazonSQS) extends SqsClient { queueUrl: QueueUrl, messageBody: String, messageAttributes: Map[String, MessageAttribute] = Map.empty, + awsTraceHeader: Option[String] = None, messageGroupId: Option[String] = None, - messageDeduplicationId: Option[String] = None, - awsTraceHeader: Option[String] = None + messageDeduplicationId: Option[String] = None ): Either[SqsClientError, Unit] = interceptErrors { client.sendMessage( new SendMessageRequest() diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala index 2cbbf54d..8bd9855e 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/AwsSdkV2SqsClient.scala @@ -41,9 +41,9 @@ class AwsSdkV2SqsClient(client: software.amazon.awssdk.services.sqs.SqsClient) e queueUrl: QueueUrl, messageBody: String, messageAttributes: Map[String, MessageAttribute] = Map.empty, + awsTraceHeader: Option[String] = None, messageGroupId: Option[String] = None, - messageDeduplicationId: Option[String] = None, - awsTraceHeader: Option[String] = None + messageDeduplicationId: Option[String] = None ): Either[SqsClientError, Unit] = interceptErrors { client.sendMessage( SendMessageRequest diff --git a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala index c2d4b5c4..9060f047 100644 --- a/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala +++ b/rest/rest-sqs-testing-amazon-java-sdk/src/test/scala/org/elasticmq/rest/sqs/client/SqsClient.scala @@ -18,9 +18,9 @@ trait SqsClient { queueUrl: QueueUrl, messageBody: String, messageAttributes: Map[String, MessageAttribute] = Map.empty, + awsTraceHeader: Option[String] = None, messageGroupId: Option[String] = None, - messageDeduplicationId: Option[String] = None, - awsTraceHeader: Option[String] = None + messageDeduplicationId: Option[String] = None ): Either[SqsClientError, Unit] def receiveMessage(