Skip to content

Commit

Permalink
Split up pkg/input/record_reader.go
Browse files Browse the repository at this point in the history
  • Loading branch information
johnkerl committed Feb 25, 2024
1 parent 296ff87 commit 62c90dd
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 171 deletions.
3 changes: 3 additions & 0 deletions pkg/input/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package input

const CSV_BOM = "\xef\xbb\xbf"
113 changes: 113 additions & 0 deletions pkg/input/line_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// This file contains the interface for file-format-specific record-readers, as
// well as a collection of utility functions.

package input

import (
"bufio"
"container/list"
"io"
)

type ILineReader interface {
Scan() bool
Text() string
}

// NewLineScanner handles read lines which may be delimited by multi-line separators,
// e.g. "\xe2\x90\x9e" for USV.
func NewLineScanner(handle io.Reader, irs string) *bufio.Scanner {
scanner := bufio.NewScanner(handle)

// Handled by default scanner.
if irs == "\n" || irs == "\r\n" {
return scanner
}

irsbytes := []byte(irs)
irslen := len(irsbytes)

// Custom splitter
recordSplitter := func(
data []byte,
atEOF bool,
) (
advance int,
token []byte,
err error,
) {
datalen := len(data)
end := datalen - irslen
for i := 0; i <= end; i++ {
if data[i] == irsbytes[0] {
match := true
for j := 1; j < irslen; j++ {
if data[i+j] != irsbytes[j] {
match = false
break
}
}
if match {
return i + irslen, data[:i], nil
}
}
}
if !atEOF {
return 0, nil, nil
}
// There is one final token to be delivered, which may be the empty string.
// Returning bufio.ErrFinalToken here tells Scan there are no more tokens after this
// but does not trigger an error to be returned from Scan itself.
return 0, data, bufio.ErrFinalToken
}

scanner.Split(recordSplitter)

return scanner
}

// TODO: comment copiously
//
// Lines are written to the channel with their trailing newline (or whatever
// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
func channelizedLineScanner(
lineScanner *bufio.Scanner,
linesChannel chan<- *list.List,
downstreamDoneChannel <-chan bool, // for mlr head
recordsPerBatch int64,
) {
i := int64(0)
done := false

lines := list.New()

for lineScanner.Scan() {
i++

lines.PushBack(lineScanner.Text())

// See if downstream processors will be ignoring further data (e.g. mlr
// head). If so, stop reading. This makes 'mlr head hugefile' exit
// quickly, as it should.
if i%recordsPerBatch == 0 {
select {
case _ = <-downstreamDoneChannel:
done = true
break
default:
break
}
if done {
break
}
linesChannel <- lines
lines = list.New()
}

if done {
break
}
}
linesChannel <- lines
close(linesChannel) // end-of-stream marker
}
171 changes: 0 additions & 171 deletions pkg/input/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,11 @@
package input

import (
"bufio"
"container/list"
"io"
"regexp"
"strings"

"github.com/johnkerl/miller/pkg/cli"
"github.com/johnkerl/miller/pkg/lib"
"github.com/johnkerl/miller/pkg/types"
)

const CSV_BOM = "\xef\xbb\xbf"

// Since Go is concurrent, the context struct (AWK-like variables such as
// FILENAME, NF, NF, FNR, etc.) needs to be duplicated and passed through the
// channels along with each record. Hence the initial context, which readers
Expand All @@ -32,166 +24,3 @@ type IRecordReader interface {
downstreamDoneChannel <-chan bool, // for mlr head
)
}

// NewLineScanner handles read lines which may be delimited by multi-line separators,
// e.g. "\xe2\x90\x9e" for USV.
func NewLineScanner(handle io.Reader, irs string) *bufio.Scanner {
scanner := bufio.NewScanner(handle)

// Handled by default scanner.
if irs == "\n" || irs == "\r\n" {
return scanner
}

irsbytes := []byte(irs)
irslen := len(irsbytes)

// Custom splitter
recordSplitter := func(
data []byte,
atEOF bool,
) (
advance int,
token []byte,
err error,
) {
datalen := len(data)
end := datalen - irslen
for i := 0; i <= end; i++ {
if data[i] == irsbytes[0] {
match := true
for j := 1; j < irslen; j++ {
if data[i+j] != irsbytes[j] {
match = false
break
}
}
if match {
return i + irslen, data[:i], nil
}
}
}
if !atEOF {
return 0, nil, nil
}
// There is one final token to be delivered, which may be the empty string.
// Returning bufio.ErrFinalToken here tells Scan there are no more tokens after this
// but does not trigger an error to be returned from Scan itself.
return 0, data, bufio.ErrFinalToken
}

scanner.Split(recordSplitter)

return scanner
}

