
Commit

impl
Signed-off-by: yisaer <[email protected]>

add time log

Signed-off-by: yisaer <[email protected]>

fix ctx

Signed-off-by: yisaer <[email protected]>

fix ctx

Signed-off-by: yisaer <[email protected]>

fix ctx

Signed-off-by: yisaer <[email protected]>

fix ctx

Signed-off-by: yisaer <[email protected]>

fix ctx

Signed-off-by: yisaer <[email protected]>

fix ctx

Signed-off-by: yisaer <[email protected]>

fix ctx

Signed-off-by: yisaer <[email protected]>

add log

Signed-off-by: yisaer <[email protected]>

fix

Signed-off-by: yisaer <[email protected]>

fix

Signed-off-by: yisaer <[email protected]>

fix

Signed-off-by: yisaer <[email protected]>

fix

Signed-off-by: yisaer <[email protected]>
Yisaer committed Sep 29, 2022
1 parent c5ba655 commit e304632
Showing 10 changed files with 281 additions and 64 deletions.
36 changes: 19 additions & 17 deletions config/config.go
@@ -640,14 +640,15 @@ type Performance struct {
ProjectionPushDown bool `toml:"projection-push-down" json:"projection-push-down"`
MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"`
// Deprecated
MemProfileInterval string `toml:"-" json:"-"`
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
MemProfileInterval string `toml:"-" json:"-"`
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"`
EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
// The following items are deprecated. We need to keep them here temporarily
// to support the upgrade process. They can be removed in the future.

@@ -902,15 +903,16 @@ var defaultConf = Config{
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
EnableLoadFMSketch: false,
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
AnalyzePartitionConcurrencyQuota: 16,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
EnableLoadFMSketch: false,
},
ProxyProtocol: ProxyProtocol{
Networks: "",
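
For context, here is a minimal sketch of how the new quota value is consumed; the config path and the default of 16 come from the diff above, while the surrounding program is illustrative only:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/config"
)

func main() {
	// The TOML key `analyze-partition-concurrency-quota` under [performance]
	// maps to this field through the struct tag added in this commit (default: 16).
	quota := config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota
	fmt.Println("analyze partition concurrency quota:", quota)
}
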
42 changes: 42 additions & 0 deletions domain/domain.go
@@ -123,6 +123,11 @@ type Domain struct {
sysProcesses SysProcesses

mdlCheckTableInfo *mdlCheckTableInfo

analyzeMu struct {
sync.Mutex
sctxs map[sessionctx.Context]bool
}
}

type mdlCheckTableInfo struct {
@@ -1568,6 +1573,43 @@ func (do *Domain) SetStatsUpdating(val bool) {
}
}

// ReturnAnalyzeExtraExec returns the extra session contexts borrowed for ANALYZE back to the pool
func (do *Domain) ReturnAnalyzeExtraExec(sctxs []sessionctx.Context) {
do.analyzeMu.Lock()
defer do.analyzeMu.Unlock()
for _, ctx := range sctxs {
do.analyzeMu.sctxs[ctx] = false
}
}

// GetAnalyzeExtraExec gets up to `need` idle extra session contexts for ANALYZE; it may return fewer if the pool is busy
func (do *Domain) GetAnalyzeExtraExec(need int) []sessionctx.Context {
count := 0
r := make([]sessionctx.Context, 0)
do.analyzeMu.Lock()
defer do.analyzeMu.Unlock()
for sctx, used := range do.analyzeMu.sctxs {
if used {
continue
}
r = append(r, sctx)
do.analyzeMu.sctxs[sctx] = true
count++
if count >= need {
break
}
}
return r
}

// SetupAnalyzeExtraExec sets up the extra session contexts for ANALYZE
func (do *Domain) SetupAnalyzeExtraExec(ctxs []sessionctx.Context) {
do.analyzeMu.sctxs = make(map[sessionctx.Context]bool)
for _, ctx := range ctxs {
do.analyzeMu.sctxs[ctx] = false
}
}

// LoadAndUpdateStatsLoop loads and updates stats info.
func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context) error {
if err := do.UpdateTableStatsLoop(ctxs[0]); err != nil {
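
A hedged usage sketch of the borrow/return pattern these three methods implement; the helper function below is illustrative, not part of the commit:

package example

import (
	"github.com/pingcap/tidb/domain"
	"github.com/pingcap/tidb/sessionctx"
)

// withAnalyzeExecs is a hypothetical caller: it borrows up to `need` idle
// session contexts, runs fn over them, and always returns them to the pool.
func withAnalyzeExecs(do *domain.Domain, need int, fn func([]sessionctx.Context)) {
	sctxs := do.GetAnalyzeExtraExec(need) // may return fewer than `need`
	defer do.ReturnAnalyzeExtraExec(sctxs)
	fn(sctxs)
}
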
118 changes: 91 additions & 27 deletions executor/analyze.go
@@ -20,6 +20,7 @@ import (
"math"
"strconv"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
@@ -188,8 +189,8 @@ func (e *AnalyzeExec) saveV2AnalyzeOpts() error {
return nil
}

func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
func recordHistoricalStats(sctx sessionctx.Context, tableID int64) error {
statsHandle := domain.GetDomain(sctx).StatsHandle()
historicalStatsEnabled, err := statsHandle.CheckHistoricalStatsEnable()
if err != nil {
return errors.Errorf("check tidb_enable_historical_stats failed: %v", err)
@@ -198,7 +199,7 @@ func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error {
return nil
}

is := domain.GetDomain(e.ctx).InfoSchema()
is := domain.GetDomain(sctx).InfoSchema()
tbl, existed := is.TableByID(tableID)
if !existed {
return errors.Errorf("cannot get table by id %d", tableID)
@@ -217,6 +218,18 @@ func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error {
// handleResultsError handles the errors fetched from resultsCh and records them in the log
func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, needGlobalStats bool,
globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error {
partitionStatsConcurrency := e.ctx.GetSessionVars().AnalyzePartitionConcurrency
if partitionStatsConcurrency > 1 {
dom := domain.GetDomain(e.ctx)
subSctxs := dom.GetAnalyzeExtraExec(partitionStatsConcurrency)
if len(subSctxs) > 0 {
internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
dom.ReturnAnalyzeExtraExec(subSctxs)
return err
}
}

statsHandle := domain.GetDomain(e.ctx).StatsHandle()
panicCnt := 0
var err error
@@ -235,36 +248,16 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, needGlobalStats bool,
finishJobWithLog(e.ctx, results.Job, err)
continue
}
if results.TableID.IsPartitionTable() && needGlobalStats {
for _, result := range results.Ars {
if result.IsIndex == 0 {
// If it does not belong to the statistics of index, we need to set it to -1 to distinguish.
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: int64(-1)}
histIDs := make([]int64, 0, len(result.Hist))
for _, hg := range result.Hist {
// It's normal virtual column, skip.
if hg == nil {
continue
}
histIDs = append(histIDs, hg.ID)
}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: histIDs, statsVersion: results.StatsVer}
} else {
for _, hg := range result.Hist {
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: hg.ID}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: []int64{hg.ID}, statsVersion: results.StatsVer}
}
}
}
}
if err1 := statsHandle.SaveTableStatsToStorage(results, results.TableID.IsPartitionTable(), e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil {
handleGlobalStats(needGlobalStats, globalStatsMap, results)

if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil {
err = err1
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(e.ctx, results.Job, err)
} else {
finishJobWithLog(e.ctx, results.Job, nil)
// Dump stats to historical storage.
if err := e.recordHistoricalStats(results.TableID.TableID); err != nil {
if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
@@ -273,6 +266,52 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, needGlobalStats bool,
return err
}

func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, statsConcurrency int, needGlobalStats bool,
subSctxs []sessionctx.Context,
globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error {
partitionStatsConcurrency := len(subSctxs)
wg := &sync.WaitGroup{}
saveResultsCh := make(chan *statistics.AnalyzeResults, partitionStatsConcurrency)
errCh := make(chan error, partitionStatsConcurrency)
wg.Add(partitionStatsConcurrency)
for i := 0; i < partitionStatsConcurrency; i++ {
worker := newAnalyzeSaveStatsWorker(wg, saveResultsCh, subSctxs[i], errCh)
ctx1 := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
go worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot)
}
panicCnt := 0
var err error
for panicCnt < statsConcurrency {
results, ok := <-resultsCh
if !ok {
break
}
if results.Err != nil {
err = results.Err
if isAnalyzeWorkerPanic(err) {
panicCnt++
} else {
logutil.Logger(ctx).Error("analyze failed", zap.Error(err))
}
finishJobWithLog(e.ctx, results.Job, err)
continue
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
saveResultsCh <- results
}
close(saveResultsCh)
wg.Wait()
close(errCh)
if len(errCh) > 0 {
errMsg := make([]string, 0)
for err1 := range errCh {
errMsg = append(errMsg, err1.Error())
}
err = errors.New(strings.Join(errMsg, ","))
}
return err
}

func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) {
var task *analyzeTask
defer func() {
@@ -434,3 +473,28 @@ func finishJobWithLog(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) {
zap.String("cost", job.EndTime.Sub(job.StartTime).String()))
}
}

func handleGlobalStats(needGlobalStats bool, globalStatsMap globalStatsMap, results *statistics.AnalyzeResults) {
if results.TableID.IsPartitionTable() && needGlobalStats {
for _, result := range results.Ars {
if result.IsIndex == 0 {
// If it does not belong to an index's statistics, set the index ID to -1 to distinguish it.
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: int64(-1)}
histIDs := make([]int64, 0, len(result.Hist))
for _, hg := range result.Hist {
// It's a normal virtual column; skip it.
if hg == nil {
continue
}
histIDs = append(histIDs, hg.ID)
}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: histIDs, statsVersion: results.StatsVer}
} else {
for _, hg := range result.Hist {
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: hg.ID}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: []int64{hg.ID}, statsVersion: results.StatsVer}
}
}
}
}
}
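
The concurrency scheme above reduces to a fan-out of save workers plus error aggregation over a buffered channel; a self-contained sketch of just that idiom, with stand-in integer jobs instead of *statistics.AnalyzeResults:

package main

import (
	"fmt"
	"strings"
	"sync"
)

func main() {
	const workers = 4
	jobs := make(chan int, 10)
	errCh := make(chan error, workers) // buffered: a worker never blocks reporting its error
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for j := range jobs {
				if j%3 == 0 { // stand-in for a failed SaveTableStatsToStorage
					errCh <- fmt.Errorf("save job %d failed", j)
					return // like analyzeSaveStatsWorker: stop after the first error
				}
			}
		}()
	}
	for j := 0; j < 10; j++ {
		jobs <- j
	}
	close(jobs) // lets each worker's range loop exit
	wg.Wait()
	close(errCh) // all senders are done, so draining below terminates
	msgs := make([]string, 0, workers)
	for err := range errCh {
		msgs = append(msgs, err.Error())
	}
	if len(msgs) > 0 {
		fmt.Println(strings.Join(msgs, ","))
	}
}
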
2 changes: 1 addition & 1 deletion executor/analyze_global_stats.go
@@ -83,7 +83,7 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats bool,
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err))
}
// Dump stats to historical storage.
if err := e.recordHistoricalStats(globalStatsID.tableID); err != nil {
if err := recordHistoricalStats(e.ctx, globalStatsID.tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
71 changes: 71 additions & 0 deletions executor/analyze_worker.go
@@ -0,0 +1,71 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"
"sync"

"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

type analyzeSaveStatsWorker struct {
wg *sync.WaitGroup
resultsCh <-chan *statistics.AnalyzeResults
sctx sessionctx.Context
errCh chan<- error
checkHistoricalStatsEnable bool
}

func newAnalyzeSaveStatsWorker(wg *sync.WaitGroup,
resultsCh <-chan *statistics.AnalyzeResults,
sctx sessionctx.Context,
errCh chan<- error) *analyzeSaveStatsWorker {
worker := &analyzeSaveStatsWorker{
wg: wg,
resultsCh: resultsCh,
sctx: sctx,
errCh: errCh,
}
return worker
}

func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) {
defer func() {
worker.wg.Done()
}()
for results := range worker.resultsCh {
err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot)
if err != nil {
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(worker.sctx, results.Job, err)
worker.errCh <- err
} else {
finishJobWithLog(worker.sctx, results.Job, nil)
// Dump stats to historical storage.
if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
invalidInfoSchemaStatCache(results.TableID.GetStatisticsID())
if err != nil {
return
}
}
}
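
Condensed from handleResultsErrorWithConcurrency above into a standalone helper; a sketch under the assumption that it lives in this commit's executor package (the wiring mirrors the diff, the helper itself is illustrative):

// runSaveStatsWorkers is a hypothetical helper showing the lifecycle of the
// workers defined above: one dedicated session context each, a shared results
// channel, and a buffered error channel drained only after wg.Wait().
func runSaveStatsWorkers(ctx context.Context, subSctxs []sessionctx.Context, analyzeSnapshot bool) (chan<- *statistics.AnalyzeResults, *sync.WaitGroup, chan error) {
	wg := &sync.WaitGroup{}
	saveResultsCh := make(chan *statistics.AnalyzeResults, len(subSctxs))
	errCh := make(chan error, len(subSctxs))
	wg.Add(len(subSctxs))
	for _, sctx := range subSctxs {
		worker := newAnalyzeSaveStatsWorker(wg, saveResultsCh, sctx, errCh)
		go worker.run(ctx, analyzeSnapshot)
	}
	// Caller: feed saveResultsCh, close it, wg.Wait(), then close and drain errCh.
	return saveResultsCh, wg, errCh
}
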
9 changes: 7 additions & 2 deletions session/session.go
@@ -2752,8 +2752,9 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
runInBootstrapSession(store, upgrade)
}

analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota)
concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency)
ses, err := createSessions(store, 7+concurrency)
ses, err := createSessions(store, 7+concurrency+analyzeConcurrencyQuota)
if err != nil {
return nil, err
}
@@ -2832,7 +2833,11 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
if err = dom.LoadAndUpdateStatsLoop(subCtxs); err != nil {
return nil, err
}

subCtxs2 := make([]sessionctx.Context, analyzeConcurrencyQuota)
for i := 0; i < analyzeConcurrencyQuota; i++ {
subCtxs2[i] = ses[7+concurrency+i]
}
dom.SetupAnalyzeExtraExec(subCtxs2)
dom.DumpFileGcCheckerLoop()
dom.LoadSigningCertLoop()

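
To make the session-slice arithmetic above concrete, a small illustrative helper (not part of the commit); the analyze slice bounds follow directly from createSessions(store, 7+concurrency+analyzeConcurrencyQuota) and ses[7+concurrency+i] in the diff:

package example

import "github.com/pingcap/tidb/sessionctx"

// analyzeSaveSessions returns the tail of the bootstrap sessions reserved for
// ANALYZE. The first 7 sessions plus the next `concurrency` are used elsewhere
// in BootstrapSession; the final `quota` contexts feed dom.SetupAnalyzeExtraExec.
func analyzeSaveSessions(ses []sessionctx.Context, concurrency, quota int) []sessionctx.Context {
	return ses[7+concurrency : 7+concurrency+quota]
}
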
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
@@ -1288,6 +1288,9 @@ type SessionVars struct {

// LastPlanReplayerToken indicates the last plan replayer token
LastPlanReplayerToken string

// AnalyzePartitionConcurrency indicates the concurrency for saving partition statistics during ANALYZE
AnalyzePartitionConcurrency int
}

// GetPreparedStmtByName returns the prepared statement specified by stmtName.
