From e3046327ecc3274c3c59de54be30939c3ea76399 Mon Sep 17 00:00:00 2001 From: yisaer Date: Wed, 28 Sep 2022 20:06:47 +0800 Subject: [PATCH 01/11] impl Signed-off-by: yisaer add time log Signed-off-by: yisaer fix ctx Signed-off-by: yisaer fix ctx Signed-off-by: yisaer fix ctx Signed-off-by: yisaer fix ctx Signed-off-by: yisaer fix ctx Signed-off-by: yisaer fix ctx Signed-off-by: yisaer fix ctx Signed-off-by: yisaer add log Signed-off-by: yisaer fix Signed-off-by: yisaer fix Signed-off-by: yisaer fix Signed-off-by: yisaer fix Signed-off-by: yisaer --- config/config.go | 36 +++++----- domain/domain.go | 42 +++++++++++ executor/analyze.go | 118 ++++++++++++++++++++++++------- executor/analyze_global_stats.go | 2 +- executor/analyze_worker.go | 71 +++++++++++++++++++ session/session.go | 9 ++- sessionctx/variable/session.go | 3 + sessionctx/variable/sysvar.go | 5 ++ sessionctx/variable/tidb_vars.go | 4 ++ statistics/handle/handle.go | 55 +++++++++----- 10 files changed, 281 insertions(+), 64 deletions(-) create mode 100644 executor/analyze_worker.go diff --git a/config/config.go b/config/config.go index ebd78e34495fa..d6b2d0cad6081 100644 --- a/config/config.go +++ b/config/config.go @@ -640,14 +640,15 @@ type Performance struct { ProjectionPushDown bool `toml:"projection-push-down" json:"projection-push-down"` MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"` // Deprecated - MemProfileInterval string `toml:"-" json:"-"` - IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"` - PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"` - GOGC int `toml:"gogc" json:"gogc"` - EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"` - StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"` - StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"` - EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"` + MemProfileInterval string `toml:"-" json:"-"` + IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"` + PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"` + GOGC int `toml:"gogc" json:"gogc"` + EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"` + StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"` + StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"` + AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"` + EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"` // The following items are deprecated. We need to keep them here temporarily // to support the upgrade process. They can be removed in future. @@ -902,15 +903,16 @@ var defaultConf = Config{ CommitterConcurrency: defTiKVCfg.CommitterConcurrency, MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour // TODO: set indexUsageSyncLease to 60s. - IndexUsageSyncLease: "0s", - GOGC: 100, - EnforceMPP: false, - PlanReplayerGCLease: "10m", - StatsLoadConcurrency: 5, - StatsLoadQueueSize: 1000, - EnableStatsCacheMemQuota: false, - RunAutoAnalyze: true, - EnableLoadFMSketch: false, + IndexUsageSyncLease: "0s", + GOGC: 100, + EnforceMPP: false, + PlanReplayerGCLease: "10m", + StatsLoadConcurrency: 5, + StatsLoadQueueSize: 1000, + AnalyzePartitionConcurrencyQuota: 16, + EnableStatsCacheMemQuota: false, + RunAutoAnalyze: true, + EnableLoadFMSketch: false, }, ProxyProtocol: ProxyProtocol{ Networks: "", diff --git a/domain/domain.go b/domain/domain.go index 1308fb7c7bc43..a4d4e2d9ebca2 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -123,6 +123,11 @@ type Domain struct { sysProcesses SysProcesses mdlCheckTableInfo *mdlCheckTableInfo + + analyzeMu struct { + sync.Mutex + sctxs map[sessionctx.Context]bool + } } type mdlCheckTableInfo struct { @@ -1568,6 +1573,43 @@ func (do *Domain) SetStatsUpdating(val bool) { } } +// ReturnAnalyzeExtraExec returned extra exec for Analyze +func (do *Domain) ReturnAnalyzeExtraExec(sctxs []sessionctx.Context) { + do.analyzeMu.Lock() + defer do.analyzeMu.Unlock() + for _, ctx := range sctxs { + do.analyzeMu.sctxs[ctx] = false + } +} + +// GetAnalyzeExtraExec get needed extra exec for analyze +func (do *Domain) GetAnalyzeExtraExec(need int) []sessionctx.Context { + count := 0 + r := make([]sessionctx.Context, 0) + do.analyzeMu.Lock() + defer do.analyzeMu.Unlock() + for sctx, used := range do.analyzeMu.sctxs { + if used { + continue + } + r = append(r, sctx) + do.analyzeMu.sctxs[sctx] = true + count++ + if count >= need { + break + } + } + return r +} + +// SetupAnalyzeExtraExec setups extra exec for Analyze +func (do *Domain) SetupAnalyzeExtraExec(ctxs []sessionctx.Context) { + do.analyzeMu.sctxs = make(map[sessionctx.Context]bool) + for _, ctx := range ctxs { + do.analyzeMu.sctxs[ctx] = false + } +} + // LoadAndUpdateStatsLoop loads and updates stats info. func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context) error { if err := do.UpdateTableStatsLoop(ctxs[0]); err != nil { diff --git a/executor/analyze.go b/executor/analyze.go index 6fccec8a9bf5b..0d7958dfa012f 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -20,6 +20,7 @@ import ( "math" "strconv" "strings" + "sync" "time" "github.com/pingcap/errors" @@ -188,8 +189,8 @@ func (e *AnalyzeExec) saveV2AnalyzeOpts() error { return nil } -func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error { - statsHandle := domain.GetDomain(e.ctx).StatsHandle() +func recordHistoricalStats(sctx sessionctx.Context, tableID int64) error { + statsHandle := domain.GetDomain(sctx).StatsHandle() historicalStatsEnabled, err := statsHandle.CheckHistoricalStatsEnable() if err != nil { return errors.Errorf("check tidb_enable_historical_stats failed: %v", err) @@ -198,7 +199,7 @@ func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error { return nil } - is := domain.GetDomain(e.ctx).InfoSchema() + is := domain.GetDomain(sctx).InfoSchema() tbl, existed := is.TableByID(tableID) if !existed { return errors.Errorf("cannot get table by id %d", tableID) @@ -217,6 +218,18 @@ func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error { // handleResultsError will handle the error fetch from resultsCh and record it in log func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, needGlobalStats bool, globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error { + partitionStatsConcurrency := e.ctx.GetSessionVars().AnalyzePartitionConcurrency + if partitionStatsConcurrency > 1 { + dom := domain.GetDomain(e.ctx) + subSctxs := dom.GetAnalyzeExtraExec(partitionStatsConcurrency) + if len(subSctxs) > 0 { + internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) + err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh) + dom.ReturnAnalyzeExtraExec(subSctxs) + return err + } + } + statsHandle := domain.GetDomain(e.ctx).StatsHandle() panicCnt := 0 var err error @@ -235,36 +248,16 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n finishJobWithLog(e.ctx, results.Job, err) continue } - if results.TableID.IsPartitionTable() && needGlobalStats { - for _, result := range results.Ars { - if result.IsIndex == 0 { - // If it does not belong to the statistics of index, we need to set it to -1 to distinguish. - globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: int64(-1)} - histIDs := make([]int64, 0, len(result.Hist)) - for _, hg := range result.Hist { - // It's normal virtual column, skip. - if hg == nil { - continue - } - histIDs = append(histIDs, hg.ID) - } - globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: histIDs, statsVersion: results.StatsVer} - } else { - for _, hg := range result.Hist { - globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: hg.ID} - globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: []int64{hg.ID}, statsVersion: results.StatsVer} - } - } - } - } - if err1 := statsHandle.SaveTableStatsToStorage(results, results.TableID.IsPartitionTable(), e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil { + handleGlobalStats(needGlobalStats, globalStatsMap, results) + + if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil { err = err1 logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) finishJobWithLog(e.ctx, results.Job, err) } else { finishJobWithLog(e.ctx, results.Job, nil) // Dump stats to historical storage. - if err := e.recordHistoricalStats(results.TableID.TableID); err != nil { + if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil { logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) } } @@ -273,6 +266,52 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n return err } +func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, statsConcurrency int, needGlobalStats bool, + subSctxs []sessionctx.Context, + globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error { + partitionStatsConcurrency := len(subSctxs) + wg := &sync.WaitGroup{} + saveResultsCh := make(chan *statistics.AnalyzeResults, partitionStatsConcurrency) + errCh := make(chan error, partitionStatsConcurrency) + wg.Add(partitionStatsConcurrency) + for i := 0; i < partitionStatsConcurrency; i++ { + worker := newAnalyzeSaveStatsWorker(wg, saveResultsCh, subSctxs[i], errCh) + ctx1 := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + go worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot) + } + panicCnt := 0 + var err error + for panicCnt < statsConcurrency { + results, ok := <-resultsCh + if !ok { + break + } + if results.Err != nil { + err = results.Err + if isAnalyzeWorkerPanic(err) { + panicCnt++ + } else { + logutil.Logger(ctx).Error("analyze failed", zap.Error(err)) + } + finishJobWithLog(e.ctx, results.Job, err) + continue + } + handleGlobalStats(needGlobalStats, globalStatsMap, results) + saveResultsCh <- results + } + close(saveResultsCh) + wg.Wait() + close(errCh) + if len(errCh) > 0 { + errMsg := make([]string, 0) + for err1 := range errCh { + errMsg = append(errMsg, err1.Error()) + } + err = errors.New(strings.Join(errMsg, ",")) + } + return err +} + func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) { var task *analyzeTask defer func() { @@ -434,3 +473,28 @@ func finishJobWithLog(sctx sessionctx.Context, job *statistics.AnalyzeJob, analy zap.String("cost", job.EndTime.Sub(job.StartTime).String())) } } + +func handleGlobalStats(needGlobalStats bool, globalStatsMap globalStatsMap, results *statistics.AnalyzeResults) { + if results.TableID.IsPartitionTable() && needGlobalStats { + for _, result := range results.Ars { + if result.IsIndex == 0 { + // If it does not belong to the statistics of index, we need to set it to -1 to distinguish. + globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: int64(-1)} + histIDs := make([]int64, 0, len(result.Hist)) + for _, hg := range result.Hist { + // It's normal virtual column, skip. + if hg == nil { + continue + } + histIDs = append(histIDs, hg.ID) + } + globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: histIDs, statsVersion: results.StatsVer} + } else { + for _, hg := range result.Hist { + globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: hg.ID} + globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: []int64{hg.ID}, statsVersion: results.StatsVer} + } + } + } + } +} diff --git a/executor/analyze_global_stats.go b/executor/analyze_global_stats.go index c9ff6217a195c..440f0a104e207 100644 --- a/executor/analyze_global_stats.go +++ b/executor/analyze_global_stats.go @@ -83,7 +83,7 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err)) } // Dump stats to historical storage. - if err := e.recordHistoricalStats(globalStatsID.tableID); err != nil { + if err := recordHistoricalStats(e.ctx, globalStatsID.tableID); err != nil { logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) } } diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go new file mode 100644 index 0000000000000..630cab9284512 --- /dev/null +++ b/executor/analyze_worker.go @@ -0,0 +1,71 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "sync" + + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/statistics/handle" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +type analyzeSaveStatsWorker struct { + wg *sync.WaitGroup + resultsCh <-chan *statistics.AnalyzeResults + sctx sessionctx.Context + errCh chan<- error + checkHistoricalStatsEnable bool +} + +func newAnalyzeSaveStatsWorker(wg *sync.WaitGroup, + resultsCh <-chan *statistics.AnalyzeResults, + sctx sessionctx.Context, + errCh chan<- error) *analyzeSaveStatsWorker { + worker := &analyzeSaveStatsWorker{ + wg: wg, + resultsCh: resultsCh, + sctx: sctx, + errCh: errCh, + } + return worker +} + +func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) { + defer func() { + worker.wg.Done() + }() + for results := range worker.resultsCh { + err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot) + if err != nil { + logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) + finishJobWithLog(worker.sctx, results.Job, err) + worker.errCh <- err + } else { + finishJobWithLog(worker.sctx, results.Job, nil) + // Dump stats to historical storage. + if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } + invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) + if err != nil { + return + } + } +} diff --git a/session/session.go b/session/session.go index bbd819d28bb90..a8419391cd15c 100644 --- a/session/session.go +++ b/session/session.go @@ -2752,8 +2752,9 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { runInBootstrapSession(store, upgrade) } + analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota) concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency) - ses, err := createSessions(store, 7+concurrency) + ses, err := createSessions(store, 7+concurrency+analyzeConcurrencyQuota) if err != nil { return nil, err } @@ -2832,7 +2833,11 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { if err = dom.LoadAndUpdateStatsLoop(subCtxs); err != nil { return nil, err } - + subCtxs2 := make([]sessionctx.Context, analyzeConcurrencyQuota) + for i := 0; i < analyzeConcurrencyQuota; i++ { + subCtxs2[i] = ses[7+concurrency+i] + } + dom.SetupAnalyzeExtraExec(subCtxs2) dom.DumpFileGcCheckerLoop() dom.LoadSigningCertLoop() diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index db3606baf60b3..9ee4672a92f74 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1288,6 +1288,9 @@ type SessionVars struct { // LastPlanReplayerToken indicates the last plan replayer token LastPlanReplayerToken string + + // AnalyzePartitionConcurrency indicates concurrency for partitions in Analyze + AnalyzePartitionConcurrency int } // GetPreparedStmtByName returns the prepared statement specified by stmtName. diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 9e4409b13814e..c5a02a97b323e 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1914,6 +1914,11 @@ var defaultSysVars = []*SysVar{ s.RangeMaxSize = TidbOptInt64(val, DefTiDBOptRangeMaxSize) return nil }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBAnalyzePartitionConcurrency, Value: strconv.FormatInt(DefTiDBAnalyzePartitionConcurrency, 10), + MinValue: 1, MaxValue: 16, SetSession: func(s *SessionVars, val string) error { + s.AnalyzePartitionConcurrency = int(TidbOptInt64(val, DefTiDBAnalyzePartitionConcurrency)) + return nil + }}, } // FeedbackProbability points to the FeedbackProbability in statistics package. diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 2d4c79954fad3..f20b93fa48f20 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -770,6 +770,9 @@ const ( // ranges would exceed the limit, it chooses less accurate ranges such as full range. 0 indicates that there is no memory // limit for ranges. TiDBOptRangeMaxSize = "tidb_opt_range_max_size" + + // TiDBAnalyzePartitionConcurrency indicates concurrency for partitions in Analyze + TiDBAnalyzePartitionConcurrency = "tidb_analyze_partition_concurrency" ) // TiDB vars that have only global scope @@ -1081,6 +1084,7 @@ const ( DefTiDBConstraintCheckInPlacePessimistic = true DefTiDBForeignKeyChecks = false DefTiDBOptRangeMaxSize = 0 + DefTiDBAnalyzePartitionConcurrency = 4 DefTiDBCostModelVer = 1 DefTiDBServerMemoryLimitSessMinSize = 128 << 20 ) diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 00660c9756a68..dedeb0af77ece 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -1240,7 +1240,7 @@ func saveBucketsToStorage(ctx context.Context, exec sqlexec.SQLExecutor, sc *stm } // SaveTableStatsToStorage saves the stats of a table to storage. -func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, needDumpFMS, analyzeSnapshot bool) (err error) { +func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool) (err error) { tableID := results.TableID.GetStatisticsID() statsVer := uint64(0) defer func() { @@ -1250,8 +1250,21 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee }() h.mu.Lock() defer h.mu.Unlock() + return SaveTableStatsToStorage(h.mu.ctx, results, analyzeSnapshot) +} + +// SaveTableStatsToStorage saves the stats of a table to storage. +func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.AnalyzeResults, analyzeSnapshot bool) (err error) { + needDumpFMS := results.TableID.IsPartitionTable() + tableID := results.TableID.GetStatisticsID() + statsVer := uint64(0) + defer func() { + if err == nil && statsVer != 0 { + err = recordHistoricalStatsMeta(sctx, tableID, statsVer) + } + }() ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - exec := h.mu.ctx.(sqlexec.SQLExecutor) + exec := sctx.(sqlexec.SQLExecutor) _, err = exec.ExecuteInternal(ctx, "begin pessimistic") if err != nil { return err @@ -1259,7 +1272,7 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee defer func() { err = finishTransaction(ctx, exec, err) }() - txn, err := h.mu.ctx.Txn(true) + txn, err := sctx.Txn(true) if err != nil { return err } @@ -1272,7 +1285,7 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee return err } var rows []chunk.Row - rows, err = sqlexec.DrainRecordSet(ctx, rs, h.mu.ctx.GetSessionVars().MaxChunkSize) + rows, err = sqlexec.DrainRecordSet(ctx, rs, sctx.GetSessionVars().MaxChunkSize) if err != nil { return err } @@ -1369,7 +1382,7 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil { return err } - sc := h.mu.ctx.GetSessionVars().StmtCtx + sc := sctx.GetSessionVars().StmtCtx var lastAnalyzePos []byte lastAnalyzePos, err = saveBucketsToStorage(ctx, exec, sc, tableID, result.IsIndex, hg) if err != nil { @@ -2234,33 +2247,36 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model. return version, nil } -// CheckHistoricalStatsEnable is used to check whether TiDBEnableHistoricalStats is enabled. -func (h *Handle) CheckHistoricalStatsEnable() (enable bool, err error) { - h.mu.Lock() - defer h.mu.Unlock() - val, err := h.mu.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats) +func checkHistoricalStatsEnable(sctx sessionctx.Context) (enable bool, err error) { + val, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats) if err != nil { return false, errors.Trace(err) } return variable.TiDBOptOn(val), nil } -func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) error { +// CheckHistoricalStatsEnable is used to check whether TiDBEnableHistoricalStats is enabled. +func (h *Handle) CheckHistoricalStatsEnable() (enable bool, err error) { + h.mu.Lock() + defer h.mu.Unlock() + return checkHistoricalStatsEnable(h.mu.ctx) +} + +func recordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version uint64) error { if tableID == 0 || version == 0 { return errors.Errorf("tableID %d, version %d are invalid", tableID, version) } - historicalStatsEnabled, err := h.CheckHistoricalStatsEnable() + historicalStatsEnabled, err := checkHistoricalStatsEnable(sctx) if err != nil { return errors.Errorf("check tidb_enable_historical_stats failed: %v", err) } if !historicalStatsEnabled { return nil } - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - h.mu.Lock() - defer h.mu.Unlock() - rows, _, err := h.execRestrictedSQL(ctx, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version) + exec := sctx.(sqlexec.SQLExecutor) + rexec := sctx.(sqlexec.RestrictedSQLExecutor) + rows, _, err := rexec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version) if err != nil { return errors.Trace(err) } @@ -2269,7 +2285,6 @@ func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) error } modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1) - exec := h.mu.ctx.(sqlexec.SQLExecutor) _, err = exec.ExecuteInternal(ctx, "begin pessimistic") if err != nil { return errors.Trace(err) @@ -2285,6 +2300,12 @@ func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) error return nil } +func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) error { + h.mu.Lock() + defer h.mu.Unlock() + return recordHistoricalStatsMeta(h.mu.ctx, tableID, version) +} + // InsertAnalyzeJob inserts analyze job into mysql.analyze_jobs and gets job ID for further updating job. func (h *Handle) InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, procID uint64) error { h.mu.Lock() From 09b8652833d120953b884722daba29e15a2c57c3 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 24 Oct 2022 11:33:40 +0800 Subject: [PATCH 02/11] revise code Signed-off-by: yisaer --- domain/domain.go | 11 +++++++---- executor/analyze.go | 16 ++++++++++------ executor/analyze_worker.go | 8 +------- session/session.go | 2 +- 4 files changed, 19 insertions(+), 18 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index a4d4e2d9ebca2..3a940aa7759fa 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1582,8 +1582,11 @@ func (do *Domain) ReturnAnalyzeExtraExec(sctxs []sessionctx.Context) { } } -// GetAnalyzeExtraExec get needed extra exec for analyze -func (do *Domain) GetAnalyzeExtraExec(need int) []sessionctx.Context { +// DemandAnalyzeExec get needed exec for analyze +func (do *Domain) DemandAnalyzeExec(need int) []sessionctx.Context { + if need < 1 { + return nil + } count := 0 r := make([]sessionctx.Context, 0) do.analyzeMu.Lock() @@ -1602,8 +1605,8 @@ func (do *Domain) GetAnalyzeExtraExec(need int) []sessionctx.Context { return r } -// SetupAnalyzeExtraExec setups extra exec for Analyze -func (do *Domain) SetupAnalyzeExtraExec(ctxs []sessionctx.Context) { +// SetupAnalyzeExec setups exec for Analyze Executor +func (do *Domain) SetupAnalyzeExec(ctxs []sessionctx.Context) { do.analyzeMu.sctxs = make(map[sessionctx.Context]bool) for _, ctx := range ctxs { do.analyzeMu.sctxs[ctx] = false diff --git a/executor/analyze.go b/executor/analyze.go index 0d7958dfa012f..6626a7431a6f4 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -20,7 +20,6 @@ import ( "math" "strconv" "strings" - "sync" "time" "github.com/pingcap/errors" @@ -219,9 +218,11 @@ func recordHistoricalStats(sctx sessionctx.Context, tableID int64) error { func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, needGlobalStats bool, globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error { partitionStatsConcurrency := e.ctx.GetSessionVars().AnalyzePartitionConcurrency + // If 'partitionStatsConcurrency' > 1, we will try to demand extra session from Domain to save Analyze results in concurrency. + // If there is no extra session we can use, we will save analyze results in single-thread. if partitionStatsConcurrency > 1 { dom := domain.GetDomain(e.ctx) - subSctxs := dom.GetAnalyzeExtraExec(partitionStatsConcurrency) + subSctxs := dom.DemandAnalyzeExec(partitionStatsConcurrency) if len(subSctxs) > 0 { internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh) @@ -230,6 +231,7 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n } } + // save analyze results in single-thread. statsHandle := domain.GetDomain(e.ctx).StatsHandle() panicCnt := 0 var err error @@ -270,14 +272,16 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta subSctxs []sessionctx.Context, globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error { partitionStatsConcurrency := len(subSctxs) - wg := &sync.WaitGroup{} + + var wg util.WaitGroupWrapper saveResultsCh := make(chan *statistics.AnalyzeResults, partitionStatsConcurrency) errCh := make(chan error, partitionStatsConcurrency) - wg.Add(partitionStatsConcurrency) for i := 0; i < partitionStatsConcurrency; i++ { - worker := newAnalyzeSaveStatsWorker(wg, saveResultsCh, subSctxs[i], errCh) + worker := newAnalyzeSaveStatsWorker(saveResultsCh, subSctxs[i], errCh) ctx1 := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - go worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot) + wg.Run(func() { + worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot) + }) } panicCnt := 0 var err error diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go index 630cab9284512..bb9a8e573b6b5 100644 --- a/executor/analyze_worker.go +++ b/executor/analyze_worker.go @@ -16,7 +16,6 @@ package executor import ( "context" - "sync" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" @@ -26,19 +25,17 @@ import ( ) type analyzeSaveStatsWorker struct { - wg *sync.WaitGroup resultsCh <-chan *statistics.AnalyzeResults sctx sessionctx.Context errCh chan<- error checkHistoricalStatsEnable bool } -func newAnalyzeSaveStatsWorker(wg *sync.WaitGroup, +func newAnalyzeSaveStatsWorker( resultsCh <-chan *statistics.AnalyzeResults, sctx sessionctx.Context, errCh chan<- error) *analyzeSaveStatsWorker { worker := &analyzeSaveStatsWorker{ - wg: wg, resultsCh: resultsCh, sctx: sctx, errCh: errCh, @@ -47,9 +44,6 @@ func newAnalyzeSaveStatsWorker(wg *sync.WaitGroup, } func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) { - defer func() { - worker.wg.Done() - }() for results := range worker.resultsCh { err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot) if err != nil { diff --git a/session/session.go b/session/session.go index a8419391cd15c..c6651d1c376bd 100644 --- a/session/session.go +++ b/session/session.go @@ -2837,7 +2837,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { for i := 0; i < analyzeConcurrencyQuota; i++ { subCtxs2[i] = ses[7+concurrency+i] } - dom.SetupAnalyzeExtraExec(subCtxs2) + dom.SetupAnalyzeExec(subCtxs2) dom.DumpFileGcCheckerLoop() dom.LoadSigningCertLoop() From 591ecdd1a159774bc545f9ca8d8236c15322cca9 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 24 Oct 2022 12:18:56 +0800 Subject: [PATCH 03/11] add test Signed-off-by: yisaer --- executor/analyze_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 246016a4082f7..bbe3f5b8d1b1e 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -371,6 +371,9 @@ func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) { fmt.Println("testcase ", concurrency) tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency)) tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency)) + tk.MustExec(fmt.Sprintf("set @@tidb_analyze_partition_concurrency=%v", concurrency)) + tk.MustQuery("select @@tidb_analyze_partition_concurrency").Check(testkit.Rows(concurrency)) + tk.MustExec("analyze table t") tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'") } From 0e071e11a482516798af74da0ec2f1474e174bcd Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 24 Oct 2022 17:47:48 +0800 Subject: [PATCH 04/11] address the comment Signed-off-by: yisaer --- domain/domain.go | 6 +++--- executor/analyze.go | 18 ++++------------ executor/analyze_worker.go | 36 ++++++++++++++++++++------------ sessionctx/variable/tidb_vars.go | 2 +- 4 files changed, 31 insertions(+), 31 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 47de7ceefc95f..339c55e90e20b 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1580,8 +1580,8 @@ func (do *Domain) SetStatsUpdating(val bool) { } } -// ReturnAnalyzeExtraExec returned extra exec for Analyze -func (do *Domain) ReturnAnalyzeExtraExec(sctxs []sessionctx.Context) { +// AvailableAnalyzeExec make sctxs available +func (do *Domain) AvailableAnalyzeExec(sctxs []sessionctx.Context) { do.analyzeMu.Lock() defer do.analyzeMu.Unlock() for _, ctx := range sctxs { @@ -1595,7 +1595,7 @@ func (do *Domain) DemandAnalyzeExec(need int) []sessionctx.Context { return nil } count := 0 - r := make([]sessionctx.Context, 0) + r := make([]sessionctx.Context, 0, need) do.analyzeMu.Lock() defer do.analyzeMu.Unlock() for sctx, used := range do.analyzeMu.sctxs { diff --git a/executor/analyze.go b/executor/analyze.go index 6626a7431a6f4..3dc81940bfbda 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -226,13 +226,12 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n if len(subSctxs) > 0 { internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh) - dom.ReturnAnalyzeExtraExec(subSctxs) + dom.AvailableAnalyzeExec(subSctxs) return err } } // save analyze results in single-thread. - statsHandle := domain.GetDomain(e.ctx).StatsHandle() panicCnt := 0 var err error for panicCnt < concurrency { @@ -251,19 +250,10 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n continue } handleGlobalStats(needGlobalStats, globalStatsMap, results) - - if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil { - err = err1 - logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) - finishJobWithLog(e.ctx, results.Job, err) - } else { - finishJobWithLog(e.ctx, results.Job, nil) - // Dump stats to historical storage. - if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) - } + err = saveTableStatsToStorage(ctx, e.ctx, results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot) + if err != nil { + return err } - invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) } return err } diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go index bb9a8e573b6b5..e151409061257 100644 --- a/executor/analyze_worker.go +++ b/executor/analyze_worker.go @@ -16,7 +16,6 @@ package executor import ( "context" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" @@ -44,22 +43,33 @@ func newAnalyzeSaveStatsWorker( } func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) { + defer func() { + if r := recover(); r != nil { + logutil.BgLogger().Error("analyze save stats worker panicked", zap.Any("recover", r), zap.Stack("stack")) + worker.errCh <- getAnalyzePanicErr(r) + } + }() for results := range worker.resultsCh { - err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot) + err := saveTableStatsToStorage(ctx, worker.sctx, results, analyzeSnapshot) if err != nil { - logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) - finishJobWithLog(worker.sctx, results.Job, err) worker.errCh <- err - } else { - finishJobWithLog(worker.sctx, results.Job, nil) - // Dump stats to historical storage. - if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) - } - } - invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) - if err != nil { return } } } + +func saveTableStatsToStorage(ctx context.Context, sctx sessionctx.Context, results *statistics.AnalyzeResults, analyzeSnapshot bool) error { + err := handle.SaveTableStatsToStorage(sctx, results, analyzeSnapshot) + if err != nil { + logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) + finishJobWithLog(sctx, results.Job, err) + } else { + finishJobWithLog(sctx, results.Job, nil) + // Dump stats to historical storage. + if err := recordHistoricalStats(sctx, results.TableID.TableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } + invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) + return err +} diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 8497b9bcecfa2..84ec0bab0ffd0 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1062,7 +1062,7 @@ const ( DefTiDBRcWriteCheckTs = false DefTiDBConstraintCheckInPlacePessimistic = true DefTiDBForeignKeyChecks = false - DefTiDBAnalyzePartitionConcurrency = 4 + DefTiDBAnalyzePartitionConcurrency = 1 DefTiDBOptRangeMaxSize = 64 * int64(size.MB) // 64 MB DefTiDBCostModelVer = 1 DefTiDBServerMemoryLimitSessMinSize = 128 << 20 From d2804d6401c5f4216abef307a609e816eb0985b1 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 24 Oct 2022 17:49:41 +0800 Subject: [PATCH 05/11] address the comment Signed-off-by: yisaer --- config/config.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index 28fc72a4dc9c1..a6b0306746e88 100644 --- a/config/config.go +++ b/config/config.go @@ -640,7 +640,8 @@ type Performance struct { ProjectionPushDown bool `toml:"projection-push-down" json:"projection-push-down"` MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"` // Deprecated - MemProfileInterval string `toml:"-" json:"-"` + MemProfileInterval string `toml:"-" json:"-"` + IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"` PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"` GOGC int `toml:"gogc" json:"gogc"` From a190c94bccaca9855850be3d0c08f7e603c82c41 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 24 Oct 2022 18:10:32 +0800 Subject: [PATCH 06/11] address the comment Signed-off-by: yisaer --- executor/analyze_worker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go index e151409061257..c5a610d25de1c 100644 --- a/executor/analyze_worker.go +++ b/executor/analyze_worker.go @@ -16,6 +16,7 @@ package executor import ( "context" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" From 1db76989abd610cb9598efe7dad986fd95a131a0 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 24 Oct 2022 19:20:04 +0800 Subject: [PATCH 07/11] Revert "address the comment" This reverts commit 0e071e11a482516798af74da0ec2f1474e174bcd. --- domain/domain.go | 6 +++--- executor/analyze.go | 18 ++++++++++++---- executor/analyze_worker.go | 35 +++++++++++--------------------- sessionctx/variable/tidb_vars.go | 2 +- 4 files changed, 30 insertions(+), 31 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 3c2a49729a452..aec05d16b4798 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1580,8 +1580,8 @@ func (do *Domain) SetStatsUpdating(val bool) { } } -// AvailableAnalyzeExec make sctxs available -func (do *Domain) AvailableAnalyzeExec(sctxs []sessionctx.Context) { +// ReturnAnalyzeExtraExec returned extra exec for Analyze +func (do *Domain) ReturnAnalyzeExtraExec(sctxs []sessionctx.Context) { do.analyzeMu.Lock() defer do.analyzeMu.Unlock() for _, ctx := range sctxs { @@ -1595,7 +1595,7 @@ func (do *Domain) DemandAnalyzeExec(need int) []sessionctx.Context { return nil } count := 0 - r := make([]sessionctx.Context, 0, need) + r := make([]sessionctx.Context, 0) do.analyzeMu.Lock() defer do.analyzeMu.Unlock() for sctx, used := range do.analyzeMu.sctxs { diff --git a/executor/analyze.go b/executor/analyze.go index 3dc81940bfbda..6626a7431a6f4 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -226,12 +226,13 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n if len(subSctxs) > 0 { internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh) - dom.AvailableAnalyzeExec(subSctxs) + dom.ReturnAnalyzeExtraExec(subSctxs) return err } } // save analyze results in single-thread. + statsHandle := domain.GetDomain(e.ctx).StatsHandle() panicCnt := 0 var err error for panicCnt < concurrency { @@ -250,10 +251,19 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n continue } handleGlobalStats(needGlobalStats, globalStatsMap, results) - err = saveTableStatsToStorage(ctx, e.ctx, results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot) - if err != nil { - return err + + if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil { + err = err1 + logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) + finishJobWithLog(e.ctx, results.Job, err) + } else { + finishJobWithLog(e.ctx, results.Job, nil) + // Dump stats to historical storage. + if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } } + invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) } return err } diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go index c5a610d25de1c..bb9a8e573b6b5 100644 --- a/executor/analyze_worker.go +++ b/executor/analyze_worker.go @@ -44,33 +44,22 @@ func newAnalyzeSaveStatsWorker( } func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) { - defer func() { - if r := recover(); r != nil { - logutil.BgLogger().Error("analyze save stats worker panicked", zap.Any("recover", r), zap.Stack("stack")) - worker.errCh <- getAnalyzePanicErr(r) - } - }() for results := range worker.resultsCh { - err := saveTableStatsToStorage(ctx, worker.sctx, results, analyzeSnapshot) + err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot) if err != nil { + logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) + finishJobWithLog(worker.sctx, results.Job, err) worker.errCh <- err - return + } else { + finishJobWithLog(worker.sctx, results.Job, nil) + // Dump stats to historical storage. + if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } } - } -} - -func saveTableStatsToStorage(ctx context.Context, sctx sessionctx.Context, results *statistics.AnalyzeResults, analyzeSnapshot bool) error { - err := handle.SaveTableStatsToStorage(sctx, results, analyzeSnapshot) - if err != nil { - logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err)) - finishJobWithLog(sctx, results.Job, err) - } else { - finishJobWithLog(sctx, results.Job, nil) - // Dump stats to historical storage. - if err := recordHistoricalStats(sctx, results.TableID.TableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) + if err != nil { + return } } - invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) - return err } diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 0ac6a883ffcfe..a1e15d3645cce 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1058,7 +1058,7 @@ const ( DefTiDBRcWriteCheckTs = false DefTiDBConstraintCheckInPlacePessimistic = true DefTiDBForeignKeyChecks = false - DefTiDBAnalyzePartitionConcurrency = 1 + DefTiDBAnalyzePartitionConcurrency = 4 DefTiDBOptRangeMaxSize = 64 * int64(size.MB) // 64 MB DefTiDBCostModelVer = 1 DefTiDBServerMemoryLimitSessMinSize = 128 << 20 From c65cc879c83c29f9d888f2a079b0c22ec6e6a928 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 24 Oct 2022 19:22:15 +0800 Subject: [PATCH 08/11] address the comment Signed-off-by: yisaer --- domain/domain.go | 4 ++-- executor/analyze.go | 2 +- sessionctx/variable/tidb_vars.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index aec05d16b4798..c8ecf2793b075 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1580,8 +1580,8 @@ func (do *Domain) SetStatsUpdating(val bool) { } } -// ReturnAnalyzeExtraExec returned extra exec for Analyze -func (do *Domain) ReturnAnalyzeExtraExec(sctxs []sessionctx.Context) { +// AvailableAnalyzeExec returned extra exec for Analyze +func (do *Domain) AvailableAnalyzeExec(sctxs []sessionctx.Context) { do.analyzeMu.Lock() defer do.analyzeMu.Unlock() for _, ctx := range sctxs { diff --git a/executor/analyze.go b/executor/analyze.go index 6626a7431a6f4..ff14ba78fc220 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -226,7 +226,7 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n if len(subSctxs) > 0 { internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh) - dom.ReturnAnalyzeExtraExec(subSctxs) + dom.AvailableAnalyzeExec(subSctxs) return err } } diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index a1e15d3645cce..0ac6a883ffcfe 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -1058,7 +1058,7 @@ const ( DefTiDBRcWriteCheckTs = false DefTiDBConstraintCheckInPlacePessimistic = true DefTiDBForeignKeyChecks = false - DefTiDBAnalyzePartitionConcurrency = 4 + DefTiDBAnalyzePartitionConcurrency = 1 DefTiDBOptRangeMaxSize = 64 * int64(size.MB) // 64 MB DefTiDBCostModelVer = 1 DefTiDBServerMemoryLimitSessMinSize = 128 << 20 From 9c6e08dad026f44b607f54ba111f8f57f1ef785e Mon Sep 17 00:00:00 2001 From: yisaer Date: Tue, 25 Oct 2022 11:10:24 +0800 Subject: [PATCH 09/11] address the comment Signed-off-by: yisaer --- domain/domain.go | 8 ++++---- executor/analyze.go | 6 ++++-- executor/analyze_worker.go | 6 ++++++ 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index c8ecf2793b075..60b8d316a1949 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1580,8 +1580,8 @@ func (do *Domain) SetStatsUpdating(val bool) { } } -// AvailableAnalyzeExec returned extra exec for Analyze -func (do *Domain) AvailableAnalyzeExec(sctxs []sessionctx.Context) { +// ReleaseAnalyzeExec returned extra exec for Analyze +func (do *Domain) ReleaseAnalyzeExec(sctxs []sessionctx.Context) { do.analyzeMu.Lock() defer do.analyzeMu.Unlock() for _, ctx := range sctxs { @@ -1589,8 +1589,8 @@ func (do *Domain) AvailableAnalyzeExec(sctxs []sessionctx.Context) { } } -// DemandAnalyzeExec get needed exec for analyze -func (do *Domain) DemandAnalyzeExec(need int) []sessionctx.Context { +// FetchAnalyzeExec get needed exec for analyze +func (do *Domain) FetchAnalyzeExec(need int) []sessionctx.Context { if need < 1 { return nil } diff --git a/executor/analyze.go b/executor/analyze.go index ff14ba78fc220..66334ad05d647 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -222,11 +222,13 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n // If there is no extra session we can use, we will save analyze results in single-thread. if partitionStatsConcurrency > 1 { dom := domain.GetDomain(e.ctx) - subSctxs := dom.DemandAnalyzeExec(partitionStatsConcurrency) + subSctxs := dom.FetchAnalyzeExec(partitionStatsConcurrency) if len(subSctxs) > 0 { + defer func() { + dom.ReleaseAnalyzeExec(subSctxs) + }() internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats) err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh) - dom.AvailableAnalyzeExec(subSctxs) return err } } diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go index bb9a8e573b6b5..ae30b773e0d1e 100644 --- a/executor/analyze_worker.go +++ b/executor/analyze_worker.go @@ -44,6 +44,12 @@ func newAnalyzeSaveStatsWorker( } func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) { + defer func() { + if r := recover(); r != nil { + logutil.BgLogger().Error("analyze save stats worker panicked", zap.Any("recover", r), zap.Stack("stack")) + worker.errCh <- getAnalyzePanicErr(r) + } + }() for results := range worker.resultsCh { err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot) if err != nil { From 781a61908e18a42f8b218d48c9744b593ffcaa1e Mon Sep 17 00:00:00 2001 From: yisaer Date: Tue, 25 Oct 2022 11:10:53 +0800 Subject: [PATCH 10/11] address the comment Signed-off-by: yisaer --- executor/analyze_worker.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go index ae30b773e0d1e..0297b142dbb14 100644 --- a/executor/analyze_worker.go +++ b/executor/analyze_worker.go @@ -25,10 +25,9 @@ import ( ) type analyzeSaveStatsWorker struct { - resultsCh <-chan *statistics.AnalyzeResults - sctx sessionctx.Context - errCh chan<- error - checkHistoricalStatsEnable bool + resultsCh <-chan *statistics.AnalyzeResults + sctx sessionctx.Context + errCh chan<- error } func newAnalyzeSaveStatsWorker( From 72d7a5914e8aa5f81140f321d891f291b81d43f2 Mon Sep 17 00:00:00 2001 From: yisaer Date: Tue, 25 Oct 2022 11:12:45 +0800 Subject: [PATCH 11/11] address the comment Signed-off-by: yisaer --- sessionctx/variable/sysvar.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 7966de918dcb5..73968f0da44c8 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1943,7 +1943,7 @@ var defaultSysVars = []*SysVar{ return nil }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBAnalyzePartitionConcurrency, Value: strconv.FormatInt(DefTiDBAnalyzePartitionConcurrency, 10), - MinValue: 1, MaxValue: 16, SetSession: func(s *SessionVars, val string) error { + MinValue: 1, MaxValue: uint64(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota), SetSession: func(s *SessionVars, val string) error { s.AnalyzePartitionConcurrency = int(TidbOptInt64(val, DefTiDBAnalyzePartitionConcurrency)) return nil }},