Skip to content

Commit

Permalink
Implement QuicStreamChannel.bytesBeforeUnwritable() (java-native-acce…
Browse files Browse the repository at this point in the history
…ss#264)

Motivation:

Some users may depend on QuicStreamChannel.bytesBeforeUnwritable() to make decisions on how much they will try to write.

Modifications:

- Add implementation of QuicStreamChannel.bytesBeforeUnwritable() by keep track of the stream capacity
- Add unit test

Result:

Be able to depend on QuicStreamChannel.bytesBeforeUnwritable()
  • Loading branch information
normanmaurer authored May 5, 2021
1 parent d08d177 commit 22dffdf
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 49 deletions.
9 changes: 9 additions & 0 deletions src/main/c/netty_quic_quiche.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,14 @@ static jlong netty_quiche_conn_readable(JNIEnv* env, jclass clazz, jlong conn) {
return (jlong) iter;
}

static jlong netty_quiche_conn_writable(JNIEnv* env, jclass clazz, jlong conn) {
quiche_stream_iter* iter = quiche_conn_writable((quiche_conn *) conn);
if (iter == NULL) {
return -1;
}
return (jlong) iter;
}

static void netty_quiche_stream_iter_free(JNIEnv* env, jclass clazz, jlong iter) {
quiche_stream_iter_free((quiche_stream_iter*) iter);
}
Expand Down Expand Up @@ -500,6 +508,7 @@ static const JNINativeMethod fixed_method_table[] = {
{ "quiche_conn_timeout_as_nanos", "(J)J", (void *) netty_quiche_conn_timeout_as_nanos },
{ "quiche_conn_on_timeout", "(J)V", (void *) netty_quiche_conn_on_timeout },
{ "quiche_conn_readable", "(J)J", (void *) netty_quiche_conn_readable },
{ "quiche_conn_writable", "(J)J", (void *) netty_quiche_conn_writable },
{ "quiche_stream_iter_free", "(J)V", (void *) netty_quiche_stream_iter_free },
{ "quiche_stream_iter_next", "(J[J)I", (void *) netty_quiche_stream_iter_next },
{ "quiche_conn_dgram_max_writable_len", "(J)I", (void* ) netty_quiche_conn_dgram_max_writable_len },
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/netty/incubator/codec/quic/Quiche.java
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,12 @@ static native int quiche_conn_stream_priority(
*/
static native long quiche_conn_readable(long connAddr);

/**
* See
* <a href="https://github.com/cloudflare/quiche/blob/0.6.0/include/quiche.h#L285">quiche_conn_writable</a>.
*/
static native long quiche_conn_writable(long connAddr);

/**
* See
* <a href="https://github.com/cloudflare/quiche/blob/0.6.0/include/quiche.h#L329">quiche_stream_iter_next</a>.
Expand Down
72 changes: 38 additions & 34 deletions src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ public void operationComplete(ChannelFuture future) {

private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private final long[] readableStreams = new long[128];
private final long[] writableStreams = new long[128];

private final LongObjectMap<QuicheQuicStreamChannel> streams = new LongObjectHashMap<>();
private final Queue<Long> flushPendingQueue = new ArrayDeque<>();
private final QuicheQuicChannelConfig config;
private final boolean server;
private final QuicStreamIdGenerator idGenerator;
Expand Down Expand Up @@ -316,7 +317,6 @@ void forceClose() {
state = CLOSED;

closeStreams();
flushPendingQueue.clear();

if (finBuffer != null) {
finBuffer.release();
Expand Down Expand Up @@ -782,53 +782,56 @@ void writable() {
}
}

void streamHasPendingWrites(long streamId) {
flushPendingQueue.add(streamId);
int streamCapacity(long streamId) {
if (connection.isClosed()) {
return 0;
}
return Quiche.quiche_conn_stream_capacity(connection.address(), streamId);
}

private boolean handleWritableStreams() {
int pending = flushPendingQueue.size();
if (isConnDestroyed() || pending == 0) {
if (isConnDestroyed()) {
return false;
}
inHandleWritableStreams = true;
try {
long connAddr = connection.address();
boolean mayNeedWrite = false;

if (Quiche.quiche_conn_is_established(connAddr) ||
Quiche.quiche_conn_is_in_early_data(connAddr)) {
// We only want to process the number of channels that were in the queue when we entered
// handleWritableStreams(). Otherwise we may would loop forever as a channel may add itself again
// if the write was again partial.
for (int i = 0; i < pending; i++) {
Long streamId = flushPendingQueue.poll();
if (streamId == null) {
break;
}
// Checking quiche_conn_stream_capacity(...) is cheaper then calling channel.writable() just
// to notice that we can not write again.
int capacity = Quiche.quiche_conn_stream_capacity(connAddr, streamId);
if (capacity == 0) {
// Still not writable, put back in the queue.
flushPendingQueue.add(streamId);
} else {
long sid = streamId;
QuicheQuicStreamChannel channel = streams.get(sid);
if (channel != null) {
if (capacity > 0) {
mayNeedWrite = true;
channel.writable(capacity);
} else {
if (!Quiche.quiche_conn_stream_finished(connAddr, sid)) {
// Only fire an exception if the error was not caused because the stream is
// considered finished.
channel.pipeline().fireExceptionCaught(Quiche.newException(capacity));
long writableIterator = Quiche.quiche_conn_writable(connAddr);

try {
// For streams we always process all streams when at least on read was requested.
for (;;) {
int writable = Quiche.quiche_stream_iter_next(
writableIterator, writableStreams);
for (int i = 0; i < writable; i++) {
long streamId = writableStreams[i];
QuicheQuicStreamChannel streamChannel = streams.get(streamId);
if (streamChannel != null) {
int capacity = Quiche.quiche_conn_stream_capacity(connAddr, streamId);
if (capacity < 0) {
if (!Quiche.quiche_conn_stream_finished(connAddr, streamId)) {
// Only fire an exception if the error was not caused because the stream is
// considered finished.
streamChannel.pipeline().fireExceptionCaught(Quiche.newException(capacity));
}
// Let's close the channel if quiche_conn_stream_capacity(...) returns an error.
streamChannel.forceClose();
} else if (streamChannel.writable(capacity)) {
mayNeedWrite = true;
}
// Let's close the channel if quiche_conn_stream_capacity(...) returns an error.
channel.forceClose();
}
}
if (writable < writableStreams.length) {
// We did handle all writable streams.
break;
}
}
} finally {
Quiche.quiche_stream_iter_free(writableIterator);
}
}
return mayNeedWrite;
Expand Down Expand Up @@ -1343,6 +1346,7 @@ private QuicheQuicStreamChannel addNewStreamChannel(long streamId) {
QuicheQuicChannel.this, streamId);
QuicheQuicStreamChannel old = streams.put(streamId, streamChannel);
assert old == null;
streamChannel.writable(streamCapacity(streamId));
return streamChannel;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ final class QuicheQuicStreamChannel extends DefaultAttributeMap implements QuicS
private volatile boolean inputShutdown;
private volatile boolean outputShutdown;
private volatile QuicStreamPriority priority;
private volatile int capacity;

QuicheQuicStreamChannel(QuicheQuicChannel parent, long streamId) {
this.parent = parent;
Expand Down Expand Up @@ -328,12 +329,16 @@ public boolean isWritable() {

@Override
public long bytesBeforeUnwritable() {
return 0;
return capacity;
}

@Override
public long bytesBeforeWritable() {
return 0;
if (writable) {
return 0;
}
// Just return something positive for now
return 8;
}

@Override
Expand All @@ -359,8 +364,18 @@ public int compareTo(Channel o) {
/**
* Stream is writable.
*/
void writable(@SuppressWarnings("unused") int capacity) {
((QuicStreamChannelUnsafe) unsafe()).writeQueued();
boolean writable(@SuppressWarnings("unused") int capacity) {
this.capacity = capacity;
boolean mayNeedWrite = ((QuicStreamChannelUnsafe) unsafe()).writeQueued();
updateWritabilityIfNeeded(capacity > 0);
return mayNeedWrite;
}

private void updateWritabilityIfNeeded(boolean newWritable) {
if (writable != newWritable) {
writable = newWritable;
pipeline.fireChannelWritabilityChanged();
}
}

/**
Expand Down Expand Up @@ -559,29 +574,32 @@ private void closeIfNeeded(boolean wasFinSent) {
}
}

void writeQueued() {
boolean writeQueued() {
boolean wasFinSent = QuicheQuicStreamChannel.this.finSent;
inWriteQueued = true;
try {
if (queue.isEmpty()) {
return false;
}
boolean written = false;
for (;;) {
Object msg = queue.current();
if (msg == null) {
break;
}
try {
if (!write0(msg)) {
return;
return written;
}
} catch (Exception e) {
queue.remove().setFailure(e);
continue;
}
queue.remove().setSuccess();
written = true;
}
if (!writable) {
writable = true;
pipeline.fireChannelWritabilityChanged();
}
updateWritabilityIfNeeded(true);
return written;
} finally {
closeIfNeeded(wasFinSent);
inWriteQueued = false;
Expand Down Expand Up @@ -626,21 +644,24 @@ public void write(Object msg, ChannelPromise promise) {
}

boolean wasFinSent = QuicheQuicStreamChannel.this.finSent;
boolean mayNeedWritabilityUpdate = false;
try {
if (write0(msg)) {
ReferenceCountUtil.release(msg);
promise.setSuccess();
mayNeedWritabilityUpdate = capacity == 0;
} else {
queue.add(msg, promise);
if (writable) {
writable = false;
pipeline.fireChannelWritabilityChanged();
}
mayNeedWritabilityUpdate = true;
}
} catch (Exception e) {
ReferenceCountUtil.release(msg);
promise.setFailure(e);
mayNeedWritabilityUpdate = capacity == 0;
} finally {
if (mayNeedWritabilityUpdate) {
updateWritabilityIfNeeded(false);
}
closeIfNeeded(wasFinSent);
}
}
Expand Down Expand Up @@ -673,8 +694,13 @@ private boolean write0(Object msg) throws Exception {
try {
do {
int res = parent().streamSend(streamId(), buffer, fin);

// Update the capacity as well.
int cap = parent.streamCapacity(streamId());
if (cap >= 0) {
capacity = cap;
}
if (Quiche.throwIfError(res) || res == 0) {
parent.streamHasPendingWrites(streamId());
return false;
}
sendSomething = true;
Expand Down
Loading

0 comments on commit 22dffdf

Please sign in to comment.