Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provides Prioritised delivering of Zero Streams Frames #718

Merged
merged 6 commits into from
Feb 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ subprojects {
repositories {
mavenCentral()

if (version.endsWith('BUILD-SNAPSHOT') || project.hasProperty('platformVersion')) {
if (version.endsWith('SNAPSHOT') || project.hasProperty('platformVersion')) {
maven { url 'http://repo.spring.io/libs-snapshot' }
maven {
url 'https://oss.jfrog.org/artifactory/oss-snapshot-local'
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class RSocketRequester implements RSocket {
new ClientKeepAliveSupport(allocator, keepAliveTickPeriod, keepAliveAckTimeout);
this.keepAliveFramesAcceptor =
keepAliveHandler.start(
keepAliveSupport, sendProcessor::onNext, this::tryTerminateOnKeepAlive);
keepAliveSupport, sendProcessor::onNextPrioritized, this::tryTerminateOnKeepAlive);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should tighten this up in the spec, whether it's stream 0 prioritised or KEEPALIVE and LEASE only as you have implemented. I'll generally defer to @robertroeser for this, but put my thoughts on the issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely. I have already created an issue on that!

} else {
keepAliveFramesAcceptor = null;
}
Expand Down Expand Up @@ -411,7 +411,7 @@ private Mono<Void> handleMetadataPush(Payload payload) {
MetadataPushFrameFlyweight.encode(allocator, payload.sliceMetadata().retain());
payload.release();

sendProcessor.onNext(metadataPushFrame);
sendProcessor.onNextPrioritized(metadataPushFrame);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class RSocketResponder implements ResponderRSocket {
.subscribe(null, this::handleSendProcessorError);

Disposable receiveDisposable = connection.receive().subscribe(this::handleFrame, errorConsumer);
Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNext);
Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNextPrioritized);

this.connection
.onClose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public final class UnboundedProcessor<T> extends FluxProcessor<T, T>
AtomicLongFieldUpdater.newUpdater(UnboundedProcessor.class, "requested");

final Queue<T> queue;
final Queue<T> priorityQueue;
volatile boolean done;
Throwable error;
volatile CoreSubscriber<? super T> actual;
Expand All @@ -67,11 +68,12 @@ public final class UnboundedProcessor<T> extends FluxProcessor<T, T>

public UnboundedProcessor() {
this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
this.priorityQueue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
}

@Override
public int getBufferSize() {
return Queues.capacity(this.queue);
return Integer.MAX_VALUE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

}

@Override
Expand All @@ -84,6 +86,7 @@ void drainRegular(Subscriber<? super T> a) {
int missed = 1;

final Queue<T> q = queue;
final Queue<T> pq = priorityQueue;

for (; ; ) {

Expand All @@ -93,10 +96,18 @@ void drainRegular(Subscriber<? super T> a) {
while (r != e) {
boolean d = done;

T t = q.poll();
boolean empty = t == null;
T t;
boolean empty;

if (!pq.isEmpty()) {
t = pq.poll();
empty = false;
} else {
t = q.poll();
empty = t == null;
}

if (checkTerminated(d, empty, a, q)) {
if (checkTerminated(d, empty, a)) {
return;
}

Expand All @@ -110,7 +121,7 @@ void drainRegular(Subscriber<? super T> a) {
}

if (r == e) {
if (checkTerminated(done, q.isEmpty(), a, q)) {
if (checkTerminated(done, q.isEmpty() && pq.isEmpty(), a)) {
return;
}
}
Expand All @@ -129,12 +140,10 @@ void drainRegular(Subscriber<? super T> a) {
void drainFused(Subscriber<? super T> a) {
int missed = 1;

final Queue<T> q = queue;

for (; ; ) {

if (cancelled) {
q.clear();
clear();
actual = null;
return;
}
Expand Down Expand Up @@ -188,14 +197,9 @@ public void drain() {
}
}

boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, Queue<T> q) {
boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a) {
if (cancelled) {
while (!q.isEmpty()) {
T t = q.poll();
if (t != null) {
release(t);
}
}
clear();
actual = null;
return true;
}
Expand Down Expand Up @@ -237,6 +241,23 @@ public Context currentContext() {
return actual != null ? actual.currentContext() : Context.empty();
}

public void onNextPrioritized(T t) {
if (done || cancelled) {
Operators.onNextDropped(t, currentContext());
release(t);
return;
}

if (!priorityQueue.offer(t)) {
Throwable ex =
Operators.onOperatorError(null, Exceptions.failWithOverflow(), t, currentContext());
onError(Operators.onOperatorError(null, ex, t, currentContext()));
release(t);
return;
}
drain();
}

@Override
public void onNext(T t) {
if (done || cancelled) {
Expand Down Expand Up @@ -319,25 +340,24 @@ public void cancel() {
}
}

@Override
public T peek() {
return queue.peek();
}

@Override
@Nullable
public T poll() {
Queue<T> pq = this.priorityQueue;
if (!pq.isEmpty()) {
return pq.poll();
}
return queue.poll();
}

@Override
public int size() {
return queue.size();
return priorityQueue.size() + queue.size();
}

@Override
public boolean isEmpty() {
return queue.isEmpty();
return priorityQueue.isEmpty() && queue.isEmpty();
}

@Override
Expand All @@ -348,6 +368,12 @@ public void clear() {
release(t);
}
}
while (!priorityQueue.isEmpty()) {
T t = priorityQueue.poll();
if (t != null) {
release(t);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@
import reactor.util.annotation.Nullable;

/**
* Represents an empty publisher which only calls onSubscribe and onComplete.
*
* <p>This Publisher is effectively stateless and only a single instance exists. Use the {@link
* #instance()} method to obtain a properly type-parametrized view of it.
* Represents an empty publisher which only calls onSubscribe and onComplete and allows only a
* single subscriber.
*
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.rsocket.internal;

import io.rsocket.Payload;
import io.rsocket.util.ByteBufPayload;
import io.rsocket.util.EmptyPayload;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
Expand Down Expand Up @@ -82,6 +83,36 @@ public void testOnNextAfterSubscribe_1000() throws Exception {
testOnNextAfterSubscribeN(1000);
}

@Test
public void testPrioritizedSending() {
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();

for (int i = 0; i < 1000; i++) {
processor.onNext(EmptyPayload.INSTANCE);
}

processor.onNextPrioritized(ByteBufPayload.create("test"));

Payload closestPayload = processor.next().block();

Assert.assertEquals(closestPayload.getDataUtf8(), "test");
}

@Test
public void testPrioritizedFused() {
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();

for (int i = 0; i < 1000; i++) {
processor.onNext(EmptyPayload.INSTANCE);
}

processor.onNextPrioritized(ByteBufPayload.create("test"));

Payload closestPayload = processor.poll();

Assert.assertEquals(closestPayload.getDataUtf8(), "test");
}

public void testOnNextAfterSubscribeN(int n) throws Exception {
CountDownLatch latch = new CountDownLatch(n);
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
Expand Down