From 076e3b29bb5f76b45d5be02a978d17477dc7e827 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Wed, 19 Jun 2019 09:53:18 +0800 Subject: [PATCH] executor: fix the issue that `pb` memory cannot be released quickly (#10815) (#10832) --- executor/memory_test.go | 109 ++++++++++++++++++++++++++++++++++++++ store/tikv/coprocessor.go | 29 +++++----- 2 files changed, 126 insertions(+), 12 deletions(-) create mode 100644 executor/memory_test.go diff --git a/executor/memory_test.go b/executor/memory_test.go new file mode 100644 index 0000000000000..42cadd883dbec --- /dev/null +++ b/executor/memory_test.go @@ -0,0 +1,109 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + "context" + "fmt" + "runtime" + + . "github.com/pingcap/check" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/store/mockstore" +) + +var _ = SerialSuites(&testMemoryLeak{}) + +type testMemoryLeak struct { + store kv.Storage + domain *domain.Domain +} + +func (s *testMemoryLeak) SetUpSuite(c *C) { + var err error + s.store, err = mockstore.NewMockTikvStore() + c.Assert(err, IsNil) + s.domain, err = session.BootstrapSession(s.store) + c.Assert(err, IsNil) +} + +func (s *testMemoryLeak) TestPBMemoryLeak(c *C) { + se, err := session.CreateSession4Test(s.store) + c.Assert(err, IsNil) + _, err = se.Execute(context.Background(), "create database test_mem") + c.Assert(err, IsNil) + _, err = se.Execute(context.Background(), "use test_mem") + c.Assert(err, IsNil) + + // prepare data + totalSize := uint64(256 << 20) // 256MB + blockSize := uint64(8 << 10) // 8KB + delta := totalSize / 5 + numRows := totalSize / blockSize + _, err = se.Execute(context.Background(), fmt.Sprintf("create table t (c varchar(%v))", blockSize)) + c.Assert(err, IsNil) + defer func() { + _, err = se.Execute(context.Background(), "drop table t") + c.Assert(err, IsNil) + }() + sql := fmt.Sprintf("insert into t values (space(%v))", blockSize) + for i := uint64(0); i < numRows; i++ { + _, err = se.Execute(context.Background(), sql) + c.Assert(err, IsNil) + } + + // read data + runtime.GC() + allocatedBegin, inUseBegin := s.readMem() + records, err := se.Execute(context.Background(), "select * from t") + c.Assert(err, IsNil) + record := records[0] + rowCnt := 0 + chk := record.NewRecordBatch() + for { + c.Assert(record.Next(context.Background(), chk), IsNil) + rowCnt += chk.NumRows() + if chk.NumRows() == 0 { + break + } + } + c.Assert(rowCnt, Equals, int(numRows)) + + // check memory before close + runtime.GC() + allocatedAfter, inUseAfter := s.readMem() + c.Assert(allocatedAfter-allocatedBegin, GreaterEqual, totalSize) + c.Assert(s.memDiff(inUseAfter, inUseBegin), Less, delta) + + se.Close() + runtime.GC() + allocatedFinal, inUseFinal := s.readMem() + c.Assert(allocatedFinal-allocatedAfter, Less, delta) + c.Assert(s.memDiff(inUseFinal, inUseAfter), Less, delta) +} + +func (s *testMemoryLeak) readMem() (allocated, heapInUse uint64) { + var stat runtime.MemStats + runtime.ReadMemStats(&stat) + return stat.TotalAlloc, stat.HeapInuse +} + +func (s *testMemoryLeak) memDiff(m1, m2 uint64) uint64 { + if m1 > m2 { + return m1 - m2 + } + return m2 - m1 +} diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 6f3976582ceff..a0c7e37eb084e 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -404,8 +404,8 @@ type copIteratorTaskSender struct { } type copResponse struct { - pbResp *coprocessor.Response - execdetails.ExecDetails + pbResp *coprocessor.Response + detail *execdetails.ExecDetails startKey kv.Key err error respSize int64 @@ -427,7 +427,7 @@ func (rs *copResponse) GetStartKey() kv.Key { } func (rs *copResponse) GetExecDetails() *execdetails.ExecDetails { - return &rs.ExecDetails + return rs.detail } // MemSize returns how many bytes of memory this response use @@ -438,9 +438,11 @@ func (rs *copResponse) MemSize() int64 { // ignore rs.err rs.respSize += int64(cap(rs.startKey)) - rs.respSize += int64(sizeofExecDetails) - if rs.CommitDetail != nil { - rs.respSize += int64(sizeofCommitDetails) + if rs.detail != nil { + rs.respSize += int64(sizeofExecDetails) + if rs.detail.CommitDetail != nil { + rs.respSize += int64(sizeofCommitDetails) + } } if rs.pbResp != nil { // Using a approximate size since it's hard to get a accurate value. @@ -805,19 +807,22 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon } else { resp.startKey = task.ranges.at(0).StartKey } - resp.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond + if resp.detail == nil { + resp.detail = new(execdetails.ExecDetails) + } + resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond if rpcCtx != nil { - resp.CalleeAddress = rpcCtx.Addr + resp.detail.CalleeAddress = rpcCtx.Addr } if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil { if handleTime := pbDetails.HandleTime; handleTime != nil { - resp.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond - resp.ProcessTime = time.Duration(handleTime.ProcessMs) * time.Millisecond + resp.detail.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond + resp.detail.ProcessTime = time.Duration(handleTime.ProcessMs) * time.Millisecond } if scanDetail := pbDetails.ScanDetail; scanDetail != nil { if scanDetail.Write != nil { - resp.TotalKeys += scanDetail.Write.Total - resp.ProcessedKeys += scanDetail.Write.Processed + resp.detail.TotalKeys += scanDetail.Write.Total + resp.detail.ProcessedKeys += scanDetail.Write.Processed } } }