diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
index 47ea78da..38b04a2e 100644
--- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
@@ -43,11 +43,9 @@
 import reactor.core.Disposable;
 import reactor.core.Disposables;
 import reactor.core.Exceptions;
-import reactor.core.publisher.DirectProcessor;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoProcessor;
+import reactor.core.publisher.Sinks;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
@@ -78,18 +76,17 @@ public final class ClusterImpl implements Cluster {

   private Function handler = cluster -> new ClusterMessageHandler() {};

-  // Subject
-  private final DirectProcessor membershipEvents = DirectProcessor.create();
-  private final FluxSink membershipSink = membershipEvents.sink();
+  // Sink
+  private final Sinks.Many sink = Sinks.many().multicast().directBestEffort();

   // Disposables
   private final Disposable.Composite actionsDisposables = Disposables.composite();

   // Lifecycle
-  private final MonoProcessor start = MonoProcessor.create();
-  private final MonoProcessor onStart = MonoProcessor.create();
-  private final MonoProcessor shutdown = MonoProcessor.create();
-  private final MonoProcessor onShutdown = MonoProcessor.create();
+  private final Sinks.One start = Sinks.one();
+  private final Sinks.One onStart = Sinks.one();
+  private final Sinks.One shutdown = Sinks.one();
+  private final Sinks.One onShutdown = Sinks.one();

   // Cluster components
   private Transport transport;
@@ -119,14 +116,16 @@ private ClusterImpl(ClusterImpl that) {

   private void initLifecycle() {
     start
+        .asMono()
         .then(doStart())
-        .doOnSuccess(avoid -> onStart.onComplete())
-        .doOnError(onStart::onError)
+        .doOnSuccess(c -> onStart.tryEmitEmpty())
+        .doOnError(onStart::tryEmitError)
         .subscribe(null, th -> LOGGER.error("[{}][doStart] Exception occurred:", localMember, th));

     shutdown
+        .asMono()
         .then(doShutdown())
-        .doFinally(s -> onShutdown.onComplete())
+        .doFinally(s -> onShutdown.tryEmitEmpty())
         .subscribe(
             null,
             th ->
@@ -232,8 +231,8 @@ public ClusterImpl handler(Function handler) {
   public Mono start() {
     return Mono.defer(
         () -> {
-          start.onComplete();
-          return onStart.thenReturn(this);
+          start.tryEmitEmpty();
+          return onStart.asMono().thenReturn(this);
         });
   }
@@ -260,7 +259,7 @@ private Mono doStart0() {
         new FailureDetectorImpl(
             localMember,
             transport,
-            membershipEvents.onBackpressureBuffer(),
+            sink.asFlux().onBackpressureBuffer(),
             config.failureDetectorConfig(),
             scheduler,
             cidGenerator);
@@ -269,7 +268,7 @@ private Mono doStart0() {
         new GossipProtocolImpl(
             localMember,
             transport,
-            membershipEvents.onBackpressureBuffer(),
+            sink.asFlux().onBackpressureBuffer(),
             config.gossipConfig(),
             scheduler);
@@ -295,7 +294,7 @@ private Mono doStart0() {
             .listen()
             /*.publishOn(scheduler)*/
             // Don't uncomment, already being executed inside sc-cluster thread
-            .subscribe(membershipSink::next, this::onError, membershipSink::complete));
+            .subscribe(sink::tryEmitNext, this::onError, sink::tryEmitComplete));

     return Mono.fromRunnable(() -> failureDetector.start())
         .then(Mono.fromRunnable(() -> gossip.start()))
@@ -373,7 +372,7 @@ private Flux listenGossip() {
   private Flux listenMembership() {
     // listen on live stream
-    return membershipEvents.onBackpressureBuffer();
+    return sink.asFlux().onBackpressureBuffer();
   }

   /**
@@ -481,7 +480,7 @@ public Mono updateMetadata(T metadata) {
   @Override
   public void shutdown() {
-    shutdown.onComplete();
+    shutdown.tryEmitEmpty();
   }

   private Mono doShutdown() {
@@ -524,12 +523,12 @@ private Mono dispose() {
   @Override
   public Mono onShutdown() {
-    return onShutdown;
+    return onShutdown.asMono();
   }

   @Override
   public boolean isShutdown() {
-    return onShutdown.isDisposed();
+    return onShutdown.asMono().toFuture().isDone();
   }

   private static class SenderAwareTransport implements Transport {
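The lifecycle rewiring above (MonoProcessor replaced by Sinks.One) is the same shape in every class this patch touches. Roughly, and only as a sketch against reactor-core 3.4 with illustrative names, the pattern is:

// Sketch: Sinks.One as a one-shot lifecycle signal; class, field and method names are illustrative.
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

final class LifecycleSketch {

  private final Sinks.One<Void> shutdown = Sinks.one();
  private final Sinks.One<Void> onShutdown = Sinks.one();

  LifecycleSketch() {
    shutdown
        .asMono()                                  // a Sinks.One is not a Mono itself, unlike MonoProcessor
        .then(doShutdown())
        .doFinally(s -> onShutdown.tryEmitEmpty()) // replaces onShutdown.onComplete()
        .subscribe();
  }

  public void shutdown() {
    shutdown.tryEmitEmpty();                       // replaces shutdown.onComplete()
  }

  public Mono<Void> onShutdown() {
    return onShutdown.asMono();
  }

  private Mono<Void> doShutdown() {
    return Mono.empty();                           // placeholder for the real teardown
  }
}

Note the isShutdown() change above: with no isDisposed() on a sink, completion is observed via onShutdown.asMono().toFuture().isDone().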
diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
index 7c72de85..435bcb8e 100644
--- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
@@ -20,10 +20,8 @@
 import org.slf4j.LoggerFactory;
 import reactor.core.Disposable;
 import reactor.core.Disposables;
-import reactor.core.publisher.DirectProcessor;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxProcessor;
-import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Sinks;
 import reactor.core.scheduler.Scheduler;

 public final class FailureDetectorImpl implements FailureDetector {
@@ -46,18 +44,15 @@ public final class FailureDetectorImpl implements FailureDetector {

   // State
   private long currentPeriod = 0;
-  private List pingMembers = new ArrayList<>();
   private int pingMemberIndex = 0; // index for sequential ping member selection
+  private final List pingMembers = new ArrayList<>();

   // Disposables
   private final Disposable.Composite actionsDisposables = Disposables.composite();

-  // Subject
-  private final FluxProcessor subject =
-      DirectProcessor.create().serialize();
-
-  private final FluxSink sink = subject.sink();
+  // Sink
+  private final Sinks.Many sink = Sinks.many().multicast().directBestEffort();

   // Scheduled
   private final Scheduler scheduler;
@@ -111,12 +106,12 @@ public void stop() {
     actionsDisposables.dispose();

     // Stop publishing events
-    sink.complete();
+    sink.tryEmitComplete();
   }

   @Override
   public Flux listen() {
-    return subject.onBackpressureBuffer();
+    return sink.asFlux().onBackpressureBuffer();
   }

   // ================================================
@@ -376,7 +371,7 @@ private List selectPingReqMembers(Member pingMember) {

   private void publishPingResult(long period, Member member, MemberStatus status) {
     LOGGER.debug("[{}][{}] Member {} detected as {}", localMember, period, member, status);
-    sink.next(new FailureDetectorEvent(member, status));
+    sink.tryEmitNext(new FailureDetectorEvent(member, status));
   }

   private MemberStatus computeMemberStatus(Message message, long period) {
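FailureDetectorImpl, GossipProtocolImpl, MembershipProtocolImpl and TransportImpl all swap the serialized DirectProcessor/FluxSink pair for the same sink flavour. As a minimal sketch (reactor-core 3.4; the Event type parameter and class name are placeholders):

// Sketch: Sinks.Many in place of DirectProcessor + FluxSink.
// multicast().directBestEffort() means no buffering or replay; subscribers without demand
// miss signals rather than receiving an overflow error (a slight difference from DirectProcessor).
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

final class EventBusSketch<Event> {

  private final Sinks.Many<Event> sink = Sinks.many().multicast().directBestEffort();

  void publish(Event event) {
    sink.tryEmitNext(event); // was sink.next(event)
  }

  void stop() {
    sink.tryEmitComplete(); // was sink.complete()
  }

  Flux<Event> listen() {
    return sink.asFlux().onBackpressureBuffer(); // was subject.onBackpressureBuffer()
  }
}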
diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
index d6edd8b8..32830f23 100644
--- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
@@ -21,12 +21,10 @@
 import org.slf4j.LoggerFactory;
 import reactor.core.Disposable;
 import reactor.core.Disposables;
-import reactor.core.publisher.DirectProcessor;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxProcessor;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.MonoSink;
+import reactor.core.publisher.Sinks;
 import reactor.core.scheduler.Scheduler;

 public final class GossipProtocolImpl implements GossipProtocol {
@@ -58,12 +56,8 @@ public final class GossipProtocolImpl implements GossipProtocol {

   private final Disposable.Composite actionsDisposables = Disposables.composite();

-  // Subject
-
-  private final FluxProcessor subject =
-      DirectProcessor.create().serialize();
-
-  private final FluxSink sink = subject.sink();
+  // Sink
+  private final Sinks.Many sink = Sinks.many().multicast().directBestEffort();

   // Scheduled

@@ -119,7 +113,7 @@ public void stop() {
     actionsDisposables.dispose();

     // Stop publishing events
-    sink.complete();
+    sink.tryEmitComplete();
   }

   @Override
@@ -131,7 +125,7 @@ public Mono spread(Message message) {

   @Override
   public Flux listen() {
-    return subject.onBackpressureBuffer();
+    return sink.asFlux().onBackpressureBuffer();
   }

   // ================================================
@@ -207,7 +201,7 @@ private void onGossipReq(Message message) {
       if (gossipState == null) { // new gossip
         gossipState = new GossipState(gossip, period);
         gossips.put(gossip.gossipId(), gossipState);
-        sink.next(gossip.message());
+        sink.tryEmitNext(gossip.message());
       }
       gossipState.addToInfected(gossipRequest.from());
     }
diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
index 3a8a8c3c..5beb21ce 100644
--- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
@@ -44,12 +44,10 @@
 import org.slf4j.LoggerFactory;
 import reactor.core.Disposable;
 import reactor.core.Disposables;
-import reactor.core.publisher.DirectProcessor;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxProcessor;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.MonoSink;
+import reactor.core.publisher.Sinks;
 import reactor.core.scheduler.Scheduler;

 public final class MembershipProtocolImpl implements MembershipProtocol {
@@ -91,11 +89,8 @@ private enum MembershipUpdateReason {
   private final List removedMembersHistory = new CopyOnWriteArrayList<>();
   private final Set aliveEmittedSet = new HashSet<>();

-  // Subject
-
-  private final FluxProcessor subject =
-      DirectProcessor.create().serialize();
-  private final FluxSink sink = subject.sink();
+  // Sink
+  private final Sinks.Many sink = Sinks.many().multicast().directBestEffort();

   // Disposables
   private final Disposable.Composite actionsDisposables = Disposables.composite();
@@ -204,7 +199,7 @@ private boolean checkAddressesNotEqual(Address address0, Address address1) {

   @Override
   public Flux listen() {
-    return subject.onBackpressureBuffer();
+    return sink.asFlux().onBackpressureBuffer();
   }

   /**
@@ -307,7 +302,7 @@ public void stop() {
     suspicionTimeoutTasks.clear();

     // Stop publishing events
-    sink.complete();
+    sink.tryEmitComplete();
   }

   @Override
@@ -735,7 +730,7 @@ private Mono onLeavingDetected(MembershipRecord r0, MembershipRecord r1) {

   private void publishEvent(MembershipEvent event) {
     LOGGER.info("[{}][publishEvent] {}", localMember, event);
-    sink.next(event);
+    sink.tryEmitNext(event);
   }

   private Mono onDeadMemberDetected(MembershipRecord r1) {
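One behavioural note on the tryEmit* calls introduced above: the removed processors were wrapped with .serialize(), while Sinks makes no such guarantee and instead reports contended or post-terminal emissions through a Sinks.EmitResult, which this patch ignores. A sketch of what inspecting it could look like (illustrative only; the migrated code does not do this):

// Sketch: checking the result of tryEmitNext (reactor-core 3.4).
import reactor.core.publisher.Sinks;

final class EmitCheckSketch {

  private final Sinks.Many<String> sink = Sinks.many().multicast().directBestEffort();

  void publish(String event) {
    Sinks.EmitResult result = sink.tryEmitNext(event);
    if (result.isFailure()) {
      // e.g. FAIL_NON_SERIALIZED on concurrent emission, FAIL_TERMINATED after complete;
      // log, retry, or deliberately drop here, depending on what the caller needs
    }
  }
}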
diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java
index 1d493649..7ec553be 100644
--- a/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java
+++ b/cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java
@@ -231,6 +231,7 @@ public void testIsolatedParentNamespaces() {
                     parent1.address(), parent2.address(), bob.address(), carol.address()))
             .startAwait();

+    //noinspection unused
     Cluster eve =
         new ClusterImpl()
             .transportFactory(WebsocketTransportFactory::new)
diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
index ff3b6f4a..f71a2c3a 100644
--- a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
+++ b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
@@ -25,7 +25,7 @@
 import org.junit.jupiter.api.Test;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.ReplayProcessor;
+import reactor.core.publisher.Sinks;

 public class ClusterTest extends BaseTest {

@@ -449,7 +449,7 @@ public void onMembershipEvent(MembershipEvent event) {
   @Test
   public void testMemberMetadataRemoved() throws InterruptedException {
     // Start seed member
-    ReplayProcessor seedEvents = ReplayProcessor.create();
+    final Sinks.Many sink = Sinks.many().replay().all();
     Map seedMetadata = new HashMap<>();
     seedMetadata.put("seed", "shmid");
     final Cluster seedNode =
@@ -460,7 +460,7 @@ public void testMemberMetadataRemoved() throws InterruptedException {
                 new ClusterMessageHandler() {
                   @Override
                   public void onMembershipEvent(MembershipEvent event) {
-                    seedEvents.onNext(event);
+                    sink.tryEmitNext(event);
                   }
                 })
             .startAwait();
@@ -469,7 +469,7 @@ public void onMembershipEvent(MembershipEvent event) {
     // Start member with metadata
     Map node1Metadata = new HashMap<>();
     node1Metadata.put("node", "shmod");
-    ReplayProcessor node1Events = ReplayProcessor.create();
+    final Sinks.Many sink1 = Sinks.many().replay().all();
     final Cluster node1 =
         new ClusterImpl()
             .config(opts -> opts.metadata(node1Metadata))
@@ -479,16 +479,18 @@ public void onMembershipEvent(MembershipEvent event) {
                 new ClusterMessageHandler() {
                   @Override
                   public void onMembershipEvent(MembershipEvent event) {
-                    node1Events.onNext(event);
+                    sink1.tryEmitNext(event);
                   }
                 })
             .startAwait();

     // Check events
-    MembershipEvent nodeAddedEvent = seedEvents.as(Mono::from).block(Duration.ofSeconds(3));
+    MembershipEvent nodeAddedEvent = sink.asFlux().blockFirst(Duration.ofSeconds(3));
+    //noinspection ConstantConditions
     assertEquals(Type.ADDED, nodeAddedEvent.type());

-    MembershipEvent seedAddedEvent = node1Events.as(Mono::from).block(Duration.ofSeconds(3));
+    MembershipEvent seedAddedEvent = sink1.asFlux().blockFirst(Duration.ofSeconds(3));
+    //noinspection ConstantConditions
     assertEquals(Type.ADDED, seedAddedEvent.type());

     // Check metadata
@@ -498,7 +500,7 @@ public void onMembershipEvent(MembershipEvent event) {
     // Remove node1 from cluster
     CountDownLatch latch = new CountDownLatch(1);
     AtomicReference> removedMetadata = new AtomicReference<>();
-    seedEvents
+    sink.asFlux()
         .filter(MembershipEvent::isRemoved)
         .subscribe(
             event -> {
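The ClusterTest changes follow one recipe: a replay-all sink records every membership event pushed from the handler callback, and the assertions read the recording back as a Flux. In outline (a sketch; only MembershipEvent, its package and the three-second timeout come from the test, the rest is illustrative):

// Sketch: recording callback-driven events with a replay sink for later assertions.
import io.scalecube.cluster.membership.MembershipEvent;
import java.time.Duration;
import reactor.core.publisher.Sinks;

final class RecordingSketch {

  final Sinks.Many<MembershipEvent> events = Sinks.many().replay().all();

  // invoked from ClusterMessageHandler.onMembershipEvent(...)
  void onMembershipEvent(MembershipEvent event) {
    events.tryEmitNext(event);
  }

  MembershipEvent awaitFirstEvent() {
    // replay().all() keeps history, so this works even if the event arrived before subscription
    return events.asFlux().blockFirst(Duration.ofSeconds(3));
  }
}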
diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java
index d5035504..828250f3 100644
--- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java
+++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java
@@ -32,11 +32,9 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import reactor.core.Exceptions;
-import reactor.core.publisher.DirectProcessor;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.ReplayProcessor;
+import reactor.core.publisher.Sinks;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.retry.Retry;
@@ -586,8 +584,8 @@ public void testRestartStoppedMembers() {
         .then()
         .block(TIMEOUT);

-    ReplayProcessor cmA_RemovedHistory = startRecordingRemoved(cmA);
-    ReplayProcessor cmB_RemovedHistory = startRecordingRemoved(cmB);
+    Sinks.Many cmA_RemovedHistory = startRecordingRemoved(cmA);
+    Sinks.Many cmB_RemovedHistory = startRecordingRemoved(cmB);

     stop(cmC);
     stop(cmD);
@@ -668,8 +666,8 @@ public void testRestartStoppedMembersOnSameAddresses() {
     assertTrusted(cmC, cmB.member(), cmA.member(), cmD.member());
     assertTrusted(cmD, cmB.member(), cmC.member(), cmA.member());

-    ReplayProcessor cmA_RemovedHistory = startRecordingRemoved(cmA);
-    ReplayProcessor cmB_RemovedHistory = startRecordingRemoved(cmB);
+    Sinks.Many cmA_RemovedHistory = startRecordingRemoved(cmA);
+    Sinks.Many cmB_RemovedHistory = startRecordingRemoved(cmB);

     stop(cmC);
     stop(cmD);
@@ -867,9 +865,9 @@ public void testNetworkPartitionDueNoInboundThenRemoved() {
     assertTrusted(cmB, cmA.member(), cmC.member());
     assertTrusted(cmC, cmB.member(), cmA.member());

-    ReplayProcessor cmA_RemovedHistory = startRecordingRemoved(cmA);
-    ReplayProcessor cmB_RemovedHistory = startRecordingRemoved(cmB);
-    ReplayProcessor cmC_RemovedHistory = startRecordingRemoved(cmC);
+    Sinks.Many cmA_RemovedHistory = startRecordingRemoved(cmA);
+    Sinks.Many cmB_RemovedHistory = startRecordingRemoved(cmB);
+    Sinks.Many cmC_RemovedHistory = startRecordingRemoved(cmC);

     // block inbound msgs from all
     c.networkEmulator().blockAllInbound();
@@ -907,9 +905,9 @@ public void testNetworkPartitionDueNoInboundUntilRemovedThenInboundRecover() {
     assertTrusted(cmB, cmA.member(), cmC.member());
     assertTrusted(cmC, cmB.member(), cmA.member());

-    ReplayProcessor cmA_RemovedHistory = startRecordingRemoved(cmA);
-    ReplayProcessor cmB_RemovedHistory = startRecordingRemoved(cmB);
-    ReplayProcessor cmC_RemovedHistory = startRecordingRemoved(cmC);
+    Sinks.Many cmA_RemovedHistory = startRecordingRemoved(cmA);
+    Sinks.Many cmB_RemovedHistory = startRecordingRemoved(cmB);
+    Sinks.Many cmC_RemovedHistory = startRecordingRemoved(cmC);

     // block inbound msgs from all
     c.networkEmulator().blockAllInbound();
@@ -1059,10 +1057,10 @@ public void testNetworkPartitionManyDueNoInboundThenRemovedThenRecover() {
     assertTrusted(cmD, cmA.member(), cmB.member(), cmC.member());
     assertNoSuspected(cmD);

-    ReplayProcessor cmA_removedHistory = startRecordingRemoved(cmA);
-    ReplayProcessor cmB_removedHistory = startRecordingRemoved(cmB);
-    ReplayProcessor cmC_removedHistory = startRecordingRemoved(cmC);
-    ReplayProcessor cmD_removedHistory = startRecordingRemoved(cmD);
+    Sinks.Many cmA_removedHistory = startRecordingRemoved(cmA);
+    Sinks.Many cmB_removedHistory = startRecordingRemoved(cmB);
+    Sinks.Many cmC_removedHistory = startRecordingRemoved(cmC);
+    Sinks.Many cmD_removedHistory = startRecordingRemoved(cmD);

     // Split into several clusters
     Stream.of(a, b, c, d)
@@ -1129,8 +1127,7 @@ private MembershipProtocolImpl createMembership(
   private MembershipProtocolImpl createMembership(Transport transport, ClusterConfig config) {
     Member localMember = new Member(newMemberId(), null, transport.address(), NAMESPACE);

-    DirectProcessor membershipProcessor = DirectProcessor.create();
-    FluxSink membershipSink = membershipProcessor.sink();
+    Sinks.Many sink = Sinks.many().multicast().directBestEffort();

     CorrelationIdGenerator cidGenerator = new CorrelationIdGenerator(localMember.id());

@@ -1138,14 +1135,18 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf
         new FailureDetectorImpl(
             localMember,
             transport,
-            membershipProcessor,
+            sink.asFlux().onBackpressureBuffer(),
             config.failureDetectorConfig(),
             scheduler,
             cidGenerator);

     GossipProtocolImpl gossipProtocol =
         new GossipProtocolImpl(
-            localMember, transport, membershipProcessor, config.gossipConfig(), scheduler);
+            localMember,
+            transport,
+            sink.asFlux().onBackpressureBuffer(),
+            config.gossipConfig(),
+            scheduler);

     MetadataStoreImpl metadataStore =
         new MetadataStoreImpl(localMember, transport, null, config, scheduler, cidGenerator);
@@ -1162,7 +1163,7 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf
             cidGenerator,
             new ClusterMonitorModel.Builder());

-    membership.listen().subscribe(membershipSink::next);
+    membership.listen().subscribe(sink::tryEmitNext, sink::tryEmitError, sink::tryEmitComplete);

     try {
       failureDetector.start();
@@ -1239,9 +1240,9 @@ private void assertSuspected(MembershipProtocolImpl membership, Member... expect
     }
   }

-  private void assertRemoved(ReplayProcessor recording, Member... expected) {
+  private void assertRemoved(Sinks.Many recording, Member... expected) {
     List actual = new ArrayList<>();
-    recording.map(MembershipEvent::member).subscribe(actual::add);
+    recording.asFlux().map(MembershipEvent::member).subscribe(actual::add);
     assertEquals(
         expected.length,
         actual.size(),
@@ -1261,10 +1262,6 @@ private void assertSelfTrusted(MembershipProtocolImpl membership) {
     assertTrusted(membership);
   }

-  private void assertNoRemoved(ReplayProcessor recording) {
-    assertRemoved(recording);
-  }
-
   private void assertMemberAndType(
       MembershipEvent membershipEvent, String expectedMemberId, MembershipEvent.Type expectedType) {
@@ -1283,10 +1280,12 @@ private List membersByStatus(MembershipProtocolImpl membership, MemberSt
         .collect(Collectors.toList());
   }

-  private ReplayProcessor startRecordingRemoved(
-      MembershipProtocolImpl membership) {
-    ReplayProcessor recording = ReplayProcessor.create();
-    membership.listen().filter(MembershipEvent::isRemoved).subscribe(recording);
-    return recording;
+  private Sinks.Many startRecordingRemoved(MembershipProtocolImpl membership) {
+    final Sinks.Many sink = Sinks.many().replay().all();
+    membership
+        .listen()
+        .filter(MembershipEvent::isRemoved)
+        .subscribe(sink::tryEmitNext, sink::tryEmitError, sink::tryEmitComplete);
+    return sink;
   }
 }
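createMembership also shows the bridging idiom used wherever a processor was previously passed straight to subscribe(...): a Sinks.Many is not a Subscriber, so the three emitters are handed over as onNext/onError/onComplete callbacks, and every consumer reads the same sink as a Flux. Schematically (a sketch; the generic method stands in for the failure-detector/gossip wiring):

// Sketch: one Sinks.Many fed from an upstream Flux and shared by several consumers.
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

final class SharedFeedSketch {

  <E> void wire(Flux<E> source) {
    Sinks.Many<E> sink = Sinks.many().multicast().directBestEffort();

    // was: source.subscribe(processor) -- no longer possible with a sink
    source.subscribe(sink::tryEmitNext, sink::tryEmitError, sink::tryEmitComplete);

    // each consumer gets its own view of the shared stream
    Flux<E> forFailureDetector = sink.asFlux().onBackpressureBuffer();
    Flux<E> forGossip = sink.asFlux().onBackpressureBuffer();
    consume(forFailureDetector);
    consume(forGossip);
  }

  private <E> void consume(Flux<E> events) {
    events.subscribe(); // placeholder for FailureDetectorImpl / GossipProtocolImpl
  }
}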
diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java
index 3385cd44..18574b4b 100644
--- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java
+++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java
@@ -26,11 +26,9 @@
 import reactor.core.Disposable;
 import reactor.core.Disposables;
 import reactor.core.Exceptions;
-import reactor.core.publisher.DirectProcessor;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoProcessor;
+import reactor.core.publisher.Sinks;
 import reactor.netty.Connection;
 import reactor.netty.DisposableServer;
 import reactor.netty.resources.LoopResources;
@@ -41,13 +39,12 @@ public final class TransportImpl implements Transport {

   private final MessageCodec messageCodec;

-  // Subject
-  private final DirectProcessor subject = DirectProcessor.create();
-  private final FluxSink sink = subject.sink();
+  // Sink
+  private final Sinks.Many sink = Sinks.many().multicast().directBestEffort();

   // Close handler
-  private final MonoProcessor stop = MonoProcessor.create();
-  private final MonoProcessor onStop = MonoProcessor.create();
+  private final Sinks.One stop = Sinks.one();
+  private final Sinks.One onStop = Sinks.one();

   // Server
   private Address address;
@@ -86,8 +83,9 @@ private void init(DisposableServer server) {
     this.server = server;
     this.address = prepareAddress(server);
     // Setup cleanup
-    stop.then(doStop())
-        .doFinally(s -> onStop.onComplete())
+    stop.asMono()
+        .then(doStop())
+        .doFinally(s -> onStop.tryEmitEmpty())
         .subscribe(
             null, ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", address, ex.toString()));
   }
@@ -148,17 +146,17 @@ public static Mono bind(TransportConfig config) {
    */
   @Override
   public Mono start() {
-    return Mono.deferWithContext(context -> receiver.bind())
+    return Mono.deferContextual(context -> receiver.bind())
         .doOnNext(this::init)
         .doOnSuccess(t -> LOGGER.info("[bind0][{}] Bound cluster transport", t.address()))
         .doOnError(ex -> LOGGER.error("[bind0][{}] Exception occurred: {}", address, ex.toString()))
         .thenReturn(this)
         .cast(Transport.class)
-        .subscriberContext(
+        .contextWrite(
             context ->
                 context.put(
                     ReceiverContext.class,
-                    new ReceiverContext(loopResources, this::toMessage, sink::next)));
+                    new ReceiverContext(loopResources, this::toMessage, sink::tryEmitNext)));
   }

   @Override
@@ -168,15 +166,15 @@ public Address address() {

   @Override
   public boolean isStopped() {
-    return onStop.isDisposed();
+    return onStop.asMono().toFuture().isDone();
   }

   @Override
   public final Mono stop() {
     return Mono.defer(
         () -> {
-          stop.onComplete();
-          return onStop;
+          stop.tryEmitEmpty();
+          return onStop.asMono();
         });
   }

@@ -185,7 +183,7 @@ private Mono doStop() {
         () -> {
           LOGGER.info("[{}][doStop] Stopping", address);
           // Complete incoming messages observable
-          sink.complete();
+          sink.tryEmitComplete();
           return Flux.concatDelayError(closeServer(), shutdownLoopResources())
               .then()
               .doFinally(s -> connections.clear())
@@ -195,17 +193,17 @@ private Mono doStop() {

   @Override
   public final Flux listen() {
-    return subject.onBackpressureBuffer();
+    return sink.asFlux().onBackpressureBuffer();
   }

   @Override
   public Mono send(Address address, Message message) {
-    return Mono.deferWithContext(context -> connections.computeIfAbsent(address, this::connect0))
+    return Mono.deferContextual(context -> connections.computeIfAbsent(address, this::connect0))
         .flatMap(
             connection ->
-                Mono.deferWithContext(context -> sender.send(message))
-                    .subscriberContext(context -> context.put(Connection.class, connection)))
-        .subscriberContext(
+                Mono.deferContextual(context -> sender.send(message))
+                    .contextWrite(context -> context.put(Connection.class, connection)))
+        .contextWrite(
             context ->
                 context.put(
                     SenderContext.class, new SenderContext(loopResources, this::toByteBuf)));
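TransportImpl also picks up the Reactor 3.4 Context renames: Mono.deferWithContext becomes Mono.deferContextual (the callback now receives a read-only ContextView) and subscriberContext becomes contextWrite. A minimal sketch of the send()-style round trip, with placeholder keys and types:

// Sketch: writing a value into the subscriber context and reading it further up the chain.
import reactor.core.publisher.Mono;

final class ContextSketch {

  Mono<String> send(String connection) {
    return Mono.deferContextual(ctx -> Mono.just(ctx.get("connection") + ":sent"))
        // was: .subscriberContext(ctx -> ctx.put("connection", connection))
        .contextWrite(ctx -> ctx.put("connection", connection));
  }
}

As in the real send(), the value has to be written downstream of the deferContextual that reads it, because the context propagates from subscriber to source.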
a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpChannelInitializer.java
+++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpChannelInitializer.java
@@ -25,11 +25,11 @@ final class TcpChannelInitializer implements BiConsumer bind() {
-    return Mono.deferWithContext(context -> Mono.just(context.get(ReceiverContext.class)))
+    return Mono.deferContextual(context -> Mono.just(context.get(ReceiverContext.class)))
         .flatMap(
             context ->
                 newTcpServer(context)
@@ -38,13 +38,12 @@ private TcpServer newTcpServer(ReceiverContext context) {
     return TcpServer.create()
         .runOn(context.loopResources())
         .bindAddress(() -> new InetSocketAddress(config.port()))
-        .option(ChannelOption.TCP_NODELAY, true)
-        .option(ChannelOption.SO_KEEPALIVE, true)
-        .option(ChannelOption.SO_REUSEADDR, true)
+        .childOption(ChannelOption.TCP_NODELAY, true)
+        .childOption(ChannelOption.SO_KEEPALIVE, true)
+        .childOption(ChannelOption.SO_REUSEADDR, true)
         .doOnChannelInit(
-            (connectionObserver, channel, remoteAddress) -> {
-              new TcpChannelInitializer(config.maxFrameLength())
-                  .accept(connectionObserver, channel);
-            });
+            (connectionObserver, channel, remoteAddress) ->
+                new TcpChannelInitializer(config.maxFrameLength())
+                    .accept(connectionObserver, channel));
   }
 }
diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java
index 326edc86..8c2fe8e7 100644
--- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java
+++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/tcp/TcpSender.java
@@ -21,14 +21,14 @@ final class TcpSender implements Sender {

   @Override
   public Mono connect(Address address) {
-    return Mono.deferWithContext(context -> Mono.just(context.get(SenderContext.class)))
+    return Mono.deferContextual(context -> Mono.just(context.get(SenderContext.class)))
         .map(context -> newTcpClient(context, address))
         .flatMap(TcpClient::connect);
   }

   @Override
   public Mono send(Message message) {
-    return Mono.deferWithContext(
+    return Mono.deferContextual(
         context -> {
           Connection connection = context.get(Connection.class);
           SenderContext senderContext = context.get(SenderContext.class);
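A side note on the receiver change above: on a Reactor Netty server, option(...) targets the parent (accepting) channel, while per-connection socket flags such as TCP_NODELAY and SO_KEEPALIVE belong on the accepted child channels, hence the move to childOption(...). Roughly (a sketch assuming reactor-netty 1.0; binding and frame handling omitted):

// Sketch: child-channel options on a TcpServer.
import io.netty.channel.ChannelOption;
import reactor.netty.tcp.TcpServer;

final class TcpServerOptionsSketch {

  TcpServer configure(TcpServer server) {
    return server
        .childOption(ChannelOption.TCP_NODELAY, true)  // applied to every accepted connection
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.SO_REUSEADDR, true);
  }
}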
diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketReceiver.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketReceiver.java
index 623ea83e..f7492adb 100644
--- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketReceiver.java
+++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketReceiver.java
@@ -9,7 +9,6 @@
 import reactor.core.publisher.Mono;
 import reactor.netty.DisposableServer;
 import reactor.netty.http.server.HttpServer;
-import reactor.netty.http.server.HttpServerRequest;
 import reactor.netty.http.server.HttpServerResponse;
 import reactor.netty.http.websocket.WebsocketInbound;
 import reactor.netty.http.websocket.WebsocketOutbound;
@@ -24,31 +23,25 @@ final class WebsocketReceiver implements Receiver {

   @Override
   public Mono bind() {
-    return Mono.deferWithContext(context -> Mono.just(context.get(ReceiverContext.class)))
+    return Mono.deferContextual(context -> Mono.just(context.get(ReceiverContext.class)))
         .flatMap(
             context ->
                 newHttpServer(context)
-                    .handle((request, response) -> onMessage(context, request, response))
+                    .handle((request, response) -> onMessage(context, response))
                     .bind()
                     .cast(DisposableServer.class));
   }

   private HttpServer newHttpServer(ReceiverContext context) {
     return HttpServer.create()
-        .tcpConfiguration(
-            tcpServer ->
-                tcpServer
-                    .runOn(context.loopResources())
-                    .bindAddress(() -> new InetSocketAddress(config.port()))
-                    .option(ChannelOption.TCP_NODELAY, true)
-                    .option(ChannelOption.SO_KEEPALIVE, true)
-                    .option(ChannelOption.SO_REUSEADDR, true));
+        .runOn(context.loopResources())
+        .bindAddress(() -> new InetSocketAddress(config.port()))
+        .childOption(ChannelOption.TCP_NODELAY, true)
+        .childOption(ChannelOption.SO_KEEPALIVE, true)
+        .childOption(ChannelOption.SO_REUSEADDR, true);
   }

-  private Mono onMessage(
-      ReceiverContext context,
-      @SuppressWarnings("unused") HttpServerRequest request,
-      HttpServerResponse response) {
+  private Mono onMessage(ReceiverContext context, HttpServerResponse response) {
     return response.sendWebsocket(
         (WebsocketInbound inbound, WebsocketOutbound outbound) ->
             inbound
diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java
index 1298d152..fe48fd55 100644
--- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java
+++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/websocket/WebsocketSender.java
@@ -11,7 +11,7 @@
 import reactor.netty.Connection;
 import reactor.netty.http.client.HttpClient;
 import reactor.netty.http.client.WebsocketClientSpec;
-import reactor.netty.tcp.TcpClient;
+import reactor.netty.resources.ConnectionProvider;

 final class WebsocketSender implements Sender {

@@ -23,14 +23,14 @@ final class WebsocketSender implements Sender {

   @Override
   public Mono connect(Address address) {
-    return Mono.deferWithContext(context -> Mono.just(context.get(SenderContext.class)))
+    return Mono.deferContextual(context -> Mono.just(context.get(SenderContext.class)))
         .map(context -> newWebsocketSender(context, address))
         .flatMap(sender -> sender.uri("/").connect());
   }

   @Override
   public Mono send(Message message) {
-    return Mono.deferWithContext(
+    return Mono.deferContextual(
         context -> {
           Connection connection = context.get(Connection.class);
           SenderContext senderContext = context.get(SenderContext.class);
@@ -46,21 +46,19 @@ public Mono send(Message message) {
   }

   private HttpClient.WebsocketSender newWebsocketSender(SenderContext context, Address address) {
-    return HttpClient.newConnection()
-        .tcpConfiguration(
-            tcpClient -> {
-              TcpClient tcpClient1 =
-                  tcpClient
-                      .runOn(context.loopResources())
-                      .host(address.host())
-                      .port(address.port())
-                      .option(ChannelOption.TCP_NODELAY, true)
-                      .option(ChannelOption.SO_KEEPALIVE, true)
-                      .option(ChannelOption.SO_REUSEADDR, true)
-                      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectTimeout());
-              return config.isSecured() ? tcpClient1.secure() : tcpClient1;
-            })
-        .websocket(
-            WebsocketClientSpec.builder().maxFramePayloadLength(config.maxFrameLength()).build());
+    HttpClient httpClient =
+        HttpClient.create(ConnectionProvider.newConnection())
+            .runOn(context.loopResources())
+            .host(address.host())
+            .port(address.port())
+            .option(ChannelOption.TCP_NODELAY, true)
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .option(ChannelOption.SO_REUSEADDR, true)
+            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectTimeout());
+
+    httpClient = config.isSecured() ? httpClient.secure() : httpClient;
+
+    return httpClient.websocket(
+        WebsocketClientSpec.builder().maxFramePayloadLength(config.maxFrameLength()).build());
   }
 }
diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java
index f3da9803..2d2a8487 100644
--- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java
+++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/tcp/TcpTransportTest.java
@@ -22,7 +22,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.ReplayProcessor;
+import reactor.core.publisher.Sinks;
 import reactor.test.StepVerifier;

 public class TcpTransportTest extends BaseTest {
@@ -322,8 +322,8 @@ public void testBlockAndUnblockTraffic() throws Exception {

     server.listen().subscribe(message -> server.send(message.sender(), message).subscribe());

-    ReplayProcessor responses = ReplayProcessor.create();
-    client.listen().subscribe(responses);
+    final Sinks.Many sink = Sinks.many().replay().all();
+    client.listen().subscribe(sink::tryEmitNext, sink::tryEmitError, sink::tryEmitComplete);

     // test at unblocked transport
     send(client, server.address(), Message.fromQualifier("q/unblocked")).subscribe();
@@ -333,7 +333,7 @@ public void testBlockAndUnblockTraffic() throws Exception {
     client.networkEmulator().blockOutbound(server.address());
     send(client, server.address(), Message.fromQualifier("q/blocked")).subscribe();

-    StepVerifier.create(responses)
+    StepVerifier.create(sink.asFlux())
         .assertNext(message -> assertEquals("q/unblocked", message.qualifier()))
         .expectNoEvent(Duration.ofMillis(300))
         .thenCancel()
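Both transport tests converge on the same verification pattern: responses are recorded into a replay sink through the three-argument subscribe, and StepVerifier then runs over the recorded flux. Approximately (a sketch; the 300 ms window mirrors the tests, everything else is illustrative):

// Sketch: StepVerifier over a replay sink that records a live stream (reactor-test).
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

final class ReplayVerifySketch {

  void verifyFirstResponseOnly(Flux<String> responses) {
    Sinks.Many<String> sink = Sinks.many().replay().all();
    responses.subscribe(sink::tryEmitNext, sink::tryEmitError, sink::tryEmitComplete);

    StepVerifier.create(sink.asFlux())
        .assertNext(response -> { /* assert on the first recorded response */ })
        .expectNoEvent(Duration.ofMillis(300)) // nothing else may arrive afterwards
        .thenCancel()
        .verify();
  }
}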
diff --git a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java
index 88495dcf..ef12467e 100644
--- a/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java
+++ b/transport-parent/transport-netty/src/test/java/io/scalecube/transport/netty/websocket/WebsocketTransportTest.java
@@ -22,7 +22,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.ReplayProcessor;
+import reactor.core.publisher.Sinks;
 import reactor.test.StepVerifier;

 public class WebsocketTransportTest extends BaseTest {
@@ -322,8 +322,8 @@ public void testBlockAndUnblockTraffic() throws Exception {

     server.listen().subscribe(message -> server.send(message.sender(), message).subscribe());

-    ReplayProcessor responses = ReplayProcessor.create();
-    client.listen().subscribe(responses);
+    final Sinks.Many sink = Sinks.many().replay().all();
+    client.listen().subscribe(sink::tryEmitNext, sink::tryEmitError, sink::tryEmitComplete);

     // test at unblocked transport
     send(client, server.address(), Message.fromQualifier("q/unblocked")).subscribe();
@@ -333,7 +333,7 @@ public void testBlockAndUnblockTraffic() throws Exception {
     client.networkEmulator().blockOutbound(server.address());
     send(client, server.address(), Message.fromQualifier("q/blocked")).subscribe();

-    StepVerifier.create(responses)
+    StepVerifier.create(sink.asFlux())
         .assertNext(message -> assertEquals("q/unblocked", message.qualifier()))
         .expectNoEvent(Duration.ofMillis(300))
         .thenCancel()