diff --git a/executor/adapter.go b/executor/adapter.go
index 6083cd05f9f1c..5d076c2f4b456 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -61,6 +61,7 @@ import (
 	"github.com/pingcap/tidb/util/stringutil"
 	"github.com/pingcap/tidb/util/topsql"
 	topsqlstate "github.com/pingcap/tidb/util/topsql/state"
+	"github.com/prometheus/client_golang/prometheus"
 	tikverr "github.com/tikv/client-go/v2/error"
 	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/util"
@@ -148,7 +149,7 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) {
 		logutil.Logger(ctx).Error("execute sql panic", zap.String("sql", a.stmt.GetTextToLog()), zap.Stack("stack"))
 	}()
 
-	err = Next(ctx, a.executor, req)
+	err = a.stmt.next(ctx, a.executor, req)
 	if err != nil {
 		a.lastErr = err
 		return err
@@ -216,6 +217,17 @@ type ExecStmt struct {
 	retryCount     uint
 	retryStartTime time.Time
 
+	// Phase durations are split into two parts: 1. trying to lock keys (but
+	// failed); 2. the final iteration of the retry loop. Here we use
+	// [2]time.Duration to record such info for each phase. The first duration
+	// is increased only within the current iteration. When we meet a
+	// pessimistic lock error and decide to retry, we add the first duration to
+	// the second and reset the first to 0 by calling `resetPhaseDurations`.
+	phaseBuildDurations [2]time.Duration
+	phaseOpenDurations  [2]time.Duration
+	phaseNextDurations  [2]time.Duration
+	phaseLockDurations  [2]time.Duration
+
 	// OutputNames will be set if using cached plan
 	OutputNames []*types.FieldName
 	PsStmt      *plannercore.CachedPrepareStmt
@@ -425,7 +437,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
 	ctx = a.observeStmtBeginForTopSQL(ctx)
 
 	breakpoint.Inject(a.Ctx, sessiontxn.BreakPointBeforeExecutorFirstRun)
-	if err = e.Open(ctx); err != nil {
+	if err = a.openExecutor(ctx, e); err != nil {
 		terror.Call(e.Close)
 		return nil, err
 	}
@@ -625,7 +637,7 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor
 	var err error
 	req := newFirstChunk(e)
 	for {
-		err = Next(ctx, e, req)
+		err = a.next(ctx, e, req)
 		if err != nil {
 			// Handle 'write conflict' error.
 			break
@@ -671,7 +683,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
 		}
 	}
 
-	err = Next(ctx, e, newFirstChunk(e))
+	err = a.next(ctx, e, newFirstChunk(e))
 	if err != nil {
 		return nil, err
 	}
@@ -724,6 +736,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
 		ctx = context.WithValue(ctx, util.LockKeysDetailCtxKey, &lockKeyStats)
 		startLocking := time.Now()
 		err = txn.LockKeys(ctx, lockCtx, keys...)
+		a.phaseLockDurations[0] += time.Since(startLocking)
 		if lockKeyStats != nil {
 			seVars.StmtCtx.MergeLockKeysExecDetails(lockKeyStats)
 		}
@@ -789,6 +802,8 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
 
 	breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError)
 
+	a.resetPhaseDurations()
+
 	e, err := a.buildExecutor()
 	if err != nil {
 		return nil, err
@@ -802,7 +817,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
 		sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterPessimisticLockErrorRetry", true)
 	})
 
-	if err = e.Open(ctx); err != nil {
+	if err = a.openExecutor(ctx, e); err != nil {
 		return nil, err
 	}
 	return e, nil
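The retry path above calls `a.resetPhaseDurations()` before rebuilding and reopening the executor, which is what separates the `:locking` and `:final` slots of each `[2]time.Duration` field. A minimal sketch of that two-slot bookkeeping, with hypothetical names (not TiDB code):

```go
package main

import (
	"fmt"
	"time"
)

// phaseTimer mirrors the [2]time.Duration idea: index 0 holds the attempt
// currently in progress, index 1 accumulates attempts that ended in a
// pessimistic-lock retry.
type phaseTimer [2]time.Duration

func (p *phaseTimer) record(d time.Duration) { p[0] += d }

// onRetry plays the role of resetPhaseDurations: fold the failed attempt
// into slot 1 and start slot 0 from zero again.
func (p *phaseTimer) onRetry() {
	p[1] += p[0]
	p[0] = 0
}

func main() {
	var next phaseTimer
	next.record(30 * time.Millisecond) // first attempt hits a lock conflict
	next.onRetry()
	next.record(5 * time.Millisecond) // the retried, final iteration
	fmt.Printf("final=%v locking=%v\n", next[0], next[1]) // final=5ms locking=30ms
}
```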
@@ -816,6 +831,7 @@ type pessimisticTxn interface {
 
 // buildExecutor build an executor from plan, prepared statement may need additional procedure.
 func (a *ExecStmt) buildExecutor() (Executor, error) {
+	defer func(start time.Time) { a.phaseBuildDurations[0] += time.Since(start) }(time.Now())
 	ctx := a.Ctx
 	stmtCtx := ctx.GetSessionVars().StmtCtx
 	if _, ok := a.Plan.(*plannercore.Execute); !ok {
@@ -858,6 +874,31 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
 	return e, nil
 }
 
+func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) error {
+	start := time.Now()
+	err := e.Open(ctx)
+	a.phaseOpenDurations[0] += time.Since(start)
+	return err
+}
+
+func (a *ExecStmt) next(ctx context.Context, e Executor, req *chunk.Chunk) error {
+	start := time.Now()
+	err := Next(ctx, e, req)
+	a.phaseNextDurations[0] += time.Since(start)
+	return err
+}
+
+func (a *ExecStmt) resetPhaseDurations() {
+	a.phaseBuildDurations[1] += a.phaseBuildDurations[0]
+	a.phaseBuildDurations[0] = 0
+	a.phaseOpenDurations[1] += a.phaseOpenDurations[0]
+	a.phaseOpenDurations[0] = 0
+	a.phaseNextDurations[1] += a.phaseNextDurations[0]
+	a.phaseNextDurations[0] = 0
+	a.phaseLockDurations[1] += a.phaseLockDurations[0]
+	a.phaseLockDurations[0] = 0
+}
+
 // QueryReplacer replaces new line and tab for grep result including query string.
 var QueryReplacer = strings.NewReplacer("\r", " ", "\n", " ", "\t", " ")
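The one-line defer added to buildExecutor works because a deferred call's arguments are evaluated when the `defer` statement executes, so `time.Now()` captures the start of the build even though the closure body only runs at return. A small standalone illustration of that idiom (hypothetical function, not part of the patch):

```go
package main

import (
	"fmt"
	"time"
)

var buildDuration time.Duration

func build() {
	// time.Now() is evaluated right here, when the defer statement runs;
	// the closure body runs when build returns and sees the elapsed time.
	defer func(start time.Time) { buildDuration += time.Since(start) }(time.Now())

	time.Sleep(10 * time.Millisecond) // stands in for the real planning work
}

func main() {
	build()
	fmt.Println(buildDuration >= 10*time.Millisecond) // true
}
```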
@@ -896,12 +937,130 @@ func FormatSQL(sql string) stringutil.StringerFunc {
 	}
 }
 
+const (
+	phaseBuildLocking       = "build:locking"
+	phaseOpenLocking        = "open:locking"
+	phaseNextLocking        = "next:locking"
+	phaseLockLocking        = "lock:locking"
+	phaseBuildFinal         = "build:final"
+	phaseOpenFinal          = "open:final"
+	phaseNextFinal          = "next:final"
+	phaseLockFinal          = "lock:final"
+	phaseCommitPrewrite     = "commit:prewrite"
+	phaseCommitCommit       = "commit:commit"
+	phaseCommitWaitCommitTS = "commit:wait:commit-ts"
+	phaseCommitWaitLatestTS = "commit:wait:latest-ts"
+	phaseCommitWaitLatch    = "commit:wait:local-latch"
+	phaseCommitWaitBinlog   = "commit:wait:prewrite-binlog"
+	phaseWriteResponse      = "write-response"
+)
+
 var (
 	sessionExecuteRunDurationInternal = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblInternal)
 	sessionExecuteRunDurationGeneral  = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblGeneral)
 	totalTiFlashQuerySuccCounter      = metrics.TiFlashQueryTotalCounter.WithLabelValues("", metrics.LblOK)
+
+	// pre-define observers for non-internal queries
+	execBuildLocking       = metrics.ExecPhaseDuration.WithLabelValues(phaseBuildLocking, "0")
+	execOpenLocking        = metrics.ExecPhaseDuration.WithLabelValues(phaseOpenLocking, "0")
+	execNextLocking        = metrics.ExecPhaseDuration.WithLabelValues(phaseNextLocking, "0")
+	execLockLocking        = metrics.ExecPhaseDuration.WithLabelValues(phaseLockLocking, "0")
+	execBuildFinal         = metrics.ExecPhaseDuration.WithLabelValues(phaseBuildFinal, "0")
+	execOpenFinal          = metrics.ExecPhaseDuration.WithLabelValues(phaseOpenFinal, "0")
+	execNextFinal          = metrics.ExecPhaseDuration.WithLabelValues(phaseNextFinal, "0")
+	execLockFinal          = metrics.ExecPhaseDuration.WithLabelValues(phaseLockFinal, "0")
+	execCommitPrewrite     = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitPrewrite, "0")
+	execCommitCommit       = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitCommit, "0")
+	execCommitWaitCommitTS = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitCommitTS, "0")
+	execCommitWaitLatestTS = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitLatestTS, "0")
+	execCommitWaitLatch    = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitLatch, "0")
+	execCommitWaitBinlog   = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitBinlog, "0")
+	execWriteResponse      = metrics.ExecPhaseDuration.WithLabelValues(phaseWriteResponse, "0")
 )
 
+func getPhaseDurationObserver(phase string, internal bool) prometheus.Observer {
+	if internal {
+		return metrics.ExecPhaseDuration.WithLabelValues(phase, "1")
+	}
+	switch phase {
+	case phaseBuildLocking:
+		return execBuildLocking
+	case phaseOpenLocking:
+		return execOpenLocking
+	case phaseNextLocking:
+		return execNextLocking
+	case phaseLockLocking:
+		return execLockLocking
+	case phaseBuildFinal:
+		return execBuildFinal
+	case phaseOpenFinal:
+		return execOpenFinal
+	case phaseNextFinal:
+		return execNextFinal
+	case phaseLockFinal:
+		return execLockFinal
+	case phaseCommitPrewrite:
+		return execCommitPrewrite
+	case phaseCommitCommit:
+		return execCommitCommit
+	case phaseCommitWaitCommitTS:
+		return execCommitWaitCommitTS
+	case phaseCommitWaitLatestTS:
+		return execCommitWaitLatestTS
+	case phaseCommitWaitLatch:
+		return execCommitWaitLatch
+	case phaseCommitWaitBinlog:
+		return execCommitWaitBinlog
+	case phaseWriteResponse:
+		return execWriteResponse
+	default:
+		return metrics.ExecPhaseDuration.WithLabelValues(phase, "0")
+	}
+}
+
+func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.CommitDetails) {
+	for _, it := range []struct {
+		duration time.Duration
+		phase    string
+	}{
+		{a.phaseBuildDurations[0], phaseBuildFinal},
+		{a.phaseBuildDurations[1], phaseBuildLocking},
+		{a.phaseOpenDurations[0], phaseOpenFinal},
+		{a.phaseOpenDurations[1], phaseOpenLocking},
+		{a.phaseNextDurations[0], phaseNextFinal},
+		{a.phaseNextDurations[1], phaseNextLocking},
+		{a.phaseLockDurations[0], phaseLockFinal},
+		{a.phaseLockDurations[1], phaseLockLocking},
+	} {
+		if it.duration > 0 {
+			getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds())
+		}
+	}
+	if commitDetails != nil {
+		for _, it := range []struct {
+			duration time.Duration
+			phase    string
+		}{
+			{commitDetails.PrewriteTime, phaseCommitPrewrite},
+			{commitDetails.CommitTime, phaseCommitCommit},
+			{commitDetails.GetCommitTsTime, phaseCommitWaitCommitTS},
+			{commitDetails.GetLatestTsTime, phaseCommitWaitLatestTS},
+			{commitDetails.LocalLatchTime, phaseCommitWaitLatch},
+			{commitDetails.WaitPrewriteBinlogTime, phaseCommitWaitBinlog},
+		} {
+			if it.duration > 0 {
+				getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds())
+			}
+		}
+	}
+	if stmtDetailsRaw := a.GoCtx.Value(execdetails.StmtExecDetailKey); stmtDetailsRaw != nil {
+		d := stmtDetailsRaw.(*execdetails.StmtExecDetails).WriteSQLRespDuration
+		if d > 0 {
+			getPhaseDurationObserver(phaseWriteResponse, internal).Observe(d.Seconds())
+		}
+	}
+}
+
 // FinishExecuteStmt is used to record some information after `ExecStmt` execution finished:
 // 1. record slow log if needed.
 // 2. record summary statement.
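getPhaseDurationObserver returns the pre-bound observers for the common non-internal case and only falls back to `WithLabelValues` otherwise, so the hot path skips the label lookup. A rough sketch of how the resulting `phase`/`internal` series could be exercised and inspected with a throwaway registry (demo metric names, not TiDB's own):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()
	phases := prometheus.NewSummaryVec(
		prometheus.SummaryOpts{Name: "demo_phase_duration_seconds", Help: "demo"},
		[]string{"phase", "internal"})
	reg.MustRegister(phases)

	// Resolving label values once gives a cheap Observer for the hot path.
	nextFinal := phases.WithLabelValues("next:final", "0")

	// Pretend one locking attempt failed (30ms) before the final one (5ms).
	phases.WithLabelValues("next:locking", "0").Observe(0.030)
	nextFinal.Observe(0.005)

	// Gather and print the label pairs and accumulated sums per series.
	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			fmt.Println(m.GetLabel(), m.GetSummary().GetSampleSum())
		}
	}
}
```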
@@ -946,6 +1105,7 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
 	}
 	sessVars.PrevStmt = FormatSQL(a.GetTextToLog())
 
+	a.observePhaseDurations(sessVars.InRestrictedSQL, execDetail.CommitDetail)
 	executeDuration := time.Since(sessVars.StartTime) - sessVars.DurationCompile
 	if sessVars.InRestrictedSQL {
 		sessionExecuteRunDurationInternal.Observe(executeDuration.Seconds())
diff --git a/metrics/executor.go b/metrics/executor.go
index 6702832ebe9ac..7a8aa294075e9 100644
--- a/metrics/executor.go
+++ b/metrics/executor.go
@@ -46,4 +46,13 @@ var (
 			Name: "statement_db_total",
 			Help: "Counter of StmtNode by Database.",
 		}, []string{LblDb, LblType})
+
+	// ExecPhaseDuration records the duration of each execution phase.
+	ExecPhaseDuration = prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Namespace: "tidb",
+			Subsystem: "executor",
+			Name:      "phase_duration_seconds",
+			Help:      "Summary of each execution phase duration.",
+		}, []string{LblPhase, LblInternal})
 )
diff --git a/metrics/metrics.go b/metrics/metrics.go
index dc358f1f078f4..37e63fa41f281 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -150,6 +150,7 @@ func RegisterMetrics() {
 	prometheus.MustRegister(StatsInaccuracyRate)
 	prometheus.MustRegister(StmtNodeCounter)
 	prometheus.MustRegister(DbStmtNodeCounter)
+	prometheus.MustRegister(ExecPhaseDuration)
 	prometheus.MustRegister(StoreQueryFeedbackCounter)
 	prometheus.MustRegister(TimeJumpBackCounter)
 	prometheus.MustRegister(TransactionDuration)
diff --git a/metrics/session.go b/metrics/session.go
index c1349eccd2c45..bd949edd51d45 100644
--- a/metrics/session.go
+++ b/metrics/session.go
@@ -81,7 +81,7 @@ var (
 			Name: "transaction_statement_num",
 			Help: "Bucketed histogram of statements count in each transaction.",
 			Buckets: prometheus.ExponentialBuckets(1, 2, 16), // 1 ~ 32768
-		}, []string{LbTxnMode, LblType})
+		}, []string{LblTxnMode, LblType})
 
 	TransactionDuration = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
@@ -90,7 +90,7 @@ var (
 			Name: "transaction_duration_seconds",
 			Help: "Bucketed histogram of a transaction execution duration, including retry.",
 			Buckets: prometheus.ExponentialBuckets(0.001, 2, 28), // 1ms ~ 1.5days
-		}, []string{LbTxnMode, LblType})
+		}, []string{LblTxnMode, LblType})
 
 	StatementDeadlockDetectDuration = prometheus.NewHistogram(
 		prometheus.HistogramOpts{
@@ -169,7 +169,7 @@ const (
 	LblCoprType    = "copr_type"
 	LblGeneral     = "general"
 	LblInternal    = "internal"
-	LbTxnMode      = "txn_mode"
+	LblTxnMode     = "txn_mode"
 	LblPessimistic = "pessimistic"
 	LblOptimistic  = "optimistic"
 	LblStore       = "store"
@@ -187,5 +187,6 @@ const (
 	LblCommitting  = "committing"
 	LblRollingBack = "rolling_back"
 	LblHasLock     = "has_lock"
+	LblPhase       = "phase"
 	LblModule      = "module"
 )
diff --git a/server/conn.go b/server/conn.go
index 76328d27d2151..5495013ac81e9 100644
--- a/server/conn.go
+++ b/server/conn.go
@@ -1465,7 +1465,18 @@ func (cc *clientConn) useDB(ctx context.Context, db string) (err error) {
 }
 
 func (cc *clientConn) flush(ctx context.Context) error {
+	var (
+		stmtDetail *execdetails.StmtExecDetails
+		startTime  time.Time
+	)
+	if stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey); stmtDetailRaw != nil {
+		stmtDetail = stmtDetailRaw.(*execdetails.StmtExecDetails)
+		startTime = time.Now()
+	}
 	defer func() {
+		if stmtDetail != nil {
+			stmtDetail.WriteSQLRespDuration += time.Since(startTime)
+		}
 		trace.StartRegion(ctx, "FlushClientConn").End()
 		if ctx := cc.getCtx(); ctx != nil && ctx.WarningCount() > 0 {
 			for _, err := range ctx.GetWarnings() {
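flush, writeChunks, and observePhaseDurations all share one *execdetails.StmtExecDetails that travels through the statement's context: the connection layer adds the time spent writing the response, and FinishExecuteStmt reads it back when emitting the write-response phase. A condensed sketch of that hand-off, with hypothetical key and struct names:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type detailsKeyType struct{}

// detailsKey stands in for execdetails.StmtExecDetailKey.
var detailsKey detailsKeyType

type stmtDetails struct{ writeSQLRespDuration time.Duration }

// writeResponse accumulates into the shared struct only if one is attached.
func writeResponse(ctx context.Context) {
	raw := ctx.Value(detailsKey)
	if raw == nil {
		return
	}
	start := time.Now()
	// ... write rows / EOF packet to the client ...
	raw.(*stmtDetails).writeSQLRespDuration += time.Since(start)
}

func main() {
	d := &stmtDetails{}
	ctx := context.WithValue(context.Background(), detailsKey, d)
	writeResponse(ctx)
	// The executor side reads the same pointer when recording phase metrics.
	fmt.Println(d.writeSQLRespDuration >= 0) // true
}
```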
@@ -2208,6 +2219,7 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
 	req := rs.NewChunk(cc.chunkAlloc)
 	gotColumnInfo := false
 	firstNext := true
+	var start time.Time
 	var stmtDetail *execdetails.StmtExecDetails
 	stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey)
 	if stmtDetailRaw != nil {
@@ -2234,9 +2246,15 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
 			// We need to call Next before we get columns.
 			// Otherwise, we will get incorrect columns info.
 			columns := rs.Columns()
+			if stmtDetail != nil {
+				start = time.Now()
+			}
 			if err = cc.writeColumnInfo(columns, serverStatus); err != nil {
 				return false, err
 			}
+			if stmtDetail != nil {
+				stmtDetail.WriteSQLRespDuration += time.Since(start)
+			}
 			gotColumnInfo = true
 		}
 		rowCount := req.NumRows()
@@ -2244,7 +2262,9 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
 			break
 		}
 		reg := trace.StartRegion(ctx, "WriteClientConn")
-		start := time.Now()
+		if stmtDetail != nil {
+			start = time.Now()
+		}
 		for i := 0; i < rowCount; i++ {
 			data = data[0:4]
 			if binary {
@@ -2266,7 +2286,14 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
 			stmtDetail.WriteSQLRespDuration += time.Since(start)
 		}
 	}
-	return false, cc.writeEOF(serverStatus)
+	if stmtDetail != nil {
+		start = time.Now()
+	}
+	err := cc.writeEOF(serverStatus)
+	if stmtDetail != nil {
+		stmtDetail.WriteSQLRespDuration += time.Since(start)
+	}
+	return false, err
 }
 
 // writeChunksWithFetchSize writes data from a Chunk, which filled data by a ResultSet, into a connection.
@@ -2323,8 +2350,13 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet
 	if stmtDetailRaw != nil {
 		stmtDetail = stmtDetailRaw.(*execdetails.StmtExecDetails)
 	}
-	start := time.Now()
-	var err error
+	var (
+		err   error
+		start time.Time
+	)
+	if stmtDetail != nil {
+		start = time.Now()
+	}
 	for _, row := range curRows {
 		data = data[0:4]
 		data, err = dumpBinaryRow(data, rs.Columns(), row, cc.rsEncoder)
@@ -2341,7 +2373,14 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet
 	if cl, ok := rs.(fetchNotifier); ok {
 		cl.OnFetchReturned()
 	}
-	return cc.writeEOF(serverStatus)
+	if stmtDetail != nil {
+		start = time.Now()
+	}
+	err = cc.writeEOF(serverStatus)
+	if stmtDetail != nil {
+		stmtDetail.WriteSQLRespDuration += time.Since(start)
+	}
+	return err
 }
 
 func (cc *clientConn) setConn(conn net.Conn) {
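Throughout the conn.go hunks, `time.Now()` is only called when a *StmtExecDetails is actually attached to the context, so connections that are not collecting details do not pay for the extra clock reads. A toy version of that guard pattern (hypothetical names, not the server code):

```go
package main

import (
	"fmt"
	"time"
)

type respDetails struct{ writeDuration time.Duration }

// timedWrite samples the clock only when someone is collecting details.
func timedWrite(details *respDetails, write func() error) error {
	var start time.Time
	if details != nil {
		start = time.Now()
	}
	err := write()
	if details != nil {
		details.writeDuration += time.Since(start)
	}
	return err
}

func main() {
	d := &respDetails{}
	_ = timedWrite(d, func() error { time.Sleep(time.Millisecond); return nil })
	fmt.Println(d.writeDuration > 0) // true
}
```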