From 81a527454ebbb500cbfff348bbbc7c1032886a5e Mon Sep 17 00:00:00 2001 From: zyguan Date: Mon, 4 Jul 2022 01:59:34 +0000 Subject: [PATCH 1/8] executor,metrics: add a metric for observing execution phases Signed-off-by: zyguan --- executor/adapter.go | 152 +++++++++++++++++++++++++++++++- metrics/executor.go | 9 ++ metrics/metrics.go | 1 + metrics/session.go | 7 +- util/execdetails/execdetails.go | 11 ++- 5 files changed, 174 insertions(+), 6 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 89980ab3779ae..eeacdef7c9f7e 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -217,6 +217,11 @@ type ExecStmt struct { retryCount uint retryStartTime time.Time + phaseBuildDurations [2]time.Duration + phaseOpenDurations [2]time.Duration + phaseNextDurations [2]time.Duration + phaseLockDurations [2]time.Duration + // OutputNames will be set if using cached plan OutputNames []*types.FieldName PsStmt *plannercore.CachedPrepareStmt @@ -422,7 +427,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { ctx = a.observeStmtBeginForTopSQL(ctx) breakpoint.Inject(a.Ctx, sessiontxn.BreakPointBeforeExecutorFirstRun) - if err = e.Open(ctx); err != nil { + if err = a.openExecutor(ctx, e); err != nil { terror.Call(e.Close) return nil, err } @@ -721,6 +726,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { ctx = context.WithValue(ctx, util.LockKeysDetailCtxKey, &lockKeyStats) startLocking := time.Now() err = txn.LockKeys(ctx, lockCtx, keys...) + a.phaseLockDurations[0] += time.Since(startLocking) if lockKeyStats != nil { seVars.StmtCtx.MergeLockKeysExecDetails(lockKeyStats) } @@ -786,6 +792,9 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError) + a.updateNextDuration() + a.resetPhaseDurations() + e, err := a.buildExecutor() if err != nil { return nil, err @@ -799,7 +808,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterPessimisticLockErrorRetry", true) }) - if err = e.Open(ctx); err != nil { + if err = a.openExecutor(ctx, e); err != nil { return nil, err } return e, nil @@ -813,6 +822,7 @@ type pessimisticTxn interface { // buildExecutor build an executor from plan, prepared statement may need additional procedure. 
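buildExecutor, directly below, charges its own wall-clock time to phaseBuildDurations[0] with the defer-with-argument idiom: the time.Now() argument is evaluated at function entry, when the defer is queued, while the closure body runs at return. A standalone sketch of the idiom, assuming nothing beyond the standard library (work and accumulate are illustrative names, not part of the patch):

package demo

import "time"

// accumulate plays the role of a phase-duration field.
var accumulate time.Duration

func work() {
	// time.Now() runs here, at entry; the closure runs when work returns,
	// so the whole body is measured.
	defer func(start time.Time) { accumulate += time.Since(start) }(time.Now())
	time.Sleep(10 * time.Millisecond) // stand-in for the measured body
}
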
func (a *ExecStmt) buildExecutor() (Executor, error) { + defer func(start time.Time) { a.phaseBuildDurations[0] += time.Since(start) }(time.Now()) ctx := a.Ctx stmtCtx := ctx.GetSessionVars().StmtCtx if _, ok := a.Plan.(*plannercore.Execute); !ok { @@ -854,6 +864,31 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { return e, nil } +func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) error { + start := time.Now() + err := e.Open(ctx) + a.phaseOpenDurations[0] += time.Since(start) + return err +} + +func (a *ExecStmt) updateNextDuration() { + if statsColl := a.Ctx.GetSessionVars().StmtCtx.RuntimeStatsColl; statsColl != nil { + stats := statsColl.GetRootStats(a.Plan.ID()) + a.phaseNextDurations[0] = time.Duration(stats.GetTime()) - a.phaseNextDurations[1] + } +} + +func (a *ExecStmt) resetPhaseDurations() { + a.phaseBuildDurations[1] += a.phaseBuildDurations[0] + a.phaseBuildDurations[0] = 0 + a.phaseOpenDurations[1] += a.phaseOpenDurations[0] + a.phaseOpenDurations[0] = 0 + a.phaseNextDurations[1] += a.phaseNextDurations[0] + a.phaseNextDurations[0] = 0 + a.phaseLockDurations[1] += a.phaseLockDurations[0] + a.phaseLockDurations[0] = 0 +} + // QueryReplacer replaces new line and tab for grep result including query string. var QueryReplacer = strings.NewReplacer("\r", " ", "\n", " ", "\t", " ") @@ -896,8 +931,119 @@ var ( sessionExecuteRunDurationInternal = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblInternal) sessionExecuteRunDurationGeneral = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblGeneral) totalTiFlashQuerySuccCounter = metrics.TiFlashQueryTotalCounter.WithLabelValues("", metrics.LblOK) + + execBuildLocking = metrics.ExecPhaseDuration.WithLabelValues("build:locking", "0") + execOpenLocking = metrics.ExecPhaseDuration.WithLabelValues("open:locking", "0") + execNextLocking = metrics.ExecPhaseDuration.WithLabelValues("next:locking", "0") + execLockLocking = metrics.ExecPhaseDuration.WithLabelValues("lock:locking", "0") + execBuildFinal = metrics.ExecPhaseDuration.WithLabelValues("build:final", "0") + execOpenFinal = metrics.ExecPhaseDuration.WithLabelValues("open:final", "0") + execNextFinal = metrics.ExecPhaseDuration.WithLabelValues("next:final", "0") + execLockFinal = metrics.ExecPhaseDuration.WithLabelValues("lock:final", "0") + + execCommitPrewrite = metrics.ExecPhaseDuration.WithLabelValues("commit:prewrite", "0") + execCommitCommit = metrics.ExecPhaseDuration.WithLabelValues("commit:commit", "0") + execCommitWaitCommitTS = metrics.ExecPhaseDuration.WithLabelValues("commit:wait:commit-ts", "0") + execCommitWaitLatch = metrics.ExecPhaseDuration.WithLabelValues("commit:wait:local-latch", "0") + execCommitWaitBinlog = metrics.ExecPhaseDuration.WithLabelValues("commit:wait:prewrite-binlog", "0") ) +func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.CommitDetails) { + if d := a.phaseBuildDurations[0]; d > 0 { + if internal { + metrics.ExecPhaseDuration.WithLabelValues("build:final", "1").Observe(d.Seconds()) + } else { + execBuildFinal.Observe(d.Seconds()) + } + } + if d := a.phaseBuildDurations[1]; d > 0 { + if internal { + metrics.ExecPhaseDuration.WithLabelValues("build:locking", "1").Observe(d.Seconds()) + } else { + execBuildLocking.Observe(d.Seconds()) + } + } + if d := a.phaseOpenDurations[0]; d > 0 { + if internal { + metrics.ExecPhaseDuration.WithLabelValues("open:final", "1").Observe(d.Seconds()) + } else { + execOpenFinal.Observe(d.Seconds()) + } + } + if d := a.phaseOpenDurations[1]; d > 0 { + if 
internal { + metrics.ExecPhaseDuration.WithLabelValues("open:locking", "1").Observe(d.Seconds()) + } else { + execOpenLocking.Observe(d.Seconds()) + } + } + if d := a.phaseNextDurations[0]; d > 0 { + if internal { + metrics.ExecPhaseDuration.WithLabelValues("next:final", "1").Observe(d.Seconds()) + } else { + execNextFinal.Observe(d.Seconds()) + } + } + if d := a.phaseNextDurations[1]; d > 0 { + if internal { + metrics.ExecPhaseDuration.WithLabelValues("next:locking", "1").Observe(d.Seconds()) + } else { + execNextLocking.Observe(d.Seconds()) + } + } + if d := a.phaseLockDurations[0]; d > 0 { + if internal { + metrics.ExecPhaseDuration.WithLabelValues("lock:final", "1").Observe(d.Seconds()) + } else { + execLockFinal.Observe(d.Seconds()) + } + } + if d := a.phaseLockDurations[1]; d > 0 { + if internal { + metrics.ExecPhaseDuration.WithLabelValues("lock:locking", "1").Observe(d.Seconds()) + } else { + execLockLocking.Observe(d.Seconds()) + } + } + if commitDetails != nil { + if d := commitDetails.PrewriteTime; d > 0 { + if internal { + metrics.ExecPhaseDuration.WithLabelValues("commit:prewrite", "1").Observe(d.Seconds()) + } else { + execCommitPrewrite.Observe(d.Seconds()) + } + } + if d := commitDetails.CommitTime; d > 0 { + if internal { + metrics.ExecPhaseDuration.WithLabelValues("commit:commit", "1").Observe(d.Seconds()) + } else { + execCommitCommit.Observe(d.Seconds()) + } + } + if d := commitDetails.GetCommitTsTime; d > 0 { + if internal { + metrics.ExecPhaseDuration.WithLabelValues("commit:wait:commit-ts", "1").Observe(d.Seconds()) + } else { + execCommitWaitCommitTS.Observe(d.Seconds()) + } + } + if d := commitDetails.LocalLatchTime; d > 0 { + if internal { + metrics.ExecPhaseDuration.WithLabelValues("commit:wait:local-latch", "1").Observe(d.Seconds()) + } else { + execCommitWaitLatch.Observe(d.Seconds()) + } + } + if d := commitDetails.WaitPrewriteBinlogTime; d > 0 { + if internal { + metrics.ExecPhaseDuration.WithLabelValues("commit:wait:prewrite-binlog", "1").Observe(d.Seconds()) + } else { + execCommitWaitBinlog.Observe(d.Seconds()) + } + } + } +} + // FinishExecuteStmt is used to record some information after `ExecStmt` execution finished: // 1. record slow log if needed. // 2. record summary statement. @@ -937,6 +1083,8 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo } sessVars.PrevStmt = FormatSQL(a.GetTextToLog()) + a.updateNextDuration() + a.observePhaseDurations(sessVars.InRestrictedSQL, execDetail.CommitDetail) executeDuration := time.Since(sessVars.StartTime) - sessVars.DurationCompile if sessVars.InRestrictedSQL { sessionExecuteRunDurationInternal.Observe(executeDuration.Seconds()) diff --git a/metrics/executor.go b/metrics/executor.go index 6702832ebe9ac..7a8aa294075e9 100644 --- a/metrics/executor.go +++ b/metrics/executor.go @@ -46,4 +46,13 @@ var ( Name: "statement_db_total", Help: "Counter of StmtNode by Database.", }, []string{LblDb, LblType}) + + // ExecPhaseDuration records the duration of each execution phase. 
+ ExecPhaseDuration = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "tidb", + Subsystem: "executor", + Name: "phase_duration_seconds", + Help: "Summary of each execution phase duration.", + }, []string{LblPhase, LblInternal}) ) diff --git a/metrics/metrics.go b/metrics/metrics.go index 19809bd9c85d2..cea49567e7272 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -146,6 +146,7 @@ func RegisterMetrics() { prometheus.MustRegister(StatsInaccuracyRate) prometheus.MustRegister(StmtNodeCounter) prometheus.MustRegister(DbStmtNodeCounter) + prometheus.MustRegister(ExecPhaseDuration) prometheus.MustRegister(StoreQueryFeedbackCounter) prometheus.MustRegister(TimeJumpBackCounter) prometheus.MustRegister(TransactionDuration) diff --git a/metrics/session.go b/metrics/session.go index 4065d57baf992..0377377c6b700 100644 --- a/metrics/session.go +++ b/metrics/session.go @@ -81,7 +81,7 @@ var ( Name: "transaction_statement_num", Help: "Bucketed histogram of statements count in each transaction.", Buckets: prometheus.ExponentialBuckets(1, 2, 16), // 1 ~ 32768 - }, []string{LbTxnMode, LblType}) + }, []string{LblTxnMode, LblType}) TransactionDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -90,7 +90,7 @@ var ( Name: "transaction_duration_seconds", Help: "Bucketed histogram of a transaction execution duration, including retry.", Buckets: prometheus.ExponentialBuckets(0.001, 2, 28), // 1ms ~ 1.5days - }, []string{LbTxnMode, LblType}) + }, []string{LblTxnMode, LblType}) StatementDeadlockDetectDuration = prometheus.NewHistogram( prometheus.HistogramOpts{ @@ -169,7 +169,7 @@ const ( LblCoprType = "copr_type" LblGeneral = "general" LblInternal = "internal" - LbTxnMode = "txn_mode" + LblTxnMode = "txn_mode" LblPessimistic = "pessimistic" LblOptimistic = "optimistic" LblStore = "store" @@ -187,4 +187,5 @@ const ( LblCommitting = "committing" LblRollingBack = "rolling_back" LblHasLock = "has_lock" + LblPhase = "phase" ) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 23ef49ff56ce1..57c48ff795586 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -484,7 +484,16 @@ type RootRuntimeStats struct { groupRss [][]RuntimeStats } -// GetActRows return total rows of RootRuntimeStats. +// GetTime returns total time of RootRuntimeStats. +func (e *RootRuntimeStats) GetTime() int64 { + t := int64(0) + for _, basic := range e.basics { + t += basic.GetTime() + } + return t +} + +// GetActRows returns total rows of RootRuntimeStats. 
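ExecPhaseDuration follows the standard client_golang flow: declare a SummaryVec once, register it during RegisterMetrics, then observe seconds under a fixed (phase, internal) label pair, with internal encoded as "0" for general queries and "1" for internal ones. A minimal standalone sketch of that flow, using illustrative names (demo package, demoPhaseDuration, recordPhase) rather than the actual TiDB wiring:

package demo

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// demoPhaseDuration mirrors ExecPhaseDuration: a summary keyed by
// (phase, internal).
var demoPhaseDuration = prometheus.NewSummaryVec(
	prometheus.SummaryOpts{
		Namespace: "tidb",
		Subsystem: "executor",
		Name:      "phase_duration_seconds",
		Help:      "Summary of each execution phase duration.",
	}, []string{"phase", "internal"})

func init() {
	// A collector must be registered exactly once before observations
	// are exported, as metrics.RegisterMetrics does for the real vector.
	prometheus.MustRegister(demoPhaseDuration)
}

func recordPhase(phase string, internal bool, d time.Duration) {
	internalLbl := "0"
	if internal {
		internalLbl = "1"
	}
	demoPhaseDuration.WithLabelValues(phase, internalLbl).Observe(d.Seconds())
}
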
 func (e *RootRuntimeStats) GetActRows() int64 {
 	num := int64(0)
 	for _, basic := range e.basics {

From 631c38dc0afd5aada08909129450dd3a3a03096b Mon Sep 17 00:00:00 2001
From: zyguan
Date: Tue, 5 Jul 2022 03:53:34 +0000
Subject: [PATCH 2/8] executor: record next durations more precisely

Signed-off-by: zyguan
---
 executor/adapter.go | 28 ++++++++++++++++++++--------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/executor/adapter.go b/executor/adapter.go
index eeacdef7c9f7e..8193e682440e2 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -148,7 +148,7 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) {
 			logutil.Logger(ctx).Error("execute sql panic", zap.String("sql", a.stmt.GetTextToLog()), zap.Stack("stack"))
 	}()
 
-	err = Next(ctx, a.executor, req)
+	err = a.stmt.next(ctx, a.executor, req)
 	if err != nil {
 		a.lastErr = err
 		return err
@@ -627,7 +627,7 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor
 	var err error
 	req := newFirstChunk(e)
 	for {
-		err = Next(ctx, e, req)
+		err = a.next(ctx, e, req)
 		if err != nil {
 			// Handle 'write conflict' error.
 			break
@@ -673,7 +673,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
 		}
 	}
 
-	err = Next(ctx, e, newFirstChunk(e))
+	err = a.next(ctx, e, newFirstChunk(e))
 	if err != nil {
 		return nil, err
 	}
@@ -792,7 +792,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
 
 	breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError)
 
-	a.updateNextDuration()
+	a.updateNextDurationFromRuntimeStats()
 	a.resetPhaseDurations()
 
 	e, err := a.buildExecutor()
@@ -871,9 +871,21 @@ func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) error {
 	return err
 }
 
-func (a *ExecStmt) updateNextDuration() {
-	if statsColl := a.Ctx.GetSessionVars().StmtCtx.RuntimeStatsColl; statsColl != nil {
-		stats := statsColl.GetRootStats(a.Plan.ID())
+func (a *ExecStmt) next(ctx context.Context, e Executor, req *chunk.Chunk) error {
+	if a.Plan.ID() > 0 {
+		return Next(ctx, e, req)
+	}
+	// `newBaseExecutor` only sets runtime stats for plans with positive ids,
+	// so we record the next duration here for stmts whose plan ids are 0 (e.g. begin).
+	start := time.Now()
+	err := Next(ctx, e, req)
+	a.phaseNextDurations[0] += time.Since(start)
+	return err
+}
+
+func (a *ExecStmt) updateNextDurationFromRuntimeStats() {
+	if planID, statsColl := a.Plan.ID(), a.Ctx.GetSessionVars().StmtCtx.RuntimeStatsColl; planID > 0 && statsColl != nil {
+		stats := statsColl.GetRootStats(planID)
 		a.phaseNextDurations[0] = time.Duration(stats.GetTime()) - a.phaseNextDurations[1]
 	}
 }
@@ -1095,7 +1095,7 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
 	}
 	sessVars.PrevStmt = FormatSQL(a.GetTextToLog())
 
-	a.updateNextDuration()
+	a.updateNextDurationFromRuntimeStats()
 	a.observePhaseDurations(sessVars.InRestrictedSQL, execDetail.CommitDetail)
 	executeDuration := time.Since(sessVars.StartTime) - sessVars.DurationCompile
 	if sessVars.InRestrictedSQL {

From 732be488ab955ec712ec921e4523d0af29a5e53f Mon Sep 17 00:00:00 2001
From: zyguan
Date: Tue, 5 Jul 2022 07:01:30 +0000
Subject: [PATCH 3/8] executor: simplify the way to get next duration

Signed-off-by: zyguan
---
 executor/adapter.go             | 14 --------------
 util/execdetails/execdetails.go |  9 ---------
 2 files changed, 23 deletions(-)

diff --git a/executor/adapter.go b/executor/adapter.go
index 8193e682440e2..a703739306b96 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -792,7 +792,6 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
 
 	breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError)
 
-	a.updateNextDurationFromRuntimeStats()
 	a.resetPhaseDurations()
 
 	e, err := a.buildExecutor()
@@ -872,24 +871,12 @@ func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) error {
 }
 
 func (a *ExecStmt) next(ctx context.Context, e Executor, req *chunk.Chunk) error {
-	if a.Plan.ID() > 0 {
-		return Next(ctx, e, req)
-	}
-	// `newBaseExecutor` only sets runtime stats for plans with positive ids,
-	// so we record the next duration here for stmts whose plan ids are 0 (e.g. begin).
 	start := time.Now()
 	err := Next(ctx, e, req)
 	a.phaseNextDurations[0] += time.Since(start)
 	return err
 }
 
-func (a *ExecStmt) updateNextDurationFromRuntimeStats() {
-	if planID, statsColl := a.Plan.ID(), a.Ctx.GetSessionVars().StmtCtx.RuntimeStatsColl; planID > 0 && statsColl != nil {
-		stats := statsColl.GetRootStats(planID)
-		a.phaseNextDurations[0] = time.Duration(stats.GetTime()) - a.phaseNextDurations[1]
-	}
-}
-
 func (a *ExecStmt) resetPhaseDurations() {
 	a.phaseBuildDurations[1] += a.phaseBuildDurations[0]
 	a.phaseBuildDurations[0] = 0
@@ -1095,7 +1082,6 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
 	}
 	sessVars.PrevStmt = FormatSQL(a.GetTextToLog())
 
-	a.updateNextDurationFromRuntimeStats()
 	a.observePhaseDurations(sessVars.InRestrictedSQL, execDetail.CommitDetail)
 	executeDuration := time.Since(sessVars.StartTime) - sessVars.DurationCompile
 	if sessVars.InRestrictedSQL {
diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go
index 57c48ff795586..58599c91d7c7e 100644
--- a/util/execdetails/execdetails.go
+++ b/util/execdetails/execdetails.go
@@ -484,15 +484,6 @@ type RootRuntimeStats struct {
 	groupRss [][]RuntimeStats
 }
 
-// GetTime returns total time of RootRuntimeStats.
-func (e *RootRuntimeStats) GetTime() int64 {
-	t := int64(0)
-	for _, basic := range e.basics {
-		t += basic.GetTime()
-	}
-	return t
-}
-
 // GetActRows returns total rows of RootRuntimeStats.
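With the plan-ID special case removed, a.next reduces to one generic idiom: time the call, accumulate, and pass the error through. A standalone sketch of the idiom, assuming only the standard library (timedCall is an illustrative name, not a TiDB function):

package demo

import "time"

// timedCall runs fn and adds its wall-clock time to *acc.
func timedCall(acc *time.Duration, fn func() error) error {
	start := time.Now()
	err := fn()
	*acc += time.Since(start)
	return err
}

a.next is this idiom inlined, with acc pointing at a.phaseNextDurations[0] and fn wrapping the executor's Next.
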
func (e *RootRuntimeStats) GetActRows() int64 { num := int64(0) From cc0ca978b4fdce8c6e2b7d0fb74638afa7d469a6 Mon Sep 17 00:00:00 2001 From: zyguan Date: Tue, 5 Jul 2022 07:20:06 +0000 Subject: [PATCH 4/8] revert changes on util/execdetails Signed-off-by: zyguan --- util/execdetails/execdetails.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 58599c91d7c7e..23ef49ff56ce1 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -484,7 +484,7 @@ type RootRuntimeStats struct { groupRss [][]RuntimeStats } -// GetActRows returns total rows of RootRuntimeStats. +// GetActRows return total rows of RootRuntimeStats. func (e *RootRuntimeStats) GetActRows() int64 { num := int64(0) for _, basic := range e.basics { From 1e2bff1204846bd8b4b0a1474e1f061089e66246 Mon Sep 17 00:00:00 2001 From: zyguan Date: Fri, 15 Jul 2022 10:23:24 +0000 Subject: [PATCH 5/8] observe duration of waiting latest-ts Signed-off-by: zyguan --- executor/adapter.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/executor/adapter.go b/executor/adapter.go index 61c48a92371ad..16ba352c94223 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -947,6 +947,7 @@ var ( execCommitPrewrite = metrics.ExecPhaseDuration.WithLabelValues("commit:prewrite", "0") execCommitCommit = metrics.ExecPhaseDuration.WithLabelValues("commit:commit", "0") execCommitWaitCommitTS = metrics.ExecPhaseDuration.WithLabelValues("commit:wait:commit-ts", "0") + execCommitWaitLatestTS = metrics.ExecPhaseDuration.WithLabelValues("commit:wait:latest-ts", "0") execCommitWaitLatch = metrics.ExecPhaseDuration.WithLabelValues("commit:wait:local-latch", "0") execCommitWaitBinlog = metrics.ExecPhaseDuration.WithLabelValues("commit:wait:prewrite-binlog", "0") ) @@ -1030,6 +1031,13 @@ func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.Comm execCommitWaitCommitTS.Observe(d.Seconds()) } } + if d := commitDetails.GetLatestTsTime; d > 0 { + if internal { + metrics.ExecPhaseDuration.WithLabelValues("commit:wait:latest-ts", "1").Observe(d.Seconds()) + } else { + execCommitWaitLatestTS.Observe(d.Seconds()) + } + } if d := commitDetails.LocalLatchTime; d > 0 { if internal { metrics.ExecPhaseDuration.WithLabelValues("commit:wait:local-latch", "1").Observe(d.Seconds()) From a61399f0c5dafba5aff724fefd79bf8502fbd2f8 Mon Sep 17 00:00:00 2001 From: zyguan Date: Tue, 19 Jul 2022 05:34:44 +0000 Subject: [PATCH 6/8] add comments for phase durations Signed-off-by: zyguan --- executor/adapter.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/executor/adapter.go b/executor/adapter.go index 16ba352c94223..88ddeae292f5a 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -216,6 +216,12 @@ type ExecStmt struct { retryCount uint retryStartTime time.Time + // Phase durations are splited into two parts: 1. trying to lock keys (but + // failed); 2. the final iteration of the retry loop. Here we use + // [2]time.Duration to record such info for each phase. The first duration + // is increased only whithin the current iteration. When we meet a + // pessimistic lock error and decide to retry, we add the first duration to + // the second and reset the first to 0 by calling `resetPhaseDurations`. 
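In isolation, the scheme this comment describes amounts to the following sketch; phaseDuration here is a hypothetical helper type for illustration only, while the actual fields (directly below) stay as four plain [2]time.Duration values:

package demo

import "time"

// phaseDuration models one phase: slot 0 covers the iteration in
// progress, slot 1 accumulates iterations that ended in a lock retry.
type phaseDuration [2]time.Duration

// add records time spent within the current iteration.
func (p *phaseDuration) add(d time.Duration) { p[0] += d }

// rollover folds the current iteration into the "locking" slot before a
// retry, mirroring what resetPhaseDurations does for every phase.
func (p *phaseDuration) rollover() {
	p[1] += p[0]
	p[0] = 0
}

At reporting time, slot 0 feeds the "<phase>:final" labels and slot 1 the "<phase>:locking" labels in observePhaseDurations.
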
phaseBuildDurations [2]time.Duration phaseOpenDurations [2]time.Duration phaseNextDurations [2]time.Duration From 4ccde746e5cf01e70bed1d30d393cc964a0241fd Mon Sep 17 00:00:00 2001 From: zyguan Date: Tue, 19 Jul 2022 06:37:41 +0000 Subject: [PATCH 7/8] address https://github.com/pingcap/tidb/pull/35906#discussion_r922930455 Signed-off-by: zyguan --- executor/adapter.go | 215 +++++++++++++++++++++----------------------- 1 file changed, 103 insertions(+), 112 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 88ddeae292f5a..758287571e7c6 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -61,6 +61,7 @@ import ( "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/topsql" topsqlstate "github.com/pingcap/tidb/util/topsql/state" + "github.com/prometheus/client_golang/prometheus" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/util" @@ -936,127 +937,117 @@ func FormatSQL(sql string) stringutil.StringerFunc { } } +const ( + phaseBuildLocking = "build:locking" + phaseOpenLocking = "open:locking" + phaseNextLocking = "next:locking" + phaseLockLocking = "lock:locking" + phaseBuildFinal = "build:final" + phaseOpenFinal = "open:final" + phaseNextFinal = "next:final" + phaseLockFinal = "lock:final" + phaseCommitPrewrite = "commit:prewrite" + phaseCommitCommit = "commit:commit" + phaseCommitWaitCommitTS = "commit:wait:commit-ts" + phaseCommitWaitLatestTS = "commit:wait:latest-ts" + phaseCommitWaitLatch = "commit:wait:local-latch" + phaseCommitWaitBinlog = "commit:wait:prewrite-binlog" +) + var ( sessionExecuteRunDurationInternal = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblInternal) sessionExecuteRunDurationGeneral = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblGeneral) totalTiFlashQuerySuccCounter = metrics.TiFlashQueryTotalCounter.WithLabelValues("", metrics.LblOK) - execBuildLocking = metrics.ExecPhaseDuration.WithLabelValues("build:locking", "0") - execOpenLocking = metrics.ExecPhaseDuration.WithLabelValues("open:locking", "0") - execNextLocking = metrics.ExecPhaseDuration.WithLabelValues("next:locking", "0") - execLockLocking = metrics.ExecPhaseDuration.WithLabelValues("lock:locking", "0") - execBuildFinal = metrics.ExecPhaseDuration.WithLabelValues("build:final", "0") - execOpenFinal = metrics.ExecPhaseDuration.WithLabelValues("open:final", "0") - execNextFinal = metrics.ExecPhaseDuration.WithLabelValues("next:final", "0") - execLockFinal = metrics.ExecPhaseDuration.WithLabelValues("lock:final", "0") - - execCommitPrewrite = metrics.ExecPhaseDuration.WithLabelValues("commit:prewrite", "0") - execCommitCommit = metrics.ExecPhaseDuration.WithLabelValues("commit:commit", "0") - execCommitWaitCommitTS = metrics.ExecPhaseDuration.WithLabelValues("commit:wait:commit-ts", "0") - execCommitWaitLatestTS = metrics.ExecPhaseDuration.WithLabelValues("commit:wait:latest-ts", "0") - execCommitWaitLatch = metrics.ExecPhaseDuration.WithLabelValues("commit:wait:local-latch", "0") - execCommitWaitBinlog = metrics.ExecPhaseDuration.WithLabelValues("commit:wait:prewrite-binlog", "0") + // pre-define observers for non-internal queries + execBuildLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseBuildLocking, "0") + execOpenLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseOpenLocking, "0") + execNextLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseNextLocking, "0") + execLockLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseLockLocking, "0") 
+ execBuildFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseBuildFinal, "0") + execOpenFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseOpenFinal, "0") + execNextFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseNextFinal, "0") + execLockFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseLockFinal, "0") + execCommitPrewrite = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitPrewrite, "0") + execCommitCommit = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitCommit, "0") + execCommitWaitCommitTS = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitCommitTS, "0") + execCommitWaitLatestTS = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitLatestTS, "0") + execCommitWaitLatch = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitLatch, "0") + execCommitWaitBinlog = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitBinlog, "0") ) -func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.CommitDetails) { - if d := a.phaseBuildDurations[0]; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("build:final", "1").Observe(d.Seconds()) - } else { - execBuildFinal.Observe(d.Seconds()) - } - } - if d := a.phaseBuildDurations[1]; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("build:locking", "1").Observe(d.Seconds()) - } else { - execBuildLocking.Observe(d.Seconds()) - } - } - if d := a.phaseOpenDurations[0]; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("open:final", "1").Observe(d.Seconds()) - } else { - execOpenFinal.Observe(d.Seconds()) - } - } - if d := a.phaseOpenDurations[1]; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("open:locking", "1").Observe(d.Seconds()) - } else { - execOpenLocking.Observe(d.Seconds()) - } - } - if d := a.phaseNextDurations[0]; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("next:final", "1").Observe(d.Seconds()) - } else { - execNextFinal.Observe(d.Seconds()) - } - } - if d := a.phaseNextDurations[1]; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("next:locking", "1").Observe(d.Seconds()) - } else { - execNextLocking.Observe(d.Seconds()) - } +func getPhaseDurationObserver(phase string, internal bool) prometheus.Observer { + if internal { + return metrics.ExecPhaseDuration.WithLabelValues(phase, "1") + } + switch phase { + case phaseBuildLocking: + return execBuildLocking + case phaseOpenLocking: + return execOpenLocking + case phaseNextLocking: + return execNextLocking + case phaseLockLocking: + return execLockLocking + case phaseBuildFinal: + return execBuildFinal + case phaseOpenFinal: + return execOpenFinal + case phaseNextFinal: + return execNextFinal + case phaseLockFinal: + return execLockFinal + case phaseCommitPrewrite: + return execCommitPrewrite + case phaseCommitCommit: + return execCommitCommit + case phaseCommitWaitCommitTS: + return execCommitWaitCommitTS + case phaseCommitWaitLatestTS: + return execCommitWaitLatestTS + case phaseCommitWaitLatch: + return execCommitWaitLatch + case phaseCommitWaitBinlog: + return execCommitWaitBinlog + default: + return metrics.ExecPhaseDuration.WithLabelValues(phase, "0") } - if d := a.phaseLockDurations[0]; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("lock:final", "1").Observe(d.Seconds()) - } else { - execLockFinal.Observe(d.Seconds()) - } - } - if d := a.phaseLockDurations[1]; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("lock:locking", "1").Observe(d.Seconds()) - } else { - 
execLockLocking.Observe(d.Seconds()) - } +} + +func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.CommitDetails) { + for _, it := range []struct { + duration time.Duration + phase string + }{ + {a.phaseBuildDurations[0], phaseBuildFinal}, + {a.phaseBuildDurations[1], phaseBuildLocking}, + {a.phaseOpenDurations[0], phaseOpenFinal}, + {a.phaseOpenDurations[1], phaseOpenLocking}, + {a.phaseNextDurations[0], phaseNextFinal}, + {a.phaseNextDurations[1], phaseNextLocking}, + {a.phaseLockDurations[0], phaseLockFinal}, + {a.phaseLockDurations[1], phaseLockLocking}, + } { + if it.duration > 0 { + getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds()) + } + } + if commitDetails == nil { + return } - if commitDetails != nil { - if d := commitDetails.PrewriteTime; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("commit:prewrite", "1").Observe(d.Seconds()) - } else { - execCommitPrewrite.Observe(d.Seconds()) - } - } - if d := commitDetails.CommitTime; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("commit:commit", "1").Observe(d.Seconds()) - } else { - execCommitCommit.Observe(d.Seconds()) - } - } - if d := commitDetails.GetCommitTsTime; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("commit:wait:commit-ts", "1").Observe(d.Seconds()) - } else { - execCommitWaitCommitTS.Observe(d.Seconds()) - } - } - if d := commitDetails.GetLatestTsTime; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("commit:wait:latest-ts", "1").Observe(d.Seconds()) - } else { - execCommitWaitLatestTS.Observe(d.Seconds()) - } - } - if d := commitDetails.LocalLatchTime; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("commit:wait:local-latch", "1").Observe(d.Seconds()) - } else { - execCommitWaitLatch.Observe(d.Seconds()) - } - } - if d := commitDetails.WaitPrewriteBinlogTime; d > 0 { - if internal { - metrics.ExecPhaseDuration.WithLabelValues("commit:wait:prewrite-binlog", "1").Observe(d.Seconds()) - } else { - execCommitWaitBinlog.Observe(d.Seconds()) - } + for _, it := range []struct { + duration time.Duration + phase string + }{ + {commitDetails.PrewriteTime, phaseCommitPrewrite}, + {commitDetails.CommitTime, phaseCommitCommit}, + {commitDetails.GetCommitTsTime, phaseCommitWaitCommitTS}, + {commitDetails.GetLatestTsTime, phaseCommitWaitLatestTS}, + {commitDetails.LocalLatchTime, phaseCommitWaitLatch}, + {commitDetails.WaitPrewriteBinlogTime, phaseCommitWaitBinlog}, + } { + if it.duration > 0 { + getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds()) } } } From df8b914af4614605c2c915b9514431161bb54ece Mon Sep 17 00:00:00 2001 From: zyguan Date: Tue, 19 Jul 2022 10:07:50 +0000 Subject: [PATCH 8/8] observe the duration of writing response Signed-off-by: zyguan --- executor/adapter.go | 41 ++++++++++++++++++++++--------------- server/conn.go | 49 ++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 69 insertions(+), 21 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 758287571e7c6..83b47b51bde05 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -220,7 +220,7 @@ type ExecStmt struct { // Phase durations are splited into two parts: 1. trying to lock keys (but // failed); 2. the final iteration of the retry loop. Here we use // [2]time.Duration to record such info for each phase. The first duration - // is increased only whithin the current iteration. When we meet a + // is increased only within the current iteration. 
When we meet a // pessimistic lock error and decide to retry, we add the first duration to // the second and reset the first to 0 by calling `resetPhaseDurations`. phaseBuildDurations [2]time.Duration @@ -952,6 +952,7 @@ const ( phaseCommitWaitLatestTS = "commit:wait:latest-ts" phaseCommitWaitLatch = "commit:wait:local-latch" phaseCommitWaitBinlog = "commit:wait:prewrite-binlog" + phaseWriteResponse = "write-response" ) var ( @@ -974,6 +975,7 @@ var ( execCommitWaitLatestTS = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitLatestTS, "0") execCommitWaitLatch = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitLatch, "0") execCommitWaitBinlog = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitBinlog, "0") + execWriteResponse = metrics.ExecPhaseDuration.WithLabelValues(phaseWriteResponse, "0") ) func getPhaseDurationObserver(phase string, internal bool) prometheus.Observer { @@ -1009,6 +1011,8 @@ func getPhaseDurationObserver(phase string, internal bool) prometheus.Observer { return execCommitWaitLatch case phaseCommitWaitBinlog: return execCommitWaitBinlog + case phaseWriteResponse: + return execWriteResponse default: return metrics.ExecPhaseDuration.WithLabelValues(phase, "0") } @@ -1032,22 +1036,27 @@ func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.Comm getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds()) } } - if commitDetails == nil { - return + if commitDetails != nil { + for _, it := range []struct { + duration time.Duration + phase string + }{ + {commitDetails.PrewriteTime, phaseCommitPrewrite}, + {commitDetails.CommitTime, phaseCommitCommit}, + {commitDetails.GetCommitTsTime, phaseCommitWaitCommitTS}, + {commitDetails.GetLatestTsTime, phaseCommitWaitLatestTS}, + {commitDetails.LocalLatchTime, phaseCommitWaitLatch}, + {commitDetails.WaitPrewriteBinlogTime, phaseCommitWaitBinlog}, + } { + if it.duration > 0 { + getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds()) + } + } } - for _, it := range []struct { - duration time.Duration - phase string - }{ - {commitDetails.PrewriteTime, phaseCommitPrewrite}, - {commitDetails.CommitTime, phaseCommitCommit}, - {commitDetails.GetCommitTsTime, phaseCommitWaitCommitTS}, - {commitDetails.GetLatestTsTime, phaseCommitWaitLatestTS}, - {commitDetails.LocalLatchTime, phaseCommitWaitLatch}, - {commitDetails.WaitPrewriteBinlogTime, phaseCommitWaitBinlog}, - } { - if it.duration > 0 { - getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds()) + if stmtDetailsRaw := a.GoCtx.Value(execdetails.StmtExecDetailKey); stmtDetailsRaw != nil { + d := stmtDetailsRaw.(*execdetails.StmtExecDetails).WriteSQLRespDuration + if d > 0 { + getPhaseDurationObserver(phaseWriteResponse, internal).Observe(d.Seconds()) } } } diff --git a/server/conn.go b/server/conn.go index 76328d27d2151..5495013ac81e9 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1465,7 +1465,18 @@ func (cc *clientConn) useDB(ctx context.Context, db string) (err error) { } func (cc *clientConn) flush(ctx context.Context) error { + var ( + stmtDetail *execdetails.StmtExecDetails + startTime time.Time + ) + if stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey); stmtDetailRaw != nil { + stmtDetail = stmtDetailRaw.(*execdetails.StmtExecDetails) + startTime = time.Now() + } defer func() { + if stmtDetail != nil { + stmtDetail.WriteSQLRespDuration += time.Since(startTime) + } trace.StartRegion(ctx, "FlushClientConn").End() if ctx := cc.getCtx(); ctx != nil && ctx.WarningCount() > 0 { for 
_, err := range ctx.GetWarnings() { @@ -2208,6 +2219,7 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool req := rs.NewChunk(cc.chunkAlloc) gotColumnInfo := false firstNext := true + var start time.Time var stmtDetail *execdetails.StmtExecDetails stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey) if stmtDetailRaw != nil { @@ -2234,9 +2246,15 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool // We need to call Next before we get columns. // Otherwise, we will get incorrect columns info. columns := rs.Columns() + if stmtDetail != nil { + start = time.Now() + } if err = cc.writeColumnInfo(columns, serverStatus); err != nil { return false, err } + if stmtDetail != nil { + stmtDetail.WriteSQLRespDuration += time.Since(start) + } gotColumnInfo = true } rowCount := req.NumRows() @@ -2244,7 +2262,9 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool break } reg := trace.StartRegion(ctx, "WriteClientConn") - start := time.Now() + if stmtDetail != nil { + start = time.Now() + } for i := 0; i < rowCount; i++ { data = data[0:4] if binary { @@ -2266,7 +2286,14 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool stmtDetail.WriteSQLRespDuration += time.Since(start) } } - return false, cc.writeEOF(serverStatus) + if stmtDetail != nil { + start = time.Now() + } + err := cc.writeEOF(serverStatus) + if stmtDetail != nil { + stmtDetail.WriteSQLRespDuration += time.Since(start) + } + return false, err } // writeChunksWithFetchSize writes data from a Chunk, which filled data by a ResultSet, into a connection. @@ -2323,8 +2350,13 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet if stmtDetailRaw != nil { stmtDetail = stmtDetailRaw.(*execdetails.StmtExecDetails) } - start := time.Now() - var err error + var ( + err error + start time.Time + ) + if stmtDetail != nil { + start = time.Now() + } for _, row := range curRows { data = data[0:4] data, err = dumpBinaryRow(data, rs.Columns(), row, cc.rsEncoder) @@ -2341,7 +2373,14 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet if cl, ok := rs.(fetchNotifier); ok { cl.OnFetchReturned() } - return cc.writeEOF(serverStatus) + if stmtDetail != nil { + start = time.Now() + } + err = cc.writeEOF(serverStatus) + if stmtDetail != nil { + stmtDetail.WriteSQLRespDuration += time.Since(start) + } + return err } func (cc *clientConn) setConn(conn net.Conn) {
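
One pattern repeats through all of the conn.go changes above: sample the clock only when a *execdetails.StmtExecDetails is attached to the context, so connections that are not being tracked never pay for time.Now(). A condensed sketch of the pattern, reusing the real execdetails package (writeTimed and its write callback are illustrative names, not part of the patch):

package demo

import (
	"context"
	"time"

	"github.com/pingcap/tidb/util/execdetails"
)

// writeTimed wraps a network write and charges its latency to
// WriteSQLRespDuration, but only when the statement is being tracked:
// untracked connections never read the clock.
func writeTimed(ctx context.Context, write func() error) error {
	var (
		stmtDetail *execdetails.StmtExecDetails
		start      time.Time
	)
	if raw := ctx.Value(execdetails.StmtExecDetailKey); raw != nil {
		stmtDetail = raw.(*execdetails.StmtExecDetails)
		start = time.Now()
	}
	err := write()
	if stmtDetail != nil {
		stmtDetail.WriteSQLRespDuration += time.Since(start)
	}
	return err
}

The pre-built observer variables and the getPhaseDurationObserver switch from the earlier commit follow the same cost-avoidance reasoning: the common general-query path reuses cached observers instead of performing a WithLabelValues lookup per statement, falling back to the labeled lookup only for internal statements or unknown phases.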