Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix csv parser (#9005) #10269

Merged
merged 2 commits into from
Apr 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ func (s *testExecSuite) TestGetFieldsFromLine(c *C) {
`"\0\b\n\r\t\Z\\\ \c\'\""`,
[]string{string([]byte{0, '\b', '\n', '\r', '\t', 26, '\\', ' ', ' ', 'c', '\'', '"'})},
},
// Test mixed.
{
`"123",456,"\t7890",abcd`,
[]string{"123", "456", "\t7890", "abcd"},
},
}

ldInfo := LoadDataInfo{
Expand All @@ -212,7 +217,7 @@ func (s *testExecSuite) TestGetFieldsFromLine(c *C) {
}

_, err := ldInfo.getFieldsFromLine([]byte(`1,a string,100.20`))
c.Assert(err, NotNil)
c.Assert(err, IsNil)
}

func assertEqualStrings(c *C, got []field, expect []string) {
Expand Down
187 changes: 165 additions & 22 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package executor

import (
"bytes"
"context"
"fmt"
"strings"

Expand All @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
"golang.org/x/net/context"
)

// LoadDataExec represents a load data executor.
Expand Down Expand Up @@ -209,7 +208,6 @@ func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error
if len(prevData) == 0 && len(curData) == 0 {
return nil, false, nil
}

var line []byte
var isEOF, hasStarting, reachLimit bool
if len(prevData) > 0 && len(curData) == 0 {
Expand All @@ -220,7 +218,6 @@ func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error
for len(curData) > 0 {
line, curData, hasStarting = e.getLine(prevData, curData)
prevData = nil

// If it doesn't find the terminated symbol and this data isn't the last data,
// the data can't be inserted.
if line == nil && !isEOF {
Expand Down Expand Up @@ -300,28 +297,174 @@ func (e *LoadDataInfo) addRecordLD(row []types.Datum) (int64, error) {
type field struct {
str []byte
maybeNull bool
enclosed bool
}

type fieldWriter struct {
pos int
enclosedChar byte
fieldTermChar byte
term *string
isEnclosed bool
isLineStart bool
isFieldStart bool
ReadBuf *[]byte
OutputBuf []byte
}

func (w *fieldWriter) Init(enclosedChar byte, fieldTermChar byte, readBuf *[]byte, term *string) {
w.isEnclosed = false
w.isLineStart = true
w.isFieldStart = true
w.ReadBuf = readBuf
w.enclosedChar = enclosedChar
w.fieldTermChar = fieldTermChar
w.term = term
}

func (w *fieldWriter) putback() {
w.pos--
}

func (w *fieldWriter) getChar() (bool, byte) {
if w.pos < len(*w.ReadBuf) {
ret := (*w.ReadBuf)[w.pos]
w.pos++
return true, ret
}
return false, 0
}

func (w *fieldWriter) isTerminator() bool {
chkpt, isterm := w.pos, true
for i := 1; i < len(*w.term); i++ {
flag, ch := w.getChar()
if !flag || ch != (*w.term)[i] {
isterm = false
break
}
}
if !isterm {
w.pos = chkpt
return false
}
return true
}

func (w *fieldWriter) outputField(enclosed bool) field {
var fild []byte
start := 0
if enclosed {
start = 1
}
for i := start; i < len(w.OutputBuf); i++ {
fild = append(fild, w.OutputBuf[i])
}
if len(fild) == 0 {
fild = []byte("")
}
w.OutputBuf = w.OutputBuf[0:0]
w.isEnclosed = false
w.isFieldStart = true
return field{fild, false, enclosed}
}

func (w *fieldWriter) GetField() (bool, field) {
// The first return value implies whether fieldWriter read the last character of line.
if w.isLineStart {
_, ch := w.getChar()
if ch == w.enclosedChar {
w.isEnclosed = true
w.isFieldStart, w.isLineStart = false, false
w.OutputBuf = append(w.OutputBuf, ch)
} else {
w.putback()
}
}
for {
flag, ch := w.getChar()
if !flag {
ret := w.outputField(false)
return true, ret
}
if ch == w.enclosedChar && w.isFieldStart {
// If read enclosed char at field start.
w.isEnclosed = true
w.OutputBuf = append(w.OutputBuf, ch)
w.isLineStart, w.isFieldStart = false, false
continue
}
w.isLineStart, w.isFieldStart = false, false
if ch == w.fieldTermChar && !w.isEnclosed {
// If read filed terminate char.
if w.isTerminator() {
ret := w.outputField(false)
return false, ret
}
w.OutputBuf = append(w.OutputBuf, ch)
} else if ch == w.enclosedChar && w.isEnclosed {
// If read enclosed char, look ahead.
flag, ch = w.getChar()
if !flag {
ret := w.outputField(true)
return true, ret
} else if ch == w.enclosedChar {
w.OutputBuf = append(w.OutputBuf, ch)
continue
} else if ch == w.fieldTermChar {
// If the next char is fieldTermChar, look ahead.
if w.isTerminator() {
ret := w.outputField(true)
return false, ret
}
w.OutputBuf = append(w.OutputBuf, ch)
} else {
// If there is no terminator behind enclosedChar, put the char back.
w.OutputBuf = append(w.OutputBuf, w.enclosedChar)
w.putback()
}
} else if ch == '\\' {
// TODO: escape only support '\'
w.OutputBuf = append(w.OutputBuf, ch)
flag, ch = w.getChar()
if flag {
if ch == w.enclosedChar {
w.OutputBuf = append(w.OutputBuf, ch)
} else {
w.putback()
}
}
} else {
w.OutputBuf = append(w.OutputBuf, ch)
}
}
}

// getFieldsFromLine splits line according to fieldsInfo.
func (e *LoadDataInfo) getFieldsFromLine(line []byte) ([]field, error) {
var sep []byte
if e.FieldsInfo.Enclosed != 0 {
if line[0] != e.FieldsInfo.Enclosed || line[len(line)-1] != e.FieldsInfo.Enclosed {
return nil, errors.Errorf("line %s should begin and end with %c", string(line), e.FieldsInfo.Enclosed)
var (
reader fieldWriter
fields []field
)

if len(line) == 0 {
str := []byte("")
fields = append(fields, field{str, false, false})
return fields, nil
}

reader.Init(e.FieldsInfo.Enclosed, e.FieldsInfo.Terminated[0], &line, &e.FieldsInfo.Terminated)
for {
eol, f := reader.GetField()
f = f.escape()
if string(f.str) == "NULL" && !f.enclosed {
f.str = []byte{'N'}
f.maybeNull = true
}
fields = append(fields, f)
if eol {
break
}
line = line[1 : len(line)-1]
sep = make([]byte, 0, len(e.FieldsInfo.Terminated)+2)
sep = append(sep, e.FieldsInfo.Enclosed)
sep = append(sep, e.FieldsInfo.Terminated...)
sep = append(sep, e.FieldsInfo.Enclosed)
} else {
sep = []byte(e.FieldsInfo.Terminated)
}
rawCols := bytes.Split(line, sep)
fields := make([]field, 0, len(rawCols))
for _, v := range rawCols {
f := field{v, false}
fields = append(fields, f.escape())
}
return fields, nil
}
Expand All @@ -341,7 +484,7 @@ func (f *field) escape() field {
f.str[pos] = c
pos++
}
return field{f.str[:pos], f.maybeNull}
return field{f.str[:pos], f.maybeNull, f.enclosed}
}

func (f *field) escapeChar(c byte) byte {
Expand Down
Loading