Skip to content

Commit

Permalink
[FLINK-35175][filesystem] Avoid reading with ByteBuffer on hadoop ver…
Browse files Browse the repository at this point in the history
…sion below 3.3.0 (#24691)
  • Loading branch information
masteryhx committed Apr 22, 2024
1 parent b3fdb07 commit a4c71c8
Showing 1 changed file with 30 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.flink.core.fs.ByteBufferReadable;
import org.apache.flink.core.fs.FSDataInputStream;

import org.apache.hadoop.fs.StreamCapabilities;

import javax.annotation.Nonnull;

import java.io.IOException;
Expand Down Expand Up @@ -147,53 +145,47 @@ public void skipFully(long bytes) throws IOException {

@Override
public int read(ByteBuffer byteBuffer) throws IOException {
// Not all internal stream supports ByteBufferReadable
if (fsDataInputStream.hasCapability(StreamCapabilities.READBYTEBUFFER)) {
return fsDataInputStream.read(byteBuffer);
// TODO: Use org.apache.hadoop.fs.FSDataInputStream#read(ByteBuffer) to improve the
// performance after updating hadoop version to 3.3.0 and above.
if (byteBuffer.hasArray()) {
int len = byteBuffer.remaining();
fsDataInputStream.readFully(byteBuffer.array(), byteBuffer.arrayOffset(), len);
return len;
} else {
if (byteBuffer.hasArray()) {
int len = byteBuffer.remaining();
fsDataInputStream.readFully(byteBuffer.array(), byteBuffer.arrayOffset(), len);
return len;
} else {
// Fallback to read byte then put
int c = read();
// Fallback to read byte then put
int c = read();
if (c == -1) {
return -1;
}
byteBuffer.put((byte) c);

int n = 1, len = byteBuffer.remaining() + 1;
for (; n < len; n++) {
c = read();
if (c == -1) {
return -1;
break;
}
byteBuffer.put((byte) c);

int n = 1, len = byteBuffer.remaining() + 1;
for (; n < len; n++) {
c = read();
if (c == -1) {
break;
}
byteBuffer.put((byte) c);
}
return n;
}
return n;
}
}

@Override
public int read(long position, ByteBuffer byteBuffer) throws IOException {
// Not all internal stream supports ByteBufferPositionedReadable
if (fsDataInputStream.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {
return fsDataInputStream.read(position, byteBuffer);
// TODO: Use org.apache.hadoop.fs.FSDataInputStream#read(long, ByteBuffer) to improve the
// performance after updating hadoop version to 3.3.0 and above.
if (byteBuffer.hasArray()) {
int len = byteBuffer.remaining();
fsDataInputStream.readFully(
position, byteBuffer.array(), byteBuffer.arrayOffset(), len);
return len;
} else {
if (byteBuffer.hasArray()) {
int len = byteBuffer.remaining();
fsDataInputStream.readFully(
position, byteBuffer.array(), byteBuffer.arrayOffset(), len);
return len;
} else {
// Fallback to positionable read bytes then put
byte[] tmp = new byte[byteBuffer.remaining()];
fsDataInputStream.readFully(position, tmp, 0, tmp.length);
byteBuffer.put(tmp);
return tmp.length;
}
// Fallback to positionable read bytes then put
byte[] tmp = new byte[byteBuffer.remaining()];
fsDataInputStream.readFully(position, tmp, 0, tmp.length);
byteBuffer.put(tmp);
return tmp.length;
}
}
}

0 comments on commit a4c71c8

Please sign in to comment.