From 4eb6c8e39b9c24d7ea5520ddab0387b3ff61d31c Mon Sep 17 00:00:00 2001 From: Zhuhe Fang Date: Thu, 26 Dec 2019 11:57:39 +0800 Subject: [PATCH] executor: avoid `ProjectoinExec`'s goroutine leak (#14127) (#14226) --- executor/explain.go | 33 ++++++++++--- executor/explain_unit_test.go | 92 +++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 8 deletions(-) create mode 100644 executor/explain_unit_test.go diff --git a/executor/explain.go b/executor/explain.go index 8dca1e894ba41..2e12fd9d417b2 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -17,6 +17,7 @@ import ( "context" "github.com/cznic/mathutil" + "github.com/pingcap/errors" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/util/chunk" ) @@ -70,23 +71,39 @@ func (e *ExplainExec) Next(ctx context.Context, req *chunk.Chunk) error { return nil } -func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, error) { +func (e *ExplainExec) generateExplainInfo(ctx context.Context) (rows [][]string, err error) { + closed := false + defer func() { + if !closed && e.analyzeExec != nil { + err = e.analyzeExec.Close() + closed = true + } + }() if e.analyzeExec != nil { chk := newFirstChunk(e.analyzeExec) + var nextErr, closeErr error for { - err := e.analyzeExec.Next(ctx, chk) - if err != nil { - return nil, err - } - if chk.NumRows() == 0 { + nextErr = Next(ctx, e.analyzeExec, chk) + if nextErr != nil || chk.NumRows() == 0 { break } } - if err := e.analyzeExec.Close(); err != nil { + closeErr = e.analyzeExec.Close() + closed = true + if nextErr != nil { + if closeErr != nil { + err = errors.New(nextErr.Error() + ", " + closeErr.Error()) + } else { + err = nextErr + } + } else if closeErr != nil { + err = closeErr + } + if err != nil { return nil, err } } - if err := e.explain.RenderResult(); err != nil { + if err = e.explain.RenderResult(); err != nil { return nil, err } if e.analyzeExec != nil { diff --git a/executor/explain_unit_test.go b/executor/explain_unit_test.go new file mode 100644 index 0000000000000..7c57a8d6908e3 --- /dev/null +++ b/executor/explain_unit_test.go @@ -0,0 +1,92 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "errors" + "testing" + + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/mock" +) + +var ( + _ Executor = &mockErrorOperator{} +) + +type mockErrorOperator struct { + baseExecutor + toPanic bool + closed bool +} + +func (e *mockErrorOperator) Open(ctx context.Context) error { + return nil +} + +func (e *mockErrorOperator) Next(ctx context.Context, req *chunk.Chunk) error { + if e.toPanic { + panic("next panic") + } else { + return errors.New("next error") + } +} + +func (e *mockErrorOperator) Close() error { + e.closed = true + return errors.New("close error") +} + +func getColumns() []*expression.Column { + return []*expression.Column{ + {Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)}, + } +} + +// close() must be called after next() to avoid goroutines leak +func TestExplainAnalyzeInvokeNextAndClose(t *testing.T) { + ctx := mock.NewContext() + ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize + ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize + schema := expression.NewSchema(getColumns()...) + baseExec := newBaseExecutor(ctx, schema, nil) + explainExec := &ExplainExec{ + baseExecutor: baseExec, + explain: nil, + } + // mockErrorOperator returns errors + mockOper := mockErrorOperator{baseExec, false, false} + explainExec.analyzeExec = &mockOper + tmpCtx := context.Background() + _, err := explainExec.generateExplainInfo(tmpCtx) + + expectedStr := "next error, close error" + if err.Error() != expectedStr || !mockOper.closed { + t.Errorf(err.Error()) + } + // mockErrorOperator panic + mockOper = mockErrorOperator{baseExec, true, false} + explainExec.analyzeExec = &mockOper + defer func() { + if panicErr := recover(); panicErr == nil || !mockOper.closed { + t.Errorf("panic test failed: without panic or close() is not called") + } + }() + _, err = explainExec.generateExplainInfo(tmpCtx) +}