Skip to content

Commit

Permalink
Add BlockHound integration and make sure tests don't use blocking calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed May 14, 2019
1 parent 1e08b10 commit a984c19
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 34 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ ext {
gradleScriptDir = "${rootProject.projectDir}/gradle"

reactorCoreVersion = "3.3.0.BUILD-SNAPSHOT"
blockHoundVersion = "1.0.0.BUILD-SNAPSHOT"

// Logging
slf4jVersion = '1.7.12'
Expand Down Expand Up @@ -214,6 +215,7 @@ configure(rootProject) { project ->
optional "org.slf4j:slf4j-api:$slf4jVersion"

compile "io.projectreactor:reactor-core:$reactorCoreVersion"
optional "io.projectreactor.tools:blockhound:$blockHoundVersion"

compile "io.netty:netty-handler:${nettyVersion}"
compile "io.netty:netty-codec-http:${nettyVersion}"
Expand Down
5 changes: 5 additions & 0 deletions gradle/doc.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ configure(rootProject) {
}
}

excludes = [
// Must be public due to the ServiceLoader's requirements
"reactor/netty/resources/NettyBlockHoundIntegration.java",
]

options.addStringOption('charSet', 'UTF-8')

options.memberLevel = org.gradle.external.javadoc.JavadocMemberLevel.PROTECTED
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package reactor.netty.resources;

import io.netty.channel.ChannelInitializer;
import io.netty.util.concurrent.AbstractEventExecutor;
import reactor.blockhound.BlockHound;
import reactor.blockhound.integration.BlockHoundIntegration;
import reactor.core.scheduler.NonBlocking;

/**
* An internal service for automatic integration with {@link BlockHound#install(BlockHoundIntegration...)}
*
* @author Stephane Maldini
*/
public class NettyBlockHoundIntegration implements BlockHoundIntegration {
@Override
public void applyTo(BlockHound.Builder builder) {
builder.nonBlockingThreadPredicate(current -> current.or(NonBlocking.class::isInstance));

//allow set initialization that might use Yield
builder.allowBlockingCallsInside(ChannelInitializer.class.getName(), "initChannel");

//prevent blocking call in any netty event executor
builder.disallowBlockingCallsInside(AbstractEventExecutor.class.getName(), "safeExecute");
}

}
32 changes: 18 additions & 14 deletions src/test/java/reactor/netty/http/client/HttpClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@
import reactor.netty.tcp.SslProvider;
import reactor.netty.tcp.TcpServer;
import reactor.test.StepVerifier;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;
Expand All @@ -102,6 +104,8 @@
*/
public class HttpClientTest {

static final Logger log = Loggers.getLogger(HttpClientTest.class);

@Test
public void abort() {
DisposableServer x =
Expand Down Expand Up @@ -851,30 +855,30 @@ public void test() {
.bindNow();

createHttpClientForContextWithAddress(context)
.doOnRequest((r, c) -> System.out.println("onReq: "+r))
.doAfterRequest((r, c) -> System.out.println("afterReq: "+r))
.doOnResponse((r, c) -> System.out.println("onResp: "+r))
.doAfterResponse((r, c) -> System.out.println("afterResp: "+r))
.doOnRequest((r, c) -> log.debug("onReq: "+r))
.doAfterRequest((r, c) -> log.debug("afterReq: "+r))
.doOnResponse((r, c) -> log.debug("onResp: "+r))
.doAfterResponse((r, c) -> log.debug("afterResp: "+r))
.put()
.uri("/201")
.responseContent()
.blockLast();

createHttpClientForContextWithAddress(context)
.doOnRequest((r, c) -> System.out.println("onReq: "+r))
.doAfterRequest((r, c) -> System.out.println("afterReq: "+r))
.doOnResponse((r, c) -> System.out.println("onResp: "+r))
.doAfterResponse((r, c) -> System.out.println("afterResp: "+r))
.doOnRequest((r, c) -> log.debug("onReq: "+r))
.doAfterRequest((r, c) -> log.debug("afterReq: "+r))
.doOnResponse((r, c) -> log.debug("onResp: "+r))
.doAfterResponse((r, c) -> log.debug("afterResp: "+r))
.put()
.uri("/204")
.responseContent()
.blockLast(Duration.ofSeconds(30));

createHttpClientForContextWithAddress(context)
.doOnRequest((r, c) -> System.out.println("onReq: "+r))
.doAfterRequest((r, c) -> System.out.println("afterReq: "+r))
.doOnResponse((r, c) -> System.out.println("onResp: "+r))
.doAfterResponse((r, c) -> System.out.println("afterResp: "+r))
.doOnRequest((r, c) -> log.debug("onReq: "+r))
.doAfterRequest((r, c) -> log.debug("afterReq: "+r))
.doOnResponse((r, c) -> log.debug("onResp: "+r))
.doAfterResponse((r, c) -> log.debug("afterResp: "+r))
.get()
.uri("/200")
.responseContent()
Expand All @@ -901,7 +905,7 @@ public void testDeferredUri() {

AtomicInteger i = new AtomicInteger();
createHttpClientForContextWithAddress(context)
.observe((c, s) -> System.out.println(s + "" + c))
.observe((c, s) -> log.info(s + "" + c))
.get()
.uri(Mono.fromCallable(() -> {
switch (i.incrementAndGet()) {
Expand Down Expand Up @@ -931,7 +935,7 @@ public void testDeferredHeader() {

createHttpClientForContextWithAddress(context)
.headersWhen(h -> Mono.just(h.set("test", "test")).delayElement(Duration.ofSeconds(2)))
.observe((c, s) -> System.out.println(s + "" + c))
.observe((c, s) -> log.debug(s + "" + c))
.get()
.uri("/201")
.responseContent()
Expand Down
28 changes: 16 additions & 12 deletions src/test/java/reactor/netty/http/client/WebsocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import reactor.netty.http.websocket.WebsocketOutbound;
import reactor.netty.resources.ConnectionProvider;
import reactor.test.StepVerifier;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.function.Tuple2;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -63,6 +65,8 @@ public class WebsocketTest {

static final String auth = "bearer abc";

static final Logger log = Loggers.getLogger(WebsocketTest.class);

DisposableServer httpServer = null;

@After
Expand Down Expand Up @@ -150,7 +154,7 @@ public void serverWebSocketFailed() {
// .flatMapMany(in -> in.receiveWebsocket()
// .receive()
// .asByteArray())
// .doOnNext(d -> System.out.println(d.length))
// .doOnNext(d -> log.debug(d.length))
// .log()
// .subscribe();
//
Expand Down Expand Up @@ -203,7 +207,7 @@ public void webSocketRespondsToRequestsFromClients() {
HttpServer.create()
.port(0)
.route(r -> r.get("/test/{param}", (req, res) -> {
System.out.println(req.requestHeaders().get("test"));
log.debug(req.requestHeaders().get("test"));
return res.header("content-type", "text/plain")
.sendWebsocket((in, out) ->
out.options(NettyPipeline.SendOptions::flushOnEach)
Expand Down Expand Up @@ -246,14 +250,14 @@ public void webSocketRespondsToRequestsFromClients() {
.cache()
.doOnError(i -> System.err.println("Failed requesting server: " + i));

System.out.println("STARTING: server[" + serverRes.get() + "] / client[" + clientRes.get() + "]");
log.debug("STARTING: server[" + serverRes.get() + "] / client[" + clientRes.get() + "]");

StepVerifier.create(response)
.expectNextMatches(list -> "1000 World!".equals(list.get(999)))
.expectComplete()
.verify();

System.out.println("FINISHED: server[" + serverRes.get() + "] / client[" + clientRes + "]");
log.debug("FINISHED: server[" + serverRes.get() + "] / client[" + clientRes + "]");
}

@Test
Expand Down Expand Up @@ -805,7 +809,7 @@ public void testClientOnCloseIsInvokedClientSendClose() throws Exception {
Mono.delay(Duration.ofSeconds(3))
.delayUntil(i -> out.sendClose())
.subscribe(c -> {
System.out.println("context.dispose()");
log.debug("context.dispose()");
latch.countDown();
});
in.withConnection(conn ->
Expand All @@ -818,15 +822,15 @@ public void testClientOnCloseIsInvokedClientSendClose() throws Exception {
error.set(true);
},
() -> {
System.out.println("context.onClose() completed");
log.debug("context.onClose() completed");
latch.countDown();
}));
Mono.delay(Duration.ofSeconds(3))
.repeat(() -> {
AtomicBoolean disposed = new AtomicBoolean(false);
in.withConnection(conn -> {
disposed.set(conn.isDisposed());
System.out.println("context.isDisposed() " + conn.isDisposed());
log.debug("context.isDisposed() " + conn.isDisposed());
});
if (disposed.get()) {
latch.countDown();
Expand Down Expand Up @@ -867,7 +871,7 @@ public void testClientOnCloseIsInvokedClientDisposed() throws Exception {
in.withConnection(conn -> {
Mono.delay(Duration.ofSeconds(3))
.subscribe(c -> {
System.out.println("context.dispose()");
log.debug("context.dispose()");
conn.dispose();
latch.countDown();
});
Expand All @@ -880,7 +884,7 @@ public void testClientOnCloseIsInvokedClientDisposed() throws Exception {
error.set(true);
},
() -> {
System.out.println("context.onClose() completed");
log.debug("context.onClose() completed");
latch.countDown();
});
});
Expand All @@ -889,7 +893,7 @@ public void testClientOnCloseIsInvokedClientDisposed() throws Exception {
AtomicBoolean disposed = new AtomicBoolean(false);
in.withConnection(conn -> {
disposed.set(conn.isDisposed());
System.out.println("context.isDisposed() " + conn.isDisposed());
log.debug("context.isDisposed() " + conn.isDisposed());
});
if (disposed.get()) {
latch.countDown();
Expand Down Expand Up @@ -936,15 +940,15 @@ public void testClientOnCloseIsInvokedServerInitiatedClose() throws Exception {
error.set(true);
},
() -> {
System.out.println("context.onClose() completed");
log.debug("context.onClose() completed");
latch.countDown();
}));
Mono.delay(Duration.ofSeconds(3))
.repeat(() -> {
AtomicBoolean disposed = new AtomicBoolean(false);
in.withConnection(conn -> {
disposed.set(conn.isDisposed());
System.out.println("context.isDisposed() " + conn.isDisposed());
log.debug("context.isDisposed() " + conn.isDisposed());
});
if (disposed.get()) {
latch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public void testIssue673_IllegalStateException() throws InterruptedException {
ConcurrentMap<PooledConnectionProvider.PoolKey, PooledConnectionProvider.Pool> pools = provider.channelPools;
pool.set(pools.get(pools.keySet().toArray()[0]));
provider.disposeLater()
.block(Duration.ofSeconds(30));
.subscribe();
conn.channel()
.closeFuture()
.addListener(future -> latch.countDown());
Expand Down
17 changes: 10 additions & 7 deletions src/test/java/reactor/netty/tcp/TcpClientTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.test.StepVerifier;
import reactor.util.Logger;
import reactor.util.Loggers;

import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -75,6 +76,8 @@
*/
public class TcpClientTests {

static final Logger log = Loggers.getLogger(TcpClientTests.class);

private final ExecutorService threadPool = Executors.newCachedThreadPool();
int echoServerPort;
EchoServer echoServer;
Expand Down Expand Up @@ -303,7 +306,7 @@ private void tcpClientHandlesLineFeedData(TcpClient client) throws InterruptedEx
.wiretap(true)
.connectNow(Duration.ofSeconds(30));

System.out.println("Connected");
log.debug("Connected");

c.onDispose()
.log()
Expand Down Expand Up @@ -399,7 +402,7 @@ public void connectionWillAttemptToReconnectWhenItIsDropped()
.port(abortServerPort);

Mono<? extends Connection> handler = tcpClient.handle((in, out) -> {
System.out.println("Start");
log.debug("Start");
connectionLatch.countDown();
in.receive()
.subscribe();
Expand Down Expand Up @@ -534,7 +537,7 @@ public void writeIdleDoesNotFireWhileDataIsBeingSent()
.host("localhost")
.port(echoServerPort)
.handle((in, out) -> {
System.out.println("hello");
log.debug("hello");
out.withConnection(c -> c.onWriteIdle(500, latch::countDown));

List<Publisher<Void>> allWrites =
Expand All @@ -548,7 +551,7 @@ public void writeIdleDoesNotFireWhileDataIsBeingSent()
.wiretap(true)
.connectNow();

System.out.println("Started");
log.debug("Started");

assertTrue(latch.await(5, TimeUnit.SECONDS));

Expand All @@ -564,13 +567,13 @@ public void nettyNetChannelAcceptsNettyChannelHandlers() throws InterruptedExcep
.wiretap(true);

final CountDownLatch latch = new CountDownLatch(1);
System.out.println(client.get()
log.debug(client.get()
.uri("http://www.google.com/?q=test%20d%20dq")
.responseContent()
.asString()
.collectList()
.doOnSuccess(v -> latch.countDown())
.block(Duration.ofSeconds(30)));
.block(Duration.ofSeconds(30)).toString());

assertTrue("Latch didn't time out", latch.await(15, TimeUnit.SECONDS));
}
Expand Down Expand Up @@ -672,7 +675,7 @@ public void run() {
countDown();
while (true) {
SocketChannel ch = server.accept();
System.out.println("ABORTING");
log.debug("ABORTING");
ch.close();
}
}
Expand Down

0 comments on commit a984c19

Please sign in to comment.