Skip to content

Commit

Permalink
provides prioritized sending for zero-stream frames; refactors benchm…
Browse files Browse the repository at this point in the history
…arks

Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Nov 19, 2019
1 parent 48a7123 commit 0aad6ea
Show file tree
Hide file tree
Showing 19 changed files with 230 additions and 529 deletions.
163 changes: 163 additions & 0 deletions benchmarks/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
apply plugin: 'java'
apply plugin: 'idea'

configurations {
current
baseline {
resolutionStrategy.cacheChangingModulesFor 0, 'seconds'
}
}

dependencies {
// Use the baseline to avoid using new APIs in the benchmarks
compileOnly "io.rsocket:rsocket-core:${perfBaselineVersion}"
compileOnly "io.rsocket:rsocket-transport-local:${perfBaselineVersion}"

implementation "org.openjdk.jmh:jmh-core:1.21"
annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:1.21"

current project(':rsocket-core')
current project(':rsocket-transport-local')
baseline "io.rsocket:rsocket-core:${perfBaselineVersion}", {
changing = true
}
baseline "io.rsocket:rsocket-transport-local:${perfBaselineVersion}", {
changing = true
}
}

task jmhProfilers(type: JavaExec, description:'Lists the available profilers for the jmh task', group: 'Development') {
classpath = sourceSets.main.runtimeClasspath
main = 'org.openjdk.jmh.Main'
args '-lprof'
}

task jmh(type: JmhExecTask, description: 'Executing JMH benchmarks') {
classpath = sourceSets.main.runtimeClasspath + configurations.current
}

task jmhBaseline(type: JmhExecTask, description: 'Executing JMH baseline benchmarks') {
classpath = sourceSets.main.runtimeClasspath + configurations.baseline
}

class JmhExecTask extends JavaExec {

private String include;
private String fullInclude;
private String exclude;
private String format = "json";
private String profilers;
private String jmhJvmArgs;
private String verify;

public JmhExecTask() {
super();
}

public String getInclude() {
return include;
}

@Option(option = "include", description="configure bench inclusion using substring")
public void setInclude(String include) {
this.include = include;
}

public String getFullInclude() {
return fullInclude;
}

@Option(option = "fullInclude", description = "explicitly configure bench inclusion using full JMH style regexp")
public void setFullInclude(String fullInclude) {
this.fullInclude = fullInclude;
}

public String getExclude() {
return exclude;
}

@Option(option = "exclude", description = "explicitly configure bench exclusion using full JMH style regexp")
public void setExclude(String exclude) {
this.exclude = exclude;
}

public String getFormat() {
return format;
}

@Option(option = "format", description = "configure report format")
public void setFormat(String format) {
this.format = format;
}

public String getProfilers() {
return profilers;
}

@Option(option = "profilers", description = "configure jmh profiler(s) to use, comma separated")
public void setProfilers(String profilers) {
this.profilers = profilers;
}

public String getJmhJvmArgs() {
return jmhJvmArgs;
}

@Option(option = "jvmArgs", description = "configure additional JMH JVM arguments, comma separated")
public void setJmhJvmArgs(String jvmArgs) {
this.jmhJvmArgs = jvmArgs;
}

public String getVerify() {
return verify;
}

@Option(option = "verify", description = "run in verify mode")
public void setVerify(String verify) {
this.verify = verify;
}

@TaskAction
public void exec() {
setMain("org.openjdk.jmh.Main");
File resultFile = getProject().file("build/reports/" + getName() + "/result." + format);

if (include != null) {
args(".*" + include + ".*");
}
else if (fullInclude != null) {
args(fullInclude);
}

if(exclude != null) {
args("-e", exclude);
}
if(verify != null) { // execute benchmarks with the minimum amount of execution (only to check if they are working)
System.out.println("Running in verify mode");
args("-f", 1);
args("-wi", 1);
args("-i", 1);
}
args("-foe", "true"); //fail-on-error
args("-v", "NORMAL"); //verbosity [SILENT, NORMAL, EXTRA]
if(profilers != null) {
for (String prof : profilers.split(",")) {
args("-prof", prof);
}
}
args("-jvmArgsPrepend", "-Xmx3072m");
args("-jvmArgsPrepend", "-Xms3072m");
if(jmhJvmArgs != null) {
for(String jvmArg : jmhJvmArgs.split(" ")) {
args("-jvmArgsPrepend", jvmArg);
}
}
args("-rf", format);
args("-rff", resultFile);

System.out.println("\nExecuting JMH with: " + getArgs() + "\n");
resultFile.getParentFile().mkdirs();

super.exec();
}
}
16 changes: 16 additions & 0 deletions benchmarks/src/main/java/io/rsocket/PayloadsPerfSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.rsocket;

import org.openjdk.jmh.infra.Blackhole;

