Skip to content

Commit

Permalink
ddl: use latest ts to read record for adding index
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Dec 21, 2022
1 parent a2fa187 commit 38c5d4e
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 15 deletions.
7 changes: 1 addition & 6 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,12 +807,7 @@ func (b *backfillScheduler) initCopReqSenderPool() {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
ver, err := sessCtx.GetStore().CurrentVersion(kv.GlobalTxnScope)
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, ver.Ver)
b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore())
}

func (b *backfillScheduler) canSkipError(err error) bool {
Expand Down
4 changes: 2 additions & 2 deletions ddl/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ func SetBatchInsertDeleteRangeSize(i int) {

var NewCopContext4Test = newCopContext

func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS uint64,
func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, store kv.Storage,
batchSize int) ([]*indexRecord, bool, error) {
variable.SetDDLReorgBatchSize(int32(batchSize))
task := &reorgBackfillTask{
id: 1,
startKey: startKey,
endKey: endKey,
}
pool := newCopReqSenderPool(context.Background(), copCtx, startTS)
pool := newCopReqSenderPool(context.Background(), copCtx, store)
pool.adjustSize(1)
pool.tasksCh <- task
idxRec, _, _, done, err := pool.fetchRowColValsFromCop(*task)
Expand Down
17 changes: 11 additions & 6 deletions ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ type copReqSenderPool struct {
resultsCh chan idxRecResult
results generic.SyncMap[int, struct{}]

ctx context.Context
copCtx *copContext
startTS uint64
ctx context.Context
copCtx *copContext
store kv.Storage

senders []*copReqSender
wg sync.WaitGroup
Expand Down Expand Up @@ -139,7 +139,12 @@ func (c *copReqSender) run() {
curTaskID = task.id
logutil.BgLogger().Info("[ddl-ingest] start a cop-request task",
zap.Int("id", task.id), zap.String("task", task.String()))
rs, err := p.copCtx.buildTableScan(p.ctx, p.startTS, task.startKey, task.excludedEndKey())
ver, err := p.store.CurrentVersion(kv.GlobalTxnScope)
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
return
}
rs, err := p.copCtx.buildTableScan(p.ctx, ver.Ver, task.startKey, task.excludedEndKey())
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
return
Expand Down Expand Up @@ -167,7 +172,7 @@ func (c *copReqSender) run() {
}
}

func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool {
func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Storage) *copReqSenderPool {
poolSize := copReadChunkPoolSize()
idxBufPool := make(chan []*indexRecord, poolSize)
srcChkPool := make(chan *chunk.Chunk, poolSize)
Expand All @@ -181,7 +186,7 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64
results: generic.NewSyncMap[int, struct{}](10),
ctx: ctx,
copCtx: copCtx,
startTS: startTS,
store: store,
senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()),
wg: sync.WaitGroup{},
idxBufPool: idxBufPool,
Expand Down
2 changes: 1 addition & 1 deletion ddl/index_cop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) {
endKey := startKey.PrefixNext()
txn, err := store.Begin()
require.NoError(t, err)
idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, startKey, endKey, txn.StartTS(), 10)
idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, startKey, endKey, store, 10)
require.NoError(t, err)
require.False(t, done)
require.NoError(t, txn.Rollback())
Expand Down

0 comments on commit 38c5d4e

Please sign in to comment.