Skip to content

Commit

Permalink
ddl: fix unexpect fail when create expression index
Browse files Browse the repository at this point in the history
  • Loading branch information
Defined2014 committed Dec 12, 2022
1 parent 3884b28 commit d18cf71
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 6 deletions.
12 changes: 11 additions & 1 deletion ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
Expand Down Expand Up @@ -805,12 +806,21 @@ func (b *backfillScheduler) Close() {
//
// The above operations are completed in a transaction.
// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error {
func (dc *ddlCtx) writePhysicalTableRecord(sess *session, sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
totalAddedCount := job.GetRowCount()

startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
sessCtx := newContext(reorgInfo.d.store)
m, err := sess.Txn(true)
if err != nil {
return err
}
dbInfo, err := meta.NewMeta(m).GetDatabase(job.SchemaID)
if err != nil {
return err
}
sessCtx.GetSessionVars().CurrentDB = dbInfo.Name.O
decodeColMap, err := makeupDecodeColMap(sessCtx, t)
if err != nil {
return errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err
if p == nil {
return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}
err := w.writePhysicalTableRecord(w.sessPool, p, typeUpdateColumnWorker, reorgInfo)
err := w.writePhysicalTableRecord(w.sess, w.sessPool, p, typeUpdateColumnWorker, reorgInfo)
if err != nil {
return err
}
Expand All @@ -1059,7 +1059,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err
return nil
}
if tbl, ok := t.(table.PhysicalTable); ok {
return w.writePhysicalTableRecord(w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo)
return w.writePhysicalTableRecord(w.sess, w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo)
}
return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}
Expand Down
6 changes: 3 additions & 3 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1592,10 +1592,10 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error {
if reorgInfo.mergingTmpIdx {
logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
return w.writePhysicalTableRecord(w.sessPool, t, typeAddIndexMergeTmpWorker, reorgInfo)
return w.writePhysicalTableRecord(w.sess, w.sessPool, t, typeAddIndexMergeTmpWorker, reorgInfo)
}
logutil.BgLogger().Info("[ddl] start to add table index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
return w.writePhysicalTableRecord(w.sessPool, t, typeAddIndexWorker, reorgInfo)
return w.writePhysicalTableRecord(w.sess, w.sessPool, t, typeAddIndexWorker, reorgInfo)
}

// addTableIndex handles the add index reorganization state for a table.
Expand Down Expand Up @@ -1804,7 +1804,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
// cleanupPhysicalTableIndex handles the drop partition reorganization state for a non-partitioned table or a partition.
func (w *worker) cleanupPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error {
logutil.BgLogger().Info("[ddl] start to clean up index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
return w.writePhysicalTableRecord(w.sessPool, t, typeCleanUpIndexWorker, reorgInfo)
return w.writePhysicalTableRecord(w.sess, w.sessPool, t, typeCleanUpIndexWorker, reorgInfo)
}

// cleanupGlobalIndex handles the drop partition reorganization state to clean up index entries of partitions.
Expand Down

0 comments on commit d18cf71

Please sign in to comment.