diff --git a/executor/adapter.go b/executor/adapter.go index 6c8e9015eec97..1bf4a920a0f8b 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -61,7 +61,6 @@ import ( "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/topsql" topsqlstate "github.com/pingcap/tidb/util/topsql/state" - "github.com/prometheus/client_golang/prometheus" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/util" @@ -149,7 +148,7 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) { logutil.Logger(ctx).Error("execute sql panic", zap.String("sql", a.stmt.GetTextToLog()), zap.Stack("stack")) }() - err = a.stmt.next(ctx, a.executor, req) + err = Next(ctx, a.executor, req) if err != nil { a.lastErr = err return err @@ -217,17 +216,6 @@ type ExecStmt struct { retryCount uint retryStartTime time.Time - // Phase durations are splited into two parts: 1. trying to lock keys (but - // failed); 2. the final iteration of the retry loop. Here we use - // [2]time.Duration to record such info for each phase. The first duration - // is increased only within the current iteration. When we meet a - // pessimistic lock error and decide to retry, we add the first duration to - // the second and reset the first to 0 by calling `resetPhaseDurations`. - phaseBuildDurations [2]time.Duration - phaseOpenDurations [2]time.Duration - phaseNextDurations [2]time.Duration - phaseLockDurations [2]time.Duration - // OutputNames will be set if using cached plan OutputNames []*types.FieldName PsStmt *plannercore.CachedPrepareStmt @@ -437,7 +425,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { ctx = a.observeStmtBeginForTopSQL(ctx) breakpoint.Inject(a.Ctx, sessiontxn.BreakPointBeforeExecutorFirstRun) - if err = a.openExecutor(ctx, e); err != nil { + if err = e.Open(ctx); err != nil { terror.Call(e.Close) return nil, err } @@ -637,7 +625,7 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor var err error req := newFirstChunk(e) for { - err = a.next(ctx, e, req) + err = Next(ctx, e, req) if err != nil { // Handle 'write conflict' error. break @@ -683,7 +671,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex } } - err = a.next(ctx, e, newFirstChunk(e)) + err = Next(ctx, e, newFirstChunk(e)) if err != nil { return nil, err } @@ -736,7 +724,6 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { ctx = context.WithValue(ctx, util.LockKeysDetailCtxKey, &lockKeyStats) startLocking := time.Now() err = txn.LockKeys(ctx, lockCtx, keys...) - a.phaseLockDurations[0] += time.Since(startLocking) if lockKeyStats != nil { seVars.StmtCtx.MergeLockKeysExecDetails(lockKeyStats) } @@ -802,8 +789,6 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError) - a.resetPhaseDurations() - e, err := a.buildExecutor() if err != nil { return nil, err @@ -817,7 +802,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterPessimisticLockErrorRetry", true) }) - if err = a.openExecutor(ctx, e); err != nil { + if err = e.Open(ctx); err != nil { return nil, err } return e, nil @@ -831,7 +816,6 @@ type pessimisticTxn interface { // buildExecutor build an executor from plan, prepared statement may need additional procedure. func (a *ExecStmt) buildExecutor() (Executor, error) { - defer func(start time.Time) { a.phaseBuildDurations[0] += time.Since(start) }(time.Now()) ctx := a.Ctx stmtCtx := ctx.GetSessionVars().StmtCtx if _, ok := a.Plan.(*plannercore.Execute); !ok { @@ -874,31 +858,6 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { return e, nil } -func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) error { - start := time.Now() - err := e.Open(ctx) - a.phaseOpenDurations[0] += time.Since(start) - return err -} - -func (a *ExecStmt) next(ctx context.Context, e Executor, req *chunk.Chunk) error { - start := time.Now() - err := Next(ctx, e, req) - a.phaseNextDurations[0] += time.Since(start) - return err -} - -func (a *ExecStmt) resetPhaseDurations() { - a.phaseBuildDurations[1] += a.phaseBuildDurations[0] - a.phaseBuildDurations[0] = 0 - a.phaseOpenDurations[1] += a.phaseOpenDurations[0] - a.phaseOpenDurations[0] = 0 - a.phaseNextDurations[1] += a.phaseNextDurations[0] - a.phaseNextDurations[0] = 0 - a.phaseLockDurations[1] += a.phaseLockDurations[0] - a.phaseLockDurations[0] = 0 -} - // QueryReplacer replaces new line and tab for grep result including query string. var QueryReplacer = strings.NewReplacer("\r", " ", "\n", " ", "\t", " ") @@ -937,130 +896,12 @@ func FormatSQL(sql string) stringutil.StringerFunc { } } -const ( - phaseBuildLocking = "build:locking" - phaseOpenLocking = "open:locking" - phaseNextLocking = "next:locking" - phaseLockLocking = "lock:locking" - phaseBuildFinal = "build:final" - phaseOpenFinal = "open:final" - phaseNextFinal = "next:final" - phaseLockFinal = "lock:final" - phaseCommitPrewrite = "commit:prewrite" - phaseCommitCommit = "commit:commit" - phaseCommitWaitCommitTS = "commit:wait:commit-ts" - phaseCommitWaitLatestTS = "commit:wait:latest-ts" - phaseCommitWaitLatch = "commit:wait:local-latch" - phaseCommitWaitBinlog = "commit:wait:prewrite-binlog" - phaseWriteResponse = "write-response" -) - var ( sessionExecuteRunDurationInternal = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblInternal) sessionExecuteRunDurationGeneral = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblGeneral) totalTiFlashQuerySuccCounter = metrics.TiFlashQueryTotalCounter.WithLabelValues("", metrics.LblOK) - - // pre-define observers for non-internal queries - execBuildLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseBuildLocking, "0") - execOpenLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseOpenLocking, "0") - execNextLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseNextLocking, "0") - execLockLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseLockLocking, "0") - execBuildFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseBuildFinal, "0") - execOpenFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseOpenFinal, "0") - execNextFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseNextFinal, "0") - execLockFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseLockFinal, "0") - execCommitPrewrite = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitPrewrite, "0") - execCommitCommit = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitCommit, "0") - execCommitWaitCommitTS = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitCommitTS, "0") - execCommitWaitLatestTS = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitLatestTS, "0") - execCommitWaitLatch = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitLatch, "0") - execCommitWaitBinlog = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitBinlog, "0") - execWriteResponse = metrics.ExecPhaseDuration.WithLabelValues(phaseWriteResponse, "0") ) -func getPhaseDurationObserver(phase string, internal bool) prometheus.Observer { - if internal { - return metrics.ExecPhaseDuration.WithLabelValues(phase, "1") - } - switch phase { - case phaseBuildLocking: - return execBuildLocking - case phaseOpenLocking: - return execOpenLocking - case phaseNextLocking: - return execNextLocking - case phaseLockLocking: - return execLockLocking - case phaseBuildFinal: - return execBuildFinal - case phaseOpenFinal: - return execOpenFinal - case phaseNextFinal: - return execNextFinal - case phaseLockFinal: - return execLockFinal - case phaseCommitPrewrite: - return execCommitPrewrite - case phaseCommitCommit: - return execCommitCommit - case phaseCommitWaitCommitTS: - return execCommitWaitCommitTS - case phaseCommitWaitLatestTS: - return execCommitWaitLatestTS - case phaseCommitWaitLatch: - return execCommitWaitLatch - case phaseCommitWaitBinlog: - return execCommitWaitBinlog - case phaseWriteResponse: - return execWriteResponse - default: - return metrics.ExecPhaseDuration.WithLabelValues(phase, "0") - } -} - -func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.CommitDetails) { - for _, it := range []struct { - duration time.Duration - phase string - }{ - {a.phaseBuildDurations[0], phaseBuildFinal}, - {a.phaseBuildDurations[1], phaseBuildLocking}, - {a.phaseOpenDurations[0], phaseOpenFinal}, - {a.phaseOpenDurations[1], phaseOpenLocking}, - {a.phaseNextDurations[0], phaseNextFinal}, - {a.phaseNextDurations[1], phaseNextLocking}, - {a.phaseLockDurations[0], phaseLockFinal}, - {a.phaseLockDurations[1], phaseLockLocking}, - } { - if it.duration > 0 { - getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds()) - } - } - if commitDetails != nil { - for _, it := range []struct { - duration time.Duration - phase string - }{ - {commitDetails.PrewriteTime, phaseCommitPrewrite}, - {commitDetails.CommitTime, phaseCommitCommit}, - {commitDetails.GetCommitTsTime, phaseCommitWaitCommitTS}, - {commitDetails.GetLatestTsTime, phaseCommitWaitLatestTS}, - {commitDetails.LocalLatchTime, phaseCommitWaitLatch}, - {commitDetails.WaitPrewriteBinlogTime, phaseCommitWaitBinlog}, - } { - if it.duration > 0 { - getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds()) - } - } - } - if stmtDetailsRaw := a.GoCtx.Value(execdetails.StmtExecDetailKey); stmtDetailsRaw != nil { - d := stmtDetailsRaw.(*execdetails.StmtExecDetails).WriteSQLRespDuration - if d > 0 { - getPhaseDurationObserver(phaseWriteResponse, internal).Observe(d.Seconds()) - } - } -} - // FinishExecuteStmt is used to record some information after `ExecStmt` execution finished: // 1. record slow log if needed. // 2. record summary statement. diff --git a/metrics/executor.go b/metrics/executor.go index 7a8aa294075e9..6702832ebe9ac 100644 --- a/metrics/executor.go +++ b/metrics/executor.go @@ -46,13 +46,4 @@ var ( Name: "statement_db_total", Help: "Counter of StmtNode by Database.", }, []string{LblDb, LblType}) - - // ExecPhaseDuration records the duration of each execution phase. - ExecPhaseDuration = prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Namespace: "tidb", - Subsystem: "executor", - Name: "phase_duration_seconds", - Help: "Summary of each execution phase duration.", - }, []string{LblPhase, LblInternal}) ) diff --git a/metrics/metrics.go b/metrics/metrics.go index e387d9d1c8ab4..a1c2c55f89974 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -150,7 +150,6 @@ func RegisterMetrics() { prometheus.MustRegister(StatsInaccuracyRate) prometheus.MustRegister(StmtNodeCounter) prometheus.MustRegister(DbStmtNodeCounter) - prometheus.MustRegister(ExecPhaseDuration) prometheus.MustRegister(StoreQueryFeedbackCounter) prometheus.MustRegister(TimeJumpBackCounter) prometheus.MustRegister(TransactionDuration) diff --git a/metrics/session.go b/metrics/session.go index 87a6e5ba3bc42..6acbfbb4be33b 100644 --- a/metrics/session.go +++ b/metrics/session.go @@ -81,7 +81,7 @@ var ( Name: "transaction_statement_num", Help: "Bucketed histogram of statements count in each transaction.", Buckets: prometheus.ExponentialBuckets(1, 2, 16), // 1 ~ 32768 - }, []string{LblTxnMode, LblType}) + }, []string{LbTxnMode, LblType}) TransactionDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -90,7 +90,7 @@ var ( Name: "transaction_duration_seconds", Help: "Bucketed histogram of a transaction execution duration, including retry.", Buckets: prometheus.ExponentialBuckets(0.001, 2, 28), // 1ms ~ 1.5days - }, []string{LblTxnMode, LblType}) + }, []string{LbTxnMode, LblType}) StatementDeadlockDetectDuration = prometheus.NewHistogram( prometheus.HistogramOpts{ @@ -153,7 +153,7 @@ const ( LblCoprType = "copr_type" LblGeneral = "general" LblInternal = "internal" - LblTxnMode = "txn_mode" + LbTxnMode = "txn_mode" LblPessimistic = "pessimistic" LblOptimistic = "optimistic" LblStore = "store" @@ -165,6 +165,5 @@ const ( LblVersion = "version" LblHash = "hash" LblCTEType = "cte_type" - LblPhase = "phase" LblModule = "module" ) diff --git a/server/conn.go b/server/conn.go index 5495013ac81e9..76328d27d2151 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1465,18 +1465,7 @@ func (cc *clientConn) useDB(ctx context.Context, db string) (err error) { } func (cc *clientConn) flush(ctx context.Context) error { - var ( - stmtDetail *execdetails.StmtExecDetails - startTime time.Time - ) - if stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey); stmtDetailRaw != nil { - stmtDetail = stmtDetailRaw.(*execdetails.StmtExecDetails) - startTime = time.Now() - } defer func() { - if stmtDetail != nil { - stmtDetail.WriteSQLRespDuration += time.Since(startTime) - } trace.StartRegion(ctx, "FlushClientConn").End() if ctx := cc.getCtx(); ctx != nil && ctx.WarningCount() > 0 { for _, err := range ctx.GetWarnings() { @@ -2219,7 +2208,6 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool req := rs.NewChunk(cc.chunkAlloc) gotColumnInfo := false firstNext := true - var start time.Time var stmtDetail *execdetails.StmtExecDetails stmtDetailRaw := ctx.Value(execdetails.StmtExecDetailKey) if stmtDetailRaw != nil { @@ -2246,15 +2234,9 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool // We need to call Next before we get columns. // Otherwise, we will get incorrect columns info. columns := rs.Columns() - if stmtDetail != nil { - start = time.Now() - } if err = cc.writeColumnInfo(columns, serverStatus); err != nil { return false, err } - if stmtDetail != nil { - stmtDetail.WriteSQLRespDuration += time.Since(start) - } gotColumnInfo = true } rowCount := req.NumRows() @@ -2262,9 +2244,7 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool break } reg := trace.StartRegion(ctx, "WriteClientConn") - if stmtDetail != nil { - start = time.Now() - } + start := time.Now() for i := 0; i < rowCount; i++ { data = data[0:4] if binary { @@ -2286,14 +2266,7 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool stmtDetail.WriteSQLRespDuration += time.Since(start) } } - if stmtDetail != nil { - start = time.Now() - } - err := cc.writeEOF(serverStatus) - if stmtDetail != nil { - stmtDetail.WriteSQLRespDuration += time.Since(start) - } - return false, err + return false, cc.writeEOF(serverStatus) } // writeChunksWithFetchSize writes data from a Chunk, which filled data by a ResultSet, into a connection. @@ -2350,13 +2323,8 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet if stmtDetailRaw != nil { stmtDetail = stmtDetailRaw.(*execdetails.StmtExecDetails) } - var ( - err error - start time.Time - ) - if stmtDetail != nil { - start = time.Now() - } + start := time.Now() + var err error for _, row := range curRows { data = data[0:4] data, err = dumpBinaryRow(data, rs.Columns(), row, cc.rsEncoder) @@ -2373,14 +2341,7 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet if cl, ok := rs.(fetchNotifier); ok { cl.OnFetchReturned() } - if stmtDetail != nil { - start = time.Now() - } - err = cc.writeEOF(serverStatus) - if stmtDetail != nil { - stmtDetail.WriteSQLRespDuration += time.Since(start) - } - return err + return cc.writeEOF(serverStatus) } func (cc *clientConn) setConn(conn net.Conn) {