Skip to content

Commit

Permalink
executor,metrics: add a metric for observing execution phases (#35906)
Browse files Browse the repository at this point in the history
ref #34106
  • Loading branch information
zyguan authored Jul 21, 2022
1 parent 4723998 commit 23f25af
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 13 deletions.
170 changes: 165 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/prometheus/client_golang/prometheus"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -148,7 +149,7 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) {
logutil.Logger(ctx).Error("execute sql panic", zap.String("sql", a.stmt.GetTextToLog()), zap.Stack("stack"))
}()

err = Next(ctx, a.executor, req)
err = a.stmt.next(ctx, a.executor, req)
if err != nil {
a.lastErr = err
return err
Expand Down Expand Up @@ -216,6 +217,17 @@ type ExecStmt struct {
retryCount uint
retryStartTime time.Time

// Phase durations are splited into two parts: 1. trying to lock keys (but
// failed); 2. the final iteration of the retry loop. Here we use
// [2]time.Duration to record such info for each phase. The first duration
// is increased only within the current iteration. When we meet a
// pessimistic lock error and decide to retry, we add the first duration to
// the second and reset the first to 0 by calling `resetPhaseDurations`.
phaseBuildDurations [2]time.Duration
phaseOpenDurations [2]time.Duration
phaseNextDurations [2]time.Duration
phaseLockDurations [2]time.Duration

// OutputNames will be set if using cached plan
OutputNames []*types.FieldName
PsStmt *plannercore.CachedPrepareStmt
Expand Down Expand Up @@ -425,7 +437,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
ctx = a.observeStmtBeginForTopSQL(ctx)

breakpoint.Inject(a.Ctx, sessiontxn.BreakPointBeforeExecutorFirstRun)
if err = e.Open(ctx); err != nil {
if err = a.openExecutor(ctx, e); err != nil {
terror.Call(e.Close)
return nil, err
}
Expand Down Expand Up @@ -625,7 +637,7 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor
var err error
req := newFirstChunk(e)
for {
err = Next(ctx, e, req)
err = a.next(ctx, e, req)
if err != nil {
// Handle 'write conflict' error.
break
Expand Down Expand Up @@ -671,7 +683,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex
}
}

err = Next(ctx, e, newFirstChunk(e))
err = a.next(ctx, e, newFirstChunk(e))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -724,6 +736,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
ctx = context.WithValue(ctx, util.LockKeysDetailCtxKey, &lockKeyStats)
startLocking := time.Now()
err = txn.LockKeys(ctx, lockCtx, keys...)
a.phaseLockDurations[0] += time.Since(startLocking)
if lockKeyStats != nil {
seVars.StmtCtx.MergeLockKeysExecDetails(lockKeyStats)
}
Expand Down Expand Up @@ -789,6 +802,8 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error

breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError)

a.resetPhaseDurations()

e, err := a.buildExecutor()
if err != nil {
return nil, err
Expand All @@ -802,7 +817,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterPessimisticLockErrorRetry", true)
})

if err = e.Open(ctx); err != nil {
if err = a.openExecutor(ctx, e); err != nil {
return nil, err
}
return e, nil
Expand All @@ -816,6 +831,7 @@ type pessimisticTxn interface {

// buildExecutor build an executor from plan, prepared statement may need additional procedure.
func (a *ExecStmt) buildExecutor() (Executor, error) {
defer func(start time.Time) { a.phaseBuildDurations[0] += time.Since(start) }(time.Now())
ctx := a.Ctx
stmtCtx := ctx.GetSessionVars().StmtCtx
if _, ok := a.Plan.(*plannercore.Execute); !ok {
Expand Down Expand Up @@ -858,6 +874,31 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
return e, nil
}

func (a *ExecStmt) openExecutor(ctx context.Context, e Executor) error {
start := time.Now()
err := e.Open(ctx)
a.phaseOpenDurations[0] += time.Since(start)
return err
}

func (a *ExecStmt) next(ctx context.Context, e Executor, req *chunk.Chunk) error {
start := time.Now()
err := Next(ctx, e, req)
a.phaseNextDurations[0] += time.Since(start)
return err
}

func (a *ExecStmt) resetPhaseDurations() {
a.phaseBuildDurations[1] += a.phaseBuildDurations[0]
a.phaseBuildDurations[0] = 0
a.phaseOpenDurations[1] += a.phaseOpenDurations[0]
a.phaseOpenDurations[0] = 0
a.phaseNextDurations[1] += a.phaseNextDurations[0]
a.phaseNextDurations[0] = 0
a.phaseLockDurations[1] += a.phaseLockDurations[0]
a.phaseLockDurations[0] = 0
}

// QueryReplacer replaces new line and tab for grep result including query string.
var QueryReplacer = strings.NewReplacer("\r", " ", "\n", " ", "\t", " ")

Expand Down Expand Up @@ -896,12 +937,130 @@ func FormatSQL(sql string) stringutil.StringerFunc {
}
}

const (
phaseBuildLocking = "build:locking"
phaseOpenLocking = "open:locking"
phaseNextLocking = "next:locking"
phaseLockLocking = "lock:locking"
phaseBuildFinal = "build:final"
phaseOpenFinal = "open:final"
phaseNextFinal = "next:final"
phaseLockFinal = "lock:final"
phaseCommitPrewrite = "commit:prewrite"
phaseCommitCommit = "commit:commit"
phaseCommitWaitCommitTS = "commit:wait:commit-ts"
phaseCommitWaitLatestTS = "commit:wait:latest-ts"
phaseCommitWaitLatch = "commit:wait:local-latch"
phaseCommitWaitBinlog = "commit:wait:prewrite-binlog"
phaseWriteResponse = "write-response"
)

var (
sessionExecuteRunDurationInternal = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblInternal)
sessionExecuteRunDurationGeneral = metrics.SessionExecuteRunDuration.WithLabelValues(metrics.LblGeneral)
totalTiFlashQuerySuccCounter = metrics.TiFlashQueryTotalCounter.WithLabelValues("", metrics.LblOK)

// pre-define observers for non-internal queries
execBuildLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseBuildLocking, "0")
execOpenLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseOpenLocking, "0")
execNextLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseNextLocking, "0")
execLockLocking = metrics.ExecPhaseDuration.WithLabelValues(phaseLockLocking, "0")
execBuildFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseBuildFinal, "0")
execOpenFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseOpenFinal, "0")
execNextFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseNextFinal, "0")
execLockFinal = metrics.ExecPhaseDuration.WithLabelValues(phaseLockFinal, "0")
execCommitPrewrite = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitPrewrite, "0")
execCommitCommit = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitCommit, "0")
execCommitWaitCommitTS = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitCommitTS, "0")
execCommitWaitLatestTS = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitLatestTS, "0")
execCommitWaitLatch = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitLatch, "0")
execCommitWaitBinlog = metrics.ExecPhaseDuration.WithLabelValues(phaseCommitWaitBinlog, "0")
execWriteResponse = metrics.ExecPhaseDuration.WithLabelValues(phaseWriteResponse, "0")
)

