Skip to content

Commit

Permalink
Merge #3533 into 3.6.0-M1
Browse files Browse the repository at this point in the history
Signed-off-by: OlegDokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Jul 7, 2023
2 parents 997158f + a63fcaf commit 48ba177
Showing 1 changed file with 57 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,8 +17,10 @@
package reactor.core.publisher;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.reactivestreams.Subscription;
Expand All @@ -27,6 +29,7 @@
import reactor.core.Scannable;
import reactor.core.publisher.FluxFilterFuseable.FilterFuseableConditionalSubscriber;
import reactor.core.publisher.FluxFilterFuseable.FilterFuseableSubscriber;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.MockUtils;
import reactor.test.StepVerifier;
Expand Down Expand Up @@ -162,26 +165,33 @@ public void discardTryOnNextPredicateMiss() {
}

@Test
public void discardPollAsyncPredicateFail() {
public void discardPollAsyncPredicateFail() throws InterruptedException {
Scheduler scheduler = Schedulers.newSingle("discardPollAsync");
CountDownLatch latch = new CountDownLatch(10);
StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //range uses tryOnNext, so let's use just instead
.publishOn(Schedulers.newSingle("discardPollAsync"), 1)
.contextWrite(previous -> {
Consumer previousDiscardHandler = previous.get(Hooks.KEY_ON_DISCARD);

return Operators.enableOnDiscard(previous, (discarded) -> {
latch.countDown();
previousDiscardHandler.accept(discarded);
});
})
.filter(i -> { throw new IllegalStateException("boom"); })
)
.expectFusion(Fuseable.ASYNC)
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscarded(1) //publishOn also might discard the rest
.hasDiscardedElementsMatching(list -> !list.contains(0))
.hasDiscardedElementsSatisfying(list -> assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
StepVerifier.Assertions assertions =
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.publishOn(scheduler, 1)
.filter(i -> {
throw new IllegalStateException("boom");
})
.contextWrite(previous -> {
Consumer previousDiscardHandler = previous.get(Hooks.KEY_ON_DISCARD);

return Operators.enableOnDiscard(previous, (discarded) -> {
previousDiscardHandler.accept(discarded);
latch.countDown();
});
})
.as(StepVerifier::create)
.expectFusion(Fuseable.ASYNC)
.expectErrorMessage("boom")
.verifyThenAssertThat();

Assertions.assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertions
.hasDiscarded(1) //publishOn also might discard the rest
.hasDiscardedElementsMatching(list -> !list.contains(0))
.hasDiscardedElementsSatisfying(list -> assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
}

@Test
Expand Down Expand Up @@ -277,27 +287,34 @@ public void discardConditionalTryOnNextPredicateMiss() {
}

@Test
public void discardConditionalPollAsyncPredicateFail() {
public void discardConditionalPollAsyncPredicateFail() throws InterruptedException {
Scheduler scheduler = Schedulers.newSingle("discardPollAsync");
CountDownLatch latch = new CountDownLatch(10);
StepVerifier.create(Flux.range(1, 10) //range uses tryOnNext, so let's use just instead
.publishOn(Schedulers.newSingle("discardPollAsync"))
.filter(i -> { throw new IllegalStateException("boom"); })
.filter(i -> true)
.contextWrite(previous -> {
Consumer previousDiscardHandler = previous.get(Hooks.KEY_ON_DISCARD);

return Operators.enableOnDiscard(previous, (discarded) -> {
latch.countDown();
previousDiscardHandler.accept(discarded);
});
})
)
.expectFusion(Fuseable.ASYNC)
.expectErrorMessage("boom")
.verifyThenAssertThat()
.hasDiscarded(1) //publishOn also discards the rest
.hasDiscardedElementsMatching(list -> !list.contains(0))
.hasDiscardedElementsSatisfying(list -> assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
StepVerifier.Assertions assertions =
Flux.range(1, 10)
.publishOn(scheduler, 1)
.filter(i -> {
throw new IllegalStateException("boom");
})
.filter(i -> true)
.contextWrite(previous -> {
Consumer previousDiscardHandler = previous.get(Hooks.KEY_ON_DISCARD);

return Operators.enableOnDiscard(previous, (discarded) -> {
previousDiscardHandler.accept(discarded);
latch.countDown();
});
})
.as(StepVerifier::create)
.expectFusion(Fuseable.ASYNC)
.expectErrorMessage("boom")
.verifyThenAssertThat();

Assertions.assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();

assertions.hasDiscarded(1) //publishOn also discards the rest
.hasDiscardedElementsMatching(list -> !list.contains(0))
.hasDiscardedElementsSatisfying(list -> assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
}

@Test
Expand Down

0 comments on commit 48ba177

Please sign in to comment.