new ILineReader/TLineReader

johnkerl committed Feb 25, 2024
1 parent 62c90dd commit 3979dfa
Showing 7 changed files with 85 additions and 24 deletions.
73 changes: 68 additions & 5 deletions pkg/input/line_reader.go
@@ -14,8 +14,63 @@ type ILineReader interface {
 	Text() string
 }
 
-// NewLineScanner handles read lines which may be delimited by multi-line separators,
+type TLineReader struct {
+	scanner *bufio.Scanner
+}
+
+// NewLineReader handles reading lines which may be delimited by multi-line separators,
 // e.g. "\xe2\x90\x9e" for USV.
+func NewLineReader(handle io.Reader, irs string) *TLineReader {
+	scanner := bufio.NewScanner(handle)
+
+	if irs == "\n" || irs == "\r\n" {
+		// Handled by default scanner.
+	} else {
+		irsbytes := []byte(irs)
+		irslen := len(irsbytes)
+
+		// Custom splitter
+		recordSplitter := func(
+			data []byte,
+			atEOF bool,
+		) (
+			advance int,
+			token []byte,
+			err error,
+		) {
+			datalen := len(data)
+			end := datalen - irslen
+			for i := 0; i <= end; i++ {
+				if data[i] == irsbytes[0] {
+					match := true
+					for j := 1; j < irslen; j++ {
+						if data[i+j] != irsbytes[j] {
+							match = false
+							break
+						}
+					}
+					if match {
+						return i + irslen, data[:i], nil
+					}
+				}
+			}
+			if !atEOF {
+				return 0, nil, nil
+			}
+			// There is one final token to be delivered, which may be the empty string.
+			// Returning bufio.ErrFinalToken here tells Scan there are no more tokens after this
+			// but does not trigger an error to be returned from Scan itself.
+			return 0, data, bufio.ErrFinalToken
+		}
+
+		scanner.Split(recordSplitter)
+	}
+
+	return &TLineReader{
+		scanner: scanner,
+	}
+}
+
 func NewLineScanner(handle io.Reader, irs string) *bufio.Scanner {
 	scanner := bufio.NewScanner(handle)
 
@@ -66,12 +121,20 @@ func NewLineScanner(handle io.Reader, irs string) *bufio.Scanner {
 	return scanner
 }
 
+func (r *TLineReader) Scan() bool {
+	return r.scanner.Scan()
+}
+
+func (r *TLineReader) Text() string {
+	return r.scanner.Text()
+}
+
 // TODO: comment copiously
 //
 // Lines are written to the channel with their trailing newline (or whatever
 // IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
-func channelizedLineScanner(
-	lineScanner *bufio.Scanner,
+func channelizedLineReader(
+	lineReader ILineReader,
 	linesChannel chan<- *list.List,
 	downstreamDoneChannel <-chan bool, // for mlr head
 	recordsPerBatch int64,
@@ -81,10 +144,10 @@ func channelizedLineScanner(
 
 	lines := list.New()
 
-	for lineScanner.Scan() {
+	for lineReader.Scan() {
 		i++
 
-		lines.PushBack(lineScanner.Text())
+		lines.PushBack(lineReader.Text())
 
 		// See if downstream processors will be ignoring further data (e.g. mlr
 		// head). If so, stop reading. This makes 'mlr head hugefile' exit
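
The rest of channelizedLineReader is not shown in this hunk. Based only on the fragments visible above (batching lines into a *list.List, the downstream-done channel for mlr head, and recordsPerBatch), a simplified sketch of what such a channelized reader does might look like the following. The function name and exact batching details are illustrative, not the committed code; it assumes it sits alongside the code above in package input, so container/list and ILineReader are in scope.

```go
// Sketch only: assumed shape of a channelized line reader, reconstructed from
// the fragments visible in this hunk. It batches lines into lists and honors
// a downstream-done channel so that e.g. 'mlr head' can stop reading early.
func channelizedLineReaderSketch(
	lineReader ILineReader, // interface introduced by this commit
	linesChannel chan<- *list.List,
	downstreamDoneChannel <-chan bool, // for mlr head
	recordsPerBatch int64,
) {
	i := int64(0)
	done := false
	lines := list.New()

	for lineReader.Scan() {
		i++
		lines.PushBack(lineReader.Text())

		if i%recordsPerBatch == 0 {
			// If downstream processors signal they're done, stop reading.
			select {
			case <-downstreamDoneChannel:
				done = true
			default:
			}
			if done {
				break
			}
			linesChannel <- lines
			lines = list.New()
		}
	}
	linesChannel <- lines
	close(linesChannel)
}
```
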
4 changes: 2 additions & 2 deletions pkg/input/record_reader_csvlite.go
@@ -144,9 +144,9 @@ func (reader *RecordReaderCSVLite) processHandle(
 	reader.headerStrings = nil
 
 	recordsPerBatch := reader.recordsPerBatch
-	lineScanner := NewLineScanner(handle, reader.readerOptions.IRS)
+	lineReader := NewLineReader(handle, reader.readerOptions.IRS)
 	linesChannel := make(chan *list.List, recordsPerBatch)
-	go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch)
+	go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)
 
 	for {
 		recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel)
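
The record readers in this commit pass reader.readerOptions.IRS straight through to NewLineReader, so the custom splitter only kicks in for separators other than "\n" and "\r\n". As a hypothetical standalone driver (the import path is an assumption for illustration, not something stated in this diff), reading input delimited by the USV record separator "\xe2\x90\x9e" (U+241E) would look roughly like this:

```go
package main

import (
	"fmt"
	"strings"

	// Assumed import path, for illustration only.
	"github.com/johnkerl/miller/pkg/input"
)

func main() {
	// Two records delimited by the USV record separator "\xe2\x90\x9e" (U+241E).
	data := "a=1,b=2\xe2\x90\x9ea=3,b=4"

	reader := input.NewLineReader(strings.NewReader(data), "\xe2\x90\x9e")
	for reader.Scan() {
		// The separator is stripped off, so callers see "a=1,b=2" with no trailing IRS.
		fmt.Println(reader.Text())
	}
	// Output:
	// a=1,b=2
	// a=3,b=4
}
```
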
4 changes: 2 additions & 2 deletions pkg/input/record_reader_dkvp_nidx.go
@@ -101,9 +101,9 @@ func (reader *RecordReaderDKVPNIDX) processHandle(
 	context.UpdateForStartOfFile(filename)
 	recordsPerBatch := reader.recordsPerBatch
 
-	lineScanner := NewLineScanner(handle, reader.readerOptions.IRS)
+	lineReader := NewLineReader(handle, reader.readerOptions.IRS)
 	linesChannel := make(chan *list.List, recordsPerBatch)
-	go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch)
+	go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)
 
 	for {
 		recordsAndContexts, eof := reader.getRecordBatch(linesChannel, errorChannel, context)
9 changes: 4 additions & 5 deletions pkg/input/record_reader_json.go
@@ -1,7 +1,6 @@
 package input
 
 import (
-	"bufio"
 	"container/list"
 	"fmt"
 	"io"
@@ -203,7 +202,7 @@ func (reader *RecordReaderJSON) processHandle(
 // JSONCommentEnabledReader implements io.Reader to strip comment lines
 // off of CSV data.
 type JSONCommentEnabledReader struct {
-	lineScanner *bufio.Scanner
+	lineReader ILineReader
 	readerOptions *cli.TReaderOptions
 	context *types.Context // Needed for channelized stdout-printing logic
 	readerChannel chan<- *list.List // list of *types.RecordAndContext
@@ -220,7 +219,7 @@ func NewJSONCommentEnabledReader(
 	readerChannel chan<- *list.List, // list of *types.RecordAndContext
 ) *JSONCommentEnabledReader {
 	return &JSONCommentEnabledReader{
-		lineScanner: bufio.NewScanner(underlying),
+		lineReader: NewLineReader(underlying, "\n"),
 		readerOptions: readerOptions,
 		context: types.NewNilContext(),
 		readerChannel: readerChannel,
@@ -237,10 +236,10 @@ func (bsr *JSONCommentEnabledReader) Read(p []byte) (n int, err error) {
 	// Loop until we can get a non-comment line to pass on, or end of file.
 	for {
 		// EOF
-		if !bsr.lineScanner.Scan() {
+		if !bsr.lineReader.Scan() {
 			return 0, io.EOF
 		}
-		line := bsr.lineScanner.Text()
+		line := bsr.lineReader.Text()
 
 		// Non-comment line
 		if !strings.HasPrefix(line, bsr.readerOptions.CommentString) {
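
JSONCommentEnabledReader wraps the JSON input stream and re-exposes it as an io.Reader with comment lines filtered out, now driven by the new line reader rather than a raw bufio.Scanner. For readers unfamiliar with the pattern, here is a deliberately simplified, eager sketch of the same idea; the committed reader filters lazily inside Read and carries extra plumbing for the channelized comment-printing mode mentioned in the struct comment above.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// stripComments returns an io.Reader over the input with comment lines
// (those starting with commentString) removed. Eager and simplified: the
// real reader does this incrementally, line by line, inside Read.
func stripComments(input io.Reader, commentString string) io.Reader {
	var kept []string
	scanner := bufio.NewScanner(input)
	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, commentString) {
			kept = append(kept, line)
		}
	}
	return strings.NewReader(strings.Join(kept, "\n"))
}

func main() {
	data := "# a comment\n{\"a\": 1, \"b\": 2}\n# another comment\n"

	var record map[string]int
	if err := json.NewDecoder(stripComments(strings.NewReader(data), "#")).Decode(&record); err != nil {
		panic(err)
	}
	fmt.Println(record) // map[a:1 b:2]
}
```
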
4 changes: 2 additions & 2 deletions pkg/input/record_reader_pprint.go
@@ -148,9 +148,9 @@ func (reader *RecordReaderPprintBarredOrMarkdown) processHandle(
 	reader.headerStrings = nil
 
 	recordsPerBatch := reader.recordsPerBatch
-	lineScanner := NewLineScanner(handle, reader.readerOptions.IRS)
+	lineReader := NewLineReader(handle, reader.readerOptions.IRS)
 	linesChannel := make(chan *list.List, recordsPerBatch)
-	go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch)
+	go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)
 
 	for {
 		recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel)
4 changes: 2 additions & 2 deletions pkg/input/record_reader_tsv.go
@@ -126,9 +126,9 @@ func (reader *RecordReaderTSV) processHandle(
 	reader.headerStrings = nil
 
 	recordsPerBatch := reader.recordsPerBatch
-	lineScanner := NewLineScanner(handle, reader.readerOptions.IRS)
+	lineReader := NewLineReader(handle, reader.readerOptions.IRS)
 	linesChannel := make(chan *list.List, recordsPerBatch)
-	go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch)
+	go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)
 
 	for {
 		recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel)
11 changes: 5 additions & 6 deletions pkg/input/record_reader_xtab.go
@@ -1,7 +1,6 @@
 package input
 
 import (
-	"bufio"
 	"container/list"
 	"fmt"
 	"io"
@@ -105,10 +104,10 @@ func (reader *RecordReaderXTAB) processHandle(
 	recordsPerBatch := reader.recordsPerBatch
 
 	// XTAB uses repeated IFS, rather than IRS, to delimit records
-	lineScanner := NewLineScanner(handle, reader.readerOptions.IFS)
+	lineReader := NewLineReader(handle, reader.readerOptions.IFS)
 
 	stanzasChannel := make(chan *list.List, recordsPerBatch)
-	go channelizedStanzaScanner(lineScanner, reader.readerOptions, stanzasChannel, downstreamDoneChannel,
+	go channelizedStanzaScanner(lineReader, reader.readerOptions, stanzasChannel, downstreamDoneChannel,
 		recordsPerBatch)
 
 	for {
@@ -137,7 +136,7 @@ func (reader *RecordReaderXTAB) processHandle(
 // start or end of file. A single stanza, once parsed, will become a single
 // record.
 func channelizedStanzaScanner(
-	lineScanner *bufio.Scanner,
+	lineReader ILineReader,
 	readerOptions *cli.TReaderOptions,
 	stanzasChannel chan<- *list.List, // list of list of string
 	downstreamDoneChannel <-chan bool, // for mlr head
@@ -150,8 +149,8 @@ func channelizedStanzaScanner(
 	stanzas := list.New()
 	stanza := newStanza()
 
-	for lineScanner.Scan() {
-		line := lineScanner.Text()
+	for lineReader.Scan() {
+		line := lineReader.Text()
 
 		// Check for comments-in-data feature
 		// TODO: function-pointer this away
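
Because XTAB delimits records with repeated IFS rather than an IRS, the line reader above is handed IFS: each Scan yields one field line, and an empty line (two consecutive IFS occurrences) marks the end of a stanza. A stripped-down, hypothetical version of the stanza-grouping loop, minus the comment handling, batching, and downstream-done logic of the committed channelizedStanzaScanner, could look like this:

```go
// Sketch only: gather lines from an ILineReader into stanzas, where an empty
// line ends the current stanza. Each stanza later becomes one record.
func gatherStanzas(lineReader ILineReader) [][]string {
	stanzas := [][]string{}
	stanza := []string{}

	for lineReader.Scan() {
		line := lineReader.Text()
		if line == "" {
			// End of the current stanza, if it has any content.
			if len(stanza) > 0 {
				stanzas = append(stanzas, stanza)
				stanza = []string{}
			}
			continue
		}
		stanza = append(stanza, line)
	}
	if len(stanza) > 0 {
		stanzas = append(stanzas, stanza)
	}
	return stanzas
}
```
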
