From d6244e4b11d11ca92a0926862fbb4c5b2d87a347 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 30 Aug 2023 16:54:18 +0800 Subject: [PATCH 1/3] Pre-initialize the TSO metrics with the group label Signed-off-by: JmPotato --- pkg/tso/global_allocator.go | 62 +++++++++++++--------- pkg/tso/local_allocator.go | 46 ++++++++++------ pkg/tso/metrics.go | 14 ++--- pkg/tso/tso.go | 101 +++++++++++++++++++++++++++++------- 4 files changed, 154 insertions(+), 69 deletions(-) diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index ed8136854fa..c5caac850e9 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/errs" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/slice" @@ -82,6 +83,9 @@ type GlobalTSOAllocator struct { // which is used to estimate the MaxTS in a Global TSO generation // to reduce the gRPC network IO latency. syncRTT atomic.Value // store as int64 milliseconds + + // pre-initialized metrics + tsoAllocatorRoleGauge prometheus.Gauge } // NewGlobalTSOAllocator creates a new global TSO allocator. @@ -92,20 +96,12 @@ func NewGlobalTSOAllocator( ) Allocator { ctx, cancel := context.WithCancel(ctx) gta := &GlobalTSOAllocator{ - ctx: ctx, - cancel: cancel, - am: am, - member: am.member, - timestampOracle: ×tampOracle{ - client: am.member.GetLeadership().GetClient(), - tsPath: endpoint.KeyspaceGroupTSPath(am.kgID), - storage: am.storage, - saveInterval: am.saveInterval, - updatePhysicalInterval: am.updatePhysicalInterval, - maxResetTSGap: am.maxResetTSGap, - dcLocation: GlobalDCLocation, - tsoMux: &tsoObject{}, - }, + ctx: ctx, + cancel: cancel, + am: am, + member: am.member, + timestampOracle: newGlobalTimestampOracle(am, GlobalDCLocation), + tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(fmt.Sprintf("%d", am.kgID), GlobalDCLocation), } if startGlobalLeaderLoop { @@ -116,6 +112,22 @@ func NewGlobalTSOAllocator( return gta } +func newGlobalTimestampOracle(am *AllocatorManager, dcLocation string) *timestampOracle { + oracle := ×tampOracle{ + client: am.member.GetLeadership().GetClient(), + keyspaceGroupID: am.kgID, + tsPath: endpoint.KeyspaceGroupTSPath(am.kgID), + storage: am.storage, + saveInterval: am.saveInterval, + updatePhysicalInterval: am.updatePhysicalInterval, + maxResetTSGap: am.maxResetTSGap, + dcLocation: dcLocation, + tsoMux: &tsoObject{}, + } + oracle.initMetrics() + return oracle +} + // close is used to shutdown the primary election loop. // tso service call this function to shutdown the loop here, but pd manages its own loop. func (gta *GlobalTSOAllocator) close() { @@ -133,7 +145,7 @@ func (gta *GlobalTSOAllocator) getGroupID() uint32 { func (gta *GlobalTSOAllocator) setSyncRTT(rtt int64) { gta.syncRTT.Store(rtt) - tsoGauge.WithLabelValues("global_tso_sync_rtt", gta.timestampOracle.dcLocation).Set(float64(rtt)) + gta.timestampOracle.globalTSOSyncRTTGauge.Set(float64(rtt)) } func (gta *GlobalTSOAllocator) getSyncRTT() int64 { @@ -172,7 +184,7 @@ func (gta *GlobalTSOAllocator) estimateMaxTS(ctx context.Context, count uint32, // Initialize will initialize the created global TSO allocator. func (gta *GlobalTSOAllocator) Initialize(int) error { - tsoAllocatorRole.WithLabelValues(gta.timestampOracle.dcLocation).Set(1) + gta.tsoAllocatorRoleGauge.Set(1) // The suffix of a Global TSO should always be 0. 
gta.timestampOracle.suffix = 0 return gta.timestampOracle.SyncTimestamp(gta.member.GetLeadership()) @@ -206,7 +218,7 @@ func (gta *GlobalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundC func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) { defer trace.StartRegion(ctx, "GlobalTSOAllocator.GenerateTSO").End() if !gta.member.GetLeadership().Check() { - tsoCounter.WithLabelValues("not_leader", gta.timestampOracle.dcLocation).Inc() + gta.timestampOracle.notLeaderEvent.Inc() return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr)) } // To check if we have any dc-location configured in the cluster @@ -257,7 +269,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p // 3. If skipCheck is false and the maxTSO is bigger than estimatedMaxTSO, // we need to redo the setting phase with the bigger one and skip the check safely. if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) > 0 { - tsoCounter.WithLabelValues("global_tso_sync", gta.timestampOracle.dcLocation).Inc() + gta.timestampOracle.globalTSOSyncEvent.Inc() *estimatedMaxTSO = globalTSOResp // Re-add the count and check the overflow. estimatedMaxTSO.Logical += int64(count) @@ -270,7 +282,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p } // Is skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valid. if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) == 0 { - tsoCounter.WithLabelValues("global_tso_estimate", gta.timestampOracle.dcLocation).Inc() + gta.timestampOracle.globalTSOEstimateEvent.Inc() } // 4. Persist MaxTS into memory, and etcd if needed var currentGlobalTSO *pdpb.Timestamp @@ -281,10 +293,10 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p continue } if tsoutil.CompareTimestamp(currentGlobalTSO, &globalTSOResp) < 0 { - tsoCounter.WithLabelValues("global_tso_persist", gta.timestampOracle.dcLocation).Inc() + gta.timestampOracle.globalTSOPersistEvent.Inc() // Update the Global TSO in memory if err = gta.timestampOracle.resetUserTimestamp(ctx1, gta.member.GetLeadership(), tsoutil.GenerateTS(&globalTSOResp), true); err != nil { - tsoCounter.WithLabelValues("global_tso_persist_err", gta.timestampOracle.dcLocation).Inc() + gta.timestampOracle.globalTSOPersistErrEvent.Inc() log.Error("global tso allocator update the global tso in memory failed", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), errs.ZapError(err)) @@ -293,7 +305,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p } // 5. Check leadership again before we returning the response. if !gta.member.GetLeadership().Check() { - tsoCounter.WithLabelValues("not_leader_anymore", gta.timestampOracle.dcLocation).Inc() + gta.timestampOracle.notLeaderAnymoreEvent.Inc() return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested %s anymore", errs.NotLeaderErr)) } // 6. 
Calibrate the logical part to make the TSO unique globally by giving it a unique suffix in the whole cluster @@ -301,7 +313,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p globalTSOResp.SuffixBits = uint32(suffixBits) return globalTSOResp, nil } - tsoCounter.WithLabelValues("exceeded_max_retry", gta.timestampOracle.dcLocation).Inc() + gta.timestampOracle.exceededMaxRetryEvent.Inc() return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("global tso allocator maximum number of retries exceeded") } @@ -324,7 +336,7 @@ func (gta *GlobalTSOAllocator) precheckLogical(maxTSO *pdpb.Timestamp, suffixBit log.Error("estimated logical part outside of max logical interval, please check ntp time", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.Reflect("max-tso", maxTSO), errs.ZapError(errs.ErrLogicOverflow)) - tsoCounter.WithLabelValues("precheck_logical_overflow", gta.timestampOracle.dcLocation).Inc() + gta.timestampOracle.precheckLogicalOverflowEvent.Inc() return false } return true @@ -516,7 +528,7 @@ func (gta *GlobalTSOAllocator) getCurrentTSO(ctx context.Context) (*pdpb.Timesta // Reset is used to reset the TSO allocator. func (gta *GlobalTSOAllocator) Reset() { - tsoAllocatorRole.WithLabelValues(gta.timestampOracle.dcLocation).Set(0) + gta.tsoAllocatorRoleGauge.Set(0) gta.timestampOracle.ResetTimestamp() } diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index a2459673c9b..342e5dc5124 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/utils" @@ -49,6 +50,9 @@ type LocalTSOAllocator struct { // So it's not conflicted. rootPath string allocatorLeader atomic.Value // stored as *pdpb.Member + + // pre-initialized metrics + tsoAllocatorRoleGauge prometheus.Gauge } // NewLocalTSOAllocator creates a new local TSO allocator. @@ -57,6 +61,16 @@ func NewLocalTSOAllocator( leadership *election.Leadership, dcLocation string, ) Allocator { + return &LocalTSOAllocator{ + allocatorManager: am, + leadership: leadership, + timestampOracle: newLocalTimestampOracle(am, leadership, dcLocation), + rootPath: leadership.GetLeaderKey(), + tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(fmt.Sprintf("%d", am.kgID), dcLocation), + } +} + +func newLocalTimestampOracle(am *AllocatorManager, leadership *election.Leadership, dcLocation string) *timestampOracle { // Construct the timestampOracle path prefix, which is: // 1. 
for the default keyspace group: // lta/{dc-location} in /pd/{cluster_id}/lta/{dc-location}/timestamp @@ -68,21 +82,19 @@ func NewLocalTSOAllocator( } else { tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), localTSOAllocatorEtcdPrefix, dcLocation) } - return &LocalTSOAllocator{ - allocatorManager: am, - leadership: leadership, - timestampOracle: ×tampOracle{ - client: leadership.GetClient(), - tsPath: tsPath, - storage: am.storage, - saveInterval: am.saveInterval, - updatePhysicalInterval: am.updatePhysicalInterval, - maxResetTSGap: am.maxResetTSGap, - dcLocation: dcLocation, - tsoMux: &tsoObject{}, - }, - rootPath: leadership.GetLeaderKey(), + oracle := ×tampOracle{ + client: leadership.GetClient(), + keyspaceGroupID: am.kgID, + tsPath: tsPath, + storage: am.storage, + saveInterval: am.saveInterval, + updatePhysicalInterval: am.updatePhysicalInterval, + maxResetTSGap: am.maxResetTSGap, + dcLocation: dcLocation, + tsoMux: &tsoObject{}, } + oracle.initMetrics() + return oracle } // GetTimestampPath returns the timestamp path in etcd. @@ -100,7 +112,7 @@ func (lta *LocalTSOAllocator) GetDCLocation() string { // Initialize will initialize the created local TSO allocator. func (lta *LocalTSOAllocator) Initialize(suffix int) error { - tsoAllocatorRole.WithLabelValues(lta.timestampOracle.dcLocation).Set(1) + lta.tsoAllocatorRoleGauge.Set(1) lta.timestampOracle.suffix = suffix return lta.timestampOracle.SyncTimestamp(lta.leadership) } @@ -126,7 +138,7 @@ func (lta *LocalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundCh func (lta *LocalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) { defer trace.StartRegion(ctx, "LocalTSOAllocator.GenerateTSO").End() if !lta.leadership.Check() { - tsoCounter.WithLabelValues("not_leader", lta.timestampOracle.dcLocation).Inc() + lta.timestampOracle.notLeaderEvent.Inc() return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs( fmt.Sprintf("requested pd %s of %s allocator", errs.NotLeaderErr, lta.timestampOracle.dcLocation)) } @@ -135,7 +147,7 @@ func (lta *LocalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pd // Reset is used to reset the TSO allocator. 
func (lta *LocalTSOAllocator) Reset() { - tsoAllocatorRole.WithLabelValues(lta.timestampOracle.dcLocation).Set(0) + lta.tsoAllocatorRoleGauge.Set(0) lta.timestampOracle.ResetTimestamp() } diff --git a/pkg/tso/metrics.go b/pkg/tso/metrics.go index f48f2754499..aed63236ff4 100644 --- a/pkg/tso/metrics.go +++ b/pkg/tso/metrics.go @@ -17,19 +17,19 @@ package tso import "github.com/prometheus/client_golang/prometheus" const ( - dcLabel = "dc" - typeLabel = "type" + dcLabel = "dc" + typeLabel = "type" + groupLabel = "group" ) var ( - // TODO: pre-allocate gauge metrics tsoCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", Subsystem: "tso", Name: "events", Help: "Counter of tso events", - }, []string{typeLabel, dcLabel}) + }, []string{typeLabel, groupLabel, dcLabel}) tsoGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -37,7 +37,7 @@ var ( Subsystem: "cluster", Name: "tso", Help: "Record of tso metadata.", - }, []string{typeLabel, dcLabel}) + }, []string{typeLabel, groupLabel, dcLabel}) tsoGap = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -45,7 +45,7 @@ var ( Subsystem: "cluster", Name: "tso_gap_millionseconds", Help: "The minimal (non-zero) TSO gap for each DC.", - }, []string{dcLabel}) + }, []string{groupLabel, dcLabel}) tsoAllocatorRole = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -53,7 +53,7 @@ var ( Subsystem: "tso", Name: "role", Help: "Indicate the PD server role info, whether it's a TSO allocator.", - }, []string{dcLabel}) + }, []string{groupLabel, dcLabel}) ) func init() { diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index 15ce5ba4f9c..24964b9d87c 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" @@ -60,7 +61,8 @@ type tsoObject struct { // timestampOracle is used to maintain the logic of TSO. type timestampOracle struct { - client *clientv3.Client + client *clientv3.Client + keyspaceGroupID uint32 // When tsPath is empty, it means that it is a global timestampOracle. 
tsPath string storage endpoint.TSOStorage @@ -74,6 +76,65 @@ type timestampOracle struct { lastSavedTime atomic.Value // stored as time.Time suffix int dcLocation string + + // pre-initialized metrics + // timestampOracle event counter + syncEvent prometheus.Counter + syncOKEvent prometheus.Counter + errSaveSyncTSEvent prometheus.Counter + errLeaseResetTSEvent prometheus.Counter + errResetSmallTSEvent prometheus.Counter + errResetLargeTSEvent prometheus.Counter + errSaveResetTSEvent prometheus.Counter + resetTSOOKEvent prometheus.Counter + saveEvent prometheus.Counter + slowSaveEvent prometheus.Counter + systemTimeSlowEvent prometheus.Counter + skipSaveEvent prometheus.Counter + errSaveUpdateTSEvent prometheus.Counter + notLeaderAnymoreEvent prometheus.Counter + logicalOverflowEvent prometheus.Counter + exceededMaxRetryEvent prometheus.Counter + // allocator event counter + notLeaderEvent prometheus.Counter + globalTSOSyncEvent prometheus.Counter + globalTSOEstimateEvent prometheus.Counter + globalTSOPersistEvent prometheus.Counter + globalTSOPersistErrEvent prometheus.Counter + precheckLogicalOverflowEvent prometheus.Counter + // others + tsoPhysicalGauge prometheus.Gauge + tsoPhysicalGapGauge prometheus.Gauge + globalTSOSyncRTTGauge prometheus.Gauge +} + +func (t *timestampOracle) initMetrics() { + groupLabel := fmt.Sprintf("%d", t.keyspaceGroupID) + t.syncEvent = tsoCounter.WithLabelValues("sync", groupLabel, t.dcLocation) + t.syncOKEvent = tsoCounter.WithLabelValues("sync_ok", groupLabel, t.dcLocation) + t.errSaveSyncTSEvent = tsoCounter.WithLabelValues("err_save_sync_ts", groupLabel, t.dcLocation) + t.errLeaseResetTSEvent = tsoCounter.WithLabelValues("err_lease_reset_ts", groupLabel, t.dcLocation) + t.errResetSmallTSEvent = tsoCounter.WithLabelValues("err_reset_small_ts", groupLabel, t.dcLocation) + t.errResetLargeTSEvent = tsoCounter.WithLabelValues("err_reset_large_ts", groupLabel, t.dcLocation) + t.errSaveResetTSEvent = tsoCounter.WithLabelValues("err_save_reset_ts", groupLabel, t.dcLocation) + t.resetTSOOKEvent = tsoCounter.WithLabelValues("reset_tso_ok", groupLabel, t.dcLocation) + t.saveEvent = tsoCounter.WithLabelValues("save", groupLabel, t.dcLocation) + t.slowSaveEvent = tsoCounter.WithLabelValues("slow_save", groupLabel, t.dcLocation) + t.systemTimeSlowEvent = tsoCounter.WithLabelValues("system_time_slow", groupLabel, t.dcLocation) + t.skipSaveEvent = tsoCounter.WithLabelValues("skip_save", groupLabel, t.dcLocation) + t.errSaveUpdateTSEvent = tsoCounter.WithLabelValues("err_save_update_ts", groupLabel, t.dcLocation) + t.notLeaderAnymoreEvent = tsoCounter.WithLabelValues("not_leader_anymore", groupLabel, t.dcLocation) + t.logicalOverflowEvent = tsoCounter.WithLabelValues("logical_overflow", groupLabel, t.dcLocation) + t.exceededMaxRetryEvent = tsoCounter.WithLabelValues("exceeded_max_retry", groupLabel, t.dcLocation) + t.notLeaderEvent = tsoCounter.WithLabelValues("not_leader", groupLabel, t.dcLocation) + t.globalTSOSyncEvent = tsoCounter.WithLabelValues("global_tso_sync", groupLabel, t.dcLocation) + t.globalTSOEstimateEvent = tsoCounter.WithLabelValues("global_tso_estimate", groupLabel, t.dcLocation) + t.globalTSOPersistEvent = tsoCounter.WithLabelValues("global_tso_persist", groupLabel, t.dcLocation) + t.globalTSOPersistErrEvent = tsoCounter.WithLabelValues("global_tso_persist_err", groupLabel, t.dcLocation) + t.precheckLogicalOverflowEvent = tsoCounter.WithLabelValues("precheck_logical_overflow", groupLabel, t.dcLocation) + t.tsoPhysicalGauge = 
tsoGauge.WithLabelValues("tso", groupLabel, t.dcLocation) + t.tsoPhysicalGapGauge = tsoGap.WithLabelValues(groupLabel, t.dcLocation) + t.globalTSOSyncRTTGauge = tsoGauge.WithLabelValues("global_tso_sync_rtt", groupLabel, t.dcLocation) } func (t *timestampOracle) setTSOPhysical(next time.Time, force bool) { @@ -148,7 +209,7 @@ func (t *timestampOracle) GetTimestampPath() string { // SyncTimestamp is used to synchronize the timestamp. func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error { - tsoCounter.WithLabelValues("sync", t.dcLocation).Inc() + t.syncEvent.Inc() failpoint.Inject("delaySyncTimestamp", func() { time.Sleep(time.Second) @@ -178,12 +239,12 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error { }) save := next.Add(t.saveInterval) if err = t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { - tsoCounter.WithLabelValues("err_save_sync_ts", t.dcLocation).Inc() + t.errSaveSyncTSEvent.Inc() return err } t.lastSavedTime.Store(save) - tsoCounter.WithLabelValues("sync_ok", t.dcLocation).Inc() + t.syncOKEvent.Inc() log.Info("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next)) // save into memory t.setTSOPhysical(next, true) @@ -213,7 +274,7 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi t.tsoMux.Lock() defer t.tsoMux.Unlock() if !leadership.Check() { - tsoCounter.WithLabelValues("err_lease_reset_ts", t.dcLocation).Inc() + t.errLeaseResetTSEvent.Inc() return errs.ErrResetUserTimestamp.FastGenByArgs("lease expired") } var ( @@ -223,7 +284,7 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi ) // do not update if next physical time is less/before than prev if physicalDifference < 0 { - tsoCounter.WithLabelValues("err_reset_small_ts", t.dcLocation).Inc() + t.errResetSmallTSEvent.Inc() if ignoreSmaller { return nil } @@ -231,7 +292,7 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi } // do not update if next logical time is less/before/equal than prev if physicalDifference == 0 && logicalDifference <= 0 { - tsoCounter.WithLabelValues("err_reset_small_counter", t.dcLocation).Inc() + t.errResetSmallTSEvent.Inc() if ignoreSmaller { return nil } @@ -239,14 +300,14 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi } // do not update if physical time is too greater than prev if !skipUpperBoundCheck && physicalDifference >= t.maxResetTSGap().Milliseconds() { - tsoCounter.WithLabelValues("err_reset_large_ts", t.dcLocation).Inc() + t.errResetLargeTSEvent.Inc() return errs.ErrResetUserTimestamp.FastGenByArgs("the specified ts is too larger than now") } // save into etcd only if nextPhysical is close to lastSavedTime if typeutil.SubRealTimeByWallClock(t.lastSavedTime.Load().(time.Time), nextPhysical) <= UpdateTimestampGuard { save := nextPhysical.Add(t.saveInterval) if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { - tsoCounter.WithLabelValues("err_save_reset_ts", t.dcLocation).Inc() + t.errSaveResetTSEvent.Inc() return err } t.lastSavedTime.Store(save) @@ -255,7 +316,7 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi t.tsoMux.physical = nextPhysical t.tsoMux.logical = int64(nextLogical) t.setTSOUpdateTimeLocked(time.Now()) - tsoCounter.WithLabelValues("reset_tso_ok", t.dcLocation).Inc() + t.resetTSOOKEvent.Inc() return nil } @@ -275,8 +336,8 @@ func (t *timestampOracle) 
resetUserTimestampInner(leadership *election.Leadershi // and should not be called when the TSO in memory has been reset anymore. func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error { prevPhysical, prevLogical := t.getTSO() - tsoGauge.WithLabelValues("tso", t.dcLocation).Set(float64(prevPhysical.UnixNano() / int64(time.Millisecond))) - tsoGap.WithLabelValues(t.dcLocation).Set(float64(time.Since(prevPhysical).Milliseconds())) + t.tsoPhysicalGauge.Set(float64(prevPhysical.UnixNano() / int64(time.Millisecond))) + t.tsoPhysicalGapGauge.Set(float64(time.Since(prevPhysical).Milliseconds())) now := time.Now() failpoint.Inject("fallBackUpdate", func() { @@ -286,7 +347,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error now = now.Add(-time.Hour) }) - tsoCounter.WithLabelValues("save", t.dcLocation).Inc() + t.saveEvent.Inc() jetLag := typeutil.SubRealTimeByWallClock(now, prevPhysical) if jetLag > 3*t.updatePhysicalInterval && jetLag > jetLagWarningThreshold { @@ -295,11 +356,11 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error zap.Time("prev-physical", prevPhysical), zap.Time("now", now), zap.Duration("update-physical-interval", t.updatePhysicalInterval)) - tsoCounter.WithLabelValues("slow_save", t.dcLocation).Inc() + t.slowSaveEvent.Inc() } if jetLag < 0 { - tsoCounter.WithLabelValues("system_time_slow", t.dcLocation).Inc() + t.systemTimeSlowEvent.Inc() } var next time.Time @@ -313,7 +374,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error next = prevPhysical.Add(time.Millisecond) } else { // It will still use the previous physical time to alloc the timestamp. - tsoCounter.WithLabelValues("skip_save", t.dcLocation).Inc() + t.skipSaveEvent.Inc() return nil } @@ -326,7 +387,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error zap.String("dc-location", t.dcLocation), zap.String("timestamp-path", t.GetTimestampPath()), zap.Error(err)) - tsoCounter.WithLabelValues("err_save_update_ts", t.dcLocation).Inc() + t.errSaveUpdateTSEvent.Inc() return err } t.lastSavedTime.Store(save) @@ -354,7 +415,7 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader time.Sleep(200 * time.Millisecond) continue } - tsoCounter.WithLabelValues("not_leader_anymore", t.dcLocation).Inc() + t.notLeaderAnymoreEvent.Inc() return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized") } // Get a new TSO result with the given count @@ -366,7 +427,7 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader log.Warn("logical part outside of max logical interval, please check ntp time, or adjust config item `tso-update-physical-interval`", zap.Reflect("response", resp), zap.Int("retry-count", i), errs.ZapError(errs.ErrLogicOverflow)) - tsoCounter.WithLabelValues("logical_overflow", t.dcLocation).Inc() + t.logicalOverflowEvent.Inc() time.Sleep(t.updatePhysicalInterval) continue } @@ -377,7 +438,7 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader resp.SuffixBits = uint32(suffixBits) return resp, nil } - tsoCounter.WithLabelValues("exceeded_max_retry", t.dcLocation).Inc() + t.exceededMaxRetryEvent.Inc() return resp, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("generate %s tso maximum number of retries exceeded", t.dcLocation)) } From 807392081c6e8465d7100d8ee197ae7ad2d73646 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 30 Aug 2023 
20:22:29 +0800 Subject: [PATCH 2/3] Address the comments Signed-off-by: JmPotato --- pkg/tso/allocator_manager.go | 8 +++ pkg/tso/global_allocator.go | 32 ++++++------ pkg/tso/local_allocator.go | 10 ++-- pkg/tso/metrics.go | 63 +++++++++++++++++++++++ pkg/tso/tso.go | 97 ++++++++---------------------------- 5 files changed, 116 insertions(+), 94 deletions(-) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 6fb31004db0..2ecc453f80e 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -282,6 +282,14 @@ func (am *AllocatorManager) getGroupID() uint32 { return am.kgID } +// getGroupIDStr returns the keyspace group ID of the allocator manager in string format. +func (am *AllocatorManager) getGroupIDStr() string { + if am == nil { + return "0" + } + return strconv.FormatUint(uint64(am.kgID), 10) +} + // GetTimestampPath returns the timestamp path in etcd for the given DCLocation. func (am *AllocatorManager) GetTimestampPath(dcLocation string) string { if am == nil { diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index c5caac850e9..bc5f368a862 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -100,8 +100,8 @@ func NewGlobalTSOAllocator( cancel: cancel, am: am, member: am.member, - timestampOracle: newGlobalTimestampOracle(am, GlobalDCLocation), - tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(fmt.Sprintf("%d", am.kgID), GlobalDCLocation), + timestampOracle: newGlobalTimestampOracle(am), + tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), GlobalDCLocation), } if startGlobalLeaderLoop { @@ -112,7 +112,7 @@ func NewGlobalTSOAllocator( return gta } -func newGlobalTimestampOracle(am *AllocatorManager, dcLocation string) *timestampOracle { +func newGlobalTimestampOracle(am *AllocatorManager) *timestampOracle { oracle := ×tampOracle{ client: am.member.GetLeadership().GetClient(), keyspaceGroupID: am.kgID, @@ -121,10 +121,10 @@ func newGlobalTimestampOracle(am *AllocatorManager, dcLocation string) *timestam saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, maxResetTSGap: am.maxResetTSGap, - dcLocation: dcLocation, + dcLocation: GlobalDCLocation, tsoMux: &tsoObject{}, + metrics: newTSOMetrics(am.getGroupIDStr(), GlobalDCLocation), } - oracle.initMetrics() return oracle } @@ -145,7 +145,7 @@ func (gta *GlobalTSOAllocator) getGroupID() uint32 { func (gta *GlobalTSOAllocator) setSyncRTT(rtt int64) { gta.syncRTT.Store(rtt) - gta.timestampOracle.globalTSOSyncRTTGauge.Set(float64(rtt)) + gta.getMetrics().globalTSOSyncRTTGauge.Set(float64(rtt)) } func (gta *GlobalTSOAllocator) getSyncRTT() int64 { @@ -218,7 +218,7 @@ func (gta *GlobalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundC func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) { defer trace.StartRegion(ctx, "GlobalTSOAllocator.GenerateTSO").End() if !gta.member.GetLeadership().Check() { - gta.timestampOracle.notLeaderEvent.Inc() + gta.getMetrics().notLeaderEvent.Inc() return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr)) } // To check if we have any dc-location configured in the cluster @@ -269,7 +269,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p // 3. If skipCheck is false and the maxTSO is bigger than estimatedMaxTSO, // we need to redo the setting phase with the bigger one and skip the check safely. 
if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) > 0 { - gta.timestampOracle.globalTSOSyncEvent.Inc() + gta.getMetrics().globalTSOSyncEvent.Inc() *estimatedMaxTSO = globalTSOResp // Re-add the count and check the overflow. estimatedMaxTSO.Logical += int64(count) @@ -282,7 +282,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p } // Is skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valid. if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) == 0 { - gta.timestampOracle.globalTSOEstimateEvent.Inc() + gta.getMetrics().globalTSOEstimateEvent.Inc() } // 4. Persist MaxTS into memory, and etcd if needed var currentGlobalTSO *pdpb.Timestamp @@ -293,10 +293,10 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p continue } if tsoutil.CompareTimestamp(currentGlobalTSO, &globalTSOResp) < 0 { - gta.timestampOracle.globalTSOPersistEvent.Inc() + gta.getMetrics().globalTSOPersistEvent.Inc() // Update the Global TSO in memory if err = gta.timestampOracle.resetUserTimestamp(ctx1, gta.member.GetLeadership(), tsoutil.GenerateTS(&globalTSOResp), true); err != nil { - gta.timestampOracle.globalTSOPersistErrEvent.Inc() + gta.getMetrics().errGlobalTSOPersistEvent.Inc() log.Error("global tso allocator update the global tso in memory failed", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), errs.ZapError(err)) @@ -305,7 +305,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p } // 5. Check leadership again before we returning the response. if !gta.member.GetLeadership().Check() { - gta.timestampOracle.notLeaderAnymoreEvent.Inc() + gta.getMetrics().notLeaderAnymoreEvent.Inc() return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested %s anymore", errs.NotLeaderErr)) } // 6. 
Calibrate the logical part to make the TSO unique globally by giving it a unique suffix in the whole cluster @@ -313,7 +313,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p globalTSOResp.SuffixBits = uint32(suffixBits) return globalTSOResp, nil } - gta.timestampOracle.exceededMaxRetryEvent.Inc() + gta.getMetrics().exceededMaxRetryEvent.Inc() return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("global tso allocator maximum number of retries exceeded") } @@ -336,7 +336,7 @@ func (gta *GlobalTSOAllocator) precheckLogical(maxTSO *pdpb.Timestamp, suffixBit log.Error("estimated logical part outside of max logical interval, please check ntp time", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), zap.Reflect("max-tso", maxTSO), errs.ZapError(errs.ErrLogicOverflow)) - gta.timestampOracle.precheckLogicalOverflowEvent.Inc() + gta.getMetrics().precheckLogicalOverflowEvent.Inc() return false } return true @@ -651,3 +651,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() { } } } + +func (gta *GlobalTSOAllocator) getMetrics() *tsoMetrics { + return gta.timestampOracle.metrics +} diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index 342e5dc5124..5bbaea8c178 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -66,7 +66,7 @@ func NewLocalTSOAllocator( leadership: leadership, timestampOracle: newLocalTimestampOracle(am, leadership, dcLocation), rootPath: leadership.GetLeaderKey(), - tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(fmt.Sprintf("%d", am.kgID), dcLocation), + tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), dcLocation), } } @@ -92,8 +92,8 @@ func newLocalTimestampOracle(am *AllocatorManager, leadership *election.Leadersh maxResetTSGap: am.maxResetTSGap, dcLocation: dcLocation, tsoMux: &tsoObject{}, + metrics: newTSOMetrics(am.getGroupIDStr(), dcLocation), } - oracle.initMetrics() return oracle } @@ -138,7 +138,7 @@ func (lta *LocalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundCh func (lta *LocalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) { defer trace.StartRegion(ctx, "LocalTSOAllocator.GenerateTSO").End() if !lta.leadership.Check() { - lta.timestampOracle.notLeaderEvent.Inc() + lta.getMetrics().notLeaderEvent.Inc() return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs( fmt.Sprintf("requested pd %s of %s allocator", errs.NotLeaderErr, lta.timestampOracle.dcLocation)) } @@ -272,3 +272,7 @@ func (lta *LocalTSOAllocator) WatchAllocatorLeader(serverCtx context.Context, al lta.leadership.Watch(serverCtx, revision) lta.unsetAllocatorLeader() } + +func (lta *LocalTSOAllocator) getMetrics() *tsoMetrics { + return lta.timestampOracle.metrics +} diff --git a/pkg/tso/metrics.go b/pkg/tso/metrics.go index aed63236ff4..6750f1a308a 100644 --- a/pkg/tso/metrics.go +++ b/pkg/tso/metrics.go @@ -62,3 +62,66 @@ func init() { prometheus.MustRegister(tsoGap) prometheus.MustRegister(tsoAllocatorRole) } + +type tsoMetrics struct { + // timestampOracle event counter + syncEvent prometheus.Counter + syncOKEvent prometheus.Counter + errSaveSyncTSEvent prometheus.Counter + errLeaseResetTSEvent prometheus.Counter + errResetSmallPhysicalTSEvent prometheus.Counter + errResetSmallLogicalTSEvent prometheus.Counter + errResetLargeTSEvent prometheus.Counter + errSaveResetTSEvent prometheus.Counter + resetTSOOKEvent prometheus.Counter + saveEvent prometheus.Counter + slowSaveEvent prometheus.Counter + 
systemTimeSlowEvent prometheus.Counter + skipSaveEvent prometheus.Counter + errSaveUpdateTSEvent prometheus.Counter + notLeaderAnymoreEvent prometheus.Counter + logicalOverflowEvent prometheus.Counter + exceededMaxRetryEvent prometheus.Counter + // allocator event counter + notLeaderEvent prometheus.Counter + globalTSOSyncEvent prometheus.Counter + globalTSOEstimateEvent prometheus.Counter + globalTSOPersistEvent prometheus.Counter + precheckLogicalOverflowEvent prometheus.Counter + errGlobalTSOPersistEvent prometheus.Counter + // others + tsoPhysicalGauge prometheus.Gauge + tsoPhysicalGapGauge prometheus.Gauge + globalTSOSyncRTTGauge prometheus.Gauge +} + +func newTSOMetrics(groupID, dcLocation string) *tsoMetrics { + return &tsoMetrics{ + syncEvent: tsoCounter.WithLabelValues("sync", groupID, dcLocation), + syncOKEvent: tsoCounter.WithLabelValues("sync_ok", groupID, dcLocation), + errSaveSyncTSEvent: tsoCounter.WithLabelValues("err_save_sync_ts", groupID, dcLocation), + errLeaseResetTSEvent: tsoCounter.WithLabelValues("err_lease_reset_ts", groupID, dcLocation), + errResetSmallPhysicalTSEvent: tsoCounter.WithLabelValues("err_reset_physical_small_ts", groupID, dcLocation), + errResetSmallLogicalTSEvent: tsoCounter.WithLabelValues("err_reset_logical_small_ts", groupID, dcLocation), + errResetLargeTSEvent: tsoCounter.WithLabelValues("err_reset_large_ts", groupID, dcLocation), + errSaveResetTSEvent: tsoCounter.WithLabelValues("err_save_reset_ts", groupID, dcLocation), + resetTSOOKEvent: tsoCounter.WithLabelValues("reset_tso_ok", groupID, dcLocation), + saveEvent: tsoCounter.WithLabelValues("save", groupID, dcLocation), + slowSaveEvent: tsoCounter.WithLabelValues("slow_save", groupID, dcLocation), + systemTimeSlowEvent: tsoCounter.WithLabelValues("system_time_slow", groupID, dcLocation), + skipSaveEvent: tsoCounter.WithLabelValues("skip_save", groupID, dcLocation), + errSaveUpdateTSEvent: tsoCounter.WithLabelValues("err_save_update_ts", groupID, dcLocation), + notLeaderAnymoreEvent: tsoCounter.WithLabelValues("not_leader_anymore", groupID, dcLocation), + logicalOverflowEvent: tsoCounter.WithLabelValues("logical_overflow", groupID, dcLocation), + exceededMaxRetryEvent: tsoCounter.WithLabelValues("exceeded_max_retry", groupID, dcLocation), + notLeaderEvent: tsoCounter.WithLabelValues("not_leader", groupID, dcLocation), + globalTSOSyncEvent: tsoCounter.WithLabelValues("global_tso_sync", groupID, dcLocation), + globalTSOEstimateEvent: tsoCounter.WithLabelValues("global_tso_estimate", groupID, dcLocation), + globalTSOPersistEvent: tsoCounter.WithLabelValues("global_tso_persist", groupID, dcLocation), + errGlobalTSOPersistEvent: tsoCounter.WithLabelValues("global_tso_persist_err", groupID, dcLocation), + precheckLogicalOverflowEvent: tsoCounter.WithLabelValues("precheck_logical_overflow", groupID, dcLocation), + tsoPhysicalGauge: tsoGauge.WithLabelValues("tso", groupID, dcLocation), + tsoPhysicalGapGauge: tsoGap.WithLabelValues(groupID, dcLocation), + globalTSOSyncRTTGauge: tsoGauge.WithLabelValues("global_tso_sync_rtt", groupID, dcLocation), + } +} diff --git a/pkg/tso/tso.go index 24964b9d87c..dd846573562 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" - "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" @@ -78,63 +77,7 @@ type timestampOracle struct {
dcLocation string // pre-initialized metrics - // timestampOracle event counter - syncEvent prometheus.Counter - syncOKEvent prometheus.Counter - errSaveSyncTSEvent prometheus.Counter - errLeaseResetTSEvent prometheus.Counter - errResetSmallTSEvent prometheus.Counter - errResetLargeTSEvent prometheus.Counter - errSaveResetTSEvent prometheus.Counter - resetTSOOKEvent prometheus.Counter - saveEvent prometheus.Counter - slowSaveEvent prometheus.Counter - systemTimeSlowEvent prometheus.Counter - skipSaveEvent prometheus.Counter - errSaveUpdateTSEvent prometheus.Counter - notLeaderAnymoreEvent prometheus.Counter - logicalOverflowEvent prometheus.Counter - exceededMaxRetryEvent prometheus.Counter - // allocator event counter - notLeaderEvent prometheus.Counter - globalTSOSyncEvent prometheus.Counter - globalTSOEstimateEvent prometheus.Counter - globalTSOPersistEvent prometheus.Counter - globalTSOPersistErrEvent prometheus.Counter - precheckLogicalOverflowEvent prometheus.Counter - // others - tsoPhysicalGauge prometheus.Gauge - tsoPhysicalGapGauge prometheus.Gauge - globalTSOSyncRTTGauge prometheus.Gauge -} - -func (t *timestampOracle) initMetrics() { - groupLabel := fmt.Sprintf("%d", t.keyspaceGroupID) - t.syncEvent = tsoCounter.WithLabelValues("sync", groupLabel, t.dcLocation) - t.syncOKEvent = tsoCounter.WithLabelValues("sync_ok", groupLabel, t.dcLocation) - t.errSaveSyncTSEvent = tsoCounter.WithLabelValues("err_save_sync_ts", groupLabel, t.dcLocation) - t.errLeaseResetTSEvent = tsoCounter.WithLabelValues("err_lease_reset_ts", groupLabel, t.dcLocation) - t.errResetSmallTSEvent = tsoCounter.WithLabelValues("err_reset_small_ts", groupLabel, t.dcLocation) - t.errResetLargeTSEvent = tsoCounter.WithLabelValues("err_reset_large_ts", groupLabel, t.dcLocation) - t.errSaveResetTSEvent = tsoCounter.WithLabelValues("err_save_reset_ts", groupLabel, t.dcLocation) - t.resetTSOOKEvent = tsoCounter.WithLabelValues("reset_tso_ok", groupLabel, t.dcLocation) - t.saveEvent = tsoCounter.WithLabelValues("save", groupLabel, t.dcLocation) - t.slowSaveEvent = tsoCounter.WithLabelValues("slow_save", groupLabel, t.dcLocation) - t.systemTimeSlowEvent = tsoCounter.WithLabelValues("system_time_slow", groupLabel, t.dcLocation) - t.skipSaveEvent = tsoCounter.WithLabelValues("skip_save", groupLabel, t.dcLocation) - t.errSaveUpdateTSEvent = tsoCounter.WithLabelValues("err_save_update_ts", groupLabel, t.dcLocation) - t.notLeaderAnymoreEvent = tsoCounter.WithLabelValues("not_leader_anymore", groupLabel, t.dcLocation) - t.logicalOverflowEvent = tsoCounter.WithLabelValues("logical_overflow", groupLabel, t.dcLocation) - t.exceededMaxRetryEvent = tsoCounter.WithLabelValues("exceeded_max_retry", groupLabel, t.dcLocation) - t.notLeaderEvent = tsoCounter.WithLabelValues("not_leader", groupLabel, t.dcLocation) - t.globalTSOSyncEvent = tsoCounter.WithLabelValues("global_tso_sync", groupLabel, t.dcLocation) - t.globalTSOEstimateEvent = tsoCounter.WithLabelValues("global_tso_estimate", groupLabel, t.dcLocation) - t.globalTSOPersistEvent = tsoCounter.WithLabelValues("global_tso_persist", groupLabel, t.dcLocation) - t.globalTSOPersistErrEvent = tsoCounter.WithLabelValues("global_tso_persist_err", groupLabel, t.dcLocation) - t.precheckLogicalOverflowEvent = tsoCounter.WithLabelValues("precheck_logical_overflow", groupLabel, t.dcLocation) - t.tsoPhysicalGauge = tsoGauge.WithLabelValues("tso", groupLabel, t.dcLocation) - t.tsoPhysicalGapGauge = tsoGap.WithLabelValues(groupLabel, t.dcLocation) - t.globalTSOSyncRTTGauge = 
tsoGauge.WithLabelValues("global_tso_sync_rtt", groupLabel, t.dcLocation) + metrics *tsoMetrics } func (t *timestampOracle) setTSOPhysical(next time.Time, force bool) { @@ -209,7 +152,7 @@ func (t *timestampOracle) GetTimestampPath() string { // SyncTimestamp is used to synchronize the timestamp. func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error { - t.syncEvent.Inc() + t.metrics.syncEvent.Inc() failpoint.Inject("delaySyncTimestamp", func() { time.Sleep(time.Second) @@ -239,12 +182,12 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error { }) save := next.Add(t.saveInterval) if err = t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { - t.errSaveSyncTSEvent.Inc() + t.metrics.errSaveSyncTSEvent.Inc() return err } t.lastSavedTime.Store(save) - t.syncOKEvent.Inc() + t.metrics.syncOKEvent.Inc() log.Info("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next)) // save into memory t.setTSOPhysical(next, true) @@ -274,7 +217,7 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi t.tsoMux.Lock() defer t.tsoMux.Unlock() if !leadership.Check() { - t.errLeaseResetTSEvent.Inc() + t.metrics.errLeaseResetTSEvent.Inc() return errs.ErrResetUserTimestamp.FastGenByArgs("lease expired") } var ( @@ -284,7 +227,7 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi ) // do not update if next physical time is less/before than prev if physicalDifference < 0 { - t.errResetSmallTSEvent.Inc() + t.metrics.errResetSmallPhysicalTSEvent.Inc() if ignoreSmaller { return nil } @@ -292,7 +235,7 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi } // do not update if next logical time is less/before/equal than prev if physicalDifference == 0 && logicalDifference <= 0 { - t.errResetSmallTSEvent.Inc() + t.metrics.errResetSmallLogicalTSEvent.Inc() if ignoreSmaller { return nil } @@ -300,14 +243,14 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi } // do not update if physical time is too greater than prev if !skipUpperBoundCheck && physicalDifference >= t.maxResetTSGap().Milliseconds() { - t.errResetLargeTSEvent.Inc() + t.metrics.errResetLargeTSEvent.Inc() return errs.ErrResetUserTimestamp.FastGenByArgs("the specified ts is too larger than now") } // save into etcd only if nextPhysical is close to lastSavedTime if typeutil.SubRealTimeByWallClock(t.lastSavedTime.Load().(time.Time), nextPhysical) <= UpdateTimestampGuard { save := nextPhysical.Add(t.saveInterval) if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { - t.errSaveResetTSEvent.Inc() + t.metrics.errSaveResetTSEvent.Inc() return err } t.lastSavedTime.Store(save) @@ -316,7 +259,7 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi t.tsoMux.physical = nextPhysical t.tsoMux.logical = int64(nextLogical) t.setTSOUpdateTimeLocked(time.Now()) - t.resetTSOOKEvent.Inc() + t.metrics.resetTSOOKEvent.Inc() return nil } @@ -336,8 +279,8 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi // and should not be called when the TSO in memory has been reset anymore. 
func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error { prevPhysical, prevLogical := t.getTSO() - t.tsoPhysicalGauge.Set(float64(prevPhysical.UnixNano() / int64(time.Millisecond))) - t.tsoPhysicalGapGauge.Set(float64(time.Since(prevPhysical).Milliseconds())) + t.metrics.tsoPhysicalGauge.Set(float64(prevPhysical.UnixNano() / int64(time.Millisecond))) + t.metrics.tsoPhysicalGapGauge.Set(float64(time.Since(prevPhysical).Milliseconds())) now := time.Now() failpoint.Inject("fallBackUpdate", func() { @@ -347,7 +290,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error now = now.Add(-time.Hour) }) - t.saveEvent.Inc() + t.metrics.saveEvent.Inc() jetLag := typeutil.SubRealTimeByWallClock(now, prevPhysical) if jetLag > 3*t.updatePhysicalInterval && jetLag > jetLagWarningThreshold { @@ -356,11 +299,11 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error zap.Time("prev-physical", prevPhysical), zap.Time("now", now), zap.Duration("update-physical-interval", t.updatePhysicalInterval)) - t.slowSaveEvent.Inc() + t.metrics.slowSaveEvent.Inc() } if jetLag < 0 { - t.systemTimeSlowEvent.Inc() + t.metrics.systemTimeSlowEvent.Inc() } var next time.Time @@ -374,7 +317,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error next = prevPhysical.Add(time.Millisecond) } else { // It will still use the previous physical time to alloc the timestamp. - t.skipSaveEvent.Inc() + t.metrics.skipSaveEvent.Inc() return nil } @@ -387,7 +330,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error zap.String("dc-location", t.dcLocation), zap.String("timestamp-path", t.GetTimestampPath()), zap.Error(err)) - t.errSaveUpdateTSEvent.Inc() + t.metrics.errSaveUpdateTSEvent.Inc() return err } t.lastSavedTime.Store(save) @@ -415,7 +358,7 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader time.Sleep(200 * time.Millisecond) continue } - t.notLeaderAnymoreEvent.Inc() + t.metrics.notLeaderAnymoreEvent.Inc() return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory isn't initialized") } // Get a new TSO result with the given count @@ -427,7 +370,7 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader log.Warn("logical part outside of max logical interval, please check ntp time, or adjust config item `tso-update-physical-interval`", zap.Reflect("response", resp), zap.Int("retry-count", i), errs.ZapError(errs.ErrLogicOverflow)) - t.logicalOverflowEvent.Inc() + t.metrics.logicalOverflowEvent.Inc() time.Sleep(t.updatePhysicalInterval) continue } @@ -438,7 +381,7 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader resp.SuffixBits = uint32(suffixBits) return resp, nil } - t.exceededMaxRetryEvent.Inc() + t.metrics.exceededMaxRetryEvent.Inc() return resp, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("generate %s tso maximum number of retries exceeded", t.dcLocation)) } From 1d469921e1d688ff173aa25b52b15fed21755b24 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 30 Aug 2023 22:24:12 +0800 Subject: [PATCH 3/3] Extract the Local TSO key path function Signed-off-by: JmPotato --- pkg/storage/endpoint/key_path.go | 18 +++++++++++++++--- pkg/tso/global_allocator.go | 3 +-- pkg/tso/keyspace_group_manager.go | 4 ++-- pkg/tso/keyspace_group_manager_test.go | 6 +++--- pkg/tso/local_allocator.go | 17 ++--------------- 5 files changed, 23 insertions(+), 25 deletions(-) diff --git 
a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 0e99431044a..872d994a270 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -350,18 +350,30 @@ func buildPath(withSuffix bool, str ...string) string { return sb.String() } -// KeyspaceGroupTSPath constructs the timestampOracle path prefix, which is: +// KeyspaceGroupGlobalTSPath constructs the timestampOracle path prefix for Global TSO, which is: // 1. for the default keyspace group: // "" in /pd/{cluster_id}/timestamp // 2. for the non-default keyspace groups: // {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp -func KeyspaceGroupTSPath(groupID uint32) string { +func KeyspaceGroupGlobalTSPath(groupID uint32) string { if groupID == utils.DefaultKeyspaceGroupID { return "" } return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix) } +// KeyspaceGroupLocalTSPath constructs the timestampOracle path prefix for Local TSO, which is: +// 1. for the default keyspace group: +// lta/{dc-location} in /pd/{cluster_id}/lta/{dc-location}/timestamp +// 2. for the non-default keyspace groups: +// {group}/lta/{dc-location} in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp +func KeyspaceGroupLocalTSPath(keyPrefix string, groupID uint32, dcLocation string) string { + if groupID == utils.DefaultKeyspaceGroupID { + return path.Join(keyPrefix, dcLocation) + } + return path.Join(fmt.Sprintf("%05d", groupID), keyPrefix, dcLocation) +} + // TimestampPath returns the timestamp path for the given timestamp oracle path prefix. func TimestampPath(tsPath string) string { return path.Join(tsPath, TimestampKey) @@ -374,7 +386,7 @@ func TimestampPath(tsPath string) string { // /ms/{cluster_id}/tso/{group}/gta/timestamp func FullTimestampPath(clusterID uint64, groupID uint32) string { rootPath := TSOSvcRootPath(clusterID) - tsPath := TimestampPath(KeyspaceGroupTSPath(groupID)) + tsPath := TimestampPath(KeyspaceGroupGlobalTSPath(groupID)) if groupID == utils.DefaultKeyspaceGroupID { rootPath = LegacyRootPath(clusterID) } diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index bc5f368a862..aaa29d41516 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -83,7 +83,6 @@ type GlobalTSOAllocator struct { // which is used to estimate the MaxTS in a Global TSO generation // to reduce the gRPC network IO latency. syncRTT atomic.Value // store as int64 milliseconds - // pre-initialized metrics tsoAllocatorRoleGauge prometheus.Gauge } @@ -116,7 +115,7 @@ func newGlobalTimestampOracle(am *AllocatorManager) *timestampOracle { oracle := ×tampOracle{ client: am.member.GetLeadership().GetClient(), keyspaceGroupID: am.kgID, - tsPath: endpoint.KeyspaceGroupTSPath(am.kgID), + tsPath: endpoint.KeyspaceGroupGlobalTSPath(am.kgID), storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 9ba3afb2a1f..7644f0b3c7a 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -1272,7 +1272,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget // calculate the newly merged TSO to make sure it is greater than the original ones. 
var mergedTS time.Time for _, id := range mergeList { - ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(id)) + ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(id)) if err != nil { log.Error("failed to load the keyspace group TSO", zap.String("member", kgm.tsoServiceID.ServiceAddr), @@ -1429,7 +1429,7 @@ func (kgm *KeyspaceGroupManager) deletedGroupCleaner() { // TODO: support the Local TSO Allocator clean up. err := kgm.tsoSvcStorage.DeleteTimestamp( endpoint.TimestampPath( - endpoint.KeyspaceGroupTSPath(groupID), + endpoint.KeyspaceGroupGlobalTSPath(groupID), ), ) if err != nil { diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 2ffa802b4f3..a4c02f1c696 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -110,7 +110,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() { suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr})}) // Check if the TSO key is created. testutil.Eventually(re, func() bool { - ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(1)) + ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(1)) re.NoError(err) return ts != typeutil.ZeroTime }) @@ -118,7 +118,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() { suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupDeleteEvent(1)}) // Check if the TSO key is deleted. testutil.Eventually(re, func() bool { - ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(1)) + ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(1)) re.NoError(err) return ts == typeutil.ZeroTime }) @@ -137,7 +137,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() { re.NotContains(mgr.deletedGroups, mcsutils.DefaultKeyspaceGroupID) mgr.RUnlock() // Default keyspace group TSO key should NOT be deleted. - ts, err := mgr.legacySvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(mcsutils.DefaultKeyspaceGroupID)) + ts, err := mgr.legacySvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(mcsutils.DefaultKeyspaceGroupID)) re.NoError(err) re.NotEmpty(ts) diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index 5bbaea8c178..9d244d2531d 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -17,7 +17,6 @@ package tso import ( "context" "fmt" - "path" "runtime/trace" "sync/atomic" "time" @@ -27,7 +26,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -50,7 +49,6 @@ type LocalTSOAllocator struct { // So it's not conflicted. rootPath string allocatorLeader atomic.Value // stored as *pdpb.Member - // pre-initialized metrics tsoAllocatorRoleGauge prometheus.Gauge } @@ -71,21 +69,10 @@ func NewLocalTSOAllocator( } func newLocalTimestampOracle(am *AllocatorManager, leadership *election.Leadership, dcLocation string) *timestampOracle { - // Construct the timestampOracle path prefix, which is: - // 1. for the default keyspace group: - // lta/{dc-location} in /pd/{cluster_id}/lta/{dc-location}/timestamp - // 2. 
for the non-default keyspace groups: - // {group}/lta/{dc-location} in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp - var tsPath string - if am.kgID == utils.DefaultKeyspaceGroupID { - tsPath = path.Join(localTSOAllocatorEtcdPrefix, dcLocation) - } else { - tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), localTSOAllocatorEtcdPrefix, dcLocation) - } oracle := ×tampOracle{ client: leadership.GetClient(), keyspaceGroupID: am.kgID, - tsPath: tsPath, + tsPath: endpoint.KeyspaceGroupLocalTSPath(localTSOAllocatorEtcdPrefix, am.kgID, dcLocation), storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval,
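
Taken together, the three patches apply one Prometheus idiom: resolve each metric child with WithLabelValues once, when the timestampOracle is constructed, so the TSO hot path only touches pre-bound prometheus.Counter and prometheus.Gauge handles instead of doing a label-map lookup on every event. What follows is a minimal, self-contained sketch of that idiom, not pd code; the names eventCounter, oracleMetrics, and newOracleMetrics are illustrative only.

package main

import (
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

// eventCounter mirrors the shape of pd's tsoCounter: a CounterVec keyed by
// event type, keyspace group, and dc-location.
var eventCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "pd",
		Subsystem: "tso",
		Name:      "events",
		Help:      "Counter of tso events",
	}, []string{"type", "group", "dc"})

// oracleMetrics holds pre-resolved children so the hot path only calls Inc().
type oracleMetrics struct {
	syncEvent prometheus.Counter
	saveEvent prometheus.Counter
}

func newOracleMetrics(groupID uint32, dcLocation string) *oracleMetrics {
	group := strconv.FormatUint(uint64(groupID), 10)
	return &oracleMetrics{
		// WithLabelValues looks up (and lazily creates) the child series;
		// doing it once here keeps that cost off the per-request path.
		syncEvent: eventCounter.WithLabelValues("sync", group, dcLocation),
		saveEvent: eventCounter.WithLabelValues("save", group, dcLocation),
	}
}

func main() {
	prometheus.MustRegister(eventCounter)
	m := newOracleMetrics(1, "global")
	m.syncEvent.Inc() // hot path: a plain atomic add, no label lookup
}

A side effect of pre-binding is that every series is created (at zero) as soon as the allocator is constructed, rather than on its first event, so the per-group series become visible immediately after a keyspace group is assigned. For the path helpers extracted in the third patch, the code shown above yields, for example, KeyspaceGroupGlobalTSPath(0) == "" (the default group keeps the legacy /pd/{cluster_id}/timestamp location), while with keyPrefix "lta" a call like KeyspaceGroupLocalTSPath("lta", 2, "dc-1") produces "00002/lta/dc-1", matching the doc comments.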