Skip to content

Commit

Permalink
chore: Add more methods to streamKit
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Mar 16, 2024
1 parent e04b48c commit 650ef9d
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;

import java.time.Duration;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -235,7 +234,7 @@ public void restartUptoMaxRetries() throws Exception {
Thread.sleep(500);
assertEquals(BackpressureTimeoutException.class, probe.expectError().getClass());
probe.request(1); // send demand
probe.expectNoMessage(FiniteDuration.create(200, "milliseconds")); // but no more restart
probe.expectNoMessage(Duration.ofMillis(200)); // but no more restart
}
};
}
Expand Down
17 changes: 10 additions & 7 deletions docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeHold.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.*;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.FlowShape;
import org.apache.pekko.stream.Inlet;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.stage.*;
import org.apache.pekko.stream.stage.AbstractInHandler;
import org.apache.pekko.stream.stage.AbstractOutHandler;
import org.apache.pekko.stream.stage.GraphStage;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.testkit.TestPublisher;
import org.apache.pekko.stream.testkit.TestSubscriber;
import org.apache.pekko.stream.testkit.javadsl.TestSink;
Expand All @@ -28,9 +34,8 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.TimeUnit;
import java.time.Duration;

public class RecipeHold extends RecipeTest {
static ActorSystem system;
Expand Down Expand Up @@ -188,10 +193,8 @@ public void workForVersion2() throws Exception {
TestPublisher.Probe<Integer> pub = pubSub.first();
TestSubscriber.Probe<Integer> sub = pubSub.second();

FiniteDuration timeout = FiniteDuration.create(200, TimeUnit.MILLISECONDS);

sub.request(1);
sub.expectNoMessage(timeout);
sub.expectNoMessage(Duration.ofMillis(200));

pub.sendNext(1);
sub.expectNext(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.*;
import org.apache.pekko.stream.ClosedShape;
import org.apache.pekko.stream.FanInShape2;
import org.apache.pekko.stream.FlowShape;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.javadsl.*;
import org.apache.pekko.stream.testkit.TestPublisher;
import org.apache.pekko.stream.testkit.TestSubscriber;
Expand All @@ -25,10 +28,9 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class RecipeManualTrigger extends RecipeTest {
static ActorSystem system;
Expand Down Expand Up @@ -85,7 +87,7 @@ public void zipped() throws Exception {
TestPublisher.Probe<Trigger> pub = pubSub.first();
TestSubscriber.Probe<Message> sub = pubSub.second();

FiniteDuration timeout = FiniteDuration.create(100, TimeUnit.MILLISECONDS);
Duration timeout = Duration.ofMillis(100);
sub.expectSubscription().request(1000);
sub.expectNoMessage(timeout);

Expand Down Expand Up @@ -140,7 +142,7 @@ public void zipWith() throws Exception {
TestPublisher.Probe<Trigger> pub = pubSub.first();
TestSubscriber.Probe<Message> sub = pubSub.second();

FiniteDuration timeout = FiniteDuration.create(100, TimeUnit.MILLISECONDS);
Duration timeout = Duration.ofMillis(100);
sub.expectSubscription().request(1000);
sub.expectNoMessage(timeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.Test;
import scala.concurrent.Await;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class RecipeMissedTicks extends RecipeTest {
Expand Down Expand Up @@ -83,8 +84,7 @@ class Tick {}
pub.sendNext(Tick);
pub.sendNext(Tick);

scala.concurrent.duration.FiniteDuration timeout =
scala.concurrent.duration.FiniteDuration.create(200, TimeUnit.MILLISECONDS);
Duration timeout = Duration.ofMillis(200);

Await.ready(latch, scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS));

Expand Down
Loading

0 comments on commit 650ef9d

Please sign in to comment.