Skip to content

Commit

Permalink
Downgrade Netty from 4.1.x series to 4.0.x series
Browse files Browse the repository at this point in the history
  • Loading branch information
ash211 committed Aug 21, 2017
1 parent 00a5e85 commit bd08934
Show file tree
Hide file tree
Showing 9 changed files with 12 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,6 @@ public long transfered() {
return transferred;
}

@Override
public long transferred() {
return transferred;
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
Preconditions.checkArgument(position == transfered(), "Invalid position.");
Expand Down Expand Up @@ -237,7 +232,7 @@ private void encryptMore() throws IOException {
int copied = byteRawChannel.write(buf.nioBuffer());
buf.skipBytes(copied);
} else {
region.transferTo(byteRawChannel, region.transferred());
region.transferTo(byteRawChannel, region.transfered());
}
cos.write(byteRawChannel.getData(), 0, byteRawChannel.length());
cos.flush();
Expand All @@ -246,28 +241,6 @@ private void encryptMore() throws IOException {
0, byteEncChannel.length());
}

@Override
public FileRegion retain() {
super.retain();
return this;
}

@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}

@Override
public FileRegion touch() {
return this;
}

@Override
public FileRegion touch(Object o) {
return this;
}

@Override
protected void deallocate() {
byteRawChannel.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ public long transfered() {
return totalBytesTransferred;
}

@Override
public long transferred() {
return totalBytesTransferred;
}

/**
* This code is more complicated than you would think because we might require multiple
* transferTo invocations in order to transfer a single MessageWithHeader to avoid busy waiting.
Expand Down Expand Up @@ -132,28 +127,6 @@ public long transferTo(final WritableByteChannel target, final long position) th
return writtenHeader + writtenBody;
}

@Override
public FileRegion touch(Object msg) {
return this;
}

@Override
public FileRegion retain() {
super.retain();
return this;
}

@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}

@Override
public FileRegion touch() {
return this;
}

@Override
protected void deallocate() {
header.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,6 @@ public long transfered() {
return transferred;
}

@Override
public long transferred() {
return transferred;
}

/**
* Transfers data from the original message to the channel, encrypting it in the process.
*
Expand Down Expand Up @@ -267,7 +262,7 @@ private void nextChunk() throws IOException {
int copied = byteChannel.write(buf.nioBuffer());
buf.skipBytes(copied);
} else {
region.transferTo(byteChannel, region.transferred());
region.transferTo(byteChannel, region.transfered());
}

byte[] encrypted = backend.wrap(byteChannel.getData(), 0, byteChannel.length());
Expand All @@ -277,28 +272,6 @@ private void nextChunk() throws IOException {
this.unencryptedChunkSize = byteChannel.length();
}

@Override
public FileRegion touch(Object o) {
return this;
}

@Override
public FileRegion retain() {
super.retain();
return this;
}

@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}

@Override
public FileRegion touch() {
return this;
}

@Override
protected void deallocate() {
if (currentHeader != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,11 @@ public void close() {
channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
channelFuture = null;
}
if (bootstrap != null && bootstrap.config() != null && bootstrap.config().group() != null) {
bootstrap.config().group().shutdownGracefully();
if (bootstrap != null && bootstrap.group() != null) {
bootstrap.group().shutdownGracefully();
}
if (bootstrap != null && bootstrap.config() != null
&& bootstrap.config().childGroup() != null) {
bootstrap.config().childGroup().shutdownGracefully();
if (bootstrap != null &&bootstrap.childGroup() != null) {
bootstrap.childGroup().shutdownGracefully();
}
bootstrap = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private void testServerToClient(Message msg) {
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);

while (!serverChannel.outboundMessages().isEmpty()) {
clientChannel.writeOneInbound(serverChannel.readOutbound());
clientChannel.writeInbound(serverChannel.readOutbound());
}

assertEquals(1, clientChannel.inboundMessages().size());
Expand All @@ -72,7 +72,7 @@ private void testClientToServer(Message msg) {
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);

while (!clientChannel.outboundMessages().isEmpty()) {
serverChannel.writeOneInbound(clientChannel.readOutbound());
serverChannel.writeInbound(clientChannel.readOutbound());
}

assertEquals(1, serverChannel.inboundMessages().size());
Expand Down Expand Up @@ -116,8 +116,8 @@ public void encode(ChannelHandlerContext ctx, FileRegion in, List<Object> out)
throws Exception {

ByteArrayWritableChannel channel = new ByteArrayWritableChannel(Ints.checkedCast(in.count()));
while (in.transferred() < in.count()) {
in.transferTo(channel, in.transferred());
while (in.transfered() < in.count()) {
in.transferTo(channel, in.transfered());
}
out.add(Unpooled.wrappedBuffer(channel.getData()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,6 @@ public long transfered() {
return 8 * written;
}

@Override
public long transferred() {
return 8 * written;
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
for (int i = 0; i < writesPerCall; i++) {
Expand All @@ -153,28 +148,6 @@ public long transferTo(WritableByteChannel target, long position) throws IOExcep
return 8 * writesPerCall;
}

@Override
public FileRegion retain() {
super.retain();
return this;
}

@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}

@Override
public FileRegion touch(Object o) {
return this;
}

@Override
public FileRegion touch() {
return this;
}

@Override
protected void deallocate() {
}
Expand Down
16 changes: 0 additions & 16 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -305,22 +305,6 @@ private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize:
}

override def deallocate(): Unit = source.close()

override def transferred(): Long = _transferred

override def touch(o: scala.Any): FileRegion = this

override def retain(): FileRegion = {
super.retain()
this
}

override def retain(increment: Int): FileRegion = {
super.retain(increment)
this
}

override def touch(): FileRegion = this
}

private class CountingWritableChannel(sink: WritableByteChannel) extends WritableByteChannel {
Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-palantir
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ mimepull-1.9.6.jar
minlog-1.3.0.jar
mx4j-3.0.2.jar
netty-3.10.6.Final.jar
netty-all-4.1.13.Final.jar
netty-all-4.0.50.Final.jar
nimbus-jose-jwt-3.9.jar
objenesis-2.5.1.jar
okhttp-2.7.5.jar
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.13.Final</version>
<version>4.0.50.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
Expand Down

0 comments on commit bd08934

Please sign in to comment.