Skip to content

Commit

Permalink
lightning/parser: support STARTING BY (#40821)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
lance6716 committed Jan 30, 2023
1 parent 9544e26 commit 17d70c1
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 10 deletions.
3 changes: 3 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,9 @@ type CSVConfig struct {
TrimLastSep bool `toml:"trim-last-separator" json:"trim-last-separator"`
NotNull bool `toml:"not-null" json:"not-null"`
BackslashEscape bool `toml:"backslash-escape" json:"backslash-escape"`
// hide these options for lightning configuration file, they can only be used by LOAD DATA
// https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-field-line-handling
StartingBy string `toml:"-" json:"-"`
}

type MydumperRuntime struct {
Expand Down
63 changes: 54 additions & 9 deletions br/pkg/lightning/mydump/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ type CSVParser struct {
blockParser
cfg *config.CSVConfig

comma []byte
quote []byte
newLine []byte
comma []byte
quote []byte
newLine []byte
startingBy []byte

charsetConvertor *CharsetConvertor
// These variables are used with IndexAnyByte to search a byte slice for the
Expand Down Expand Up @@ -120,6 +121,12 @@ func NewCSVParser(
}
unquoteStopSet = append(unquoteStopSet, newLineStopSet...)

if len(cfg.StartingBy) > 0 {
if strings.Contains(cfg.StartingBy, terminator) {
return nil, errors.New("starting-by cannot contain (line) terminator")
}
}

escFlavor := backslashEscapeFlavorNone
if cfg.BackslashEscape {
escFlavor = backslashEscapeFlavorMySQL
Expand All @@ -138,6 +145,7 @@ func NewCSVParser(
comma: []byte(separator),
quote: []byte(delimiter),
newLine: []byte(terminator),
startingBy: []byte(cfg.StartingBy),
escFlavor: escFlavor,
quoteByteSet: makeByteSet(quoteStopSet),
unquoteByteSet: makeByteSet(unquoteStopSet),
Expand Down Expand Up @@ -370,11 +378,43 @@ func (parser *CSVParser) readRecord(dst []string) ([]string, error) {

isEmptyLine := true
whitespaceLine := true
foundStartingByThisLine := false
prevToken := csvTokenNewLine
var firstToken csvToken

outside:
for {
// we should drop
// 1. the whole line if it does not contain startingBy
// 2. any character before startingBy
// since we have checked startingBy does not contain terminator, we can
// split at terminator to check the substring contains startingBy. Even
// if the terminator is inside a quoted field which means it's not the
// end of a line, the substring can still be dropped by rule 2.
if len(parser.startingBy) > 0 && !foundStartingByThisLine {
oldPos := parser.pos
content, _, err := parser.ReadUntilTerminator()
if err != nil {
if !(errors.Cause(err) == io.EOF) {
return nil, err
}
if len(content) == 0 {
return nil, err
}
// if we reached EOF, we should still check the content contains
// startingBy and try to put back and parse it.
}
idx := bytes.Index(content, parser.startingBy)
if idx == -1 {
continue
}
foundStartingByThisLine = true
content = content[idx+len(parser.startingBy):]
content = append(content, parser.newLine...)
parser.buf = append(content, parser.buf...)
parser.pos = oldPos + int64(idx+len(parser.startingBy))
}

content, firstByte, err := parser.readUntil(&parser.unquoteByteSet)

if len(content) > 0 {
Expand Down Expand Up @@ -415,6 +455,7 @@ outside:
}
whitespaceLine = false
case csvTokenNewLine:
foundStartingByThisLine = false
// new line = end of record (ignore empty lines)
prevToken = firstToken
if isEmptyLine {
Expand Down Expand Up @@ -578,17 +619,21 @@ func (parser *CSVParser) ReadColumns() error {
}

// ReadUntilTerminator seeks the file until the terminator token is found, and
// returns the file offset beyond the terminator.
// This function is used in strict-format dividing a CSV file.
func (parser *CSVParser) ReadUntilTerminator() (int64, error) {
// returns
// - the content before terminator
// - the file offset beyond the terminator
// - error
// Note that the terminator string pattern may be the content of a field, which
// means it's inside quotes. Caller should make sure to handle this case.
func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error) {
for {
_, firstByte, err := parser.readUntil(&parser.newLineByteSet)
content, firstByte, err := parser.readUntil(&parser.newLineByteSet)
if err != nil {
return 0, err
return content, 0, err
}
parser.skipBytes(1)
if ok, err := parser.tryReadNewLine(firstByte); ok || err != nil {
return parser.pos, err
return content, parser.pos, err
}
}
}
234 changes: 234 additions & 0 deletions br/pkg/lightning/mydump/csv_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,35 @@ func runTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int
}
}

func runTestCasesCSVIgnoreNLines(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []testCase, ignoreNLines int) {
for _, tc := range cases {
charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
assert.NoError(t, err)
parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(tc.input), blockBufSize, ioWorkers, false, charsetConvertor)
assert.NoError(t, err)

for ignoreNLines > 0 {
// IGNORE N LINES will directly find (line) terminator without checking it's inside quotes
_, _, err = parser.ReadUntilTerminator()
if errors.Cause(err) == io.EOF {
assert.Len(t, tc.expected, 0, "input = %q", tc.input)
return
}
assert.NoError(t, err)
ignoreNLines--
}

for i, row := range tc.expected {
comment := fmt.Sprintf("input = %q, row = %d", tc.input, i+1)
e := parser.ReadRow()
assert.NoErrorf(t, e, "input = %q, row = %d, error = %s", tc.input, i+1, errors.ErrorStack(e))
assert.Equal(t, int64(i)+1, parser.LastRow().RowID, comment)
assert.Equal(t, row, parser.LastRow().Row, comment)
}
assert.ErrorIsf(t, errors.Cause(parser.ReadRow()), io.EOF, "input = %q", tc.input)
}
}

func runFailingTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []string) {
for _, tc := range cases {
charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
Expand Down Expand Up @@ -935,6 +964,211 @@ func TestTerminator(t *testing.T) {
runTestCasesCSV(t, &cfg, 1, testCases)
}

func TestStartingBy(t *testing.T) {
cfg := config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
StartingBy: "xxx",
},
}
testCases := []testCase{
{
input: `xxx"abc",1
something xxx"def",2
"ghi",3`,
expected: [][]types.Datum{
{types.NewStringDatum("abc"), types.NewStringDatum("1")},
{types.NewStringDatum("def"), types.NewStringDatum("2")},
},
},
}
runTestCasesCSV(t, &cfg, 1, testCases)

testCases = []testCase{
{
input: `xxxabc,1
something xxxdef,2
ghi,3
"bad syntax"aaa`,
expected: [][]types.Datum{
{types.NewStringDatum("abc"), types.NewStringDatum("1")},
{types.NewStringDatum("def"), types.NewStringDatum("2")},
},
},
}
runTestCasesCSV(t, &cfg, 1, testCases)

// test that special characters appears before StartingBy, and StartingBy only takes effect after once

testCases = []testCase{
{
input: `xxx"abc",1
something xxxdef,2
"ghi",3
"yyy"xxx"yyy",4
"yyy",5,xxxyyy,5
qwe,zzzxxxyyy,6
"yyyxxx"yyyxxx",7
yyy",5,xxxxxx,8
`,
expected: [][]types.Datum{
{types.NewStringDatum("abc"), types.NewStringDatum("1")},
{types.NewStringDatum("def"), types.NewStringDatum("2")},
{types.NewStringDatum("yyy"), types.NewStringDatum("4")},
{types.NewStringDatum("yyy"), types.NewStringDatum("5")},
{types.NewStringDatum("yyy"), types.NewStringDatum("6")},
{types.NewStringDatum("yyyxxx"), types.NewStringDatum("7")},
{types.NewStringDatum("xxx"), types.NewStringDatum("8")},
},
},
}
runTestCasesCSV(t, &cfg, 1, testCases)

// test StartingBy contains special characters

cfg = config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
StartingBy: "x,xx",
},
}
testCases = []testCase{
{
input: `x,xx"abc",1
something x,xxdef,2
"ghi",3
"yyy"xxx"yyy",4
"yyy",5,xxxyyy,5
qwe,zzzxxxyyy,6
"yyyxxx"yyyxxx",7
yyy",5,xx,xxxx,8`,
expected: [][]types.Datum{
{types.NewStringDatum("abc"), types.NewStringDatum("1")},
{types.NewStringDatum("def"), types.NewStringDatum("2")},
{types.NewStringDatum("xx"), types.NewStringDatum("8")},
},
},
}
runTestCasesCSV(t, &cfg, 1, testCases)

cfg = config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
StartingBy: `x"xx`,
},
}
testCases = []testCase{
{
input: `x"xx"abc",1
something x"xxdef,2
"ghi",3
"yyy"xxx"yyy",4
"yyy",5,xxxyyy,5
qwe,zzzxxxyyy,6
"yyyxxx"yyyxxx",7
yyy",5,xx"xxxx,8
`,
expected: [][]types.Datum{
{types.NewStringDatum("abc"), types.NewStringDatum("1")},
{types.NewStringDatum("def"), types.NewStringDatum("2")},
{types.NewStringDatum("xx"), types.NewStringDatum("8")},
},
},
}
runTestCasesCSV(t, &cfg, 1, testCases)

cfg = config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
StartingBy: "x\nxx",
},
}
_, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, nil, 1, ioWorkers, false, nil)
require.ErrorContains(t, err, "starting-by cannot contain (line) terminator")
}

