Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Amqp] Moving tests using VirtualTimeScheduler to dedicated test classes to run them in sequential and isolated mode #29492

Merged
merged 4 commits into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.api.parallel.Isolated;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.scheduler.VirtualTimeScheduler;

import java.time.Duration;
import java.util.HashMap;

/**
* Tests for {@link AmqpChannelProcessor} using
* {@link reactor.test.scheduler.VirtualTimeScheduler} hence needs
* to run in isolated and sequential.
*/
@Execution(ExecutionMode.SAME_THREAD)
@Isolated
public class AmqpChannelProcessorIsolatedTest {
private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(30);
private final TestObject connection1 = new TestObject();

@Mock
private AmqpRetryPolicy retryPolicy;
private AmqpChannelProcessor<TestObject> channelProcessor;
private AutoCloseable mocksCloseable;

@BeforeEach
void setup() {
mocksCloseable = MockitoAnnotations.openMocks(this);
channelProcessor = new AmqpChannelProcessor<>("namespace-test", TestObject::getStates, retryPolicy, new HashMap<>());
}

@AfterEach
void teardown() throws Exception {
// Tear down any inline mocks to avoid memory leaks.
// https://github.com/mockito/mockito/wiki/What's-new-in-Mockito-2#mockito-2250
Mockito.framework().clearInlineMock(this);
if (mocksCloseable != null) {
mocksCloseable.close();
}
}

@Test
@Execution(ExecutionMode.SAME_THREAD)
void doesNotEmitConnectionWhenNotActive() {
// Arrange
final TestPublisher<TestObject> publisher = TestPublisher.createCold();

// Act & Assert
final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();
try {
StepVerifier.withVirtualTime(() -> publisher.next(connection1).flux()
.subscribeWith(channelProcessor), () -> virtualTimeScheduler, 1)
.expectSubscription()
.thenAwait(Duration.ofMinutes(10))
.expectNoEvent(Duration.ofMinutes(10))
.then(() -> connection1.getSink().next(AmqpEndpointState.UNINITIALIZED))
.expectNoEvent(Duration.ofMinutes(10))
.thenCancel()
.verify(VERIFY_TIMEOUT);
} finally {
virtualTimeScheduler.dispose();
}
}

/**
* Verifies that this AmqpChannelProcessor won't time out even if the 5 minutes default timeout occurs. This is
* possible when there is a disconnect for a long period of time.
*/
@Test
@Execution(ExecutionMode.SAME_THREAD)
void waitsLongPeriodOfTimeForConnection() {
// Arrange
final TestPublisher<TestObject> publisher = TestPublisher.createCold();

// Act & Assert
final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();
try {
StepVerifier.withVirtualTime(() -> publisher.next(connection1).flux()
.subscribeWith(channelProcessor), () -> virtualTimeScheduler, 1)
.expectSubscription()
.thenAwait(Duration.ofMinutes(10))
.then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE))
.expectNext(connection1)
.expectComplete()
.verify(VERIFY_TIMEOUT);
} finally {
virtualTimeScheduler.dispose();
}
}

/**
* Verifies that this AmqpChannelProcessor won't time out even if the 5 minutes default timeout occurs. This is
* possible when there is a disconnect for a long period of time.
*/
@Test
@Execution(ExecutionMode.SAME_THREAD)
void waitsLongPeriodOfTimeForChainedConnections() {
// Arrange
final TestPublisher<TestObject> publisher = TestPublisher.createCold();
final String contents = "Emitted something after 10 minutes.";

// Act & Assert
final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();
try {
StepVerifier.withVirtualTime(() -> {
return publisher.next(connection1).flux()
.subscribeWith(channelProcessor).flatMap(e -> Mono.just(contents));
}, () -> virtualTimeScheduler, 1)
.expectSubscription()
.thenAwait(Duration.ofMinutes(10))
.then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE))
.expectNext(contents)
.expectComplete()
.verify(VERIFY_TIMEOUT);
} finally {
virtualTimeScheduler.dispose();
}
}

static final class TestObject {
private final TestPublisher<AmqpEndpointState> processor = TestPublisher.createCold();

public Flux<AmqpEndpointState> getStates() {
return processor.flux();
}

public TestPublisher<AmqpEndpointState> getSink() {
return processor;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.mockito.MockitoAnnotations;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.scheduler.VirtualTimeScheduler;
Expand Down Expand Up @@ -308,73 +307,13 @@ void errorsWhenResubscribingOnTerminated() {
assertTrue(channelProcessor.isChannelClosed());
}

@Test
void doesNotEmitConnectionWhenNotActive() {
// Arrange
final TestPublisher<TestObject> publisher = TestPublisher.createCold();

// Act & Assert
StepVerifier.withVirtualTime(() -> publisher.next(connection1).flux()
.subscribeWith(channelProcessor), () -> virtualTimeScheduler, 1)
.expectSubscription()
.thenAwait(Duration.ofMinutes(10))
.expectNoEvent(Duration.ofMinutes(10))
.then(() -> connection1.getSink().next(AmqpEndpointState.UNINITIALIZED))
.expectNoEvent(Duration.ofMinutes(10))
.thenCancel()
.verify(VERIFY_TIMEOUT);
}

@Test
void requiresNonNull() {
Assertions.assertThrows(NullPointerException.class, () -> channelProcessor.onNext(null));

Assertions.assertThrows(NullPointerException.class, () -> channelProcessor.onError(null));
}

/**
* Verifies that this AmqpChannelProcessor won't time out even if the 5 minutes default timeout occurs. This is
* possible when there is a disconnect for a long period of time.
*/
@Test
void waitsLongPeriodOfTimeForConnection() {
// Arrange
final TestPublisher<TestObject> publisher = TestPublisher.createCold();

// Act & Assert
StepVerifier.withVirtualTime(() -> publisher.next(connection1).flux()
.subscribeWith(channelProcessor), () -> virtualTimeScheduler, 1)
.expectSubscription()
.thenAwait(Duration.ofMinutes(10))
.then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE))
.expectNext(connection1)
.expectComplete()
.verify(VERIFY_TIMEOUT);
}

