From 17d70c1b7e36fc57b2c7529d4f797e32482c6f1a Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 30 Jan 2023 11:23:54 +0800 Subject: [PATCH] lightning/parser: support STARTING BY (#40821) ref pingcap/tidb#40499 --- br/pkg/lightning/config/config.go | 3 + br/pkg/lightning/mydump/csv_parser.go | 63 +++++- br/pkg/lightning/mydump/csv_parser_test.go | 234 +++++++++++++++++++++ br/pkg/lightning/mydump/region.go | 2 +- 4 files changed, 292 insertions(+), 10 deletions(-) diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 45d5f1fa334a4..b5236127f5ee1 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -455,6 +455,9 @@ type CSVConfig struct { TrimLastSep bool `toml:"trim-last-separator" json:"trim-last-separator"` NotNull bool `toml:"not-null" json:"not-null"` BackslashEscape bool `toml:"backslash-escape" json:"backslash-escape"` + // hide these options for lightning configuration file, they can only be used by LOAD DATA + // https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-field-line-handling + StartingBy string `toml:"-" json:"-"` } type MydumperRuntime struct { diff --git a/br/pkg/lightning/mydump/csv_parser.go b/br/pkg/lightning/mydump/csv_parser.go index b7d6c6fc21903..26fb65a493183 100644 --- a/br/pkg/lightning/mydump/csv_parser.go +++ b/br/pkg/lightning/mydump/csv_parser.go @@ -47,9 +47,10 @@ type CSVParser struct { blockParser cfg *config.CSVConfig - comma []byte - quote []byte - newLine []byte + comma []byte + quote []byte + newLine []byte + startingBy []byte charsetConvertor *CharsetConvertor // These variables are used with IndexAnyByte to search a byte slice for the @@ -120,6 +121,12 @@ func NewCSVParser( } unquoteStopSet = append(unquoteStopSet, newLineStopSet...) 
+ if len(cfg.StartingBy) > 0 { + if strings.Contains(cfg.StartingBy, terminator) { + return nil, errors.New("starting-by cannot contain (line) terminator") + } + } + escFlavor := backslashEscapeFlavorNone if cfg.BackslashEscape { escFlavor = backslashEscapeFlavorMySQL @@ -138,6 +145,7 @@ func NewCSVParser( comma: []byte(separator), quote: []byte(delimiter), newLine: []byte(terminator), + startingBy: []byte(cfg.StartingBy), escFlavor: escFlavor, quoteByteSet: makeByteSet(quoteStopSet), unquoteByteSet: makeByteSet(unquoteStopSet), @@ -370,11 +378,43 @@ func (parser *CSVParser) readRecord(dst []string) ([]string, error) { isEmptyLine := true whitespaceLine := true + foundStartingByThisLine := false prevToken := csvTokenNewLine var firstToken csvToken outside: for { + // we should drop + // 1. the whole line if it does not contain startingBy + // 2. any character before startingBy + // since we have checked startingBy does not contain terminator, we can + // split at terminator to check the substring contains startingBy. Even + // if the terminator is inside a quoted field which means it's not the + // end of a line, the substring can still be dropped by rule 2. + if len(parser.startingBy) > 0 && !foundStartingByThisLine { + oldPos := parser.pos + content, _, err := parser.ReadUntilTerminator() + if err != nil { + if !(errors.Cause(err) == io.EOF) { + return nil, err + } + if len(content) == 0 { + return nil, err + } + // if we reached EOF, we should still check the content contains + // startingBy and try to put back and parse it. + } + idx := bytes.Index(content, parser.startingBy) + if idx == -1 { + continue + } + foundStartingByThisLine = true + content = content[idx+len(parser.startingBy):] + content = append(content, parser.newLine...) + parser.buf = append(content, parser.buf...) 
+ parser.pos = oldPos + int64(idx+len(parser.startingBy)) + } + content, firstByte, err := parser.readUntil(&parser.unquoteByteSet) if len(content) > 0 { @@ -415,6 +455,7 @@ outside: } whitespaceLine = false case csvTokenNewLine: + foundStartingByThisLine = false // new line = end of record (ignore empty lines) prevToken = firstToken if isEmptyLine { @@ -578,17 +619,21 @@ func (parser *CSVParser) ReadColumns() error { } // ReadUntilTerminator seeks the file until the terminator token is found, and -// returns the file offset beyond the terminator. -// This function is used in strict-format dividing a CSV file. -func (parser *CSVParser) ReadUntilTerminator() (int64, error) { +// returns +// - the content before terminator +// - the file offset beyond the terminator +// - error +// Note that the terminator string pattern may be the content of a field, which +// means it's inside quotes. Caller should make sure to handle this case. +func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error) { for { - _, firstByte, err := parser.readUntil(&parser.newLineByteSet) + content, firstByte, err := parser.readUntil(&parser.newLineByteSet) if err != nil { - return 0, err + return content, 0, err } parser.skipBytes(1) if ok, err := parser.tryReadNewLine(firstByte); ok || err != nil { - return parser.pos, err + return content, parser.pos, err } } } diff --git a/br/pkg/lightning/mydump/csv_parser_test.go b/br/pkg/lightning/mydump/csv_parser_test.go index da06c15ed39d9..adb057679b3a4 100644 --- a/br/pkg/lightning/mydump/csv_parser_test.go +++ b/br/pkg/lightning/mydump/csv_parser_test.go @@ -55,6 +55,35 @@ func runTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int } } +func runTestCasesCSVIgnoreNLines(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []testCase, ignoreNLines int) { + for _, tc := range cases { + charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) + assert.NoError(t, 
err) + parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(tc.input), blockBufSize, ioWorkers, false, charsetConvertor) + assert.NoError(t, err) + + for ignoreNLines > 0 { + // IGNORE N LINES will directly find (line) terminator without checking it's inside quotes + _, _, err = parser.ReadUntilTerminator() + if errors.Cause(err) == io.EOF { + assert.Len(t, tc.expected, 0, "input = %q", tc.input) + return + } + assert.NoError(t, err) + ignoreNLines-- + } + + for i, row := range tc.expected { + comment := fmt.Sprintf("input = %q, row = %d", tc.input, i+1) + e := parser.ReadRow() + assert.NoErrorf(t, e, "input = %q, row = %d, error = %s", tc.input, i+1, errors.ErrorStack(e)) + assert.Equal(t, int64(i)+1, parser.LastRow().RowID, comment) + assert.Equal(t, row, parser.LastRow().Row, comment) + } + assert.ErrorIsf(t, errors.Cause(parser.ReadRow()), io.EOF, "input = %q", tc.input) + } +} + func runFailingTestCasesCSV(t *testing.T, cfg *config.MydumperRuntime, blockBufSize int64, cases []string) { for _, tc := range cases { charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) @@ -935,6 +964,211 @@ func TestTerminator(t *testing.T) { runTestCasesCSV(t, &cfg, 1, testCases) } +func TestStartingBy(t *testing.T) { + cfg := config.MydumperRuntime{ + CSV: config.CSVConfig{ + Separator: ",", + Delimiter: `"`, + Terminator: "\n", + StartingBy: "xxx", + }, + } + testCases := []testCase{ + { + input: `xxx"abc",1 +something xxx"def",2 +"ghi",3`, + expected: [][]types.Datum{ + {types.NewStringDatum("abc"), types.NewStringDatum("1")}, + {types.NewStringDatum("def"), types.NewStringDatum("2")}, + }, + }, + } + runTestCasesCSV(t, &cfg, 1, testCases) + + testCases = []testCase{ + { + input: `xxxabc,1 +something xxxdef,2 +ghi,3 +"bad syntax"aaa`, + expected: [][]types.Datum{ + {types.NewStringDatum("abc"), types.NewStringDatum("1")}, + {types.NewStringDatum("def"), types.NewStringDatum("2")}, + }, + }, + 
} + runTestCasesCSV(t, &cfg, 1, testCases) + + // test that special characters may appear before StartingBy, and that StartingBy only takes effect once per line + + testCases = []testCase{ + { + input: `xxx"abc",1 +something xxxdef,2 +"ghi",3 +"yyy"xxx"yyy",4 +"yyy",5,xxxyyy,5 +qwe,zzzxxxyyy,6 +"yyyxxx"yyyxxx",7 +yyy",5,xxxxxx,8 +`, + expected: [][]types.Datum{ + {types.NewStringDatum("abc"), types.NewStringDatum("1")}, + {types.NewStringDatum("def"), types.NewStringDatum("2")}, + {types.NewStringDatum("yyy"), types.NewStringDatum("4")}, + {types.NewStringDatum("yyy"), types.NewStringDatum("5")}, + {types.NewStringDatum("yyy"), types.NewStringDatum("6")}, + {types.NewStringDatum("yyyxxx"), types.NewStringDatum("7")}, + {types.NewStringDatum("xxx"), types.NewStringDatum("8")}, + }, + }, + } + runTestCasesCSV(t, &cfg, 1, testCases) + + // test that StartingBy itself may contain special characters + + cfg = config.MydumperRuntime{ + CSV: config.CSVConfig{ + Separator: ",", + Delimiter: `"`, + Terminator: "\n", + StartingBy: "x,xx", + }, + } + testCases = []testCase{ + { + input: `x,xx"abc",1 +something x,xxdef,2 +"ghi",3 +"yyy"xxx"yyy",4 +"yyy",5,xxxyyy,5 +qwe,zzzxxxyyy,6 +"yyyxxx"yyyxxx",7 +yyy",5,xx,xxxx,8`, + expected: [][]types.Datum{ + {types.NewStringDatum("abc"), types.NewStringDatum("1")}, + {types.NewStringDatum("def"), types.NewStringDatum("2")}, + {types.NewStringDatum("xx"), types.NewStringDatum("8")}, + }, + }, + } + runTestCasesCSV(t, &cfg, 1, testCases) + + cfg = config.MydumperRuntime{ + CSV: config.CSVConfig{ + Separator: ",", + Delimiter: `"`, + Terminator: "\n", + StartingBy: `x"xx`, + }, + } + testCases = []testCase{ + { + input: `x"xx"abc",1 +something x"xxdef,2 +"ghi",3 +"yyy"xxx"yyy",4 +"yyy",5,xxxyyy,5 +qwe,zzzxxxyyy,6 +"yyyxxx"yyyxxx",7 +yyy",5,xx"xxxx,8 +`, + expected: [][]types.Datum{ + {types.NewStringDatum("abc"), types.NewStringDatum("1")}, + {types.NewStringDatum("def"), types.NewStringDatum("2")}, + {types.NewStringDatum("xx"), types.NewStringDatum("8")}, + }, + 
}, + } + runTestCasesCSV(t, &cfg, 1, testCases) + + cfg = config.MydumperRuntime{ + CSV: config.CSVConfig{ + Separator: ",", + Delimiter: `"`, + Terminator: "\n", + StartingBy: "x\nxx", + }, + } + _, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, nil, 1, ioWorkers, false, nil) + require.ErrorContains(t, err, "starting-by cannot contain (line) terminator") +} + +func TestCallerCanIgnoreNLines(t *testing.T) { + cfg := config.MydumperRuntime{ + CSV: config.CSVConfig{ + Separator: ",", + Delimiter: `"`, + Terminator: "\n", + }, + } + testCases := []testCase{ + { + input: `1,1 +2,2 +3,3`, + expected: [][]types.Datum{ + {types.NewStringDatum("3"), types.NewStringDatum("3")}, + }, + }, + } + runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2) + + testCases = []testCase{ + { + input: `"bad syntax"1 +"b",2 +"c",3`, + expected: [][]types.Datum{ + {types.NewStringDatum("c"), types.NewStringDatum("3")}, + }, + }, + } + runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2) + + cfg = config.MydumperRuntime{ + CSV: config.CSVConfig{ + Separator: ",", + Delimiter: `"`, + Terminator: "\n", + }, + } + testCases = []testCase{ + { + input: `1,1 +2,2 +3,3`, + expected: [][]types.Datum{}, + }, + } + runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 100) + + // test IGNORE N LINES will directly find (line) terminator without checking it's inside quotes + + cfg = config.MydumperRuntime{ + CSV: config.CSVConfig{ + Separator: ",", + Delimiter: `"`, + Terminator: "\n", + }, + } + testCases = []testCase{ + { + input: `"a +",1 +"b +",2 +"c",3`, + expected: [][]types.Datum{ + {types.NewStringDatum("b\n"), types.NewStringDatum("2")}, + {types.NewStringDatum("c"), types.NewStringDatum("3")}, + }, + }, + } + runTestCasesCSVIgnoreNLines(t, &cfg, 1, testCases, 2) +} + func TestCharsetConversion(t *testing.T) { cfg := config.MydumperRuntime{ CSV: config.CSVConfig{ diff --git a/br/pkg/lightning/mydump/region.go b/br/pkg/lightning/mydump/region.go index da3b4d0af1a53..aba71f666be2e 
100644 --- a/br/pkg/lightning/mydump/region.go +++ b/br/pkg/lightning/mydump/region.go @@ -431,7 +431,7 @@ func SplitLargeFile( if err = parser.SetPos(endOffset, prevRowIDMax); err != nil { return 0, nil, nil, err } - pos, err := parser.ReadUntilTerminator() + _, pos, err := parser.ReadUntilTerminator() if err != nil { if !errors.ErrorEqual(err, io.EOF) { return 0, nil, nil, err