func getPhaseDurationObserver(phase string, internal bool) prometheus.Observer {
if internal {
return metrics.ExecPhaseDuration.WithLabelValues(phase, "1")
}
switch phase {
case phaseBuildLocking:
return execBuildLocking
case phaseOpenLocking:
return execOpenLocking
case phaseNextLocking:
return execNextLocking
case phaseLockLocking:
return execLockLocking
case phaseBuildFinal:
return execBuildFinal
case phaseOpenFinal:
return execOpenFinal
case phaseNextFinal:
return execNextFinal
case phaseLockFinal:
return execLockFinal
case phaseCommitPrewrite:
return execCommitPrewrite
case phaseCommitCommit:
return execCommitCommit
case phaseCommitWaitCommitTS:
return execCommitWaitCommitTS
case phaseCommitWaitLatestTS:
return execCommitWaitLatestTS
case phaseCommitWaitLatch:
return execCommitWaitLatch
case phaseCommitWaitBinlog:
return execCommitWaitBinlog
case phaseWriteResponse:
return execWriteResponse
default:
return metrics.ExecPhaseDuration.WithLabelValues(phase, "0")
}
}

func (a *ExecStmt) observePhaseDurations(internal bool, commitDetails *util.CommitDetails) {
for _, it := range []struct {
duration time.Duration
phase string
}{
{a.phaseBuildDurations[0], phaseBuildFinal},
{a.phaseBuildDurations[1], phaseBuildLocking},
{a.phaseOpenDurations[0], phaseOpenFinal},
{a.phaseOpenDurations[1], phaseOpenLocking},
{a.phaseNextDurations[0], phaseNextFinal},
{a.phaseNextDurations[1], phaseNextLocking},
{a.phaseLockDurations[0], phaseLockFinal},
{a.phaseLockDurations[1], phaseLockLocking},
} {
if it.duration > 0 {
getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds())
}
}
if commitDetails != nil {
for _, it := range []struct {
duration time.Duration
phase string
}{
{commitDetails.PrewriteTime, phaseCommitPrewrite},
{commitDetails.CommitTime, phaseCommitCommit},
{commitDetails.GetCommitTsTime, phaseCommitWaitCommitTS},
{commitDetails.GetLatestTsTime, phaseCommitWaitLatestTS},
{commitDetails.LocalLatchTime, phaseCommitWaitLatch},
{commitDetails.WaitPrewriteBinlogTime, phaseCommitWaitBinlog},
} {
if it.duration > 0 {
getPhaseDurationObserver(it.phase, internal).Observe(it.duration.Seconds())
}
}
}
if stmtDetailsRaw := a.GoCtx.Value(execdetails.StmtExecDetailKey); stmtDetailsRaw != nil {
d := stmtDetailsRaw.(*execdetails.StmtExecDetails).WriteSQLRespDuration
if d > 0 {
getPhaseDurationObserver(phaseWriteResponse, internal).Observe(d.Seconds())
}
}
}

