Skip to content

Commit

Permalink
determine the new backfill process by index.BackfillState (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Aug 18, 2022
1 parent 484a7a2 commit d47c680
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 39 deletions.
9 changes: 1 addition & 8 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,6 @@ type backfillWorker struct {
table table.Table
closed bool
priority int
// Mark if it use new backfill flow.
isNewBF bool
}

func newBackfillWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, reorgInfo *reorgInfo) *backfillWorker {
Expand Down Expand Up @@ -698,14 +696,9 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
}
} else {
var newBackfillFlow bool = false
if bc, ok := lightning.BackCtxMgr.Load(job.ID); ok && !bc.NeedRestore() {
newBackfillFlow = true
}
idxWorker := newAddIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc, newBackfillFlow)
idxWorker := newAddIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
idxWorker.isNewBF = newBackfillFlow
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
}
case typeUpdateColumnWorker:
Expand Down
14 changes: 8 additions & 6 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ type addIndexWorker struct {
distinctCheckFlags []bool
}

func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext, newBF bool) *addIndexWorker {
func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *addIndexWorker {
if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) {
logutil.BgLogger().Error("Element type for addIndexWorker incorrect", zap.String("jobQuery", reorgInfo.Query),
zap.String("reorgInfo", reorgInfo.String()))
Expand All @@ -1174,7 +1174,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
break
}
}
index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo, newBF)
index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
return &addIndexWorker{
baseIndexWorker: baseIndexWorker{
Expand Down Expand Up @@ -1467,18 +1467,20 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
continue
}

// When backfill go new backfill path, but use original worker then no need to lock index key.
if !w.isNewBF {
// When the backfill-merge process is used, the writes from DML are redirected to a temp index.
// Thus, the lock is unnecessary.
if w.index.Meta().BackfillState == model.BackfillStateInapplicable {
// We need to add this lock to make sure pessimistic transaction can realize this operation.
// For the normal pessimistic transaction, it's ok. But if async commmit is used, it may lead to inconsistent data and index.
// For the normal pessimistic transaction, it's ok. But if async commit is used, it may lead to inconsistent data and index.
err := txn.LockKeys(context.Background(), new(kv.LockCtx), idxRecord.key)
if err != nil {
return errors.Trace(err)
}
}

// Create the index.
handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData, table.WithIgnoreAssertion)
handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData,
table.WithIgnoreAssertion, table.FromBackfill)
if err != nil {
if kv.ErrKeyExists.Equal(err) && idxRecord.handle.Equal(handle) {
// Index already exists, skip it.
Expand Down
9 changes: 9 additions & 0 deletions table/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type CreateIdxOpt struct {
Ctx context.Context
Untouched bool // If true, the index key/value is no need to commit.
IgnoreAssertion bool
FromBackFill bool
}

// CreateIdxOptFunc is defined for the Create() method of Index interface.
Expand All @@ -52,6 +53,14 @@ var WithIgnoreAssertion = func(opt *CreateIdxOpt) {
opt.IgnoreAssertion = true
}

// FromBackfill indicates that the index is created by DDL backfill worker.
// In the lightning backfill process, the index KVs from DML will be redirected to
// the temp index. On the other hand, the index KVs from DDL backfill worker should
// never be redirected to the temp index.
var FromBackfill = func(opt *CreateIdxOpt) {
opt.FromBackFill = true
}

// WithCtx returns a CreateIdxFunc.
// This option is used to pass context.Context.
func WithCtx(ctx context.Context) CreateIdxOptFunc {
Expand Down
37 changes: 12 additions & 25 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package tables

import (
"bytes"
"context"
"sync"

Expand All @@ -42,8 +41,6 @@ type index struct {
// the collation global variable is initialized *after* `NewIndex()`.
initNeedRestoreData sync.Once
needRestoredData bool
// Mark index is in backfill processing.
Isbackfill bool
}

// NeedRestoredData checks whether the index columns needs restored data.
Expand All @@ -58,7 +55,7 @@ func NeedRestoredData(idxCols []*model.IndexColumn, colInfos []*model.ColumnInfo
}

// NewIndex builds a new Index object.
func NewIndex(physicalID int64, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, newBF ...bool) table.Index {
func NewIndex(physicalID int64, tblInfo *model.TableInfo, indexInfo *model.IndexInfo) table.Index {
// The prefix can't encode from tblInfo.ID, because table partition may change the id to partition id.
var prefix kv.Key
if indexInfo.Global {
Expand All @@ -68,16 +65,11 @@ func NewIndex(physicalID int64, tblInfo *model.TableInfo, indexInfo *model.Index
// Otherwise, start with physicalID.
prefix = tablecodec.EncodeTableIndexPrefix(physicalID, indexInfo.ID)
}
var newBackfillFlow bool = false
if len(newBF) > 0 {
newBackfillFlow = newBF[0]
}
index := &index{
idxInfo: indexInfo,
tblInfo: tblInfo,
prefix: prefix,
phyTblID: physicalID,
Isbackfill: newBackfillFlow,
idxInfo: indexInfo,
tblInfo: tblInfo,
prefix: prefix,
phyTblID: physicalID,
}
return index
}
Expand Down Expand Up @@ -118,14 +110,13 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue

var (
tempKey []byte
// The keyVer means the temp index key/value version, it has below three values.
// Use 'i' to be initialize version.
// The keyVer means the temp index key/value version, it has below values.
// Use 'b' to be backfill version.
// Use 'm' to be merge version.
keyVer []byte = []byte("i")
keyVer []byte
)
// Isbackfill set to true, means this is a backfill worker should not write to temp index.
if c.idxInfo.State != model.StatePublic && !c.Isbackfill {
// Only the KVs from DML should be redirected to the temp index.
if c.idxInfo.State != model.StatePublic && !opt.FromBackFill {
switch c.idxInfo.BackfillState {
case model.BackfillStateInapplicable:
// Do nothing.
Expand Down Expand Up @@ -183,9 +174,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
opt.IgnoreAssertion = opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic

if !distinct || skipCheck || opt.Untouched {
if !bytes.Equal(keyVer, []byte("i")) {
idxVal = append(idxVal, keyVer...)
}
idxVal = append(idxVal, keyVer...)
err = txn.GetMemBuffer().Set(key, idxVal)
if err != nil {
return nil, err
Expand Down Expand Up @@ -230,9 +219,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
}
if err != nil || len(value) == 0 {
lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil
if !bytes.Equal(keyVer, []byte("i")) {
idxVal = append(idxVal, keyVer...)
}
idxVal = append(idxVal, keyVer...)
if lazyCheck {
err = txn.GetMemBuffer().SetWithFlags(key, idxVal, kv.SetPresumeKeyNotExists)
} else {
Expand Down Expand Up @@ -308,7 +295,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed
}
var (
tempKey []byte
keyVer []byte = []byte("i")
keyVer []byte
val []byte
)
if c.idxInfo.State != model.StatePublic {
Expand Down

0 comments on commit d47c680

Please sign in to comment.