From 7e32f5b7ca66d7279645e059011ecca3b0206276 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 29 Nov 2022 22:16:36 +0800 Subject: [PATCH 1/8] ddl: clone datum for copr-read and check context done --- ddl/index_cop.go | 53 ++++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 6eecd9c0d685e..fcb4f62c0ecd5 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -96,8 +96,6 @@ type copReqSenderPool struct { wg sync.WaitGroup idxBufPool sync.Pool // []*indexRecord - srcChkPool sync.Pool // *chunk.Chunk - binding generic.SyncMap[*[]*indexRecord, *chunk.Chunk] } type copReqSender struct { @@ -110,6 +108,7 @@ type copReqSender struct { func (c *copReqSender) run() { p := c.senderPool defer p.wg.Done() + var srcChk *chunk.Chunk for { if util.HasCancelled(c.ctx) { return @@ -122,24 +121,36 @@ func (c *copReqSender) run() { zap.Int("id", task.id), zap.String("task", task.String())) rs, err := p.copCtx.buildTableScan(p.ctx, p.startTS, task.startKey, task.excludedEndKey()) if err != nil { - p.resultsCh <- idxRecResult{id: task.id, err: err} + p.sendResult(c.ctx, idxRecResult{id: task.id, err: err}) return } var done bool var total int for !done { - idxRec, srcChk := p.getIndexRecordsAndChunks() + idxRec := p.idxBufPool.Get().([]*indexRecord) + srcChk = renewChunk(srcChk, p.copCtx.fieldTps) idxRec, done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk, idxRec) if err != nil { - p.resultsCh <- idxRecResult{id: task.id, err: err} + p.sendResult(c.ctx, idxRecResult{id: task.id, err: err}) return } total += len(idxRec) - p.resultsCh <- idxRecResult{id: task.id, records: idxRec, done: done, total: total} + p.sendResult(c.ctx, idxRecResult{id: task.id, records: idxRec, done: done, total: total}) } } } +// renewChunk creates a new chunk when the reorg batch size is changed. +func renewChunk(oldChk *chunk.Chunk, fts []*types.FieldType) *chunk.Chunk { + newSize := variable.GetDDLReorgBatchSize() + newCap := int(newSize) * copReadBatchFactor + if oldChk == nil || oldChk.Capacity() != newCap { + return chunk.NewChunkWithCapacity(fts, newCap) + } + oldChk.Reset() + return oldChk +} + func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool { return &copReqSenderPool{ tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize), @@ -155,12 +166,6 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64 return make([]*indexRecord, 0, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor) }, }, - srcChkPool: sync.Pool{ - New: func() any { - return chunk.NewChunkWithCapacity(copCtx.fieldTps, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor) - }, - }, - binding: generic.NewSyncMap[*[]*indexRecord, *chunk.Chunk](4), } } @@ -168,6 +173,13 @@ func (c *copReqSenderPool) sendTask(task *reorgBackfillTask) { c.tasksCh <- task } +func (c *copReqSenderPool) sendResult(ctx context.Context, result idxRecResult) { + select { + case <-ctx.Done(): + case c.resultsCh <- result: + } +} + func (c *copReqSenderPool) adjustSize(n int) { // Add some senders. 
for i := len(c.senders); i < n; i++ { @@ -199,26 +211,12 @@ func (c *copReqSenderPool) close() { close(c.resultsCh) } -func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Chunk) { - ir, chk := c.idxBufPool.Get().([]*indexRecord), c.srcChkPool.Get().(*chunk.Chunk) - newCap := int(variable.GetDDLReorgBatchSize()) * copReadBatchFactor - if chk.Capacity() != newCap { - chk = chunk.NewChunkWithCapacity(c.copCtx.fieldTps, newCap) - } - chk.Reset() - c.binding.Store(&ir, chk) - return ir[:0], chk -} - // recycleIdxRecords puts the index record slice back to the pool for reuse. func (c *copReqSenderPool) recycleIdxRecords(idxRecs []*indexRecord) { if idxRecs == nil { return } c.idxBufPool.Put(idxRecs[:0]) - if bindingChunk, ok := c.binding.Load(&idxRecs); ok { - c.srcChkPool.Put(bindingChunk) - } } // copContext contains the information that is needed when building a coprocessor request. @@ -449,7 +447,8 @@ func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.C datumBuf := make([]types.Datum, 0, len(offsets)) for _, offset := range offsets { c := expCols[offset] - datumBuf = append(datumBuf, row.GetDatum(offset, c.GetType())) + rowDt := row.GetDatum(offset, c.GetType()) + datumBuf = append(datumBuf, *rowDt.Clone()) } return datumBuf } From f48381181111cf1c77477ac5c2ae1f55bec6c4aa Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 30 Nov 2022 14:35:24 +0800 Subject: [PATCH 2/8] use sized channel instead of sync.pool to control the memory --- ddl/export_test.go | 2 +- ddl/index.go | 9 +++-- ddl/index_cop.go | 88 +++++++++++++++++++++++++--------------------- 3 files changed, 53 insertions(+), 46 deletions(-) diff --git a/ddl/export_test.go b/ddl/export_test.go index 641d7ce72fc8c..486390f9a6810 100644 --- a/ddl/export_test.go +++ b/ddl/export_test.go @@ -39,7 +39,7 @@ func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS pool := newCopReqSenderPool(context.Background(), copCtx, startTS) pool.adjustSize(1) pool.tasksCh <- task - idxRec, _, done, err := pool.fetchRowColValsFromCop(*task) + idxRec, _, _, done, err := pool.fetchRowColValsFromCop(*task) pool.close() return idxRec, done, err } diff --git a/ddl/index.go b/ddl/index.go index 818dc1eac7738..163e9c407a808 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -44,6 +44,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" decoder "github.com/pingcap/tidb/util/rowDecoder" @@ -1498,11 +1499,13 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC var ( idxRecords []*indexRecord + copChunk *chunk.Chunk // only used by the coprocessor request sender. 
nextKey kv.Key taskDone bool ) if w.copReqSenderPool != nil { - idxRecords, nextKey, taskDone, err = w.copReqSenderPool.fetchRowColValsFromCop(handleRange) + idxRecords, copChunk, nextKey, taskDone, err = w.copReqSenderPool.fetchRowColValsFromCop(handleRange) + defer w.copReqSenderPool.recycleIdxRecordsAndChunk(idxRecords, copChunk) } else { idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange) } @@ -1567,10 +1570,6 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC taskCtx.addedCount++ } - if w.copReqSenderPool != nil { - w.copReqSenderPool.recycleIdxRecords(idxRecords) - } - return nil }) logSlowOperations(time.Since(oprStartTime), "AddIndexBackfillDataInTxn", 3000) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index fcb4f62c0ecd5..07034f48f349a 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -46,7 +46,10 @@ import ( // It multiplies the tidb_ddl_reorg_batch_size to avoid sending too many cop requests for the same handle range. const copReadBatchFactor = 10 -func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, kv.Key, bool, error) { +// copReadConcurrencyFactor is the factor of concurrency of coprocessor read. +const copReadConcurrencyFactor = 10 + +func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, *chunk.Chunk, kv.Key, bool, error) { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() for { @@ -55,10 +58,10 @@ func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) if !ok { logutil.BgLogger().Info("[ddl-ingest] cop-response channel is closed", zap.Int("id", handleRange.id), zap.String("task", handleRange.String())) - return nil, handleRange.endKey, true, nil + return nil, nil, handleRange.endKey, true, nil } if rs.err != nil { - return nil, handleRange.startKey, false, rs.err + return nil, nil, handleRange.startKey, false, rs.err } if rs.done { logutil.BgLogger().Info("[ddl-ingest] finish a cop-request task", @@ -69,15 +72,15 @@ func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) logutil.BgLogger().Info("[ddl-ingest] task is found in results", zap.Int("id", handleRange.id), zap.String("task", handleRange.String())) c.results.Delete(handleRange.id) - return rs.records, handleRange.endKey, true, nil + return rs.records, rs.chunk, handleRange.endKey, true, nil } - return rs.records, handleRange.startKey, false, nil + return rs.records, rs.chunk, handleRange.startKey, false, nil case <-ticker.C: logutil.BgLogger().Info("[ddl-ingest] cop-request result channel is empty", zap.Int("id", handleRange.id)) if _, found := c.results.Load(handleRange.id); found { c.results.Delete(handleRange.id) - return nil, handleRange.endKey, true, nil + return nil, nil, handleRange.endKey, true, nil } } } @@ -95,7 +98,8 @@ type copReqSenderPool struct { senders []*copReqSender wg sync.WaitGroup - idxBufPool sync.Pool // []*indexRecord + idxBufPool chan []*indexRecord + srcChkPool chan *chunk.Chunk } type copReqSender struct { @@ -108,7 +112,6 @@ type copReqSender struct { func (c *copReqSender) run() { p := c.senderPool defer p.wg.Done() - var srcChk *chunk.Chunk for { if util.HasCancelled(c.ctx) { return @@ -127,45 +130,37 @@ func (c *copReqSender) run() { var done bool var total int for !done { - idxRec := p.idxBufPool.Get().([]*indexRecord) - srcChk = renewChunk(srcChk, p.copCtx.fieldTps) + idxRec, srcChk := p.getIndexRecordsAndChunks() idxRec, done, err = 
p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk, idxRec) if err != nil { p.sendResult(c.ctx, idxRecResult{id: task.id, err: err}) return } total += len(idxRec) - p.sendResult(c.ctx, idxRecResult{id: task.id, records: idxRec, done: done, total: total}) + p.sendResult(c.ctx, idxRecResult{id: task.id, records: idxRec, chunk: srcChk, done: done, total: total}) } } } -// renewChunk creates a new chunk when the reorg batch size is changed. -func renewChunk(oldChk *chunk.Chunk, fts []*types.FieldType) *chunk.Chunk { - newSize := variable.GetDDLReorgBatchSize() - newCap := int(newSize) * copReadBatchFactor - if oldChk == nil || oldChk.Capacity() != newCap { - return chunk.NewChunkWithCapacity(fts, newCap) - } - oldChk.Reset() - return oldChk -} - func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool { + poolSize := int(variable.GetDDLReorgWorkerCounter() * copReadConcurrencyFactor) + idxBufPool := make(chan []*indexRecord, poolSize) + srcChkPool := make(chan *chunk.Chunk, poolSize) + for i := 0; i < poolSize; i++ { + idxBufPool <- make([]*indexRecord, 0, copReadBatchFactor*variable.GetDDLReorgBatchSize()) + srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, int(copReadBatchFactor*variable.GetDDLReorgBatchSize())) + } return &copReqSenderPool{ - tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize), - resultsCh: make(chan idxRecResult, backfillTaskChanSize), - results: generic.NewSyncMap[int, struct{}](10), - ctx: ctx, - copCtx: copCtx, - startTS: startTS, - senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()), - wg: sync.WaitGroup{}, - idxBufPool: sync.Pool{ - New: func() any { - return make([]*indexRecord, 0, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor) - }, - }, + tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize), + resultsCh: make(chan idxRecResult, backfillTaskChanSize), + results: generic.NewSyncMap[int, struct{}](10), + ctx: ctx, + copCtx: copCtx, + startTS: startTS, + senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()), + wg: sync.WaitGroup{}, + idxBufPool: idxBufPool, + srcChkPool: srcChkPool, } } @@ -211,12 +206,24 @@ func (c *copReqSenderPool) close() { close(c.resultsCh) } -// recycleIdxRecords puts the index record slice back to the pool for reuse. -func (c *copReqSenderPool) recycleIdxRecords(idxRecs []*indexRecord) { - if idxRecs == nil { +func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Chunk) { + ir := <-c.idxBufPool + chk := <-c.srcChkPool + newCap := int(variable.GetDDLReorgBatchSize()) * copReadBatchFactor + if chk.Capacity() != newCap { + chk = chunk.NewChunkWithCapacity(c.copCtx.fieldTps, newCap) + } + chk.Reset() + return ir[:0], chk +} + +// recycleIdxRecordsAndChunk puts the index record slice and the chunk back to the pool for reuse. +func (c *copReqSenderPool) recycleIdxRecordsAndChunk(idxRecs []*indexRecord, chk *chunk.Chunk) { + if idxRecs == nil || chk == nil { return } - c.idxBufPool.Put(idxRecs[:0]) + c.idxBufPool <- idxRecs + c.srcChkPool <- chk } // copContext contains the information that is needed when building a coprocessor request. 
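
A quick way to see why the sized channels introduced above bound memory where sync.Pool does not: the channel capacity is a hard upper limit on how many buffers can exist at once, and a getter simply blocks until an earlier buffer is recycled. The standalone sketch below illustrates only that pattern; the names (bufPool, getBuf, putBuf) and the constants standing in for the worker-count and batch-size settings are illustrative, not the actual TiDB identifiers.

package main

import "fmt"

const (
	poolSize = 4   // stands in for worker count * copReadConcurrencyFactor
	batchCap = 128 // stands in for copReadBatchFactor * reorg batch size
)

// newBufPool pre-fills a buffered channel with poolSize buffers.
// Unlike sync.Pool, the channel can never hold (or hand out) more than
// poolSize buffers, so the total buffer memory stays bounded.
func newBufPool() chan []int {
	p := make(chan []int, poolSize)
	for i := 0; i < poolSize; i++ {
		p <- make([]int, 0, batchCap)
	}
	return p
}

// getBuf blocks until a buffer is free, which throttles producers
// whenever consumers fall behind.
func getBuf(p chan []int) []int {
	return (<-p)[:0]
}

// putBuf hands the buffer back so another goroutine can reuse it.
func putBuf(p chan []int, buf []int) {
	p <- buf
}

func main() {
	pool := newBufPool()
	buf := getBuf(pool)
	buf = append(buf, 1, 2, 3)
	fmt.Println("reused a capacity-capped buffer:", buf)
	putBuf(pool, buf)
}

Because a send on a full channel would block, returning buffers via putBuf is what frees capacity again; this is the same reason the later patches in this series recycle the records and chunk on every path, including the error and drain paths.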
@@ -448,7 +455,7 @@ func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.C for _, offset := range offsets { c := expCols[offset] rowDt := row.GetDatum(offset, c.GetType()) - datumBuf = append(datumBuf, *rowDt.Clone()) + datumBuf = append(datumBuf, rowDt) } return datumBuf } @@ -469,6 +476,7 @@ func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo, type idxRecResult struct { id int records []*indexRecord + chunk *chunk.Chunk err error done bool total int From 360f4adb5e3fd3bde5f44ac7c90e46b81c3183fe Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 30 Nov 2022 17:32:12 +0800 Subject: [PATCH 3/8] close the pools --- ddl/index_cop.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 07034f48f349a..6727bdc5150eb 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -204,6 +204,8 @@ func (c *copReqSenderPool) close() { } c.wg.Wait() close(c.resultsCh) + close(c.idxBufPool) + close(c.srcChkPool) } func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Chunk) { From 0b9f1041e3c55a7244face9072714f2d63f1cf1b Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 30 Nov 2022 21:04:38 +0800 Subject: [PATCH 4/8] add cancel to copReqSender pool and close result properly --- ddl/index_cop.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 6727bdc5150eb..8fcf62ec6d271 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -92,6 +92,7 @@ type copReqSenderPool struct { results generic.SyncMap[int, struct{}] ctx context.Context + cancel context.CancelFunc copCtx *copContext startTS uint64 @@ -124,7 +125,7 @@ func (c *copReqSender) run() { zap.Int("id", task.id), zap.String("task", task.String())) rs, err := p.copCtx.buildTableScan(p.ctx, p.startTS, task.startKey, task.excludedEndKey()) if err != nil { - p.sendResult(c.ctx, idxRecResult{id: task.id, err: err}) + p.sendResult(idxRecResult{id: task.id, err: err}) return } var done bool @@ -133,11 +134,11 @@ func (c *copReqSender) run() { idxRec, srcChk := p.getIndexRecordsAndChunks() idxRec, done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk, idxRec) if err != nil { - p.sendResult(c.ctx, idxRecResult{id: task.id, err: err}) + p.sendResult(idxRecResult{id: task.id, err: err}) return } total += len(idxRec) - p.sendResult(c.ctx, idxRecResult{id: task.id, records: idxRec, chunk: srcChk, done: done, total: total}) + p.sendResult(idxRecResult{id: task.id, records: idxRec, chunk: srcChk, done: done, total: total}) } } } @@ -150,11 +151,13 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64 idxBufPool <- make([]*indexRecord, 0, copReadBatchFactor*variable.GetDDLReorgBatchSize()) srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, int(copReadBatchFactor*variable.GetDDLReorgBatchSize())) } + poolCtx, cancel := context.WithCancel(ctx) return &copReqSenderPool{ tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize), resultsCh: make(chan idxRecResult, backfillTaskChanSize), results: generic.NewSyncMap[int, struct{}](10), - ctx: ctx, + ctx: poolCtx, + cancel: cancel, copCtx: copCtx, startTS: startTS, senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()), @@ -168,9 +171,9 @@ func (c *copReqSenderPool) sendTask(task *reorgBackfillTask) { c.tasksCh <- task } -func (c *copReqSenderPool) sendResult(ctx context.Context, result idxRecResult) { +func (c *copReqSenderPool) sendResult(result idxRecResult) { select { - case <-ctx.Done(): 
+ case <-c.ctx.Done(): case c.resultsCh <- result: } } @@ -202,6 +205,7 @@ func (c *copReqSenderPool) close() { for _, w := range c.senders { w.cancel() } + c.cancel() c.wg.Wait() close(c.resultsCh) close(c.idxBufPool) @@ -408,7 +412,7 @@ func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.Se return nil, false, errors.Trace(err) } if chk.NumRows() == 0 { - return buf, true, nil + return buf, true, result.Close() } iter := chunk.NewIterator4Chunk(chk) err = table.FillVirtualColumnValue(c.virtualColFieldTps, c.virtualColOffsets, c.expColInfos, c.colInfos, c.sessCtx, chk) From 346a34d3637b8637498558a46446016ed0e120fb Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 30 Nov 2022 21:12:36 +0800 Subject: [PATCH 5/8] recycleIdxRecordsAndChunk if an error occurred --- ddl/index_cop.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 8fcf62ec6d271..9ed035666c0c2 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -135,6 +135,7 @@ func (c *copReqSender) run() { idxRec, done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk, idxRec) if err != nil { p.sendResult(idxRecResult{id: task.id, err: err}) + p.recycleIdxRecordsAndChunk(idxRec, srcChk) return } total += len(idxRec) From 47ad59a76599aeb4df65957fe8b9f31192f3a58e Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 1 Dec 2022 00:43:08 +0800 Subject: [PATCH 6/8] drain the result when closeing the copReqSenderPool --- ddl/index_cop.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 9ed035666c0c2..a533b89b59c95 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -92,7 +92,6 @@ type copReqSenderPool struct { results generic.SyncMap[int, struct{}] ctx context.Context - cancel context.CancelFunc copCtx *copContext startTS uint64 @@ -125,7 +124,7 @@ func (c *copReqSender) run() { zap.Int("id", task.id), zap.String("task", task.String())) rs, err := p.copCtx.buildTableScan(p.ctx, p.startTS, task.startKey, task.excludedEndKey()) if err != nil { - p.sendResult(idxRecResult{id: task.id, err: err}) + p.resultsCh <- idxRecResult{id: task.id, err: err} return } var done bool @@ -134,12 +133,12 @@ func (c *copReqSender) run() { idxRec, srcChk := p.getIndexRecordsAndChunks() idxRec, done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk, idxRec) if err != nil { - p.sendResult(idxRecResult{id: task.id, err: err}) + p.resultsCh <- idxRecResult{id: task.id, err: err} p.recycleIdxRecordsAndChunk(idxRec, srcChk) return } total += len(idxRec) - p.sendResult(idxRecResult{id: task.id, records: idxRec, chunk: srcChk, done: done, total: total}) + p.resultsCh <- idxRecResult{id: task.id, records: idxRec, chunk: srcChk, done: done, total: total} } } } @@ -152,13 +151,11 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64 idxBufPool <- make([]*indexRecord, 0, copReadBatchFactor*variable.GetDDLReorgBatchSize()) srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, int(copReadBatchFactor*variable.GetDDLReorgBatchSize())) } - poolCtx, cancel := context.WithCancel(ctx) return &copReqSenderPool{ tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize), resultsCh: make(chan idxRecResult, backfillTaskChanSize), results: generic.NewSyncMap[int, struct{}](10), - ctx: poolCtx, - cancel: cancel, + ctx: ctx, copCtx: copCtx, startTS: startTS, senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()), @@ -172,13 +169,6 @@ func (c *copReqSenderPool) 
sendTask(task *reorgBackfillTask) { c.tasksCh <- task } -func (c *copReqSenderPool) sendResult(result idxRecResult) { - select { - case <-c.ctx.Done(): - case c.resultsCh <- result: - } -} - func (c *copReqSenderPool) adjustSize(n int) { // Add some senders. for i := len(c.senders); i < n; i++ { @@ -206,13 +196,23 @@ func (c *copReqSenderPool) close() { for _, w := range c.senders { w.cancel() } - c.cancel() + cleanupWg := util.WaitGroupWrapper{} + cleanupWg.Run(c.drainResults) + // Wait for all cop-req senders to exit. c.wg.Wait() close(c.resultsCh) + cleanupWg.Wait() close(c.idxBufPool) close(c.srcChkPool) } +func (c *copReqSenderPool) drainResults() { + // Consume the rest results because the writers are inactive anymore. + for rs := range c.resultsCh { + c.recycleIdxRecordsAndChunk(rs.records, rs.chunk) + } +} + func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Chunk) { ir := <-c.idxBufPool chk := <-c.srcChkPool From 66e2bd44eeaecc4d972234ce8b0130c59c25552c Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 1 Dec 2022 10:40:08 +0800 Subject: [PATCH 7/8] close result set --- ddl/index_cop.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index a533b89b59c95..b949d568a7fce 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -135,11 +135,13 @@ func (c *copReqSender) run() { if err != nil { p.resultsCh <- idxRecResult{id: task.id, err: err} p.recycleIdxRecordsAndChunk(idxRec, srcChk) + _ = rs.Close() return } total += len(idxRec) p.resultsCh <- idxRecResult{id: task.id, records: idxRec, chunk: srcChk, done: done, total: total} } + _ = rs.Close() } } @@ -413,7 +415,7 @@ func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.Se return nil, false, errors.Trace(err) } if chk.NumRows() == 0 { - return buf, true, result.Close() + return buf, true, nil } iter := chunk.NewIterator4Chunk(chk) err = table.FillVirtualColumnValue(c.virtualColFieldTps, c.virtualColOffsets, c.expColInfos, c.colInfos, c.sessCtx, chk) From fd2af9b9e2fccd2efce22327c6bd2948fadfe284 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 1 Dec 2022 10:44:28 +0800 Subject: [PATCH 8/8] use terror.Call to handle error --- ddl/index_cop.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index b949d568a7fce..950cd91404bc6 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" @@ -135,13 +136,14 @@ func (c *copReqSender) run() { if err != nil { p.resultsCh <- idxRecResult{id: task.id, err: err} p.recycleIdxRecordsAndChunk(idxRec, srcChk) + terror.Call(rs.Close) _ = rs.Close() return } total += len(idxRec) p.resultsCh <- idxRecResult{id: task.id, records: idxRec, chunk: srcChk, done: done, total: total} } - _ = rs.Close() + terror.Call(rs.Close) } }
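
The reworked close() in patch 6 above relies on something reading resultsCh while the senders exit; otherwise a sender blocked on a full resultsCh could keep wg.Wait() from returning. Below is a stripped-down, standalone sketch of that drain-on-close ordering; sender cancellation and the buffer recycling done by recycleIdxRecordsAndChunk are elided, and the names (pool, newPool, close) are illustrative only.

package main

import (
	"fmt"
	"sync"
)

type pool struct {
	resultsCh chan int
	wg        sync.WaitGroup
}

// newPool starts a few workers that push results into a small buffered
// channel; with no reader they quickly block on the send.
func newPool(workers int) *pool {
	p := &pool{resultsCh: make(chan int, 2)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func(id int) {
			defer p.wg.Done()
			for j := 0; j < 10; j++ {
				p.resultsCh <- id*100 + j // blocks while nobody reads
			}
		}(i)
	}
	return p
}

// close drains the result channel concurrently so blocked workers can
// finish, waits for them, and only then closes the channel.
func (p *pool) close() {
	var drainWg sync.WaitGroup
	drainWg.Add(1)
	go func() {
		defer drainWg.Done()
		// Consume the remaining results; a real pool would recycle the
		// attached buffers here, as drainResults does above.
		for range p.resultsCh {
		}
	}()
	p.wg.Wait()        // every producer has exited ...
	close(p.resultsCh) // ... so closing the channel is safe,
	drainWg.Wait()     // then wait for the drainer before freeing pools.
}

func main() {
	p := newPool(3)
	p.close()
	fmt.Println("shut down without deadlock")
}

Closing resultsCh only after wg.Wait() returns is what keeps the workers' sends safe, and waiting for the drainer before releasing any pooled buffers mirrors why the patch calls cleanupWg.Wait() before close(c.idxBufPool) and close(c.srcChkPool).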