// FinishExecuteStmt is used to record some information after `ExecStmt` execution finished:
// 1. record slow log if needed.
// 2. record summary statement.
Expand Down Expand Up @@ -946,6 +1105,7 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
}
sessVars.PrevStmt = FormatSQL(a.GetTextToLog())

a.observePhaseDurations(sessVars.InRestrictedSQL, execDetail.CommitDetail)
executeDuration := time.Since(sessVars.StartTime) - sessVars.DurationCompile
if sessVars.InRestrictedSQL {
sessionExecuteRunDurationInternal.Observe(executeDuration.Seconds())
Expand Down
9 changes: 9 additions & 0 deletions metrics/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,13 @@ var (
Name: "statement_db_total",
Help: "Counter of StmtNode by Database.",
}, []string{LblDb, LblType})

// ExecPhaseDuration records the duration of each execution phase.
ExecPhaseDuration = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: "tidb",
Subsystem: "executor",
Name: "phase_duration_seconds",
Help: "Summary of each execution phase duration.",
}, []string{LblPhase, LblInternal})
)
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func RegisterMetrics() {
prometheus.MustRegister(StatsInaccuracyRate)
prometheus.MustRegister(StmtNodeCounter)
prometheus.MustRegister(DbStmtNodeCounter)
prometheus.MustRegister(ExecPhaseDuration)
prometheus.MustRegister(StoreQueryFeedbackCounter)
prometheus.MustRegister(TimeJumpBackCounter)
prometheus.MustRegister(TransactionDuration)
Expand Down
7 changes: 4 additions & 3 deletions metrics/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var (
Name: "transaction_statement_num",
Help: "Bucketed histogram of statements count in each transaction.",
Buckets: prometheus.ExponentialBuckets(1, 2, 16), // 1 ~ 32768
}, []string{LbTxnMode, LblType})
}, []string{LblTxnMode, LblType})

TransactionDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -90,7 +90,7 @@ var (
Name: "transaction_duration_seconds",
Help: "Bucketed histogram of a transaction execution duration, including retry.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 28), // 1ms ~ 1.5days
}, []string{LbTxnMode, LblType})
}, []string{LblTxnMode, LblType})

StatementDeadlockDetectDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Expand Down Expand Up @@ -169,7 +169,7 @@ const (
LblCoprType = "copr_type"
LblGeneral = "general"
LblInternal = "internal"
LbTxnMode = "txn_mode"
LblTxnMode = "txn_mode"
LblPessimistic = "pessimistic"
LblOptimistic = "optimistic"
LblStore = "store"
Expand All @@ -187,5 +187,6 @@ const (
LblCommitting = "committing"
LblRollingBack = "rolling_back"
LblHasLock = "has_lock"
LblPhase = "phase"
LblModule = "module"
)
Loading

0 comments on commit 23f25af

Please sign in to comment.