From ea26284ea3dd02f026430e4c6e8ac145d1360ad8 Mon Sep 17 00:00:00 2001 From: Aidan <97376271+keeplearning20221@users.noreply.github.com> Date: Tue, 8 Nov 2022 20:13:50 +0800 Subject: [PATCH] *: avoid special cases DATA RACE (#38918) close pingcap/tidb#38914 --- distsql/select_result.go | 2 +- executor/builder.go | 4 +- executor/distsql.go | 2 +- executor/executor.go | 4 +- executor/index_lookup_hash_join.go | 2 +- executor/index_lookup_join.go | 6 +-- executor/index_lookup_merge_join.go | 2 +- executor/index_merge_reader.go | 4 +- executor/join.go | 2 +- executor/joiner.go | 2 +- executor/pipelined_window.go | 2 +- executor/window.go | 2 +- server/conn.go | 2 +- server/conn_stmt.go | 2 +- sessionctx/variable/session.go | 60 +++++++++++++---------------- sessionctx/variable/session_test.go | 31 ++++++--------- testkit/testkit.go | 7 ++-- util/chunk/alloc.go | 25 +++++++++++- util/chunk/alloc_test.go | 19 +++++++++ 19 files changed, 103 insertions(+), 77 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index a2d6215987a32..0e807b360d0ad 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -311,7 +311,7 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) error { if r.respChunkDecoder == nil { r.respChunkDecoder = chunk.NewDecoder( - r.ctx.GetSessionVars().GetNewChunk(r.fieldTypes, 0), + chunk.NewChunkWithCapacity(r.fieldTypes, 0), r.fieldTypes, ) } diff --git a/executor/builder.go b/executor/builder.go index 1b88b6e4047c6..70d91e64137a4 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1540,7 +1540,7 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor e.defaultVal = nil } else { if v.IsFinalAgg() { - e.defaultVal = e.ctx.GetSessionVars().GetNewChunk(retTypes(e), 1) + e.defaultVal = e.ctx.GetSessionVars().GetNewChunkWithCapacity(retTypes(e), 1, 1, e.AllocPool) } } for _, aggDesc := range v.AggFuncs { @@ -1603,7 +1603,7 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) Execu } else { // Only do this for final agg, see issue #35295, #30923 if v.IsFinalAgg() { - e.defaultVal = e.ctx.GetSessionVars().GetNewChunk(retTypes(e), 1) + e.defaultVal = e.ctx.GetSessionVars().GetNewChunkWithCapacity(retTypes(e), 1, 1, e.AllocPool) } } for i, aggDesc := range v.AggFuncs { diff --git a/executor/distsql.go b/executor/distsql.go index 182831bc90021..0cef7e66d441e 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -866,7 +866,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes } }() retTps := w.idxLookup.getRetTpsByHandle() - chk := w.idxLookup.ctx.GetSessionVars().GetNewChunk(retTps, w.idxLookup.maxChunkSize) + chk := w.idxLookup.ctx.GetSessionVars().GetNewChunkWithCapacity(retTps, w.idxLookup.maxChunkSize, w.idxLookup.maxChunkSize, w.idxLookup.AllocPool) idxID := w.idxLookup.getIndexPlanRootID() if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { if idxID != w.idxLookup.id && w.idxLookup.stats != nil { diff --git a/executor/executor.go b/executor/executor.go index 1fc3a88f3a4fd..3d10ff32a5168 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -132,6 +132,7 @@ type baseExecutor struct { children []Executor retFieldTypes []*types.FieldType runtimeStats *execdetails.BasicRuntimeStats + AllocPool chunk.Allocator } const ( @@ -234,7 +235,7 @@ func newFirstChunk(e Executor) *chunk.Chunk { func tryNewCacheChunk(e Executor) *chunk.Chunk { base := e.base() s := base.ctx.GetSessionVars() - return s.GetNewChunkWithCapacity(base.retFieldTypes, base.initCap, base.maxChunkSize) + return s.GetNewChunkWithCapacity(base.retFieldTypes, base.initCap, base.maxChunkSize, base.AllocPool) } // newList creates a new List to buffer current executor's result. @@ -267,6 +268,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int, schema: schema, initCap: ctx.GetSessionVars().InitChunkSize, maxChunkSize: ctx.GetSessionVars().MaxChunkSize, + AllocPool: ctx.GetSessionVars().ChunkPool.Alloc, } if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { if e.id > 0 { diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 9601dffc77900..c54b60749601d 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -418,7 +418,7 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, innerCtx: e.innerCtx, outerCtx: e.outerCtx, ctx: e.ctx, - executorChk: e.ctx.GetSessionVars().GetNewChunk(e.innerCtx.rowTypes, e.maxChunkSize), + executorChk: e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize, e.maxChunkSize, e.AllocPool), indexRanges: copiedRanges, keyOff2IdxOff: e.keyOff2IdxOff, stats: innerStats, diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 92f195985a191..05cc337d3d7ee 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -226,7 +226,7 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork outerCtx: e.outerCtx, taskCh: taskCh, ctx: e.ctx, - executorChk: e.ctx.GetSessionVars().GetNewChunk(e.innerCtx.rowTypes, e.maxChunkSize), + executorChk: e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize, e.maxChunkSize, e.AllocPool), indexRanges: copiedRanges, keyOff2IdxOff: e.keyOff2IdxOff, stats: innerStats, @@ -431,7 +431,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { } maxChunkSize := ow.ctx.GetSessionVars().MaxChunkSize for requiredRows > task.outerResult.Len() { - chk := ow.ctx.GetSessionVars().GetNewChunk(ow.outerCtx.rowTypes, maxChunkSize) + chk := ow.ctx.GetSessionVars().GetNewChunkWithCapacity(ow.outerCtx.rowTypes, maxChunkSize, maxChunkSize, ow.executor.base().AllocPool) chk = chk.SetRequiredRows(requiredRows, maxChunkSize) err := Next(ctx, ow.executor, chk) if err != nil { @@ -462,7 +462,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) { } task.encodedLookUpKeys = make([]*chunk.Chunk, task.outerResult.NumChunks()) for i := range task.encodedLookUpKeys { - task.encodedLookUpKeys[i] = ow.ctx.GetSessionVars().GetNewChunk([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, task.outerResult.GetChunk(i).NumRows()) + task.encodedLookUpKeys[i] = ow.ctx.GetSessionVars().GetNewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, task.outerResult.GetChunk(i).NumRows(), task.outerResult.GetChunk(i).NumRows(), ow.executor.base().AllocPool) } return task, nil } diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index 1ba2c2940c3fd..8bd379944c825 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -706,7 +706,7 @@ func (imw *innerMergeWorker) dedupDatumLookUpKeys(lookUpContents []*indexJoinLoo // fetchNextInnerResult collects a chunk of inner results from inner child executor. func (imw *innerMergeWorker) fetchNextInnerResult(ctx context.Context, task *lookUpMergeJoinTask) (beginRow chunk.Row, err error) { - task.innerResult = imw.ctx.GetSessionVars().GetNewChunk(retTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize) + task.innerResult = imw.ctx.GetSessionVars().GetNewChunkWithCapacity(retTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize, imw.ctx.GetSessionVars().MaxChunkSize, imw.innerExec.base().AllocPool) err = Next(ctx, imw.innerExec, task.innerResult) task.innerIter = chunk.NewIterator4Chunk(task.innerResult) beginRow = task.innerIter.Begin() diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 82c6ab2f50817..0e7eb394710fd 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -508,7 +508,7 @@ func (w *partialTableWorker) syncErr(resultCh chan<- *lookupTableTask, err error func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) { - chk := w.sc.GetSessionVars().GetNewChunk(retTypes(w.tableReader), w.maxChunkSize) + chk := w.sc.GetSessionVars().GetNewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize, w.maxChunkSize, w.tableReader.base().AllocPool) var basic *execdetails.BasicRuntimeStats if be := w.tableReader.base(); be != nil && be.runtimeStats != nil { basic = be.runtimeStats @@ -817,7 +817,7 @@ func (w *partialIndexWorker) fetchHandles( resultCh chan<- *lookupTableTask, finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) { - chk := w.sc.GetSessionVars().GetNewChunk(handleCols.GetFieldsTypes(), w.maxChunkSize) + chk := chunk.NewChunkWithCapacity(handleCols.GetFieldsTypes(), w.maxChunkSize) var basicStats *execdetails.BasicRuntimeStats if w.stats != nil { if w.idxID != 0 { diff --git a/executor/join.go b/executor/join.go index 48d3e5d5a56f8..87781a49d7c50 100644 --- a/executor/join.go +++ b/executor/join.go @@ -297,7 +297,7 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu if e.finished.Load().(bool) { return } - chk := e.ctx.GetSessionVars().GetNewChunk(e.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize) + chk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize, e.ctx.GetSessionVars().MaxChunkSize, e.AllocPool) err = Next(ctx, e.buildSideExec, chk) if err != nil { e.buildFinished <- errors.Trace(err) diff --git a/executor/joiner.go b/executor/joiner.go index 5fe4d92eba2a2..842135802444f 100644 --- a/executor/joiner.go +++ b/executor/joiner.go @@ -192,7 +192,7 @@ func newJoiner(ctx sessionctx.Context, joinType plannercore.JoinType, return &antiLeftOuterSemiJoiner{base} case plannercore.LeftOuterJoin, plannercore.RightOuterJoin, plannercore.InnerJoin: if len(base.conditions) > 0 { - base.chk = ctx.GetSessionVars().GetNewChunk(shallowRowType, ctx.GetSessionVars().MaxChunkSize) + base.chk = chunk.NewChunkWithCapacity(shallowRowType, ctx.GetSessionVars().MaxChunkSize) } switch joinType { case plannercore.LeftOuterJoin: diff --git a/executor/pipelined_window.go b/executor/pipelined_window.go index 505cf09f415d7..cda1d9c389fd0 100644 --- a/executor/pipelined_window.go +++ b/executor/pipelined_window.go @@ -217,7 +217,7 @@ func (e *PipelinedWindowExec) fetchChild(ctx context.Context) (EOF bool, err err } // TODO: reuse chunks - resultChk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.retFieldTypes, 0, numRows) + resultChk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.retFieldTypes, 0, numRows, e.AllocPool) err = e.copyChk(childResult, resultChk) if err != nil { return false, err diff --git a/executor/window.go b/executor/window.go index ef284344d0c8c..aaa1e51cacc85 100644 --- a/executor/window.go +++ b/executor/window.go @@ -162,7 +162,7 @@ func (e *WindowExec) fetchChild(ctx context.Context) (EOF bool, err error) { return true, nil } - resultChk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.retFieldTypes, 0, numRows) + resultChk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.retFieldTypes, 0, numRows, e.AllocPool) err = e.copyChk(childResult, resultChk) if err != nil { return false, err diff --git a/server/conn.go b/server/conn.go index b87eb5579219f..b319679fdbf14 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1121,8 +1121,8 @@ func (cc *clientConn) Run(ctx context.Context) { startTime := time.Now() err = cc.dispatch(ctx, data) + cc.ctx.GetSessionVars().ClearAlloc(&cc.chunkAlloc, err != nil) cc.chunkAlloc.Reset() - cc.ctx.GetSessionVars().ClearAlloc() if err != nil { cc.audit(plugin.Error) // tell the plugin API there was a dispatch error if terror.ErrorEqual(err, io.EOF) { diff --git a/server/conn_stmt.go b/server/conn_stmt.go index 47d0dc241a3ed..436b2d1721ca0 100644 --- a/server/conn_stmt.go +++ b/server/conn_stmt.go @@ -307,7 +307,7 @@ const ( func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err error) { cc.ctx.GetSessionVars().StartTime = time.Now() - cc.ctx.GetSessionVars().ClearAlloc() + cc.ctx.GetSessionVars().ClearAlloc(nil, false) stmtID, fetchSize, err := parseStmtFetchCmd(data) if err != nil { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index e046d28fdfc6f..b11b727079630 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -96,6 +96,12 @@ type RetryInfo struct { LastRcReadTS uint64 } +// ReuseChunkPool save Alloc object +type ReuseChunkPool struct { + mu sync.Mutex + Alloc chunk.Allocator +} + // Clean does some clean work. func (r *RetryInfo) Clean() { r.autoIncrementIDs.clean() @@ -1292,10 +1298,7 @@ type SessionVars struct { OptPrefixIndexSingleScan bool // ChunkPool Several chunks and columns are cached - ChunkPool struct { - Lock sync.Mutex - Alloc chunk.Allocator - } + ChunkPool ReuseChunkPool // EnableReuseCheck indicates request chunk whether use chunk alloc EnableReuseCheck bool @@ -1304,34 +1307,18 @@ type SessionVars struct { preUseChunkAlloc bool } -// GetNewChunk Attempt to request memory from the chunk pool -// thread safety -func (s *SessionVars) GetNewChunk(fields []*types.FieldType, capacity int) *chunk.Chunk { - //Chunk memory pool is not set - if s.ChunkPool.Alloc == nil { - return chunk.NewChunkWithCapacity(fields, capacity) - } - s.ChunkPool.Lock.Lock() - defer s.ChunkPool.Lock.Unlock() - if s.ChunkPool.Alloc.CheckReuseAllocSize() && (!s.GetUseChunkAlloc()) { - s.StmtCtx.SetUseChunkAlloc() - } - chk := s.ChunkPool.Alloc.Alloc(fields, capacity, capacity) - return chk -} - // GetNewChunkWithCapacity Attempt to request memory from the chunk pool // thread safety -func (s *SessionVars) GetNewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk { - if s.ChunkPool.Alloc == nil { +func (s *SessionVars) GetNewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int, pool chunk.Allocator) *chunk.Chunk { + if pool == nil { return chunk.New(fields, capacity, maxCachesize) } - s.ChunkPool.Lock.Lock() - defer s.ChunkPool.Lock.Unlock() - if s.ChunkPool.Alloc.CheckReuseAllocSize() && (!s.GetUseChunkAlloc()) { + s.ChunkPool.mu.Lock() + defer s.ChunkPool.mu.Unlock() + if pool.CheckReuseAllocSize() && (!s.GetUseChunkAlloc()) { s.StmtCtx.SetUseChunkAlloc() } - chk := s.ChunkPool.Alloc.Alloc(fields, capacity, maxCachesize) + chk := pool.Alloc(fields, capacity, maxCachesize) return chk } @@ -1354,8 +1341,19 @@ func (s *SessionVars) SetAlloc(alloc chunk.Allocator) { } // ClearAlloc indicates stop reuse chunk -func (s *SessionVars) ClearAlloc() { +func (s *SessionVars) ClearAlloc(alloc *chunk.Allocator, b bool) { + if !b { + s.ChunkPool.Alloc = nil + return + } + + // If an error is reported, re-apply for alloc + // Prevent the goroutine left before, affecting the execution of the next sql + // issuse 38918 + s.ChunkPool.mu.Lock() s.ChunkPool.Alloc = nil + s.ChunkPool.mu.Unlock() + *alloc = chunk.NewAllocator() } // GetPreparedStmtByName returns the prepared statement specified by stmtName. @@ -1654,12 +1652,8 @@ func NewSessionVars(hctx HookContext) *SessionVars { ForeignKeyChecks: DefTiDBForeignKeyChecks, HookContext: hctx, EnableReuseCheck: DefTiDBEnableReusechunk, - //useChunkAlloc: DefTiDBUseAlloc, - preUseChunkAlloc: DefTiDBUseAlloc, - ChunkPool: struct { - Lock sync.Mutex - Alloc chunk.Allocator - }{Alloc: nil}, + preUseChunkAlloc: DefTiDBUseAlloc, + ChunkPool: ReuseChunkPool{Alloc: nil}, } vars.KVVars = tikvstore.NewVariables(&vars.Killed) vars.Concurrency = Concurrency{ diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index 903ecf1bf4fa8..92049902618c2 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -444,10 +444,8 @@ func TestGetReuseChunk(t *testing.T) { require.Nil(t, sessVars.ChunkPool.Alloc) require.False(t, sessVars.GetUseChunkAlloc()) // alloc is nil ,Allocate memory from the system - chk1 := sessVars.GetNewChunk(fieldTypes, 10) + chk1 := sessVars.GetNewChunkWithCapacity(fieldTypes, 10, 10, sessVars.ChunkPool.Alloc) require.NotNil(t, chk1) - chk2 := sessVars.GetNewChunkWithCapacity(fieldTypes, 10, 10) - require.NotNil(t, chk2) chunkReuseMap := make(map[*chunk.Chunk]struct{}, 14) columnReuseMap := make(map[*chunk.Column]struct{}, 14) @@ -461,35 +459,28 @@ func TestGetReuseChunk(t *testing.T) { //tries to apply from the cache initCap := 10 - chk1 = sessVars.GetNewChunk(fieldTypes, initCap) + chk1 = sessVars.GetNewChunkWithCapacity(fieldTypes, initCap, initCap, sessVars.ChunkPool.Alloc) require.NotNil(t, chk1) chunkReuseMap[chk1] = struct{}{} for i := 0; i < chk1.NumCols(); i++ { columnReuseMap[chk1.Column(i)] = struct{}{} } - chk2 = sessVars.GetNewChunkWithCapacity(fieldTypes, initCap, initCap) - require.NotNil(t, chk2) - chunkReuseMap[chk2] = struct{}{} - for i := 0; i < chk2.NumCols(); i++ { - columnReuseMap[chk2.Column(i)] = struct{}{} - } alloc.Reset() - chkres1 := sessVars.GetNewChunk(fieldTypes, 10) + chkres1 := sessVars.GetNewChunkWithCapacity(fieldTypes, 10, 10, sessVars.ChunkPool.Alloc) + require.NotNil(t, chkres1) _, exist := chunkReuseMap[chkres1] require.True(t, exist) for i := 0; i < chkres1.NumCols(); i++ { _, exist := columnReuseMap[chkres1.Column(i)] require.True(t, exist) } - chkres2 := sessVars.GetNewChunkWithCapacity(fieldTypes, 10, 10) - require.NotNil(t, chkres2) - _, exist = chunkReuseMap[chkres2] - require.True(t, exist) - for i := 0; i < chkres2.NumCols(); i++ { - _, exist := columnReuseMap[chkres2.Column(i)] - require.True(t, exist) - } - sessVars.ClearAlloc() + allocpool := variable.ReuseChunkPool{Alloc: alloc} + + sessVars.ClearAlloc(&allocpool.Alloc, false) + require.Equal(t, alloc, allocpool.Alloc) + + sessVars.ClearAlloc(&allocpool.Alloc, true) + require.NotEqual(t, allocpool.Alloc, alloc) require.Nil(t, sessVars.ChunkPool.Alloc) } diff --git a/testkit/testkit.go b/testkit/testkit.go index 6952f4c36d484..25479bc862d96 100644 --- a/testkit/testkit.go +++ b/testkit/testkit.go @@ -115,7 +115,6 @@ func (tk *TestKit) Session() session.Session { // MustExec executes a sql statement and asserts nil error. func (tk *TestKit) MustExec(sql string, args ...interface{}) { defer func() { - tk.Session().GetSessionVars().ClearAlloc() if tk.alloc != nil { tk.alloc.Reset() } @@ -138,7 +137,6 @@ func (tk *TestKit) MustExecWithContext(ctx context.Context, sql string, args ... // If expected result is set it asserts the query result equals expected result. func (tk *TestKit) MustQuery(sql string, args ...interface{}) *Result { defer func() { - tk.Session().GetSessionVars().ClearAlloc() if tk.alloc != nil { tk.alloc.Reset() } @@ -271,7 +269,8 @@ func (tk *TestKit) Exec(sql string, args ...interface{}) (sqlexec.RecordSet, err } // ExecWithContext executes a sql statement using the prepared stmt API -func (tk *TestKit) ExecWithContext(ctx context.Context, sql string, args ...interface{}) (sqlexec.RecordSet, error) { +func (tk *TestKit) ExecWithContext(ctx context.Context, sql string, args ...interface{}) (rs sqlexec.RecordSet, err error) { + defer tk.Session().GetSessionVars().ClearAlloc(&tk.alloc, err != nil) if len(args) == 0 { sc := tk.session.GetSessionVars().StmtCtx prevWarns := sc.GetWarnings() @@ -315,7 +314,7 @@ func (tk *TestKit) ExecWithContext(ctx context.Context, sql string, args ...inte } params := expression.Args2Expressions4Test(args...) tk.Session().GetSessionVars().SetAlloc(tk.alloc) - rs, err := tk.session.ExecutePreparedStmt(ctx, stmtID, params) + rs, err = tk.session.ExecutePreparedStmt(ctx, stmtID, params) if err != nil { return rs, errors.Trace(err) } diff --git a/util/chunk/alloc.go b/util/chunk/alloc.go index 44fbb126a4989..af3385a644389 100644 --- a/util/chunk/alloc.go +++ b/util/chunk/alloc.go @@ -128,9 +128,9 @@ func (a *allocator) Reset() { a.allocated = a.allocated[:0] //column objects and put them to the column allocator for reuse. - for _, pool := range a.columnAlloc.pool { + for id, pool := range a.columnAlloc.pool { for _, col := range pool.allocColumns { - if (len(pool.freeColumns) < a.columnAlloc.freeColumnsPerType) && (!col.avoidReusing) && (cap(col.data) < MaxCachedLen) { + if (len(pool.freeColumns) < a.columnAlloc.freeColumnsPerType) && checkColumnType(id, col) { col.reset() pool.freeColumns = append(pool.freeColumns, col) } @@ -139,6 +139,27 @@ func (a *allocator) Reset() { } } +// checkColumnType check whether the conditions for entering the corresponding queue are met +// column Reset may change type +func checkColumnType(id int, col *Column) bool { + if col.avoidReusing { + return false + } + + if id == varElemLen { + //Take up too much memory, + if cap(col.data) > MaxCachedLen { + return false + } + return col.elemBuf == nil + } + + if col.elemBuf == nil { + return false + } + return id == cap(col.elemBuf) +} + var _ ColumnAllocator = &poolColumnAllocator{} type poolColumnAllocator struct { diff --git a/util/chunk/alloc_test.go b/util/chunk/alloc_test.go index edad5e3008e77..7c09d818d1222 100644 --- a/util/chunk/alloc_test.go +++ b/util/chunk/alloc_test.go @@ -270,3 +270,22 @@ func TestColumnAllocatorLimit(t *testing.T) { alloc = NewAllocator() require.False(t, alloc.CheckReuseAllocSize()) } + +func TestColumnAllocatorCheck(t *testing.T) { + fieldTypes := []*types.FieldType{ + types.NewFieldTypeBuilder().SetType(mysql.TypeFloat).BuildP(), + types.NewFieldTypeBuilder().SetType(mysql.TypeDatetime).BuildP(), + } + InitChunkAllocSize(10, 20) + alloc := NewAllocator() + for i := 0; i < 4; i++ { + alloc.Alloc(fieldTypes, 5, 10) + } + col := alloc.columnAlloc.NewColumn(types.NewFieldTypeBuilder().SetType(mysql.TypeFloat).BuildP(), 10) + col.Reset(types.ETDatetime) + alloc.Reset() + num := alloc.columnAlloc.pool[getFixedLen(types.NewFieldTypeBuilder().SetType(mysql.TypeFloat).BuildP())].Len() + require.Equal(t, num, 4) + num = alloc.columnAlloc.pool[getFixedLen(types.NewFieldTypeBuilder().SetType(mysql.TypeDatetime).BuildP())].Len() + require.Equal(t, num, 4) +}