diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftConnection.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftConnection.java index bb0c4db725..05d6288b3d 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftConnection.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/MinecraftConnection.java @@ -51,6 +51,7 @@ import com.velocitypowered.proxy.util.except.QuietDecoderException; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -224,12 +225,16 @@ public EventLoop eventLoop() { * Writes and immediately flushes a message to the connection. * * @param msg the message to write + * + * @return A {@link ChannelFuture} that will complete when packet is successfully sent */ - public void write(Object msg) { + @Nullable + public ChannelFuture write(Object msg) { if (channel.isActive()) { - channel.writeAndFlush(msg, channel.voidPromise()); + return channel.writeAndFlush(msg, channel.newPromise()); } else { ReferenceCountUtil.release(msg); + return null; } } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java index 67242d9a5d..2ddcff17d1 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java @@ -20,9 +20,11 @@ import com.velocitypowered.proxy.connection.MinecraftConnection; import com.velocitypowered.proxy.connection.client.ConnectedPlayer; import com.velocitypowered.proxy.protocol.MinecraftPacket; +import io.netty.channel.ChannelFuture; +import org.checkerframework.checker.nullness.qual.Nullable; import java.time.Instant; import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; +import java.util.function.Function; /** * A precisely ordered queue which allows for outside entries into the ordered queue through @@ -58,9 +60,8 @@ public void queuePacket(CompletableFuture nextPacket, Instant t MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected(); CompletableFuture nextInLine = WrappedPacket.wrap(timestamp, nextPacket); - awaitChat(smc, this.packetFuture, + this.packetFuture = awaitChat(smc, this.packetFuture, nextInLine); // we await chat, binding `this.packetFuture` -> `nextInLine` - this.packetFuture = nextInLine; } } @@ -84,21 +85,26 @@ public void hijack(K packet, } } - private static BiConsumer writePacket(MinecraftConnection connection) { - return (wrappedPacket, throwable) -> { - if (wrappedPacket != null && !connection.isClosed()) { - wrappedPacket.write(connection); + private static Function writePacket(MinecraftConnection connection) { + return wrappedPacket -> { + if (!connection.isClosed()) { + ChannelFuture future = wrappedPacket.write(connection); + if (future != null) { + future.awaitUninterruptibly(); + } } + + return wrappedPacket; }; } - private static void awaitChat( + private static CompletableFuture awaitChat( MinecraftConnection connection, CompletableFuture binder, CompletableFuture future ) { // the binder will run -> then the future will get the `write packet` caller - binder.whenComplete((ignored1, ignored2) -> future.whenComplete(writePacket(connection))); + return binder.thenCompose(ignored -> future.thenApply(writePacket(connection))); } private static CompletableFuture hijackCurrentPacket( @@ -113,7 +119,7 @@ private static CompletableFuture h // map the new packet into a better "designed" packet with the hijacked packet's timestamp WrappedPacket.wrap(previous.timestamp, future.thenApply(item -> packetMapper.map(previous.timestamp, item))) - .whenCompleteAsync(writePacket(connection), connection.eventLoop()) + .thenApplyAsync(writePacket(connection), connection.eventLoop()) .whenComplete( (packet, throwable) -> awaitedFuture.complete(throwable != null ? null : packet)); }); @@ -148,10 +154,12 @@ private WrappedPacket(Instant timestamp, MinecraftPacket packet) { this.packet = packet; } - public void write(MinecraftConnection connection) { + @Nullable + public ChannelFuture write(MinecraftConnection connection) { if (packet != null) { - connection.write(packet); + return connection.write(packet); } + return null; } private static CompletableFuture wrap(Instant timestamp,