From 0aad6ea3bbc5e1d5ff3aad8cadc7084ae2e97f57 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 19 Nov 2019 09:55:22 +0200 Subject: [PATCH] provides prioritized sending for zero-stream frames; refactors benchmarks Signed-off-by: Oleh Dokuka --- benchmarks/build.gradle | 163 ++++++++ .../java/io/rsocket/MaxPerfSubscriber.java | 0 .../io/rsocket/PayloadsPerfSubscriber.java | 16 + .../main}/java/io/rsocket/PerfSubscriber.java | 10 +- .../main}/java/io/rsocket/RSocketPerf.java | 48 ++- .../java/io/rsocket/StreamIdSupplierPerf.java | 0 .../frame/FrameHeaderFlyweightPerf.java | 0 .../java/io/rsocket/frame/FrameTypePerf.java | 0 .../rsocket/frame/PayloadFlyweightPerf.java | 0 .../metadata/WellKnownMimeTypePerf.java | 0 gradle.properties | 1 + rsocket-core/build.gradle | 4 +- rsocket-core/jmh.gradle | 46 --- .../java/io/rsocket/RSocketResponder.java | 2 +- .../rsocket/internal/UnboundedProcessor.java | 5 +- .../queues/BaseSpscLinkedArrayQueue.java | 367 ------------------ .../queues/SpscUnboundedArrayQueue.java | 73 ---- .../jctools/queues/SupportsIterator.java | 20 - settings.gradle | 4 +- 19 files changed, 230 insertions(+), 529 deletions(-) create mode 100644 benchmarks/build.gradle rename {rsocket-core/src/jmh => benchmarks/src/main}/java/io/rsocket/MaxPerfSubscriber.java (100%) create mode 100644 benchmarks/src/main/java/io/rsocket/PayloadsPerfSubscriber.java rename {rsocket-core/src/jmh => benchmarks/src/main}/java/io/rsocket/PerfSubscriber.java (78%) rename {rsocket-core/src/jmh => benchmarks/src/main}/java/io/rsocket/RSocketPerf.java (73%) rename {rsocket-core/src/jmh => benchmarks/src/main}/java/io/rsocket/StreamIdSupplierPerf.java (100%) rename {rsocket-core/src/jmh => benchmarks/src/main}/java/io/rsocket/frame/FrameHeaderFlyweightPerf.java (100%) rename {rsocket-core/src/jmh => benchmarks/src/main}/java/io/rsocket/frame/FrameTypePerf.java (100%) rename {rsocket-core/src/jmh => benchmarks/src/main}/java/io/rsocket/frame/PayloadFlyweightPerf.java (100%) rename {rsocket-core/src/jmh => benchmarks/src/main}/java/io/rsocket/metadata/WellKnownMimeTypePerf.java (100%) delete mode 100644 rsocket-core/jmh.gradle delete mode 100644 rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseSpscLinkedArrayQueue.java delete mode 100644 rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SpscUnboundedArrayQueue.java delete mode 100644 rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SupportsIterator.java diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle new file mode 100644 index 000000000..fa1d6e04b --- /dev/null +++ b/benchmarks/build.gradle @@ -0,0 +1,163 @@ +apply plugin: 'java' +apply plugin: 'idea' + +configurations { + current + baseline { + resolutionStrategy.cacheChangingModulesFor 0, 'seconds' + } +} + +dependencies { + // Use the baseline to avoid using new APIs in the benchmarks + compileOnly "io.rsocket:rsocket-core:${perfBaselineVersion}" + compileOnly "io.rsocket:rsocket-transport-local:${perfBaselineVersion}" + + implementation "org.openjdk.jmh:jmh-core:1.21" + annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:1.21" + + current project(':rsocket-core') + current project(':rsocket-transport-local') + baseline "io.rsocket:rsocket-core:${perfBaselineVersion}", { + changing = true + } + baseline "io.rsocket:rsocket-transport-local:${perfBaselineVersion}", { + changing = true + } +} + +task jmhProfilers(type: JavaExec, description:'Lists the available profilers for the jmh task', group: 'Development') { + classpath = sourceSets.main.runtimeClasspath + main = 'org.openjdk.jmh.Main' + args '-lprof' +} + +task jmh(type: JmhExecTask, description: 'Executing JMH benchmarks') { + classpath = sourceSets.main.runtimeClasspath + configurations.current +} + +task jmhBaseline(type: JmhExecTask, description: 'Executing JMH baseline benchmarks') { + classpath = sourceSets.main.runtimeClasspath + configurations.baseline +} + +class JmhExecTask extends JavaExec { + + private String include; + private String fullInclude; + private String exclude; + private String format = "json"; + private String profilers; + private String jmhJvmArgs; + private String verify; + + public JmhExecTask() { + super(); + } + + public String getInclude() { + return include; + } + + @Option(option = "include", description="configure bench inclusion using substring") + public void setInclude(String include) { + this.include = include; + } + + public String getFullInclude() { + return fullInclude; + } + + @Option(option = "fullInclude", description = "explicitly configure bench inclusion using full JMH style regexp") + public void setFullInclude(String fullInclude) { + this.fullInclude = fullInclude; + } + + public String getExclude() { + return exclude; + } + + @Option(option = "exclude", description = "explicitly configure bench exclusion using full JMH style regexp") + public void setExclude(String exclude) { + this.exclude = exclude; + } + + public String getFormat() { + return format; + } + + @Option(option = "format", description = "configure report format") + public void setFormat(String format) { + this.format = format; + } + + public String getProfilers() { + return profilers; + } + + @Option(option = "profilers", description = "configure jmh profiler(s) to use, comma separated") + public void setProfilers(String profilers) { + this.profilers = profilers; + } + + public String getJmhJvmArgs() { + return jmhJvmArgs; + } + + @Option(option = "jvmArgs", description = "configure additional JMH JVM arguments, comma separated") + public void setJmhJvmArgs(String jvmArgs) { + this.jmhJvmArgs = jvmArgs; + } + + public String getVerify() { + return verify; + } + + @Option(option = "verify", description = "run in verify mode") + public void setVerify(String verify) { + this.verify = verify; + } + + @TaskAction + public void exec() { + setMain("org.openjdk.jmh.Main"); + File resultFile = getProject().file("build/reports/" + getName() + "/result." + format); + + if (include != null) { + args(".*" + include + ".*"); + } + else if (fullInclude != null) { + args(fullInclude); + } + + if(exclude != null) { + args("-e", exclude); + } + if(verify != null) { // execute benchmarks with the minimum amount of execution (only to check if they are working) + System.out.println("Running in verify mode"); + args("-f", 1); + args("-wi", 1); + args("-i", 1); + } + args("-foe", "true"); //fail-on-error + args("-v", "NORMAL"); //verbosity [SILENT, NORMAL, EXTRA] + if(profilers != null) { + for (String prof : profilers.split(",")) { + args("-prof", prof); + } + } + args("-jvmArgsPrepend", "-Xmx3072m"); + args("-jvmArgsPrepend", "-Xms3072m"); + if(jmhJvmArgs != null) { + for(String jvmArg : jmhJvmArgs.split(" ")) { + args("-jvmArgsPrepend", jvmArg); + } + } + args("-rf", format); + args("-rff", resultFile); + + System.out.println("\nExecuting JMH with: " + getArgs() + "\n"); + resultFile.getParentFile().mkdirs(); + + super.exec(); + } +} \ No newline at end of file diff --git a/rsocket-core/src/jmh/java/io/rsocket/MaxPerfSubscriber.java b/benchmarks/src/main/java/io/rsocket/MaxPerfSubscriber.java similarity index 100% rename from rsocket-core/src/jmh/java/io/rsocket/MaxPerfSubscriber.java rename to benchmarks/src/main/java/io/rsocket/MaxPerfSubscriber.java diff --git a/benchmarks/src/main/java/io/rsocket/PayloadsPerfSubscriber.java b/benchmarks/src/main/java/io/rsocket/PayloadsPerfSubscriber.java new file mode 100644 index 000000000..efc116958 --- /dev/null +++ b/benchmarks/src/main/java/io/rsocket/PayloadsPerfSubscriber.java @@ -0,0 +1,16 @@ +package io.rsocket; + +import org.openjdk.jmh.infra.Blackhole; + +public class PayloadsPerfSubscriber extends PerfSubscriber { + + public PayloadsPerfSubscriber(Blackhole blackhole) { + super(blackhole); + } + + @Override + public void onNext(Payload payload) { + payload.release(); + super.onNext(payload); + } +} diff --git a/rsocket-core/src/jmh/java/io/rsocket/PerfSubscriber.java b/benchmarks/src/main/java/io/rsocket/PerfSubscriber.java similarity index 78% rename from rsocket-core/src/jmh/java/io/rsocket/PerfSubscriber.java rename to benchmarks/src/main/java/io/rsocket/PerfSubscriber.java index 98c5edd3b..425001254 100644 --- a/rsocket-core/src/jmh/java/io/rsocket/PerfSubscriber.java +++ b/benchmarks/src/main/java/io/rsocket/PerfSubscriber.java @@ -1,13 +1,14 @@ package io.rsocket; -import java.util.concurrent.CountDownLatch; import org.openjdk.jmh.infra.Blackhole; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; -public class PerfSubscriber implements CoreSubscriber { +import java.util.concurrent.CountDownLatch; + +public class PerfSubscriber implements CoreSubscriber { - final CountDownLatch latch = new CountDownLatch(1); + public final CountDownLatch latch = new CountDownLatch(1); final Blackhole blackhole; Subscription s; @@ -23,8 +24,7 @@ public void onSubscribe(Subscription s) { } @Override - public void onNext(Payload payload) { - payload.release(); + public void onNext(T payload) { blackhole.consume(payload); s.request(1); } diff --git a/rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java b/benchmarks/src/main/java/io/rsocket/RSocketPerf.java similarity index 73% rename from rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java rename to benchmarks/src/main/java/io/rsocket/RSocketPerf.java index 476d6c814..e1edf38ad 100644 --- a/rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java +++ b/benchmarks/src/main/java/io/rsocket/RSocketPerf.java @@ -4,15 +4,16 @@ import io.rsocket.transport.local.LocalClientTransport; import io.rsocket.transport.local.LocalServerTransport; import io.rsocket.util.EmptyPayload; -import java.util.stream.IntStream; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; import org.reactivestreams.Publisher; @@ -20,6 +21,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.lang.reflect.Field; +import java.util.Queue; +import java.util.concurrent.locks.LockSupport; +import java.util.stream.IntStream; + @BenchmarkMode(Mode.Throughput) @Fork( value = 1 // , jvmArgsAppend = {"-Dio.netty.leakDetection.level=advanced"} @@ -36,11 +42,27 @@ public class RSocketPerf { RSocket client; Closeable server; + Queue clientsQueue; + + @TearDown + public void tearDown() { + client.dispose(); + server.dispose(); + } + + @TearDown(Level.Iteration) + public void awaitToBeConsumed() { + while (!clientsQueue.isEmpty()) { + LockSupport.parkNanos(1000); + } + } + @Setup - public void setUp() { + public void setUp() throws NoSuchFieldException, IllegalAccessException { server = RSocketFactory.receive() + .frameDecoder(PayloadDecoder.ZERO_COPY) .acceptor( (setup, sendingSocket) -> Mono.just( @@ -75,16 +97,22 @@ public Flux requestChannel(Publisher payloads) { client = RSocketFactory.connect() + .singleSubscriberRequester() .frameDecoder(PayloadDecoder.ZERO_COPY) .transport(LocalClientTransport.create("server")) .start() .block(); + + Field sendProcessorField = RSocketRequester.class.getDeclaredField("sendProcessor"); + sendProcessorField.setAccessible(true); + + clientsQueue = (Queue) sendProcessorField.get(client); } @Benchmark @SuppressWarnings("unchecked") - public PerfSubscriber fireAndForget(Blackhole blackhole) throws InterruptedException { - PerfSubscriber subscriber = new PerfSubscriber(blackhole); + public PayloadsPerfSubscriber fireAndForget(Blackhole blackhole) throws InterruptedException { + PayloadsPerfSubscriber subscriber = new PayloadsPerfSubscriber(blackhole); client.fireAndForget(PAYLOAD).subscribe((CoreSubscriber) subscriber); subscriber.latch.await(); @@ -92,8 +120,8 @@ public PerfSubscriber fireAndForget(Blackhole blackhole) throws InterruptedExcep } @Benchmark - public PerfSubscriber requestResponse(Blackhole blackhole) throws InterruptedException { - PerfSubscriber subscriber = new PerfSubscriber(blackhole); + public PayloadsPerfSubscriber requestResponse(Blackhole blackhole) throws InterruptedException { + PayloadsPerfSubscriber subscriber = new PayloadsPerfSubscriber(blackhole); client.requestResponse(PAYLOAD).subscribe(subscriber); subscriber.latch.await(); @@ -101,9 +129,9 @@ public PerfSubscriber requestResponse(Blackhole blackhole) throws InterruptedExc } @Benchmark - public PerfSubscriber requestStreamWithRequestByOneStrategy(Blackhole blackhole) + public PayloadsPerfSubscriber requestStreamWithRequestByOneStrategy(Blackhole blackhole) throws InterruptedException { - PerfSubscriber subscriber = new PerfSubscriber(blackhole); + PayloadsPerfSubscriber subscriber = new PayloadsPerfSubscriber(blackhole); client.requestStream(PAYLOAD).subscribe(subscriber); subscriber.latch.await(); @@ -121,9 +149,9 @@ public MaxPerfSubscriber requestStreamWithRequestAllStrategy(Blackhole blackhole } @Benchmark - public PerfSubscriber requestChannelWithRequestByOneStrategy(Blackhole blackhole) + public PayloadsPerfSubscriber requestChannelWithRequestByOneStrategy(Blackhole blackhole) throws InterruptedException { - PerfSubscriber subscriber = new PerfSubscriber(blackhole); + PayloadsPerfSubscriber subscriber = new PayloadsPerfSubscriber(blackhole); client.requestChannel(PAYLOAD_FLUX).subscribe(subscriber); subscriber.latch.await(); diff --git a/rsocket-core/src/jmh/java/io/rsocket/StreamIdSupplierPerf.java b/benchmarks/src/main/java/io/rsocket/StreamIdSupplierPerf.java similarity index 100% rename from rsocket-core/src/jmh/java/io/rsocket/StreamIdSupplierPerf.java rename to benchmarks/src/main/java/io/rsocket/StreamIdSupplierPerf.java diff --git a/rsocket-core/src/jmh/java/io/rsocket/frame/FrameHeaderFlyweightPerf.java b/benchmarks/src/main/java/io/rsocket/frame/FrameHeaderFlyweightPerf.java similarity index 100% rename from rsocket-core/src/jmh/java/io/rsocket/frame/FrameHeaderFlyweightPerf.java rename to benchmarks/src/main/java/io/rsocket/frame/FrameHeaderFlyweightPerf.java diff --git a/rsocket-core/src/jmh/java/io/rsocket/frame/FrameTypePerf.java b/benchmarks/src/main/java/io/rsocket/frame/FrameTypePerf.java similarity index 100% rename from rsocket-core/src/jmh/java/io/rsocket/frame/FrameTypePerf.java rename to benchmarks/src/main/java/io/rsocket/frame/FrameTypePerf.java diff --git a/rsocket-core/src/jmh/java/io/rsocket/frame/PayloadFlyweightPerf.java b/benchmarks/src/main/java/io/rsocket/frame/PayloadFlyweightPerf.java similarity index 100% rename from rsocket-core/src/jmh/java/io/rsocket/frame/PayloadFlyweightPerf.java rename to benchmarks/src/main/java/io/rsocket/frame/PayloadFlyweightPerf.java diff --git a/rsocket-core/src/jmh/java/io/rsocket/metadata/WellKnownMimeTypePerf.java b/benchmarks/src/main/java/io/rsocket/metadata/WellKnownMimeTypePerf.java similarity index 100% rename from rsocket-core/src/jmh/java/io/rsocket/metadata/WellKnownMimeTypePerf.java rename to benchmarks/src/main/java/io/rsocket/metadata/WellKnownMimeTypePerf.java diff --git a/gradle.properties b/gradle.properties index b85cba325..13a89e30c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -12,3 +12,4 @@ # limitations under the License. # version=1.0.0-RC6 +perfBaselineVersion=1.0.0-RC5 diff --git a/rsocket-core/build.gradle b/rsocket-core/build.gradle index d62452619..ca2ae0e65 100644 --- a/rsocket-core/build.gradle +++ b/rsocket-core/build.gradle @@ -46,6 +46,4 @@ dependencies { testRuntimeOnly 'org.junit.vintage:junit-vintage-engine' } -description = "Core functionality for the RSocket library" - -apply from: 'jmh.gradle' +description = "Core functionality for the RSocket library" \ No newline at end of file diff --git a/rsocket-core/jmh.gradle b/rsocket-core/jmh.gradle deleted file mode 100644 index 2a2b4d7cd..000000000 --- a/rsocket-core/jmh.gradle +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -dependencies { - jmh configurations.api - jmh configurations.implementation - jmh 'org.openjdk.jmh:jmh-core' - jmh 'org.openjdk.jmh:jmh-generator-annprocess' - jmh 'io.projectreactor:reactor-test' - jmh project(':rsocket-transport-local') -} - -jmhCompileGeneratedClasses.enabled = false - -jmh { - includeTests = false - profilers = ['gc'] - resultFormat = 'JSON' - - jvmArgs = ['-XX:+UnlockCommercialFeatures', '-XX:+FlightRecorder'] - // jvmArgsAppend = ['-XX:+UseG1GC', '-Xms4g', '-Xmx4g'] -} - -jmhJar { - from project.configurations.jmh -} - -tasks.jmh.finalizedBy tasks.jmhReport - -jmhReport { - jmhResultPath = project.file('build/reports/jmh/results.json') - jmhReportOutput = project.file('build/reports/jmh') -} diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/RSocketResponder.java index 490b00967..53ced9763 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketResponder.java @@ -85,7 +85,7 @@ class RSocketResponder implements ResponderRSocket { .subscribe(null, this::handleSendProcessorError); Disposable receiveDisposable = connection.receive().subscribe(this::handleFrame, errorConsumer); - Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNext); + Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNextPrioritized); this.connection .onClose() diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index 0af4eb0a2..3e4361208 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -18,7 +18,6 @@ import io.netty.util.ReferenceCounted; import io.rsocket.internal.jctools.queues.MpscUnboundedArrayQueue; -import io.rsocket.internal.jctools.queues.SpscUnboundedArrayQueue; import java.util.Objects; import java.util.Queue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -69,12 +68,12 @@ public final class UnboundedProcessor extends FluxProcessor public UnboundedProcessor() { this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); - this.priorityQueue = new SpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); + this.priorityQueue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); } @Override public int getBufferSize() { - return Queues.capacity(this.queue); + return Integer.MAX_VALUE; } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseSpscLinkedArrayQueue.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseSpscLinkedArrayQueue.java deleted file mode 100644 index 9108f91f9..000000000 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseSpscLinkedArrayQueue.java +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.rsocket.internal.jctools.queues; - -import static io.rsocket.internal.jctools.queues.CircularArrayOffsetCalculator.calcElementOffset; -import static io.rsocket.internal.jctools.queues.LinkedArrayQueueUtil.length; -import static io.rsocket.internal.jctools.queues.LinkedArrayQueueUtil.nextArrayOffset; -import static io.rsocket.internal.jctools.util.UnsafeAccess.UNSAFE; -import static io.rsocket.internal.jctools.util.UnsafeAccess.fieldOffset; -import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.lvElement; -import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.soElement; - -import io.rsocket.internal.jctools.util.PortableJvmInfo; -import java.util.AbstractQueue; -import java.util.Iterator; - -abstract class BaseSpscLinkedArrayQueuePrePad extends AbstractQueue - implements IndexedQueueSizeUtil.IndexedQueue { - long p0, p1, p2, p3, p4, p5, p6, p7; - long p10, p11, p12, p13, p14, p15; - // p16, p17; drop 2 longs, the cold fields act as buffer -} - -abstract class BaseSpscLinkedArrayQueueConsumerColdFields - extends BaseSpscLinkedArrayQueuePrePad { - protected long consumerMask; - protected E[] consumerBuffer; -} - -// $gen:ordered-fields -abstract class BaseSpscLinkedArrayQueueConsumerField - extends BaseSpscLinkedArrayQueueConsumerColdFields { - private static final long C_INDEX_OFFSET = - fieldOffset(BaseSpscLinkedArrayQueueConsumerField.class, "consumerIndex"); - - private volatile long consumerIndex; - - @Override - public final long lvConsumerIndex() { - return consumerIndex; - } - - final long lpConsumerIndex() { - return UNSAFE.getLong(this, C_INDEX_OFFSET); - } - - final void soConsumerIndex(long newValue) { - UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue); - } -} - -abstract class BaseSpscLinkedArrayQueueL2Pad extends BaseSpscLinkedArrayQueueConsumerField { - long p0, p1, p2, p3, p4, p5, p6, p7; - long p10, p11, p12, p13, p14, p15, p16, p17; -} - -// $gen:ordered-fields -abstract class BaseSpscLinkedArrayQueueProducerFields extends BaseSpscLinkedArrayQueueL2Pad { - private static final long P_INDEX_OFFSET = - fieldOffset(BaseSpscLinkedArrayQueueProducerFields.class, "producerIndex"); - - private volatile long producerIndex; - - @Override - public final long lvProducerIndex() { - return producerIndex; - } - - final void soProducerIndex(long newValue) { - UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, newValue); - } - - final long lpProducerIndex() { - return UNSAFE.getLong(this, P_INDEX_OFFSET); - } -} - -abstract class BaseSpscLinkedArrayQueueProducerColdFields - extends BaseSpscLinkedArrayQueueProducerFields { - protected long producerBufferLimit; - protected long producerMask; // fixed for chunked and unbounded - - protected E[] producerBuffer; -} - -abstract class BaseSpscLinkedArrayQueue extends BaseSpscLinkedArrayQueueProducerColdFields - implements MessagePassingQueue, QueueProgressIndicators { - - private static final Object JUMP = new Object(); - - @Override - public final Iterator iterator() { - throw new UnsupportedOperationException(); - } - - @Override - public final int size() { - return IndexedQueueSizeUtil.size(this); - } - - @Override - public final boolean isEmpty() { - return IndexedQueueSizeUtil.isEmpty(this); - } - - @Override - public String toString() { - return this.getClass().getName(); - } - - @Override - public long currentProducerIndex() { - return lvProducerIndex(); - } - - @Override - public long currentConsumerIndex() { - return lvConsumerIndex(); - } - - protected final void soNext(E[] curr, E[] next) { - long offset = nextArrayOffset(curr); - soElement(curr, offset, next); - } - - @SuppressWarnings("unchecked") - protected final E[] lvNextArrayAndUnlink(E[] curr) { - final long offset = nextArrayOffset(curr); - final E[] nextBuffer = (E[]) lvElement(curr, offset); - // prevent GC nepotism - soElement(curr, offset, null); - return nextBuffer; - } - - @Override - public boolean relaxedOffer(E e) { - return offer(e); - } - - @Override - public E relaxedPoll() { - return poll(); - } - - @Override - public E relaxedPeek() { - return peek(); - } - - @Override - public int drain(Consumer c) { - E e; - int i = 0; - while ((e = relaxedPoll()) != null) { - i++; - c.accept(e); - } - return i; - } - - @Override - public int fill(Supplier s) { - long result = - 0; // result is a long because we want to have a safepoint check at regular intervals - final int capacity = capacity(); - do { - final int filled = fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH); - if (filled == 0) { - return (int) result; - } - result += filled; - } while (result <= capacity); - return (int) result; - } - - @Override - public int drain(Consumer c, int limit) { - E e; - int i = 0; - for (; i < limit && (e = relaxedPoll()) != null; i++) { - c.accept(e); - } - return i; - } - - @Override - public int fill(Supplier s, int limit) { - if (null == s) throw new IllegalArgumentException("supplier is null"); - if (limit < 0) throw new IllegalArgumentException("limit is negative:" + limit); - if (limit == 0) return 0; - - for (int i = 0; i < limit; i++) { - // local load of field to avoid repeated loads after volatile reads - final E[] buffer = producerBuffer; - final long index = lpProducerIndex(); - final long mask = producerMask; - final long offset = calcElementOffset(index, mask); - // expected hot path - if (index < producerBufferLimit) { - writeToQueue(buffer, s.get(), index, offset); - } else { - if (!offerColdPath(buffer, mask, index, offset, null, s)) { - return i; - } - } - } - return limit; - } - - @Override - public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) { - int idleCounter = 0; - while (exit.keepRunning()) { - final E e = relaxedPoll(); - if (e == null) { - idleCounter = wait.idle(idleCounter); - continue; - } - idleCounter = 0; - c.accept(e); - } - } - - @Override - public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) { - if (null == wait) throw new IllegalArgumentException("waiter is null"); - if (null == exit) throw new IllegalArgumentException("exit condition is null"); - while (exit.keepRunning()) { - while (fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH) != 0 && exit.keepRunning()) { - continue; - } - int idleCounter = 0; - while (exit.keepRunning() && fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH) == 0) { - idleCounter = wait.idle(idleCounter); - } - } - } - - /** - * {@inheritDoc} - * - *

This implementation is correct for single producer thread use only. - */ - @Override - public boolean offer(final E e) { - // Objects.requireNonNull(e); - if (null == e) { - throw new NullPointerException(); - } - // local load of field to avoid repeated loads after volatile reads - final E[] buffer = producerBuffer; - final long index = lpProducerIndex(); - final long mask = producerMask; - final long offset = calcElementOffset(index, mask); - // expected hot path - if (index < producerBufferLimit) { - writeToQueue(buffer, e, index, offset); - return true; - } - return offerColdPath(buffer, mask, index, offset, e, null); - } - - abstract boolean offerColdPath( - E[] buffer, long mask, long pIndex, long offset, E v, Supplier s); - - /** - * {@inheritDoc} - * - *

This implementation is correct for single consumer thread use only. - */ - @SuppressWarnings("unchecked") - @Override - public E poll() { - // local load of field to avoid repeated loads after volatile reads - final E[] buffer = consumerBuffer; - final long index = lpConsumerIndex(); - final long mask = consumerMask; - final long offset = calcElementOffset(index, mask); - final Object e = lvElement(buffer, offset); // LoadLoad - boolean isNextBuffer = e == JUMP; - if (null != e && !isNextBuffer) { - soConsumerIndex(index + 1); // this ensures correctness on 32bit platforms - soElement(buffer, offset, null); - return (E) e; - } else if (isNextBuffer) { - return newBufferPoll(buffer, index); - } - - return null; - } - - /** - * {@inheritDoc} - * - *

This implementation is correct for single consumer thread use only. - */ - @SuppressWarnings("unchecked") - @Override - public E peek() { - final E[] buffer = consumerBuffer; - final long index = lpConsumerIndex(); - final long mask = consumerMask; - final long offset = calcElementOffset(index, mask); - final Object e = lvElement(buffer, offset); // LoadLoad - if (e == JUMP) { - return newBufferPeek(buffer, index); - } - - return (E) e; - } - - final void linkOldToNew( - final long currIndex, - final E[] oldBuffer, - final long offset, - final E[] newBuffer, - final long offsetInNew, - final E e) { - soElement(newBuffer, offsetInNew, e); // StoreStore - // link to next buffer and add next indicator as element of old buffer - soNext(oldBuffer, newBuffer); - soElement(oldBuffer, offset, JUMP); - // index is visible after elements (isEmpty/poll ordering) - soProducerIndex(currIndex + 1); // this ensures atomic write of long on 32bit platforms - } - - final void writeToQueue(final E[] buffer, final E e, final long index, final long offset) { - soElement(buffer, offset, e); // StoreStore - soProducerIndex(index + 1); // this ensures atomic write of long on 32bit platforms - } - - private E newBufferPeek(final E[] buffer, final long index) { - E[] nextBuffer = lvNextArrayAndUnlink(buffer); - consumerBuffer = nextBuffer; - final long mask = length(nextBuffer) - 2; - consumerMask = mask; - final long offset = calcElementOffset(index, mask); - return lvElement(nextBuffer, offset); // LoadLoad - } - - private E newBufferPoll(final E[] buffer, final long index) { - E[] nextBuffer = lvNextArrayAndUnlink(buffer); - consumerBuffer = nextBuffer; - final long mask = length(nextBuffer) - 2; - consumerMask = mask; - final long offset = calcElementOffset(index, mask); - final E n = lvElement(nextBuffer, offset); // LoadLoad - if (null == n) { - throw new IllegalStateException("new buffer must have at least one element"); - } else { - soConsumerIndex(index + 1); // this ensures correctness on 32bit platforms - soElement(nextBuffer, offset, null); // StoreStore - return n; - } - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SpscUnboundedArrayQueue.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SpscUnboundedArrayQueue.java deleted file mode 100644 index 9b17dd677..000000000 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SpscUnboundedArrayQueue.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.rsocket.internal.jctools.queues; - -import static io.rsocket.internal.jctools.queues.CircularArrayOffsetCalculator.allocate; -import static io.rsocket.internal.jctools.queues.CircularArrayOffsetCalculator.calcElementOffset; -import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.lvElement; - -import io.rsocket.internal.jctools.util.Pow2; - -/** - * An SPSC array queue which starts at initialCapacity and grows indefinitely in linked - * chunks of the initial size. The queue grows only when the current chunk is full and elements are - * not copied on resize, instead a link to the new chunk is stored in the old chunk for the consumer - * to follow.
- * - * @param - */ -public class SpscUnboundedArrayQueue extends BaseSpscLinkedArrayQueue { - - public SpscUnboundedArrayQueue(int chunkSize) { - int chunkCapacity = Math.max(Pow2.roundToPowerOfTwo(chunkSize), 16); - long mask = chunkCapacity - 1; - E[] buffer = allocate(chunkCapacity + 1); - producerBuffer = buffer; - producerMask = mask; - consumerBuffer = buffer; - consumerMask = mask; - producerBufferLimit = mask - 1; // we know it's all empty to start with - } - - @Override - final boolean offerColdPath( - E[] buffer, long mask, long pIndex, long offset, E v, Supplier s) { - // use a fixed lookahead step based on buffer capacity - final long lookAheadStep = (mask + 1) / 4; - long pBufferLimit = pIndex + lookAheadStep; - - // go around the buffer or add a new buffer - if (null == lvElement(buffer, calcElementOffset(pBufferLimit, mask))) { - producerBufferLimit = pBufferLimit - 1; // joy, there's plenty of room - writeToQueue(buffer, v == null ? s.get() : v, pIndex, offset); - } else if (null - == lvElement(buffer, calcElementOffset(pIndex + 1, mask))) { // buffer is not full - writeToQueue(buffer, v == null ? s.get() : v, pIndex, offset); - } else { - // we got one slot left to write into, and we are not full. Need to link new buffer. - // allocate new buffer of same length - final E[] newBuffer = allocate((int) (mask + 2)); - producerBuffer = newBuffer; - producerBufferLimit = pIndex + mask - 1; - - linkOldToNew(pIndex, buffer, offset, newBuffer, offset, v == null ? s.get() : v); - } - return true; - } - - @Override - public int capacity() { - return UNBOUNDED_CAPACITY; - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SupportsIterator.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SupportsIterator.java deleted file mode 100644 index 50d2a326f..000000000 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SupportsIterator.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.rsocket.internal.jctools.queues; - -import io.rsocket.internal.jctools.util.InternalAPI; - -/** Tagging interface to help testing */ -@InternalAPI -public interface SupportsIterator {} diff --git a/settings.gradle b/settings.gradle index 625633774..c88d23bbe 100644 --- a/settings.gradle +++ b/settings.gradle @@ -17,10 +17,12 @@ rootProject.name = 'rsocket-java' include 'rsocket-core' -include 'rsocket-examples' include 'rsocket-load-balancer' include 'rsocket-micrometer' include 'rsocket-test' include 'rsocket-transport-local' include 'rsocket-transport-netty' include 'rsocket-bom' + +include 'rsocket-examples' +include 'benchmarks'