Skip to content

Commit

Permalink
parser: support 'CREATE TABLE ... SELECT' syntax (#4754) (#6851)
Browse files Browse the repository at this point in the history
  • Loading branch information
bb7133 authored and zz-jason committed Jun 22, 2018
1 parent 1fbcc10 commit 20c5675
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 37 deletions.
21 changes: 21 additions & 0 deletions ast/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ type CreateTableStmt struct {
Constraints []*Constraint
Options []*TableOption
Partition *PartitionOptions
OnDuplicate OnDuplicateCreateTableSelectType
Select ResultSetNode
}

// Accept implements Node Accept interface.
Expand Down Expand Up @@ -439,6 +441,14 @@ func (n *CreateTableStmt) Accept(v Visitor) (Node, bool) {
}
n.Constraints[i] = node.(*Constraint)
}
if n.Select != nil {
node, ok := n.Select.Accept(v)
if !ok {
return n, false
}
n.Select = node.(ResultSetNode)
}

return v.Leave(n)
}

Expand Down Expand Up @@ -656,6 +666,17 @@ const (
RowFormatCompact
)

// OnDuplicateCreateTableSelectType is the option that handle unique key values in 'CREATE TABLE ... SELECT'.
// See https://dev.mysql.com/doc/refman/5.7/en/create-table-select.html
type OnDuplicateCreateTableSelectType int

// OnDuplicateCreateTableSelect types
const (
OnDuplicateCreateTableSelectError OnDuplicateCreateTableSelectType = iota
OnDuplicateCreateTableSelectIgnore
OnDuplicateCreateTableSelectReplace
)

