diff --git a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala index fe10aee6246..218ffcd805f 100644 --- a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala +++ b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala @@ -29,7 +29,7 @@ import pekko.stream._ import pekko.stream.impl._ import pekko.testkit.{ TestActor, TestProbe } import pekko.testkit.TestActor.AutoPilot -import pekko.util.JavaDurationConverters +import pekko.util.JavaDurationConverters._ import pekko.util.ccompat._ import org.reactivestreams.{ Publisher, Subscriber, Subscription } @@ -189,6 +189,14 @@ object TestPublisher { self } + /** + * Expect no messages for a given duration. + */ + def expectNoMessage(max: java.time.Duration): Self = executeAfterSubscription { + probe.expectNoMessage(max.asScala) + self + } + /** * Receive messages for a given duration or until one does not match a given partial function. */ @@ -223,10 +231,35 @@ object TestPublisher { probe.within(min, max)(f) } + /** + * Execute code block while bounding its execution time between `min` and + * `max`. `within` blocks may be nested. All methods in this trait which + * take maximum wait times are available in a version which implicitly uses + * the remaining time governed by the innermost enclosing `within` block. + * + * Note that the timeout is scaled using Duration.dilated, which uses the + * configuration entry "pekko.test.timefactor", while the min Duration is not. + * + * {{{ + * val ret = within(Duration.ofMillis(50)) { + * test ! "ping" + * expectMsgClass(classOf[String]) + * } + * }}} + */ + def within[T](min: java.time.Duration, max: java.time.Duration)(f: => T): T = executeAfterSubscription { + probe.within(min.asScala, max.asScala)(f) + } + /** * Same as calling `within(0 seconds, max)(f)`. */ def within[T](max: FiniteDuration)(f: => T): T = executeAfterSubscription { probe.within(max)(f) } + + /** + * Same as calling `within(Duration.ofSeconds(0), max)(f)`. + */ + def within[T](max: java.time.Duration)(f: => T): T = executeAfterSubscription { probe.within(max.asScala)(f) } } object Probe { @@ -372,6 +405,12 @@ object TestSubscriber { def expectEvent(max: FiniteDuration): SubscriberEvent = probe.expectMsgType[SubscriberEvent](max) + /** + * Expect and return [[SubscriberEvent]] (any of: `OnSubscribe`, `OnNext`, `OnError` or `OnComplete`). + */ + def expectEvent(max: java.time.Duration): SubscriberEvent = + probe.expectMsgType[SubscriberEvent](max.asScala) + /** * Fluent DSL * @@ -401,6 +440,12 @@ object TestSubscriber { } } + /** + * Expect and return a stream element during specified time or timeout. + */ + def expectNext(d: java.time.Duration): I = + expectNext(d.asScala) + /** * Fluent DSL * @@ -421,6 +466,16 @@ object TestSubscriber { self } + /** + * Fluent DSL + * + * Expect a stream element during specified time or timeout. + */ + def expectNext(d: java.time.Duration, element: I): Self = { + probe.expectMsg(d.asScala, OnNext(element)) + self + } + /** * Fluent DSL * @@ -704,7 +759,6 @@ object TestSubscriber { * Java API: Assert that no message is received for the specified time. */ def expectNoMessage(remaining: java.time.Duration): Self = { - import JavaDurationConverters._ probe.expectNoMessage(remaining.asScala) self } @@ -770,6 +824,12 @@ object TestSubscriber { } .flatten + /** + * Drains a given number of messages + */ + def receiveWithin(max: java.time.Duration, messages: Int = Int.MaxValue): immutable.Seq[I] = + receiveWithin(max.asScala, messages) + /** * Attempt to drain the stream into a strict collection (by requesting `Long.MaxValue` elements). * @@ -800,6 +860,14 @@ object TestSubscriber { drain() } + /** + * Attempt to drain the stream into a strict collection (by requesting `Long.MaxValue` elements). + * + * '''Use with caution: Be warned that this may not be a good idea if the stream is infinite or its elements are very large!''' + */ + def toStrict(atMost: java.time.Duration): immutable.Seq[I] = + toStrict(atMost.asScala) + /** * Execute code block while bounding its execution time between `min` and * `max`. `within` blocks may be nested. All methods in this trait which @@ -818,11 +886,35 @@ object TestSubscriber { */ def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T = probe.within(min, max)(f) + /** + * Execute code block while bounding its execution time between `min` and + * `max`. `within` blocks may be nested. All methods in this trait which + * take maximum wait times are available in a version which implicitly uses + * the remaining time governed by the innermost enclosing `within` block. + * + * Note that the timeout is scaled using Duration.dilated, which uses the + * configuration entry "pekko.test.timefactor", while the min Duration is not. + * + * {{{ + * val ret = within(Duration.ofMillis(50)) { + * test ! "ping" + * expectMsgClass(classOf[String]) + * } + * }}} + */ + def within[T](min: java.time.Duration, max: java.time.Duration)(f: pekko.japi.function.Function[Unit, T]): T = + probe.within(min.asScala, max.asScala)(f.apply()) + /** * Same as calling `within(0 seconds, max)(f)`. */ def within[T](max: FiniteDuration)(f: => T): T = probe.within(max)(f) + /** + * Same as calling `within(Duration.ofSeconds(0), max)(f)`. + */ + def within[T](max: java.time.Duration)(f: => T): T = probe.within(max.asScala)(f) + def onSubscribe(subscription: Subscription): Unit = probe.ref ! OnSubscribe(subscription) def onNext(element: I): Unit = probe.ref ! OnNext(element) def onComplete(): Unit = probe.ref ! OnComplete @@ -891,6 +983,14 @@ object TestSubscriber { subscription.request(1) expectNext(d) } + + /** + * Request and expect a stream element during the specified time or timeout. + */ + def requestNext(d: java.time.Duration): T = { + subscription.request(1) + expectNext(d) + } } }