Skip to content

Commit

Permalink
Merge pull request #999 from booky10/fix/bungee-injection
Browse files Browse the repository at this point in the history
Rework bungee injector to allow for packet cancellation
  • Loading branch information
retrooper authored Sep 20, 2024
2 parents cb277bf + 7a9f831 commit 28c7849
Showing 1 changed file with 63 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,30 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.EncoderException;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.PromiseCombiner;
import net.md_5.bungee.api.connection.ProxiedPlayer;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

// Thanks to ViaVersion for the compression method.
@ChannelHandler.Sharable
public class PacketEventsEncoder extends MessageToMessageEncoder<ByteBuf> {
public class PacketEventsEncoder extends ChannelOutboundHandlerAdapter {

private static final Recycler<OutList> OUT_LIST_RECYCLER = new Recycler<OutList>() {
@Override
protected OutList newObject(Handle<OutList> handle) {
return new OutList(handle);
}
};

public ProxiedPlayer player;
public User user;
public boolean handledCompression;
Expand All @@ -45,7 +59,7 @@ public PacketEventsEncoder(User user) {
this.user = user;
}

public void read(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
public void read(ChannelHandlerContext ctx, ByteBuf buffer, ChannelPromise promise) throws Exception {
boolean doCompression = handleCompressionOrder(ctx, buffer);
int firstReaderIndex = buffer.readerIndex();
PacketSendEvent packetSendEvent = EventCreationUtil.createSendEvent(ctx.channel(), user, player,
Expand All @@ -57,17 +71,16 @@ public void read(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) th
ByteBufHelper.clear(packetSendEvent.getByteBuf());
packetSendEvent.getLastUsedWrapper().writeVarInt(packetSendEvent.getPacketId());
packetSendEvent.getLastUsedWrapper().write();
}
else {
} else {
buffer.readerIndex(firstReaderIndex);
}
if (doCompression) {
recompress(ctx, buffer, out);
this.recompress(ctx, buffer, promise);
} else {
out.add(buffer.retain());
ctx.write(buffer, promise);
}
} else {
ByteBufHelper.clear(packetSendEvent.getByteBuf());
ReferenceCountUtil.release(packetSendEvent.getByteBuf());
}
if (packetSendEvent.hasPostTasks()) {
for (Runnable task : packetSendEvent.getPostTasks()) {
Expand All @@ -77,16 +90,17 @@ public void read(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) th
}

@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
if (!msg.isReadable()) {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (!(msg instanceof ByteBuf)) {
super.write(ctx, msg, promise);
return;
}
read(ctx, msg, out);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
buf.release();
} else {
this.read(ctx, buf, promise);
}
}

private boolean handleCompressionOrder(ChannelHandlerContext ctx, ByteBuf buffer) {
Expand Down Expand Up @@ -125,12 +139,41 @@ private boolean handleCompressionOrder(ChannelHandlerContext ctx, ByteBuf buffer
return false;
}

private void recompress(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
ChannelHandler compressor = ctx.pipeline().get("compress");
private void recompress(ChannelHandlerContext ctx, ByteBuf buffer, ChannelPromise promise) {
OutList outWrapper = OUT_LIST_RECYCLER.get();
List<Object> out = outWrapper.list;
try {
ChannelHandler compressor = ctx.pipeline().get("compress");
CustomPipelineUtil.callPacketEncodeByteBuf(compressor, ctx, buffer, out);
} catch (InvocationTargetException e) {
e.printStackTrace();

int len = out.size();
if (len == 1) {
// should be the only case which
// happens on vanilla bungeecord
ctx.write(out.get(0), promise);
} else {
// copied from MessageToMessageEncoder#writePromiseCombiner
PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
for (int i = 0; i < len; i++) {
combiner.add(ctx.write(out.get(i)));
}
combiner.finish(promise);
}
} catch (InvocationTargetException exception) {
throw new EncoderException("Error while recompressing bytebuf " + buffer.readableBytes(), exception);
} finally {
outWrapper.handle.recycle(outWrapper);
}
}

private static final class OutList {

// the default bungee compressor only produces one output bytebuf
private final List<Object> list = new ArrayList<>(1);
private final Recycler.Handle<OutList> handle;

public OutList(Recycler.Handle<OutList> handle) {
this.handle = handle;
}
}
}

0 comments on commit 28c7849

Please sign in to comment.