From 08cefe0c6b447e0f39977e293397d35385dffc70 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 18 Nov 2019 11:51:27 +0200 Subject: [PATCH 1/6] prioritization Signed-off-by: Oleh Dokuka --- .../java/io/rsocket/RSocketRequester.java | 2 +- .../rsocket/internal/UnboundedProcessor.java | 64 ++- .../queues/BaseSpscLinkedArrayQueue.java | 367 ++++++++++++++++++ .../queues/SpscUnboundedArrayQueue.java | 73 ++++ .../internal/UnboundedProcessorTest.java | 31 ++ 5 files changed, 529 insertions(+), 8 deletions(-) create mode 100644 rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseSpscLinkedArrayQueue.java create mode 100644 rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SpscUnboundedArrayQueue.java diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/RSocketRequester.java index 5590a9df0..94d0f42cf 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketRequester.java @@ -124,7 +124,7 @@ class RSocketRequester implements RSocket { new ClientKeepAliveSupport(allocator, keepAliveTickPeriod, keepAliveAckTimeout); this.keepAliveFramesAcceptor = keepAliveHandler.start( - keepAliveSupport, sendProcessor::onNext, this::tryTerminateOnKeepAlive); + keepAliveSupport, sendProcessor::onNextPrioritized, this::tryTerminateOnKeepAlive); } else { keepAliveFramesAcceptor = null; } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index dfcc13a64..0af4eb0a2 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -18,6 +18,7 @@ import io.netty.util.ReferenceCounted; import io.rsocket.internal.jctools.queues.MpscUnboundedArrayQueue; +import io.rsocket.internal.jctools.queues.SpscUnboundedArrayQueue; import java.util.Objects; import java.util.Queue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -56,6 +57,7 @@ public final class UnboundedProcessor extends FluxProcessor AtomicLongFieldUpdater.newUpdater(UnboundedProcessor.class, "requested"); final Queue queue; + final Queue priorityQueue; volatile boolean done; Throwable error; volatile CoreSubscriber actual; @@ -67,6 +69,7 @@ public final class UnboundedProcessor extends FluxProcessor public UnboundedProcessor() { this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); + this.priorityQueue = new SpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); } @Override @@ -84,6 +87,7 @@ void drainRegular(Subscriber a) { int missed = 1; final Queue q = queue; + final Queue pq = priorityQueue; for (; ; ) { @@ -93,10 +97,18 @@ void drainRegular(Subscriber a) { while (r != e) { boolean d = done; - T t = q.poll(); - boolean empty = t == null; + T t; + boolean empty; + + if (!pq.isEmpty()) { + t = pq.poll(); + empty = false; + } else { + t = q.poll(); + empty = t == null; + } - if (checkTerminated(d, empty, a, q)) { + if (checkTerminated(d, empty, a, q, pq)) { return; } @@ -110,7 +122,7 @@ void drainRegular(Subscriber a) { } if (r == e) { - if (checkTerminated(done, q.isEmpty(), a, q)) { + if (checkTerminated(done, q.isEmpty() && pq.isEmpty(), a, q, pq)) { return; } } @@ -130,11 +142,13 @@ void drainFused(Subscriber a) { int missed = 1; final Queue q = queue; + final Queue pq = priorityQueue; for (; ; ) { if (cancelled) { q.clear(); + pq.clear(); actual = null; return; } @@ -188,7 +202,8 @@ public void drain() { } } - boolean checkTerminated(boolean d, boolean empty, Subscriber a, Queue q) { + boolean checkTerminated( + boolean d, boolean empty, Subscriber a, Queue q, Queue pq) { if (cancelled) { while (!q.isEmpty()) { T t = q.poll(); @@ -196,6 +211,12 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber a, Queue release(t); } } + while (!pq.isEmpty()) { + T t = pq.poll(); + if (t != null) { + release(t); + } + } actual = null; return true; } @@ -237,6 +258,23 @@ public Context currentContext() { return actual != null ? actual.currentContext() : Context.empty(); } + public void onNextPrioritized(T t) { + if (done || cancelled) { + Operators.onNextDropped(t, currentContext()); + release(t); + return; + } + + if (!priorityQueue.offer(t)) { + Throwable ex = + Operators.onOperatorError(null, Exceptions.failWithOverflow(), t, currentContext()); + onError(Operators.onOperatorError(null, ex, t, currentContext())); + release(t); + return; + } + drain(); + } + @Override public void onNext(T t) { if (done || cancelled) { @@ -321,23 +359,29 @@ public void cancel() { @Override public T peek() { + if (!priorityQueue.isEmpty()) { + return priorityQueue.peek(); + } return queue.peek(); } @Override @Nullable public T poll() { + if (!priorityQueue.isEmpty()) { + return priorityQueue.poll(); + } return queue.poll(); } @Override public int size() { - return queue.size(); + return priorityQueue.size() + queue.size(); } @Override public boolean isEmpty() { - return queue.isEmpty(); + return priorityQueue.isEmpty() && queue.isEmpty(); } @Override @@ -348,6 +392,12 @@ public void clear() { release(t); } } + while (!priorityQueue.isEmpty()) { + T t = priorityQueue.poll(); + if (t != null) { + release(t); + } + } } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseSpscLinkedArrayQueue.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseSpscLinkedArrayQueue.java new file mode 100644 index 000000000..9108f91f9 --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseSpscLinkedArrayQueue.java @@ -0,0 +1,367 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.internal.jctools.queues; + +import static io.rsocket.internal.jctools.queues.CircularArrayOffsetCalculator.calcElementOffset; +import static io.rsocket.internal.jctools.queues.LinkedArrayQueueUtil.length; +import static io.rsocket.internal.jctools.queues.LinkedArrayQueueUtil.nextArrayOffset; +import static io.rsocket.internal.jctools.util.UnsafeAccess.UNSAFE; +import static io.rsocket.internal.jctools.util.UnsafeAccess.fieldOffset; +import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.lvElement; +import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.soElement; + +import io.rsocket.internal.jctools.util.PortableJvmInfo; +import java.util.AbstractQueue; +import java.util.Iterator; + +abstract class BaseSpscLinkedArrayQueuePrePad extends AbstractQueue + implements IndexedQueueSizeUtil.IndexedQueue { + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15; + // p16, p17; drop 2 longs, the cold fields act as buffer +} + +abstract class BaseSpscLinkedArrayQueueConsumerColdFields + extends BaseSpscLinkedArrayQueuePrePad { + protected long consumerMask; + protected E[] consumerBuffer; +} + +// $gen:ordered-fields +abstract class BaseSpscLinkedArrayQueueConsumerField + extends BaseSpscLinkedArrayQueueConsumerColdFields { + private static final long C_INDEX_OFFSET = + fieldOffset(BaseSpscLinkedArrayQueueConsumerField.class, "consumerIndex"); + + private volatile long consumerIndex; + + @Override + public final long lvConsumerIndex() { + return consumerIndex; + } + + final long lpConsumerIndex() { + return UNSAFE.getLong(this, C_INDEX_OFFSET); + } + + final void soConsumerIndex(long newValue) { + UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue); + } +} + +abstract class BaseSpscLinkedArrayQueueL2Pad extends BaseSpscLinkedArrayQueueConsumerField { + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15, p16, p17; +} + +// $gen:ordered-fields +abstract class BaseSpscLinkedArrayQueueProducerFields extends BaseSpscLinkedArrayQueueL2Pad { + private static final long P_INDEX_OFFSET = + fieldOffset(BaseSpscLinkedArrayQueueProducerFields.class, "producerIndex"); + + private volatile long producerIndex; + + @Override + public final long lvProducerIndex() { + return producerIndex; + } + + final void soProducerIndex(long newValue) { + UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, newValue); + } + + final long lpProducerIndex() { + return UNSAFE.getLong(this, P_INDEX_OFFSET); + } +} + +abstract class BaseSpscLinkedArrayQueueProducerColdFields + extends BaseSpscLinkedArrayQueueProducerFields { + protected long producerBufferLimit; + protected long producerMask; // fixed for chunked and unbounded + + protected E[] producerBuffer; +} + +abstract class BaseSpscLinkedArrayQueue extends BaseSpscLinkedArrayQueueProducerColdFields + implements MessagePassingQueue, QueueProgressIndicators { + + private static final Object JUMP = new Object(); + + @Override + public final Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public final int size() { + return IndexedQueueSizeUtil.size(this); + } + + @Override + public final boolean isEmpty() { + return IndexedQueueSizeUtil.isEmpty(this); + } + + @Override + public String toString() { + return this.getClass().getName(); + } + + @Override + public long currentProducerIndex() { + return lvProducerIndex(); + } + + @Override + public long currentConsumerIndex() { + return lvConsumerIndex(); + } + + protected final void soNext(E[] curr, E[] next) { + long offset = nextArrayOffset(curr); + soElement(curr, offset, next); + } + + @SuppressWarnings("unchecked") + protected final E[] lvNextArrayAndUnlink(E[] curr) { + final long offset = nextArrayOffset(curr); + final E[] nextBuffer = (E[]) lvElement(curr, offset); + // prevent GC nepotism + soElement(curr, offset, null); + return nextBuffer; + } + + @Override + public boolean relaxedOffer(E e) { + return offer(e); + } + + @Override + public E relaxedPoll() { + return poll(); + } + + @Override + public E relaxedPeek() { + return peek(); + } + + @Override + public int drain(Consumer c) { + E e; + int i = 0; + while ((e = relaxedPoll()) != null) { + i++; + c.accept(e); + } + return i; + } + + @Override + public int fill(Supplier s) { + long result = + 0; // result is a long because we want to have a safepoint check at regular intervals + final int capacity = capacity(); + do { + final int filled = fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH); + if (filled == 0) { + return (int) result; + } + result += filled; + } while (result <= capacity); + return (int) result; + } + + @Override + public int drain(Consumer c, int limit) { + E e; + int i = 0; + for (; i < limit && (e = relaxedPoll()) != null; i++) { + c.accept(e); + } + return i; + } + + @Override + public int fill(Supplier s, int limit) { + if (null == s) throw new IllegalArgumentException("supplier is null"); + if (limit < 0) throw new IllegalArgumentException("limit is negative:" + limit); + if (limit == 0) return 0; + + for (int i = 0; i < limit; i++) { + // local load of field to avoid repeated loads after volatile reads + final E[] buffer = producerBuffer; + final long index = lpProducerIndex(); + final long mask = producerMask; + final long offset = calcElementOffset(index, mask); + // expected hot path + if (index < producerBufferLimit) { + writeToQueue(buffer, s.get(), index, offset); + } else { + if (!offerColdPath(buffer, mask, index, offset, null, s)) { + return i; + } + } + } + return limit; + } + + @Override + public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) { + int idleCounter = 0; + while (exit.keepRunning()) { + final E e = relaxedPoll(); + if (e == null) { + idleCounter = wait.idle(idleCounter); + continue; + } + idleCounter = 0; + c.accept(e); + } + } + + @Override + public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) { + if (null == wait) throw new IllegalArgumentException("waiter is null"); + if (null == exit) throw new IllegalArgumentException("exit condition is null"); + while (exit.keepRunning()) { + while (fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH) != 0 && exit.keepRunning()) { + continue; + } + int idleCounter = 0; + while (exit.keepRunning() && fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH) == 0) { + idleCounter = wait.idle(idleCounter); + } + } + } + + /** + * {@inheritDoc} + * + *

This implementation is correct for single producer thread use only. + */ + @Override + public boolean offer(final E e) { + // Objects.requireNonNull(e); + if (null == e) { + throw new NullPointerException(); + } + // local load of field to avoid repeated loads after volatile reads + final E[] buffer = producerBuffer; + final long index = lpProducerIndex(); + final long mask = producerMask; + final long offset = calcElementOffset(index, mask); + // expected hot path + if (index < producerBufferLimit) { + writeToQueue(buffer, e, index, offset); + return true; + } + return offerColdPath(buffer, mask, index, offset, e, null); + } + + abstract boolean offerColdPath( + E[] buffer, long mask, long pIndex, long offset, E v, Supplier s); + + /** + * {@inheritDoc} + * + *

This implementation is correct for single consumer thread use only. + */ + @SuppressWarnings("unchecked") + @Override + public E poll() { + // local load of field to avoid repeated loads after volatile reads + final E[] buffer = consumerBuffer; + final long index = lpConsumerIndex(); + final long mask = consumerMask; + final long offset = calcElementOffset(index, mask); + final Object e = lvElement(buffer, offset); // LoadLoad + boolean isNextBuffer = e == JUMP; + if (null != e && !isNextBuffer) { + soConsumerIndex(index + 1); // this ensures correctness on 32bit platforms + soElement(buffer, offset, null); + return (E) e; + } else if (isNextBuffer) { + return newBufferPoll(buffer, index); + } + + return null; + } + + /** + * {@inheritDoc} + * + *

This implementation is correct for single consumer thread use only. + */ + @SuppressWarnings("unchecked") + @Override + public E peek() { + final E[] buffer = consumerBuffer; + final long index = lpConsumerIndex(); + final long mask = consumerMask; + final long offset = calcElementOffset(index, mask); + final Object e = lvElement(buffer, offset); // LoadLoad + if (e == JUMP) { + return newBufferPeek(buffer, index); + } + + return (E) e; + } + + final void linkOldToNew( + final long currIndex, + final E[] oldBuffer, + final long offset, + final E[] newBuffer, + final long offsetInNew, + final E e) { + soElement(newBuffer, offsetInNew, e); // StoreStore + // link to next buffer and add next indicator as element of old buffer + soNext(oldBuffer, newBuffer); + soElement(oldBuffer, offset, JUMP); + // index is visible after elements (isEmpty/poll ordering) + soProducerIndex(currIndex + 1); // this ensures atomic write of long on 32bit platforms + } + + final void writeToQueue(final E[] buffer, final E e, final long index, final long offset) { + soElement(buffer, offset, e); // StoreStore + soProducerIndex(index + 1); // this ensures atomic write of long on 32bit platforms + } + + private E newBufferPeek(final E[] buffer, final long index) { + E[] nextBuffer = lvNextArrayAndUnlink(buffer); + consumerBuffer = nextBuffer; + final long mask = length(nextBuffer) - 2; + consumerMask = mask; + final long offset = calcElementOffset(index, mask); + return lvElement(nextBuffer, offset); // LoadLoad + } + + private E newBufferPoll(final E[] buffer, final long index) { + E[] nextBuffer = lvNextArrayAndUnlink(buffer); + consumerBuffer = nextBuffer; + final long mask = length(nextBuffer) - 2; + consumerMask = mask; + final long offset = calcElementOffset(index, mask); + final E n = lvElement(nextBuffer, offset); // LoadLoad + if (null == n) { + throw new IllegalStateException("new buffer must have at least one element"); + } else { + soConsumerIndex(index + 1); // this ensures correctness on 32bit platforms + soElement(nextBuffer, offset, null); // StoreStore + return n; + } + } +} diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SpscUnboundedArrayQueue.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SpscUnboundedArrayQueue.java new file mode 100644 index 000000000..9b17dd677 --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SpscUnboundedArrayQueue.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.rsocket.internal.jctools.queues; + +import static io.rsocket.internal.jctools.queues.CircularArrayOffsetCalculator.allocate; +import static io.rsocket.internal.jctools.queues.CircularArrayOffsetCalculator.calcElementOffset; +import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.lvElement; + +import io.rsocket.internal.jctools.util.Pow2; + +/** + * An SPSC array queue which starts at initialCapacity and grows indefinitely in linked + * chunks of the initial size. The queue grows only when the current chunk is full and elements are + * not copied on resize, instead a link to the new chunk is stored in the old chunk for the consumer + * to follow.
+ * + * @param + */ +public class SpscUnboundedArrayQueue extends BaseSpscLinkedArrayQueue { + + public SpscUnboundedArrayQueue(int chunkSize) { + int chunkCapacity = Math.max(Pow2.roundToPowerOfTwo(chunkSize), 16); + long mask = chunkCapacity - 1; + E[] buffer = allocate(chunkCapacity + 1); + producerBuffer = buffer; + producerMask = mask; + consumerBuffer = buffer; + consumerMask = mask; + producerBufferLimit = mask - 1; // we know it's all empty to start with + } + + @Override + final boolean offerColdPath( + E[] buffer, long mask, long pIndex, long offset, E v, Supplier s) { + // use a fixed lookahead step based on buffer capacity + final long lookAheadStep = (mask + 1) / 4; + long pBufferLimit = pIndex + lookAheadStep; + + // go around the buffer or add a new buffer + if (null == lvElement(buffer, calcElementOffset(pBufferLimit, mask))) { + producerBufferLimit = pBufferLimit - 1; // joy, there's plenty of room + writeToQueue(buffer, v == null ? s.get() : v, pIndex, offset); + } else if (null + == lvElement(buffer, calcElementOffset(pIndex + 1, mask))) { // buffer is not full + writeToQueue(buffer, v == null ? s.get() : v, pIndex, offset); + } else { + // we got one slot left to write into, and we are not full. Need to link new buffer. + // allocate new buffer of same length + final E[] newBuffer = allocate((int) (mask + 2)); + producerBuffer = newBuffer; + producerBufferLimit = pIndex + mask - 1; + + linkOldToNew(pIndex, buffer, offset, newBuffer, offset, v == null ? s.get() : v); + } + return true; + } + + @Override + public int capacity() { + return UNBOUNDED_CAPACITY; + } +} diff --git a/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java b/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java index 0dc7d9090..7bf975543 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java @@ -17,6 +17,7 @@ package io.rsocket.internal; import io.rsocket.Payload; +import io.rsocket.util.ByteBufPayload; import io.rsocket.util.EmptyPayload; import java.util.concurrent.CountDownLatch; import org.junit.Assert; @@ -82,6 +83,36 @@ public void testOnNextAfterSubscribe_1000() throws Exception { testOnNextAfterSubscribeN(1000); } + @Test + public void testPrioritizedSending() { + UnboundedProcessor processor = new UnboundedProcessor<>(); + + for (int i = 0; i < 1000; i++) { + processor.onNext(EmptyPayload.INSTANCE); + } + + processor.onNextPrioritized(ByteBufPayload.create("test")); + + Payload closestPayload = processor.next().block(); + + Assert.assertEquals(closestPayload.getDataUtf8(), "test"); + } + + @Test + public void testPrioritizedFused() { + UnboundedProcessor processor = new UnboundedProcessor<>(); + + for (int i = 0; i < 1000; i++) { + processor.onNext(EmptyPayload.INSTANCE); + } + + processor.onNextPrioritized(ByteBufPayload.create("test")); + + Payload closestPayload = processor.poll(); + + Assert.assertEquals(closestPayload.getDataUtf8(), "test"); + } + public void testOnNextAfterSubscribeN(int n) throws Exception { CountDownLatch latch = new CountDownLatch(n); UnboundedProcessor processor = new UnboundedProcessor<>(); From 111a3b8493dae12424d136933bfc3635d24ebc86 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 19 Nov 2019 09:55:22 +0200 Subject: [PATCH 2/6] provides prioritized sending for zero-stream frames; refactors benchmarks Signed-off-by: Oleh Dokuka --- .../java/io/rsocket/RSocketResponder.java | 2 +- .../rsocket/internal/UnboundedProcessor.java | 5 +- .../queues/BaseSpscLinkedArrayQueue.java | 367 ------------------ .../queues/SpscUnboundedArrayQueue.java | 73 ---- .../jctools/queues/SupportsIterator.java | 20 - 5 files changed, 3 insertions(+), 464 deletions(-) delete mode 100644 rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseSpscLinkedArrayQueue.java delete mode 100644 rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SpscUnboundedArrayQueue.java delete mode 100644 rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SupportsIterator.java diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/RSocketResponder.java index 490b00967..53ced9763 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketResponder.java @@ -85,7 +85,7 @@ class RSocketResponder implements ResponderRSocket { .subscribe(null, this::handleSendProcessorError); Disposable receiveDisposable = connection.receive().subscribe(this::handleFrame, errorConsumer); - Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNext); + Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNextPrioritized); this.connection .onClose() diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index 0af4eb0a2..3e4361208 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -18,7 +18,6 @@ import io.netty.util.ReferenceCounted; import io.rsocket.internal.jctools.queues.MpscUnboundedArrayQueue; -import io.rsocket.internal.jctools.queues.SpscUnboundedArrayQueue; import java.util.Objects; import java.util.Queue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -69,12 +68,12 @@ public final class UnboundedProcessor extends FluxProcessor public UnboundedProcessor() { this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); - this.priorityQueue = new SpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); + this.priorityQueue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); } @Override public int getBufferSize() { - return Queues.capacity(this.queue); + return Integer.MAX_VALUE; } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseSpscLinkedArrayQueue.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseSpscLinkedArrayQueue.java deleted file mode 100644 index 9108f91f9..000000000 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseSpscLinkedArrayQueue.java +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.rsocket.internal.jctools.queues; - -import static io.rsocket.internal.jctools.queues.CircularArrayOffsetCalculator.calcElementOffset; -import static io.rsocket.internal.jctools.queues.LinkedArrayQueueUtil.length; -import static io.rsocket.internal.jctools.queues.LinkedArrayQueueUtil.nextArrayOffset; -import static io.rsocket.internal.jctools.util.UnsafeAccess.UNSAFE; -import static io.rsocket.internal.jctools.util.UnsafeAccess.fieldOffset; -import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.lvElement; -import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.soElement; - -import io.rsocket.internal.jctools.util.PortableJvmInfo; -import java.util.AbstractQueue; -import java.util.Iterator; - -abstract class BaseSpscLinkedArrayQueuePrePad extends AbstractQueue - implements IndexedQueueSizeUtil.IndexedQueue { - long p0, p1, p2, p3, p4, p5, p6, p7; - long p10, p11, p12, p13, p14, p15; - // p16, p17; drop 2 longs, the cold fields act as buffer -} - -abstract class BaseSpscLinkedArrayQueueConsumerColdFields - extends BaseSpscLinkedArrayQueuePrePad { - protected long consumerMask; - protected E[] consumerBuffer; -} - -// $gen:ordered-fields -abstract class BaseSpscLinkedArrayQueueConsumerField - extends BaseSpscLinkedArrayQueueConsumerColdFields { - private static final long C_INDEX_OFFSET = - fieldOffset(BaseSpscLinkedArrayQueueConsumerField.class, "consumerIndex"); - - private volatile long consumerIndex; - - @Override - public final long lvConsumerIndex() { - return consumerIndex; - } - - final long lpConsumerIndex() { - return UNSAFE.getLong(this, C_INDEX_OFFSET); - } - - final void soConsumerIndex(long newValue) { - UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue); - } -} - -abstract class BaseSpscLinkedArrayQueueL2Pad extends BaseSpscLinkedArrayQueueConsumerField { - long p0, p1, p2, p3, p4, p5, p6, p7; - long p10, p11, p12, p13, p14, p15, p16, p17; -} - -// $gen:ordered-fields -abstract class BaseSpscLinkedArrayQueueProducerFields extends BaseSpscLinkedArrayQueueL2Pad { - private static final long P_INDEX_OFFSET = - fieldOffset(BaseSpscLinkedArrayQueueProducerFields.class, "producerIndex"); - - private volatile long producerIndex; - - @Override - public final long lvProducerIndex() { - return producerIndex; - } - - final void soProducerIndex(long newValue) { - UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, newValue); - } - - final long lpProducerIndex() { - return UNSAFE.getLong(this, P_INDEX_OFFSET); - } -} - -abstract class BaseSpscLinkedArrayQueueProducerColdFields - extends BaseSpscLinkedArrayQueueProducerFields { - protected long producerBufferLimit; - protected long producerMask; // fixed for chunked and unbounded - - protected E[] producerBuffer; -} - -abstract class BaseSpscLinkedArrayQueue extends BaseSpscLinkedArrayQueueProducerColdFields - implements MessagePassingQueue, QueueProgressIndicators { - - private static final Object JUMP = new Object(); - - @Override - public final Iterator iterator() { - throw new UnsupportedOperationException(); - } - - @Override - public final int size() { - return IndexedQueueSizeUtil.size(this); - } - - @Override - public final boolean isEmpty() { - return IndexedQueueSizeUtil.isEmpty(this); - } - - @Override - public String toString() { - return this.getClass().getName(); - } - - @Override - public long currentProducerIndex() { - return lvProducerIndex(); - } - - @Override - public long currentConsumerIndex() { - return lvConsumerIndex(); - } - - protected final void soNext(E[] curr, E[] next) { - long offset = nextArrayOffset(curr); - soElement(curr, offset, next); - } - - @SuppressWarnings("unchecked") - protected final E[] lvNextArrayAndUnlink(E[] curr) { - final long offset = nextArrayOffset(curr); - final E[] nextBuffer = (E[]) lvElement(curr, offset); - // prevent GC nepotism - soElement(curr, offset, null); - return nextBuffer; - } - - @Override - public boolean relaxedOffer(E e) { - return offer(e); - } - - @Override - public E relaxedPoll() { - return poll(); - } - - @Override - public E relaxedPeek() { - return peek(); - } - - @Override - public int drain(Consumer c) { - E e; - int i = 0; - while ((e = relaxedPoll()) != null) { - i++; - c.accept(e); - } - return i; - } - - @Override - public int fill(Supplier s) { - long result = - 0; // result is a long because we want to have a safepoint check at regular intervals - final int capacity = capacity(); - do { - final int filled = fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH); - if (filled == 0) { - return (int) result; - } - result += filled; - } while (result <= capacity); - return (int) result; - } - - @Override - public int drain(Consumer c, int limit) { - E e; - int i = 0; - for (; i < limit && (e = relaxedPoll()) != null; i++) { - c.accept(e); - } - return i; - } - - @Override - public int fill(Supplier s, int limit) { - if (null == s) throw new IllegalArgumentException("supplier is null"); - if (limit < 0) throw new IllegalArgumentException("limit is negative:" + limit); - if (limit == 0) return 0; - - for (int i = 0; i < limit; i++) { - // local load of field to avoid repeated loads after volatile reads - final E[] buffer = producerBuffer; - final long index = lpProducerIndex(); - final long mask = producerMask; - final long offset = calcElementOffset(index, mask); - // expected hot path - if (index < producerBufferLimit) { - writeToQueue(buffer, s.get(), index, offset); - } else { - if (!offerColdPath(buffer, mask, index, offset, null, s)) { - return i; - } - } - } - return limit; - } - - @Override - public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) { - int idleCounter = 0; - while (exit.keepRunning()) { - final E e = relaxedPoll(); - if (e == null) { - idleCounter = wait.idle(idleCounter); - continue; - } - idleCounter = 0; - c.accept(e); - } - } - - @Override - public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) { - if (null == wait) throw new IllegalArgumentException("waiter is null"); - if (null == exit) throw new IllegalArgumentException("exit condition is null"); - while (exit.keepRunning()) { - while (fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH) != 0 && exit.keepRunning()) { - continue; - } - int idleCounter = 0; - while (exit.keepRunning() && fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH) == 0) { - idleCounter = wait.idle(idleCounter); - } - } - } - - /** - * {@inheritDoc} - * - *

This implementation is correct for single producer thread use only. - */ - @Override - public boolean offer(final E e) { - // Objects.requireNonNull(e); - if (null == e) { - throw new NullPointerException(); - } - // local load of field to avoid repeated loads after volatile reads - final E[] buffer = producerBuffer; - final long index = lpProducerIndex(); - final long mask = producerMask; - final long offset = calcElementOffset(index, mask); - // expected hot path - if (index < producerBufferLimit) { - writeToQueue(buffer, e, index, offset); - return true; - } - return offerColdPath(buffer, mask, index, offset, e, null); - } - - abstract boolean offerColdPath( - E[] buffer, long mask, long pIndex, long offset, E v, Supplier s); - - /** - * {@inheritDoc} - * - *

This implementation is correct for single consumer thread use only. - */ - @SuppressWarnings("unchecked") - @Override - public E poll() { - // local load of field to avoid repeated loads after volatile reads - final E[] buffer = consumerBuffer; - final long index = lpConsumerIndex(); - final long mask = consumerMask; - final long offset = calcElementOffset(index, mask); - final Object e = lvElement(buffer, offset); // LoadLoad - boolean isNextBuffer = e == JUMP; - if (null != e && !isNextBuffer) { - soConsumerIndex(index + 1); // this ensures correctness on 32bit platforms - soElement(buffer, offset, null); - return (E) e; - } else if (isNextBuffer) { - return newBufferPoll(buffer, index); - } - - return null; - } - - /** - * {@inheritDoc} - * - *

This implementation is correct for single consumer thread use only. - */ - @SuppressWarnings("unchecked") - @Override - public E peek() { - final E[] buffer = consumerBuffer; - final long index = lpConsumerIndex(); - final long mask = consumerMask; - final long offset = calcElementOffset(index, mask); - final Object e = lvElement(buffer, offset); // LoadLoad - if (e == JUMP) { - return newBufferPeek(buffer, index); - } - - return (E) e; - } - - final void linkOldToNew( - final long currIndex, - final E[] oldBuffer, - final long offset, - final E[] newBuffer, - final long offsetInNew, - final E e) { - soElement(newBuffer, offsetInNew, e); // StoreStore - // link to next buffer and add next indicator as element of old buffer - soNext(oldBuffer, newBuffer); - soElement(oldBuffer, offset, JUMP); - // index is visible after elements (isEmpty/poll ordering) - soProducerIndex(currIndex + 1); // this ensures atomic write of long on 32bit platforms - } - - final void writeToQueue(final E[] buffer, final E e, final long index, final long offset) { - soElement(buffer, offset, e); // StoreStore - soProducerIndex(index + 1); // this ensures atomic write of long on 32bit platforms - } - - private E newBufferPeek(final E[] buffer, final long index) { - E[] nextBuffer = lvNextArrayAndUnlink(buffer); - consumerBuffer = nextBuffer; - final long mask = length(nextBuffer) - 2; - consumerMask = mask; - final long offset = calcElementOffset(index, mask); - return lvElement(nextBuffer, offset); // LoadLoad - } - - private E newBufferPoll(final E[] buffer, final long index) { - E[] nextBuffer = lvNextArrayAndUnlink(buffer); - consumerBuffer = nextBuffer; - final long mask = length(nextBuffer) - 2; - consumerMask = mask; - final long offset = calcElementOffset(index, mask); - final E n = lvElement(nextBuffer, offset); // LoadLoad - if (null == n) { - throw new IllegalStateException("new buffer must have at least one element"); - } else { - soConsumerIndex(index + 1); // this ensures correctness on 32bit platforms - soElement(nextBuffer, offset, null); // StoreStore - return n; - } - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SpscUnboundedArrayQueue.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SpscUnboundedArrayQueue.java deleted file mode 100644 index 9b17dd677..000000000 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SpscUnboundedArrayQueue.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.rsocket.internal.jctools.queues; - -import static io.rsocket.internal.jctools.queues.CircularArrayOffsetCalculator.allocate; -import static io.rsocket.internal.jctools.queues.CircularArrayOffsetCalculator.calcElementOffset; -import static io.rsocket.internal.jctools.util.UnsafeRefArrayAccess.lvElement; - -import io.rsocket.internal.jctools.util.Pow2; - -/** - * An SPSC array queue which starts at initialCapacity and grows indefinitely in linked - * chunks of the initial size. The queue grows only when the current chunk is full and elements are - * not copied on resize, instead a link to the new chunk is stored in the old chunk for the consumer - * to follow.
- * - * @param - */ -public class SpscUnboundedArrayQueue extends BaseSpscLinkedArrayQueue { - - public SpscUnboundedArrayQueue(int chunkSize) { - int chunkCapacity = Math.max(Pow2.roundToPowerOfTwo(chunkSize), 16); - long mask = chunkCapacity - 1; - E[] buffer = allocate(chunkCapacity + 1); - producerBuffer = buffer; - producerMask = mask; - consumerBuffer = buffer; - consumerMask = mask; - producerBufferLimit = mask - 1; // we know it's all empty to start with - } - - @Override - final boolean offerColdPath( - E[] buffer, long mask, long pIndex, long offset, E v, Supplier s) { - // use a fixed lookahead step based on buffer capacity - final long lookAheadStep = (mask + 1) / 4; - long pBufferLimit = pIndex + lookAheadStep; - - // go around the buffer or add a new buffer - if (null == lvElement(buffer, calcElementOffset(pBufferLimit, mask))) { - producerBufferLimit = pBufferLimit - 1; // joy, there's plenty of room - writeToQueue(buffer, v == null ? s.get() : v, pIndex, offset); - } else if (null - == lvElement(buffer, calcElementOffset(pIndex + 1, mask))) { // buffer is not full - writeToQueue(buffer, v == null ? s.get() : v, pIndex, offset); - } else { - // we got one slot left to write into, and we are not full. Need to link new buffer. - // allocate new buffer of same length - final E[] newBuffer = allocate((int) (mask + 2)); - producerBuffer = newBuffer; - producerBufferLimit = pIndex + mask - 1; - - linkOldToNew(pIndex, buffer, offset, newBuffer, offset, v == null ? s.get() : v); - } - return true; - } - - @Override - public int capacity() { - return UNBOUNDED_CAPACITY; - } -} diff --git a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SupportsIterator.java b/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SupportsIterator.java deleted file mode 100644 index 50d2a326f..000000000 --- a/rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/SupportsIterator.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.rsocket.internal.jctools.queues; - -import io.rsocket.internal.jctools.util.InternalAPI; - -/** Tagging interface to help testing */ -@InternalAPI -public interface SupportsIterator {} From 37142eaea37f4e7871821e7ca3a9a7b53b15182f Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 19 Nov 2019 13:21:31 +0200 Subject: [PATCH 3/6] provides minor cleanups Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- .../io/rsocket/internal/UnboundedProcessor.java | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index 3e4361208..cbe2eb19b 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -146,8 +146,7 @@ void drainFused(Subscriber a) { for (; ; ) { if (cancelled) { - q.clear(); - pq.clear(); + clear(); actual = null; return; } @@ -204,18 +203,7 @@ public void drain() { boolean checkTerminated( boolean d, boolean empty, Subscriber a, Queue q, Queue pq) { if (cancelled) { - while (!q.isEmpty()) { - T t = q.poll(); - if (t != null) { - release(t); - } - } - while (!pq.isEmpty()) { - T t = pq.poll(); - if (t != null) { - release(t); - } - } + clear(); actual = null; return true; } From 922effdc271f2f908578e198439a11182ab6ad3b Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Sat, 7 Dec 2019 19:07:45 +0200 Subject: [PATCH 4/6] more fixes Signed-off-by: Oleh Dokuka --- .../java/io/rsocket/RSocketRequester.java | 2 +- .../rsocket/internal/UnboundedProcessor.java | 22 +++++-------------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/RSocketRequester.java index 94d0f42cf..ed13e9f6e 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketRequester.java @@ -411,7 +411,7 @@ private Mono handleMetadataPush(Payload payload) { MetadataPushFrameFlyweight.encode(allocator, payload.sliceMetadata().retain()); payload.release(); - sendProcessor.onNext(metadataPushFrame); + sendProcessor.onNextPrioritized(metadataPushFrame); }); } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index cbe2eb19b..ec0b90804 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -107,7 +107,7 @@ void drainRegular(Subscriber a) { empty = t == null; } - if (checkTerminated(d, empty, a, q, pq)) { + if (checkTerminated(d, empty, a)) { return; } @@ -121,7 +121,7 @@ void drainRegular(Subscriber a) { } if (r == e) { - if (checkTerminated(done, q.isEmpty() && pq.isEmpty(), a, q, pq)) { + if (checkTerminated(done, q.isEmpty() && pq.isEmpty(), a)) { return; } } @@ -140,9 +140,6 @@ void drainRegular(Subscriber a) { void drainFused(Subscriber a) { int missed = 1; - final Queue q = queue; - final Queue pq = priorityQueue; - for (; ; ) { if (cancelled) { @@ -201,7 +198,7 @@ public void drain() { } boolean checkTerminated( - boolean d, boolean empty, Subscriber a, Queue q, Queue pq) { + boolean d, boolean empty, Subscriber a) { if (cancelled) { clear(); actual = null; @@ -344,19 +341,12 @@ public void cancel() { } } - @Override - public T peek() { - if (!priorityQueue.isEmpty()) { - return priorityQueue.peek(); - } - return queue.peek(); - } - @Override @Nullable public T poll() { - if (!priorityQueue.isEmpty()) { - return priorityQueue.poll(); + Queue pq = this.priorityQueue; + if (!pq.isEmpty()) { + return pq.poll(); } return queue.poll(); } From 70ec02efea34e0f35cb7493e2ebe92770e12fdda Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Sat, 7 Dec 2019 19:17:56 +0200 Subject: [PATCH 5/6] fixes format Signed-off-by: Oleh Dokuka --- .../src/main/java/io/rsocket/internal/UnboundedProcessor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index ec0b90804..fe664e843 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -197,8 +197,7 @@ public void drain() { } } - boolean checkTerminated( - boolean d, boolean empty, Subscriber a) { + boolean checkTerminated(boolean d, boolean empty, Subscriber a) { if (cancelled) { clear(); actual = null; From 39a74d70d9b0af14ac698c2b935c8bfff9f99ce8 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 16 Dec 2019 22:56:37 +0200 Subject: [PATCH 6/6] fixes Signed-off-by: Oleh Dokuka --- build.gradle | 5 ++++- .../src/main/java/io/rsocket/internal/UnicastMonoEmpty.java | 6 ++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/build.gradle b/build.gradle index e834e21bd..7f7a3dea2 100644 --- a/build.gradle +++ b/build.gradle @@ -88,8 +88,11 @@ subprojects { repositories { mavenCentral() - if (version.endsWith('BUILD-SNAPSHOT') || project.hasProperty('platformVersion')) { + if (version.endsWith('SNAPSHOT') || project.hasProperty('platformVersion')) { maven { url 'http://repo.spring.io/libs-snapshot' } + maven { + url 'https://oss.jfrog.org/artifactory/oss-snapshot-local' + } } } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoEmpty.java b/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoEmpty.java index eb8a1aa11..64a7d4422 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoEmpty.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoEmpty.java @@ -9,10 +9,8 @@ import reactor.util.annotation.Nullable; /** - * Represents an empty publisher which only calls onSubscribe and onComplete. - * - *

This Publisher is effectively stateless and only a single instance exists. Use the {@link - * #instance()} method to obtain a properly type-parametrized view of it. + * Represents an empty publisher which only calls onSubscribe and onComplete and allows only a + * single subscriber. * * @see Reactive-Streams-Commons */