From c12976a436a7954e68c2a26fc0169d2bac77db96 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Mon, 29 Nov 2021 11:49:52 +0800 Subject: [PATCH 1/2] *: track the memory usage of IndexJoin more accurate (#29068) (#30091) --- distsql/request_builder.go | 40 +++++++++++++++-- executor/builder.go | 58 ++++++++++++++----------- executor/executor_pkg_test.go | 2 +- executor/executor_test.go | 59 ++++++++++++++++++++++++++ executor/index_lookup_hash_join.go | 30 +++++++++++++ executor/index_lookup_join.go | 48 ++++++++++++++++++--- executor/index_lookup_merge_join.go | 2 +- planner/core/exhaust_physical_plans.go | 6 +++ util/memory/tracker.go | 4 ++ 9 files changed, 213 insertions(+), 36 deletions(-) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index ed97014d6154b..c0ad23066055e 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -17,6 +17,7 @@ import ( "fmt" "math" "sort" + "sync/atomic" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -553,13 +554,25 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange { // IndexRangesToKVRanges converts index ranges to "KeyRange". func IndexRangesToKVRanges(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) { - return IndexRangesToKVRangesForTables(sc, []int64{tid}, idxID, ranges, fb) + return IndexRangesToKVRangesWithInterruptSignal(sc, tid, idxID, ranges, fb, nil, nil) +} + +// IndexRangesToKVRangesWithInterruptSignal converts index ranges to "KeyRange". +// The process can be interrupted by set `interruptSignal` to true. +func IndexRangesToKVRangesWithInterruptSignal(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) { + return indexRangesToKVRangesForTablesWithInterruptSignal(sc, []int64{tid}, idxID, ranges, fb, memTracker, interruptSignal) } // IndexRangesToKVRangesForTables converts indexes ranges to "KeyRange". func IndexRangesToKVRangesForTables(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) { + return indexRangesToKVRangesForTablesWithInterruptSignal(sc, tids, idxID, ranges, fb, nil, nil) +} + +// IndexRangesToKVRangesForTablesWithInterruptSignal converts indexes ranges to "KeyRange". +// The process can be interrupted by set `interruptSignal` to true. +func indexRangesToKVRangesForTablesWithInterruptSignal(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, fb *statistics.QueryFeedback, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) { if fb == nil || fb.Hist == nil { - return indexRangesToKVWithoutSplit(sc, tids, idxID, ranges) + return indexRangesToKVWithoutSplit(sc, tids, idxID, ranges, memTracker, interruptSignal) } feedbackRanges := make([]*ranger.Range, 0, len(ranges)) for _, ran := range ranges { @@ -644,18 +657,37 @@ func VerifyTxnScope(txnScope string, physicalTableID int64, is infoschema.InfoSc return true } -func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range) ([]kv.KeyRange, error) { +func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) { krs := make([]kv.KeyRange, 0, len(ranges)) - for _, ran := range ranges { + const CheckSignalStep = 8 + var estimatedMemUsage int64 + // encodeIndexKey and EncodeIndexSeekKey is time-consuming, thus we need to + // check the interrupt signal periodically. + for i, ran := range ranges { low, high, err := encodeIndexKey(sc, ran) if err != nil { return nil, err } + if i == 0 { + estimatedMemUsage += int64(cap(low) + cap(high)) + } for _, tid := range tids { startKey := tablecodec.EncodeIndexSeekKey(tid, idxID, low) endKey := tablecodec.EncodeIndexSeekKey(tid, idxID, high) + if i == 0 { + estimatedMemUsage += int64(cap(startKey)) + int64(cap(endKey)) + } krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey}) } + if i%CheckSignalStep == 0 { + if i == 0 && memTracker != nil { + estimatedMemUsage *= int64(len(ranges)) + memTracker.Consume(estimatedMemUsage) + } + if interruptSignal != nil && interruptSignal.Load().(bool) { + return nil, nil + } + } } return krs, nil } diff --git a/executor/builder.go b/executor/builder.go index 2c04de8638624..3a315862d74f3 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -20,6 +20,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "unsafe" @@ -54,6 +55,7 @@ import ( "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/timeutil" @@ -2524,6 +2526,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin) indexRanges: v.Ranges, keyOff2IdxOff: v.KeyOff2IdxOff, lastColHelper: v.CompareFilters, + finished: &atomic.Value{}, } childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()) e.joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, leftTypes, rightTypes, childrenUsedSchema) @@ -3353,21 +3356,21 @@ type mockPhysicalIndexReader struct { } func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, lookUpContents []*indexJoinLookUpContent, - IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) { - return builder.buildExecutorForIndexJoinInternal(ctx, builder.Plan, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles) + IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { + return builder.buildExecutorForIndexJoinInternal(ctx, builder.Plan, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal) } func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.Context, plan plannercore.Plan, lookUpContents []*indexJoinLookUpContent, - IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) { + IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { switch v := plan.(type) { case *plannercore.PhysicalTableReader: - return builder.buildTableReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles) + return builder.buildTableReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal) case *plannercore.PhysicalIndexReader: - return builder.buildIndexReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc) + return builder.buildIndexReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) case *plannercore.PhysicalIndexLookUpReader: - return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc) + return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) case *plannercore.PhysicalUnionScan: - return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles) + return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal) // The inner child of IndexJoin might be Projection when a combination of the following conditions is true: // 1. The inner child fetch data using indexLookupReader // 2. PK is not handle @@ -3375,11 +3378,11 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context. // In this case, an extra column tidb_rowid will be appended in the output result of IndexLookupReader(see copTask.doubleReadNeedProj). // Then we need a Projection upon IndexLookupReader to prune the redundant column. case *plannercore.PhysicalProjection: - return builder.buildProjectionForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc) + return builder.buildProjectionForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) // Need to support physical selection because after PR 16389, TiDB will push down all the expr supported by TiKV or TiFlash // in predicate push down stage, so if there is an expr which only supported by TiFlash, a physical selection will be added after index read case *plannercore.PhysicalSelection: - childExec, err := builder.buildExecutorForIndexJoinInternal(ctx, v.Children()[0], lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles) + childExec, err := builder.buildExecutorForIndexJoinInternal(ctx, v.Children()[0], lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal) if err != nil { return nil, err } @@ -3397,9 +3400,9 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context. func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan, values []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, - cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) { + cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { childBuilder := &dataReaderBuilder{Plan: v.Children()[0], executorBuilder: builder.executorBuilder} - reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff, cwc, canReorderHandles) + reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal) if err != nil { return nil, err } @@ -3413,7 +3416,7 @@ func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, - cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) { + cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { e, err := buildNoRangeTableReader(builder.executorBuilder, v) if err != nil { return nil, err @@ -3421,7 +3424,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte tbInfo := e.table.Meta() if v.IsCommonHandle { if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() { - kvRanges, err := buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + kvRanges, err := buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) if err != nil { return nil, err } @@ -3450,7 +3453,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte return nil, err } pid := p.GetPhysicalID() - tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, []*indexJoinLookUpContent{content}, indexRanges, keyOff2IdxOff, cwc) + tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, []*indexJoinLookUpContent{content}, indexRanges, keyOff2IdxOff, cwc, nil, interruptSignal) if err != nil { return nil, err } @@ -3465,7 +3468,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte kvRanges = make([]kv.KeyRange, 0, len(partitions)*len(lookUpContents)) for _, p := range partitions { pid := p.GetPhysicalID() - tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) if err != nil { return nil, err } @@ -3631,14 +3634,14 @@ func (builder *dataReaderBuilder) buildTableReaderFromKvRanges(ctx context.Conte } func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexReader, - lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) { + lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memoryTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { e, err := buildNoRangeIndexReader(builder.executorBuilder, v) if err != nil { return nil, err } tbInfo := e.table.Meta() if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() { - kvRanges, err := buildKvRangesForIndexJoin(e.ctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + kvRanges, err := buildKvRangesForIndexJoin(e.ctx, e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal) if err != nil { return nil, err } @@ -3677,7 +3680,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte } func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexLookUpReader, - lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) { + lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { e, err := buildNoRangeIndexLookUpReader(builder.executorBuilder, v) if err != nil { return nil, err @@ -3685,7 +3688,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context tbInfo := e.table.Meta() if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() { - e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + e.kvRanges, err = buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) if err != nil { return nil, err } @@ -3725,12 +3728,12 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context } func (builder *dataReaderBuilder) buildProjectionForIndexJoin(ctx context.Context, v *plannercore.PhysicalProjection, - lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) { + lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) { physicalIndexLookUp, isDoubleRead := v.Children()[0].(*plannercore.PhysicalIndexLookUpReader) if !isDoubleRead { return nil, errors.Errorf("inner child of Projection should be IndexLookupReader, but got %T", v) } - childExec, err := builder.buildIndexLookUpReaderForIndexJoin(ctx, physicalIndexLookUp, lookUpContents, indexRanges, keyOff2IdxOff, cwc) + childExec, err := builder.buildIndexLookUpReaderForIndexJoin(ctx, physicalIndexLookUp, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal) if err != nil { return nil, err } @@ -3797,7 +3800,7 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*indexJoin // buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan. func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, lookUpContents []*indexJoinLookUpContent, - ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (_ []kv.KeyRange, err error) { + ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (_ []kv.KeyRange, err error) { kvRanges := make([]kv.KeyRange, 0, len(ranges)*len(lookUpContents)) lastPos := len(ranges[0].LowVal) - 1 sc := ctx.GetSessionVars().StmtCtx @@ -3816,7 +3819,7 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l if indexID == -1 { tmpKvRanges, err = distsql.CommonHandleRangesToKVRanges(sc, []int64{tableID}, ranges) } else { - tmpKvRanges, err = distsql.IndexRangesToKVRanges(sc, tableID, indexID, ranges, nil) + tmpKvRanges, err = distsql.IndexRangesToKVRangesWithInterruptSignal(sc, tableID, indexID, ranges, nil, memTracker, interruptSignal) } if err != nil { return nil, err @@ -3838,7 +3841,12 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l } } } - + if len(kvRanges) != 0 && memTracker != nil { + memTracker.Consume(int64(2 * cap(kvRanges[0].StartKey) * len(kvRanges))) + } + if len(tmpDatumRanges) != 0 && memTracker != nil { + memTracker.Consume(2 * int64(len(tmpDatumRanges)) * types.EstimatedMemUsage(tmpDatumRanges[0].LowVal, len(tmpDatumRanges))) + } if cwc == nil { sort.Slice(kvRanges, func(i, j int) bool { return bytes.Compare(kvRanges[i].StartKey, kvRanges[j].StartKey) < 0 @@ -3854,7 +3862,7 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l if indexID == -1 { return distsql.CommonHandleRangesToKVRanges(ctx.GetSessionVars().StmtCtx, []int64{tableID}, tmpDatumRanges) } - return distsql.IndexRangesToKVRanges(ctx.GetSessionVars().StmtCtx, tableID, indexID, tmpDatumRanges, nil) + return distsql.IndexRangesToKVRangesWithInterruptSignal(ctx.GetSessionVars().StmtCtx, tableID, indexID, tmpDatumRanges, nil, memTracker, interruptSignal) } func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) Executor { diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go index 5b81b1afce095..0c8505703fb2a 100644 --- a/executor/executor_pkg_test.go +++ b/executor/executor_pkg_test.go @@ -195,7 +195,7 @@ func (s *testExecSuite) TestBuildKvRangesForIndexJoinWithoutCwc(c *C) { keyOff2IdxOff := []int{1, 3} ctx := mock.NewContext() - kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil) + kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, nil, nil) c.Assert(err, IsNil) // Check the kvRanges is in order. for i, kvRange := range kvRanges { diff --git a/executor/executor_test.go b/executor/executor_test.go index d78ae6bdf55b3..f33554fec287f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -18,6 +18,7 @@ import ( "flag" "fmt" "math" + "math/rand" "net" "os" "strconv" @@ -8754,3 +8755,61 @@ func (s *testSuite) TestIssue26532(c *C) { tk.MustQuery("select greatest(\"2020-01-01 01:01:01\" ,\"2019-01-01 01:01:01\" )union select null;").Sort().Check(testkit.Rows("2020-01-01 01:01:01", "")) tk.MustQuery("select least(\"2020-01-01 01:01:01\" , \"2019-01-01 01:01:01\" )union select null;").Sort().Check(testkit.Rows("2019-01-01 01:01:01", "")) } + +func (s *testSerialSuite) TestIssue28650(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(a int, index(a));") + tk.MustExec("create table t2(a int, c int, b char(50), index(a,c,b));") + tk.MustExec("set tidb_enable_rate_limit_action=off;") + + wg := &sync.WaitGroup{} + sql := `explain analyze + select /*+ stream_agg(@sel_1) stream_agg(@sel_3) %s(@sel_2 t2)*/ count(1) from + ( + SELECT t2.a AS t2_external_user_ext_id, t2.b AS t2_t1_ext_id FROM t2 INNER JOIN (SELECT t1.a AS d_t1_ext_id FROM t1 GROUP BY t1.a) AS anon_1 ON anon_1.d_t1_ext_id = t2.a WHERE t2.c = 123 AND t2.b + IN ("%s") ) tmp` + + wg.Add(1) + sqls := make([]string, 2) + go func() { + defer wg.Done() + inElems := make([]string, 1000) + for i := 0; i < len(inElems); i++ { + inElems[i] = fmt.Sprintf("wm_%dbDgAAwCD-v1QB%dxky-g_dxxQCw", rand.Intn(100), rand.Intn(100)) + } + sqls[0] = fmt.Sprintf(sql, "inl_join", strings.Join(inElems, "\",\"")) + sqls[1] = fmt.Sprintf(sql, "inl_hash_join", strings.Join(inElems, "\",\"")) + }() + + tk.MustExec("insert into t1 select rand()*400;") + for i := 0; i < 10; i++ { + tk.MustExec("insert into t1 select rand()*400 from t1;") + } + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionCancel + }) + defer func() { + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionLog + }) + }() + wg.Wait() + for _, sql := range sqls { + tk.MustExec("set @@tidb_mem_quota_query = 1073741824") // 1GB + c.Assert(tk.QueryToErr(sql), IsNil) + tk.MustExec("set @@tidb_mem_quota_query = 33554432") // 32MB, out of memory during executing + c.Assert(strings.Contains(tk.QueryToErr(sql).Error(), "Out Of Memory Quota!"), IsTrue) + tk.MustExec("set @@tidb_mem_quota_query = 65536") // 64KB, out of memory during building the plan + func() { + defer func() { + r := recover() + c.Assert(r, NotNil) + err := errors.Errorf("%v", r) + c.Assert(strings.Contains(err.Error(), "Out Of Memory Quota!"), IsTrue) + }() + tk.MustExec(sql) + }() + } +} diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 62fa460c39111..4665839c1ffea 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/expression" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" @@ -151,6 +152,7 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error { e.stats = &indexLookUpJoinRuntimeStats{} e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } + e.finished.Store(false) e.startWorkers(ctx) return nil } @@ -200,6 +202,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { if r != nil { + e.IndexLookUpJoin.finished.Store(true) err := errors.New(fmt.Sprintf("%v", r)) if !e.keepOuterOrder { e.resultCh <- &indexHashJoinResult{err: err} @@ -208,6 +211,7 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { e.taskCh <- task } if e.cancelFunc != nil { + e.IndexLookUpJoin.ctxCancelReason.Store(err) e.cancelFunc() } } @@ -244,6 +248,9 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er return result.err } case <-ctx.Done(): + if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil { + return err.(error) + } return ctx.Err() } req.SwapColumns(result.chk) @@ -273,6 +280,9 @@ func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chu return result.err } case <-ctx.Done(): + if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil { + return err.(error) + } return ctx.Err() } req.SwapColumns(result.chk) @@ -318,6 +328,7 @@ func (e *IndexNestedLoopHashJoin) Close() error { close(e.joinChkResourceCh[i]) } e.joinChkResourceCh = nil + e.finished.Store(false) return e.baseExecutor.Close() } @@ -431,6 +442,8 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, indexRanges: copiedRanges, keyOff2IdxOff: e.keyOff2IdxOff, stats: innerStats, + lookup: &e.IndexLookUpJoin, + memTracker: memory.NewTracker(memory.LabelForIndexJoinInnerWorker, -1), }, taskCh: taskCh, joiner: e.joiners[workerID], @@ -440,6 +453,14 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask, joinKeyBuf: make([]byte, 1), outerRowStatus: make([]outerRowStatusFlag, 0, e.maxChunkSize), } + iw.memTracker.AttachTo(e.memTracker) + if len(copiedRanges) != 0 { + // We should not consume this memory usage in `iw.memTracker`. The + // memory usage of inner worker will be reset the end of iw.handleTask. + // While the life cycle of this memory consumption exists throughout the + // whole active period of inner worker. + e.ctx.GetSessionVars().StmtCtx.MemTracker.Consume(2 * types.EstimatedMemUsage(copiedRanges[0].LowVal, len(copiedRanges))) + } if e.lastColHelper != nil { // nextCwf.TmpConstant needs to be reset for every individual // inner worker to avoid data race when the inner workers is running @@ -583,6 +604,9 @@ func (iw *indexHashJoinInnerWorker) handleHashJoinInnerWorkerPanic(r interface{} } func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexHashJoinTask, joinResult *indexHashJoinResult, h hash.Hash64, resultCh chan *indexHashJoinResult) error { + defer func() { + iw.memTracker.Consume(-iw.memTracker.BytesConsumed()) + }() var joinStartTime time.Time if iw.stats != nil { start := time.Now() @@ -630,6 +654,9 @@ func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *i select { case resultCh <- joinResult: case <-ctx.Done(): + if err := iw.lookup.ctxCancelReason.Load(); err != nil { + return err.(error) + } return ctx.Err() } joinResult, ok = iw.getNewJoinResult(ctx) @@ -778,6 +805,9 @@ func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *ind select { case resultCh <- joinResult: case <-ctx.Done(): + if err := iw.lookup.ctxCancelReason.Load(); err != nil { + return err.(error) + } return ctx.Err() } joinResult, ok = iw.getNewJoinResult(ctx) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 6f62fcff339cb..2c58838a60f97 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -81,7 +81,9 @@ type IndexLookUpJoin struct { memTracker *memory.Tracker // track memory usage. - stats *indexLookUpJoinRuntimeStats + stats *indexLookUpJoinRuntimeStats + ctxCancelReason atomic.Value + finished *atomic.Value } type outerCtx struct { @@ -142,11 +144,13 @@ type innerWorker struct { outerCtx outerCtx ctx sessionctx.Context executorChk *chunk.Chunk + lookup *IndexLookUpJoin indexRanges []*ranger.Range nextColCompareFilters *plannercore.ColWithCmpFuncManager keyOff2IdxOff []int stats *innerWorkerRuntimeStats + memTracker *memory.Tracker } // Open implements the Executor interface. @@ -158,6 +162,7 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.innerPtrBytes = make([][]byte, 0, 8) + e.finished.Store(false) if e.runtimeStats != nil { e.stats = &indexLookUpJoinRuntimeStats{} e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) @@ -219,6 +224,16 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork indexRanges: copiedRanges, keyOff2IdxOff: e.keyOff2IdxOff, stats: innerStats, + lookup: e, + memTracker: memory.NewTracker(memory.LabelForIndexJoinInnerWorker, -1), + } + iw.memTracker.AttachTo(e.memTracker) + if len(copiedRanges) != 0 { + // We should not consume this memory usage in `iw.memTracker`. The + // memory usage of inner worker will be reset the end of iw.handleTask. + // While the life cycle of this memory consumption exists throughout the + // whole active period of inner worker. + e.ctx.GetSessionVars().StmtCtx.MemTracker.Consume(2 * types.EstimatedMemUsage(copiedRanges[0].LowVal, len(copiedRanges))) } if e.lastColHelper != nil { // nextCwf.TmpConstant needs to be reset for every individual @@ -295,6 +310,9 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, select { case task = <-e.resultCh: case <-ctx.Done(): + if err := e.ctxCancelReason.Load(); err != nil { + return nil, err.(error) + } return nil, ctx.Err() } if task == nil { @@ -307,6 +325,9 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, return nil, err } case <-ctx.Done(): + if err := e.ctxCancelReason.Load(); err != nil { + return nil, err.(error) + } return nil, ctx.Err() } @@ -330,13 +351,16 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { defer trace.StartRegion(ctx, "IndexLookupJoinOuterWorker").End() defer func() { if r := recover(); r != nil { + ow.lookup.finished.Store(true) buf := make([]byte, 4096) stackSize := runtime.Stack(buf, false) buf = buf[:stackSize] logutil.Logger(ctx).Error("outerWorker panicked", zap.String("stack", string(buf))) task := &lookUpJoinTask{doneCh: make(chan error, 1)} - task.doneCh <- errors.Errorf("%v", r) - ow.pushToChan(ctx, task, ow.resultCh) + err := errors.Errorf("%v", r) + task.doneCh <- err + ow.lookup.ctxCancelReason.Store(err) + ow.lookup.cancelFunc() } close(ow.resultCh) close(ow.innerCh) @@ -447,12 +471,16 @@ func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) { var task *lookUpJoinTask defer func() { if r := recover(); r != nil { + iw.lookup.finished.Store(true) buf := make([]byte, 4096) stackSize := runtime.Stack(buf, false) buf = buf[:stackSize] logutil.Logger(ctx).Error("innerWorker panicked", zap.String("stack", string(buf))) + err := errors.Errorf("%v", r) // "task != nil" is guaranteed when panic happened. - task.doneCh <- errors.Errorf("%v", r) + task.doneCh <- err + iw.lookup.ctxCancelReason.Store(err) + iw.lookup.cancelFunc() } wg.Done() }() @@ -486,6 +514,9 @@ func (iw *innerWorker) handleTask(ctx context.Context, task *lookUpJoinTask) err atomic.AddInt64(&iw.stats.totalTime, int64(time.Since(start))) }() } + defer func() { + iw.memTracker.Consume(-iw.memTracker.BytesConsumed()) + }() lookUpContents, err := iw.constructLookupContent(task) if err != nil { return err @@ -519,6 +550,9 @@ func (iw *innerWorker) constructLookupContent(task *lookUpJoinTask) ([]*indexJoi if err != nil { return nil, err } + if rowIdx == 0 { + iw.lookup.memTracker.Consume(types.EstimatedMemUsage(dLookUpKey, numRows)) + } if dHashKey == nil { // Append null to make looUpKeys the same length as outer Result. task.encodedLookUpKeys[chkIdx].AppendNull(0) @@ -643,7 +677,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa atomic.AddInt64(&iw.stats.fetch, int64(time.Since(start))) }() } - innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters, true) + innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters, true, iw.memTracker, iw.lookup.finished) if innerExec != nil { defer terror.Call(innerExec.Close) } @@ -657,6 +691,9 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa for { select { case <-ctx.Done(): + if err := iw.lookup.ctxCancelReason.Load(); err != nil { + return err.(error) + } return ctx.Err() default: } @@ -725,6 +762,7 @@ func (e *IndexLookUpJoin) Close() error { e.workerWg.Wait() e.memTracker = nil e.task = nil + e.finished.Store(false) return e.baseExecutor.Close() } diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index a16ffca0a771d..83133eb1f6a00 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -501,7 +501,7 @@ func (imw *innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJo dLookUpKeys[i], dLookUpKeys[lenKeys-i-1] = dLookUpKeys[lenKeys-i-1], dLookUpKeys[i] } } - imw.innerExec, err = imw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, imw.indexRanges, imw.keyOff2IdxOff, imw.nextColCompareFilters, false) + imw.innerExec, err = imw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, imw.indexRanges, imw.keyOff2IdxOff, imw.nextColCompareFilters, false, nil, nil) if imw.innerExec != nil { defer terror.Call(imw.innerExec.Close) } diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 1f017ea3654d9..7493d29f64921 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1494,6 +1494,9 @@ func (ijHelper *indexJoinBuildHelper) buildTemplateRange(matchedKeyCnt int, eqAn if len(oneColumnRan) == 0 { return nil, true, nil } + if sc.MemTracker != nil { + sc.MemTracker.Consume(2 * types.EstimatedMemUsage(oneColumnRan[0].LowVal, len(oneColumnRan))) + } for _, ran := range ranges { ran.LowVal[i] = oneColumnRan[0].LowVal[0] ran.HighVal[i] = oneColumnRan[0].HighVal[0] @@ -1507,6 +1510,9 @@ func (ijHelper *indexJoinBuildHelper) buildTemplateRange(matchedKeyCnt int, eqAn newRange.HighVal[i] = oneColumnRan[ranIdx].HighVal[0] newRanges = append(newRanges, newRange) } + if sc.MemTracker != nil && len(newRanges) != 0 { + sc.MemTracker.Consume(2 * types.EstimatedMemUsage(newRanges[0].LowVal, len(newRanges))) + } ranges = append(ranges, newRanges...) } j++ diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 2525ee76c2e0a..52cd9f08e2de9 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -493,4 +493,8 @@ const ( LabelForSimpleTask int = -18 // LabelForCTEStorage represents the label of CTE storage LabelForCTEStorage int = -19 + // LabelForIndexJoinInnerWorker represents the label of IndexJoin InnerWorker + LabelForIndexJoinInnerWorker int = -20 + // LabelForIndexJoinOuterWorker represents the label of IndexJoin OuterWorker + LabelForIndexJoinOuterWorker int = -21 ) From 1a34d8168f14bcc2fcf2e8b71d21972db6a68779 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Tue, 30 Nov 2021 16:27:53 +0800 Subject: [PATCH 2/2] executor: send a task with error to the resultCh when panic happen (#30214) (#30240) --- executor/index_lookup_hash_join.go | 1 + executor/index_lookup_join.go | 3 +++ executor/join_test.go | 36 ++++++++++++++++++++++++++++++ 3 files changed, 40 insertions(+) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 4665839c1ffea..26b7682e6a76d 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -336,6 +336,7 @@ func (ow *indexHashJoinOuterWorker) run(ctx context.Context) { defer trace.StartRegion(ctx, "IndexHashJoinOuterWorker").End() defer close(ow.innerCh) for { + failpoint.Inject("TestIssue30211", nil) task, err := ow.buildTask(ctx) failpoint.Inject("testIndexHashJoinOuterWorkerErr", func() { err = errors.New("mockIndexHashJoinOuterWorkerErr") diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 2c58838a60f97..3a0d5d10ca25c 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -26,6 +26,7 @@ import ( "unsafe" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/expression" @@ -359,6 +360,7 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { task := &lookUpJoinTask{doneCh: make(chan error, 1)} err := errors.Errorf("%v", r) task.doneCh <- err + ow.pushToChan(ctx, task, ow.resultCh) ow.lookup.ctxCancelReason.Store(err) ow.lookup.cancelFunc() } @@ -367,6 +369,7 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { wg.Done() }() for { + failpoint.Inject("TestIssue30211", nil) task, err := ow.buildTask(ctx) if err != nil { task.doneCh <- err diff --git a/executor/join_test.go b/executor/join_test.go index a5ca48e7b1976..26b80c08827fc 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2609,3 +2609,39 @@ func (s *testSuiteJoinSerial) TestIssue25902(c *C) { tk.MustQuery("select * from tt1 where ts in (select ts from tt2);").Check(testkit.Rows()) tk.MustExec("set @@session.time_zone = @tmp;") } + +func (s *testSuiteJoinSerial) TestIssue30211(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(a int, index(a));") + tk.MustExec("create table t2(a int, index(a));") + func() { + fpName := "github.com/pingcap/tidb/executor/TestIssue30211" + c.Assert(failpoint.Enable(fpName, `panic("TestIssue30211 IndexJoinPanic")`), IsNil) + defer func() { + c.Assert(failpoint.Disable(fpName), IsNil) + }() + err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(err, Matches, "failpoint panic: TestIssue30211 IndexJoinPanic") + + err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(err, Matches, "failpoint panic: TestIssue30211 IndexJoinPanic") + }() + tk.MustExec("insert into t1 values(1),(2);") + tk.MustExec("insert into t2 values(1),(1),(2),(2);") + tk.MustExec("set @@tidb_mem_quota_query=8000;") + tk.MustExec("set tidb_index_join_batch_size = 1;") + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionCancel + }) + defer func() { + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionLog + }) + }() + err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(strings.Contains(err, "Out Of Memory Quota"), IsTrue) + err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(strings.Contains(err, "Out Of Memory Quota"), IsTrue) +}