From f03cb4830b05467856d42445bca231b29f30d493 Mon Sep 17 00:00:00 2001 From: Juozas Skarbalius Date: Fri, 23 Aug 2024 17:29:55 +0300 Subject: [PATCH] Fix tests after rebase --- .../DefaultErrorHandlingStrategy.java | 20 ++-- .../handling/ErrorHandlingStrategy.java | 8 +- .../handling/LongRunningMessageHandler.java | 8 +- .../handling/MessageHandlingRunnable.java | 15 ++- .../message/handling/MessageWrapper.java | 16 +-- .../visibility/VisibilityTimeoutExtender.java | 9 +- .../handling/LogAndRethrowStrategyTest.java | 6 +- ...gRunningMessageHandlerIntegrationTest.java | 100 +++++++++--------- .../LongRunningMessageHandlerTest.java | 6 +- .../handling/MessageHandlingRunnableTest.java | 12 +-- .../VisibilityTimeoutExtenderFactoryTest.java | 2 + .../VisibilityTimeoutExtenderTest.java | 5 +- 12 files changed, 105 insertions(+), 102 deletions(-) diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/DefaultErrorHandlingStrategy.java b/src/main/java/com/mercateo/sqs/utils/message/handling/DefaultErrorHandlingStrategy.java index 4033991..3803512 100644 --- a/src/main/java/com/mercateo/sqs/utils/message/handling/DefaultErrorHandlingStrategy.java +++ b/src/main/java/com/mercateo/sqs/utils/message/handling/DefaultErrorHandlingStrategy.java @@ -29,26 +29,26 @@ class DefaultErrorHandlingStrategy implements ErrorHandlingStrategy { @Override @SneakyThrows - public void handleWorkerException(Exception e, Message message) { - String messageId = String.valueOf(message.getHeaders().get("id", UUID.class)); - log.error("error while handling message " + messageId + ": " + message.getPayload(), e); + public void handleWorkerException(Exception e, MessageWrapper message) { + String messageId = String.valueOf(message.getMessage().getHeaders().get("id", UUID.class)); + log.error("error while handling message " + messageId + ": " + message.getMessage().getPayload(), e); throw e; } @Override @SneakyThrows - public void handleWorkerThrowable(Throwable t, Message message) { - String messageId = String.valueOf(message.getHeaders().get("id", UUID.class)); - log.error("error while handling message " + messageId + ": " + message.getPayload(), t); + public void handleWorkerThrowable(Throwable t, MessageWrapper message) { + String messageId = String.valueOf(message.getMessage().getHeaders().get("id", UUID.class)); + log.error("error while handling message " + messageId + ": " + message.getMessage().getPayload(), t); throw t; } @Override public void handleExtendVisibilityTimeoutException(AwsServiceException e, - Message message) { + MessageWrapper message) { String msg = "error while extending message visibility for " + Objects.requireNonNull( - message.getHeaders().get("id", + message.getMessage().getHeaders().get("id", UUID.class)); log.error(msg, e); throw e; @@ -56,8 +56,8 @@ public void handleExtendVisibilityTimeoutException(AwsServiceException e, } @Override - public void handleAcknowledgeMessageException(AwsServiceException e, Message message) { - String messageId = String.valueOf(message.getHeaders().get("id", UUID.class)); + public void handleAcknowledgeMessageException(AwsServiceException e, MessageWrapper message) { + String messageId = String.valueOf(message.getMessage().getHeaders().get("id", UUID.class)); log.error("could not acknowledge " + messageId, e); } diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/ErrorHandlingStrategy.java b/src/main/java/com/mercateo/sqs/utils/message/handling/ErrorHandlingStrategy.java index 9b2bb80..1ebcd5a 100644 --- a/src/main/java/com/mercateo/sqs/utils/message/handling/ErrorHandlingStrategy.java +++ b/src/main/java/com/mercateo/sqs/utils/message/handling/ErrorHandlingStrategy.java @@ -29,7 +29,7 @@ public interface ErrorHandlingStrategy { * @param message * that was incorrectly processed */ - void handleWorkerException(Exception e, Message message); + void handleWorkerException(Exception e, MessageWrapper message); /** * Defines how a throwable, that is thrown by the worker are handled. If a @@ -40,7 +40,7 @@ public interface ErrorHandlingStrategy { * @param message * that was incorrectly processed */ - void handleWorkerThrowable(Throwable t, Message message); + void handleWorkerThrowable(Throwable t, MessageWrapper message); /** * Defines how exceptions, that are thrown by the timeout extension are handled. @@ -51,7 +51,7 @@ public interface ErrorHandlingStrategy { * @param message that was tried to extend */ - void handleExtendVisibilityTimeoutException(AwsServiceException e, Message message); + void handleExtendVisibilityTimeoutException(AwsServiceException e, MessageWrapper message); /** * Defines how exceptions, that are thrown by the message acknowledgement are handled. @@ -62,7 +62,7 @@ public interface ErrorHandlingStrategy { * @param message that was tried to extend */ - void handleAcknowledgeMessageException(AwsServiceException e, Message message); + void handleAcknowledgeMessageException(AwsServiceException e, MessageWrapper message); diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java b/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java index 36a0da5..0251e7f 100644 --- a/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java +++ b/src/main/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandler.java @@ -130,8 +130,8 @@ public class LongRunningMessageHandler { * @param message * the message to be processed */ - public void handleMessage(@NonNull Message message) { - String messageId = String.valueOf(message.getHeaders().get("id", UUID.class)); + public void handleMessage(@NonNull MessageWrapper message) { + String messageId = String.valueOf(message.getMessage().getHeaders().get("id", UUID.class)); if (messagesInProcessing.contains(messageId)) { return; @@ -174,7 +174,7 @@ public int getFreeWorkerCapacity() { return messagesInProcessing.free(); } - private void scheduleNewMessageTask(@NonNull Message message, + private void scheduleNewMessageTask(@NonNull MessageWrapper message, ScheduledFuture visibilityTimeoutExtender) { MessageHandlingRunnable messageTask = messageHandlingRunnableFactory.get(worker, message, finishedMessageCallback, messagesInProcessing, visibilityTimeoutExtender, errorHandlingStrategy); @@ -182,7 +182,7 @@ private void scheduleNewMessageTask(@NonNull Message message, messageProcessingExecutor.submit(messageTask); } - private ScheduledFuture scheduleNewVisibilityTimeoutExtender(@NonNull Message message) { + private ScheduledFuture scheduleNewVisibilityTimeoutExtender(@NonNull MessageWrapper message) { VisibilityTimeoutExtender timeoutExtender = timeoutExtenderFactory.get(message, queue, errorHandlingStrategy); return timeoutExtensionExecutor.scheduleAtFixedRate(timeoutExtender, timeUntilVisibilityTimeoutExtension.toMillis(), timeUntilVisibilityTimeoutExtension diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnable.java b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnable.java index 09791a0..00ada95 100644 --- a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnable.java +++ b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnable.java @@ -31,7 +31,7 @@ public class MessageHandlingRunnable implements Runnable { private final MessageWorkerWithHeaders worker; - private final Message message; + private final MessageWrapper message; private final FinishedMessageCallback finishedMessageCallback; @@ -42,32 +42,31 @@ public class MessageHandlingRunnable implements Runnable { private final ErrorHandlingStrategy errorHandlingStrategy; MessageHandlingRunnable(@NonNull MessageWorkerWithHeaders worker, - @NonNull Message message, + @NonNull MessageWrapper message, @NonNull FinishedMessageCallback finishedMessageCallback, @NonNull SetWithUpperBound messages, @NonNull ScheduledFuture visibilityTimeoutExtender, @NonNull ErrorHandlingStrategy errorHandlingStrategy) { this.worker = worker; - this.message = message; this.finishedMessageCallback = finishedMessageCallback; this.messages = messages; + this.message = message; this.visibilityTimeoutExtender = visibilityTimeoutExtender; this.errorHandlingStrategy = errorHandlingStrategy; - } @Override public void run() { - String messageId = String.valueOf(message.getHeaders().get("id", UUID.class)); - Acknowledgement acknowledgment = message.getHeaders().get("Acknowledgment", + String messageId = String.valueOf(message.getMessage().getHeaders().get("id", UUID.class)); + Acknowledgement acknowledgment = message.getMessage().getHeaders().get("Acknowledgment", Acknowledgement.class); try { log.info("starting processing of message " + messageId); - O outcome = worker.work(message.getPayload(), message.getHeaders()); + O outcome = worker.work(message.getMessage().getPayload(), message.getMessage().getHeaders()); - finishedMessageCallback.call(message.getPayload(), outcome); + finishedMessageCallback.call(message.getMessage().getPayload(), outcome); acknowledge(messageId, acknowledgment); log.info("message task successfully processed and message acknowledged: " + messageId); } catch (InterruptedException e) { diff --git a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageWrapper.java b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageWrapper.java index ce167bb..07a398d 100644 --- a/src/main/java/com/mercateo/sqs/utils/message/handling/MessageWrapper.java +++ b/src/main/java/com/mercateo/sqs/utils/message/handling/MessageWrapper.java @@ -1,9 +1,6 @@ package com.mercateo.sqs.utils.message.handling; -import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest; - -import io.awspring.cloud.messaging.listener.Acknowledgment; +import io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement; import java.util.concurrent.TimeUnit; @@ -13,6 +10,8 @@ import lombok.SneakyThrows; import org.springframework.messaging.Message; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; @RequiredArgsConstructor public class MessageWrapper { @@ -33,18 +32,19 @@ public String getReceiptHandle() { @SneakyThrows public synchronized void acknowledge() { - Acknowledgment acknowledgment = message.getHeaders().get("Acknowledgment", Acknowledgment.class); + Acknowledgement acknowledgment = message.getHeaders().get("Acknowledgment", Acknowledgement.class); if (acknowledgment == null) { throw new NullPointerException("there is no \"Acknowledgment\" in the message headers"); } - acknowledgment.acknowledge().get(2, TimeUnit.MINUTES); + acknowledgment.acknowledgeAsync().get(2, TimeUnit.MINUTES); acknowledged = true; } - public synchronized void changeMessageVisibility(AmazonSQS sqsClient, ChangeMessageVisibilityRequest request) { + @SneakyThrows + public synchronized void changeMessageVisibility(SqsAsyncClient sqsClient, ChangeMessageVisibilityRequest request) { if (acknowledged) { return; } - sqsClient.changeMessageVisibility(request); + sqsClient.changeMessageVisibility(request).get(); } } diff --git a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java index 34d4931..fe57473 100644 --- a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java +++ b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java @@ -22,6 +22,7 @@ import com.github.rholder.retry.Retryer; import com.github.rholder.retry.RetryerBuilder; import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy; +import com.mercateo.sqs.utils.message.handling.MessageWrapper; import java.net.UnknownHostException; import java.time.Duration; @@ -39,14 +40,14 @@ public class VisibilityTimeoutExtender implements Runnable { private final ChangeMessageVisibilityRequest request; - private final Message message; + private final MessageWrapper message; private final ErrorHandlingStrategy errorHandlingStrategy; private final Retryer retryer; VisibilityTimeoutExtender(@NonNull SqsAsyncClient sqsClient, @NonNull Duration newVisibilityTimeout, - @NonNull Message message, @NonNull String queueUrl, + @NonNull MessageWrapper message, @NonNull String queueUrl, @NonNull ErrorHandlingStrategy errorHandlingStrategy, @NonNull RetryStrategy retryStrategy) { this.sqsClient = sqsClient; @@ -61,7 +62,7 @@ public class VisibilityTimeoutExtender implements Runnable { request = ChangeMessageVisibilityRequest.builder() .queueUrl(queueUrl) - .receiptHandle(message.getHeaders().get("ReceiptHandle", String.class)) + .receiptHandle(message.getMessage().getHeaders().get("ReceiptHandle", String.class)) .visibilityTimeout(timeoutInSeconds(newVisibilityTimeout)) .build(); } @@ -78,7 +79,7 @@ public void run() { } catch (AwsServiceException e) { errorHandlingStrategy.handleExtendVisibilityTimeoutException(e, message); } catch (Exception e) { - log.error("error while extending message visibility for " + message.getHeaders().get("MessageId", + log.error("error while extending message visibility for " + message.getMessage().getHeaders().get("MessageId", String.class), e); throw new RuntimeException(e); } diff --git a/src/test/java/com/mercateo/sqs/utils/message/handling/LogAndRethrowStrategyTest.java b/src/test/java/com/mercateo/sqs/utils/message/handling/LogAndRethrowStrategyTest.java index c277679..495159c 100644 --- a/src/test/java/com/mercateo/sqs/utils/message/handling/LogAndRethrowStrategyTest.java +++ b/src/test/java/com/mercateo/sqs/utils/message/handling/LogAndRethrowStrategyTest.java @@ -31,7 +31,7 @@ public void setUp() throws Exception { public void handle_throws_exception() { // Given Exception e = new IllegalArgumentException(); - Message message = createMessage(); + MessageWrapper message = createMessage(); // When Throwable throwable = catchThrowable(() -> uut.handleWorkerException(e, message)); @@ -41,11 +41,11 @@ public void handle_throws_exception() { } - private Message createMessage() { + private MessageWrapper createMessage() { HashMap headerMap = new HashMap<>(); headerMap.put("id", "mid"); headerMap.put("Acknowledgment", acknowledgment); - return new GenericMessage<>(3, new MessageHeaders(headerMap)); + return new MessageWrapper<>(new GenericMessage<>(3, new MessageHeaders(headerMap))); } } \ No newline at end of file diff --git a/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerIntegrationTest.java b/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerIntegrationTest.java index 9c27ad7..3db5bbb 100644 --- a/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerIntegrationTest.java +++ b/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerIntegrationTest.java @@ -72,8 +72,8 @@ public void setUp() throws Exception { @Test public void testHandleMessage_processesOneMessageAndReturns() { // given - Message message = createMessage(1); - String uuid = String.valueOf(message.getHeaders().get("id", UUID.class)); + MessageWrapper message = createMessage(1); + String uuid = String.valueOf(message.getMessage().getHeaders().get("id", UUID.class)); // when Thread thread = new Thread(() -> uut.handleMessage(message)); @@ -81,17 +81,17 @@ public void testHandleMessage_processesOneMessageAndReturns() { // then await().until(() -> !thread.isAlive()); - await().until(() -> message.getPayload().isRunning()); + await().until(() -> message.getMessage().getPayload().isRunning()); assertThat(uut.getMessagesInProcessing().getBackingSet()).containsOnly(uuid); } @Test public void testHandleMessage_processesTwoMessagesAndBlocks() { // given - Message message1 = createMessage(1); - Message message2 = createMessage(2); - List uuids = List.of(String.valueOf(message1.getHeaders().get("id", UUID.class)), - String.valueOf(message2.getHeaders().get("id", UUID.class))); + MessageWrapper message1 = createMessage(1); + MessageWrapper message2 = createMessage(2); + List uuids = List.of(String.valueOf(message1.getMessage().getHeaders().get("id", UUID.class)), + String.valueOf(message2.getMessage().getHeaders().get("id", UUID.class))); Thread thread1 = new Thread(() -> uut.handleMessage(message1)); thread1.start(); @@ -103,27 +103,27 @@ public void testHandleMessage_processesTwoMessagesAndBlocks() { // then await().until(() -> Thread.State.WAITING == thread2.getState()); - await().until(() -> message1.getPayload().isRunning()); - await().until(() -> message2.getPayload().isRunning()); + await().until(() -> message1.getMessage().getPayload().isRunning()); + await().until(() -> message2.getMessage().getPayload().isRunning()); assertThat(uut.getMessagesInProcessing().getBackingSet()).containsOnly(uuids.toArray(String[]::new)); } @Test public void testHandleMessage_processesFourMessagesAndFillsQueue() { // given - Message message1 = createMessage(1); - Message message2 = createMessage(2); - Message message3 = createMessage(3); - Message message4 = createMessage(4); - List uuids = List.of(String.valueOf(message1.getHeaders().get("id", UUID.class)), - String.valueOf(message2.getHeaders().get("id", UUID.class)), - String.valueOf(message3.getHeaders().get("id", UUID.class)), - String.valueOf(message4.getHeaders().get("id", UUID.class))); + MessageWrapper message1 = createMessage(1); + MessageWrapper message2 = createMessage(2); + MessageWrapper message3 = createMessage(3); + MessageWrapper message4 = createMessage(4); + List uuids = List.of(String.valueOf(message1.getMessage().getHeaders().get("id", UUID.class)), + String.valueOf(message2.getMessage().getHeaders().get("id", UUID.class)), + String.valueOf(message3.getMessage().getHeaders().get("id", UUID.class)), + String.valueOf(message4.getMessage().getHeaders().get("id", UUID.class))); new Thread(() -> uut.handleMessage(message1)).start(); new Thread(() -> uut.handleMessage(message2)).start(); - await().until(() -> message1.getPayload().isRunning()); - await().until(() -> message2.getPayload().isRunning()); + await().until(() -> message1.getMessage().getPayload().isRunning()); + await().until(() -> message2.getMessage().getPayload().isRunning()); // when Thread thread3 = new Thread(() -> uut.handleMessage(message3)); @@ -134,8 +134,8 @@ public void testHandleMessage_processesFourMessagesAndFillsQueue() { // then await().until(() -> Thread.State.WAITING == thread3.getState()); await().until(() -> Thread.State.WAITING == thread4.getState()); - assertFalse(message3.getPayload().isRunning()); - assertFalse(message4.getPayload().isRunning()); + assertFalse(message3.getMessage().getPayload().isRunning()); + assertFalse(message4.getMessage().getPayload().isRunning()); assertThat(uut.getMessagesInProcessing().getBackingSet()).containsOnly(uuids.toArray(String[]::new)); verify(scheduledExecutorService, times(4)).scheduleAtFixedRate(any(), anyLong(), anyLong(), any()); @@ -144,23 +144,23 @@ public void testHandleMessage_processesFourMessagesAndFillsQueue() { @Test public void testHandleMessage_processesSixMessageAndCrashes() { // given - Message message1 = createMessage(1); - Message message2 = createMessage(2); - Message message3 = createMessage(3); - Message message4 = createMessage(4); - Message message5 = createMessage(5); - Message message6 = createMessage(6); - - List uuids = List.of(message1.getHeaders().get("id", UUID.class).toString(), - message2.getHeaders().get("id", UUID.class).toString(), - message3.getHeaders().get("id", UUID.class).toString(), - message4.getHeaders().get("id", UUID.class).toString(), - message5.getHeaders().get("id", UUID.class).toString()); + MessageWrapper message1 = createMessage(1); + MessageWrapper message2 = createMessage(2); + MessageWrapper message3 = createMessage(3); + MessageWrapper message4 = createMessage(4); + MessageWrapper message5 = createMessage(5); + MessageWrapper message6 = createMessage(6); + + List uuids = List.of(message1.getMessage().getHeaders().get("id", UUID.class).toString(), + message2.getMessage().getHeaders().get("id", UUID.class).toString(), + message3.getMessage().getHeaders().get("id", UUID.class).toString(), + message4.getMessage().getHeaders().get("id", UUID.class).toString(), + message5.getMessage().getHeaders().get("id", UUID.class).toString()); new Thread(() -> uut.handleMessage(message1)).start(); new Thread(() -> uut.handleMessage(message2)).start(); - await().until(() -> message1.getPayload().isRunning()); - await().until(() -> message2.getPayload().isRunning()); + await().until(() -> message1.getMessage().getPayload().isRunning()); + await().until(() -> message2.getMessage().getPayload().isRunning()); Thread thread3 = new Thread(() -> uut.handleMessage(message3)); thread3.start(); @@ -183,36 +183,36 @@ public void testHandleMessage_processesSixMessageAndCrashes() { @Test public void testHandleMessage_startsQueuedProcess() { // given - Message message1 = createMessage(1); - Message message2 = createMessage(2); - Message message3 = createMessage(3); - List uuids = List.of(String.valueOf(message1.getHeaders().get("id", UUID.class)), - String.valueOf(message3.getHeaders().get("id", UUID.class))); + MessageWrapper message1 = createMessage(1); + MessageWrapper message2 = createMessage(2); + MessageWrapper message3 = createMessage(3); + List uuids = List.of(String.valueOf(message1.getMessage().getHeaders().get("id", UUID.class)), + String.valueOf(message3.getMessage().getHeaders().get("id", UUID.class))); new Thread(() -> uut.handleMessage(message1)).start(); - await().until(() -> message1.getPayload().isRunning()); + await().until(() -> message1.getMessage().getPayload().isRunning()); Thread thread2 = new Thread(() -> uut.handleMessage(message2)); thread2.start(); - await().until(() -> message2.getPayload().isRunning()); + await().until(() -> message2.getMessage().getPayload().isRunning()); Thread thread3 = new Thread(() -> uut.handleMessage(message3)); thread3.start(); // when - message2.getPayload().stop(); + message2.getMessage().getPayload().stop(); // then await().until(() -> Thread.State.WAITING == thread2.getState()); await().until(() -> Thread.State.WAITING == thread3.getState()); - assertTrue(message3.getPayload().isRunning()); + assertTrue(message3.getMessage().getPayload().isRunning()); assertThat(uut.getMessagesInProcessing().getBackingSet()).containsOnly(uuids.toArray(String[]::new)); } @Test public void testHandleMessage_resumesWaitingThreads() { // given - Message message1 = createMessage(1); - Message message2 = createMessage(2); - List uuids = List.of(String.valueOf(message2.getHeaders().get("id", UUID.class))); + MessageWrapper message1 = createMessage(1); + MessageWrapper message2 = createMessage(2); + List uuids = List.of(String.valueOf(message2.getMessage().getHeaders().get("id", UUID.class))); Thread thread1 = new Thread(() -> uut.handleMessage(message1)); thread1.start(); @@ -223,20 +223,20 @@ public void testHandleMessage_resumesWaitingThreads() { await().until(() -> Thread.State.WAITING == thread2.getState()); // when - message1.getPayload().stop(); + message1.getMessage().getPayload().stop(); // then await().until(() -> !thread2.isAlive()); assertThat(uut.getMessagesInProcessing().getBackingSet()).containsOnly(uuids.toArray(String[]::new)); } - private Message createMessage(int number) { + private MessageWrapper createMessage(int number) { Map headers = new HashMap<>(); headers.put("id", UUID.fromString("bf308aa2-bf48-49b8-a839-61611c71043" + number).toString()); headers.put("ReceiptHandle", "receiptHandle" + number); MessageHeaders messageHeaders = new MessageHeaders(headers); - return new GenericMessage<>(new InputObject(), messageHeaders); + return new MessageWrapper<>(new GenericMessage<>(new InputObject(), messageHeaders)); } private class TestWorkerWithHeaders implements MessageWorkerWithHeaders { diff --git a/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerTest.java b/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerTest.java index a7a27bc..f08ab7e 100644 --- a/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerTest.java +++ b/src/test/java/com/mercateo/sqs/utils/message/handling/LongRunningMessageHandlerTest.java @@ -100,7 +100,7 @@ public void timeUntilTimeOutNegative() throws Exception { @Test public void testHandleMessage_handlesExceptionDuringTimeoutExtension() { // given - Message message = createMessage(); + MessageWrapper message = createMessage(); RuntimeException exception = new RuntimeException("test exception"); when(timeoutExtensionExecutor.scheduleAtFixedRate(any(), anyLong(), anyLong(), any())) .thenThrow(exception); @@ -112,12 +112,12 @@ public void testHandleMessage_handlesExceptionDuringTimeoutExtension() { assertThat(uut.getMessagesInProcessing().getBackingSet()).isEmpty(); } - private Message createMessage() { + private MessageWrapper createMessage() { Map headers = new HashMap<>(); String uuid = UUID.fromString("bf308aa2-bf48-49b8-a839-61611c710430").toString(); headers.put("id", uuid); MessageHeaders messageHeaders = new MessageHeaders(headers); - return new GenericMessage<>(1, messageHeaders); + return new MessageWrapper<>(new GenericMessage<>(1, messageHeaders)); } } \ No newline at end of file diff --git a/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableTest.java b/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableTest.java index b37d378..eb411a7 100644 --- a/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableTest.java +++ b/src/test/java/com/mercateo/sqs/utils/message/handling/MessageHandlingRunnableTest.java @@ -33,7 +33,7 @@ class MessageHandlingRunnableTest { @Mock private Acknowledgement acknowledgment; - private Message message; + private MessageWrapper message; @Mock private FinishedMessageCallback finishedMessageCallback; @@ -57,8 +57,8 @@ public void setUp() throws Exception { HashMap headerMap = new HashMap<>(); headerMap.put("MessageId", "bf308aa2-bf48-49b8-a839-61611c710431"); headerMap.put("Acknowledgment", acknowledgment); - message = new GenericMessage<>(3, new MessageHeaders(headerMap)); - messageGeneratedUUID = message.getHeaders().getId(); + message = new MessageWrapper<>(new GenericMessage<>(3, new MessageHeaders(headerMap))); + messageGeneratedUUID = message.getMessage().getHeaders().getId(); uut = new MessageHandlingRunnable<>(worker, message, finishedMessageCallback, messages, visibilityTimeoutExtender, errorHandlingStrategy); } @@ -77,7 +77,7 @@ void testNullContracts() throws Exception { @Test void testRun() throws Throwable { // given - when(worker.work(3, message.getHeaders())).thenReturn("3S"); + when(worker.work(3, message.getMessage().getHeaders())).thenReturn("3S"); when(acknowledgment.acknowledgeAsync()).thenReturn(mock(CompletableFuture.class)); // when @@ -94,7 +94,7 @@ void testRun() throws Throwable { void testRun_throws_workerException_and_does_not_ack() throws Throwable { // given Exception e = new IllegalArgumentException(); - doThrow(e).when(worker).work(3, message.getHeaders()); + doThrow(e).when(worker).work(3, message.getMessage().getHeaders()); doThrow(e).when(errorHandlingStrategy).handleWorkerException(e, message); // when @@ -114,7 +114,7 @@ void testRun_throws_workerException_and_does_not_ack() throws Throwable { void testRun_throws_workerException_and_acks() throws Throwable { // given Exception e = new IllegalArgumentException(); - doThrow(e).when(worker).work(3, message.getHeaders()); + doThrow(e).when(worker).work(3, message.getMessage().getHeaders()); when(acknowledgment.acknowledgeAsync()).thenReturn(mock(CompletableFuture.class)); // when diff --git a/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactoryTest.java b/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactoryTest.java index 491a244..419b0ee 100644 --- a/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactoryTest.java +++ b/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactoryTest.java @@ -1,6 +1,7 @@ package com.mercateo.sqs.utils.visibility; import com.google.common.testing.NullPointerTester; +import com.mercateo.sqs.utils.message.handling.MessageWrapper; import com.mercateo.sqs.utils.queue.Queue; import org.junit.jupiter.api.BeforeEach; @@ -28,6 +29,7 @@ public void testNullContracts() throws Exception { // given NullPointerTester nullPointerTester = new NullPointerTester(); nullPointerTester.setDefault(Queue.class, Mockito.mock(Queue.class)); + nullPointerTester.setDefault(MessageWrapper.class, Mockito.mock(MessageWrapper.class)); // when nullPointerTester.testInstanceMethods(uut, NullPointerTester.Visibility.PACKAGE); diff --git a/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java b/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java index e2c5fcf..b81ddc6 100644 --- a/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java +++ b/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java @@ -12,6 +12,7 @@ import com.github.rholder.retry.WaitStrategies; import com.google.common.testing.NullPointerTester; import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy; +import com.mercateo.sqs.utils.message.handling.MessageWrapper; import java.net.UnknownHostException; import java.time.Duration; @@ -47,8 +48,8 @@ public void setUp() throws Exception { MockitoAnnotations.openMocks(this); HashMap headerMap = new HashMap<>(); headerMap.put("ReceiptHandle", "rhd"); - GenericMessage message = new GenericMessage<>(new Object(), new MessageHeaders( - headerMap)); + MessageWrapper message = new MessageWrapper<>(new GenericMessage<>(new Object(), new MessageHeaders( + headerMap))); RetryStrategy retryStrategy = new RetryStrategy(WaitStrategies.fixedWait(1, TimeUnit.MICROSECONDS), StopStrategies.stopAfterAttempt(5)); uut = new VisibilityTimeoutExtender(sqsClient, Duration.ofSeconds(10*60), message, "queue",