Skip to content

Commit

Permalink
*: support REORGANIZE PARTITION, part 1 - data reorg / data+index cop…
Browse files Browse the repository at this point in the history
…ying (#38460)

ref #15000, ref #38535
  • Loading branch information
mjonss committed Dec 30, 2022
1 parent fa2a2ba commit 25df00e
Show file tree
Hide file tree
Showing 18 changed files with 497 additions and 32 deletions.
11 changes: 9 additions & 2 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const (
typeUpdateColumnWorker backfillerType = 1
typeCleanUpIndexWorker backfillerType = 2
typeAddIndexMergeTmpWorker backfillerType = 3
typeReorgPartitionWorker backfillerType = 4

// InstanceLease is the instance lease.
InstanceLease = 1 * time.Minute
Expand Down Expand Up @@ -238,7 +239,7 @@ type backfillWorker struct {
sessCtx sessionctx.Context
taskCh chan *reorgBackfillTask
resultCh chan *backfillResult
table table.Table
table table.PhysicalTable
priority int
tp backfillerType
ctx context.Context
Expand Down Expand Up @@ -540,7 +541,7 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
}

// handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.PhysicalTable,
totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
batchTasks := make([]*reorgBackfillTask, 0, backfillTaskChanSize)
reorgInfo := scheduler.reorgInfo
Expand Down Expand Up @@ -766,6 +767,12 @@ func (b *backfillScheduler) adjustWorkerSize() error {
case typeCleanUpIndexWorker:
idxWorker := newCleanUpIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc)
worker, runner = idxWorker, idxWorker.backfillWorker
case typeReorgPartitionWorker:
partWorker, err := newReorgPartitionWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc)
if err != nil {
return err
}
worker, runner = partWorker, partWorker.backfillWorker
default:
return errors.New("unknown backfill type")
}
Expand Down
40 changes: 35 additions & 5 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,9 +1042,35 @@ func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInf
return elements
}

func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, reorgInfo *reorgInfo) error {
func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) error {
logutil.BgLogger().Info("[ddl] start to update table row", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
return w.writePhysicalTableRecord(w.sessPool, t, typeUpdateColumnWorker, reorgInfo)
if tbl, ok := t.(table.PartitionedTable); ok {
done := false
for !done {
p := tbl.GetPartition(reorgInfo.PhysicalTableID)
if p == nil {
return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}
workType := typeReorgPartitionWorker
if reorgInfo.Job.Type != model.ActionReorganizePartition {
workType = typeUpdateColumnWorker
panic("FIXME: See https://github.com/pingcap/tidb/issues/39915")
}
err := w.writePhysicalTableRecord(w.sessPool, p, workType, reorgInfo)
if err != nil {
return err
}
done, err = w.updateReorgInfo(tbl, reorgInfo)
if err != nil {
return errors.Trace(err)
}
}
return nil
}
if tbl, ok := t.(table.PhysicalTable); ok {
return w.writePhysicalTableRecord(w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo)
}
return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}

// TestReorgGoroutineRunning is only used in test to indicate the reorg goroutine has been started.
Expand Down Expand Up @@ -1075,6 +1101,10 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
}
}

if _, ok := t.(table.PartitionedTable); ok {
// TODO: Remove this
panic("FIXME: this got reverted and needs to be back!")
}
// Get the original start handle and end handle.
currentVer, err := getValidCurrentVersion(reorgInfo.d.store)
if err != nil {
Expand Down Expand Up @@ -1185,8 +1215,8 @@ type rowRecord struct {
warning *terror.Error // It's used to record the cast warning of a record.
}

// getNextKey gets next handle of entry that we are going to process.
func (*updateColumnWorker) getNextKey(taskRange reorgBackfillTask,
// getNextHandleKey gets next handle of entry that we are going to process.
func getNextHandleKey(taskRange reorgBackfillTask,
taskDone bool, lastAccessedHandle kv.Key) (nextHandle kv.Key) {
if !taskDone {
// The task is not done. So we need to pick the last processed entry's handle and add one.
Expand Down Expand Up @@ -1236,7 +1266,7 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
}

logutil.BgLogger().Debug("[ddl] txn fetches handle info", zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)))
return w.rowRecords, w.getNextKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
return w.rowRecords, getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
}

