Skip to content

Commit

Permalink
Update reactor version (#354)
Browse files Browse the repository at this point in the history
* Update reactor version
  • Loading branch information
segabriel authored Mar 23, 2021
1 parent d6dd8c1 commit 2dd5f98
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

public class MembershipProtocolTest extends BaseTest {

Expand Down Expand Up @@ -1199,7 +1200,7 @@ private void stop(MembershipProtocolImpl membership) {

private Mono<Void> awaitUntil(Runnable assertAction) {
return Mono.<Void>fromRunnable(assertAction)
.retryBackoff(Long.MAX_VALUE, Duration.ofMillis(100), Duration.ofSeconds(1));
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100)));
}

private void assertTrusted(MembershipProtocolImpl membership, Member... expected) {
Expand Down
6 changes: 4 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand Down Expand Up @@ -37,7 +39,7 @@

<slf4j.version>1.7.30</slf4j.version>
<log4j.version>2.13.2</log4j.version>
<reactor.version>Dysprosium-SR9</reactor.version>
<reactor.version>2020.0.5</reactor.version>
<jackson.version>2.11.0</jackson.version>

<mockito-junit-jupiter.version>2.27.0</mockito-junit-jupiter.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.scalecube.net.Address;
import io.scalecube.transport.netty.tcp.TcpTransportFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -72,8 +73,8 @@ public TransportImpl(MessageCodec messageCodec, Receiver receiver, Sender sender
}

private static Address prepareAddress(DisposableServer server) {
InetAddress address = server.address().getAddress();
int port = server.address().getPort();
InetAddress address = ((InetSocketAddress) server.address()).getAddress();
int port = ((InetSocketAddress) server.address()).getPort();
if (address.isAnyLocalAddress()) {
return Address.create(Address.getLocalIpAddress().getHostAddress(), port);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ final class TcpChannelInitializer implements BiConsumer<ConnectionObserver, Chan
@Override
public void accept(ConnectionObserver connectionObserver, Channel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LengthFieldPrepender(LENGTH_FIELD_LENGTH));
pipeline.addLast(
pipeline.addFirst(new ExceptionHandler());
pipeline.addFirst(
new LengthFieldBasedFrameDecoder(
maxFrameLength, 0, LENGTH_FIELD_LENGTH, 0, LENGTH_FIELD_LENGTH));
pipeline.addLast(new ExceptionHandler());
pipeline.addFirst(new LengthFieldPrepender(LENGTH_FIELD_LENGTH));
}

@ChannelHandler.Sharable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.net.InetSocketAddress;
import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.tcp.TcpServer;

final class TcpReceiver implements Receiver {
Expand Down Expand Up @@ -42,9 +41,10 @@ private TcpServer newTcpServer(ReceiverContext context) {
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true)
.bootstrap(
b ->
BootstrapHandlers.updateConfiguration(
b, "inbound", new TcpChannelInitializer(config.maxFrameLength())));
.doOnChannelInit(
(connectionObserver, channel, remoteAddress) -> {
new TcpChannelInitializer(config.maxFrameLength())
.accept(connectionObserver, channel);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.scalecube.transport.netty.TransportImpl.SenderContext;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.TcpClient;

Expand Down Expand Up @@ -50,10 +49,10 @@ private TcpClient newTcpClient(SenderContext context, Address address) {
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectTimeout())
.bootstrap(
b ->
BootstrapHandlers.updateConfiguration(
b, "outbound", new TcpChannelInitializer(config.maxFrameLength())));
.doOnChannelInit(
(connectionObserver, channel, remoteAddress) ->
new TcpChannelInitializer(config.maxFrameLength())
.accept(connectionObserver, channel));
return config.isSecured() ? tcpClient.secure() : tcpClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testUnresolvedHostConnection() {
try {
Address address = Address.from("wronghost:49255");
Message message = Message.withData("q").build();
client.send(address, message).block(Duration.ofSeconds(5));
client.send(address, message).block(Duration.ofSeconds(20));
fail("fail");
} catch (Exception e) {
assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testUnresolvedHostConnection() {
try {
Address address = Address.from("wronghost:49255");
Message message = Message.withData("q").build();
client.send(address, message).block(Duration.ofSeconds(5));
client.send(address, message).block(Duration.ofSeconds(20));
fail("fail");
} catch (Exception e) {
assertEquals(
Expand Down

0 comments on commit 2dd5f98

Please sign in to comment.