Skip to content

Commit

Permalink
Fix flaky org.opensearch.rest.ReactorNetty4StreamingStressIT.testClos…
Browse files Browse the repository at this point in the history
…eClientStreamingRequest test case (#15859)

Signed-off-by: Andriy Redko <[email protected]>
(cherry picked from commit 260edc5)
Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta committed Sep 13, 2024
1 parent d64c5f8 commit 9ee68d8
Showing 1 changed file with 21 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,19 @@
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.junit.After;

import java.io.InterruptedIOException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;
import reactor.test.subscriber.TestSubscriber;
import reactor.test.StepVerifier;
import reactor.test.scheduler.VirtualTimeScheduler;

import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.collection.IsEmptyCollection.empty;

public class ReactorNetty4StreamingStressIT extends OpenSearchRestTestCase {
@After
Expand All @@ -49,6 +44,8 @@ public void tearDown() throws Exception {
}

public void testCloseClientStreamingRequest() throws Exception {
final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true);

final AtomicInteger id = new AtomicInteger(0);
final Stream<String> stream = Stream.generate(
() -> "{ \"index\": { \"_index\": \"test-stress-streaming\", \"_id\": \""
Expand All @@ -57,39 +54,28 @@ public void testCloseClientStreamingRequest() throws Exception {
+ "{ \"name\": \"josh\" }\n"
);

final Duration delay = Duration.ofMillis(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).delayElements(Duration.ofMillis(500)).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
Flux.fromStream(stream).delayElements(delay, scheduler).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);
streamingRequest.addParameter("refresh", "true");

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);
TestSubscriber<ByteBuffer> subscriber = TestSubscriber.create();
streamingResponse.getBody().subscribe(subscriber);

final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
try {
// Await for subscriber to receive at least one chunk
assertBusy(() -> assertThat(subscriber.getReceivedOnNext(), not(empty())));

// Close client forceably
executor.schedule(() -> {
client().close();
return null;
}, 2, TimeUnit.SECONDS);
scheduler.advanceTimeBy(delay); /* emit first element */

// Await for subscriber to terminate
subscriber.block(Duration.ofSeconds(10));
assertThat(
subscriber.expectTerminalError(),
anyOf(instanceOf(InterruptedIOException.class), instanceOf(ConnectionClosedException.class))
);
} finally {
executor.shutdown();
if (executor.awaitTermination(1, TimeUnit.SECONDS) == false) {
executor.shutdownNow();
}
}
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\""))
.then(() -> {
try {
client().close();
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
}
})
.then(() -> scheduler.advanceTimeBy(delay))
.expectErrorMatches(t -> t instanceof ConnectionClosedException)
.verify();
}
}

0 comments on commit 9ee68d8

Please sign in to comment.