Skip to content

Commit

Permalink
Add missing java api for StreamTestKit
Browse files Browse the repository at this point in the history
  • Loading branch information
naosense authored and He-Pin committed Mar 16, 2024
1 parent 17577cf commit 94d6d2d
Showing 1 changed file with 102 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import pekko.stream._
import pekko.stream.impl._
import pekko.testkit.{ TestActor, TestProbe }
import pekko.testkit.TestActor.AutoPilot
import pekko.util.JavaDurationConverters
import pekko.util.JavaDurationConverters._
import pekko.util.ccompat._

import org.reactivestreams.{ Publisher, Subscriber, Subscription }
Expand Down Expand Up @@ -189,6 +189,14 @@ object TestPublisher {
self
}

/**
* Expect no messages for a given duration.
*/
def expectNoMessage(max: java.time.Duration): Self = executeAfterSubscription {
probe.expectNoMessage(max.asScala)
self
}

/**
* Receive messages for a given duration or until one does not match a given partial function.
*/
Expand Down Expand Up @@ -223,10 +231,35 @@ object TestPublisher {
probe.within(min, max)(f)
}

/**
* Execute code block while bounding its execution time between `min` and
* `max`. `within` blocks may be nested. All methods in this trait which
* take maximum wait times are available in a version which implicitly uses
* the remaining time governed by the innermost enclosing `within` block.
*
* Note that the timeout is scaled using Duration.dilated, which uses the
* configuration entry "pekko.test.timefactor", while the min Duration is not.
*
* {{{
* val ret = within(Duration.ofMillis(50)) {
* test ! "ping"
* expectMsgClass(classOf[String])
* }
* }}}
*/
def within[T](min: java.time.Duration, max: java.time.Duration)(f: => T): T = executeAfterSubscription {
probe.within(min.asScala, max.asScala)(f)
}

/**
* Same as calling `within(0 seconds, max)(f)`.
*/
def within[T](max: FiniteDuration)(f: => T): T = executeAfterSubscription { probe.within(max)(f) }

/**
* Same as calling `within(Duration.ofSeconds(0), max)(f)`.
*/
def within[T](max: java.time.Duration)(f: => T): T = executeAfterSubscription { probe.within(max.asScala)(f) }
}

object Probe {
Expand Down Expand Up @@ -372,6 +405,12 @@ object TestSubscriber {
def expectEvent(max: FiniteDuration): SubscriberEvent =
probe.expectMsgType[SubscriberEvent](max)

/**
* Expect and return [[SubscriberEvent]] (any of: `OnSubscribe`, `OnNext`, `OnError` or `OnComplete`).
*/
def expectEvent(max: java.time.Duration): SubscriberEvent =
probe.expectMsgType[SubscriberEvent](max.asScala)

/**
* Fluent DSL
*
Expand Down Expand Up @@ -401,6 +440,12 @@ object TestSubscriber {
}
}

/**
* Expect and return a stream element during specified time or timeout.
*/
def expectNext(d: java.time.Duration): I =
expectNext(d.asScala)

/**
* Fluent DSL
*
Expand All @@ -421,6 +466,16 @@ object TestSubscriber {
self
}

/**
* Fluent DSL
*
* Expect a stream element during specified time or timeout.
*/
def expectNext(d: java.time.Duration, element: I): Self = {
probe.expectMsg(d.asScala, OnNext(element))
self
}

/**
* Fluent DSL
*
Expand Down Expand Up @@ -704,7 +759,6 @@ object TestSubscriber {
* Java API: Assert that no message is received for the specified time.
*/
def expectNoMessage(remaining: java.time.Duration): Self = {
import JavaDurationConverters._
probe.expectNoMessage(remaining.asScala)
self
}
Expand Down Expand Up @@ -770,6 +824,12 @@ object TestSubscriber {
}
.flatten

/**
* Drains a given number of messages
*/
def receiveWithin(max: java.time.Duration, messages: Int = Int.MaxValue): immutable.Seq[I] =
receiveWithin(max.asScala, messages)

/**
* Attempt to drain the stream into a strict collection (by requesting `Long.MaxValue` elements).
*
Expand Down Expand Up @@ -800,6 +860,14 @@ object TestSubscriber {
drain()
}

/**
* Attempt to drain the stream into a strict collection (by requesting `Long.MaxValue` elements).
*
* '''Use with caution: Be warned that this may not be a good idea if the stream is infinite or its elements are very large!'''
*/
def toStrict(atMost: java.time.Duration): immutable.Seq[I] =
toStrict(atMost.asScala)

/**
* Execute code block while bounding its execution time between `min` and
* `max`. `within` blocks may be nested. All methods in this trait which
Expand All @@ -818,11 +886,35 @@ object TestSubscriber {
*/
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T = probe.within(min, max)(f)

/**
* Execute code block while bounding its execution time between `min` and
* `max`. `within` blocks may be nested. All methods in this trait which
* take maximum wait times are available in a version which implicitly uses
* the remaining time governed by the innermost enclosing `within` block.
*
* Note that the timeout is scaled using Duration.dilated, which uses the
* configuration entry "pekko.test.timefactor", while the min Duration is not.
*
* {{{
* val ret = within(Duration.ofMillis(50)) {
* test ! "ping"
* expectMsgClass(classOf[String])
* }
* }}}
*/
def within[T](min: java.time.Duration, max: java.time.Duration)(f: pekko.japi.function.Function[Unit, T]): T =
probe.within(min.asScala, max.asScala)(f.apply())

/**
* Same as calling `within(0 seconds, max)(f)`.
*/
def within[T](max: FiniteDuration)(f: => T): T = probe.within(max)(f)

/**
* Same as calling `within(Duration.ofSeconds(0), max)(f)`.
*/
def within[T](max: java.time.Duration)(f: => T): T = probe.within(max.asScala)(f)

def onSubscribe(subscription: Subscription): Unit = probe.ref ! OnSubscribe(subscription)
def onNext(element: I): Unit = probe.ref ! OnNext(element)
def onComplete(): Unit = probe.ref ! OnComplete
Expand Down Expand Up @@ -891,6 +983,14 @@ object TestSubscriber {
subscription.request(1)
expectNext(d)
}

/**
* Request and expect a stream element during the specified time or timeout.
*/
def requestNext(d: java.time.Duration): T = {
subscription.request(1)
expectNext(d)
}
}
}

Expand Down

0 comments on commit 94d6d2d

Please sign in to comment.