func TestCallerCanIgnoreNLines(t *testing.T) {
cfg := config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
},
}
testCases := []testCase{
{
input: `1,1
2,2
3,3`,
expected: [][]types.Datum{
{types.NewStringDatum("3"), types.NewStringDatum("3")},
},
},
}
runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)

testCases = []testCase{
{
input: `"bad syntax"1
"b",2
"c",3`,
expected: [][]types.Datum{
{types.NewStringDatum("c"), types.NewStringDatum("3")},
},
},
}
runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)

cfg = config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
},
}
testCases = []testCase{
{
input: `1,1
2,2
3,3`,
expected: [][]types.Datum{},
},
}
runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 100)

// test IGNORE N LINES will directly find (line) terminator without checking it's inside quotes

cfg = config.MydumperRuntime{
CSV: config.CSVConfig{
Separator: ",",
Delimiter: `"`,
Terminator: "\n",
},
}
testCases = []testCase{
{
input: `"a
",1
"b
",2
"c",3`,
expected: [][]types.Datum{
{types.NewStringDatum("b\n"), types.NewStringDatum("2")},
{types.NewStringDatum("c"), types.NewStringDatum("3")},
},
},
}
runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2)
}

func TestCharsetConversion(t *testing.T) {
cfg := config.MydumperRuntime{
CSV: config.CSVConfig{
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func SplitLargeFile(
if err = parser.SetPos(endOffset, prevRowIDMax); err != nil {
return 0, nil, nil, err
}
pos, err := parser.ReadUntilTerminator()
_, pos, err := parser.ReadUntilTerminator()
if err != nil {
if !errors.ErrorEqual(err, io.EOF) {
return 0, nil, nil, err
Expand Down

0 comments on commit 17d70c1

Please sign in to comment.