diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java index 3fa7a54ad58..d0cc6f1c2e6 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java @@ -30,7 +30,6 @@ import org.junit.Test; import scala.concurrent.Await; import scala.concurrent.Promise; -import scala.concurrent.duration.FiniteDuration; import java.time.Duration; import java.util.concurrent.TimeoutException; @@ -235,7 +234,7 @@ public void restartUptoMaxRetries() throws Exception { Thread.sleep(500); assertEquals(BackpressureTimeoutException.class, probe.expectError().getClass()); probe.request(1); // send demand - probe.expectNoMessage(FiniteDuration.create(200, "milliseconds")); // but no more restart + probe.expectNoMessage(Duration.ofMillis(200)); // but no more restart } }; } diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java index c5d085c458b..06224429b73 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java @@ -15,11 +15,17 @@ import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.japi.Pair; -import org.apache.pekko.stream.*; +import org.apache.pekko.stream.Attributes; +import org.apache.pekko.stream.FlowShape; +import org.apache.pekko.stream.Inlet; +import org.apache.pekko.stream.Outlet; import org.apache.pekko.stream.javadsl.Keep; import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; -import org.apache.pekko.stream.stage.*; +import org.apache.pekko.stream.stage.AbstractInHandler; +import org.apache.pekko.stream.stage.AbstractOutHandler; +import org.apache.pekko.stream.stage.GraphStage; +import org.apache.pekko.stream.stage.GraphStageLogic; import org.apache.pekko.stream.testkit.TestPublisher; import org.apache.pekko.stream.testkit.TestSubscriber; import org.apache.pekko.stream.testkit.javadsl.TestSink; @@ -28,9 +34,8 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; -import java.util.concurrent.TimeUnit; +import java.time.Duration; public class RecipeHold extends RecipeTest { static ActorSystem system; @@ -188,10 +193,8 @@ public void workForVersion2() throws Exception { TestPublisher.Probe pub = pubSub.first(); TestSubscriber.Probe sub = pubSub.second(); - FiniteDuration timeout = FiniteDuration.create(200, TimeUnit.MILLISECONDS); - sub.request(1); - sub.expectNoMessage(timeout); + sub.expectNoMessage(Duration.ofMillis(200)); pub.sendNext(1); sub.expectNext(1); diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java index d8bf30def9d..c76460c9b6f 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java @@ -15,7 +15,10 @@ import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.japi.Pair; -import org.apache.pekko.stream.*; +import org.apache.pekko.stream.ClosedShape; +import org.apache.pekko.stream.FanInShape2; +import org.apache.pekko.stream.FlowShape; +import org.apache.pekko.stream.SourceShape; import org.apache.pekko.stream.javadsl.*; import org.apache.pekko.stream.testkit.TestPublisher; import org.apache.pekko.stream.testkit.TestSubscriber; @@ -25,10 +28,9 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; +import java.time.Duration; import java.util.Arrays; -import java.util.concurrent.TimeUnit; public class RecipeManualTrigger extends RecipeTest { static ActorSystem system; @@ -85,7 +87,7 @@ public void zipped() throws Exception { TestPublisher.Probe pub = pubSub.first(); TestSubscriber.Probe sub = pubSub.second(); - FiniteDuration timeout = FiniteDuration.create(100, TimeUnit.MILLISECONDS); + Duration timeout = Duration.ofMillis(100); sub.expectSubscription().request(1000); sub.expectNoMessage(timeout); @@ -140,7 +142,7 @@ public void zipWith() throws Exception { TestPublisher.Probe pub = pubSub.first(); TestSubscriber.Probe sub = pubSub.second(); - FiniteDuration timeout = FiniteDuration.create(100, TimeUnit.MILLISECONDS); + Duration timeout = Duration.ofMillis(100); sub.expectSubscription().request(1000); sub.expectNoMessage(timeout); diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java index ac11660c8a4..fbdc3438a5b 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java @@ -31,6 +31,7 @@ import org.junit.Test; import scala.concurrent.Await; +import java.time.Duration; import java.util.concurrent.TimeUnit; public class RecipeMissedTicks extends RecipeTest { @@ -83,8 +84,7 @@ class Tick {} pub.sendNext(Tick); pub.sendNext(Tick); - scala.concurrent.duration.FiniteDuration timeout = - scala.concurrent.duration.FiniteDuration.create(200, TimeUnit.MILLISECONDS); + Duration timeout = Duration.ofMillis(200); Await.ready(latch, scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS)); diff --git a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala index 8d1256fdc75..8cab7079c12 100644 --- a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala +++ b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala @@ -13,8 +13,15 @@ package org.apache.pekko.stream.testkit +import java.io.{ PrintWriter, StringWriter } +import java.util.concurrent.CountDownLatch + +import scala.annotation.tailrec +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.reflect.ClassTag + import org.apache.pekko -import org.reactivestreams.{ Publisher, Subscriber, Subscription } import pekko.actor.{ ActorRef, ActorSystem, @@ -25,18 +32,13 @@ import pekko.actor.{ import pekko.japi._ import pekko.stream._ import pekko.stream.impl._ -import pekko.testkit.TestActor.AutoPilot import pekko.testkit.{ TestActor, TestProbe } +import pekko.testkit.TestActor.AutoPilot import pekko.util.JavaDurationConverters._ -import pekko.util.ccompat.JavaConverters._ import pekko.util.ccompat._ +import pekko.util.ccompat.JavaConverters._ -import java.io.{ PrintWriter, StringWriter } -import java.util.concurrent.CountDownLatch -import scala.annotation.tailrec -import scala.collection.immutable -import scala.concurrent.duration._ -import scala.reflect.ClassTag +import org.reactivestreams.{ Publisher, Subscriber, Subscription } /** * Provides factory methods for various Publishers. @@ -100,7 +102,9 @@ object TestPublisher { /** * JAVA API + * * Probe that implements [[org.reactivestreams.Publisher]] interface. + * @since 1.1.0 */ def create[T](autoOnSubscribe: Boolean, system: ClassicActorSystemProvider): ManualProbe[T] = new ManualProbe(autoOnSubscribe)(system.classicSystem) @@ -149,6 +153,7 @@ object TestPublisher { /** * JAVA API + * @since 1.1.0 */ def executeAfterSubscription[T](f: function.Creator[T]): T = { executeAfterSubscription(f.create()) @@ -209,11 +214,11 @@ object TestPublisher { /** * JAVA API + * * Expect no messages for a given duration. + * @since 1.1.0 */ - def expectNoMessage(max: java.time.Duration): Self = { - expectNoMessage(max.asScala) - } + def expectNoMessage(max: java.time.Duration): Self = expectNoMessage(max.asScala) /** * Receive messages for a given duration or until one does not match a given partial function. @@ -227,14 +232,15 @@ object TestPublisher { /** * JAVA API + * * Receive messages for a given duration or until one does not match a given partial function. + * @since 1.1.0 */ def receiveWhile[T](max: java.time.Duration, idle: java.time.Duration, messages: Int, - f: PartialFunction[PublisherEvent, T]): java.util.List[T] = { + f: PartialFunction[PublisherEvent, T]): java.util.List[T] = receiveWhile(max.asScala, idle.asScala, messages)(f).asJava - } def expectEventPF[T](f: PartialFunction[PublisherEvent, T]): T = executeAfterSubscription { @@ -265,6 +271,7 @@ object TestPublisher { /** * JAVA API + * * Execute code block while bounding its execution time between `min` and * `max`. `within` blocks may be nested. All methods in this trait which * take maximum wait times are available in a version which implicitly uses @@ -279,6 +286,8 @@ object TestPublisher { * expectMsgClass(classOf[String]) * } * }}} + * + * @since 1.1.0 */ def within[T](min: java.time.Duration, max: java.time.Duration, @@ -294,7 +303,9 @@ object TestPublisher { /** * JAVA API + * * Same as calling `within(Duration.ofSeconds(0), max)(f)`. + * @since 1.1.0 */ def within[T](max: java.time.Duration, creator: function.Creator[T]): T = within(max.asScala)(creator.create()) @@ -306,6 +317,7 @@ object TestPublisher { /** * JAVA API + * @since 1.1.0 */ def create[T](initialPendingRequests: Long, system: ClassicActorSystemProvider): Probe[T] = apply(initialPendingRequests)(system.classicSystem) @@ -415,6 +427,7 @@ object TestSubscriber { /** * JAVA API + * @since 1.1.0 */ def create[T]()(system: ClassicActorSystemProvider): ManualProbe[T] = apply()(system.classicSystem) @@ -458,7 +471,9 @@ object TestSubscriber { /** * JAVA API + * * Expect and return [[SubscriberEvent]] (any of: `OnSubscribe`, `OnNext`, `OnError` or `OnComplete`). + * @since 1.1.0 */ def expectEvent(max: java.time.Duration): SubscriberEvent = expectEvent(max.asScala) @@ -495,6 +510,7 @@ object TestSubscriber { * JAVA API * * Expect and return a stream element during specified time or timeout. + * @since 1.1.0 */ def expectNext(d: java.time.Duration): I = expectNext(d.asScala) @@ -524,6 +540,7 @@ object TestSubscriber { * Fluent DSL * * Expect a stream element during specified time or timeout. + * @since 1.1.0 */ def expectNext(d: java.time.Duration, element: I): Self = expectNext(d.asScala, element) @@ -602,6 +619,7 @@ object TestSubscriber { * * Fluent DSL * Expect the given elements to be signalled in any order. + * @since 1.1.0 */ def expectNextUnorderedN(all: java.util.List[I]): Self = expectNextUnorderedN(Util.immutableSeq(all)) @@ -833,11 +851,23 @@ object TestSubscriber { * * @param max wait no more than max time, otherwise throw AssertionError */ - def expectNextWithTimeoutPF[T](max: Duration, f: PartialFunction[Any, T]): T = - expectEventWithTimeoutPF(max, - { - case OnNext(n) if f.isDefinedAt(n) => f(n) - }) + def expectNextWithTimeoutPF[T](max: Duration, f: PartialFunction[Any, T]): T = { + val pf: PartialFunction[SubscriberEvent, Any] = { + case OnNext(n) => n + } + expectEventWithTimeoutPF(max, pf.andThen(f)) + } + + /** + * JAVA API + * + * Expect a stream element and test it with partial function. + * + * @param max wait no more than max time, otherwise throw AssertionError + * @since 1.1.0 + */ + def expectNextWithTimeoutPF[T](max: java.time.Duration, f: PartialFunction[Any, T]): T = + expectEventWithTimeoutPF(max.asScala, f) /** * Expect a stream element during specified time or timeout and test it with partial function. @@ -849,6 +879,19 @@ object TestSubscriber { def expectNextChainingPF(max: Duration, f: PartialFunction[Any, Any]): Self = expectNextWithTimeoutPF(max, f.andThen(_ => self)) + /** + * JAVA API + * + * Expect a stream element during specified time or timeout and test it with partial function. + * + * Allows chaining probe methods. + * + * @param max wait no more than max time, otherwise throw AssertionError + * @since 1.1.0 + */ + def expectNextChainingPF(max: java.time.Duration, f: PartialFunction[Any, Any]): Self = + expectNextChainingPF(max.asScala, f) + /** * Expect a stream element during specified time or timeout and test it with partial function. * @@ -860,6 +903,13 @@ object TestSubscriber { def expectEventWithTimeoutPF[T](max: Duration, f: PartialFunction[SubscriberEvent, T]): T = probe.expectMsgPF[T](max, hint = "message matching partial function")(f.asInstanceOf[PartialFunction[Any, T]]) + /** + * JAVA API + * @since 1.1.0 + */ + def expectEventWithTimeoutPF[T](max: java.time.Duration, f: PartialFunction[SubscriberEvent, T]): T = + expectEventWithTimeoutPF(max.asScala, f) + def expectEventPF[T](f: PartialFunction[SubscriberEvent, T]): T = expectEventWithTimeoutPF(Duration.Undefined, f) @@ -872,6 +922,19 @@ object TestSubscriber { messages: Int = Int.MaxValue)(f: PartialFunction[SubscriberEvent, T]): immutable.Seq[T] = probe.receiveWhile(max, idle, messages)(f.asInstanceOf[PartialFunction[AnyRef, T]]) + /** + * JAVA API + * + * Receive messages for a given duration or until one does not match a given partial function. + * @since 1.1.0 + */ + def receiveWhile[T]( + max: java.time.Duration, + idle: java.time.Duration, + messages: Int, + f: PartialFunction[SubscriberEvent, T]): java.util.List[T] = + receiveWhile(max.asScala, idle.asScala, messages)(f).asJava + /** * Drains a given number of messages */ @@ -887,10 +950,10 @@ object TestSubscriber { * JAVA API * * Drains a given number of messages + * @since 1.1.0 */ - def receiveWithin(max: java.time.Duration, messages: Int): java.util.List[I] = { + def receiveWithin(max: java.time.Duration, messages: Int): java.util.List[I] = receiveWithin(max.asScala, messages).asJava - } /** * Attempt to drain the stream into a strict collection (by requesting `Long.MaxValue` elements). @@ -923,12 +986,15 @@ object TestSubscriber { } /** + * JAVA API + * * Attempt to drain the stream into a strict collection (by requesting `Long.MaxValue` elements). * * '''Use with caution: Be warned that this may not be a good idea if the stream is infinite or its elements are very large!''' + * @since 1.1.0 */ - def toStrict(atMost: java.time.Duration): immutable.Seq[I] = - toStrict(atMost.asScala) + def toStrict(atMost: java.time.Duration): java.util.List[I] = + toStrict(atMost.asScala).asJava /** * Execute code block while bounding its execution time between `min` and @@ -965,6 +1031,8 @@ object TestSubscriber { * expectMsgClass(classOf[String]) * } * }}} + * + * @since 1.1.0 */ def within[T](min: java.time.Duration, max: java.time.Duration, @@ -979,6 +1047,7 @@ object TestSubscriber { * JAVA API * * Same as calling `within(Duration.ofSeconds(0), max)(f)`. + * @since 1.1.0 */ def within[T](max: java.time.Duration)(creator: function.Creator[T]): T = within(max.asScala)(creator.create()) @@ -993,6 +1062,8 @@ object TestSubscriber { /** * JAVA API + * + * @since 1.1.0 */ def create[T]()(implicit system: ClassicActorSystemProvider): Probe[T] = apply()(system) } @@ -1057,7 +1128,10 @@ object TestSubscriber { } /** + * JAVA API + * * Request and expect a stream element during the specified time or timeout. + * @since 1.1.0 */ def requestNext(d: java.time.Duration): T = requestNext(d.asScala) } diff --git a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamTestKitSpec.scala b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamTestKitSpec.scala index 5cf81300538..21850fc9d25 100644 --- a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamTestKitSpec.scala +++ b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamTestKitSpec.scala @@ -136,15 +136,15 @@ class StreamTestKitSpec extends PekkoSpec { // It also needs to be dilated since the testkit will dilate the timeout // accordingly to `-Dpekko.test.timefactor` value. val initialDelay = (timeout * 2).dilated + val pf: PartialFunction[Any, Unit] = { + case 1 => + system.log.info("Message received :(") + } Source .tick(initialDelay, 1.millis, 1) .runWith(TestSink.probe) .request(1) - .expectNextWithTimeoutPF(timeout, - { - case 1 => - system.log.info("Message received :(") - }) + .expectNextWithTimeoutPF(timeout, pf) }.getMessage should include("timeout") } @@ -180,15 +180,15 @@ class StreamTestKitSpec extends PekkoSpec { // It also needs to be dilated since the testkit will dilate the timeout // accordingly to `-Dpekko.test.timefactor` value. val initialDelay = (timeout * 2).dilated + val pf: PartialFunction[Any, Unit] = { + case 1 => + system.log.info("Message received :(") + } Source .tick(initialDelay, 1.millis, 1) .runWith(TestSink.probe) .request(1) - .expectNextChainingPF(timeout, - { - case 1 => - system.log.info("Message received :(") - }) + .expectNextChainingPF(timeout, pf) }.getMessage should include("timeout") } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 997d401a830..b44a21d9de2 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -1242,8 +1242,7 @@ public void mustBeAbleToUseMerge3() { .mergeAll(Arrays.asList(sourceB, sourceC), false) .runWith(TestSink.probe(system), system); sub.expectSubscription().request(9); - sub.expectNextUnorderedN(Util.immutableSeq(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9))) - .expectComplete(); + sub.expectNextUnorderedN(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9)).expectComplete(); } @Test