// TODO: comment copiously
//
// Lines are written to the channel with their trailing newline (or whatever
// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
func channelizedLineScanner(
lineScanner *bufio.Scanner,
linesChannel chan<- *list.List,
downstreamDoneChannel <-chan bool, // for mlr head
recordsPerBatch int64,
) {
i := int64(0)
done := false

lines := list.New()

for lineScanner.Scan() {
i++

lines.PushBack(lineScanner.Text())

// See if downstream processors will be ignoring further data (e.g. mlr
// head). If so, stop reading. This makes 'mlr head hugefile' exit
// quickly, as it should.
if i%recordsPerBatch == 0 {
select {
case _ = <-downstreamDoneChannel:
done = true
break
default:
break
}
if done {
break
}
linesChannel <- lines
lines = list.New()
}

if done {
break
}
}
linesChannel <- lines
close(linesChannel) // end-of-stream marker
}

// IPairSplitter splits a string into left and right, e.g. for IPS.
// This helps us reuse code for splitting by IPS string, or IPS regex.
type iPairSplitter interface {
Split(input string) []string
}

func newPairSplitter(options *cli.TReaderOptions) iPairSplitter {
if options.IPSRegex == nil {
return &tIPSSplitter{ips: options.IPS}
} else {
return &tIPSRegexSplitter{ipsRegex: options.IPSRegex}
}
}

type tIPSSplitter struct {
ips string
}

func (s *tIPSSplitter) Split(input string) []string {
return strings.SplitN(input, s.ips, 2)
}

type tIPSRegexSplitter struct {
ipsRegex *regexp.Regexp
}

func (s *tIPSRegexSplitter) Split(input string) []string {
return lib.RegexCompiledSplitString(s.ipsRegex, input, 2)
}

// IFieldSplitter splits a string into pieces, e.g. for IFS.
// This helps us reuse code for splitting by IFS string, or IFS regex.
type iFieldSplitter interface {
Split(input string) []string
}

func newFieldSplitter(options *cli.TReaderOptions) iFieldSplitter {
if options.IFSRegex == nil {
return &tIFSSplitter{ifs: options.IFS, allowRepeatIFS: options.AllowRepeatIFS}
} else {
return &tIFSRegexSplitter{ifsRegex: options.IFSRegex}
}
}

type tIFSSplitter struct {
ifs string
allowRepeatIFS bool
}

func (s *tIFSSplitter) Split(input string) []string {
fields := lib.SplitString(input, s.ifs)
if s.allowRepeatIFS {
fields = lib.StripEmpties(fields) // left/right trim
}
return fields
}

type tIFSRegexSplitter struct {
ifsRegex *regexp.Regexp
}

func (s *tIFSRegexSplitter) Split(input string) []string {
return lib.RegexCompiledSplitString(s.ifsRegex, input, -1)
}
77 changes: 77 additions & 0 deletions pkg/input/splitters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// This file contains the interface for file-format-specific record-readers, as
// well as a collection of utility functions.

package input

import (
"regexp"
"strings"

"github.com/johnkerl/miller/pkg/cli"
"github.com/johnkerl/miller/pkg/lib"
)

// IPairSplitter splits a string into left and right, e.g. for IPS.
// This helps us reuse code for splitting by IPS string, or IPS regex.
type iPairSplitter interface {
Split(input string) []string
}

func newPairSplitter(options *cli.TReaderOptions) iPairSplitter {
if options.IPSRegex == nil {
return &tIPSSplitter{ips: options.IPS}
} else {
return &tIPSRegexSplitter{ipsRegex: options.IPSRegex}
}
}

type tIPSSplitter struct {
ips string
}

func (s *tIPSSplitter) Split(input string) []string {
return strings.SplitN(input, s.ips, 2)
}

type tIPSRegexSplitter struct {
ipsRegex *regexp.Regexp
}

func (s *tIPSRegexSplitter) Split(input string) []string {
return lib.RegexCompiledSplitString(s.ipsRegex, input, 2)
}

// IFieldSplitter splits a string into pieces, e.g. for IFS.
// This helps us reuse code for splitting by IFS string, or IFS regex.
type iFieldSplitter interface {
Split(input string) []string
}

func newFieldSplitter(options *cli.TReaderOptions) iFieldSplitter {
if options.IFSRegex == nil {
return &tIFSSplitter{ifs: options.IFS, allowRepeatIFS: options.AllowRepeatIFS}
} else {
return &tIFSRegexSplitter{ifsRegex: options.IFSRegex}
}
}

type tIFSSplitter struct {
ifs string
allowRepeatIFS bool
}

func (s *tIFSSplitter) Split(input string) []string {
fields := lib.SplitString(input, s.ifs)
if s.allowRepeatIFS {
fields = lib.StripEmpties(fields) // left/right trim
}
return fields
}

type tIFSRegexSplitter struct {
ifsRegex *regexp.Regexp
}

func (s *tIFSRegexSplitter) Split(input string) []string {
return lib.RegexCompiledSplitString(s.ifsRegex, input, -1)
}

0 comments on commit 62c90dd

Please sign in to comment.