Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: reuse chunk for copr-read and check context done #39473

Merged
merged 23 commits into from
Dec 1, 2022
Merged
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7e32f5b
ddl: clone datum for copr-read and check context done
tangenta Nov 29, 2022
f483811
use sized channel instead of sync.pool to control the memory
tangenta Nov 30, 2022
8bda001
Merge branch 'master' into add-index-clone-dt
tangenta Nov 30, 2022
3dc7d19
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Nov 30, 2022
360f4ad
close the pools
tangenta Nov 30, 2022
e583a9c
Merge branch 'master' into add-index-clone-dt
hawkingrei Nov 30, 2022
0b9f104
add cancel to copReqSender pool and close result properly
tangenta Nov 30, 2022
2ee5db5
Merge branch 'master' into add-index-clone-dt
hawkingrei Nov 30, 2022
346a34d
recycleIdxRecordsAndChunk if an error occurred
tangenta Nov 30, 2022
47ad59a
drain the result when closeing the copReqSenderPool
tangenta Nov 30, 2022
cc2d0bf
Merge branch 'master' into add-index-clone-dt
tangenta Nov 30, 2022
66e2bd4
close result set
tangenta Dec 1, 2022
fd2af9b
use terror.Call to handle error
tangenta Dec 1, 2022
9a4d264
Merge branch 'master' into add-index-clone-dt
tangenta Dec 1, 2022
c6f6f4e
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
637c9d1
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
2a5620d
Merge branch 'master' into add-index-clone-dt
tangenta Dec 1, 2022
434d568
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
5b1a550
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
0cae781
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
11f4f6a
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
bafe0cc
Merge branch 'master' into add-index-clone-dt
ti-chi-bot Dec 1, 2022
bcf7f34
Merge branch 'master' into add-index-clone-dt
hawkingrei Dec 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 26 additions & 27 deletions ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ type copReqSenderPool struct {
wg sync.WaitGroup

idxBufPool sync.Pool // []*indexRecord
srcChkPool sync.Pool // *chunk.Chunk
binding generic.SyncMap[*[]*indexRecord, *chunk.Chunk]
}

type copReqSender struct {
Expand All @@ -110,6 +108,7 @@ type copReqSender struct {
func (c *copReqSender) run() {
p := c.senderPool
defer p.wg.Done()
var srcChk *chunk.Chunk
for {
if util.HasCancelled(c.ctx) {
return
Expand All @@ -122,24 +121,36 @@ func (c *copReqSender) run() {
zap.Int("id", task.id), zap.String("task", task.String()))
rs, err := p.copCtx.buildTableScan(p.ctx, p.startTS, task.startKey, task.excludedEndKey())
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
p.sendResult(c.ctx, idxRecResult{id: task.id, err: err})
return
}
var done bool
var total int
for !done {
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
idxRec, srcChk := p.getIndexRecordsAndChunks()
idxRec := p.idxBufPool.Get().([]*indexRecord)
srcChk = renewChunk(srcChk, p.copCtx.fieldTps)
idxRec, done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk, idxRec)
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
p.sendResult(c.ctx, idxRecResult{id: task.id, err: err})
return
}
total += len(idxRec)
p.resultsCh <- idxRecResult{id: task.id, records: idxRec, done: done, total: total}
p.sendResult(c.ctx, idxRecResult{id: task.id, records: idxRec, done: done, total: total})
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

// renewChunk creates a new chunk when the reorg batch size is changed.
func renewChunk(oldChk *chunk.Chunk, fts []*types.FieldType) *chunk.Chunk {
newSize := variable.GetDDLReorgBatchSize()
newCap := int(newSize) * copReadBatchFactor
if oldChk == nil || oldChk.Capacity() != newCap {
return chunk.NewChunkWithCapacity(fts, newCap)
}
oldChk.Reset()
return oldChk
}

func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool {
return &copReqSenderPool{
tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
Expand All @@ -155,19 +166,20 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64
return make([]*indexRecord, 0, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor)
},
},
srcChkPool: sync.Pool{
New: func() any {
return chunk.NewChunkWithCapacity(copCtx.fieldTps, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor)
},
},
binding: generic.NewSyncMap[*[]*indexRecord, *chunk.Chunk](4),
}
}

func (c *copReqSenderPool) sendTask(task *reorgBackfillTask) {
c.tasksCh <- task
}

func (c *copReqSenderPool) sendResult(ctx context.Context, result idxRecResult) {
select {
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
case <-ctx.Done():
case c.resultsCh <- result:
}
}

func (c *copReqSenderPool) adjustSize(n int) {
// Add some senders.
for i := len(c.senders); i < n; i++ {
Expand Down Expand Up @@ -199,26 +211,12 @@ func (c *copReqSenderPool) close() {
close(c.resultsCh)
}

func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Chunk) {
ir, chk := c.idxBufPool.Get().([]*indexRecord), c.srcChkPool.Get().(*chunk.Chunk)
newCap := int(variable.GetDDLReorgBatchSize()) * copReadBatchFactor
if chk.Capacity() != newCap {
chk = chunk.NewChunkWithCapacity(c.copCtx.fieldTps, newCap)
}
chk.Reset()
c.binding.Store(&ir, chk)
return ir[:0], chk
}

// recycleIdxRecords puts the index record slice back to the pool for reuse.
func (c *copReqSenderPool) recycleIdxRecords(idxRecs []*indexRecord) {
if idxRecs == nil {
return
}
c.idxBufPool.Put(idxRecs[:0])
if bindingChunk, ok := c.binding.Load(&idxRecs); ok {
c.srcChkPool.Put(bindingChunk)
}
}

// copContext contains the information that is needed when building a coprocessor request.
Expand Down Expand Up @@ -449,7 +447,8 @@ func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.C
datumBuf := make([]types.Datum, 0, len(offsets))
for _, offset := range offsets {
c := expCols[offset]
datumBuf = append(datumBuf, row.GetDatum(offset, c.GetType()))
rowDt := row.GetDatum(offset, c.GetType())
datumBuf = append(datumBuf, *rowDt.Clone())
}
return datumBuf
}
Expand Down