
executor: support save partition stats in concurrency #38239

Merged: 15 commits, Oct 25, 2022
37 changes: 20 additions & 17 deletions config/config.go
@@ -643,14 +643,16 @@ type Performance struct {
ProjectionPushDown bool `toml:"projection-push-down" json:"projection-push-down"`
MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"`
// Deprecated
Contributor:

It seems the deprecated variables are defined below here. Besides, why do we use the config rather than system variables?

Contributor Author:

I think it only indicates that MemProfileInterval is deprecated. We use a config item because AnalyzePartitionConcurrencyQuota is used to pre-create the session pool in the domain when the tidb-server starts.

MemProfileInterval string `toml:"-" json:"-"`
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
MemProfileInterval string `toml:"-" json:"-"`

IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"`
Contributor:

How about using an SQL variable instead of a config item?

Contributor Author:

We use a config item because AnalyzePartitionConcurrencyQuota is used to pre-create the session pool in the domain when the tidb-server starts.
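For illustration, a minimal sketch (not the PR's actual code) of why the quota has to be a startup config item: the value is read once from the global config to size the pre-created session pool, so it cannot change at runtime the way a system variable can. setupAnalyzeSessionPool is a hypothetical helper; config.GetGlobalConfig, createSessions, and SetupAnalyzeExec are the names used in the session.go diff later in this PR.

// Sketch only: size the analyze session pool from the startup config
// (setupAnalyzeSessionPool is hypothetical; the real wiring lives in BootstrapSession below).
func setupAnalyzeSessionPool(store kv.Storage, dom *domain.Domain) error {
    quota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota)
    sessions, err := createSessions(store, quota) // sessions are created once, at bootstrap time
    if err != nil {
        return err
    }
    ctxs := make([]sessionctx.Context, quota)
    for i := 0; i < quota; i++ {
        ctxs[i] = sessions[i]
    }
    dom.SetupAnalyzeExec(ctxs) // hand the pool to the domain for later FetchAnalyzeExec calls
    return nil
}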

EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
// The following items are deprecated. We need to keep them here temporarily
// to support the upgrade process. They can be removed in future.

@@ -905,15 +907,16 @@ var defaultConf = Config{
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
EnableLoadFMSketch: false,
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
AnalyzePartitionConcurrencyQuota: 16,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
EnableLoadFMSketch: false,
},
ProxyProtocol: ProxyProtocol{
Networks: "",
45 changes: 45 additions & 0 deletions domain/domain.go
@@ -125,6 +125,11 @@ type Domain struct {
sysProcesses SysProcesses

mdlCheckTableInfo *mdlCheckTableInfo

analyzeMu struct {
sync.Mutex
sctxs map[sessionctx.Context]bool
}
}

type mdlCheckTableInfo struct {
@@ -1575,6 +1580,46 @@ func (do *Domain) SetStatsUpdating(val bool) {
}
}

// ReleaseAnalyzeExec returns the extra analyze sessions back to the domain
func (do *Domain) ReleaseAnalyzeExec(sctxs []sessionctx.Context) {
do.analyzeMu.Lock()
defer do.analyzeMu.Unlock()
for _, ctx := range sctxs {
do.analyzeMu.sctxs[ctx] = false
}
}

// FetchAnalyzeExec fetches up to `need` unused analyze sessions from the domain
func (do *Domain) FetchAnalyzeExec(need int) []sessionctx.Context {
if need < 1 {
return nil
}
count := 0
r := make([]sessionctx.Context, 0)
do.analyzeMu.Lock()
defer do.analyzeMu.Unlock()
for sctx, used := range do.analyzeMu.sctxs {
if used {
continue
}
r = append(r, sctx)
do.analyzeMu.sctxs[sctx] = true
count++
if count >= need {
break
}
}
return r
}

// SetupAnalyzeExec sets up the sessions used by the Analyze executor
func (do *Domain) SetupAnalyzeExec(ctxs []sessionctx.Context) {
do.analyzeMu.sctxs = make(map[sessionctx.Context]bool)
for _, ctx := range ctxs {
do.analyzeMu.sctxs[ctx] = false
}
}
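As a usage note, callers are expected to pair FetchAnalyzeExec with a deferred ReleaseAnalyzeExec and fall back to the single-threaded path when the pool has no spare session. A minimal sketch of that pattern follows; saveWithExtraSessions is a hypothetical caller, and the real wiring is in executor/analyze.go below.

// Sketch of the intended acquire/release pattern (saveWithExtraSessions is hypothetical).
func saveWithExtraSessions(dom *domain.Domain, want int) {
    sctxs := dom.FetchAnalyzeExec(want) // may return fewer sessions than requested, or none
    if len(sctxs) == 0 {
        return // no spare session: fall back to saving stats in the caller's own session
    }
    defer dom.ReleaseAnalyzeExec(sctxs) // always hand the sessions back when done
    // ... dispatch one save-stats worker per session in sctxs ...
}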

// LoadAndUpdateStatsLoop loads and updates stats info.
func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context) error {
if err := do.UpdateTableStatsLoop(ctxs[0]); err != nil {
124 changes: 97 additions & 27 deletions executor/analyze.go
@@ -188,8 +188,8 @@ func (e *AnalyzeExec) saveV2AnalyzeOpts() error {
return nil
}

func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
func recordHistoricalStats(sctx sessionctx.Context, tableID int64) error {
statsHandle := domain.GetDomain(sctx).StatsHandle()
historicalStatsEnabled, err := statsHandle.CheckHistoricalStatsEnable()
if err != nil {
return errors.Errorf("check tidb_enable_historical_stats failed: %v", err)
@@ -198,7 +198,7 @@ func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error {
return nil
}

is := domain.GetDomain(e.ctx).InfoSchema()
is := domain.GetDomain(sctx).InfoSchema()
tbl, existed := is.TableByID(tableID)
if !existed {
return errors.Errorf("cannot get table by id %d", tableID)
@@ -217,6 +217,23 @@ func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error {
// handleResultsError will handle the error fetched from resultsCh and record it in the log
func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, needGlobalStats bool,
globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error {
partitionStatsConcurrency := e.ctx.GetSessionVars().AnalyzePartitionConcurrency
// If 'partitionStatsConcurrency' > 1, we will try to demand extra sessions from the Domain to save analyze results concurrently.
// If there is no extra session we can use, we will save analyze results in a single thread.
if partitionStatsConcurrency > 1 {
dom := domain.GetDomain(e.ctx)
subSctxs := dom.FetchAnalyzeExec(partitionStatsConcurrency)
if len(subSctxs) > 0 {
defer func() {
dom.ReleaseAnalyzeExec(subSctxs)
}()
internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
return err
}
}

// Save analyze results in a single thread.
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
panicCnt := 0
var err error
Expand All @@ -235,36 +252,16 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
finishJobWithLog(e.ctx, results.Job, err)
continue
}
if results.TableID.IsPartitionTable() && needGlobalStats {
for _, result := range results.Ars {
if result.IsIndex == 0 {
// If it does not belong to the statistics of index, we need to set it to -1 to distinguish.
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: int64(-1)}
histIDs := make([]int64, 0, len(result.Hist))
for _, hg := range result.Hist {
// It's normal virtual column, skip.
if hg == nil {
continue
}
histIDs = append(histIDs, hg.ID)
}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: histIDs, statsVersion: results.StatsVer}
} else {
for _, hg := range result.Hist {
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: hg.ID}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: []int64{hg.ID}, statsVersion: results.StatsVer}
}
}
}
}
if err1 := statsHandle.SaveTableStatsToStorage(results, results.TableID.IsPartitionTable(), e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil {
handleGlobalStats(needGlobalStats, globalStatsMap, results)

if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil {
err = err1
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(e.ctx, results.Job, err)
} else {
finishJobWithLog(e.ctx, results.Job, nil)
// Dump stats to historical storage.
if err := e.recordHistoricalStats(results.TableID.TableID); err != nil {
if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
@@ -273,6 +270,54 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
return err
}

func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, statsConcurrency int, needGlobalStats bool,
subSctxs []sessionctx.Context,
globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error {
partitionStatsConcurrency := len(subSctxs)
Contributor (@Reminiscent, Oct 24, 2022):

What's the relation between statsConcurrency and partitionStatsConcurrency?

Contributor Author:

statsConcurrency is the concurrency for building analyze results, while partitionStatsConcurrency is the concurrency for saving those results to storage.
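To make the distinction concrete, here is a usage sketch in the style of the test added in executor/analyze_test.go below: tidb_analyze_partition_concurrency drives partitionStatsConcurrency (how many sessions save partition stats), while statsConcurrency is the number of analyze build workers passed into this function and is not set by that variable. The table name t and the value 4 are illustrative.

// Illustrative only, mirroring the test change below.
tk.MustExec("set @@tidb_analyze_partition_concurrency=4") // number of sessions used to save partition stats
tk.MustQuery("select @@tidb_analyze_partition_concurrency").Check(testkit.Rows("4"))
tk.MustExec("analyze table t") // building the stats still uses the analyze workers (statsConcurrency)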


var wg util.WaitGroupWrapper
saveResultsCh := make(chan *statistics.AnalyzeResults, partitionStatsConcurrency)
errCh := make(chan error, partitionStatsConcurrency)
for i := 0; i < partitionStatsConcurrency; i++ {
worker := newAnalyzeSaveStatsWorker(saveResultsCh, subSctxs[i], errCh)
ctx1 := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
wg.Run(func() {
worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot)
})
}
panicCnt := 0
var err error
for panicCnt < statsConcurrency {
results, ok := <-resultsCh
if !ok {
break
}
if results.Err != nil {
err = results.Err
if isAnalyzeWorkerPanic(err) {
panicCnt++
} else {
logutil.Logger(ctx).Error("analyze failed", zap.Error(err))
}
finishJobWithLog(e.ctx, results.Job, err)
continue
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
saveResultsCh <- results
}
close(saveResultsCh)
wg.Wait()
close(errCh)
if len(errCh) > 0 {
errMsg := make([]string, 0)
for err1 := range errCh {
errMsg = append(errMsg, err1.Error())
}
err = errors.New(strings.Join(errMsg, ","))
}
return err
}

func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) {
var task *analyzeTask
defer func() {
@@ -434,3 +479,28 @@ func finishJobWithLog(sctx sessionctx.Context, job *statistics.AnalyzeJob, analy
zap.String("cost", job.EndTime.Sub(job.StartTime).String()))
}
}

func handleGlobalStats(needGlobalStats bool, globalStatsMap globalStatsMap, results *statistics.AnalyzeResults) {
if results.TableID.IsPartitionTable() && needGlobalStats {
for _, result := range results.Ars {
if result.IsIndex == 0 {
// If it does not belong to the statistics of index, we need to set it to -1 to distinguish.
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: int64(-1)}
histIDs := make([]int64, 0, len(result.Hist))
for _, hg := range result.Hist {
// It's normal virtual column, skip.
if hg == nil {
continue
}
histIDs = append(histIDs, hg.ID)
}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: histIDs, statsVersion: results.StatsVer}
} else {
for _, hg := range result.Hist {
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: hg.ID}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: []int64{hg.ID}, statsVersion: results.StatsVer}
}
}
}
}
}
2 changes: 1 addition & 1 deletion executor/analyze_global_stats.go
@@ -83,7 +83,7 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err))
}
// Dump stats to historical storage.
if err := e.recordHistoricalStats(globalStatsID.tableID); err != nil {
if err := recordHistoricalStats(e.ctx, globalStatsID.tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
3 changes: 3 additions & 0 deletions executor/analyze_test.go
@@ -371,6 +371,9 @@ func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) {
fmt.Println("testcase ", concurrency)
tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
tk.MustExec(fmt.Sprintf("set @@tidb_analyze_partition_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_analyze_partition_concurrency").Check(testkit.Rows(concurrency))

tk.MustExec("analyze table t")
tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'")
}
70 changes: 70 additions & 0 deletions executor/analyze_worker.go
@@ -0,0 +1,70 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"

"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

type analyzeSaveStatsWorker struct {
resultsCh <-chan *statistics.AnalyzeResults
sctx sessionctx.Context
errCh chan<- error
}

func newAnalyzeSaveStatsWorker(
resultsCh <-chan *statistics.AnalyzeResults,
sctx sessionctx.Context,
errCh chan<- error) *analyzeSaveStatsWorker {
worker := &analyzeSaveStatsWorker{
resultsCh: resultsCh,
sctx: sctx,
errCh: errCh,
}
return worker
}

func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze save stats worker panicked", zap.Any("recover", r), zap.Stack("stack"))
worker.errCh <- getAnalyzePanicErr(r)
}
}()
for results := range worker.resultsCh {
err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot)
if err != nil {
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(worker.sctx, results.Job, err)
worker.errCh <- err
} else {
finishJobWithLog(worker.sctx, results.Job, nil)
// Dump stats to historical storage.
if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
invalidInfoSchemaStatCache(results.TableID.GetStatisticsID())
if err != nil {
return
}
}
}
9 changes: 7 additions & 2 deletions session/session.go
@@ -2849,8 +2849,9 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
runInBootstrapSession(store, upgrade)
}

analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota)
concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency)
ses, err := createSessions(store, 7+concurrency)
ses, err := createSessions(store, 7+concurrency+analyzeConcurrencyQuota)
if err != nil {
return nil, err
}
@@ -2933,7 +2934,11 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
if err = dom.LoadAndUpdateStatsLoop(subCtxs); err != nil {
return nil, err
}

subCtxs2 := make([]sessionctx.Context, analyzeConcurrencyQuota)
for i := 0; i < analyzeConcurrencyQuota; i++ {
subCtxs2[i] = ses[7+concurrency+i]
}
dom.SetupAnalyzeExec(subCtxs2)
dom.DumpFileGcCheckerLoop()
dom.LoadSigningCertLoop(cfg.Security.SessionTokenSigningCert, cfg.Security.SessionTokenSigningKey)
