Skip to content

Commit

Permalink
Provides Prioritised delivering of Zero Streams Frames (#718)
Browse files Browse the repository at this point in the history
* prioritization

Signed-off-by: Oleh Dokuka <[email protected]>

* provides prioritized sending for zero-stream frames; refactors benchmarks

Signed-off-by: Oleh Dokuka <[email protected]>

* provides minor cleanups
Signed-off-by: Oleh Dokuka <[email protected]>

Signed-off-by: Oleh Dokuka <[email protected]>

* more fixes

Signed-off-by: Oleh Dokuka <[email protected]>

* fixes format

Signed-off-by: Oleh Dokuka <[email protected]>

* fixes

Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka authored Feb 22, 2020
1 parent 3f5b304 commit f689f54
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 50 deletions.
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ subprojects {
repositories {
mavenCentral()

if (version.endsWith('BUILD-SNAPSHOT') || project.hasProperty('platformVersion')) {
if (version.endsWith('SNAPSHOT') || project.hasProperty('platformVersion')) {
maven { url 'http://repo.spring.io/libs-snapshot' }
maven {
url 'https://oss.jfrog.org/artifactory/oss-snapshot-local'
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class RSocketRequester implements RSocket {
new ClientKeepAliveSupport(allocator, keepAliveTickPeriod, keepAliveAckTimeout);
this.keepAliveFramesAcceptor =
keepAliveHandler.start(
keepAliveSupport, sendProcessor::onNext, this::tryTerminateOnKeepAlive);
keepAliveSupport, sendProcessor::onNextPrioritized, this::tryTerminateOnKeepAlive);
} else {
keepAliveFramesAcceptor = null;
}
Expand Down Expand Up @@ -411,7 +411,7 @@ private Mono<Void> handleMetadataPush(Payload payload) {
MetadataPushFrameFlyweight.encode(allocator, payload.sliceMetadata().retain());
payload.release();

sendProcessor.onNext(metadataPushFrame);
sendProcessor.onNextPrioritized(metadataPushFrame);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class RSocketResponder implements ResponderRSocket {
.subscribe(null, this::handleSendProcessorError);

Disposable receiveDisposable = connection.receive().subscribe(this::handleFrame, errorConsumer);
Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNext);
Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNextPrioritized);

this.connection
.onClose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public final class UnboundedProcessor<T> extends FluxProcessor<T, T>
AtomicLongFieldUpdater.newUpdater(UnboundedProcessor.class, "requested");

final Queue<T> queue;
final Queue<T> priorityQueue;
volatile boolean done;
Throwable error;
volatile CoreSubscriber<? super T> actual;
Expand All @@ -67,11 +68,12 @@ public final class UnboundedProcessor<T> extends FluxProcessor<T, T>

public UnboundedProcessor() {
this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
this.priorityQueue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
}

@Override
public int getBufferSize() {
return Queues.capacity(this.queue);
return Integer.MAX_VALUE;
}

@Override
Expand All @@ -84,6 +86,7 @@ void drainRegular(Subscriber<? super T> a) {
int missed = 1;

final Queue<T> q = queue;
final Queue<T> pq = priorityQueue;

for (; ; ) {

Expand All @@ -93,10 +96,18 @@ void drainRegular(Subscriber<? super T> a) {
while (r != e) {
boolean d = done;

T t = q.poll();
boolean empty = t == null;
T t;
boolean empty;

if (!pq.isEmpty()) {
t = pq.poll();
empty = false;
} else {
t = q.poll();
empty = t == null;
}

if (checkTerminated(d, empty, a, q)) {
if (checkTerminated(d, empty, a)) {
return;
}

Expand All @@ -110,7 +121,7 @@ void drainRegular(Subscriber<? super T> a) {
}

if (r == e) {
if (checkTerminated(done, q.isEmpty(), a, q)) {
if (checkTerminated(done, q.isEmpty() && pq.isEmpty(), a)) {
return;
}
}
Expand All @@ -129,12 +140,10 @@ void drainRegular(Subscriber<? super T> a) {
void drainFused(Subscriber<? super T> a) {
int missed = 1;

final Queue<T> q = queue;

for (; ; ) {

if (cancelled) {
q.clear();
clear();
actual = null;
return;
}
Expand Down Expand Up @@ -188,14 +197,9 @@ public void drain() {
}
}

boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, Queue<T> q) {
boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a) {
if (cancelled) {
while (!q.isEmpty()) {
T t = q.poll();
if (t != null) {
release(t);
}
}
clear();
actual = null;
return true;
}
Expand Down Expand Up @@ -237,6 +241,23 @@ public Context currentContext() {
return actual != null ? actual.currentContext() : Context.empty();
}

public void onNextPrioritized(T t) {
if (done || cancelled) {
Operators.onNextDropped(t, currentContext());
release(t);
return;
}

if (!priorityQueue.offer(t)) {
Throwable ex =
Operators.onOperatorError(null, Exceptions.failWithOverflow(), t, currentContext());
onError(Operators.onOperatorError(null, ex, t, currentContext()));
release(t);
return;
}
drain();
}

@Override
public void onNext(T t) {
if (done || cancelled) {
Expand Down Expand Up @@ -319,25 +340,24 @@ public void cancel() {
}
}

@Override
public T peek() {
return queue.peek();
}

@Override
@Nullable
public T poll() {
Queue<T> pq = this.priorityQueue;
if (!pq.isEmpty()) {
return pq.poll();
}
return queue.poll();
}

@Override
public int size() {
return queue.size();
return priorityQueue.size() + queue.size();
}

@Override
public boolean isEmpty() {
return queue.isEmpty();
return priorityQueue.isEmpty() && queue.isEmpty();
}

@Override
Expand All @@ -348,6 +368,12 @@ public void clear() {
release(t);
}
}
while (!priorityQueue.isEmpty()) {
T t = priorityQueue.poll();
if (t != null) {
release(t);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@
import reactor.util.annotation.Nullable;

/**
* Represents an empty publisher which only calls onSubscribe and onComplete.
*
* <p>This Publisher is effectively stateless and only a single instance exists. Use the {@link
* #instance()} method to obtain a properly type-parametrized view of it.
* Represents an empty publisher which only calls onSubscribe and onComplete and allows only a
* single subscriber.
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.rsocket.internal;

import io.rsocket.Payload;
import io.rsocket.util.ByteBufPayload;
import io.rsocket.util.EmptyPayload;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
Expand Down Expand Up @@ -82,6 +83,36 @@ public void testOnNextAfterSubscribe_1000() throws Exception {
testOnNextAfterSubscribeN(1000);
}

@Test
public void testPrioritizedSending() {
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();

for (int i = 0; i < 1000; i++) {
processor.onNext(EmptyPayload.INSTANCE);
}

processor.onNextPrioritized(ByteBufPayload.create("test"));

Payload closestPayload = processor.next().block();

Assert.assertEquals(closestPayload.getDataUtf8(), "test");
}

@Test
public void testPrioritizedFused() {
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();

for (int i = 0; i < 1000; i++) {
processor.onNext(EmptyPayload.INSTANCE);
}

processor.onNextPrioritized(ByteBufPayload.create("test"));

Payload closestPayload = processor.poll();

Assert.assertEquals(closestPayload.getDataUtf8(), "test");
}

public void testOnNextAfterSubscribeN(int n) throws Exception {
CountDownLatch latch = new CountDownLatch(n);
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
Expand Down

0 comments on commit f689f54

Please sign in to comment.