Skip to content

Commit

Permalink
ddl, metrics: update metrics (#7472)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Aug 27, 2018
1 parent 09fb68a commit 3af92ab
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 23 deletions.
11 changes: 7 additions & 4 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (d *ddl) addDDLJob(ctx sessionctx.Context, job *model.Job) error {

return errors.Trace(err)
})
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerAddDDLJob, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerAddDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return errors.Trace(err)
}

Expand Down Expand Up @@ -278,7 +278,7 @@ func (w *worker) deleteRange(job *model.Job) error {
func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
startTime := time.Now()
defer func() {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()

switch job.Type {
Expand Down Expand Up @@ -396,7 +396,6 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
// wait a while to retry again. If we don't wait here, DDL will retry this job immediately,
// which may act like a deadlock.
log.Infof("[ddl-%s] run DDL job error, sleeps a while:%v then retries it.", w, WaitTimeWhenErrorOccured)
metrics.DDLJobErrCounter.Inc()
time.Sleep(WaitTimeWhenErrorOccured)
}

Expand Down Expand Up @@ -447,6 +446,10 @@ func chooseLeaseTime(t, max time.Duration) time.Duration {
// runDDLJob runs a DDL job. It returns the current schema version in this transaction and the error.
func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
log.Infof("[ddl-%s] run DDL job %s", w, job)
timeStart := time.Now()
defer func() {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerRunDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(timeStart).Seconds())
}()
if job.IsFinished() {
return
}
Expand Down Expand Up @@ -554,7 +557,7 @@ func (w *worker) waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time
timeStart := time.Now()
var err error
defer func() {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerWaitSchemaChanged, metrics.RetLabel(err)).Observe(time.Since(timeStart).Seconds())
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerWaitSchemaChanged, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(timeStart).Seconds())
}()

if latestSchemaVersion == 0 {
Expand Down
18 changes: 11 additions & 7 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, version int
err := PutKVToEtcd(ctx, s.etcdCli, putKeyNoRetry, s.selfSchemaVerPath, ver,
clientv3.WithLease(s.session.Lease()))

metrics.UpdateSelfVersionHistogram.WithLabelValues(metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
metrics.UpdateSelfVersionHistogram.WithLabelValues(ver, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return errors.Trace(err)
}

Expand All @@ -232,7 +232,7 @@ func (s *schemaVersionSyncer) OwnerUpdateGlobalVersion(ctx context.Context, vers
// TODO: If the version is larger than the original global version, we need set the version.
// Otherwise, we'd better set the original global version.
err := PutKVToEtcd(ctx, s.etcdCli, putKeyRetryUnlimited, DDLGlobalSchemaVersion, ver)
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerUpdateGlobalVersion, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerUpdateGlobalVersion, ver, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
return errors.Trace(err)
}

Expand Down Expand Up @@ -267,13 +267,17 @@ func DeleteKeyFromEtcd(key string, etcdCli *clientv3.Client, retryCnt int, timeo
// MustGetGlobalVersion implements SchemaSyncer.MustGetGlobalVersion interface.
func (s *schemaVersionSyncer) MustGetGlobalVersion(ctx context.Context) (int64, error) {
startTime := time.Now()
var err error
var resp *clientv3.GetResponse
var (
err error
ver int
resp *clientv3.GetResponse
)
failedCnt := 0
intervalCnt := int(time.Second / keyOpRetryInterval)

defer func() {
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerGetGlobalVersion, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
gVer := strconv.FormatInt(int64(ver), 10)
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerGetGlobalVersion, gVer, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()
for {
if err != nil {
Expand All @@ -294,7 +298,6 @@ func (s *schemaVersionSyncer) MustGetGlobalVersion(ctx context.Context) (int64,
continue
}
if len(resp.Kvs) > 0 {
var ver int
ver, err = strconv.Atoi(string(resp.Kvs[0].Value))
if err == nil {
return int64(ver), nil
Expand Down Expand Up @@ -322,7 +325,8 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, latestV

var err error
defer func() {
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerGetGlobalVersion, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
ver := strconv.FormatInt(latestVer, 10)
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerGetGlobalVersion, ver, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()
for {
if isContextDone(ctx) {
Expand Down
21 changes: 9 additions & 12 deletions metrics/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ var (
Name: "update_self_ver_duration_seconds",
Help: "Bucketed histogram of processing time (s) of update self version",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 20),
}, []string{LblResult})
}, []string{LblVersion, LblResult})

OwnerUpdateGlobalVersion = "update_global_version"
OwnerGetGlobalVersion = "get_global_version"
Expand All @@ -75,10 +75,11 @@ var (
Name: "owner_handle_syncer_duration_seconds",
Help: "Bucketed histogram of processing time (s) of handle syncer",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 20),
}, []string{LblType, LblResult})
}, []string{LblType, LblVersion, LblResult})

// Metrics for ddl_worker.go.
WorkerAddDDLJob = "add_job"
WorkerRunDDLJob = "run_job"
WorkerFinishDDLJob = "finish_job"
WorkerWaitSchemaChanged = "wait_schema_changed"
DDLWorkerHistogram = prometheus.NewHistogramVec(
Expand All @@ -88,7 +89,7 @@ var (
Name: "worker_operation_duration_seconds",
Help: "Bucketed histogram of processing time (s) of ddl worker operations",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
}, []string{LblType, LblResult})
}, []string{LblType, LblAction, LblResult})

CreateDDLInstance = "create_ddl_instance"
CreateDDL = "create_ddl"
Expand All @@ -100,15 +101,12 @@ var (
Name: "worker_operation_total",
Help: "Counter of creating ddl/worker and isowner.",
}, []string{LblType})
)

// DDLJobErrCounter is the counter of error occurred in ddl job.
DDLJobErrCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "ddl",
Name: "job_error_total",
Help: "Counter of error occurred in ddl job.",
})
// Label constants.
const (
LblAction = "action"
LblVersion = "version"
)

func init() {
Expand All @@ -120,5 +118,4 @@ func init() {
prometheus.MustRegister(OwnerHandleSyncerHistogram)
prometheus.MustRegister(DDLWorkerHistogram)
prometheus.MustRegister(DDLCounter)
prometheus.MustRegister(DDLJobErrCounter)
}

0 comments on commit 3af92ab

Please sign in to comment.