/**
* Verifies that this AmqpChannelProcessor won't time out even if the 5 minutes default timeout occurs. This is
* possible when there is a disconnect for a long period of time.
*/
@Test
void waitsLongPeriodOfTimeForChainedConnections() {
// Arrange
final TestPublisher<TestObject> publisher = TestPublisher.createCold();
final String contents = "Emitted something after 10 minutes.";

// Act & Assert
StepVerifier.withVirtualTime(() -> {
return publisher.next(connection1).flux()
.subscribeWith(channelProcessor).flatMap(e -> Mono.just(contents));
}, () -> virtualTimeScheduler, 1)
.expectSubscription()
.thenAwait(Duration.ofMinutes(10))
.then(() -> connection1.getSink().next(AmqpEndpointState.ACTIVE))
.expectNext(contents)
.expectComplete()
.verify(VERIFY_TIMEOUT);
}

static final class TestObject {
private final TestPublisher<AmqpEndpointState> processor = TestPublisher.createCold();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,6 @@ public void sendWorkTimeout() throws IOException {
}).when(reactorDispatcher).invoke(any(Runnable.class));

doAnswer(invocation -> {
System.out.println("Running send timeout work.");

final Runnable argument = invocation.getArgument(0);
argument.run();
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpRetryMode;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.api.parallel.Isolated;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.scheduler.VirtualTimeScheduler;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* This class contains tests for {@link RetryUtil} using
* {@link reactor.test.scheduler.VirtualTimeScheduler} hence needs
* to run in isolated and sequential.
*/
@Execution(ExecutionMode.SAME_THREAD)
@Isolated
public class RetryUtilIsolatedTest {
/**
* Tests a retry that times out on a Flux.
*/
@Test
@Execution(ExecutionMode.SAME_THREAD)
void withRetryFluxEmitsItemsLaterThanTimeout() {
// Arrange
final String timeoutMessage = "Operation timed out.";
final Duration timeout = Duration.ofSeconds(5);
final AmqpRetryOptions options = new AmqpRetryOptions()
.setDelay(Duration.ofSeconds(1))
.setMaxRetries(2)
.setTryTimeout(timeout);
final Duration totalWaitTime = Duration.ofSeconds(options.getMaxRetries() * options.getDelay().getSeconds());

final AtomicInteger resubscribe = new AtomicInteger();
final TestPublisher<AmqpTransportType> singleItem = TestPublisher.create();

final Flux<AmqpTransportType> flux = singleItem.flux()
.doOnSubscribe(s -> resubscribe.incrementAndGet());

final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create();
try {
// Act & Assert
StepVerifier.withVirtualTime(() -> RetryUtil.withRetry(flux, options, timeoutMessage),
() -> virtualTimeScheduler, 1)
.expectSubscription()
.then(() -> singleItem.next(AmqpTransportType.AMQP_WEB_SOCKETS))
.expectNext(AmqpTransportType.AMQP_WEB_SOCKETS)
.expectNoEvent(totalWaitTime)
.thenCancel()
.verify();
} finally {
virtualTimeScheduler.dispose();
}

assertEquals(1, resubscribe.get());
}

static Stream<Throwable> withNonTransientError() {
return Stream.of(
new AmqpException(false, "Test-exception", new AmqpErrorContext("test-ns")),
new IllegalStateException("Some illegal State")
);
}

@ParameterizedTest
@MethodSource
@Execution(ExecutionMode.SAME_THREAD)
void withNonTransientError(Throwable nonTransientError) {
// Arrange
final String timeoutMessage = "Operation timed out.";
final Duration timeout = Duration.ofSeconds(30);
final AmqpRetryOptions options = new AmqpRetryOptions()
.setMode(AmqpRetryMode.FIXED)
.setDelay(Duration.ofSeconds(1))
.setMaxRetries(1)
.setTryTimeout(timeout);

final Flux<Integer> stream = Flux.concat(
Flux.defer(() -> Flux.just(0, 1, 2)),
Flux.defer(() -> Flux.error(nonTransientError)),
Flux.defer(() -> Flux.just(3, 4)));

final VirtualTimeScheduler virtualTimeScheduler = VirtualTimeScheduler.create(true);

// Act & Assert
try {
StepVerifier.withVirtualTime(() -> RetryUtil.withRetry(stream, options, timeoutMessage),
() -> virtualTimeScheduler, 4)
.expectNext(0, 1, 2)
.expectErrorMatches(error -> error.equals(nonTransientError))
.verify();
} finally {
virtualTimeScheduler.dispose();
}
}
}
Loading