Skip to content

Commit

Permalink
executor: fix the issue that pb memory cannot be released quickly (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored and zz-jason committed Jun 19, 2019
1 parent 3c2ee56 commit 076e3b2
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 12 deletions.
109 changes: 109 additions & 0 deletions executor/memory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
"context"
"fmt"
"runtime"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
)

var _ = SerialSuites(&testMemoryLeak{})

type testMemoryLeak struct {
store kv.Storage
domain *domain.Domain
}

func (s *testMemoryLeak) SetUpSuite(c *C) {
var err error
s.store, err = mockstore.NewMockTikvStore()
c.Assert(err, IsNil)
s.domain, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *testMemoryLeak) TestPBMemoryLeak(c *C) {
se, err := session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "create database test_mem")
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "use test_mem")
c.Assert(err, IsNil)

// prepare data
totalSize := uint64(256 << 20) // 256MB
blockSize := uint64(8 << 10) // 8KB
delta := totalSize / 5
numRows := totalSize / blockSize
_, err = se.Execute(context.Background(), fmt.Sprintf("create table t (c varchar(%v))", blockSize))
c.Assert(err, IsNil)
defer func() {
_, err = se.Execute(context.Background(), "drop table t")
c.Assert(err, IsNil)
}()
sql := fmt.Sprintf("insert into t values (space(%v))", blockSize)
for i := uint64(0); i < numRows; i++ {
_, err = se.Execute(context.Background(), sql)
c.Assert(err, IsNil)
}

// read data
runtime.GC()
allocatedBegin, inUseBegin := s.readMem()
records, err := se.Execute(context.Background(), "select * from t")
c.Assert(err, IsNil)
record := records[0]
rowCnt := 0
chk := record.NewRecordBatch()
for {
c.Assert(record.Next(context.Background(), chk), IsNil)
rowCnt += chk.NumRows()
if chk.NumRows() == 0 {
break
}
}
c.Assert(rowCnt, Equals, int(numRows))

// check memory before close
runtime.GC()
allocatedAfter, inUseAfter := s.readMem()
c.Assert(allocatedAfter-allocatedBegin, GreaterEqual, totalSize)
c.Assert(s.memDiff(inUseAfter, inUseBegin), Less, delta)

se.Close()
runtime.GC()
allocatedFinal, inUseFinal := s.readMem()
c.Assert(allocatedFinal-allocatedAfter, Less, delta)
c.Assert(s.memDiff(inUseFinal, inUseAfter), Less, delta)
}

func (s *testMemoryLeak) readMem() (allocated, heapInUse uint64) {
var stat runtime.MemStats
runtime.ReadMemStats(&stat)
return stat.TotalAlloc, stat.HeapInuse
}

func (s *testMemoryLeak) memDiff(m1, m2 uint64) uint64 {
if m1 > m2 {
return m1 - m2
}
return m2 - m1
}
29 changes: 17 additions & 12 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,8 @@ type copIteratorTaskSender struct {
}

type copResponse struct {
pbResp *coprocessor.Response
execdetails.ExecDetails
pbResp *coprocessor.Response
detail *execdetails.ExecDetails
startKey kv.Key
err error
respSize int64
Expand All @@ -427,7 +427,7 @@ func (rs *copResponse) GetStartKey() kv.Key {
}

func (rs *copResponse) GetExecDetails() *execdetails.ExecDetails {
return &rs.ExecDetails
return rs.detail
}

// MemSize returns how many bytes of memory this response use
Expand All @@ -438,9 +438,11 @@ func (rs *copResponse) MemSize() int64 {

// ignore rs.err
rs.respSize += int64(cap(rs.startKey))
rs.respSize += int64(sizeofExecDetails)
if rs.CommitDetail != nil {
rs.respSize += int64(sizeofCommitDetails)
if rs.detail != nil {
rs.respSize += int64(sizeofExecDetails)
if rs.detail.CommitDetail != nil {
rs.respSize += int64(sizeofCommitDetails)
}
}
if rs.pbResp != nil {
// Using a approximate size since it's hard to get a accurate value.
Expand Down Expand Up @@ -805,19 +807,22 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
} else {
resp.startKey = task.ranges.at(0).StartKey
}
resp.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond
if resp.detail == nil {
resp.detail = new(execdetails.ExecDetails)
}
resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond
if rpcCtx != nil {
resp.CalleeAddress = rpcCtx.Addr
resp.detail.CalleeAddress = rpcCtx.Addr
}
if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil {
if handleTime := pbDetails.HandleTime; handleTime != nil {
resp.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond
resp.ProcessTime = time.Duration(handleTime.ProcessMs) * time.Millisecond
resp.detail.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond
resp.detail.ProcessTime = time.Duration(handleTime.ProcessMs) * time.Millisecond
}
if scanDetail := pbDetails.ScanDetail; scanDetail != nil {
if scanDetail.Write != nil {
resp.TotalKeys += scanDetail.Write.Total
resp.ProcessedKeys += scanDetail.Write.Processed
resp.detail.TotalKeys += scanDetail.Write.Total
resp.detail.ProcessedKeys += scanDetail.Write.Processed
}
}
}
Expand Down

0 comments on commit 076e3b2

Please sign in to comment.