diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java index 3fa7a54ad58..d0cc6f1c2e6 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java @@ -30,7 +30,6 @@ import org.junit.Test; import scala.concurrent.Await; import scala.concurrent.Promise; -import scala.concurrent.duration.FiniteDuration; import java.time.Duration; import java.util.concurrent.TimeoutException; @@ -235,7 +234,7 @@ public void restartUptoMaxRetries() throws Exception { Thread.sleep(500); assertEquals(BackpressureTimeoutException.class, probe.expectError().getClass()); probe.request(1); // send demand - probe.expectNoMessage(FiniteDuration.create(200, "milliseconds")); // but no more restart + probe.expectNoMessage(Duration.ofMillis(200)); // but no more restart } }; } diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java index c5d085c458b..06224429b73 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java @@ -15,11 +15,17 @@ import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.japi.Pair; -import org.apache.pekko.stream.*; +import org.apache.pekko.stream.Attributes; +import org.apache.pekko.stream.FlowShape; +import org.apache.pekko.stream.Inlet; +import org.apache.pekko.stream.Outlet; import org.apache.pekko.stream.javadsl.Keep; import org.apache.pekko.stream.javadsl.Sink; import org.apache.pekko.stream.javadsl.Source; -import org.apache.pekko.stream.stage.*; +import org.apache.pekko.stream.stage.AbstractInHandler; +import org.apache.pekko.stream.stage.AbstractOutHandler; +import org.apache.pekko.stream.stage.GraphStage; +import org.apache.pekko.stream.stage.GraphStageLogic; import org.apache.pekko.stream.testkit.TestPublisher; import org.apache.pekko.stream.testkit.TestSubscriber; import org.apache.pekko.stream.testkit.javadsl.TestSink; @@ -28,9 +34,8 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; -import java.util.concurrent.TimeUnit; +import java.time.Duration; public class RecipeHold extends RecipeTest { static ActorSystem system; @@ -188,10 +193,8 @@ public void workForVersion2() throws Exception { TestPublisher.Probe pub = pubSub.first(); TestSubscriber.Probe sub = pubSub.second(); - FiniteDuration timeout = FiniteDuration.create(200, TimeUnit.MILLISECONDS); - sub.request(1); - sub.expectNoMessage(timeout); + sub.expectNoMessage(Duration.ofMillis(200)); pub.sendNext(1); sub.expectNext(1); diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java index d8bf30def9d..c76460c9b6f 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeManualTrigger.java @@ -15,7 +15,10 @@ import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.japi.Pair; -import org.apache.pekko.stream.*; +import org.apache.pekko.stream.ClosedShape; +import org.apache.pekko.stream.FanInShape2; +import org.apache.pekko.stream.FlowShape; +import org.apache.pekko.stream.SourceShape; import org.apache.pekko.stream.javadsl.*; import org.apache.pekko.stream.testkit.TestPublisher; import org.apache.pekko.stream.testkit.TestSubscriber; @@ -25,10 +28,9 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; +import java.time.Duration; import java.util.Arrays; -import java.util.concurrent.TimeUnit; public class RecipeManualTrigger extends RecipeTest { static ActorSystem system; @@ -85,7 +87,7 @@ public void zipped() throws Exception { TestPublisher.Probe pub = pubSub.first(); TestSubscriber.Probe sub = pubSub.second(); - FiniteDuration timeout = FiniteDuration.create(100, TimeUnit.MILLISECONDS); + Duration timeout = Duration.ofMillis(100); sub.expectSubscription().request(1000); sub.expectNoMessage(timeout); @@ -140,7 +142,7 @@ public void zipWith() throws Exception { TestPublisher.Probe pub = pubSub.first(); TestSubscriber.Probe sub = pubSub.second(); - FiniteDuration timeout = FiniteDuration.create(100, TimeUnit.MILLISECONDS); + Duration timeout = Duration.ofMillis(100); sub.expectSubscription().request(1000); sub.expectNoMessage(timeout); diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java index ac11660c8a4..fbdc3438a5b 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeMissedTicks.java @@ -31,6 +31,7 @@ import org.junit.Test; import scala.concurrent.Await; +import java.time.Duration; import java.util.concurrent.TimeUnit; public class RecipeMissedTicks extends RecipeTest { @@ -83,8 +84,7 @@ class Tick {} pub.sendNext(Tick); pub.sendNext(Tick); - scala.concurrent.duration.FiniteDuration timeout = - scala.concurrent.duration.FiniteDuration.create(200, TimeUnit.MILLISECONDS); + Duration timeout = Duration.ofMillis(200); Await.ready(latch, scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS)); diff --git a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala index fe10aee6246..a8525b03ba0 100644 --- a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala +++ b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala @@ -13,8 +13,7 @@ package org.apache.pekko.stream.testkit -import java.io.PrintWriter -import java.io.StringWriter +import java.io.{ PrintWriter, StringWriter } import java.util.concurrent.CountDownLatch import scala.annotation.tailrec @@ -23,14 +22,21 @@ import scala.concurrent.duration._ import scala.reflect.ClassTag import org.apache.pekko -import pekko.actor.{ ActorRef, ActorSystem, DeadLetterSuppression, NoSerializationVerificationNeeded } -import pekko.actor.ClassicActorSystemProvider +import pekko.actor.{ + ActorRef, + ActorSystem, + ClassicActorSystemProvider, + DeadLetterSuppression, + NoSerializationVerificationNeeded +} +import pekko.japi._ import pekko.stream._ import pekko.stream.impl._ import pekko.testkit.{ TestActor, TestProbe } import pekko.testkit.TestActor.AutoPilot -import pekko.util.JavaDurationConverters +import pekko.util.JavaDurationConverters._ import pekko.util.ccompat._ +import pekko.util.ccompat.JavaConverters._ import org.reactivestreams.{ Publisher, Subscriber, Subscription } @@ -93,6 +99,15 @@ object TestPublisher { */ def apply[T](autoOnSubscribe: Boolean = true)(implicit system: ClassicActorSystemProvider): ManualProbe[T] = new ManualProbe(autoOnSubscribe)(system.classicSystem) + + /** + * JAVA API + * + * Probe that implements [[org.reactivestreams.Publisher]] interface. + * @since 1.1.0 + */ + def create[T](autoOnSubscribe: Boolean, system: ClassicActorSystemProvider): ManualProbe[T] = + new ManualProbe(autoOnSubscribe)(system.classicSystem) } /** @@ -136,6 +151,14 @@ object TestPublisher { f } + /** + * JAVA API + * @since 1.1.0 + */ + def executeAfterSubscription[T](f: function.Creator[T]): T = { + executeAfterSubscription(f.create()) + } + /** * Expect a subscription. */ @@ -189,17 +212,40 @@ object TestPublisher { self } + /** + * JAVA API + * + * Expect no messages for a given duration. + * @since 1.1.0 + */ + def expectNoMessage(max: java.time.Duration): Self = expectNoMessage(max.asScala) + /** * Receive messages for a given duration or until one does not match a given partial function. */ - def receiveWhile[T]( - max: Duration = Duration.Undefined, + def receiveWhile[T](max: Duration = Duration.Undefined, idle: Duration = Duration.Inf, messages: Int = Int.MaxValue)(f: PartialFunction[PublisherEvent, T]): immutable.Seq[T] = - executeAfterSubscription { probe.receiveWhile(max, idle, messages)(f.asInstanceOf[PartialFunction[AnyRef, T]]) } + executeAfterSubscription { + probe.receiveWhile(max, idle, messages)(f.asInstanceOf[PartialFunction[AnyRef, T]]) + } + + /** + * JAVA API + * + * Receive messages for a given duration or until one does not match a given partial function. + * @since 1.1.0 + */ + def receiveWhile[T](max: java.time.Duration, + idle: java.time.Duration, + messages: Int, + f: PartialFunction[PublisherEvent, T]): java.util.List[T] = + receiveWhile(max.asScala, idle.asScala, messages)(f).asJava def expectEventPF[T](f: PartialFunction[PublisherEvent, T]): T = - executeAfterSubscription { probe.expectMsgPF[T]()(f.asInstanceOf[PartialFunction[Any, T]]) } + executeAfterSubscription { + probe.expectMsgPF[T]()(f.asInstanceOf[PartialFunction[Any, T]]) + } def getPublisher: Publisher[I] = this @@ -223,15 +269,58 @@ object TestPublisher { probe.within(min, max)(f) } + /** + * JAVA API + * + * Execute code block while bounding its execution time between `min` and + * `max`. `within` blocks may be nested. All methods in this trait which + * take maximum wait times are available in a version which implicitly uses + * the remaining time governed by the innermost enclosing `within` block. + * + * Note that the timeout is scaled using Duration.dilated, which uses the + * configuration entry "pekko.test.timefactor", while the min Duration is not. + * + * {{{ + * val ret = within(Duration.ofMillis(50)) { + * test ! "ping" + * expectMsgClass(classOf[String]) + * } + * }}} + * + * @since 1.1.0 + */ + def within[T](min: java.time.Duration, + max: java.time.Duration, + creator: function.Creator[T]): T = + within(min.asScala, max.asScala)(creator.create()) + /** * Same as calling `within(0 seconds, max)(f)`. */ - def within[T](max: FiniteDuration)(f: => T): T = executeAfterSubscription { probe.within(max)(f) } + def within[T](max: FiniteDuration)(f: => T): T = executeAfterSubscription { + probe.within(max)(f) + } + + /** + * JAVA API + * + * Same as calling `within(Duration.ofSeconds(0), max)(f)`. + * @since 1.1.0 + */ + def within[T](max: java.time.Duration, + creator: function.Creator[T]): T = within(max.asScala)(creator.create()) } object Probe { def apply[T](initialPendingRequests: Long = 0)(implicit system: ClassicActorSystemProvider): Probe[T] = new Probe(initialPendingRequests)(system.classicSystem) + + /** + * JAVA API + * @since 1.1.0 + */ + def create[T](initialPendingRequests: Long, system: ClassicActorSystemProvider): Probe[T] = + apply(initialPendingRequests)(system.classicSystem) } /** @@ -291,6 +380,7 @@ object TestPublisher { assert(cause == expectedCause, s"Expected cancellation cause to be $expectedCause but was $cause") this } + def expectCancellationWithCause[E <: Throwable: ClassTag](): E = subscription.expectCancellation() match { case e: E => e case cause => @@ -334,6 +424,13 @@ object TestSubscriber { object ManualProbe { def apply[T]()(implicit system: ClassicActorSystemProvider): ManualProbe[T] = new ManualProbe()(system.classicSystem) + + /** + * JAVA API + * @since 1.1.0 + */ + def create[T]()(system: ClassicActorSystemProvider): ManualProbe[T] = + apply()(system.classicSystem) } /** @@ -372,6 +469,14 @@ object TestSubscriber { def expectEvent(max: FiniteDuration): SubscriberEvent = probe.expectMsgType[SubscriberEvent](max) + /** + * JAVA API + * + * Expect and return [[SubscriberEvent]] (any of: `OnSubscribe`, `OnNext`, `OnError` or `OnComplete`). + * @since 1.1.0 + */ + def expectEvent(max: java.time.Duration): SubscriberEvent = expectEvent(max.asScala) + /** * Fluent DSL * @@ -401,6 +506,14 @@ object TestSubscriber { } } + /** + * JAVA API + * + * Expect and return a stream element during specified time or timeout. + * @since 1.1.0 + */ + def expectNext(d: java.time.Duration): I = expectNext(d.asScala) + /** * Fluent DSL * @@ -421,6 +534,16 @@ object TestSubscriber { self } + /** + * JAVA PAI + * + * Fluent DSL + * + * Expect a stream element during specified time or timeout. + * @since 1.1.0 + */ + def expectNext(d: java.time.Duration, element: I): Self = expectNext(d.asScala, element) + /** * Fluent DSL * @@ -491,6 +614,15 @@ object TestSubscriber { self } + /** + * JAVA API + * + * Fluent DSL + * Expect the given elements to be signalled in any order. + * @since 1.1.0 + */ + def expectNextUnorderedN(all: java.util.List[I]): Self = expectNextUnorderedN(Util.immutableSeq(all)) + /** * Fluent DSL * @@ -704,7 +836,6 @@ object TestSubscriber { * Java API: Assert that no message is received for the specified time. */ def expectNoMessage(remaining: java.time.Duration): Self = { - import JavaDurationConverters._ probe.expectNoMessage(remaining.asScala) self } @@ -720,11 +851,23 @@ object TestSubscriber { * * @param max wait no more than max time, otherwise throw AssertionError */ - def expectNextWithTimeoutPF[T](max: Duration, f: PartialFunction[Any, T]): T = - expectEventWithTimeoutPF(max, - { - case OnNext(n) if f.isDefinedAt(n) => f(n) - }) + def expectNextWithTimeoutPF[T](max: Duration, f: PartialFunction[Any, T]): T = { + val pf: PartialFunction[SubscriberEvent, Any] = { + case OnNext(n) => n + } + expectEventWithTimeoutPF(max, pf.andThen(f)) + } + + /** + * JAVA API + * + * Expect a stream element and test it with partial function. + * + * @param max wait no more than max time, otherwise throw AssertionError + * @since 1.1.0 + */ + def expectNextWithTimeoutPF[T](max: java.time.Duration, f: PartialFunction[Any, T]): T = + expectEventWithTimeoutPF(max.asScala, f) /** * Expect a stream element during specified time or timeout and test it with partial function. @@ -736,6 +879,19 @@ object TestSubscriber { def expectNextChainingPF(max: Duration, f: PartialFunction[Any, Any]): Self = expectNextWithTimeoutPF(max, f.andThen(_ => self)) + /** + * JAVA API + * + * Expect a stream element during specified time or timeout and test it with partial function. + * + * Allows chaining probe methods. + * + * @param max wait no more than max time, otherwise throw AssertionError + * @since 1.1.0 + */ + def expectNextChainingPF(max: java.time.Duration, f: PartialFunction[Any, Any]): Self = + expectNextChainingPF(max.asScala, f) + /** * Expect a stream element during specified time or timeout and test it with partial function. * @@ -747,6 +903,13 @@ object TestSubscriber { def expectEventWithTimeoutPF[T](max: Duration, f: PartialFunction[SubscriberEvent, T]): T = probe.expectMsgPF[T](max, hint = "message matching partial function")(f.asInstanceOf[PartialFunction[Any, T]]) + /** + * JAVA API + * @since 1.1.0 + */ + def expectEventWithTimeoutPF[T](max: java.time.Duration, f: PartialFunction[SubscriberEvent, T]): T = + expectEventWithTimeoutPF(max.asScala, f) + def expectEventPF[T](f: PartialFunction[SubscriberEvent, T]): T = expectEventWithTimeoutPF(Duration.Undefined, f) @@ -759,6 +922,19 @@ object TestSubscriber { messages: Int = Int.MaxValue)(f: PartialFunction[SubscriberEvent, T]): immutable.Seq[T] = probe.receiveWhile(max, idle, messages)(f.asInstanceOf[PartialFunction[AnyRef, T]]) + /** + * JAVA API + * + * Receive messages for a given duration or until one does not match a given partial function. + * @since 1.1.0 + */ + def receiveWhile[T]( + max: java.time.Duration, + idle: java.time.Duration, + messages: Int, + f: PartialFunction[SubscriberEvent, T]): java.util.List[T] = + receiveWhile(max.asScala, idle.asScala, messages)(f).asJava + /** * Drains a given number of messages */ @@ -770,6 +946,15 @@ object TestSubscriber { } .flatten + /** + * JAVA API + * + * Drains a given number of messages + * @since 1.1.0 + */ + def receiveWithin(max: java.time.Duration, messages: Int): java.util.List[I] = + receiveWithin(max.asScala, messages).asJava + /** * Attempt to drain the stream into a strict collection (by requesting `Long.MaxValue` elements). * @@ -800,6 +985,17 @@ object TestSubscriber { drain() } + /** + * JAVA API + * + * Attempt to drain the stream into a strict collection (by requesting `Long.MaxValue` elements). + * + * '''Use with caution: Be warned that this may not be a good idea if the stream is infinite or its elements are very large!''' + * @since 1.1.0 + */ + def toStrict(atMost: java.time.Duration): java.util.List[I] = + toStrict(atMost.asScala).asJava + /** * Execute code block while bounding its execution time between `min` and * `max`. `within` blocks may be nested. All methods in this trait which @@ -818,11 +1014,43 @@ object TestSubscriber { */ def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T = probe.within(min, max)(f) + /** + * JAVA API + * + * Execute code block while bounding its execution time between `min` and + * `max`. `within` blocks may be nested. All methods in this trait which + * take maximum wait times are available in a version which implicitly uses + * the remaining time governed by the innermost enclosing `within` block. + * + * Note that the timeout is scaled using Duration.dilated, which uses the + * configuration entry "pekko.test.timefactor", while the min Duration is not. + * + * {{{ + * val ret = within(Duration.ofMillis(50)) { + * test ! "ping" + * expectMsgClass(classOf[String]) + * } + * }}} + * + * @since 1.1.0 + */ + def within[T](min: java.time.Duration, + max: java.time.Duration, + creator: function.Creator[T]): T = within(min.asScala, max.asScala)(creator.create()) + /** * Same as calling `within(0 seconds, max)(f)`. */ def within[T](max: FiniteDuration)(f: => T): T = probe.within(max)(f) + /** + * JAVA API + * + * Same as calling `within(Duration.ofSeconds(0), max)(f)`. + * @since 1.1.0 + */ + def within[T](max: java.time.Duration)(creator: function.Creator[T]): T = within(max.asScala)(creator.create()) + def onSubscribe(subscription: Subscription): Unit = probe.ref ! OnSubscribe(subscription) def onNext(element: I): Unit = probe.ref ! OnNext(element) def onComplete(): Unit = probe.ref ! OnComplete @@ -831,6 +1059,13 @@ object TestSubscriber { object Probe { def apply[T]()(implicit system: ClassicActorSystemProvider): Probe[T] = new Probe()(system.classicSystem) + + /** + * JAVA API + * + * @since 1.1.0 + */ + def create[T]()(implicit system: ClassicActorSystemProvider): Probe[T] = apply()(system) } /** @@ -891,6 +1126,14 @@ object TestSubscriber { subscription.request(1) expectNext(d) } + + /** + * JAVA API + * + * Request and expect a stream element during the specified time or timeout. + * @since 1.1.0 + */ + def requestNext(d: java.time.Duration): T = requestNext(d.asScala) } } diff --git a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamTestKitSpec.scala b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamTestKitSpec.scala index 5cf81300538..21850fc9d25 100644 --- a/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamTestKitSpec.scala +++ b/stream-testkit/src/test/scala/org/apache/pekko/stream/testkit/StreamTestKitSpec.scala @@ -136,15 +136,15 @@ class StreamTestKitSpec extends PekkoSpec { // It also needs to be dilated since the testkit will dilate the timeout // accordingly to `-Dpekko.test.timefactor` value. val initialDelay = (timeout * 2).dilated + val pf: PartialFunction[Any, Unit] = { + case 1 => + system.log.info("Message received :(") + } Source .tick(initialDelay, 1.millis, 1) .runWith(TestSink.probe) .request(1) - .expectNextWithTimeoutPF(timeout, - { - case 1 => - system.log.info("Message received :(") - }) + .expectNextWithTimeoutPF(timeout, pf) }.getMessage should include("timeout") } @@ -180,15 +180,15 @@ class StreamTestKitSpec extends PekkoSpec { // It also needs to be dilated since the testkit will dilate the timeout // accordingly to `-Dpekko.test.timefactor` value. val initialDelay = (timeout * 2).dilated + val pf: PartialFunction[Any, Unit] = { + case 1 => + system.log.info("Message received :(") + } Source .tick(initialDelay, 1.millis, 1) .runWith(TestSink.probe) .request(1) - .expectNextChainingPF(timeout, - { - case 1 => - system.log.info("Message received :(") - }) + .expectNextChainingPF(timeout, pf) }.getMessage should include("timeout") } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 997d401a830..b44a21d9de2 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -1242,8 +1242,7 @@ public void mustBeAbleToUseMerge3() { .mergeAll(Arrays.asList(sourceB, sourceC), false) .runWith(TestSink.probe(system), system); sub.expectSubscription().request(9); - sub.expectNextUnorderedN(Util.immutableSeq(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9))) - .expectComplete(); + sub.expectNextUnorderedN(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9)).expectComplete(); } @Test