From 9af0f035065d6456307dc298eb776087522354e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Mon, 22 Aug 2022 12:28:53 +0800 Subject: [PATCH 1/4] *: only create snapshot interceptor for temporary table when needed (#37225) close pingcap/tidb#37223 --- executor/temporary_table_test.go | 18 ++++- infoschema/builder.go | 19 +++++ infoschema/infoschema.go | 15 ++++ infoschema/infoschema_test.go | 124 +++++++++++++++++++++++++++++++ sessiontxn/isolation/base.go | 10 ++- sessiontxn/staleread/provider.go | 5 +- table/temptable/interceptor.go | 4 + 7 files changed, 192 insertions(+), 3 deletions(-) diff --git a/executor/temporary_table_test.go b/executor/temporary_table_test.go index a6c0b0f4c0736..ebc8aff0bae95 100644 --- a/executor/temporary_table_test.go +++ b/executor/temporary_table_test.go @@ -30,12 +30,29 @@ func TestTemporaryTableNoNetwork(t *testing.T) { t.Run("global", func(t *testing.T) { assertTemporaryTableNoNetwork(t, func(tk *testkit.TestKit) { tk.MustExec("create global temporary table tmp_t (id int primary key, a int, b int, index(a)) on commit delete rows") + tk.MustExec("begin") + }) + }) + + t.Run("global create and then truncate", func(t *testing.T) { + assertTemporaryTableNoNetwork(t, func(tk *testkit.TestKit) { + tk.MustExec("create global temporary table tmp_t (id int primary key, a int, b int, index(a)) on commit delete rows") + tk.MustExec("truncate table tmp_t") + tk.MustExec("begin") }) }) t.Run("local", func(t *testing.T) { assertTemporaryTableNoNetwork(t, func(tk *testkit.TestKit) { tk.MustExec("create temporary table tmp_t (id int primary key, a int, b int, index(a))") + tk.MustExec("begin") + }) + }) + + t.Run("local and create table inside txn", func(t *testing.T) { + assertTemporaryTableNoNetwork(t, func(tk *testkit.TestKit) { + tk.MustExec("begin") + tk.MustExec("create temporary table tmp_t (id int primary key, a int, b int, index(a))") }) }) } @@ -61,7 +78,6 @@ func assertTemporaryTableNoNetwork(t *testing.T, createTable func(*testkit.TestK require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy")) }() - tk.MustExec("begin") tk.MustExec("insert into tmp_t values (1, 1, 1)") tk.MustExec("insert into tmp_t values (2, 2, 2)") diff --git a/infoschema/builder.go b/infoschema/builder.go index 9ac3a203151a4..ff3366bc25149 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -697,6 +697,10 @@ func (b *Builder) applyCreateTable(m *meta.Meta, dbInfo *model.DBInfo, tableID i }) b.is.sortedTablesBuckets[bucketIdx] = sortedTbls + if tblInfo.TempTableType != model.TempTableNone { + b.addTemporaryTable(tableID) + } + newTbl, ok := b.is.TableByID(tableID) if ok { dbInfo.Tables = append(dbInfo.Tables, newTbl.Meta()) @@ -750,6 +754,11 @@ func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected [ // Remove the table in sorted table slice. b.is.sortedTablesBuckets[bucketIdx] = append(sortedTbls[0:idx], sortedTbls[idx+1:]...) + // Remove the table in temporaryTables + if b.is.temporaryTableIDs != nil { + delete(b.is.temporaryTableIDs, tableID) + } + // The old DBInfo still holds a reference to old table info, we need to remove it. 
for i, tblInfo := range dbInfo.Tables { if tblInfo.ID == tableID { @@ -895,10 +904,20 @@ func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableF schTbls.tables[t.Name.L] = tbl sortedTbls := b.is.sortedTablesBuckets[tableBucketIdx(t.ID)] b.is.sortedTablesBuckets[tableBucketIdx(t.ID)] = append(sortedTbls, tbl) + if tblInfo := tbl.Meta(); tblInfo.TempTableType != model.TempTableNone { + b.addTemporaryTable(tblInfo.ID) + } } return nil } +func (b *Builder) addTemporaryTable(tblID int64) { + if b.is.temporaryTableIDs == nil { + b.is.temporaryTableIDs = make(map[int64]struct{}) + } + b.is.temporaryTableIDs[tblID] = struct{}{} +} + type virtualTableDriver struct { *model.DBInfo TableFromMeta tableFromMetaFunc diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index 04dc3c4395284..5fccf0e60558e 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -60,6 +60,8 @@ type InfoSchema interface { AllPlacementBundles() []*placement.Bundle // AllPlacementPolicies returns all placement policies AllPlacementPolicies() []*model.PolicyInfo + // HasTemporaryTable returns whether information schema has temporary table + HasTemporaryTable() bool } type sortedTables []table.Table @@ -94,6 +96,9 @@ type infoSchema struct { // sortedTablesBuckets is a slice of sortedTables, a table's bucket index is (tableID % bucketCount). sortedTablesBuckets []sortedTables + // temporaryTables stores the temporary table ids + temporaryTableIDs map[int64]struct{} + // schemaMetaVersion is the version of schema, and we should check version when change schema. schemaMetaVersion int64 } @@ -302,6 +307,11 @@ func (is *infoSchema) FindTableByPartitionID(partitionID int64) (table.Table, *m return nil, nil, nil } +// HasTemporaryTable returns whether information schema has temporary table +func (is *infoSchema) HasTemporaryTable() bool { + return len(is.temporaryTableIDs) != 0 +} + func (is *infoSchema) Clone() (result []*model.DBInfo) { for _, v := range is.schemaMap { result = append(result, v.dbInfo.Clone()) @@ -567,3 +577,8 @@ func (ts *TemporaryTableAttachedInfoSchema) SchemaByTable(tableInfo *model.Table return ts.InfoSchema.SchemaByTable(tableInfo) } + +// HasTemporaryTable returns whether information schema has temporary table +func (ts *TemporaryTableAttachedInfoSchema) HasTemporaryTable() bool { + return ts.LocalTemporaryTables.Count() > 0 || ts.InfoSchema.HasTemporaryTable() +} diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index c7629f65f113e..27a6aadd497cc 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -315,6 +315,130 @@ func genGlobalID(store kv.Storage) (int64, error) { return globalID, errors.Trace(err) } +func TestBuildSchemaWithGlobalTemporaryTable(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + is := tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema) + require.False(t, is.HasTemporaryTable()) + db, ok := is.SchemaByName(model.NewCIStr("test")) + require.True(t, ok) + + doChange := func(changes ...func(m *meta.Meta, builder *infoschema.Builder)) infoschema.InfoSchema { + builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos([]*model.DBInfo{db}, nil, is.SchemaMetaVersion()) + require.NoError(t, err) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) + err = kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error { + m := meta.NewMeta(txn) + for _, change := 
range changes { + change(m, builder) + } + return nil + }) + require.NoError(t, err) + return builder.Build() + } + + createGlobalTemporaryTableChange := func(tblID int64) func(m *meta.Meta, builder *infoschema.Builder) { + return func(m *meta.Meta, builder *infoschema.Builder) { + err := m.CreateTableOrView(db.ID, &model.TableInfo{ + ID: tblID, + TempTableType: model.TempTableGlobal, + State: model.StatePublic, + }) + require.NoError(t, err) + _, err = builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionCreateTable, SchemaID: db.ID, TableID: tblID}) + require.NoError(t, err) + } + } + + dropTableChange := func(tblID int64) func(m *meta.Meta, builder *infoschema.Builder) { + return func(m *meta.Meta, builder *infoschema.Builder) { + err := m.DropTableOrView(db.ID, tblID) + require.NoError(t, err) + _, err = builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionDropTable, SchemaID: db.ID, TableID: tblID}) + require.NoError(t, err) + } + } + + truncateGlobalTemporaryTableChange := func(tblID, newTblID int64) func(m *meta.Meta, builder *infoschema.Builder) { + return func(m *meta.Meta, builder *infoschema.Builder) { + err := m.DropTableOrView(db.ID, tblID) + require.NoError(t, err) + + err = m.CreateTableOrView(db.ID, &model.TableInfo{ + ID: newTblID, + TempTableType: model.TempTableGlobal, + State: model.StatePublic, + }) + require.NoError(t, err) + _, err = builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionTruncateTable, SchemaID: db.ID, OldTableID: tblID, TableID: newTblID}) + require.NoError(t, err) + } + } + + alterTableChange := func(tblID int64) func(m *meta.Meta, builder *infoschema.Builder) { + return func(m *meta.Meta, builder *infoschema.Builder) { + _, err := builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionAddColumn, SchemaID: db.ID, TableID: tblID}) + require.NoError(t, err) + } + } + + // create table + tbID, err := genGlobalID(store) + require.NoError(t, err) + newIS := doChange( + createGlobalTemporaryTableChange(tbID), + ) + require.True(t, newIS.HasTemporaryTable()) + + // full load + newDB, ok := newIS.SchemaByName(model.NewCIStr("test")) + require.True(t, ok) + builder, err := infoschema.NewBuilder(store, nil).InitWithDBInfos([]*model.DBInfo{newDB}, newIS.AllPlacementPolicies(), newIS.SchemaMetaVersion()) + require.NoError(t, err) + require.True(t, builder.Build().HasTemporaryTable()) + + // create and then drop + tbID, err = genGlobalID(store) + require.NoError(t, err) + require.False(t, doChange( + createGlobalTemporaryTableChange(tbID), + dropTableChange(tbID), + ).HasTemporaryTable()) + + // create and then alter + tbID, err = genGlobalID(store) + require.NoError(t, err) + require.True(t, doChange( + createGlobalTemporaryTableChange(tbID), + alterTableChange(tbID), + ).HasTemporaryTable()) + + // create and truncate + tbID, err = genGlobalID(store) + require.NoError(t, err) + newTbID, err := genGlobalID(store) + require.NoError(t, err) + require.True(t, doChange( + createGlobalTemporaryTableChange(tbID), + truncateGlobalTemporaryTableChange(tbID, newTbID), + ).HasTemporaryTable()) + + // create two and drop one + tbID, err = genGlobalID(store) + require.NoError(t, err) + tbID2, err := genGlobalID(store) + require.NoError(t, err) + require.True(t, doChange( + createGlobalTemporaryTableChange(tbID), + createGlobalTemporaryTableChange(tbID2), + dropTableChange(tbID), + ).HasTemporaryTable()) +} + func TestBuildBundle(t *testing.T) { store := testkit.CreateMockStore(t) diff --git a/sessiontxn/isolation/base.go b/sessiontxn/isolation/base.go index 
48cfdfc54afe6..a363809c7cc31 100644 --- a/sessiontxn/isolation/base.go +++ b/sessiontxn/isolation/base.go @@ -202,6 +202,11 @@ func (p *baseTxnContextProvider) OnStmtRetry(ctx context.Context) error { func (p *baseTxnContextProvider) OnLocalTemporaryTableCreated() { p.infoSchema = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, p.infoSchema) p.sctx.GetSessionVars().TxnCtx.InfoSchema = p.infoSchema + if p.txn != nil && p.txn.Valid() { + if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema); interceptor != nil { + p.txn.SetOption(kv.SnapInterceptor, interceptor) + } + } } // OnStmtErrorForNextAction is the hook that should be called when a new statement get an error @@ -258,7 +263,10 @@ func (p *baseTxnContextProvider) ActivateTxn() (kv.Transaction, error) { if readReplicaType.IsFollowerRead() { txn.SetOption(kv.ReplicaRead, readReplicaType) } - txn.SetOption(kv.SnapInterceptor, temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema)) + + if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema); interceptor != nil { + txn.SetOption(kv.SnapInterceptor, interceptor) + } if sessVars.StmtCtx.WeakConsistency { txn.SetOption(kv.IsolationLevel, kv.RC) diff --git a/sessiontxn/staleread/provider.go b/sessiontxn/staleread/provider.go index 93638cbaa04cb..4d0eb4a1b9619 100644 --- a/sessiontxn/staleread/provider.go +++ b/sessiontxn/staleread/provider.go @@ -124,7 +124,10 @@ func (p *StalenessTxnContextProvider) activateStaleTxn() error { TxnScope: txnScope, }, } - txn.SetOption(kv.SnapInterceptor, temptable.SessionSnapshotInterceptor(p.sctx, is)) + + if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, is); interceptor != nil { + txn.SetOption(kv.SnapInterceptor, interceptor) + } p.is = is err = p.sctx.GetSessionVars().SetSystemVar(variable.TiDBSnapshot, "") diff --git a/table/temptable/interceptor.go b/table/temptable/interceptor.go index e260c7628edde..f5059c1730050 100644 --- a/table/temptable/interceptor.go +++ b/table/temptable/interceptor.go @@ -41,6 +41,10 @@ type TemporaryTableSnapshotInterceptor struct { // SessionSnapshotInterceptor creates a new snapshot interceptor for temporary table data fetch func SessionSnapshotInterceptor(sctx sessionctx.Context, is infoschema.InfoSchema) kv.SnapshotInterceptor { + if !is.HasTemporaryTable() { + return nil + } + return NewTemporaryTableSnapshotInterceptor( is, getSessionData(sctx), From aaf0613b7387276c26d83abcf683ec6f5fbef1e1 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 22 Aug 2022 14:36:20 +0800 Subject: [PATCH 2/4] doc: add design doc for stats lru cache (#36804) ref pingcap/tidb#34052 --- docs/design/2022-08-02-stats-lru-cache.md | 71 +++++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 docs/design/2022-08-02-stats-lru-cache.md diff --git a/docs/design/2022-08-02-stats-lru-cache.md b/docs/design/2022-08-02-stats-lru-cache.md new file mode 100644 index 0000000000000..8e5ecfadd5130 --- /dev/null +++ b/docs/design/2022-08-02-stats-lru-cache.md @@ -0,0 +1,71 @@ +# Proposal: Stats LRU Cache + +- Author(s): [Yisaer](https://github.com/yisaer) +- Tracking Issue: https://github.com/pingcap/tidb/issues/34052 + +## Abstract + +This proposes a design of how to maintain stats cache by lru according to memory usage + +## Background + +Previously, tidb maintained the all the indices' stats and some columns' stats which is needed during query. +As the maintained stats grows, the total memory usage of the stats will increase and makes tidb server OOM. 
+Thus, we use an LRU policy to maintain the stats cache in order to keep memory usage safe.
+
+### Goal
+
+- Use LRU to maintain the stats cache in memory
+- Keep the total memory usage of the stats cache under the quota
+- Support loading stats back into memory when the tidb server needs them
+
+### Non-Goals
+
+- Since the stats cache lives in memory, we don't support switching the stats cache to LRU without restarting the tidb server
+
+## Proposal
+
+### Stats Cache Interface
+
+We will provide a Stats Cache Interface which is implemented by both the LRU Cache and the Map Cache.
+If the tidb server doesn't enable the Stats LRU Cache, it uses the Map Cache by default. We will also provide a config item and a global session variable to control whether the Stats LRU Cache is enabled and what its capacity is.
+
+```go
+// statsCacheInner is the interface to manage the statsCache, it can be implemented by map, lru cache or other structures.
+type statsCacheInner interface {
+    Get(int64) (*statistics.Table, bool)
+    Put(int64, *statistics.Table)
+    Del(int64)
+    Cost() int64
+    Len() int
+    SetCapacity(int64)
+}
+```
+
+### Stats LRU Cache Policy
+
+For column or index stats, we maintain the following data structures:
+
+- `Histogram`
+- `TopN`
+- `CMSketch`
+
+We also maintain a status for each column's and index's stats to indicate its loading status, one of the following:
+
+- `AllLoaded`
+- `OnlyCMSEvicted`
+- `OnlyHistRemained`
+- `AllEvicted`
+
+When all data structures of a column's or index's stats are loaded into memory, the status is `AllLoaded`.
+When the memory usage of the Stats LRU Cache exceeds the quota, the LRU Cache selects one column's or index's stats and evicts its data structures by the following rules to reduce memory usage (see the sketch in the appendix at the end of this document):
+
+- If the status is `AllLoaded`, it discards the `CMSketch` and the status changes to `OnlyCMSEvicted`
+- If the status is `OnlyCMSEvicted`, it discards the `TopN` and the status changes to `OnlyHistRemained`
+- If the status is `OnlyHistRemained`, it discards the `Histogram` and the status changes to `AllEvicted`
+
+### Sync Stats Compatibility
+
+Previously, the tidb server used Sync Stats and asynchronous Loading Histograms to load column stats into memory during queries.
+Since the Stats LRU Cache Policy may evict index stats from memory, Sync Stats and asynchronous Loading Histograms also need to load index stats according to their loading status to stay compatible.
+During query optimization, the tidb server collects the used columns and indices; if their stats are not fully loaded, the tidb server tries to load their stats back into memory.
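+
+### Appendix: Eviction Policy Sketch
+
+The following is only an illustrative sketch of the state transitions described in the Stats LRU Cache Policy section, not the actual implementation; the names `loadStatus`, `statsItem`, and `evictOneStep` are hypothetical.
+
+```go
+// loadStatus mirrors the loading statuses described above.
+type loadStatus int
+
+const (
+    allLoaded loadStatus = iota
+    onlyCMSEvicted
+    onlyHistRemained
+    allEvicted
+)
+
+// statsItem is a hypothetical holder of the three structures kept for one column or index.
+type statsItem struct {
+    status    loadStatus
+    histogram *statistics.Histogram
+    topN      *statistics.TopN
+    cmSketch  *statistics.CMSketch
+}
+
+// evictOneStep drops the next structure according to the policy and reports
+// whether anything was evicted.
+func (s *statsItem) evictOneStep() bool {
+    switch s.status {
+    case allLoaded:
+        s.cmSketch = nil // discard the CMSketch first
+        s.status = onlyCMSEvicted
+    case onlyCMSEvicted:
+        s.topN = nil // then the TopN
+        s.status = onlyHistRemained
+    case onlyHistRemained:
+        s.histogram = nil // finally the Histogram itself
+        s.status = allEvicted
+    default:
+        return false // already fully evicted
+    }
+    return true
+}
+```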
From 4cf7eeee64df15ec24e56b92e95bfcaba181cbd9 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 22 Aug 2022 15:24:20 +0800 Subject: [PATCH 3/4] planner: add warn log for sync stats (#36956) --- executor/adapter.go | 8 ++++ executor/executor.go | 1 + planner/core/plan_stats.go | 9 ++++ planner/core/plan_stats_test.go | 20 +++++++-- sessionctx/stmtctx/stmtctx.go | 33 +++++++++++++- sessionctx/variable/session.go | 4 ++ sessionctx/variable/session_test.go | 1 + sessionctx/variable/tidb_vars.go | 2 +- statistics/handle/handle.go | 2 +- statistics/handle/handle_hist.go | 62 ++++++++++++++++----------- statistics/handle/handle_hist_test.go | 4 +- 11 files changed, 114 insertions(+), 32 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 6ee10da05f93a..4a648f4029c42 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -1296,7 +1296,15 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { ExecRetryCount: a.retryCount, IsExplicitTxn: sessVars.TxnCtx.IsExplicit, IsWriteCacheTable: stmtCtx.WaitLockLeaseTime > 0, + IsSyncStatsFailed: stmtCtx.IsSyncStatsFailed, } + failpoint.Inject("assertSyncStatsFailed", func(val failpoint.Value) { + if val.(bool) { + if !slowItems.IsSyncStatsFailed { + panic("isSyncStatsFailed should be true") + } + } + }) if a.retryCount > 0 { slowItems.ExecRetryTime = costTime - sessVars.DurationParse - sessVars.DurationCompile - time.Since(a.retryStartTime) } diff --git a/executor/executor.go b/executor/executor.go index 678eb3d39c0f2..fc177713d70c9 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1925,6 +1925,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.EnableOptimizeTrace = false sc.OptimizeTracer = nil sc.OptimizerCETrace = nil + sc.IsSyncStatsFailed = false sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow diff --git a/planner/core/plan_stats.go b/planner/core/plan_stats.go index 4fbf5e47720b9..4a8e38b01aa82 100644 --- a/planner/core/plan_stats.go +++ b/planner/core/plan_stats.go @@ -25,7 +25,9 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mathutil" + "go.uber.org/zap" ) type collectPredicateColumnsPoint struct{} @@ -41,6 +43,9 @@ func (c collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPl if len(predicateColumns) > 0 { plan.SCtx().UpdateColStatsUsage(predicateColumns) } + if !histNeeded { + return plan, nil + } histNeededIndices := collectSyncIndices(plan.SCtx(), histNeededColumns) histNeededItems := collectHistNeededItems(histNeededColumns, histNeededIndices) if histNeeded && len(histNeededItems) > 0 { @@ -85,6 +90,8 @@ func RequestLoadStats(ctx sessionctx.Context, neededHistItems []model.TableItemI var timeout = time.Duration(waitTime) err := domain.GetDomain(ctx).StatsHandle().SendLoadRequests(stmtCtx, neededHistItems, timeout) if err != nil { + logutil.BgLogger().Warn("SendLoadRequests failed", zap.Error(err)) + stmtCtx.IsSyncStatsFailed = true return handleTimeout(stmtCtx) } return nil @@ -100,6 +107,8 @@ func SyncWaitStatsLoad(plan LogicalPlan) (bool, error) { if success { return true, nil } + logutil.BgLogger().Warn("SyncWaitStatsLoad failed") + stmtCtx.IsSyncStatsFailed = true err := handleTimeout(stmtCtx) return false, err } diff --git a/planner/core/plan_stats_test.go b/planner/core/plan_stats_test.go index 84678bc7ca6c2..8f23858cdabc4 100644 --- 
a/planner/core/plan_stats_test.go +++ b/planner/core/plan_stats_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/parser" @@ -27,7 +28,9 @@ import ( "github.com/pingcap/tidb/planner" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" ) @@ -259,15 +262,26 @@ func TestPlanStatsLoadTimeout(t *testing.T) { require.NoError(t, err) tableInfo := tbl.Meta() neededColumn := model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[0].ID, IsIndex: false} - resultCh := make(chan model.TableItemID, 1) + resultCh := make(chan stmtctx.StatsLoadResult, 1) timeout := time.Duration(1<<63 - 1) - dom.StatsHandle().AppendNeededItem(neededColumn, resultCh, timeout) // make channel queue full - stmt, err := p.ParseOneStmt("select * from t where c>1", "", "") + task := &handle.NeededItemTask{ + TableItemID: neededColumn, + ResultCh: resultCh, + ToTimeout: time.Now().Local().Add(timeout), + } + dom.StatsHandle().AppendNeededItem(task, timeout) // make channel queue full + sql := "select * from t where c>1" + stmt, err := p.ParseOneStmt(sql, "", "") require.NoError(t, err) tk.MustExec("set global tidb_stats_load_pseudo_timeout=false") _, _, err = planner.Optimize(context.TODO(), ctx, stmt, is) require.Error(t, err) // fail sql for timeout when pseudo=false + tk.MustExec("set global tidb_stats_load_pseudo_timeout=true") + require.NoError(t, failpoint.Enable("github.com/pingcap/executor/assertSyncStatsFailed", `return(true)`)) + tk.MustExec(sql) // not fail sql for timeout when pseudo=true + failpoint.Disable("github.com/pingcap/executor/assertSyncStatsFailed") + plan, _, err := planner.Optimize(context.TODO(), ctx, stmt, is) require.NoError(t, err) // not fail sql for timeout when pseudo=true switch pp := plan.(type) { diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index a13b80a78ae69..98f2c6b27a244 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -15,6 +15,7 @@ package stmtctx import ( + "bytes" "encoding/json" "math" "strconv" @@ -293,7 +294,7 @@ type StatementContext struct { // NeededItems stores the columns/indices whose stats are needed for planner. NeededItems []model.TableItemID // ResultCh to receive stats loading results - ResultCh chan model.TableItemID + ResultCh chan StatsLoadResult // Fallback indicates if the planner uses full-loaded stats or fallback all to pseudo/simple. Fallback bool // LoadStartTime is to record the load start time to calculate latency @@ -310,6 +311,9 @@ type StatementContext struct { IsSQLRegistered atomic2.Bool // IsSQLAndPlanRegistered uses to indicate whether the SQL and plan has been registered for TopSQL. IsSQLAndPlanRegistered atomic2.Bool + + // IsSyncStatsFailed indicates whether any failure happened during sync stats + IsSyncStatsFailed bool } // StmtHints are SessionVars related sql hints. 
@@ -1014,3 +1018,30 @@ func (d *CopTasksDetails) ToZapFields() (fields []zap.Field) { fields = append(fields, zap.String("wait_max_addr", d.MaxWaitAddress)) return fields } + +// StatsLoadResult indicates result for StatsLoad +type StatsLoadResult struct { + Item model.TableItemID + Error error +} + +// HasError returns whether result has error +func (r StatsLoadResult) HasError() bool { + return r.Error != nil +} + +// ErrorMsg returns StatsLoadResult err msg +func (r StatsLoadResult) ErrorMsg() string { + if r.Error == nil { + return "" + } + b := bytes.NewBufferString("tableID:") + b.WriteString(strconv.FormatInt(r.Item.TableID, 10)) + b.WriteString(", id:") + b.WriteString(strconv.FormatInt(r.Item.ID, 10)) + b.WriteString(", isIndex:") + b.WriteString(strconv.FormatBool(r.Item.IsIndex)) + b.WriteString(", err:") + b.WriteString(r.Error.Error()) + return b.String() +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index a9bdcb8a605c0..4a03cc331773d 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -2526,6 +2526,8 @@ const ( SlowLogIsExplicitTxn = "IsExplicitTxn" // SlowLogIsWriteCacheTable is used to indicate whether writing to the cache table need to wait for the read lock to expire. SlowLogIsWriteCacheTable = "IsWriteCacheTable" + // SlowLogIsSyncStatsFailed is used to indicate whether any failure happen during sync stats + SlowLogIsSyncStatsFailed = "IsSyncStatsFailed" ) // GenerateBinaryPlan decides whether we should record binary plan in slow log and stmt summary. @@ -2568,6 +2570,7 @@ type SlowQueryLogItems struct { ResultRows int64 IsExplicitTxn bool IsWriteCacheTable bool + IsSyncStatsFailed bool } // SlowLogFormat uses for formatting slow log. @@ -2732,6 +2735,7 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string { writeSlowLogItem(&buf, SlowLogResultRows, strconv.FormatInt(logItems.ResultRows, 10)) writeSlowLogItem(&buf, SlowLogSucc, strconv.FormatBool(logItems.Succ)) writeSlowLogItem(&buf, SlowLogIsExplicitTxn, strconv.FormatBool(logItems.IsExplicitTxn)) + writeSlowLogItem(&buf, SlowLogIsSyncStatsFailed, strconv.FormatBool(logItems.IsSyncStatsFailed)) if s.StmtCtx.WaitLockLeaseTime > 0 { writeSlowLogItem(&buf, SlowLogIsWriteCacheTable, strconv.FormatBool(logItems.IsWriteCacheTable)) } diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index 0776dd6083fae..5e47a9da45197 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -232,6 +232,7 @@ func TestSlowLogFormat(t *testing.T) { # Result_rows: 12345 # Succ: true # IsExplicitTxn: true +# IsSyncStatsFailed: false # IsWriteCacheTable: true` sql := "select * from t;" _, digest := parser.NormalizeDigest(sql) diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index e1bae81b229d5..dbd38bd774ba9 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -971,7 +971,7 @@ const ( DefTiDBPersistAnalyzeOptions = true DefTiDBEnableColumnTracking = false DefTiDBStatsLoadSyncWait = 0 - DefTiDBStatsLoadPseudoTimeout = false + DefTiDBStatsLoadPseudoTimeout = true DefSysdateIsNow = false DefTiDBEnableMutationChecker = false DefTiDBTxnAssertionLevel = AssertionOffStr diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index fdf592f364610..6c5ca30353176 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -215,7 +215,7 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool 
sessionPool, tr handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency) handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) - handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan model.TableItemID{} + handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{} err := handle.RefreshVars() if err != nil { return nil, err diff --git a/statistics/handle/handle_hist.go b/statistics/handle/handle_hist.go index 44423794fbbfa..8e87308e22c64 100644 --- a/statistics/handle/handle_hist.go +++ b/statistics/handle/handle_hist.go @@ -46,14 +46,14 @@ type StatsLoad struct { SubCtxs []sessionctx.Context NeededItemsCh chan *NeededItemTask TimeoutItemsCh chan *NeededItemTask - WorkingColMap map[model.TableItemID][]chan model.TableItemID + WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult } // NeededItemTask represents one needed column/indices with expire time. type NeededItemTask struct { TableItemID model.TableItemID ToTimeout time.Time - ResultCh chan model.TableItemID + ResultCh chan stmtctx.StatsLoadResult } // SendLoadRequests send neededColumns requests @@ -64,9 +64,14 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems } sc.StatsLoad.Timeout = timeout sc.StatsLoad.NeededItems = remainedItems - sc.StatsLoad.ResultCh = make(chan model.TableItemID, len(remainedItems)) - for _, col := range remainedItems { - err := h.AppendNeededItem(col, sc.StatsLoad.ResultCh, timeout) + sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems)) + for _, item := range remainedItems { + task := &NeededItemTask{ + TableItemID: item, + ToTimeout: time.Now().Local().Add(timeout), + ResultCh: sc.StatsLoad.ResultCh, + } + err := h.AppendNeededItem(task, timeout) if err != nil { return err } @@ -80,7 +85,12 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool { if len(sc.StatsLoad.NeededItems) <= 0 { return true } + var errorMsgs []string defer func() { + if len(errorMsgs) > 0 { + logutil.BgLogger().Warn("SyncWaitStatsLoad meets error", + zap.Strings("errors", errorMsgs)) + } sc.StatsLoad.NeededItems = nil }() resultCheckMap := map[model.TableItemID]struct{}{} @@ -94,7 +104,10 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool { select { case result, ok := <-sc.StatsLoad.ResultCh: if ok { - delete(resultCheckMap, result) + if result.HasError() { + errorMsgs = append(errorMsgs, result.ErrorMsg()) + } + delete(resultCheckMap, result.Item) if len(resultCheckMap) == 0 { metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds())) return true @@ -104,6 +117,7 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool { } case <-timer.C: metrics.SyncLoadTimeoutCounter.Inc() + logutil.BgLogger().Warn("SyncWaitStatsLoad timeout") return false } } @@ -134,9 +148,7 @@ func (h *Handle) removeHistLoadedColumns(neededItems []model.TableItemID) []mode } // AppendNeededItem appends needed columns/indices to ch, if exists, do not append the duplicated one. 
-func (h *Handle) AppendNeededItem(item model.TableItemID, resultCh chan model.TableItemID, timeout time.Duration) error { - toTimout := time.Now().Local().Add(timeout) - task := &NeededItemTask{TableItemID: item, ToTimeout: toTimout, ResultCh: resultCh} +func (h *Handle) AppendNeededItem(task *NeededItemTask, timeout time.Duration) error { return h.writeToChanWithTimeout(h.StatsLoad.NeededItemsCh, task, timeout) } @@ -202,11 +214,12 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC } func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (*NeededItemTask, error) { - item := task.TableItemID + result := stmtctx.StatsLoadResult{Item: task.TableItemID} + item := result.Item oldCache := h.statsCache.Load().(statsCache) tbl, ok := oldCache.Get(item.TableID) if !ok { - h.writeToResultChan(task.ResultCh, item) + h.writeToResultChan(task.ResultCh, result) return nil, nil } var err error @@ -214,22 +227,22 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC if item.IsIndex { index, ok := tbl.Indices[item.ID] if !ok || index.IsFullLoad() { - h.writeToResultChan(task.ResultCh, item) + h.writeToResultChan(task.ResultCh, result) return nil, nil } wrapper.idx = index } else { col, ok := tbl.Columns[item.ID] if !ok || col.IsFullLoad() { - h.writeToResultChan(task.ResultCh, item) + h.writeToResultChan(task.ResultCh, result) return nil, nil } wrapper.col = col } // to avoid duplicated handling in concurrent scenario - working := h.setWorking(task.TableItemID, task.ResultCh) + working := h.setWorking(result.Item, task.ResultCh) if !working { - h.writeToResultChan(task.ResultCh, item) + h.writeToResultChan(task.ResultCh, result) return nil, nil } // refresh statsReader to get latest stats @@ -238,6 +251,7 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC needUpdate := false wrapper, err = h.readStatsForOneItem(item, wrapper, readerCtx.reader) if err != nil { + result.Error = err return task, err } if item.IsIndex { @@ -251,9 +265,9 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC } metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds())) if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) { - h.writeToResultChan(task.ResultCh, item) + h.writeToResultChan(task.ResultCh, result) } - h.finishWorking(item) + h.finishWorking(result) return nil, nil } @@ -425,7 +439,7 @@ func (h *Handle) writeToChanWithTimeout(taskCh chan *NeededItemTask, task *Neede } // writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact. 
-func (h *Handle) writeToResultChan(resultCh chan model.TableItemID, rs model.TableItemID) { +func (h *Handle) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) { defer func() { if r := recover(); r != nil { logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack")) @@ -466,7 +480,7 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co return h.updateStatsCache(oldCache.update([]*statistics.Table{tbl}, nil, oldCache.version, WithTableStatsByQuery())) } -func (h *Handle) setWorking(item model.TableItemID, resultCh chan model.TableItemID) bool { +func (h *Handle) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool { h.StatsLoad.Lock() defer h.StatsLoad.Unlock() chList, ok := h.StatsLoad.WorkingColMap[item] @@ -477,20 +491,20 @@ func (h *Handle) setWorking(item model.TableItemID, resultCh chan model.TableIte h.StatsLoad.WorkingColMap[item] = append(chList, resultCh) return false } - chList = []chan model.TableItemID{} + chList = []chan stmtctx.StatsLoadResult{} chList = append(chList, resultCh) h.StatsLoad.WorkingColMap[item] = chList return true } -func (h *Handle) finishWorking(item model.TableItemID) { +func (h *Handle) finishWorking(result stmtctx.StatsLoadResult) { h.StatsLoad.Lock() defer h.StatsLoad.Unlock() - if chList, ok := h.StatsLoad.WorkingColMap[item]; ok { + if chList, ok := h.StatsLoad.WorkingColMap[result.Item]; ok { list := chList[1:] for _, ch := range list { - h.writeToResultChan(ch, item) + h.writeToResultChan(ch, result) } } - delete(h.StatsLoad.WorkingColMap, item) + delete(h.StatsLoad.WorkingColMap, result.Item) } diff --git a/statistics/handle/handle_hist_test.go b/statistics/handle/handle_hist_test.go index c3f16503e3b27..6ac047ac99427 100644 --- a/statistics/handle/handle_hist_test.go +++ b/statistics/handle/handle_hist_test.go @@ -221,10 +221,10 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) { rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh require.True(t, ok1) - require.Equal(t, neededColumns[0], rs1) + require.Equal(t, neededColumns[0], rs1.Item) rs2, ok2 := <-stmtCtx2.StatsLoad.ResultCh require.True(t, ok2) - require.Equal(t, neededColumns[0], rs2) + require.Equal(t, neededColumns[0], rs2.Item) stat = h.GetTableStats(tableInfo) hg = stat.Columns[tableInfo.Columns[2].ID].Histogram From e0da196b414667a80d8432a7ad433b2219e71f03 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Mon, 22 Aug 2022 15:48:20 +0800 Subject: [PATCH 4/4] planner: support HashJoin cost detail (#37012) ref pingcap/tidb#36962 --- planner/core/plan_cost.go | 25 ++- planner/core/plan_cost_detail.go | 215 ++++++++++++++++++++++++++ planner/core/plan_cost_detail_test.go | 74 +++++---- planner/core/task.go | 6 +- planner/implementation/join.go | 2 +- util/tracing/opt_trace.go | 28 ++-- 6 files changed, 296 insertions(+), 54 deletions(-) diff --git a/planner/core/plan_cost.go b/planner/core/plan_cost.go index a2afe704fc23e..93d5fc88a659a 100644 --- a/planner/core/plan_cost.go +++ b/planner/core/plan_cost.go @@ -928,7 +928,7 @@ func (p *PhysicalMergeJoin) GetPlanCost(taskType property.TaskType, option *Plan } // GetCost computes cost of hash join operator itself. 
-func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64, isMPP bool, costFlag uint64) float64 { +func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64, isMPP bool, costFlag uint64, op *physicalOptimizeOp) float64 { buildCnt, probeCnt := lCnt, rCnt build := p.children[0] // Taking the right as the inner for right join or using the outer to build a hash table. @@ -946,9 +946,13 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64, isMPP bool, costFlag uint if isMPP && p.ctx.GetSessionVars().CostModelVersion == modelVer2 { cpuFactor = sessVars.GetTiFlashCPUFactor() // use the dedicated TiFlash CPU Factor on modelVer2 } + diskFactor := sessVars.GetDiskFactor() + memoryFactor := sessVars.GetMemoryFactor() + concurrencyFactor := sessVars.GetConcurrencyFactor() + cpuCost := buildCnt * cpuFactor - memoryCost := buildCnt * sessVars.GetMemoryFactor() - diskCost := buildCnt * sessVars.GetDiskFactor() * rowSize + memoryCost := buildCnt * memoryFactor + diskCost := buildCnt * diskFactor * rowSize // Number of matched row pairs regarding the equal join conditions. helper := &fullJoinRowCountHelper{ cartesian: false, @@ -982,7 +986,7 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64, isMPP bool, costFlag uint // Cost of querying hash table is cheap actually, so we just compute the cost of // evaluating `OtherConditions` and joining row pairs. probeCost := numPairs * cpuFactor - probeDiskCost := numPairs * sessVars.GetDiskFactor() * rowSize + probeDiskCost := numPairs * diskFactor * rowSize // Cost of evaluating outer filter. if len(p.LeftConditions)+len(p.RightConditions) > 0 { // Input outer count for the above compution should be adjusted by SelectionFactor. @@ -993,7 +997,7 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64, isMPP bool, costFlag uint diskCost += probeDiskCost probeCost /= float64(p.Concurrency) // Cost of additional concurrent goroutines. 
- cpuCost += probeCost + float64(p.Concurrency+1)*sessVars.GetConcurrencyFactor() + cpuCost += probeCost + float64(p.Concurrency+1)*concurrencyFactor // Cost of traveling the hash table to resolve missing matched cases when building the hash table from the outer table if p.UseOuterToBuild { if spill { @@ -1002,7 +1006,7 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64, isMPP bool, costFlag uint } else { cpuCost += buildCnt * cpuFactor / float64(p.Concurrency) } - diskCost += buildCnt * sessVars.GetDiskFactor() * rowSize + diskCost += buildCnt * diskFactor * rowSize } if spill { @@ -1010,6 +1014,12 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64, isMPP bool, costFlag uint } else { diskCost = 0 } + if op != nil { + setPhysicalHashJoinCostDetail(p, op, spill, buildCnt, probeCnt, cpuFactor, rowSize, numPairs, + cpuCost, probeCost, memoryCost, diskCost, probeDiskCost, + diskFactor, memoryFactor, concurrencyFactor, + memQuota) + } return cpuCost + memoryCost + diskCost } @@ -1027,7 +1037,8 @@ func (p *PhysicalHashJoin) GetPlanCost(taskType property.TaskType, option *PlanC } p.planCost += childCost } - p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), getCardinality(p.children[1], costFlag), taskType == property.MppTaskType, costFlag) + p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), getCardinality(p.children[1], costFlag), + taskType == property.MppTaskType, costFlag, option.tracer) p.planCostInit = true return p.planCost, nil } diff --git a/planner/core/plan_cost_detail.go b/planner/core/plan_cost_detail.go index 6dab5cf9cc21a..08c701d7ad3a3 100644 --- a/planner/core/plan_cost_detail.go +++ b/planner/core/plan_cost_detail.go @@ -26,6 +26,12 @@ const ( RowCountLbl = "rowCount" // RowSizeLbl indicates rowSize RowSizeLbl = "rowSize" + // BuildRowCountLbl indicates rowCount on build side + BuildRowCountLbl = "buildRowCount" + // ProbeRowCountLbl indicates rowCount on probe side + ProbeRowCountLbl = "probeRowCount" + // NumPairsLbl indicates numPairs + NumPairsLbl = "numPairs" // NetworkFactorLbl indicates networkFactor NetworkFactorLbl = "networkFactor" @@ -33,9 +39,21 @@ const ( SeekFactorLbl = "seekFactor" // ScanFactorLbl indicates for scanFactor ScanFactorLbl = "scanFactor" + // SelectionFactorLbl indicates selection factor + SelectionFactorLbl = "selectionFactor" + // CPUFactorLbl indicates cpu factor + CPUFactorLbl = "cpuFactor" + // MemoryFactorLbl indicates mem factor + MemoryFactorLbl = "memoryFactor" + // DiskFactorLbl indicates disk factor + DiskFactorLbl = "diskFactor" + // ConcurrencyFactorLbl indicates for concurrency factor + ConcurrencyFactorLbl = "concurrencyFactor" // ScanConcurrencyLbl indicates sql scan concurrency ScanConcurrencyLbl = "scanConcurrency" + // HashJoinConcurrencyLbl indicates concurrency for hash join + HashJoinConcurrencyLbl = "hashJoinConcurrency" // NetSeekCostLbl indicates netSeek cost NetSeekCostLbl = "netSeekCost" @@ -43,6 +61,30 @@ const ( TablePlanCostLbl = "tablePlanCost" // IndexPlanCostLbl indicates indexPlan cost IndexPlanCostLbl = "indexPlanCost" + + // ProbeCostDetailLbl indicates probeCost + ProbeCostDetailLbl = "probeCostDetail" + // ProbeCostDescLbl indicates description for probe cost + ProbeCostDescLbl = "probeCostDesc" + // CPUCostDetailLbl indicates cpuCost detail + CPUCostDetailLbl = "cpuCostDetail" + // CPUCostDescLbl indicates description for cpu cost + CPUCostDescLbl = "cpuCostDesc" + // MemCostDetailLbl indicates mem cost detail + MemCostDetailLbl = "memCostDetail" + // MemCostDescLbl 
indicates description for mem cost + MemCostDescLbl = "memCostDesc" + // DiskCostDetailLbl indicates disk cost detail + DiskCostDetailLbl = "diskCostDetail" + // DiskCostDescLbl indicates description for disk cost + DiskCostDescLbl = "diskCostDesc" + // ProbeDiskCostLbl indicates probe disk cost detail + ProbeDiskCostLbl = "probeDiskCostDetail" + // ProbeDiskCostDescLbl indicates description for probe disk cost + ProbeDiskCostDescLbl = "probeDiskCostDesc" + + // MemQuotaLbl indicates memory quota + MemQuotaLbl = "memQuota" ) func setPointGetPlanCostDetail(p *PointGetPlan, opt *physicalOptimizeOp, @@ -134,3 +176,176 @@ func setPhysicalIndexReaderCostDetail(p *PhysicalIndexReader, opt *physicalOptim RowCountLbl, RowSizeLbl, NetworkFactorLbl, NetSeekCostLbl, ScanConcurrencyLbl)) opt.appendPlanCostDetail(detail) } + +func setPhysicalHashJoinCostDetail(p *PhysicalHashJoin, opt *physicalOptimizeOp, spill bool, + buildCnt, probeCnt, cpuFactor, rowSize, numPairs, + cpuCost, probeCPUCost, memCost, diskCost, probeDiskCost, + diskFactor, memoryFactor, concurrencyFactor float64, + memQuota int64) { + if opt == nil { + return + } + detail := tracing.NewPhysicalPlanCostDetail(p.ID(), p.TP()) + diskCostDetail := &HashJoinDiskCostDetail{ + Spill: spill, + UseOuterToBuild: p.UseOuterToBuild, + BuildRowCount: buildCnt, + DiskFactor: diskFactor, + RowSize: rowSize, + ProbeDiskCost: &HashJoinProbeDiskCostDetail{ + SelectionFactor: SelectionFactor, + NumPairs: numPairs, + HasConditions: len(p.LeftConditions)+len(p.RightConditions) > 0, + Cost: probeDiskCost, + }, + Cost: diskCost, + } + memoryCostDetail := &HashJoinMemoryCostDetail{ + Spill: spill, + MemQuota: memQuota, + RowSize: rowSize, + BuildRowCount: buildCnt, + MemoryFactor: memoryFactor, + Cost: memCost, + } + cpuCostDetail := &HashJoinCPUCostDetail{ + BuildRowCount: buildCnt, + CPUFactor: cpuFactor, + ConcurrencyFactor: concurrencyFactor, + ProbeCost: &HashJoinProbeCostDetail{ + NumPairs: numPairs, + HasConditions: len(p.LeftConditions)+len(p.RightConditions) > 0, + SelectionFactor: SelectionFactor, + ProbeRowCount: probeCnt, + Cost: probeCPUCost, + }, + HashJoinConcurrency: p.Concurrency, + Spill: spill, + Cost: cpuCost, + UseOuterToBuild: p.UseOuterToBuild, + } + + // record cpu cost detail + detail.AddParam(CPUCostDetailLbl, cpuCostDetail). + AddParam(CPUCostDescLbl, cpuCostDetail.desc()). + AddParam(ProbeCostDescLbl, cpuCostDetail.probeCostDesc()) + // record memory cost detail + detail.AddParam(MemCostDetailLbl, memoryCostDetail). + AddParam(MemCostDescLbl, memoryCostDetail.desc()) + // record disk cost detail + detail.AddParam(DiskCostDetailLbl, diskCostDetail). + AddParam(DiskCostDescLbl, diskCostDetail.desc()). 
+ AddParam(ProbeDiskCostDescLbl, diskCostDetail.probeDesc()) + + detail.SetDesc(fmt.Sprintf("%s+%s+%s+all children cost", CPUCostDetailLbl, MemCostDetailLbl, DiskCostDetailLbl)) + opt.appendPlanCostDetail(detail) +} + +// HashJoinProbeCostDetail indicates probe cpu cost detail +type HashJoinProbeCostDetail struct { + NumPairs float64 `json:"numPairs"` + HasConditions bool `json:"hasConditions"` + SelectionFactor float64 `json:"selectionFactor"` + ProbeRowCount float64 `json:"probeRowCount"` + Cost float64 `json:"cost"` +} + +// HashJoinCPUCostDetail indicates cpu cost detail +type HashJoinCPUCostDetail struct { + BuildRowCount float64 `json:"buildRowCount"` + CPUFactor float64 `json:"cpuFactor"` + ConcurrencyFactor float64 `json:"concurrencyFactor"` + ProbeCost *HashJoinProbeCostDetail `json:"probeCost"` + HashJoinConcurrency uint `json:"hashJoinConcurrency"` + Spill bool `json:"spill"` + Cost float64 `json:"cost"` + UseOuterToBuild bool `json:"useOuterToBuild"` +} + +func (h *HashJoinCPUCostDetail) desc() string { + var cpuCostDesc string + buildCostDesc := fmt.Sprintf("%s*%s", BuildRowCountLbl, CPUFactorLbl) + cpuCostDesc = fmt.Sprintf("%s+%s+(%s+1)*%s)", buildCostDesc, ProbeCostDetailLbl, HashJoinConcurrencyLbl, ConcurrencyFactorLbl) + if h.UseOuterToBuild { + if h.Spill { + cpuCostDesc = fmt.Sprintf("%s+%s", cpuCostDesc, buildCostDesc) + } else { + cpuCostDesc = fmt.Sprintf("%s+%s/%s", cpuCostDesc, buildCostDesc, HashJoinConcurrencyLbl) + } + } + return cpuCostDesc +} + +func (h *HashJoinCPUCostDetail) probeCostDesc() string { + var probeCostDesc string + if h.ProbeCost.HasConditions { + probeCostDesc = fmt.Sprintf("(%s*%s*%s+%s*%s)/%s", + NumPairsLbl, CPUFactorLbl, SelectionFactorLbl, + ProbeRowCountLbl, CPUFactorLbl, HashJoinConcurrencyLbl) + } else { + probeCostDesc = fmt.Sprintf("(%s*%s)/%s", + NumPairsLbl, CPUFactorLbl, + HashJoinConcurrencyLbl) + } + return probeCostDesc +} + +// HashJoinMemoryCostDetail indicates memory cost detail +type HashJoinMemoryCostDetail struct { + Spill bool `json:"spill"` + MemQuota int64 `json:"memQuota"` + RowSize float64 `json:"rowSize"` + BuildRowCount float64 `json:"buildRowCount"` + MemoryFactor float64 `json:"memoryFactor"` + Cost float64 `json:"cost"` +} + +func (h *HashJoinMemoryCostDetail) desc() string { + memCostDesc := fmt.Sprintf("%s*%s", BuildRowCountLbl, MemoryFactorLbl) + if h.Spill { + memCostDesc = fmt.Sprintf("%s*%s/(%s*%s)", memCostDesc, MemQuotaLbl, RowSizeLbl, BuildRowCountLbl) + } + return memCostDesc +} + +// HashJoinProbeDiskCostDetail indicates probe disk cost detail +type HashJoinProbeDiskCostDetail struct { + SelectionFactor float64 `json:"selectionFactor"` + NumPairs float64 `json:"numPairs"` + HasConditions bool `json:"hasConditions"` + Cost float64 `json:"cost"` +} + +// HashJoinDiskCostDetail indicates disk cost detail +type HashJoinDiskCostDetail struct { + Spill bool `json:"spill"` + UseOuterToBuild bool `json:"useOuterToBuild"` + BuildRowCount float64 `json:"buildRowCount"` + DiskFactor float64 `json:"diskFactor"` + RowSize float64 `json:"rowSize"` + ProbeDiskCost *HashJoinProbeDiskCostDetail `json:"probeDiskCost"` + Cost float64 `json:"cost"` +} + +func (h *HashJoinDiskCostDetail) desc() string { + if !h.Spill { + return "" + } + buildDiskCost := fmt.Sprintf("%s*%s*%s", BuildRowCountLbl, DiskFactorLbl, RowSizeLbl) + desc := fmt.Sprintf("%s+%s", buildDiskCost, ProbeDiskCostLbl) + if h.UseOuterToBuild { + desc = fmt.Sprintf("%s+%s", desc, buildDiskCost) + } + return desc +} + +func (h *HashJoinDiskCostDetail) 
probeDesc() string { + if !h.Spill { + return "" + } + desc := fmt.Sprintf("%s*%s*%s", NumPairsLbl, DiskFactorLbl, RowSizeLbl) + if h.ProbeDiskCost.HasConditions { + desc = fmt.Sprintf("%s*%s", desc, SelectionFactorLbl) + } + return desc +} diff --git a/planner/core/plan_cost_detail_test.go b/planner/core/plan_cost_detail_test.go index 8f1fadae88781..34584773aa6e8 100644 --- a/planner/core/plan_cost_detail_test.go +++ b/planner/core/plan_cost_detail_test.go @@ -20,7 +20,7 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/parser" - "github.com/pingcap/tidb/planner/core" + plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/hint" @@ -40,66 +40,78 @@ func TestPlanCostDetail(t *testing.T) { assertLbls []string tp string }{ + { + tp: plancodec.TypeHashJoin, + sql: "select /*+ HASH_JOIN(t1, t2) */ * from t t1 join t t2 on t1.k = t2.k where t1.a = 1;", + assertLbls: []string{ + plannercore.CPUCostDetailLbl, + plannercore.CPUCostDescLbl, + plannercore.ProbeCostDescLbl, + plannercore.MemCostDetailLbl, + plannercore.MemCostDescLbl, + plannercore.DiskCostDetailLbl, + }, + }, { tp: plancodec.TypePointGet, sql: "select * from t where a = 1", assertLbls: []string{ - core.RowSizeLbl, - core.NetworkFactorLbl, - core.SeekFactorLbl, + plannercore.RowSizeLbl, + plannercore.NetworkFactorLbl, + plannercore.SeekFactorLbl, }, }, { tp: plancodec.TypeBatchPointGet, sql: "select * from t where a = 1 or a = 2 or a = 3", assertLbls: []string{ - core.RowCountLbl, - core.RowSizeLbl, - core.NetworkFactorLbl, - core.SeekFactorLbl, - core.ScanConcurrencyLbl, + plannercore.RowCountLbl, + plannercore.RowSizeLbl, + plannercore.NetworkFactorLbl, + plannercore.SeekFactorLbl, + plannercore.ScanConcurrencyLbl, }, }, { tp: plancodec.TypeTableFullScan, sql: "select * from t", assertLbls: []string{ - core.RowCountLbl, - core.RowSizeLbl, - core.ScanFactorLbl, + plannercore.RowCountLbl, + plannercore.RowSizeLbl, + plannercore.ScanFactorLbl, }, }, { tp: plancodec.TypeTableReader, sql: "select * from t", assertLbls: []string{ - core.RowCountLbl, - core.RowSizeLbl, - core.NetworkFactorLbl, - core.NetSeekCostLbl, - core.TablePlanCostLbl, - core.ScanConcurrencyLbl, + plannercore.RowCountLbl, + plannercore.RowSizeLbl, + plannercore.NetworkFactorLbl, + plannercore.NetSeekCostLbl, + plannercore.TablePlanCostLbl, + plannercore.ScanConcurrencyLbl, }, }, { tp: plancodec.TypeIndexFullScan, sql: "select b from t", assertLbls: []string{ - core.RowCountLbl, - core.RowSizeLbl, - core.ScanFactorLbl, + plannercore.RowCountLbl, + plannercore.RowSizeLbl, + plannercore.ScanFactorLbl, }, }, { tp: plancodec.TypeIndexReader, sql: "select b from t", assertLbls: []string{ - core.RowCountLbl, - core.RowSizeLbl, - core.NetworkFactorLbl, - core.NetSeekCostLbl, - core.IndexPlanCostLbl, - core.ScanConcurrencyLbl, + plannercore.RowCountLbl, + plannercore.RowSizeLbl, + plannercore.NetworkFactorLbl, + plannercore.NetSeekCostLbl, + plannercore.IndexPlanCostLbl, + plannercore.ScanConcurrencyLbl, }, }, } @@ -121,16 +133,16 @@ func TestPlanCostDetail(t *testing.T) { func optimize(t *testing.T, sql string, p *parser.Parser, ctx sessionctx.Context, dom *domain.Domain) map[int]*tracing.PhysicalPlanCostDetail { stmt, err := p.ParseOneStmt(sql, "", "") require.NoError(t, err) - err = core.Preprocess(ctx, stmt, core.WithPreprocessorReturn(&core.PreprocessorReturn{InfoSchema: dom.InfoSchema()})) + err = plannercore.Preprocess(ctx, stmt, 
plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: dom.InfoSchema()})) require.NoError(t, err) - sctx := core.MockContext() + sctx := plannercore.MockContext() sctx.GetSessionVars().StmtCtx.EnableOptimizeTrace = true sctx.GetSessionVars().EnableNewCostInterface = true - builder, _ := core.NewPlanBuilder().Init(sctx, dom.InfoSchema(), &hint.BlockHintProcessor{}) + builder, _ := plannercore.NewPlanBuilder().Init(sctx, dom.InfoSchema(), &hint.BlockHintProcessor{}) domain.GetDomain(sctx).MockInfoCacheAndLoadInfoSchema(dom.InfoSchema()) plan, err := builder.Build(context.TODO(), stmt) require.NoError(t, err) - _, _, err = core.DoOptimize(context.TODO(), sctx, builder.GetOptFlag(), plan.(core.LogicalPlan)) + _, _, err = plannercore.DoOptimize(context.TODO(), sctx, builder.GetOptFlag(), plan.(plannercore.LogicalPlan)) require.NoError(t, err) return sctx.GetSessionVars().StmtCtx.OptimizeTracer.Physical.PhysicalPlanCostDetails } diff --git a/planner/core/task.go b/planner/core/task.go index 132af680b8be0..da23f255df563 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -329,7 +329,7 @@ func (p *PhysicalHashJoin) attach2Task(tasks ...task) task { p.SetChildren(lTask.plan(), rTask.plan()) task := &rootTask{ p: p, - cst: lTask.cost() + rTask.cost() + p.GetCost(lTask.count(), rTask.count(), false, 0), + cst: lTask.cost() + rTask.cost() + p.GetCost(lTask.count(), rTask.count(), false, 0, nil), } p.cost = task.cost() return task @@ -550,7 +550,7 @@ func (p *PhysicalHashJoin) attach2TaskForMpp(tasks ...task) task { outerTask = rTask } task := &mppTask{ - cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count(), false, 0), + cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count(), false, 0, nil), p: p, partTp: outerTask.partTp, hashCols: outerTask.hashCols, @@ -581,7 +581,7 @@ func (p *PhysicalHashJoin) attach2TaskForTiFlash(tasks ...task) task { tblColHists: rTask.tblColHists, indexPlanFinished: true, tablePlan: p, - cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count(), false, 0), + cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count(), false, 0, nil), } p.cost = task.cst return task diff --git a/planner/implementation/join.go b/planner/implementation/join.go index d601632ec25c1..3c514a26d8a47 100644 --- a/planner/implementation/join.go +++ b/planner/implementation/join.go @@ -29,7 +29,7 @@ func (impl *HashJoinImpl) CalcCost(_ float64, children ...memo.Implementation) f hashJoin := impl.plan.(*plannercore.PhysicalHashJoin) // The children here are only used to calculate the cost. 
hashJoin.SetChildren(children[0].GetPlan(), children[1].GetPlan()) - selfCost := hashJoin.GetCost(children[0].GetPlan().StatsCount(), children[1].GetPlan().StatsCount(), false, 0) + selfCost := hashJoin.GetCost(children[0].GetPlan().StatsCount(), children[1].GetPlan().StatsCount(), false, 0, nil) impl.cost = selfCost + children[0].GetCost() + children[1].GetCost() return impl.cost } diff --git a/util/tracing/opt_trace.go b/util/tracing/opt_trace.go index 7520c5cc0d4b8..8112379534b46 100644 --- a/util/tracing/opt_trace.go +++ b/util/tracing/opt_trace.go @@ -232,44 +232,48 @@ func (tracer *OptimizeTracer) RecordFinalPlan(final *PlanTrace) { // PhysicalPlanCostDetail indicates cost detail type PhysicalPlanCostDetail struct { - id int - tp string - params map[string]interface{} - desc string + ID int `json:"id"` + TP string `json:"type"` + Params map[string]interface{} `json:"params"` + Desc string `json:"desc"` } // NewPhysicalPlanCostDetail creates a cost detail func NewPhysicalPlanCostDetail(id int, tp string) *PhysicalPlanCostDetail { return &PhysicalPlanCostDetail{ - id: id, - tp: tp, - params: make(map[string]interface{}), + ID: id, + TP: tp, + Params: make(map[string]interface{}), } } // AddParam adds param func (d *PhysicalPlanCostDetail) AddParam(k string, v interface{}) *PhysicalPlanCostDetail { - d.params[k] = v + // discard empty param value + if s, ok := v.(string); ok && len(s) < 1 { + return d + } + d.Params[k] = v return d } // SetDesc sets desc func (d *PhysicalPlanCostDetail) SetDesc(desc string) { - d.desc = desc + d.Desc = desc } // GetPlanID gets plan id func (d *PhysicalPlanCostDetail) GetPlanID() int { - return d.id + return d.ID } // GetPlanType gets plan type func (d *PhysicalPlanCostDetail) GetPlanType() string { - return d.tp + return d.TP } // Exists checks whether key exists in params func (d *PhysicalPlanCostDetail) Exists(k string) bool { - _, ok := d.params[k] + _, ok := d.Params[k] return ok }
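Note: patch 4 exports the fields of `PhysicalPlanCostDetail` and adds JSON tags, so the recorded cost details can be serialized directly. The following standalone sketch is not part of the patch (the plan ID and parameter values are made up); it only shows how a detail built through `tracing.NewPhysicalPlanCostDetail` and `AddParam` could be dumped as JSON.

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/pingcap/tidb/util/tracing"
)

func main() {
	// Build a detail the same way setPhysicalHashJoinCostDetail does, then
	// rely on the exported, JSON-tagged fields to serialize it.
	detail := tracing.NewPhysicalPlanCostDetail(1, "HashJoin")
	detail.AddParam("hashJoinConcurrency", 5).
		AddParam("memCostDesc", "buildRowCount*memoryFactor")
	detail.SetDesc("cpuCostDetail+memCostDetail+diskCostDetail+all children cost")

	out, err := json.Marshal(detail)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"id":1,"type":"HashJoin","params":{...},"desc":"..."}
}
```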