diff --git a/ddl/backfilling.go b/ddl/backfilling.go index d1035bad084bd..0f0910e1caf28 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -807,12 +807,7 @@ func (b *backfillScheduler) initCopReqSenderPool() { logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err)) return } - ver, err := sessCtx.GetStore().CurrentVersion(kv.GlobalTxnScope) - if err != nil { - logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err)) - return - } - b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, ver.Ver) + b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore()) } func (b *backfillScheduler) canSkipError(err error) bool { diff --git a/ddl/export_test.go b/ddl/export_test.go index 486390f9a6810..3ea26fb04290c 100644 --- a/ddl/export_test.go +++ b/ddl/export_test.go @@ -28,7 +28,7 @@ func SetBatchInsertDeleteRangeSize(i int) { var NewCopContext4Test = newCopContext -func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS uint64, +func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, store kv.Storage, batchSize int) ([]*indexRecord, bool, error) { variable.SetDDLReorgBatchSize(int32(batchSize)) task := &reorgBackfillTask{ @@ -36,7 +36,7 @@ func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS startKey: startKey, endKey: endKey, } - pool := newCopReqSenderPool(context.Background(), copCtx, startTS) + pool := newCopReqSenderPool(context.Background(), copCtx, store) pool.adjustSize(1) pool.tasksCh <- task idxRec, _, _, done, err := pool.fetchRowColValsFromCop(*task) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 0a04ac63eb190..fab097727139b 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -103,9 +103,9 @@ type copReqSenderPool struct { resultsCh chan idxRecResult results generic.SyncMap[int, struct{}] - ctx context.Context - copCtx *copContext - startTS uint64 + ctx context.Context + copCtx *copContext + store kv.Storage senders []*copReqSender wg sync.WaitGroup @@ -139,7 +139,12 @@ func (c *copReqSender) run() { curTaskID = task.id logutil.BgLogger().Info("[ddl-ingest] start a cop-request task", zap.Int("id", task.id), zap.String("task", task.String())) - rs, err := p.copCtx.buildTableScan(p.ctx, p.startTS, task.startKey, task.excludedEndKey()) + ver, err := p.store.CurrentVersion(kv.GlobalTxnScope) + if err != nil { + p.resultsCh <- idxRecResult{id: task.id, err: err} + return + } + rs, err := p.copCtx.buildTableScan(p.ctx, ver.Ver, task.startKey, task.excludedEndKey()) if err != nil { p.resultsCh <- idxRecResult{id: task.id, err: err} return @@ -167,7 +172,7 @@ func (c *copReqSender) run() { } } -func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool { +func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Storage) *copReqSenderPool { poolSize := copReadChunkPoolSize() idxBufPool := make(chan []*indexRecord, poolSize) srcChkPool := make(chan *chunk.Chunk, poolSize) @@ -181,7 +186,7 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64 results: generic.NewSyncMap[int, struct{}](10), ctx: ctx, copCtx: copCtx, - startTS: startTS, + store: store, senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()), wg: sync.WaitGroup{}, idxBufPool: idxBufPool, diff --git a/ddl/index_cop_test.go b/ddl/index_cop_test.go index 80e37f6a74121..38bced0b6678d 100644 --- a/ddl/index_cop_test.go +++ b/ddl/index_cop_test.go @@ -43,7 +43,7 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) { endKey := startKey.PrefixNext() txn, err := store.Begin() require.NoError(t, err) - idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, startKey, endKey, txn.StartTS(), 10) + idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, startKey, endKey, store, 10) require.NoError(t, err) require.False(t, done) require.NoError(t, txn.Rollback())