From ccf27d00246be4bee0882ecba51467e0e07dfeea Mon Sep 17 00:00:00 2001 From: fzhedu Date: Wed, 18 Dec 2019 23:15:25 +0800 Subject: [PATCH 1/9] avoid goroutine leak --- executor/explain.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/executor/explain.go b/executor/explain.go index 42898da066dbe..685d9c7047faa 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -73,17 +73,22 @@ func (e *ExplainExec) Next(ctx context.Context, req *chunk.Chunk) error { func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, error) { if e.analyzeExec != nil { chk := newFirstChunk(e.analyzeExec) + var next_err, close_err error for { - err := Next(ctx, e.analyzeExec, chk) - if err != nil { - return nil, err + next_err = Next(ctx, e.analyzeExec, chk) + if next_err != nil { + break } if chk.NumRows() == 0 { break } } - if err := e.analyzeExec.Close(); err != nil { - return nil, err + close_err = e.analyzeExec.Close() + if next_err != nil { + return nil, next_err + } + if close_err != nil { + return nil, close_err } } if err := e.explain.RenderResult(); err != nil { From 82d34b273ba1518a9c61ee55e4bf2c0d6b4d68d3 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Fri, 20 Dec 2019 17:59:26 +0800 Subject: [PATCH 2/9] add failpoint test for project --- executor/explain.go | 25 +++++++++++++------------ executor/explain_test.go | 18 ++++++++++++++++++ executor/projection.go | 15 +++++++++++++-- 3 files changed, 44 insertions(+), 14 deletions(-) diff --git a/executor/explain.go b/executor/explain.go index 685d9c7047faa..a7c7af0b520d1 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -15,6 +15,7 @@ package executor import ( "context" + "github.com/pingcap/errors" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/util/chunk" @@ -73,22 +74,22 @@ func (e *ExplainExec) Next(ctx context.Context, req *chunk.Chunk) error { func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, error) { if e.analyzeExec != nil { chk := newFirstChunk(e.analyzeExec) - var next_err, close_err error + var nextErr, closeErr error for { - next_err = Next(ctx, e.analyzeExec, chk) - if next_err != nil { - break - } - if chk.NumRows() == 0 { + nextErr = Next(ctx, e.analyzeExec, chk) + if nextErr != nil || chk.NumRows() == 0 { break } } - close_err = e.analyzeExec.Close() - if next_err != nil { - return nil, next_err - } - if close_err != nil { - return nil, close_err + closeErr = e.analyzeExec.Close() + if nextErr != nil { + if closeErr != nil { + return nil, errors.New(nextErr.Error() + ", " + closeErr.Error()) + } else { + return nil, nextErr + } + } else if closeErr != nil { + return nil, closeErr } } if err := e.explain.RenderResult(); err != nil { diff --git a/executor/explain_test.go b/executor/explain_test.go index 96c2880beff78..898ef163b979b 100644 --- a/executor/explain_test.go +++ b/executor/explain_test.go @@ -15,6 +15,7 @@ package executor_test import ( "fmt" + "github.com/pingcap/failpoint" "strings" . "github.com/pingcap/check" @@ -106,6 +107,23 @@ func (s *testSuite1) TestExplainWrite(c *C) { tk.MustQuery("select * from t order by a").Check(testkit.Rows("1", "2")) } +func (s *testSuite1) TestGoroutineLeakInExplainAnalyzeForProjection(c *C) { + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockErrorsInNextOfProjection", `return(true)`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockErrorsInCloseOfProjection", `return(true)`), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockErrorsInNextOfProjection"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockErrorsInCloseOfProjection"), IsNil) + + }() + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int)") + tk.MustExec("insert into t values(1),(2),(3),(4)") + tk.MustExec("set @@tidb_projection_concurrency = 4") + err := tk.QueryToErr("explain analyze select a*200 from t") + c.Assert(err.Error(), Equals, "goroutines leak in next() of projection, An error in close() of projection") +} + func (s *testSuite1) TestExplainAnalyzeMemory(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("drop table if exists t") diff --git a/executor/projection.go b/executor/projection.go index 3e142fe704793..480eb79c6dd77 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -16,6 +16,7 @@ package executor import ( "context" "fmt" + "github.com/pingcap/failpoint" "sync" "sync/atomic" @@ -209,7 +210,11 @@ func (e *ProjectionExec) parallelExecute(ctx context.Context, chk *chunk.Chunk) if err != nil { return err } - + failpoint.Inject("mockErrorsInNextOfProjection", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(errors.New("goroutines leak in next() of projection")) + } + }) mSize := output.chk.MemoryUsage() chk.SwapColumns(output.chk) e.memTracker.Consume(output.chk.MemoryUsage() - mSize) @@ -313,7 +318,13 @@ func (e *ProjectionExec) Close() error { e.runtimeStats.SetConcurrencyInfo("Concurrency", int(e.numWorkers)) } } - return e.baseExecutor.Close() + err := e.baseExecutor.Close() + failpoint.Inject("mockErrorsInCloseOfProjection", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(errors.New("An error in close() of projection")) + } + }) + return err } type projectionInputFetcher struct { From 3c234ea78b3d2f878e990b1f7d0471942ef19462 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Fri, 20 Dec 2019 21:42:44 +0800 Subject: [PATCH 3/9] add unit test for explain --- executor/explain_test.go | 18 --------- executor/explain_unit_test.go | 74 +++++++++++++++++++++++++++++++++++ executor/projection.go | 14 +------ 3 files changed, 75 insertions(+), 31 deletions(-) create mode 100644 executor/explain_unit_test.go diff --git a/executor/explain_test.go b/executor/explain_test.go index 898ef163b979b..96c2880beff78 100644 --- a/executor/explain_test.go +++ b/executor/explain_test.go @@ -15,7 +15,6 @@ package executor_test import ( "fmt" - "github.com/pingcap/failpoint" "strings" . "github.com/pingcap/check" @@ -107,23 +106,6 @@ func (s *testSuite1) TestExplainWrite(c *C) { tk.MustQuery("select * from t order by a").Check(testkit.Rows("1", "2")) } -func (s *testSuite1) TestGoroutineLeakInExplainAnalyzeForProjection(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockErrorsInNextOfProjection", `return(true)`), IsNil) - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockErrorsInCloseOfProjection", `return(true)`), IsNil) - defer func() { - c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockErrorsInNextOfProjection"), IsNil) - c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockErrorsInCloseOfProjection"), IsNil) - - }() - tk := testkit.NewTestKitWithInit(c, s.store) - tk.MustExec("drop table if exists t") - tk.MustExec("create table t (a int)") - tk.MustExec("insert into t values(1),(2),(3),(4)") - tk.MustExec("set @@tidb_projection_concurrency = 4") - err := tk.QueryToErr("explain analyze select a*200 from t") - c.Assert(err.Error(), Equals, "goroutines leak in next() of projection, An error in close() of projection") -} - func (s *testSuite1) TestExplainAnalyzeMemory(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("drop table if exists t") diff --git a/executor/explain_unit_test.go b/executor/explain_unit_test.go new file mode 100644 index 0000000000000..0dfc008d20826 --- /dev/null +++ b/executor/explain_unit_test.go @@ -0,0 +1,74 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "errors" + + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/mock" + "testing" +) + +var ( + _ Executor = &mockErrorOperator{} +) + +type mockErrorOperator struct { + baseExecutor +} + +func (e *mockErrorOperator) Open(ctx context.Context) error { + return nil +} + +func (e *mockErrorOperator) Next(ctx context.Context, req *chunk.Chunk) error { + return errors.New("next error") +} + +func (e *mockErrorOperator) Close() error { + return errors.New("close error") +} + +func getColumns() []*expression.Column { + return []*expression.Column{ + {Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)}, + } +} + +// close() must be called after next() to avoid goroutines leak +func TestExplainAnalyzeInvokeNextAndClose(t *testing.T) { + ctx := mock.NewContext() + ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize + ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize + schema := expression.NewSchema(getColumns()...) + baseExec := newBaseExecutor(ctx, schema, nil) + explainExec := &ExplainExec{ + baseExecutor: baseExec, + explain: nil, + } + explainExec.analyzeExec = &mockErrorOperator{baseExec} + tmpCtx := context.Background() + _, err := explainExec.generateExplainInfo(tmpCtx) + + expectedStr := "next error, close error" + if err.Error() != expectedStr { + t.Errorf(err.Error()) + } +} diff --git a/executor/projection.go b/executor/projection.go index 480eb79c6dd77..bf27a651ecba1 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -16,7 +16,6 @@ package executor import ( "context" "fmt" - "github.com/pingcap/failpoint" "sync" "sync/atomic" @@ -210,11 +209,6 @@ func (e *ProjectionExec) parallelExecute(ctx context.Context, chk *chunk.Chunk) if err != nil { return err } - failpoint.Inject("mockErrorsInNextOfProjection", func(val failpoint.Value) { - if val.(bool) { - failpoint.Return(errors.New("goroutines leak in next() of projection")) - } - }) mSize := output.chk.MemoryUsage() chk.SwapColumns(output.chk) e.memTracker.Consume(output.chk.MemoryUsage() - mSize) @@ -318,13 +312,7 @@ func (e *ProjectionExec) Close() error { e.runtimeStats.SetConcurrencyInfo("Concurrency", int(e.numWorkers)) } } - err := e.baseExecutor.Close() - failpoint.Inject("mockErrorsInCloseOfProjection", func(val failpoint.Value) { - if val.(bool) { - failpoint.Return(errors.New("An error in close() of projection")) - } - }) - return err + return e.baseExecutor.Close() } type projectionInputFetcher struct { From 01286328e897d23536da007ccca68ea00bf4f042 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Wed, 25 Dec 2019 10:51:00 +0800 Subject: [PATCH 4/9] format including fils --- executor/explain.go | 4 ++-- executor/explain_unit_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/executor/explain.go b/executor/explain.go index a7c7af0b520d1..3660e64d0daa5 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -15,11 +15,11 @@ package executor import ( "context" - "github.com/pingcap/errors" + "github.com/cznic/mathutil" + "github.com/pingcap/errors" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/mathutil" ) // ExplainExec represents an explain executor. diff --git a/executor/explain_unit_test.go b/executor/explain_unit_test.go index 0dfc008d20826..50ada19efca15 100644 --- a/executor/explain_unit_test.go +++ b/executor/explain_unit_test.go @@ -16,6 +16,7 @@ package executor import ( "context" "errors" + "testing" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" @@ -23,7 +24,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/mock" - "testing" ) var ( From 2ff11d38618520a4d2a3828adc32a8fbfbafc5fa Mon Sep 17 00:00:00 2001 From: fzhedu Date: Wed, 25 Dec 2019 13:11:16 +0800 Subject: [PATCH 5/9] handle panic error --- executor/explain.go | 23 ++++++++++++++++++----- executor/explain_unit_test.go | 16 +++++++++++++++- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/executor/explain.go b/executor/explain.go index 3660e64d0daa5..196497e653ec5 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -71,7 +71,16 @@ func (e *ExplainExec) Next(ctx context.Context, req *chunk.Chunk) error { return nil } -func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, error) { +func (e *ExplainExec) generateExplainInfo(ctx context.Context) (rows [][]string, err error) { + closed := false + defer func() { + // handle panic + recover() + if !closed { + err = e.analyzeExec.Close() + closed = true + } + }() if e.analyzeExec != nil { chk := newFirstChunk(e.analyzeExec) var nextErr, closeErr error @@ -82,17 +91,21 @@ func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, erro } } closeErr = e.analyzeExec.Close() + closed = true if nextErr != nil { if closeErr != nil { - return nil, errors.New(nextErr.Error() + ", " + closeErr.Error()) + err = errors.New(nextErr.Error() + ", " + closeErr.Error()) } else { - return nil, nextErr + err = nextErr } } else if closeErr != nil { - return nil, closeErr + err = closeErr + } + if err != nil { + return nil, err } } - if err := e.explain.RenderResult(); err != nil { + if err = e.explain.RenderResult(); err != nil { return nil, err } if e.analyzeExec != nil { diff --git a/executor/explain_unit_test.go b/executor/explain_unit_test.go index 50ada19efca15..87e77cb889e65 100644 --- a/executor/explain_unit_test.go +++ b/executor/explain_unit_test.go @@ -32,6 +32,7 @@ var ( type mockErrorOperator struct { baseExecutor + toPanic bool } func (e *mockErrorOperator) Open(ctx context.Context) error { @@ -39,6 +40,9 @@ func (e *mockErrorOperator) Open(ctx context.Context) error { } func (e *mockErrorOperator) Next(ctx context.Context, req *chunk.Chunk) error { + if e.toPanic { + panic("next panic") + } return errors.New("next error") } @@ -63,7 +67,8 @@ func TestExplainAnalyzeInvokeNextAndClose(t *testing.T) { baseExecutor: baseExec, explain: nil, } - explainExec.analyzeExec = &mockErrorOperator{baseExec} + // mockErrorOperator returns errors + explainExec.analyzeExec = &mockErrorOperator{baseExec, false} tmpCtx := context.Background() _, err := explainExec.generateExplainInfo(tmpCtx) @@ -71,4 +76,13 @@ func TestExplainAnalyzeInvokeNextAndClose(t *testing.T) { if err.Error() != expectedStr { t.Errorf(err.Error()) } + // mockErrorOperator panic + explainExec.analyzeExec = &mockErrorOperator{baseExec, true} + _, err = explainExec.generateExplainInfo(tmpCtx) + + expectedStr = "close error" + if err.Error() != expectedStr { + t.Errorf(err.Error()) + } + } From ad31a60f07c8cf02f892e2a84fb384494538abb9 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Wed, 25 Dec 2019 13:23:36 +0800 Subject: [PATCH 6/9] format code --- executor/explain.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/executor/explain.go b/executor/explain.go index 196497e653ec5..eaac1ffd15f71 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -75,8 +75,7 @@ func (e *ExplainExec) generateExplainInfo(ctx context.Context) (rows [][]string, closed := false defer func() { // handle panic - recover() - if !closed { + if panicErr := recover(); panicErr != nil && !closed { err = e.analyzeExec.Close() closed = true } From c2d17ac4f125908828622d07f02ebee080f900c5 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Wed, 25 Dec 2019 16:09:26 +0800 Subject: [PATCH 7/9] add panic test --- executor/explain.go | 3 +-- executor/explain_unit_test.go | 20 ++++++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/executor/explain.go b/executor/explain.go index eaac1ffd15f71..ff57f75503006 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -74,8 +74,7 @@ func (e *ExplainExec) Next(ctx context.Context, req *chunk.Chunk) error { func (e *ExplainExec) generateExplainInfo(ctx context.Context) (rows [][]string, err error) { closed := false defer func() { - // handle panic - if panicErr := recover(); panicErr != nil && !closed { + if !closed { err = e.analyzeExec.Close() closed = true } diff --git a/executor/explain_unit_test.go b/executor/explain_unit_test.go index 87e77cb889e65..7c45b26c07d5d 100644 --- a/executor/explain_unit_test.go +++ b/executor/explain_unit_test.go @@ -33,6 +33,7 @@ var ( type mockErrorOperator struct { baseExecutor toPanic bool + closed bool } func (e *mockErrorOperator) Open(ctx context.Context) error { @@ -42,11 +43,13 @@ func (e *mockErrorOperator) Open(ctx context.Context) error { func (e *mockErrorOperator) Next(ctx context.Context, req *chunk.Chunk) error { if e.toPanic { panic("next panic") + } else { + return errors.New("next error") } - return errors.New("next error") } func (e *mockErrorOperator) Close() error { + e.closed = true return errors.New("close error") } @@ -68,7 +71,7 @@ func TestExplainAnalyzeInvokeNextAndClose(t *testing.T) { explain: nil, } // mockErrorOperator returns errors - explainExec.analyzeExec = &mockErrorOperator{baseExec, false} + explainExec.analyzeExec = &mockErrorOperator{baseExec, false, false} tmpCtx := context.Background() _, err := explainExec.generateExplainInfo(tmpCtx) @@ -77,12 +80,13 @@ func TestExplainAnalyzeInvokeNextAndClose(t *testing.T) { t.Errorf(err.Error()) } // mockErrorOperator panic - explainExec.analyzeExec = &mockErrorOperator{baseExec, true} + mockOper := mockErrorOperator{baseExec, true, false} + explainExec.analyzeExec = &mockOper + defer func() { + if panicErr := recover(); panicErr != nil && !mockOper.closed { + t.Errorf(err.Error()) + } + }() _, err = explainExec.generateExplainInfo(tmpCtx) - expectedStr = "close error" - if err.Error() != expectedStr { - t.Errorf(err.Error()) - } - } From d090ccc9b6bbe5cdd55f503c2e965cd0b3aab8ff Mon Sep 17 00:00:00 2001 From: fzhedu Date: Wed, 25 Dec 2019 16:24:45 +0800 Subject: [PATCH 8/9] update panic test --- executor/explain.go | 2 +- executor/explain_unit_test.go | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/executor/explain.go b/executor/explain.go index ff57f75503006..2e12fd9d417b2 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -74,7 +74,7 @@ func (e *ExplainExec) Next(ctx context.Context, req *chunk.Chunk) error { func (e *ExplainExec) generateExplainInfo(ctx context.Context) (rows [][]string, err error) { closed := false defer func() { - if !closed { + if !closed && e.analyzeExec != nil { err = e.analyzeExec.Close() closed = true } diff --git a/executor/explain_unit_test.go b/executor/explain_unit_test.go index 7c45b26c07d5d..6a52e412f19b1 100644 --- a/executor/explain_unit_test.go +++ b/executor/explain_unit_test.go @@ -71,16 +71,17 @@ func TestExplainAnalyzeInvokeNextAndClose(t *testing.T) { explain: nil, } // mockErrorOperator returns errors - explainExec.analyzeExec = &mockErrorOperator{baseExec, false, false} + mockOper := mockErrorOperator{baseExec, false, false} + explainExec.analyzeExec = &mockOper tmpCtx := context.Background() _, err := explainExec.generateExplainInfo(tmpCtx) expectedStr := "next error, close error" - if err.Error() != expectedStr { + if err.Error() != expectedStr || !mockOper.closed { t.Errorf(err.Error()) } // mockErrorOperator panic - mockOper := mockErrorOperator{baseExec, true, false} + mockOper = mockErrorOperator{baseExec, true, false} explainExec.analyzeExec = &mockOper defer func() { if panicErr := recover(); panicErr != nil && !mockOper.closed { From 488d7e1b73ebd3eea578baaf42a1d477741760b3 Mon Sep 17 00:00:00 2001 From: fzhedu Date: Wed, 25 Dec 2019 16:43:10 +0800 Subject: [PATCH 9/9] update error --- executor/explain_unit_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/executor/explain_unit_test.go b/executor/explain_unit_test.go index 6a52e412f19b1..7c57a8d6908e3 100644 --- a/executor/explain_unit_test.go +++ b/executor/explain_unit_test.go @@ -84,10 +84,9 @@ func TestExplainAnalyzeInvokeNextAndClose(t *testing.T) { mockOper = mockErrorOperator{baseExec, true, false} explainExec.analyzeExec = &mockOper defer func() { - if panicErr := recover(); panicErr != nil && !mockOper.closed { - t.Errorf(err.Error()) + if panicErr := recover(); panicErr == nil || !mockOper.closed { + t.Errorf("panic test failed: without panic or close() is not called") } }() _, err = explainExec.generateExplainInfo(tmpCtx) - }