Skip to content

Commit

Permalink
replace CountingInputStream
Browse files Browse the repository at this point in the history
  • Loading branch information
wayneguow committed Jun 12, 2024
1 parent 2127949 commit ca916f7
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.io.{InputStream, InputStreamReader, IOException, Reader}
import java.nio.ByteBuffer
import java.nio.charset.Charset

import org.apache.commons.io.input.CountingInputStream
import org.apache.commons.io.input.BoundedInputStream
import org.apache.hadoop.fs.Seekable
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.io.compress._
Expand Down Expand Up @@ -67,7 +67,7 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {
private var end: Long = _
private var reader: Reader = _
private var filePosition: Seekable = _
private var countingIn: CountingInputStream = _
private var countingIn: BoundedInputStream = _
private var readerLeftoverCharFn: () => Boolean = _
private var readerByteBuffer: ByteBuffer = _
private var decompressor: Decompressor = _
Expand Down Expand Up @@ -117,7 +117,10 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {
}
} else {
fsin.seek(start)
countingIn = new CountingInputStream(fsin)
countingIn = BoundedInputStream.builder()
.setInputStream(fsin)
.setMaxCount(fileSplit.getLength)
.get()
in = countingIn
// don't use filePosition in this case. We have to count bytes read manually
}
Expand Down Expand Up @@ -156,7 +159,7 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] {
if (filePosition != null) {
filePosition.getPos
} else {
start + countingIn.getByteCount -
start + countingIn.getCount -
readerByteBuffer.remaining() -
(if (readerLeftoverCharFn()) 1 else 0)
}
Expand Down

0 comments on commit ca916f7

Please sign in to comment.