public class PayloadsPerfSubscriber extends PerfSubscriber<Payload> {

public PayloadsPerfSubscriber(Blackhole blackhole) {
super(blackhole);
}

@Override
public void onNext(Payload payload) {
payload.release();
super.onNext(payload);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package io.rsocket;

import java.util.concurrent.CountDownLatch;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;

public class PerfSubscriber implements CoreSubscriber<Payload> {
import java.util.concurrent.CountDownLatch;

public class PerfSubscriber<T> implements CoreSubscriber<T> {

final CountDownLatch latch = new CountDownLatch(1);
public final CountDownLatch latch = new CountDownLatch(1);
final Blackhole blackhole;

Subscription s;
Expand All @@ -23,8 +24,7 @@ public void onSubscribe(Subscription s) {
}

@Override
public void onNext(Payload payload) {
payload.release();
public void onNext(T payload) {
blackhole.consume(payload);
s.request(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,28 @@
import io.rsocket.transport.local.LocalClientTransport;
import io.rsocket.transport.local.LocalServerTransport;
import io.rsocket.util.EmptyPayload;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.lang.reflect.Field;
import java.util.Queue;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.IntStream;

@BenchmarkMode(Mode.Throughput)
@Fork(
value = 1 // , jvmArgsAppend = {"-Dio.netty.leakDetection.level=advanced"}
Expand All @@ -36,11 +42,27 @@ public class RSocketPerf {

RSocket client;
Closeable server;
Queue clientsQueue;

@TearDown
public void tearDown() {
client.dispose();
server.dispose();
}

@TearDown(Level.Iteration)
public void awaitToBeConsumed() {
while (!clientsQueue.isEmpty()) {
LockSupport.parkNanos(1000);
}
}


@Setup
public void setUp() {
public void setUp() throws NoSuchFieldException, IllegalAccessException {
server =
RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(
(setup, sendingSocket) ->
Mono.just(
Expand Down Expand Up @@ -75,35 +97,41 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {

client =
RSocketFactory.connect()
.singleSubscriberRequester()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(LocalClientTransport.create("server"))
.start()
.block();

Field sendProcessorField = RSocketRequester.class.getDeclaredField("sendProcessor");
sendProcessorField.setAccessible(true);

clientsQueue = (Queue) sendProcessorField.get(client);
}

@Benchmark
@SuppressWarnings("unchecked")
public PerfSubscriber fireAndForget(Blackhole blackhole) throws InterruptedException {
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
public PayloadsPerfSubscriber fireAndForget(Blackhole blackhole) throws InterruptedException {
PayloadsPerfSubscriber subscriber = new PayloadsPerfSubscriber(blackhole);
client.fireAndForget(PAYLOAD).subscribe((CoreSubscriber) subscriber);
subscriber.latch.await();

return subscriber;
}

@Benchmark
public PerfSubscriber requestResponse(Blackhole blackhole) throws InterruptedException {
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
public PayloadsPerfSubscriber requestResponse(Blackhole blackhole) throws InterruptedException {
PayloadsPerfSubscriber subscriber = new PayloadsPerfSubscriber(blackhole);
client.requestResponse(PAYLOAD).subscribe(subscriber);
subscriber.latch.await();

return subscriber;
}

@Benchmark
public PerfSubscriber requestStreamWithRequestByOneStrategy(Blackhole blackhole)
public PayloadsPerfSubscriber requestStreamWithRequestByOneStrategy(Blackhole blackhole)
throws InterruptedException {
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
PayloadsPerfSubscriber subscriber = new PayloadsPerfSubscriber(blackhole);
client.requestStream(PAYLOAD).subscribe(subscriber);
subscriber.latch.await();

Expand All @@ -121,9 +149,9 @@ public MaxPerfSubscriber requestStreamWithRequestAllStrategy(Blackhole blackhole
}

@Benchmark
public PerfSubscriber requestChannelWithRequestByOneStrategy(Blackhole blackhole)
public PayloadsPerfSubscriber requestChannelWithRequestByOneStrategy(Blackhole blackhole)
throws InterruptedException {
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
PayloadsPerfSubscriber subscriber = new PayloadsPerfSubscriber(blackhole);
client.requestChannel(PAYLOAD_FLUX).subscribe(subscriber);
subscriber.latch.await();

Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@
# limitations under the License.
#
version=1.0.0-RC6
perfBaselineVersion=1.0.0-RC5
4 changes: 1 addition & 3 deletions rsocket-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,4 @@ dependencies {
testRuntimeOnly 'org.junit.vintage:junit-vintage-engine'
}

description = "Core functionality for the RSocket library"

apply from: 'jmh.gradle'
description = "Core functionality for the RSocket library"
46 changes: 0 additions & 46 deletions rsocket-core/jmh.gradle

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class RSocketResponder implements ResponderRSocket {
.subscribe(null, this::handleSendProcessorError);

Disposable receiveDisposable = connection.receive().subscribe(this::handleFrame, errorConsumer);
Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNext);
Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNextPrioritized);

this.connection
.onClose()
Expand Down
Loading

0 comments on commit 0aad6ea

Please sign in to comment.