Commit

use sized channel instead of sync.pool to control the memory
tangenta committed Nov 30, 2022
1 parent 7e32f5b commit f483811
Showing 3 changed files with 53 additions and 46 deletions.
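
Context for the change: the commit replaces an unbounded sync.Pool with two pre-filled, fixed-capacity channels (idxBufPool and srcChkPool), so the number of live index-record buffers and chunks is capped by the channel size instead of growing with demand. Below is a minimal, hypothetical sketch of that pattern in isolation; names like bufPool are illustrative and not part of the TiDB code.

```go
package main

import "fmt"

// bufPool is a bounded pool built on a sized channel: at most cap(p.ch)
// buffers ever exist, so peak memory stays fixed. sync.Pool, by contrast,
// allocates a fresh object whenever the pool happens to be empty.
type bufPool struct {
	ch chan []byte
}

func newBufPool(size, bufCap int) *bufPool {
	p := &bufPool{ch: make(chan []byte, size)}
	for i := 0; i < size; i++ {
		p.ch <- make([]byte, 0, bufCap) // pre-allocate every buffer up front
	}
	return p
}

// get blocks until a buffer is free, which also throttles producers.
func (p *bufPool) get() []byte { return (<-p.ch)[:0] }

// put hands a buffer back for reuse; the pool never grows.
func (p *bufPool) put(b []byte) { p.ch <- b }

func main() {
	pool := newBufPool(4, 1024)
	buf := pool.get()
	buf = append(buf, "some row data"...)
	fmt.Println(len(buf), cap(buf))
	pool.put(buf)
}
```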
2 changes: 1 addition & 1 deletion ddl/export_test.go
@@ -39,7 +39,7 @@ func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS
pool := newCopReqSenderPool(context.Background(), copCtx, startTS)
pool.adjustSize(1)
pool.tasksCh <- task
idxRec, _, done, err := pool.fetchRowColValsFromCop(*task)
idxRec, _, _, done, err := pool.fetchRowColValsFromCop(*task)
pool.close()
return idxRec, done, err
}
9 changes: 4 additions & 5 deletions ddl/index.go
@@ -44,6 +44,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
@@ -1498,11 +1499,13 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC

var (
idxRecords []*indexRecord
copChunk *chunk.Chunk // only used by the coprocessor request sender.
nextKey kv.Key
taskDone bool
)
if w.copReqSenderPool != nil {
idxRecords, nextKey, taskDone, err = w.copReqSenderPool.fetchRowColValsFromCop(handleRange)
idxRecords, copChunk, nextKey, taskDone, err = w.copReqSenderPool.fetchRowColValsFromCop(handleRange)
defer w.copReqSenderPool.recycleIdxRecordsAndChunk(idxRecords, copChunk)
} else {
idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange)
}
@@ -1567,10 +1570,6 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
taskCtx.addedCount++
}

if w.copReqSenderPool != nil {
w.copReqSenderPool.recycleIdxRecords(idxRecords)
}

return nil
})
logSlowOperations(time.Since(oprStartTime), "AddIndexBackfillDataInTxn", 3000)
88 changes: 48 additions & 40 deletions ddl/index_cop.go
@@ -46,7 +46,10 @@ import (
// It multiplies the tidb_ddl_reorg_batch_size to avoid sending too many cop requests for the same handle range.
const copReadBatchFactor = 10

func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, kv.Key, bool, error) {
// copReadConcurrencyFactor is the factor of concurrency of coprocessor read.
const copReadConcurrencyFactor = 10

func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, *chunk.Chunk, kv.Key, bool, error) {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
@@ -55,10 +58,10 @@ func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask)
if !ok {
logutil.BgLogger().Info("[ddl-ingest] cop-response channel is closed",
zap.Int("id", handleRange.id), zap.String("task", handleRange.String()))
return nil, handleRange.endKey, true, nil
return nil, nil, handleRange.endKey, true, nil
}
if rs.err != nil {
return nil, handleRange.startKey, false, rs.err
return nil, nil, handleRange.startKey, false, rs.err
}
if rs.done {
logutil.BgLogger().Info("[ddl-ingest] finish a cop-request task",
@@ -69,15 +72,15 @@ func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask)
logutil.BgLogger().Info("[ddl-ingest] task is found in results",
zap.Int("id", handleRange.id), zap.String("task", handleRange.String()))
c.results.Delete(handleRange.id)
return rs.records, handleRange.endKey, true, nil
return rs.records, rs.chunk, handleRange.endKey, true, nil
}
return rs.records, handleRange.startKey, false, nil
return rs.records, rs.chunk, handleRange.startKey, false, nil
case <-ticker.C:
logutil.BgLogger().Info("[ddl-ingest] cop-request result channel is empty",
zap.Int("id", handleRange.id))
if _, found := c.results.Load(handleRange.id); found {
c.results.Delete(handleRange.id)
return nil, handleRange.endKey, true, nil
return nil, nil, handleRange.endKey, true, nil
}
}
}
@@ -95,7 +98,8 @@ type copReqSenderPool struct {
senders []*copReqSender
wg sync.WaitGroup

idxBufPool sync.Pool // []*indexRecord
idxBufPool chan []*indexRecord
srcChkPool chan *chunk.Chunk
}

type copReqSender struct {
@@ -108,7 +112,6 @@ type copReqSender struct {
func (c *copReqSender) run() {
p := c.senderPool
defer p.wg.Done()
var srcChk *chunk.Chunk
for {
if util.HasCancelled(c.ctx) {
return
@@ -127,45 +130,37 @@ func (c *copReqSender) run() {
var done bool
var total int
for !done {
idxRec := p.idxBufPool.Get().([]*indexRecord)
srcChk = renewChunk(srcChk, p.copCtx.fieldTps)
idxRec, srcChk := p.getIndexRecordsAndChunks()
idxRec, done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk, idxRec)
if err != nil {
p.sendResult(c.ctx, idxRecResult{id: task.id, err: err})
return
}
total += len(idxRec)
p.sendResult(c.ctx, idxRecResult{id: task.id, records: idxRec, done: done, total: total})
p.sendResult(c.ctx, idxRecResult{id: task.id, records: idxRec, chunk: srcChk, done: done, total: total})
}
}
}

// renewChunk creates a new chunk when the reorg batch size is changed.
func renewChunk(oldChk *chunk.Chunk, fts []*types.FieldType) *chunk.Chunk {
newSize := variable.GetDDLReorgBatchSize()
newCap := int(newSize) * copReadBatchFactor
if oldChk == nil || oldChk.Capacity() != newCap {
return chunk.NewChunkWithCapacity(fts, newCap)
}
oldChk.Reset()
return oldChk
}

func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool {
poolSize := int(variable.GetDDLReorgWorkerCounter() * copReadConcurrencyFactor)
idxBufPool := make(chan []*indexRecord, poolSize)
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
idxBufPool <- make([]*indexRecord, 0, copReadBatchFactor*variable.GetDDLReorgBatchSize())
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.fieldTps, int(copReadBatchFactor*variable.GetDDLReorgBatchSize()))
}
return &copReqSenderPool{
tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultsCh: make(chan idxRecResult, backfillTaskChanSize),
results: generic.NewSyncMap[int, struct{}](10),
ctx: ctx,
copCtx: copCtx,
startTS: startTS,
senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()),
wg: sync.WaitGroup{},
idxBufPool: sync.Pool{
New: func() any {
return make([]*indexRecord, 0, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor)
},
},
tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultsCh: make(chan idxRecResult, backfillTaskChanSize),
results: generic.NewSyncMap[int, struct{}](10),
ctx: ctx,
copCtx: copCtx,
startTS: startTS,
senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()),
wg: sync.WaitGroup{},
idxBufPool: idxBufPool,
srcChkPool: srcChkPool,
}
}

@@ -211,12 +206,24 @@ func (c *copReqSenderPool) close() {
close(c.resultsCh)
}

// recycleIdxRecords puts the index record slice back to the pool for reuse.
func (c *copReqSenderPool) recycleIdxRecords(idxRecs []*indexRecord) {
if idxRecs == nil {
func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Chunk) {
ir := <-c.idxBufPool
chk := <-c.srcChkPool
newCap := int(variable.GetDDLReorgBatchSize()) * copReadBatchFactor
if chk.Capacity() != newCap {
chk = chunk.NewChunkWithCapacity(c.copCtx.fieldTps, newCap)
}
chk.Reset()
return ir[:0], chk
}

// recycleIdxRecordsAndChunk puts the index record slice and the chunk back to the pool for reuse.
func (c *copReqSenderPool) recycleIdxRecordsAndChunk(idxRecs []*indexRecord, chk *chunk.Chunk) {
if idxRecs == nil || chk == nil {
return
}
c.idxBufPool.Put(idxRecs[:0])
c.idxBufPool <- idxRecs
c.srcChkPool <- chk
}

// copContext contains the information that is needed when building a coprocessor request.
@@ -448,7 +455,7 @@ func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.C
for _, offset := range offsets {
c := expCols[offset]
rowDt := row.GetDatum(offset, c.GetType())
datumBuf = append(datumBuf, *rowDt.Clone())
datumBuf = append(datumBuf, rowDt)
}
return datumBuf
}
Expand All @@ -469,6 +476,7 @@ func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
type idxRecResult struct {
id int
records []*indexRecord
chunk *chunk.Chunk
err error
done bool
total int
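
The caller-side contract visible in ddl/index.go is that every fetched batch is returned together with its chunk via a deferred recycleIdxRecordsAndChunk, and getIndexRecordsAndChunks re-allocates the chunk when tidb_ddl_reorg_batch_size has changed since the chunk was pooled. The following is a small, self-contained sketch of that lifecycle; every name here is a hypothetical stand-in, not the TiDB API.

```go
package main

import "fmt"

type record struct{ key, val string }

// chunkBuf stands in for chunk.Chunk; only its capacity matters here.
type chunkBuf struct{ cells []int }

type pool struct {
	recs chan []record
	chks chan *chunkBuf
}

func newPool(size, batch int) *pool {
	p := &pool{
		recs: make(chan []record, size),
		chks: make(chan *chunkBuf, size),
	}
	for i := 0; i < size; i++ {
		p.recs <- make([]record, 0, batch)
		p.chks <- &chunkBuf{cells: make([]int, 0, batch)}
	}
	return p
}

// get renews the chunk if the wanted capacity changed while it sat in the
// pool, mirroring the capacity check against the reorg batch size.
func (p *pool) get(wantCap int) ([]record, *chunkBuf) {
	rs, ck := <-p.recs, <-p.chks
	if cap(ck.cells) != wantCap {
		ck = &chunkBuf{cells: make([]int, 0, wantCap)}
	}
	return rs[:0], ck
}

// recycle returns the pair; a nil pair (e.g. after an error) is a no-op.
func (p *pool) recycle(rs []record, ck *chunkBuf) {
	if rs == nil || ck == nil {
		return
	}
	p.recs <- rs
	p.chks <- ck
}

func backfillOnce(p *pool) error {
	rs, ck := p.get(8)
	defer p.recycle(rs, ck) // always hand the pair back, even on early return
	// ... fill rs and ck from a table scan, then write the index entries ...
	fmt.Println(cap(rs), cap(ck.cells))
	return nil
}

func main() {
	_ = backfillOnce(newPool(2, 8))
}
```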