Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distsql: clean the memory usage of MemTracker when a query ends (#10898) #10970

Merged
merged 3 commits into from
Jul 1, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetMemTracker(s.sctx, stringutil.StringerStr("testSuite.createSelectNormal")).
Build()
c.Assert(err, IsNil)

Expand Down Expand Up @@ -106,13 +107,29 @@ func (s *testSuite) TestSelectNormal(c *C) {
c.Assert(numAllRows, Equals, 2)
err := response.Close()
c.Assert(err, IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) TestSelectMemTracker(c *C) {
response, colTypes := s.createSelectNormal(2, 6, c, nil)
response.Fetch(context.TODO())

// Test Next.
chk := chunk.New(colTypes, 3, 3)
err := response.Next(context.TODO(), chk)
c.Assert(err, IsNil)
c.Assert(chk.IsFull(), Equals, true)
err = response.Close()
c.Assert(err, IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) TestSelectNormalChunkSize(c *C) {
response, colTypes := s.createSelectNormal(100, 1000000, c, nil)
response.Fetch(context.TODO())
s.testChunkSize(response, colTypes, c)
c.Assert(response.Close(), IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) TestSelectWithRuntimeStats(c *C) {
Expand Down
5 changes: 5 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tipb/go-tipb"
)
Expand All @@ -49,6 +51,9 @@ type testSuite struct {

func (s *testSuite) SetUpSuite(c *C) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx = &stmtctx.StatementContext{
MemTracker: memory.NewTracker(stringutil.StringerStr("testSuite"), variable.DefTiDBMemQuotaDistSQL),
}
ctx.Store = &mock.Store{
Client: &mock.Client{
MockResponse: &mockResponse{
Expand Down
45 changes: 31 additions & 14 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ type selectResult struct {
fieldTypes []*types.FieldType
ctx sessionctx.Context

selectResp *tipb.SelectResponse
respChkIdx int
selectResp *tipb.SelectResponse
selectRespSize int // record the selectResp.Size() when it is initialized.
respChkIdx int

feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
Expand Down Expand Up @@ -103,20 +104,25 @@ func (r *selectResult) fetch(ctx context.Context) {
if err != nil {
result.err = err
} else if resultSubset == nil {
// If the result is drained, the resultSubset would be nil
return
} else {
result.result = resultSubset
if r.memTracker != nil {
r.memTracker.Consume(int64(resultSubset.MemSize()))
}
r.memConsume(int64(resultSubset.MemSize()))
}

select {
case r.results <- result:
case <-r.closed:
// If selectResult called Close() already, make fetch goroutine exit.
if resultSubset != nil {
r.memConsume(-int64(resultSubset.MemSize()))
}
return
case <-ctx.Done():
if resultSubset != nil {
r.memConsume(-int64(resultSubset.MemSize()))
}
return
}
}
Expand Down Expand Up @@ -161,24 +167,21 @@ func (r *selectResult) getSelectResp() error {
if re.err != nil {
return errors.Trace(re.err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(-int64(r.selectResp.Size()))
if r.selectResp != nil {
r.memConsume(-int64(r.selectRespSize))
}
if re.result == nil {
r.selectResp = nil
return nil
}
if r.memTracker != nil {
r.memTracker.Consume(-int64(re.result.MemSize()))
}
r.memConsume(-int64(re.result.MemSize()))
r.selectResp = new(tipb.SelectResponse)
err := r.selectResp.Unmarshal(re.result.GetData())
if err != nil {
return errors.Trace(err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(int64(r.selectResp.Size()))
}
r.selectRespSize = r.selectResp.Size()
r.memConsume(int64(r.selectRespSize))
if err := r.selectResp.Error; err != nil {
return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg)
}
Expand Down Expand Up @@ -234,13 +237,27 @@ func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) {
return nil
}

func (r *selectResult) memConsume(bytes int64) {
if r.memTracker != nil {
r.memTracker.Consume(bytes)
}
}

// Close closes selectResult.
func (r *selectResult) Close() error {
// Close this channel tell fetch goroutine to exit.
if r.feedback.Actual() >= 0 {
metrics.DistSQLScanKeysHistogram.Observe(float64(r.feedback.Actual()))
}
metrics.DistSQLPartialCountHistogram.Observe(float64(r.partialCount))
// Close this channel to tell the fetch goroutine to exit.
close(r.closed)
for re := range r.results {
if re.result != nil {
r.memConsume(-int64(re.result.MemSize()))
}
}
if r.selectResp != nil {
r.memConsume(-int64(r.selectRespSize))
}
return r.resp.Close()
}