Skip to content

Commit

Permalink
ddl: Exchange partition rollback (#45877)
Browse files Browse the repository at this point in the history
close #45791, close #45920
  • Loading branch information
mjonss authored Aug 10, 2023
1 parent d819ea3 commit c7c7000
Show file tree
Hide file tree
Showing 12 changed files with 422 additions and 129 deletions.
118 changes: 118 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3282,6 +3282,124 @@ func TestExchangePartitionTableCompatiable(t *testing.T) {
require.NoError(t, err)
}

func TestExchangePartitionMultiTable(t *testing.T) {
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)

dbName := "ExchangeMultiTable"
tk1.MustExec(`create schema ` + dbName)
tk1.MustExec(`use ` + dbName)
tk1.MustExec(`CREATE TABLE t1 (a int)`)
tk1.MustExec(`CREATE TABLE t2 (a int)`)
tk1.MustExec(`CREATE TABLE tp (a int) partition by hash(a) partitions 3`)
tk1.MustExec(`insert into t1 values (0)`)
tk1.MustExec(`insert into t2 values (3)`)
tk1.MustExec(`insert into tp values (6)`)

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec(`use ` + dbName)
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec(`use ` + dbName)
tk4 := testkit.NewTestKit(t, store)
tk4.MustExec(`use ` + dbName)
waitFor := func(col int, tableName, s string) {
for {
tk4 := testkit.NewTestKit(t, store)
tk4.MustExec(`use test`)
sql := `admin show ddl jobs where db_name = '` + strings.ToLower(dbName) + `' and table_name = '` + tableName + `' and job_type = 'exchange partition'`
res := tk4.MustQuery(sql).Rows()
if len(res) == 1 && res[0][col] == s {
break
}
time.Sleep(10 * time.Millisecond)
}
}
alterChan1 := make(chan error)
alterChan2 := make(chan error)
tk3.MustExec(`BEGIN`)
tk3.MustExec(`insert into tp values (1)`)
go func() {
alterChan1 <- tk1.ExecToErr(`alter table tp exchange partition p0 with table t1`)
}()
waitFor(11, "t1", "running")
go func() {
alterChan2 <- tk2.ExecToErr(`alter table tp exchange partition p0 with table t2`)
}()
waitFor(11, "t2", "queueing")
tk3.MustExec(`rollback`)
require.NoError(t, <-alterChan1)
err := <-alterChan2
tk3.MustQuery(`select * from t1`).Check(testkit.Rows("6"))
tk3.MustQuery(`select * from t2`).Check(testkit.Rows("0"))
tk3.MustQuery(`select * from tp`).Check(testkit.Rows("3"))
require.NoError(t, err)
}

func TestExchangePartitionValidation(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

dbName := "ExchangeValidation"
tk.MustExec(`create schema ` + dbName)
tk.MustExec(`use ` + dbName)
tk.MustExec(`CREATE TABLE t1 (
d date NOT NULL ,
name varchar(10) NOT NULL,
UNIQUE KEY (d,name))`)

tk.MustExec(`CREATE TABLE t1p (
d date NOT NULL ,
name varchar(10) NOT NULL,
UNIQUE KEY (d,name)
)
PARTITION BY RANGE COLUMNS(d)
(PARTITION p202307 VALUES LESS THAN ('2023-08-01'),
PARTITION p202308 VALUES LESS THAN ('2023-09-01'),
PARTITION p202309 VALUES LESS THAN ('2023-10-01'),
PARTITION p202310 VALUES LESS THAN ('2023-11-01'),
PARTITION p202311 VALUES LESS THAN ('2023-12-01'),
PARTITION p202312 VALUES LESS THAN ('2024-01-01'),
PARTITION pfuture VALUES LESS THAN (MAXVALUE))`)

tk.MustExec(`insert into t1 values ("2023-08-06","0000")`)
tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1 with validation`,
"[ddl:1737]Found a row that does not match the partition")
tk.MustExec(`insert into t1 values ("2023-08-06","0001")`)
}

func TestExchangePartitionPlacementPolicy(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec(`create schema ExchangePartWithPolicy`)
tk.MustExec(`use ExchangePartWithPolicy`)
tk.MustExec(`CREATE PLACEMENT POLICY rule1 FOLLOWERS=1`)
tk.MustExec(`CREATE PLACEMENT POLICY rule2 FOLLOWERS=2`)
tk.MustExec(`CREATE TABLE t1 (
d date NOT NULL ,
name varchar(10) NOT NULL,
UNIQUE KEY (d,name)
) PLACEMENT POLICY="rule1"`)

tk.MustExec(`CREATE TABLE t1p (
d date NOT NULL ,
name varchar(10) NOT NULL,
UNIQUE KEY (d,name)
) PLACEMENT POLICY="rule2"
PARTITION BY RANGE COLUMNS(d)
(PARTITION p202307 VALUES LESS THAN ('2023-08-01'),
PARTITION p202308 VALUES LESS THAN ('2023-09-01'),
PARTITION p202309 VALUES LESS THAN ('2023-10-01'),
PARTITION p202310 VALUES LESS THAN ('2023-11-01'),
PARTITION p202311 VALUES LESS THAN ('2023-12-01'),
PARTITION p202312 VALUES LESS THAN ('2024-01-01'),
PARTITION pfuture VALUES LESS THAN (MAXVALUE))`)

tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1`,
"[ddl:1736]Tables have different definitions")
tk.MustExec(`insert into t1 values ("2023-08-06","0000")`)
}

func TestExchangePartitionHook(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
1 change: 0 additions & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4746,7 +4746,6 @@ func checkExchangePartition(pt *model.TableInfo, nt *model.TableInfo) error {
return errors.Trace(dbterror.ErrPartitionExchangeForeignKey.GenWithStackByArgs(nt.Name))
}

// NOTE: if nt is temporary table, it should be checked
return nil
}

Expand Down
36 changes: 20 additions & 16 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1373,24 +1373,28 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
diff.OldSchemaID = oldSchemaIDs[0]
diff.AffectedOpts = affects
case model.ActionExchangeTablePartition:
var (
ptSchemaID int64
ptTableID int64
partName string
withValidation bool
)
err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID, &partName, &withValidation)
if err != nil {
return 0, errors.Trace(err)
}
diff.OldTableID = job.TableID
affects := make([]*model.AffectedOption, 1)
affects[0] = &model.AffectedOption{
SchemaID: ptSchemaID,
TableID: ptTableID,
OldTableID: ptTableID,
diff.OldSchemaID = job.SchemaID
if job.SchemaState != model.StatePublic {
diff.TableID = job.TableID
diff.SchemaID = job.SchemaID
} else {
// Update the partitioned table (it is only done in the last state)
var (
ptSchemaID int64
ptTableID int64
ptDefID int64 // Not needed, will reload the whole table
partName string // Not used
withValidation bool // Not used
)
// See ddl.ExchangeTablePartition
err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation)
if err != nil {
return 0, errors.Trace(err)
}
diff.SchemaID = ptSchemaID
diff.TableID = ptTableID
}
diff.AffectedOpts = affects
case model.ActionTruncateTablePartition:
diff.TableID = job.TableID
if len(job.CtxVars) > 0 {
Expand Down
Loading

0 comments on commit c7c7000

Please sign in to comment.