// TableOption is used for parsing table option from SQL.
type TableOption struct {
Tp TableOptionType
Expand Down
4 changes: 4 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ var (
ErrWrongTableName = terror.ClassDDL.New(codeWrongTableName, mysql.MySQLErrName[mysql.ErrWrongTableName])
// ErrWrongColumnName returns for wrong column name.
ErrWrongColumnName = terror.ClassDDL.New(codeWrongColumnName, mysql.MySQLErrName[mysql.ErrWrongColumnName])
// ErrTableMustHaveColumns returns for missing column when creating a table.
ErrTableMustHaveColumns = terror.ClassDDL.New(codeTableMustHaveColumns, mysql.MySQLErrName[mysql.ErrTableMustHaveColumns])
// ErrWrongNameForIndex returns for wrong index name.
ErrWrongNameForIndex = terror.ClassDDL.New(codeWrongNameForIndex, mysql.MySQLErrName[mysql.ErrWrongNameForIndex])
// ErrUnknownCharacterSet returns unknown character set.
Expand Down Expand Up @@ -564,6 +566,7 @@ const (
codeErrTooLongIndexComment = terror.ErrCode(mysql.ErrTooLongIndexComment)
codeUnknownCharacterSet = terror.ErrCode(mysql.ErrUnknownCharacterSet)
codeCantCreateTable = terror.ErrCode(mysql.ErrCantCreateTable)
codeTableMustHaveColumns = terror.ErrCode(mysql.ErrTableMustHaveColumns)
)

func init() {
Expand Down Expand Up @@ -592,6 +595,7 @@ func init() {
codeWrongColumnName: mysql.ErrWrongColumnName,
codeWrongKeyColumn: mysql.ErrWrongKeyColumn,
codeWrongNameForIndex: mysql.ErrWrongNameForIndex,
codeTableMustHaveColumns: mysql.ErrTableMustHaveColumns,
codeTooManyFields: mysql.ErrTooManyFields,
codeErrTooLongIndexComment: mysql.ErrTooLongIndexComment,
codeUnknownCharacterSet: mysql.ErrUnknownCharacterSet,
Expand Down
31 changes: 23 additions & 8 deletions ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *testDBSuite) TestMySQLErrorCode(c *C) {
// drop database
sql = "drop database db_not_exist"
s.testErrorCode(c, sql, tmysql.ErrDBDropExists)
// crate table
// create table
s.tk.MustExec("create table test_error_code_succ (c1 int, c2 int, c3 int, primary key(c3))")
sql = "create table test_error_code_succ (c1 int, c2 int, c3 int)"
s.testErrorCode(c, sql, tmysql.ErrTableExists)
Expand Down Expand Up @@ -179,6 +179,10 @@ func (s *testDBSuite) TestMySQLErrorCode(c *C) {
s.testErrorCode(c, sql, tmysql.ErrUnknownCharacterSet)
sql = "create table test_error_code (a int not null ,b int not null,c int not null, d int not null, foreign key (b, c) references product(id));"
s.testErrorCode(c, sql, tmysql.ErrWrongFkDef)
sql = "create table test_error_code_2;"
s.testErrorCode(c, sql, tmysql.ErrTableMustHaveColumns)
sql = "create table test_error_code_2 (unique(c1));"
s.testErrorCode(c, sql, tmysql.ErrTableMustHaveColumns)
sql = "create table test_error_code_2(c1 int, c2 int, c3 int, primary key(c1), primary key(c2));"
s.testErrorCode(c, sql, tmysql.ErrMultiplePriKey)
sql = "create table test_error_code_3(pt blob ,primary key (pt));"
Expand Down Expand Up @@ -1477,18 +1481,27 @@ func (s *testDBSuite) TestCreateTableWithLike(c *C) {
s.tk.MustExec("insert into t set c2=1")
s.tk.MustExec("create table t1 like ctwl_db.t")
s.tk.MustExec("insert into t1 set c2=11")
s.tk.MustExec("create table t2 (like ctwl_db.t1)")
s.tk.MustExec("insert into t2 set c2=12")
s.tk.MustQuery("select * from t").Check(testkit.Rows("10 1"))
s.tk.MustQuery("select * from t1").Check(testkit.Rows("1 11"))
s.tk.MustQuery("select * from t2").Check(testkit.Rows("1 12"))
ctx := s.tk.Se.(sessionctx.Context)
is := domain.GetDomain(ctx).InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("ctwl_db"), model.NewCIStr("t1"))
tbl1, err := is.TableByName(model.NewCIStr("ctwl_db"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
tblInfo := tbl.Meta()
c.Assert(tblInfo.ForeignKeys, IsNil)
c.Assert(tblInfo.PKIsHandle, Equals, true)
col := tblInfo.Columns[0]
tbl1Info := tbl1.Meta()
c.Assert(tbl1Info.ForeignKeys, IsNil)
c.Assert(tbl1Info.PKIsHandle, Equals, true)
col := tbl1Info.Columns[0]
hasNotNull := tmysql.HasNotNullFlag(col.Flag)
c.Assert(hasNotNull, IsTrue)
tbl2, err := is.TableByName(model.NewCIStr("ctwl_db"), model.NewCIStr("t2"))
c.Assert(err, IsNil)
tbl2Info := tbl2.Meta()
c.Assert(tbl2Info.ForeignKeys, IsNil)
c.Assert(tbl2Info.PKIsHandle, Equals, true)
c.Assert(tmysql.HasNotNullFlag(tbl2Info.Columns[0].Flag), IsTrue)

// for different databases
s.tk.MustExec("create database ctwl_db1")
Expand All @@ -1497,15 +1510,17 @@ func (s *testDBSuite) TestCreateTableWithLike(c *C) {
s.tk.MustExec("insert into t1 set c2=11")
s.tk.MustQuery("select * from t1").Check(testkit.Rows("1 11"))
is = domain.GetDomain(ctx).InfoSchema()
tbl, err = is.TableByName(model.NewCIStr("ctwl_db1"), model.NewCIStr("t1"))
tbl1, err = is.TableByName(model.NewCIStr("ctwl_db1"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
c.Assert(tbl.Meta().ForeignKeys, IsNil)
c.Assert(tbl1.Meta().ForeignKeys, IsNil)

// for failure cases
failSQL := fmt.Sprintf("create table t1 like test_not_exist.t")
s.testErrorCode(c, failSQL, tmysql.ErrNoSuchTable)
failSQL = fmt.Sprintf("create table t1 like test.t_not_exist")
s.testErrorCode(c, failSQL, tmysql.ErrNoSuchTable)
failSQL = fmt.Sprintf("create table t1 (like test_not_exist.t)")
s.testErrorCode(c, failSQL, tmysql.ErrNoSuchTable)
failSQL = fmt.Sprintf("create table test_not_exis.t1 like ctwl_db.t")
s.testErrorCode(c, failSQL, tmysql.ErrBadDB)
failSQL = fmt.Sprintf("create table t1 like ctwl_db.t")
Expand Down
132 changes: 103 additions & 29 deletions parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ import (
ConstraintKeywordOpt "Constraint Keyword or empty"
CreateIndexStmtUnique "CREATE INDEX optional UNIQUE clause"
CreateTableOptionListOpt "create table option list opt"
CreateTableSelectOpt "Select/Union statement in CREATE TABLE ... SELECT"
DatabaseOption "CREATE Database specification"
DatabaseOptionList "CREATE Database specification list"
DatabaseOptionListOpt "CREATE Database specification list opt"
Expand Down Expand Up @@ -653,6 +654,7 @@ import (
JoinType "join type"
KillOrKillTiDB "Kill or Kill TiDB"
LikeEscapeOpt "like escape option"
LikeTableWithOrWithoutParen "LIKE table_name or ( LIKE table_name )"
LimitClause "LIMIT clause"
LimitOption "Limit option could be integer or parameter marker."
Lines "Lines clause"
Expand All @@ -663,6 +665,7 @@ import (
NoWriteToBinLogAliasOpt "NO_WRITE_TO_BINLOG alias LOCAL or empty"
ObjectType "Grant statement object type"
OnDuplicateKeyUpdate "ON DUPLICATE KEY UPDATE value list"
DuplicateOpt "[IGNORE|REPLACE] in CREATE TABLE ... SELECT statement"
OptFull "Full or empty"
Order "ORDER BY clause optional collation specification"
OrderBy "ORDER BY clause"
Expand Down Expand Up @@ -719,6 +722,7 @@ import (
TableAsNameOpt "table alias name optional"
TableElement "table definition element"
TableElementList "table definition element list"
TableElementListOpt "table definition element list optional"
TableFactor "table factor"
TableLock "Table name and lock type"
TableLockList "Table lock list"
Expand Down Expand Up @@ -799,6 +803,7 @@ import (
TableOptimizerHintList "Table level optimizer hint list"

%type <ident>
AsOpt "AS or EmptyString"
KeyOrIndex "{KEY|INDEX}"
ColumnKeywordOpt "Column keyword or empty"
PrimaryOpt "Optional primary keyword"
Expand Down Expand Up @@ -855,6 +860,8 @@ import (
%precedence set
%precedence lowerThanInsertValues
%precedence insertValues
%precedence lowerThanCreateTableSelect
%precedence createTableSelect
%precedence lowerThanKey
%precedence key

Expand Down Expand Up @@ -1727,42 +1734,26 @@ DatabaseOptionList:
* PRIMARY KEY (P_Id)
* )
*******************************************************************/

CreateTableStmt:
"CREATE" "TABLE" IfNotExists TableName '(' TableElementList ')' CreateTableOptionListOpt PartitionOpt
"CREATE" "TABLE" IfNotExists TableName TableElementListOpt CreateTableOptionListOpt PartitionOpt DuplicateOpt AsOpt CreateTableSelectOpt
{
tes := $6.([]interface {})
var columnDefs []*ast.ColumnDef
var constraints []*ast.Constraint
for _, te := range tes {
switch te := te.(type) {
case *ast.ColumnDef:
columnDefs = append(columnDefs, te)
case *ast.Constraint:
constraints = append(constraints, te)
}
}
if len(columnDefs) == 0 {
yylex.Errorf("Column Definition List can't be empty.")
return 1
}
var part *ast.PartitionOptions
if $9 != nil {
part = $9.(*ast.PartitionOptions)
}
$$ = &ast.CreateTableStmt{
Table: $4.(*ast.TableName),
IfNotExists: $3.(bool),
Cols: columnDefs,
Constraints: constraints,
Options: $8.([]*ast.TableOption),
Partition: part,
stmt := $5.(*ast.CreateTableStmt)
stmt.Table = $4.(*ast.TableName)
stmt.IfNotExists = $3.(bool)
stmt.Options = $6.([]*ast.TableOption)
if $7 != nil {
stmt.Partition = $7.(*ast.PartitionOptions)
}
stmt.OnDuplicate = $8.(ast.OnDuplicateCreateTableSelectType)
stmt.Select = $10.(*ast.CreateTableStmt).Select
$$ = stmt
}
| "CREATE" "TABLE" IfNotExists TableName "LIKE" TableName
| "CREATE" "TABLE" IfNotExists TableName LikeTableWithOrWithoutParen
{
$$ = &ast.CreateTableStmt{
Table: $4.(*ast.TableName),
ReferTable: $6.(*ast.TableName),
ReferTable: $5.(*ast.TableName),
IfNotExists: $3.(bool),
}
}
Expand Down Expand Up @@ -1814,6 +1805,7 @@ PartitionNumOpt:
{}

PartitionDefinitionListOpt:
/* empty */ %prec lowerThanCreateTableSelect
{
$$ = nil
}
Expand Down Expand Up @@ -1876,6 +1868,57 @@ PartDefStorageOpt:
| "ENGINE" eq Identifier
{}

DuplicateOpt:
{
$$ = ast.OnDuplicateCreateTableSelectError
}
| "IGNORE"
{
$$ = ast.OnDuplicateCreateTableSelectIgnore
}
| "REPLACE"
{
$$ = ast.OnDuplicateCreateTableSelectReplace
}

AsOpt:
{}
| "AS"
{}

CreateTableSelectOpt:
/* empty */
{
$$ = &ast.CreateTableStmt{}
}
|
SelectStmt
{
$$ = &ast.CreateTableStmt{Select: $1}
}
|
UnionStmt
{
$$ = &ast.CreateTableStmt{Select: $1}
}
|
SubSelect %prec createTableSelect
// TODO: We may need better solution as issue #320.
{
$$ = &ast.CreateTableStmt{Select: $1}
}

LikeTableWithOrWithoutParen:
"LIKE" TableName
{
$$ = $2
}
|
'(' "LIKE" TableName ')'
{
$$ = $3
}

/*******************************************************************
*
* Create View Statement
Expand Down Expand Up @@ -5610,6 +5653,36 @@ TableElementList:
}
}

TableElementListOpt:
/* empty */ %prec lowerThanCreateTableSelect
{
var columnDefs []*ast.ColumnDef
var constraints []*ast.Constraint
$$ = &ast.CreateTableStmt{
Cols: columnDefs,
Constraints: constraints,
}
}
|
'(' TableElementList ')'
{
tes := $2.([]interface {})
var columnDefs []*ast.ColumnDef
var constraints []*ast.Constraint
for _, te := range tes {
switch te := te.(type) {
case *ast.ColumnDef:
columnDefs = append(columnDefs, te)
case *ast.Constraint:
constraints = append(constraints, te)
}
}
$$ = &ast.CreateTableStmt{
Cols: columnDefs,
Constraints: constraints,
}
}

TableOption:
"ENGINE" StringName
{
Expand Down Expand Up @@ -5702,6 +5775,7 @@ AlterTableOptionListOpt:
| TableOptionList %prec higherThanComma

CreateTableOptionListOpt:
/* empty */ %prec lowerThanCreateTableSelect
{
$$ = []*ast.TableOption{}
}
Expand Down
17 changes: 17 additions & 0 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1566,7 +1566,24 @@ func (s *testParserSuite) TestDDL(c *C) {
{"create table if not exists `t` (`id` int not null auto_increment comment '消息ID', primary key `pk_id` (`id`) );", true},
// Create table with like.
{"create table a like b", true},
{"create table a (like b)", true},
{"create table if not exists a like b", true},
{"create table if not exists a (like b)", true},
{"create table if not exists a like (b)", false},
{"create table a (t int) like b", false},
{"create table a (t int) like (b)", false},
// Create table with select statement
{"create table a select * from b", true},
{"create table a as select * from b", true},
{"create table a (m int, n datetime) as select * from b", true},
{"create table a (unique(n)) as select n from b", true},
{"create table a ignore as select n from b", true},
{"create table a replace as select n from b", true},
{"create table a (m int) replace as (select n as m from b union select n+1 as m from c group by 1 limit 2)", true},

// Create table with no option is valid for parser
{"create table a", true},

{"create table t (a timestamp default now)", false},
{"create table t (a timestamp default now())", true},
{"create table t (a timestamp default now() on update now)", false},
Expand Down
8 changes: 8 additions & 0 deletions plan/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ func (p *preprocessor) checkCreateTableGrammar(stmt *ast.CreateTableStmt) {
}
}
}
if stmt.Select != nil {
// FIXME: a temp error noticing 'not implemented' (issue 4754)
p.err = errors.New("'CREATE TABLE ... SELECT' is not implemented yet")
return
} else if len(stmt.Cols) == 0 && stmt.ReferTable == nil {
p.err = ddl.ErrTableMustHaveColumns
return
}
}

func (p *preprocessor) checkDropTableGrammar(stmt *ast.DropTableStmt) {
Expand Down
Loading

0 comments on commit 20c5675

Please sign in to comment.