Skip to content

Commit

Permalink
importinto: import from select parser/planner part (pingcap#49976)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Jan 9, 2024
1 parent 978436a commit edecaa6
Show file tree
Hide file tree
Showing 12 changed files with 7,642 additions and 7,408 deletions.
18 changes: 15 additions & 3 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,14 +1025,26 @@ func (b *executorBuilder) buildImportInto(v *plannercore.ImportInto) exec.Execut
return nil
}

base := exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID())
exec, err := newImportIntoExec(base, b.ctx, v, tbl)
var (
selectExec exec.Executor
base exec.BaseExecutor
)
if v.SelectPlan != nil {
selectExec = b.build(v.SelectPlan)
if b.err != nil {
return nil
}
base = exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), selectExec)
} else {
base = exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID())
}
executor, err := newImportIntoExec(base, selectExec, b.ctx, v, tbl)
if err != nil {
b.err = err
return nil
}

return exec
return executor
}

func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) exec.Executor {
Expand Down
14 changes: 14 additions & 0 deletions pkg/executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,17 @@ func TestExplainFormatInCtx(t *testing.T) {
}
}
}

func TestExplainImportFromSelect(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int primary key)")
tk.MustExec("create table t_o (a int primary key)")
tk.MustExec("insert into t values (2)")
rs := tk.MustQuery("explain import into t_o from select * from t").Rows()
require.Contains(t, rs[0][0], "ImportInto")
require.Contains(t, rs[1][0], "TableReader")
require.Contains(t, rs[2][0], "TableFullScan")
}
6 changes: 4 additions & 2 deletions pkg/executor/import_into.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const unknownImportedRowCount = -1
// ImportIntoExec represents a IMPORT INTO executor.
type ImportIntoExec struct {
exec.BaseExecutor
selectExec exec.Executor
userSctx sessionctx.Context
importPlan *importer.Plan
controller *importer.LoadDataController
Expand All @@ -70,8 +71,8 @@ var (
_ exec.Executor = (*ImportIntoExec)(nil)
)

func newImportIntoExec(b exec.BaseExecutor, userSctx sessionctx.Context, plan *plannercore.ImportInto, tbl table.Table) (
*ImportIntoExec, error) {
func newImportIntoExec(b exec.BaseExecutor, selectExec exec.Executor, userSctx sessionctx.Context,
plan *plannercore.ImportInto, tbl table.Table) (*ImportIntoExec, error) {
importPlan, err := importer.NewImportPlan(userSctx, plan, tbl)
if err != nil {
return nil, err
Expand All @@ -83,6 +84,7 @@ func newImportIntoExec(b exec.BaseExecutor, userSctx sessionctx.Context, plan *p
}
return &ImportIntoExec{
BaseExecutor: b,
selectExec: selectExec,
userSctx: userSctx,
importPlan: importPlan,
controller: controller,
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,7 @@ func (e *ShowExec) fetchShowSessionStates(ctx context.Context) error {
e.appendRow([]interface{}{stateJSON, tokenJSON})
return nil
}

func fillOneImportJobInfo(info *importer.JobInfo, result *chunk.Chunk, importedRowCount int64) {
fullTableName := utils.EncloseDBAndTable(info.TableSchema, info.TableName)
result.AppendInt64(0, info.ID)
Expand Down
22 changes: 18 additions & 4 deletions pkg/parser/ast/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -2108,6 +2108,7 @@ type ImportIntoStmt struct {
Path string
Format *string
Options []*LoadDataOpt
Select ResultSetNode
}

var _ SensitiveStmtNode = &ImportIntoStmt{}
Expand Down Expand Up @@ -2144,10 +2145,16 @@ func (n *ImportIntoStmt) Restore(ctx *format.RestoreCtx) error {
}
}
ctx.WriteKeyWord(" FROM ")
ctx.WriteString(n.Path)
if n.Format != nil {
ctx.WriteKeyWord(" FORMAT ")
ctx.WriteString(*n.Format)
if n.Select != nil {
if err := n.Select.Restore(ctx); err != nil {
return errors.Annotate(err, "An error occurred while restore ImportIntoStmt.Select")
}
} else {
ctx.WriteString(n.Path)
if n.Format != nil {
ctx.WriteKeyWord(" FORMAT ")
ctx.WriteString(*n.Format)
}
}

if len(n.Options) > 0 {
Expand Down Expand Up @@ -2194,6 +2201,13 @@ func (n *ImportIntoStmt) Accept(v Visitor) (Node, bool) {
}
n.ColumnAssignments[i] = node.(*Assignment)
}
if n.Select != nil {
node, ok := n.Select.Accept(v)
if !ok {
return n, false
}
n.Select = node.(ResultSetNode)
}
return v.Leave(n)
}

Expand Down
35 changes: 35 additions & 0 deletions pkg/parser/ast/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,31 @@ func TestImportIntoRestore(t *testing.T) {
sourceSQL: "IMPORT INTO `t` from '/file.csv' with fields_terminated_by=_UTF8MB4'\t', detached, thread=1",
expectSQL: "IMPORT INTO `t` FROM '/file.csv' WITH fields_terminated_by=_UTF8MB4'\t', detached, thread=1",
},
{
// SelectStmt
sourceSQL: "IMPORT INTO `t` from select * from xx",
expectSQL: "IMPORT INTO `t` FROM SELECT * FROM `xx`",
},
{
// SelectStmtWithClause
sourceSQL: "IMPORT INTO `t` from with `c` as (select * from `xx`) select * from `c` with thread=1",
expectSQL: "IMPORT INTO `t` FROM WITH `c` AS (SELECT * FROM `xx`) SELECT * FROM `c` WITH thread=1",
},
{
// SetOprStmt
sourceSQL: "IMPORT INTO `t` from select * from `xx` union select * from `yy` with thread=1",
expectSQL: "IMPORT INTO `t` FROM SELECT * FROM `xx` UNION SELECT * FROM `yy` WITH thread=1",
},
{
// SetOprStmt
sourceSQL: "IMPORT INTO `t` from with `c` as (select * from `xx`) select * from `c` union select * from `c` with thread=1",
expectSQL: "IMPORT INTO `t` FROM WITH `c` AS (SELECT * FROM `xx`) SELECT * FROM `c` UNION SELECT * FROM `c` WITH thread=1",
},
{
// SubSelect
sourceSQL: "IMPORT INTO `t` from (select * from xx)",
expectSQL: "IMPORT INTO `t` FROM (SELECT * FROM `xx`)",
},
}
extractNodeFunc := func(node Node) Node {
return node.(*ImportIntoStmt)
Expand Down Expand Up @@ -611,3 +636,13 @@ func TestImportIntoSecureText(t *testing.T) {
require.Regexp(t, tc.secured, n.SecureText(), comment)
}
}

func TestImportIntoFromSelectInvalidStmt(t *testing.T) {
p := parser.New()
_, err := p.ParseOneStmt("IMPORT INTO t1(a, @1) FROM select * from t2;", "", "")
require.ErrorContains(t, err, "Cannot use user variable(1) in IMPORT INTO FROM SELECT statement")
_, err = p.ParseOneStmt("IMPORT INTO t1(a, @b) FROM select * from t2;", "", "")
require.ErrorContains(t, err, "Cannot use user variable(b) in IMPORT INTO FROM SELECT statement")
_, err = p.ParseOneStmt("IMPORT INTO t1(a) set a=1 FROM select a from t2;", "", "")
require.ErrorContains(t, err, "Cannot use SET clause in IMPORT INTO FROM SELECT statement.")
}
Loading

0 comments on commit edecaa6

Please sign in to comment.