
statistics: do not copy and paste the code for saving statistics #55046

Merged
merged 16 commits into from
Jul 31, 2024
78 changes: 26 additions & 52 deletions pkg/executor/analyze.go
@@ -393,70 +393,44 @@ func (e *AnalyzeExec) handleResultsError(
     partitionStatsConcurrency = min(taskNum, partitionStatsConcurrency)
     // If partitionStatsConcurrency > 1, we will try to demand extra session from Domain to save Analyze results in concurrency.
     // If there is no extra session we can use, we will save analyze results in single-thread.
+    dom := domain.GetDomain(e.Ctx())
+    internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
     if partitionStatsConcurrency > 1 {
-        dom := domain.GetDomain(e.Ctx())
+        // FIXME: Since we don't use it either to save analysis results or to store job history, it has no effect. Please remove this :(
         subSctxs := dom.FetchAnalyzeExec(partitionStatsConcurrency)
         warningMessage := "Insufficient sessions to save analyze results. Consider increasing the 'analyze-partition-concurrency-quota' configuration to improve analyze performance. " +
             "This value should typically be greater than or equal to the 'tidb_analyze_partition_concurrency' variable."
         if len(subSctxs) < partitionStatsConcurrency {
             e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackError(warningMessage))
             logutil.BgLogger().Warn(
                 warningMessage,
                 zap.Int("sessionCount", len(subSctxs)),
                 zap.Int("needSessionCount", partitionStatsConcurrency),
             )
         }
         if len(subSctxs) > 0 {
             sessionCount := len(subSctxs)
             logutil.BgLogger().Info("use multiple sessions to save analyze results", zap.Int("sessionCount", sessionCount))
             defer func() {
                 dom.ReleaseAnalyzeExec(subSctxs)
             }()
-            internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
-            err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
-            return err
+            return e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
         }
     }
     logutil.BgLogger().Info("use single session to save analyze results")
     failpoint.Inject("handleResultsErrorSingleThreadPanic", nil)
-    tableIDs := map[int64]struct{}{}

-    // save analyze results in single-thread.
-    statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
-    panicCnt := 0
-    for panicCnt < concurrency {
-        results, ok := <-resultsCh
-        if !ok {
-            break
-        }
-        if results.Err != nil {
-            err = results.Err
-            if isAnalyzeWorkerPanic(err) {
-                panicCnt++
-            } else {
-                logutil.Logger(ctx).Error("analyze failed", zap.Error(err))
-            }
-            finishJobWithLog(e.Ctx(), results.Job, err)
-            continue
-        }
-        handleGlobalStats(needGlobalStats, globalStatsMap, results)
-        tableIDs[results.TableID.GetStatisticsID()] = struct{}{}
-
-        if err1 := statsHandle.SaveTableStatsToStorage(results, e.Ctx().GetSessionVars().EnableAnalyzeSnapshot, handleutil.StatsMetaHistorySourceAnalyze); err1 != nil {
-            tableID := results.TableID.TableID
-            err = err1
-            logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err), zap.Int64("tableID", tableID))
-            finishJobWithLog(e.Ctx(), results.Job, err)
-        } else {
-            finishJobWithLog(e.Ctx(), results.Job, nil)
-        }
-        if err := e.Ctx().GetSessionVars().SQLKiller.HandleSignal(); err != nil {
-            finishJobWithLog(e.Ctx(), results.Job, err)
-            results.DestroyAndPutToPool()
-            return err
-        }
-        results.DestroyAndPutToPool()
-    }
-    // Dump stats to historical storage.
-    for tableID := range tableIDs {
-        if err := recordHistoricalStats(e.Ctx(), tableID); err != nil {
-            logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
-        }
-    }
-
-    return err
+    subSctxs := []sessionctx.Context{e.Ctx()}
+    return e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
 }

-func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, statsConcurrency int, needGlobalStats bool,
+func (e *AnalyzeExec) handleResultsErrorWithConcurrency(
+    ctx context.Context,
+    statsConcurrency int,
+    needGlobalStats bool,
     subSctxs []sessionctx.Context,
-    globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error {
+    globalStatsMap globalStatsMap,
+    resultsCh <-chan *statistics.AnalyzeResults,
+) error {
     partitionStatsConcurrency := len(subSctxs)

     wg := util.NewWaitGroupPool(e.gp)
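Taken together, this hunk removes the hand-rolled single-thread save loop: the fallback path now wraps the current session in a one-element slice and goes through the same handleResultsErrorWithConcurrency helper as the multi-session path, so there is only one copy of the saving logic. Below is a minimal, self-contained Go sketch of that pattern; `session`, `analyzeResult`, and `saveWithSessions` are hypothetical stand-ins for `sessionctx.Context`, `*statistics.AnalyzeResults`, and the real helper, which additionally finishes jobs, records historical stats, and handles worker panics.

```go
package main

import (
	"fmt"
	"sync"
)

// session and analyzeResult are hypothetical stand-ins for
// sessionctx.Context and *statistics.AnalyzeResults.
type session struct{ id int }
type analyzeResult struct{ tableID int64 }

// saveWithSessions drains resultsCh with one worker per session.
// Both the "single session" and "multiple sessions" callers share
// this code path, which is the point of the refactor.
func saveWithSessions(sessions []*session, resultsCh <-chan *analyzeResult) error {
	var wg sync.WaitGroup
	for _, s := range sessions {
		wg.Add(1)
		go func(s *session) {
			defer wg.Done()
			for r := range resultsCh {
				// In TiDB this is where stats are written to storage;
				// here we only log which worker handled which table.
				fmt.Printf("session %d saved stats for table %d\n", s.id, r.tableID)
			}
		}(s)
	}
	wg.Wait()
	return nil
}

func main() {
	resultsCh := make(chan *analyzeResult, 4)
	for i := int64(1); i <= 4; i++ {
		resultsCh <- &analyzeResult{tableID: i}
	}
	close(resultsCh)

	// Fallback path: no extra sessions available, so reuse the same
	// helper with a one-element slice, mirroring
	// `subSctxs := []sessionctx.Context{e.Ctx()}` in the diff above.
	_ = saveWithSessions([]*session{{id: 0}}, resultsCh)
}
```

The design point is that "single-threaded" becomes just the N=1 case of the concurrent path, so a fix such as the cleanup in the next file only has to be made once.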
2 changes: 2 additions & 0 deletions pkg/executor/analyze_worker.go
@@ -56,6 +56,8 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b
     }()
     for results := range worker.resultsCh {
         if err := worker.killer.HandleSignal(); err != nil {
+            finishJobWithLog(worker.sctx, results.Job, err)
+            results.DestroyAndPutToPool()
             worker.errCh <- err
             return
         }
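The two added lines make the kill-signal exit behave like every other failure path: the analyze job is finished with the error and the pooled results object is released before the worker returns, rather than being leaked. A rough sketch of that shape, assuming hypothetical stand-ins (`result.Release` for `DestroyAndPutToPool`, `finishJob` for `finishJobWithLog`, and a context cancellation in place of `worker.killer.HandleSignal()`):

```go
package main

import (
	"context"
	"fmt"
)

// result is a hypothetical pooled object; Release stands in for
// results.DestroyAndPutToPool() in the real worker.
type result struct{ job string }

func (r *result) Release() { fmt.Println("released:", r.job) }

func finishJob(job string, err error) { fmt.Println("finished:", job, "err:", err) }

// drain mirrors the shape of the save-stats worker loop: when a
// kill/cancel signal is observed, it finishes the current job and
// releases the pooled result before returning, which is what the two
// added lines guarantee in the real worker.
func drain(ctx context.Context, resultsCh <-chan *result, errCh chan<- error) {
	for r := range resultsCh {
		if err := ctx.Err(); err != nil { // stand-in for worker.killer.HandleSignal()
			finishJob(r.job, err)
			r.Release()
			errCh <- err
			return
		}
		finishJob(r.job, nil)
		r.Release()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	resultsCh := make(chan *result, 2)
	errCh := make(chan error, 1)
	resultsCh <- &result{job: "analyze table t1"}
	resultsCh <- &result{job: "analyze table t2"}
	close(resultsCh)

	cancel() // simulate a kill signal arriving before the worker drains the queue
	drain(ctx, resultsCh, errCh)
	fmt.Println("worker stopped with:", <-errCh)
}
```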