*: track cluster-level analyze jobs and make it persistent #32215

Merged on Mar 31, 2022

Commits (51)
fbd4ce3
add mysql.analyze_jobs
xuyifangreeneyes Feb 9, 2022
851824c
updte table
xuyifangreeneyes Feb 11, 2022
dc571af
Merge branch 'master' into analyze-status
xuyifangreeneyes Feb 14, 2022
284f841
use mysql.analyze_jobs to implement show analyze status
xuyifangreeneyes Feb 15, 2022
c3fa553
add tidb address and proc id in analyze_jobs
xuyifangreeneyes Feb 16, 2022
fe5f331
Merge branch 'master' into analyze-status
xuyifangreeneyes Feb 16, 2022
d584c4b
fix last_insert_id
xuyifangreeneyes Feb 16, 2022
c8df42a
Merge branch 'analyze-status' of https://github.com/xuyifangreeneyes/…
xuyifangreeneyes Feb 16, 2022
a35e874
update job_info; delete outdated analyze jobs
xuyifangreeneyes Feb 21, 2022
521f452
handle error in gcAnalyzeHistory
xuyifangreeneyes Feb 22, 2022
a8a1f36
upd job info
xuyifangreeneyes Feb 22, 2022
ffe91b1
format import
xuyifangreeneyes Feb 22, 2022
b7129a3
upd
xuyifangreeneyes Feb 23, 2022
fb90284
upd
xuyifangreeneyes Feb 23, 2022
0404c48
Merge branch 'master' into analyze-status
xuyifangreeneyes Feb 23, 2022
fcb8c84
resolve conflict
xuyifangreeneyes Mar 7, 2022
27bdbce
resolve conflict
xuyifangreeneyes Mar 7, 2022
474b397
Merge branch 'master' into analyze-status
xuyifangreeneyes Mar 7, 2022
2ec2764
address comment
xuyifangreeneyes Mar 7, 2022
ad032d2
fix build
xuyifangreeneyes Mar 7, 2022
dfcd4a4
fix job_info
xuyifangreeneyes Mar 7, 2022
d2c15fd
fix
xuyifangreeneyes Mar 7, 2022
d2067b6
update processed_rows when finish analyze job
xuyifangreeneyes Mar 8, 2022
42f4c1b
upd
xuyifangreeneyes Mar 8, 2022
1e9250c
Merge branch 'master' into analyze-status
qw4990 Mar 9, 2022
41fee0d
avoid move function position
xuyifangreeneyes Mar 9, 2022
af16ac2
Merge branch 'analyze-status' of https://github.com/xuyifangreeneyes/…
xuyifangreeneyes Mar 9, 2022
e660844
Merge branch 'master' into analyze-status
xuyifangreeneyes Mar 10, 2022
15906fc
refine
xuyifangreeneyes Mar 12, 2022
6ef6ffe
Merge branch 'analyze-status' of https://github.com/xuyifangreeneyes/…
xuyifangreeneyes Mar 12, 2022
fc59504
add test
xuyifangreeneyes Mar 12, 2022
61af62b
refine test
xuyifangreeneyes Mar 12, 2022
331d7c1
add test
xuyifangreeneyes Mar 12, 2022
f9dc28b
refine test
xuyifangreeneyes Mar 12, 2022
8654b93
fmt
xuyifangreeneyes Mar 14, 2022
db5edd6
Merge branch 'master' into analyze-status
xuyifangreeneyes Mar 14, 2022
f646145
fix ut
xuyifangreeneyes Mar 14, 2022
390dc02
fix ut
xuyifangreeneyes Mar 14, 2022
809a203
fix ut
xuyifangreeneyes Mar 14, 2022
dcbe484
resolve conflict
xuyifangreeneyes Mar 28, 2022
60b2b7f
address comment
xuyifangreeneyes Mar 28, 2022
f83b735
add some comments
xuyifangreeneyes Mar 28, 2022
184b1e7
rename process to progress
xuyifangreeneyes Mar 28, 2022
ae1ec41
fix
xuyifangreeneyes Mar 29, 2022
2bb9529
resolve conflict
xuyifangreeneyes Mar 29, 2022
0b063fe
set process_id to null when the analyze job is finished/failed
xuyifangreeneyes Mar 29, 2022
4c538f6
fix ut
xuyifangreeneyes Mar 29, 2022
925574c
add test for checking jobInfo of show analyze status and tiny fix
xuyifangreeneyes Mar 30, 2022
70f195f
resolve conflict
xuyifangreeneyes Mar 31, 2022
dbd5b40
Merge branch 'master' into analyze-status
ti-chi-bot Mar 31, 2022
694be1e
Merge branch 'master' into analyze-status
ti-chi-bot Mar 31, 2022
29 changes: 29 additions & 0 deletions domain/domain.go
@@ -1316,6 +1316,8 @@ func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
do.wg.Add(1)
go do.autoAnalyzeWorker(owner)
}
do.wg.Add(1)
go do.gcAnalyzeHistory(owner)
return nil
}

@@ -1505,6 +1507,33 @@ func (do *Domain) autoAnalyzeWorker(owner owner.Manager) {
}
}

func (do *Domain) gcAnalyzeHistory(owner owner.Manager) {
defer util.Recover(metrics.LabelDomain, "gcAnalyzeHistory", nil, false)
const gcInterval = time.Hour
statsHandle := do.StatsHandle()
gcTicker := time.NewTicker(gcInterval)
defer func() {
gcTicker.Stop()
do.wg.Done()
logutil.BgLogger().Info("gcAnalyzeHistory exited.")
}()
for {
select {
case <-gcTicker.C:
if owner.IsOwner() {
const DaysToKeep = 7
updateTime := time.Now().AddDate(0, 0, -DaysToKeep)
err := statsHandle.DeleteAnalyzeJobs(updateTime)
if err != nil {
logutil.BgLogger().Warn("gc analyze history failed", zap.Error(err))
}
}
case <-do.exit:
return
}
}
}

// ExpensiveQueryHandle returns the expensive query handle.
func (do *Domain) ExpensiveQueryHandle() *expensivequery.Handle {
return do.expensiveQueryHandle
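The GC worker above delegates the actual pruning to statsHandle.DeleteAnalyzeJobs, whose implementation lives in the statistics handle package and is not part of this diff. A minimal sketch of what such a method could look like, assuming an update_time column on mysql.analyze_jobs and a restricted-SQL helper on the handle (both are assumptions, not code from this PR):

func (h *Handle) DeleteAnalyzeJobs(updateTime time.Time) error {
	// Prune analyze job records whose last update is older than the given time;
	// the CONVERT_TZ call mirrors the UTC handling used elsewhere in this PR.
	const sql = "DELETE FROM mysql.analyze_jobs WHERE update_time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)"
	_, _, err := h.execRestrictedSQL(context.TODO(), sql, updateTime.UTC().Format(types.TimeFormat))
	return err
}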
125 changes: 100 additions & 25 deletions executor/analyze.go
@@ -43,7 +43,6 @@ import (
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
derr "github.com/pingcap/tidb/store/driver/error"
@@ -101,7 +100,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
e.wg.Run(func() { e.analyzeWorker(taskCh, resultsCh) })
}
for _, task := range e.tasks {
statistics.AddNewAnalyzeJob(task.job)
AddNewAnalyzeJob(e.ctx, task.job)
}
failpoint.Inject("mockKillPendingAnalyzeJob", func() {
domain.GetDomain(e.ctx).SysProcTracker().KillSysProcess(util.GetAutoAnalyzeProcID())
@@ -134,10 +133,16 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
// The key in the map is the structure used to store the tableID and indexID.
// The value in the map is some additional information needed to build global-level stats.
globalStatsMap := make(map[globalStatsKey]globalStatsInfo)
finishJobWithLogFn := func(ctx context.Context, job *statistics.AnalyzeJob, meetError bool) {
job.Finish(meetError)
finishJobWithLogFn := func(ctx context.Context, job *statistics.AnalyzeJob, analyzeErr error) {
FinishAnalyzeJob(e.ctx, job, analyzeErr)
if job != nil {
logutil.Logger(ctx).Info(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, job.State),
var state string
if analyzeErr != nil {
state = statistics.AnalyzeFailed
} else {
state = statistics.AnalyzeFinished
}
logutil.Logger(ctx).Info(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
zap.String("partition", job.PartitionName),
zap.String("job info", job.JobInfo),
zap.Time("start time", job.StartTime),
@@ -157,7 +162,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
} else {
logutil.Logger(ctx).Error("analyze failed", zap.Error(err))
}
finishJobWithLogFn(ctx, results.Job, true)
finishJobWithLogFn(ctx, results.Job, err)
continue
}
if results.TableID.IsPartitionTable() && needGlobalStats {
@@ -185,18 +190,15 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
if err1 := statsHandle.SaveTableStatsToStorage(results, results.TableID.IsPartitionTable() && needGlobalStats); err1 != nil {
err = err1
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLogFn(ctx, results.Job, true)
finishJobWithLogFn(ctx, results.Job, err)
} else {
finishJobWithLogFn(ctx, results.Job, false)
finishJobWithLogFn(ctx, results.Job, nil)
// Dump stats to historical storage.
if err := e.recordHistoricalStats(results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
}
for _, task := range e.tasks {
statistics.MoveToHistory(task.job)
}
failpoint.Inject("mockKillFinishedAnalyzeJob", func() {
domain.GetDomain(e.ctx).SysProcTracker().KillSysProcess(util.GetAutoAnalyzeProcID())
})
@@ -358,7 +360,7 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
if !ok {
break
}
task.job.Start()
StartAnalyzeJob(e.ctx, task.job)
switch task.taskType {
case colTask:
resultsCh <- analyzeColumnsPushdown(task.colExec)
@@ -519,7 +521,7 @@ func (e *AnalyzeIndexExec) open(ranges []*ranger.Range, considerNull bool) error
}

func updateIndexResult(
ctx *stmtctx.StatementContext,
ctx sessionctx.Context,
resp *tipb.AnalyzeIndexResp,
job *statistics.AnalyzeJob,
hist *statistics.Histogram,
@@ -541,9 +543,9 @@ func updateIndexResult(
needCMS := cms != nil
respHist := statistics.HistogramFromProto(resp.Hist)
if job != nil {
job.Update(int64(respHist.TotalRowCount()))
UpdateAnalyzeJob(ctx, job, int64(respHist.TotalRowCount()))
}
hist, err = statistics.MergeHistograms(ctx, hist, respHist, numBuckets, statsVer)
hist, err = statistics.MergeHistograms(ctx.GetSessionVars().StmtCtx, hist, respHist, numBuckets, statsVer)
if err != nil {
return nil, nil, nil, nil, err
}
@@ -604,7 +606,7 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
if err != nil {
return nil, nil, nil, nil, err
}
hist, cms, fms, topn, err = updateIndexResult(e.ctx.GetSessionVars().StmtCtx, resp, e.job, hist, cms, fms, topn,
hist, cms, fms, topn, err = updateIndexResult(e.ctx, resp, e.job, hist, cms, fms, topn,
e.idxInfo, int(e.opts[ast.AnalyzeOptNumBuckets]), int(e.opts[ast.AnalyzeOptNumTopN]), statsVer)
if err != nil {
return nil, nil, nil, nil, err
@@ -1136,7 +1138,7 @@ func (e *AnalyzeColumnsExec) handleNDVForSpecialIndexes(indexInfos []*model.Inde
statsConcurrncy, err := getBuildStatsConcurrency(e.ctx)
taskCh := make(chan *analyzeTask, len(tasks))
for _, task := range tasks {
statistics.AddNewAnalyzeJob(task.job)
AddNewAnalyzeJob(e.ctx, task.job)
}
resultsCh := make(chan *statistics.AnalyzeResults, len(tasks))
if len(tasks) < statsConcurrncy {
@@ -1161,15 +1163,14 @@ func (e *AnalyzeColumnsExec) handleNDVForSpecialIndexes(indexInfos []*model.Inde
break
}
if results.Err != nil {
results.Job.Finish(true)
err = results.Err
FinishAnalyzeJob(e.ctx, results.Job, err)
if err == errAnalyzeWorkerPanic {
panicCnt++
}
continue
}
results.Job.Finish(false)
statistics.MoveToHistory(results.Job)
FinishAnalyzeJob(e.ctx, results.Job, nil)
totalResult.results[results.Ars[0].Hist[0].ID] = results
}
if err != nil {
@@ -1200,7 +1201,7 @@ func (e *AnalyzeColumnsExec) subIndexWorkerForNDV(taskCh chan *analyzeTask, resu
if !ok {
break
}
task.job.Start()
StartAnalyzeJob(e.ctx, task.job)
if task.taskType != idxTask {
resultsCh <- &statistics.AnalyzeResults{
Err: errors.Errorf("incorrect analyze type"),
@@ -1325,7 +1326,7 @@ func (e *AnalyzeColumnsExec) subMergeWorker(resultCh chan<- *samplingMergeResult
}
subCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
subCollector.Base().FromProto(colResp.RowCollector)
e.job.Update(subCollector.Base().Count)
UpdateAnalyzeJob(e.ctx, e.job, subCollector.Base().Count)
retCollector.MergeCollector(subCollector)
}
resultCh <- &samplingMergeResult{collector: retCollector}
@@ -1503,7 +1504,6 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
if data == nil {
break
}
sc := e.ctx.GetSessionVars().StmtCtx
var colResp *tipb.AnalyzeColumnsResp
if e.analyzePB.Tp == tipb.AnalyzeType_TypeMixed {
resp := &tipb.AnalyzeMixedResp{}
Expand All @@ -1512,7 +1512,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
return nil, nil, nil, nil, nil, err
}
colResp = resp.ColumnsResp
handleHist, handleCms, handleFms, handleTopn, err = updateIndexResult(sc, resp.IndexResp, nil, handleHist,
handleHist, handleCms, handleFms, handleTopn, err = updateIndexResult(e.ctx, resp.IndexResp, nil, handleHist,
handleCms, handleFms, handleTopn, e.commonHandle, int(e.opts[ast.AnalyzeOptNumBuckets]),
int(e.opts[ast.AnalyzeOptNumTopN]), statsVer)

@@ -1523,6 +1523,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
colResp = &tipb.AnalyzeColumnsResp{}
err = colResp.Unmarshal(data)
}
sc := e.ctx.GetSessionVars().StmtCtx
rowCount := int64(0)
if hasPkHist(e.handleCols) {
respHist := statistics.HistogramFromProto(colResp.PkHist)
@@ -1537,7 +1538,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
rowCount = respSample.Count + respSample.NullCount
collectors[i].MergeSampleCollector(sc, respSample)
}
e.job.Update(rowCount)
UpdateAnalyzeJob(e.ctx, e.job, rowCount)
}
timeZone := e.ctx.GetSessionVars().Location()
if hasPkHist(e.handleCols) {
@@ -2321,6 +2322,80 @@ func analyzePKIncremental(colExec *analyzePKIncrementalExec) *statistics.Analyze
}
}

// AddNewAnalyzeJob records the new analyze job.
func AddNewAnalyzeJob(ctx sessionctx.Context, job *statistics.AnalyzeJob) {
if job == nil {
return
}
statsHandle := domain.GetDomain(ctx).StatsHandle()
err := statsHandle.InsertAnalyzeJob(job, ctx.GetSessionVars().ConnectionID)
if err != nil {
logutil.BgLogger().Error("failed to insert analyze job", zap.Error(err))
}
}
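
InsertAnalyzeJob writes a pending row into the new system table introduced by the "add mysql.analyze_jobs" commit; the bootstrap DDL itself is not shown in this diff. A sketch of what the table plausibly looks like, inferred from the columns referenced in this file (id, state, start_time, end_time, processed_rows, fail_reason, process_id) plus the instance and job_info fields mentioned in the commit messages — names and types here are assumptions:

// Assumed bootstrap definition, for illustration only.
const createAnalyzeJobs = `CREATE TABLE IF NOT EXISTS mysql.analyze_jobs (
	id BIGINT(64) UNSIGNED NOT NULL AUTO_INCREMENT,
	update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
	table_schema CHAR(64) NOT NULL DEFAULT '',
	table_name CHAR(64) NOT NULL DEFAULT '',
	partition_name CHAR(64) NOT NULL DEFAULT '',
	job_info TEXT NOT NULL,
	processed_rows BIGINT(64) UNSIGNED NOT NULL DEFAULT 0,
	start_time TIMESTAMP,
	end_time TIMESTAMP,
	state ENUM('pending', 'running', 'finished', 'failed') NOT NULL,
	fail_reason TEXT,
	instance VARCHAR(512) NOT NULL COMMENT 'address of the TiDB instance running the analyze job',
	process_id BIGINT(64) UNSIGNED COMMENT 'ID of the process running the analyze job',
	PRIMARY KEY (id),
	KEY (update_time)
)`

An index on update_time like the one sketched above is what the hourly GC pass in domain.go would scan when deleting rows older than seven days.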

// StartAnalyzeJob marks the state of the analyze job as running and sets the start time.
func StartAnalyzeJob(ctx sessionctx.Context, job *statistics.AnalyzeJob) {
if job == nil || job.ID == nil {
return
}
job.StartTime = time.Now()
job.Progress.SetLastDumpTime(job.StartTime)
exec := ctx.(sqlexec.RestrictedSQLExecutor)
const sql = "UPDATE mysql.analyze_jobs SET start_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %? WHERE id = %?"
_, _, err := exec.ExecRestrictedSQL(context.TODO(), []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, job.StartTime.UTC().Format(types.TimeFormat), statistics.AnalyzeRunning, *job.ID)
if err != nil {
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzePending, statistics.AnalyzeRunning)), zap.Error(err))
}
}

// UpdateAnalyzeJob updates the count of processed rows for the analyze job once the accumulated increment reaches a dump threshold.
func UpdateAnalyzeJob(ctx sessionctx.Context, job *statistics.AnalyzeJob, rowCount int64) {
if job == nil || job.ID == nil {
return
}
delta := job.Progress.Update(rowCount)
if delta == 0 {
return
}
Comment on lines +2358 to +2360

Member:
When will this happen? The last update?

Contributor Author:
delta == 0 means that job.Delta.Count has not reached the threshold yet, so we don't need to dump it. I forgot to handle the last update; fixed it by also updating processed_rows when calling FinishAnalyzeJob.

exec := ctx.(sqlexec.RestrictedSQLExecutor)
const sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %? WHERE id = %?"
_, _, err := exec.ExecRestrictedSQL(context.TODO(), []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, delta, *job.ID)
if err != nil {
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("process %v rows", delta)), zap.Error(err))
}
}
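
The review thread above asks when delta can be zero: job.Progress batches row counts and only returns a non-zero delta once enough rows have accumulated, so most calls skip the UPDATE entirely; the remainder is flushed by FinishAnalyzeJob below via GetDeltaCount. The AnalyzeProgress type lives in the statistics package and is not part of this hunk; a hedged sketch of that batching, with field names and the threshold value chosen for illustration only (it relies just on the standard sync and time packages):

// Sketch of the progress tracker assumed by StartAnalyzeJob/UpdateAnalyzeJob/FinishAnalyzeJob.
type AnalyzeProgress struct {
	sync.Mutex
	deltaCount   int64     // rows processed since the last flush to mysql.analyze_jobs
	lastDumpTime time.Time // set via SetLastDumpTime in StartAnalyzeJob
}

// SetLastDumpTime records when the delta was last (or initially) flushed.
func (p *AnalyzeProgress) SetLastDumpTime(t time.Time) {
	p.Lock()
	defer p.Unlock()
	p.lastDumpTime = t
}

// GetDeltaCount returns the rows that have not been flushed yet; FinishAnalyzeJob
// adds this remainder to processed_rows so the final count is exact.
func (p *AnalyzeProgress) GetDeltaCount() int64 {
	p.Lock()
	defer p.Unlock()
	return p.deltaCount
}

// Update accumulates rowCount and returns the amount to persist, or 0 if the
// threshold has not been reached yet (the delta == 0 early return above).
func (p *AnalyzeProgress) Update(rowCount int64) int64 {
	p.Lock()
	defer p.Unlock()
	p.deltaCount += rowCount
	const dumpThreshold = 10000 // assumed value; the real constant may differ
	if p.deltaCount < dumpThreshold {
		return 0
	}
	delta := p.deltaCount
	p.deltaCount = 0
	p.lastDumpTime = time.Now()
	return delta
}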

// FinishAnalyzeJob updates the state of the analyze job to finished/failed according to `analyzeErr` and sets the end time.
func FinishAnalyzeJob(ctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) {
if job == nil || job.ID == nil {
return
}
job.EndTime = time.Now()
var sql string
var args []interface{}
// process_id is used to see which process is running the analyze job and to kill that job if needed. After the analyze
// job is finished (or failed), process_id is no longer meaningful, so we set it to NULL to prevent `kill tidb process_id`
// from hitting an unrelated process.
if analyzeErr != nil {
sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?"
args = []interface{}{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, analyzeErr.Error(), *job.ID}
} else {
sql = "UPDATE mysql.analyze_jobs SET processed_rows = processed_rows + %?, end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?"
args = []interface{}{job.Progress.GetDeltaCount(), job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID}
}
exec := ctx.(sqlexec.RestrictedSQLExecutor)
_, _, err := exec.ExecRestrictedSQL(context.TODO(), []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, args...)
if err != nil {
var state string
if analyzeErr != nil {
state = statistics.AnalyzeFailed
} else {
state = statistics.AnalyzeFinished
}
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzeRunning, state)), zap.Error(err))
}
}
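
Taken together, these helpers back the reworked SHOW ANALYZE STATUS (the "use mysql.analyze_jobs to implement show analyze status" commit): job state becomes readable from any TiDB instance instead of a per-server in-memory history, and a running job can be stopped remotely via its persisted process ID. A hypothetical check in the style of the tests added in this PR — the testkit calls are assumed, and the exact output columns may differ:

// Illustrative only; not copied from the PR's test files.
tk.MustExec("analyze table t")
// Expected to list the job with its schema/table/partition, job_info, processed_rows,
// start_time, end_time, state and, while running, the owning instance and process ID.
tk.MustQuery("show analyze status")
// A stuck job can then be stopped from any TiDB server with:
//   KILL TIDB <process_id>;  -- process_id is set to NULL once the job finishes or fails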

// analyzeResultsNotifyWaitGroupWrapper is a wrapper for sync.WaitGroup
// Please add all goroutine count when to `Add` to avoid exiting in advance.
type analyzeResultsNotifyWaitGroupWrapper struct {