func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, rawRow []byte) error {
Expand Down
4 changes: 2 additions & 2 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,15 +689,15 @@ func TestTransactionWithWriteOnlyColumn(t *testing.T) {
dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(store, "alter table t1 add column c int not null", done)
go backgroundExec(store, "test", "alter table t1 add column c int not null", done)
err := <-done
require.NoError(t, err)
require.NoError(t, checkErr)
tk.MustQuery("select a from t1").Check(testkit.Rows("2"))
tk.MustExec("delete from t1")

// test transaction on drop column.
go backgroundExec(store, "alter table t1 drop column c", done)
go backgroundExec(store, "test", "alter table t1 drop column c", done)
err = <-done
require.NoError(t, err)
require.NoError(t, checkErr)
Expand Down
4 changes: 2 additions & 2 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1214,14 +1214,14 @@ func TestBitDefaultValue(t *testing.T) {
);`)
}

func backgroundExec(s kv.Storage, sql string, done chan error) {
func backgroundExec(s kv.Storage, schema, sql string, done chan error) {
se, err := session.CreateSession4Test(s)
if err != nil {
done <- errors.Trace(err)
return
}
defer se.Close()
_, err = se.Execute(context.Background(), "use test")
_, err = se.Execute(context.Background(), "use "+schema)
if err != nil {
done <- errors.Trace(err)
return
Expand Down
7 changes: 2 additions & 5 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3343,9 +3343,6 @@ func TestPartitionErrorCode(t *testing.T) {
);`)
tk.MustGetDBError("alter table t_part coalesce partition 4;", dbterror.ErrCoalesceOnlyOnHashPartition)

tk.MustGetErrCode(`alter table t_part reorganize partition p0, p1 into (
partition p0 values less than (1980));`, errno.ErrUnsupportedDDLOperation)

tk.MustGetErrCode("alter table t_part check partition p0, p1;", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t_part optimize partition p0,p1;", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t_part rebuild partition p0,p1;", errno.ErrUnsupportedDDLOperation)
Expand Down Expand Up @@ -3757,9 +3754,9 @@ func TestTruncatePartitionMultipleTimes(t *testing.T) {
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
done1 := make(chan error, 1)
go backgroundExec(store, "alter table test.t truncate partition p0;", done1)
go backgroundExec(store, "test", "alter table test.t truncate partition p0;", done1)
done2 := make(chan error, 1)
go backgroundExec(store, "alter table test.t truncate partition p0;", done2)
go backgroundExec(store, "test", "alter table test.t truncate partition p0;", done2)
<-done1
<-done2
require.LessOrEqual(t, errCount, int32(1))
Expand Down
8 changes: 4 additions & 4 deletions ddl/db_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,15 @@ func TestTransactionOnAddDropColumn(t *testing.T) {
dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(store, "alter table t1 add column c int not null after a", done)
go backgroundExec(store, "test", "alter table t1 add column c int not null after a", done)
err := <-done
require.NoError(t, err)
require.Nil(t, checkErr)
tk.MustQuery("select a,b from t1 order by a").Check(testkit.Rows("1 1", "1 1", "1 1", "2 2", "2 2", "2 2"))
tk.MustExec("delete from t1")

// test transaction on drop column.
go backgroundExec(store, "alter table t1 drop column c", done)
go backgroundExec(store, "test", "alter table t1 drop column c", done)
err = <-done
require.NoError(t, err)
require.Nil(t, checkErr)
Expand Down Expand Up @@ -899,7 +899,7 @@ func TestAddColumn2(t *testing.T) {
dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(store, "alter table t1 add column c int not null", done)
go backgroundExec(store, "test", "alter table t1 add column c int not null", done)
err := <-done
require.NoError(t, err)

Expand Down Expand Up @@ -940,7 +940,7 @@ func TestAddColumn2(t *testing.T) {
}
dom.DDL().SetHook(hook)

go backgroundExec(store, "alter table t2 add column b int not null default 3", done)
go backgroundExec(store, "test", "alter table t2 add column b int not null default 3", done)
err = <-done
require.NoError(t, err)
re.Check(testkit.Rows("1 2"))
Expand Down
2 changes: 1 addition & 1 deletion ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestIssue22307(t *testing.T) {
dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(store, "alter table t drop column b;", done)
go backgroundExec(store, "test", "alter table t drop column b;", done)
err := <-done
require.NoError(t, err)
require.EqualError(t, checkErr1, "[planner:1054]Unknown column 'b' in 'where clause'")
Expand Down
11 changes: 10 additions & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1645,7 +1645,16 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo
return true, nil
}

pid, err := findNextPartitionID(reorg.PhysicalTableID, pi.Definitions)
// During data copying, copy data from partitions to be dropped
nextPartitionDefs := pi.DroppingDefinitions
if bytes.Equal(reorg.currElement.TypeKey, meta.IndexElementKey) {
// During index re-creation, process data from partitions to be added
nextPartitionDefs = pi.AddingDefinitions
}
if nextPartitionDefs == nil {
nextPartitionDefs = pi.Definitions
}
pid, err := findNextPartitionID(reorg.PhysicalTableID, nextPartitionDefs)
if err != nil {
// Fatal error, should not run here.
logutil.BgLogger().Error("[ddl] find next partition ID failed", zap.Reflect("table", t), zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion ddl/index_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func testAddIndexRollback(t *testing.T, idxName, addIdxSQL, errMsg string, hasNu
}

done := make(chan error, 1)
go backgroundExec(store, addIdxSQL, done)
go backgroundExec(store, "test", addIdxSQL, done)

times := 0
ticker := time.NewTicker(indexModifyLease / 2)
Expand Down
Loading

0 comments on commit 25df00e

Please sign in to comment.