From 2dd5f980fdfb9c78abc7c20e113d6cc2f42186c6 Mon Sep 17 00:00:00 2001 From: segabriel Date: Tue, 23 Mar 2021 11:20:02 +0200 Subject: [PATCH] Update reactor version (#354) * Update reactor version --- .../cluster/membership/MembershipProtocolTest.java | 3 ++- pom.xml | 6 ++++-- .../io/scalecube/transport/netty/TransportImpl.java | 5 +++-- .../transport/netty/tcp/TcpChannelInitializer.java | 6 +++--- .../io/scalecube/transport/netty/tcp/TcpReceiver.java | 10 +++++----- .../io/scalecube/transport/netty/tcp/TcpSender.java | 9 ++++----- .../transport/netty/tcp/TcpTransportTest.java | 2 +- .../netty/websocket/WebsocketTransportTest.java | 2 +- 8 files changed, 23 insertions(+), 20 deletions(-) diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java index 90d0bf86..d5035504 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java @@ -39,6 +39,7 @@ import reactor.core.publisher.ReplayProcessor; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; public class MembershipProtocolTest extends BaseTest { @@ -1199,7 +1200,7 @@ private void stop(MembershipProtocolImpl membership) { private Mono awaitUntil(Runnable assertAction) { return Mono.fromRunnable(assertAction) - .retryBackoff(Long.MAX_VALUE, Duration.ofMillis(100), Duration.ofSeconds(1)); + .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))); } private void assertTrusted(MembershipProtocolImpl membership, Member... expected) { diff --git a/pom.xml b/pom.xml index 73d12ccb..39d61c1d 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -37,7 +39,7 @@ 1.7.30 2.13.2 - Dysprosium-SR9 + 2020.0.5 2.11.0 2.27.0 diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java index 4dd4698c..3385cd44 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java @@ -14,6 +14,7 @@ import io.scalecube.net.Address; import io.scalecube.transport.netty.tcp.TcpTransportFactory; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -72,8 +73,8 @@ public TransportImpl(MessageCodec messageCodec, Receiver receiver, Sender sender } private static Address prepareAddress(DisposableServer server) { - InetAddress address = server.address().getAddress(); - int port = server.address().getPort(); + InetAddress address = ((InetSocketAddress) server.address()).getAddress(); + int port = ((InetSocketAddress) server.address()).getPort(); if (address.isAnyLocalAddress()) { return Address.create(Address.getLocalIpAddress().getHostAddress(), port); } else { diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpChannelInitializer.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpChannelInitializer.java index bef209a4..3986a01f 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpChannelInitializer.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpChannelInitializer.java @@ -25,11 +25,11 @@ final class TcpChannelInitializer implements BiConsumer - BootstrapHandlers.updateConfiguration( - b, "inbound", new TcpChannelInitializer(config.maxFrameLength()))); + .doOnChannelInit( + (connectionObserver, channel, remoteAddress) -> { + new TcpChannelInitializer(config.maxFrameLength()) + .accept(connectionObserver, channel); + }); } } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java index a983b769..326edc86 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java @@ -8,7 +8,6 @@ import io.scalecube.transport.netty.TransportImpl.SenderContext; import reactor.core.publisher.Mono; import reactor.netty.Connection; -import reactor.netty.channel.BootstrapHandlers; import reactor.netty.resources.ConnectionProvider; import reactor.netty.tcp.TcpClient; @@ -50,10 +49,10 @@ private TcpClient newTcpClient(SenderContext context, Address address) { .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectTimeout()) - .bootstrap( - b -> - BootstrapHandlers.updateConfiguration( - b, "outbound", new TcpChannelInitializer(config.maxFrameLength()))); + .doOnChannelInit( + (connectionObserver, channel, remoteAddress) -> + new TcpChannelInitializer(config.maxFrameLength()) + .accept(connectionObserver, channel)); return config.isSecured() ? tcpClient.secure() : tcpClient; } } diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java index d95bd8e4..f3da9803 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java @@ -47,7 +47,7 @@ public void testUnresolvedHostConnection() { try { Address address = Address.from("wronghost:49255"); Message message = Message.withData("q").build(); - client.send(address, message).block(Duration.ofSeconds(5)); + client.send(address, message).block(Duration.ofSeconds(20)); fail("fail"); } catch (Exception e) { assertEquals( diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java index cb4e377f..88495dcf 100644 --- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java +++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java @@ -47,7 +47,7 @@ public void testUnresolvedHostConnection() { try { Address address = Address.from("wronghost:49255"); Message message = Message.withData("q").build(); - client.send(address, message).block(Duration.ofSeconds(5)); + client.send(address, message).block(Duration.ofSeconds(20)); fail("fail"); } catch (Exception e) { assertEquals(