statistics: move history-related functions into the stats handle (pin…
Rustin170506 authored and hawkingrei committed Aug 8, 2024
1 parent b608332 commit 5e6c732
Showing 10 changed files with 200 additions and 176 deletions. Only the first six file diffs were loaded on this page; the remaining four files are not shown below.
157 changes: 8 additions & 149 deletions pkg/executor/analyze.go
@@ -22,7 +22,6 @@ import (
"net"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -41,12 +40,10 @@ import (
"github.com/pingcap/tidb/pkg/statistics/handle"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tipb/go-tipb"
"github.com/tiancaiamao/gp"
"go.uber.org/zap"
@@ -165,7 +162,7 @@ TASKLOOP:
})
// If we enabled dynamic prune mode, then we need to generate global stats here for partition tables.
if needGlobalStats {
err = e.handleGlobalStats(globalStatsMap)
err = e.handleGlobalStats(statsHandle, globalStatsMap)
if err != nil {
return err
}
@@ -432,15 +429,15 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(
resultsCh <-chan *statistics.AnalyzeResults,
) error {
partitionStatsConcurrency := len(subSctxs)

statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
wg := util.NewWaitGroupPool(e.gp)
saveResultsCh := make(chan *statistics.AnalyzeResults, partitionStatsConcurrency)
errCh := make(chan error, partitionStatsConcurrency)
for i := 0; i < partitionStatsConcurrency; i++ {
worker := newAnalyzeSaveStatsWorker(saveResultsCh, subSctxs[i], errCh, &e.Ctx().GetSessionVars().SQLKiller)
ctx1 := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
wg.Run(func() {
worker.run(ctx1, e.Ctx().GetSessionVars().EnableAnalyzeSnapshot)
worker.run(ctx1, statsHandle, e.Ctx().GetSessionVars().EnableAnalyzeSnapshot)
})
}
tableIDs := map[int64]struct{}{}
@@ -462,7 +459,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(
} else {
logutil.Logger(ctx).Error("analyze failed", zap.Error(err))
}
finishJobWithLog(e.Ctx(), results.Job, err)
finishJobWithLog(statsHandle, results.Job, err)
continue
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
@@ -490,6 +487,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(

func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) {
var task *analyzeTask
statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze worker panicked", zap.Any("recover", r), zap.Stack("stack"))
@@ -513,7 +511,7 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
break
}
failpoint.Inject("handleAnalyzeWorkerPanic", nil)
StartAnalyzeJob(e.Ctx(), task.job)
statsHandle.StartAnalyzeJob(task.job)
switch task.taskType {
case colTask:
select {
@@ -568,147 +566,8 @@ func AddNewAnalyzeJob(ctx sessionctx.Context, job *statistics.AnalyzeJob) {
}
}

// StartAnalyzeJob marks the state of the analyze job as running and sets the start time.
func StartAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob) {
if job == nil || job.ID == nil {
return
}
job.StartTime = time.Now()
job.Progress.SetLastDumpTime(job.StartTime)
exec := sctx.GetRestrictedSQLExecutor()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
const sql = "UPDATE mysql.analyze_jobs SET start_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %? WHERE id = %?"
_, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, job.StartTime.UTC().Format(types.TimeFormat), statistics.AnalyzeRunning, *job.ID)
if err != nil {
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzePending, statistics.AnalyzeRunning)), zap.Error(err))
}
failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) {
if val.(bool) {
logutil.BgLogger().Info("StartAnalyzeJob",
zap.Time("start_time", job.StartTime),
zap.Uint64("job id", *job.ID),
)
}
})
}

// UpdateAnalyzeJob updates count of the processed rows when increment reaches a threshold.
func UpdateAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, rowCount int64) {
if job == nil || job.ID == nil {
return
}
delta := job.Progress.Update(rowCount)
if delta == 0 {
return
}
exec := sctx.GetRestrictedSQLExecutor()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
const sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %? WHERE id = %?"
_, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, delta, *job.ID)
if err != nil {
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("process %v rows", delta)), zap.Error(err))
}
failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) {
if val.(bool) {
logutil.BgLogger().Info("UpdateAnalyzeJob",
zap.Int64("increase processed_rows", delta),
zap.Uint64("job id", *job.ID),
)
}
})
}

// FinishAnalyzeMergeJob finishes analyze merge job
func FinishAnalyzeMergeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) {
if job == nil || job.ID == nil {
return
}

job.EndTime = time.Now()
var sql string
var args []any
if analyzeErr != nil {
failReason := analyzeErr.Error()
const textMaxLength = 65535
if len(failReason) > textMaxLength {
failReason = failReason[:textMaxLength]
}
sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?"
args = []any{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID}
} else {
sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?"
args = []any{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID}
}
exec := sctx.GetRestrictedSQLExecutor()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
_, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, args...)
if err != nil {
var state string
if analyzeErr != nil {
state = statistics.AnalyzeFailed
} else {
state = statistics.AnalyzeFinished
}
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzeRunning, state)), zap.Error(err))
}
failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) {
if val.(bool) {
logutil.BgLogger().Info("FinishAnalyzeMergeJob",
zap.Time("end_time", job.EndTime),
zap.Uint64("job id", *job.ID),
)
}
})
}

// FinishAnalyzeJob updates the state of the analyze job to finished/failed according to `meetError` and sets the end time.
func FinishAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) {
if job == nil || job.ID == nil {
return
}
job.EndTime = time.Now()
var sql string
var args []any
// process_id is used to see which process is running the analyze job and kill the analyze job. After the analyze job
// is finished(or failed), process_id is useless and we set it to NULL to avoid `kill tidb process_id` wrongly.
if analyzeErr != nil {
failReason := analyzeErr.Error()
const textMaxLength = 65535
if len(failReason) > textMaxLength {
failReason = failReason[:textMaxLength]
}
sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?"
args = []any{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID}
} else {
sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?"
args = []any{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID}
}
exec := sctx.GetRestrictedSQLExecutor()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
_, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, args...)
if err != nil {
var state string
if analyzeErr != nil {
state = statistics.AnalyzeFailed
} else {
state = statistics.AnalyzeFinished
}
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzeRunning, state)), zap.Error(err))
}
failpoint.Inject("DebugAnalyzeJobOperations", func(val failpoint.Value) {
if val.(bool) {
logutil.BgLogger().Info("FinishAnalyzeJob",
zap.Int64("increase processed_rows", job.Progress.GetDeltaCount()),
zap.Time("end_time", job.EndTime),
zap.Uint64("job id", *job.ID),
zap.Error(analyzeErr),
)
}
})
}

func finishJobWithLog(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) {
FinishAnalyzeJob(sctx, job, analyzeErr)
func finishJobWithLog(statsHandle *handle.Handle, job *statistics.AnalyzeJob, analyzeErr error) {
statsHandle.FinishAnalyzeJob(job, analyzeErr, statistics.TableAnalysisJob)
if job != nil {
var state string
if analyzeErr != nil {
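The hunks above delete the package-level job-tracking helpers (StartAnalyzeJob, UpdateAnalyzeJob, FinishAnalyzeMergeJob, FinishAnalyzeJob) from pkg/executor/analyze.go and point their call sites at methods on the stats handle instead: statsHandle.StartAnalyzeJob, statsHandle.UpdateAnalyzeJobProgress, and a single statsHandle.FinishAnalyzeJob that takes a job type (statistics.TableAnalysisJob or statistics.GlobalStatsMergeJob) in place of the separate merge-job finisher. The snippet below is a minimal, self-contained sketch of that pattern only; every name in it is a simplified stand-in inferred from the call sites in this diff, not the real definitions under pkg/statistics/handle.

package main

// Minimal sketch (not TiDB code) of the refactoring applied in analyze.go:
// package-level analyze-job helpers become methods on the stats handle, and the
// separate "merge job" finisher is folded into one FinishAnalyzeJob that takes a
// job type. All names below are simplified stand-ins inferred from the call sites.

import "fmt"

type AnalyzeJob struct{ ID int64 }

type JobType int

const (
	TableAnalysisJob JobType = iota
	GlobalStatsMergeJob
)

// Handle plays the role of the statistics handle that now owns job tracking.
type Handle struct{}

func (h *Handle) StartAnalyzeJob(job *AnalyzeJob) {
	fmt.Printf("job %d: running\n", job.ID)
}

func (h *Handle) UpdateAnalyzeJobProgress(job *AnalyzeJob, rowCount int64) {
	fmt.Printf("job %d: +%d rows processed\n", job.ID, rowCount)
}

func (h *Handle) FinishAnalyzeJob(job *AnalyzeJob, analyzeErr error, kind JobType) {
	state := "finished"
	if analyzeErr != nil {
		state = "failed"
	}
	fmt.Printf("job %d (type %d): %s\n", job.ID, kind, state)
}

func main() {
	h := &Handle{}
	job := &AnalyzeJob{ID: 1}
	h.StartAnalyzeJob(job)
	h.UpdateAnalyzeJobProgress(job, 1000)
	h.FinishAnalyzeJob(job, nil, TableAnalysisJob)
}

The net effect visible in the rest of the commit is the same everywhere: a caller resolves the handle once (domain.GetDomain(e.Ctx()).StatsHandle()) and threads it through, instead of handing a session context to helpers that issued the bookkeeping SQL themselves.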
3 changes: 2 additions & 1 deletion pkg/executor/analyze_col.go
@@ -174,6 +174,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
CMSketch: statistics.NewCMSketch(int32(e.opts[ast.AnalyzeOptCMSketchDepth]), int32(e.opts[ast.AnalyzeOptCMSketchWidth])),
}
}
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
for {
failpoint.Inject("mockKillRunningV1AnalyzeJob", func() {
dom := domain.GetDomain(e.ctx)
@@ -228,7 +229,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
rowCount = respSample.Count + respSample.NullCount
collectors[i].MergeSampleCollector(sc, respSample)
}
UpdateAnalyzeJob(e.ctx, e.job, rowCount)
statsHandle.UpdateAnalyzeJobProgress(e.job, rowCount)
}
timeZone := e.ctx.GetSessionVars().Location()
if hasPkHist(e.handleCols) {
11 changes: 7 additions & 4 deletions pkg/executor/analyze_col_v2.go
@@ -473,20 +473,21 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In
results: make(map[int64]*statistics.AnalyzeResults, len(indexInfos)),
}
var err error
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
for panicCnt < statsConcurrncy {
results, ok := <-resultsCh
if !ok {
break
}
if results.Err != nil {
err = results.Err
FinishAnalyzeJob(e.ctx, results.Job, err)
statsHandle.FinishAnalyzeJob(results.Job, err, statistics.TableAnalysisJob)
if isAnalyzeWorkerPanic(err) {
panicCnt++
}
continue
}
FinishAnalyzeJob(e.ctx, results.Job, nil)
statsHandle.FinishAnalyzeJob(results.Job, nil, statistics.TableAnalysisJob)
totalResult.results[results.Ars[0].Hist[0].ID] = results
}
if err != nil {
@@ -498,6 +499,7 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In
// subIndexWorker receive the task for each index and return the result for them.
func (e *AnalyzeColumnsExecV2) subIndexWorkerForNDV(taskCh chan *analyzeTask, resultsCh chan *statistics.AnalyzeResults) {
var task *analyzeTask
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze worker panicked", zap.Any("recover", r), zap.Stack("stack"))
@@ -514,7 +516,7 @@ func (e *AnalyzeColumnsExecV2) subIndexWorkerForNDV(taskCh chan *analyzeTask, re
if !ok {
break
}
StartAnalyzeJob(e.ctx, task.job)
statsHandle.StartAnalyzeJob(task.job)
if task.taskType != idxTask {
resultsCh <- &statistics.AnalyzeResults{
Err: errors.Errorf("incorrect analyze type"),
@@ -628,6 +630,7 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResu
for i := 0; i < l; i++ {
retCollector.Base().FMSketches = append(retCollector.Base().FMSketches, statistics.NewFMSketch(statistics.MaxSketchSize))
}
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
for {
data, ok := <-taskCh
if !ok {
@@ -649,7 +652,7 @@ func (e *AnalyzeColumnsExecV2) subMergeWorker(resultCh chan<- *samplingMergeResu
// Update processed rows.
subCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
subCollector.Base().FromProto(colResp.RowCollector, e.memTracker)
UpdateAnalyzeJob(e.ctx, e.job, subCollector.Base().Count)
statsHandle.UpdateAnalyzeJobProgress(e.job, subCollector.Base().Count)

// Print collect log.
oldRetCollectorSize := retCollector.Base().MemSize
8 changes: 4 additions & 4 deletions pkg/executor/analyze_global_stats.go
@@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
@@ -35,13 +36,12 @@ type globalStatsKey struct {
// The meaning of value in map is some additional information needed to build global-level stats.
type globalStatsMap map[globalStatsKey]statstypes.GlobalStatsInfo

func (e *AnalyzeExec) handleGlobalStats(globalStatsMap globalStatsMap) error {
func (e *AnalyzeExec) handleGlobalStats(statsHandle *handle.Handle, globalStatsMap globalStatsMap) error {
globalStatsTableIDs := make(map[int64]struct{}, len(globalStatsMap))
for globalStatsID := range globalStatsMap {
globalStatsTableIDs[globalStatsID.tableID] = struct{}{}
}

statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
tableIDs := make(map[int64]struct{}, len(globalStatsTableIDs))
for tableID := range globalStatsTableIDs {
tableIDs[tableID] = struct{}{}
@@ -55,7 +55,7 @@ func (e *AnalyzeExec) handleGlobalStats(globalStatsMap globalStatsMap) error {
continue
}
AddNewAnalyzeJob(e.Ctx(), job)
StartAnalyzeJob(e.Ctx(), job)
statsHandle.StartAnalyzeJob(job)

mergeStatsErr := func() error {
globalOpts := e.opts
@@ -76,7 +76,7 @@ func (e *AnalyzeExec) handleGlobalStats(globalStatsMap globalStatsMap) error {
}
return err
}()
FinishAnalyzeMergeJob(e.Ctx(), job, mergeStatsErr)
statsHandle.FinishAnalyzeJob(job, mergeStatsErr, statistics.GlobalStatsMergeJob)
}
}

3 changes: 2 additions & 1 deletion pkg/executor/analyze_idx.go
@@ -319,7 +319,8 @@ func updateIndexResult(
needCMS := cms != nil
respHist := statistics.HistogramFromProto(resp.Hist)
if job != nil {
UpdateAnalyzeJob(ctx, job, int64(respHist.TotalRowCount()))
statsHandle := domain.GetDomain(ctx).StatsHandle()
statsHandle.UpdateAnalyzeJobProgress(job, int64(respHist.TotalRowCount()))
}
hist, err = statistics.MergeHistograms(ctx.GetSessionVars().StmtCtx, hist, respHist, numBuckets, statsVer)
if err != nil {
Expand Down
11 changes: 5 additions & 6 deletions pkg/executor/analyze_worker.go
@@ -17,9 +17,9 @@ package executor
import (
"context"

"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
@@ -47,7 +47,7 @@ func newAnalyzeSaveStatsWorker(
return worker
}

func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) {
func (worker *analyzeSaveStatsWorker) run(ctx context.Context, statsHandle *handle.Handle, analyzeSnapshot bool) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze save stats worker panicked", zap.Any("recover", r), zap.Stack("stack"))
@@ -56,19 +56,18 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b
}()
for results := range worker.resultsCh {
if err := worker.killer.HandleSignal(); err != nil {
finishJobWithLog(worker.sctx, results.Job, err)
finishJobWithLog(statsHandle, results.Job, err)
results.DestroyAndPutToPool()
worker.errCh <- err
return
}
statsHandle := domain.GetDomain(worker.sctx).StatsHandle()
err := statsHandle.SaveTableStatsToStorage(results, analyzeSnapshot, util.StatsMetaHistorySourceAnalyze)
if err != nil {
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(worker.sctx, results.Job, err)
finishJobWithLog(statsHandle, results.Job, err)
worker.errCh <- err
} else {
finishJobWithLog(worker.sctx, results.Job, nil)
finishJobWithLog(statsHandle, results.Job, nil)
}
results.DestroyAndPutToPool()
if err != nil {
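In this file the save-stats worker no longer looks the handle up from the domain inside its loop; the coordinator in analyze.go resolves it once and passes it into run() alongside the context and the analyze-snapshot flag. Below is a rough, self-contained sketch of that wiring with placeholder types; the names are assumptions for illustration, not TiDB's actual worker or handle.

package main

// Rough sketch (assumed names, not TiDB code) of the worker wiring after this
// commit: the stats handle is resolved once by the coordinator and passed into
// each save-stats worker, rather than re-derived inside the worker loop.

import (
	"fmt"
	"sync"
)

type Handle struct{}

func (h *Handle) SaveResult(name string) error {
	fmt.Println("saved stats for", name)
	return nil
}

type worker struct {
	resultsCh <-chan string
	errCh     chan<- error
}

// run receives the shared handle as an argument, mirroring
// analyzeSaveStatsWorker.run(ctx, statsHandle, analyzeSnapshot).
func (w *worker) run(h *Handle) {
	for res := range w.resultsCh {
		if err := h.SaveResult(res); err != nil {
			w.errCh <- err
		}
	}
}

func main() {
	h := &Handle{} // resolved once by the coordinator
	resultsCh := make(chan string, 2)
	errCh := make(chan error, 2)
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		w := &worker{resultsCh: resultsCh, errCh: errCh}
		wg.Add(1)
		go func() { defer wg.Done(); w.run(h) }()
	}
	resultsCh <- "t1"
	resultsCh <- "t2"
	close(resultsCh)
	wg.Wait()
}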