ddl: don't reuse the chunk until underlying memory is not referenced #39382

Merged 3 commits on Nov 25, 2022
ddl/index_cop.go (26 additions & 19 deletions)

@@ -95,7 +95,9 @@ type copReqSenderPool struct {
     senders []*copReqSender
     wg      sync.WaitGroup
 
-    idxBufPool sync.Pool
+    idxBufPool sync.Pool // []*indexRecord
+    srcChkPool sync.Pool // *chunk.Chunk
+    binding    generic.SyncMap[*[]*indexRecord, *chunk.Chunk]
 }
 
 type copReqSender struct {
@@ -108,7 +110,6 @@ type copReqSender struct {
 func (c *copReqSender) run() {
     p := c.senderPool
     defer p.wg.Done()
-    srcChk := renewChunk(nil, p.copCtx.fieldTps)
     for {
         if util.HasCancelled(c.ctx) {
             return
@@ -127,9 +128,7 @@ func (c *copReqSender) run() {
         var done bool
         var total int
         for !done {
-            idxRec := p.idxBufPool.Get().([]*indexRecord)
-            idxRec = idxRec[:0]
-            srcChk = renewChunk(srcChk, p.copCtx.fieldTps)
+            idxRec, srcChk := p.getIndexRecordsAndChunks()
             idxRec, done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk, idxRec)
             if err != nil {
                 p.resultsCh <- idxRecResult{id: task.id, err: err}
@@ -141,17 +140,6 @@ }
     }
 }
 
-// renewChunk creates a new chunk when the reorg batch size is changed.
-func renewChunk(oldChk *chunk.Chunk, fts []*types.FieldType) *chunk.Chunk {
-    newSize := variable.GetDDLReorgBatchSize()
-    newCap := int(newSize) * copReadBatchFactor
-    if oldChk == nil || oldChk.Capacity() != newCap {
-        return chunk.NewChunkWithCapacity(fts, newCap)
-    }
-    oldChk.Reset()
-    return oldChk
-}
-
 func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool {
     return &copReqSenderPool{
         tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
@@ -167,6 +155,12 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64
                 return make([]*indexRecord, 0, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor)
             },
         },
+        srcChkPool: sync.Pool{
+            New: func() any {
+                return chunk.NewChunkWithCapacity(copCtx.fieldTps, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor)
+            },
+        },
+        binding: generic.NewSyncMap[*[]*indexRecord, *chunk.Chunk](4),
     }
 }
 
@@ -205,12 +199,26 @@ func (c *copReqSenderPool) close() {
     close(c.resultsCh)
 }
 
+func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Chunk) {
+    ir, chk := c.idxBufPool.Get().([]*indexRecord), c.srcChkPool.Get().(*chunk.Chunk)
+    newCap := int(variable.GetDDLReorgBatchSize()) * copReadBatchFactor
+    if chk.Capacity() != newCap {
+        chk = chunk.NewChunkWithCapacity(c.copCtx.fieldTps, newCap)
+    }
+    chk.Reset()
+    c.binding.Store(&ir, chk)
+    return ir[:0], chk
+}
+
 // recycleIdxRecords puts the index record slice back to the pool for reuse.
 func (c *copReqSenderPool) recycleIdxRecords(idxRecs []*indexRecord) {
-    if len(idxRecs) == 0 {
+    if idxRecs == nil {
         return
     }
     c.idxBufPool.Put(idxRecs[:0])
+    if bindingChunk, ok := c.binding.Load(&idxRecs); ok {
+        c.srcChkPool.Put(bindingChunk)
+    }
 }
 
 // copContext contains the information that is needed when building a coprocessor request.
@@ -410,8 +418,7 @@ func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.Se
         rsData := tables.TryGetHandleRestoredDataWrapper(c.tblInfo, hdDt, nil, c.idxInfo)
         buf = append(buf, &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false})
     }
-    done := chk.NumRows() < chk.Capacity()
-    return buf, done, nil
+    return buf, false, nil
 }
 
 func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.DAGRequest, error) {
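The pattern the hunks above add can be read on its own: as the PR title says, a chunk's underlying memory may still be referenced (by the index records built from it), so a chunk should only return to circulation together with the record buffer it backed. The new srcChkPool plus the binding map enforce that pairing: getIndexRecordsAndChunks hands out both, and recycling the records is the only path that puts the chunk back. What follows is a minimal, self-contained sketch of the same idea in plain Go, not the PR's code; record, pools, get, and recycle are invented names, sync.Map stands in for TiDB's generic.SyncMap, and a plain []byte buffer stands in for chunk.Chunk.

package main

import (
	"fmt"
	"sync"
)

// record aliases memory owned by a pooled backing buffer, the way an
// indexRecord's values can reference a chunk's column data.
type record struct {
	val []byte // points into the backing buffer, not a copy
}

// pools pairs a pool of record slices with a pool of backing buffers and
// remembers which buffer each handed-out slice was filled from.
type pools struct {
	recPool sync.Pool // holds *[]record
	bufPool sync.Pool // holds *[]byte
	binding sync.Map  // *[]record -> *[]byte
}

func newPools() *pools {
	p := &pools{}
	p.recPool.New = func() any { rs := make([]record, 0, 8); return &rs }
	p.bufPool.New = func() any { b := make([]byte, 0, 64); return &b }
	return p
}

// get hands out a record slice together with the buffer it will reference,
// and binds the two so they can only be recycled as a pair.
func (p *pools) get() (*[]record, *[]byte) {
	recs := p.recPool.Get().(*[]record)
	buf := p.bufPool.Get().(*[]byte)
	*recs, *buf = (*recs)[:0], (*buf)[:0]
	p.binding.Store(recs, buf)
	return recs, buf
}

// recycle returns the record slice to its pool and, only now that nothing can
// still read the records, releases the bound buffer as well.
func (p *pools) recycle(recs *[]record) {
	if recs == nil {
		return
	}
	p.recPool.Put(recs)
	if buf, ok := p.binding.LoadAndDelete(recs); ok {
		p.bufPool.Put(buf)
	}
}

func main() {
	p := newPools()

	recs, buf := p.get()
	*buf = append(*buf, "row-1"...)
	*recs = append(*recs, record{val: *buf}) // the record aliases buffer memory

	// A consumer reads the records; the buffer must not be reused before this.
	for _, r := range *recs {
		fmt.Printf("%s\n", r.val)
	}

	// Recycling the records is the only point where the buffer goes back too.
	p.recycle(recs)
}

The design choice here is to track lifetime by pairing rather than by copying: records keep aliasing pooled memory for speed, and the binding map is what keeps the buffer from being reset while a consumer can still see those aliases.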
ddl/index_cop_test.go (1 addition & 1 deletion)

@@ -45,7 +45,7 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) {
     require.NoError(t, err)
     idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, startKey, endKey, txn.StartTS(), 10)
     require.NoError(t, err)
-    require.True(t, done)
+    require.False(t, done)
     require.NoError(t, txn.Rollback())
 
     handles := make([]kv.Handle, 0, len(idxRec))
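The flipped assertion mirrors the last index_cop.go hunk: fetchTableScanResult no longer reports done just because the chunk came back partially filled, so a single fetch over a small table now yields done == false and callers keep fetching until the scan is explicitly exhausted. Below is a rough sketch of that loop shape in plain Go; fetchAll and fetchBatch are invented names, and it assumes (not shown in this diff) that completion is still signalled once a fetch returns no rows.

package main

import "fmt"

// fetchAll keeps pulling batches until the source reports it is exhausted,
// instead of treating a partially filled batch as the end of the data
// (which is roughly what the removed `done := chk.NumRows() < chk.Capacity()`
// check amounted to).
func fetchAll(fetchBatch func() ([]int, error)) ([]int, error) {
	var all []int
	for {
		batch, err := fetchBatch()
		if err != nil {
			return nil, err
		}
		if len(batch) == 0 {
			return all, nil // only an explicitly empty batch means "done"
		}
		all = append(all, batch...)
	}
}

func main() {
	// A source with 7 rows that sometimes returns short batches even though
	// more data remains (as a paged scan can): sizes 3, 1, 3, then empty.
	rows, sizes, next, call := []int{1, 2, 3, 4, 5, 6, 7}, []int{3, 1, 3}, 0, 0
	fetchBatch := func() ([]int, error) {
		if call >= len(sizes) || next >= len(rows) {
			return nil, nil
		}
		end := next + sizes[call]
		if end > len(rows) {
			end = len(rows)
		}
		batch := rows[next:end]
		next, call = end, call+1
		return batch, nil
	}
	got, err := fetchAll(fetchBatch)
	fmt.Println(got, err) // [1 2 3 4 5 6 7] <nil>
}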