Skip to content

Commit

Permalink
executor: support load data with ignore lines (#7576)
Browse files Browse the repository at this point in the history
  • Loading branch information
mccxj authored and shenli committed Sep 6, 2018
1 parent c564ebb commit b2bfd8f
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 27 deletions.
13 changes: 7 additions & 6 deletions ast/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,12 +661,13 @@ func (n *Assignment) Accept(v Visitor) (Node, bool) {
type LoadDataStmt struct {
dmlNode

IsLocal bool
Path string
Table *TableName
Columns []*ColumnName
FieldsInfo *FieldsClause
LinesInfo *LinesClause
IsLocal bool
Path string
Table *TableName
Columns []*ColumnName
FieldsInfo *FieldsClause
LinesInfo *LinesClause
IgnoreLines uint64
}

// Accept implements Node Accept interface.
Expand Down
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ func (b *executorBuilder) buildLoadData(v *plan.LoadData) Executor {
Table: tbl,
FieldsInfo: v.FieldsInfo,
LinesInfo: v.LinesInfo,
IgnoreLines: v.IgnoreLines,
Ctx: b.ctx,
columns: columns,
},
Expand Down
2 changes: 2 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,9 @@ type testCase struct {

func checkCases(tests []testCase, ld *executor.LoadDataInfo,
c *C, tk *testkit.TestKit, ctx sessionctx.Context, selectSQL, deleteSQL string) {
origin := ld.IgnoreLines
for _, tt := range tests {
ld.IgnoreLines = origin
c.Assert(ctx.NewTxn(), IsNil)
ctx.GetSessionVars().StmtCtx.DupKeyAsWarning = true
ctx.GetSessionVars().StmtCtx.BadNullAsWarning = true
Expand Down
19 changes: 12 additions & 7 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,14 @@ func (e *LoadDataExec) Open(ctx context.Context) error {
type LoadDataInfo struct {
*InsertValues

row []types.Datum
Path string
Table table.Table
FieldsInfo *ast.FieldsClause
LinesInfo *ast.LinesClause
Ctx sessionctx.Context
columns []*table.Column
row []types.Datum
Path string
Table table.Table
FieldsInfo *ast.FieldsClause
LinesInfo *ast.LinesClause
IgnoreLines uint64
Ctx sessionctx.Context
columns []*table.Column
}

// SetMaxRowsInBatch sets the max number of rows to insert in a batch.
Expand Down Expand Up @@ -235,6 +236,10 @@ func (e *LoadDataInfo) InsertData(prevData, curData []byte) ([]byte, bool, error
curData = nil
}

if e.IgnoreLines > 0 {
e.IgnoreLines--
continue
}
cols, err := e.getFieldsFromLine(line)
if err != nil {
return nil, false, errors.Trace(err)
Expand Down
19 changes: 19 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,6 +1635,25 @@ func (s *testSuite) TestLoadDataSpecifiedColumns(c *C) {
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
}

func (s *testSuite) TestLoadDataIgnoreLines(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test; drop table if exists load_data_test;")
tk.MustExec("CREATE TABLE load_data_test (id INT NOT NULL PRIMARY KEY, value TEXT NOT NULL) CHARACTER SET utf8")
tk.MustExec("load data local infile '/tmp/nonexistence.csv' into table load_data_test ignore 1 lines")
ctx := tk.Se.(sessionctx.Context)
ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo)
c.Assert(ok, IsTrue)
defer ctx.SetValue(executor.LoadDataVarKey, nil)
c.Assert(ld, NotNil)
tests := []testCase{
{nil, []byte("1\tline1\n2\tline2\n"), []string{"2|line2"}, nil},
{nil, []byte("1\tline1\n2\tline2\n3\tline3\n"), []string{"2|line2", "3|line3"}, nil},
}
deleteSQL := "delete from load_data_test"
selectSQL := "select * from load_data_test;"
checkCases(tests, ld, c, tk, ctx, selectSQL, deleteSQL)
}

func (s *testSuite) TestBatchInsertDelete(c *C) {
originLimit := atomic.LoadUint64(&kv.TxnEntryCountLimit)
defer func() {
Expand Down
15 changes: 13 additions & 2 deletions parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,7 @@ import (
OptBinMod "Optional BINARY mode"
OptCharset "Optional Character setting"
OptCollate "Optional Collate setting"
IgnoreLines "Ignore num(int) lines"
NUM "A number"
NumList "Some numbers"
LengthNum "Field length num(uint64)"
Expand Down Expand Up @@ -6894,12 +6895,13 @@ RevokeStmt:
* See https://dev.mysql.com/doc/refman/5.7/en/load-data.html
*******************************************************************************************/
LoadDataStmt:
"LOAD" "DATA" LocalOpt "INFILE" stringLit "INTO" "TABLE" TableName CharsetOpt Fields Lines ColumnNameListOptWithBrackets
"LOAD" "DATA" LocalOpt "INFILE" stringLit "INTO" "TABLE" TableName CharsetOpt Fields Lines IgnoreLines ColumnNameListOptWithBrackets
{
x := &ast.LoadDataStmt{
Path: $5,
Table: $8.(*ast.TableName),
Columns: $12.([]*ast.ColumnName),
Columns: $13.([]*ast.ColumnName),
IgnoreLines:$12.(uint64),
}
if $3 != nil {
x.IsLocal = true
Expand All @@ -6913,6 +6915,15 @@ LoadDataStmt:
$$ = x
}

IgnoreLines:
{
$$ = uint64(0)
}
| "IGNORE" NUM "LINES"
{
$$ = getUint64FromNUM($2)
}

CharsetOpt:
{}
| "CHARACTER" "SET" CharsetName
Expand Down
5 changes: 5 additions & 0 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,11 @@ func (s *testParserSuite) TestDMLStmt(c *C) {
{"load data local infile '/tmp/t.csv' into table t character set utf8 fields terminated by 'ab' lines terminated by 'xy' (a,b)", true},
{"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' lines terminated by 'xy' (a,b)", true},
{"load data local infile '/tmp/t.csv' into table t (a,b) fields terminated by 'ab'", false},
{"load data local infile '/tmp/t.csv' into table t ignore 1 lines", true},
{"load data local infile '/tmp/t.csv' into table t ignore -1 lines", false},
{"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' enclosed by 'b' (a,b) ignore 1 lines", false},
{"load data local infile '/tmp/t.csv' into table t lines starting by 'ab' terminated by 'xy' ignore 1 lines", true},
{"load data local infile '/tmp/t.csv' into table t fields terminated by 'ab' enclosed by 'b' escaped by '*' ignore 1 lines (a,b)", true},

// select for update
{"SELECT * from t for update", true},
Expand Down
13 changes: 7 additions & 6 deletions plan/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,12 +382,13 @@ type Analyze struct {
type LoadData struct {
baseSchemaProducer

IsLocal bool
Path string
Table *ast.TableName
Columns []*ast.ColumnName
FieldsInfo *ast.FieldsClause
LinesInfo *ast.LinesClause
IsLocal bool
Path string
Table *ast.TableName
Columns []*ast.ColumnName
FieldsInfo *ast.FieldsClause
LinesInfo *ast.LinesClause
IgnoreLines uint64

GenCols InsertGeneratedColumns
}
Expand Down
13 changes: 7 additions & 6 deletions plan/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,12 +1268,13 @@ func (b *planBuilder) buildSelectPlanOfInsert(insert *ast.InsertStmt, insertPlan

func (b *planBuilder) buildLoadData(ld *ast.LoadDataStmt) (Plan, error) {
p := &LoadData{
IsLocal: ld.IsLocal,
Path: ld.Path,
Table: ld.Table,
Columns: ld.Columns,
FieldsInfo: ld.FieldsInfo,
LinesInfo: ld.LinesInfo,
IsLocal: ld.IsLocal,
Path: ld.Path,
Table: ld.Table,
Columns: ld.Columns,
FieldsInfo: ld.FieldsInfo,
LinesInfo: ld.LinesInfo,
IgnoreLines: ld.IgnoreLines,
}
tableInfo := p.Table.TableInfo
tableInPlan, ok := b.is.TableByID(tableInfo.ID)
Expand Down

0 comments on commit b2bfd8f

Please sign in to comment.