Skip to content

Commit

Permalink
tso: pre-initialize the TSO metrics with the group label (#7012)
Browse files Browse the repository at this point in the history
ref #7011

Pre-initialize the TSO metrics with the group label.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
JmPotato and ti-chi-bot[bot] authored Aug 31, 2023
1 parent 7a4e1e2 commit a7353bb
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 89 deletions.
18 changes: 15 additions & 3 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,18 +350,30 @@ func buildPath(withSuffix bool, str ...string) string {
return sb.String()
}

// KeyspaceGroupTSPath constructs the timestampOracle path prefix, which is:
// KeyspaceGroupGlobalTSPath constructs the timestampOracle path prefix for Global TSO, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
func KeyspaceGroupTSPath(groupID uint32) string {
func KeyspaceGroupGlobalTSPath(groupID uint32) string {
if groupID == utils.DefaultKeyspaceGroupID {
return ""
}
return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix)
}

// KeyspaceGroupLocalTSPath constructs the timestampOracle path prefix for Local TSO, which is:
// 1. for the default keyspace group:
// lta/{dc-location} in /pd/{cluster_id}/lta/{dc-location}/timestamp
// 2. for the non-default keyspace groups:
// {group}/lta/{dc-location} in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp
func KeyspaceGroupLocalTSPath(keyPrefix string, groupID uint32, dcLocation string) string {
if groupID == utils.DefaultKeyspaceGroupID {
return path.Join(keyPrefix, dcLocation)
}
return path.Join(fmt.Sprintf("%05d", groupID), keyPrefix, dcLocation)
}

// TimestampPath returns the timestamp path for the given timestamp oracle path prefix.
func TimestampPath(tsPath string) string {
return path.Join(tsPath, TimestampKey)
Expand All @@ -374,7 +386,7 @@ func TimestampPath(tsPath string) string {
// /ms/{cluster_id}/tso/{group}/gta/timestamp
func FullTimestampPath(clusterID uint64, groupID uint32) string {
rootPath := TSOSvcRootPath(clusterID)
tsPath := TimestampPath(KeyspaceGroupTSPath(groupID))
tsPath := TimestampPath(KeyspaceGroupGlobalTSPath(groupID))
if groupID == utils.DefaultKeyspaceGroupID {
rootPath = LegacyRootPath(clusterID)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@ func (am *AllocatorManager) getGroupID() uint32 {
return am.kgID
}

// getGroupIDStr returns the keyspace group ID of the allocator manager in string format.
func (am *AllocatorManager) getGroupIDStr() string {
if am == nil {
return "0"
}
return strconv.FormatUint(uint64(am.kgID), 10)
}

// GetTimestampPath returns the timestamp path in etcd for the given DCLocation.
func (am *AllocatorManager) GetTimestampPath(dcLocation string) string {
if am == nil {
Expand Down
65 changes: 40 additions & 25 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
Expand Down Expand Up @@ -82,6 +83,8 @@ type GlobalTSOAllocator struct {
// which is used to estimate the MaxTS in a Global TSO generation
// to reduce the gRPC network IO latency.
syncRTT atomic.Value // store as int64 milliseconds
// pre-initialized metrics
tsoAllocatorRoleGauge prometheus.Gauge
}

// NewGlobalTSOAllocator creates a new global TSO allocator.
Expand All @@ -92,20 +95,12 @@ func NewGlobalTSOAllocator(
) Allocator {
ctx, cancel := context.WithCancel(ctx)
gta := &GlobalTSOAllocator{
ctx: ctx,
cancel: cancel,
am: am,
member: am.member,
timestampOracle: &timestampOracle{
client: am.member.GetLeadership().GetClient(),
tsPath: endpoint.KeyspaceGroupTSPath(am.kgID),
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
maxResetTSGap: am.maxResetTSGap,
dcLocation: GlobalDCLocation,
tsoMux: &tsoObject{},
},
ctx: ctx,
cancel: cancel,
am: am,
member: am.member,
timestampOracle: newGlobalTimestampOracle(am),
tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), GlobalDCLocation),
}

if startGlobalLeaderLoop {
Expand All @@ -116,6 +111,22 @@ func NewGlobalTSOAllocator(
return gta
}

func newGlobalTimestampOracle(am *AllocatorManager) *timestampOracle {
oracle := &timestampOracle{
client: am.member.GetLeadership().GetClient(),
keyspaceGroupID: am.kgID,
tsPath: endpoint.KeyspaceGroupGlobalTSPath(am.kgID),
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
maxResetTSGap: am.maxResetTSGap,
dcLocation: GlobalDCLocation,
tsoMux: &tsoObject{},
metrics: newTSOMetrics(am.getGroupIDStr(), GlobalDCLocation),
}
return oracle
}

// close is used to shutdown the primary election loop.
// tso service call this function to shutdown the loop here, but pd manages its own loop.
func (gta *GlobalTSOAllocator) close() {
Expand All @@ -133,7 +144,7 @@ func (gta *GlobalTSOAllocator) getGroupID() uint32 {

func (gta *GlobalTSOAllocator) setSyncRTT(rtt int64) {
gta.syncRTT.Store(rtt)
tsoGauge.WithLabelValues("global_tso_sync_rtt", gta.timestampOracle.dcLocation).Set(float64(rtt))
gta.getMetrics().globalTSOSyncRTTGauge.Set(float64(rtt))
}

func (gta *GlobalTSOAllocator) getSyncRTT() int64 {
Expand Down Expand Up @@ -172,7 +183,7 @@ func (gta *GlobalTSOAllocator) estimateMaxTS(ctx context.Context, count uint32,

// Initialize will initialize the created global TSO allocator.
func (gta *GlobalTSOAllocator) Initialize(int) error {
tsoAllocatorRole.WithLabelValues(gta.timestampOracle.dcLocation).Set(1)
gta.tsoAllocatorRoleGauge.Set(1)
// The suffix of a Global TSO should always be 0.
gta.timestampOracle.suffix = 0
return gta.timestampOracle.SyncTimestamp(gta.member.GetLeadership())
Expand Down Expand Up @@ -206,7 +217,7 @@ func (gta *GlobalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundC
func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "GlobalTSOAllocator.GenerateTSO").End()
if !gta.member.GetLeadership().Check() {
tsoCounter.WithLabelValues("not_leader", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().notLeaderEvent.Inc()
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr))
}
// To check if we have any dc-location configured in the cluster
Expand Down Expand Up @@ -257,7 +268,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p
// 3. If skipCheck is false and the maxTSO is bigger than estimatedMaxTSO,
// we need to redo the setting phase with the bigger one and skip the check safely.
if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) > 0 {
tsoCounter.WithLabelValues("global_tso_sync", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().globalTSOSyncEvent.Inc()
*estimatedMaxTSO = globalTSOResp
// Re-add the count and check the overflow.
estimatedMaxTSO.Logical += int64(count)
Expand All @@ -270,7 +281,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p
}
// Is skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valid.
if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) == 0 {
tsoCounter.WithLabelValues("global_tso_estimate", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().globalTSOEstimateEvent.Inc()
}
// 4. Persist MaxTS into memory, and etcd if needed
var currentGlobalTSO *pdpb.Timestamp
Expand All @@ -281,10 +292,10 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p
continue
}
if tsoutil.CompareTimestamp(currentGlobalTSO, &globalTSOResp) < 0 {
tsoCounter.WithLabelValues("global_tso_persist", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().globalTSOPersistEvent.Inc()
// Update the Global TSO in memory
if err = gta.timestampOracle.resetUserTimestamp(ctx1, gta.member.GetLeadership(), tsoutil.GenerateTS(&globalTSOResp), true); err != nil {
tsoCounter.WithLabelValues("global_tso_persist_err", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().errGlobalTSOPersistEvent.Inc()
log.Error("global tso allocator update the global tso in memory failed",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
errs.ZapError(err))
Expand All @@ -293,15 +304,15 @@ func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (p
}
// 5. Check leadership again before we returning the response.
if !gta.member.GetLeadership().Check() {
tsoCounter.WithLabelValues("not_leader_anymore", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().notLeaderAnymoreEvent.Inc()
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested %s anymore", errs.NotLeaderErr))
}
// 6. Calibrate the logical part to make the TSO unique globally by giving it a unique suffix in the whole cluster
globalTSOResp.Logical = gta.timestampOracle.calibrateLogical(globalTSOResp.GetLogical(), suffixBits)
globalTSOResp.SuffixBits = uint32(suffixBits)
return globalTSOResp, nil
}
tsoCounter.WithLabelValues("exceeded_max_retry", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().exceededMaxRetryEvent.Inc()
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("global tso allocator maximum number of retries exceeded")
}

Expand All @@ -324,7 +335,7 @@ func (gta *GlobalTSOAllocator) precheckLogical(maxTSO *pdpb.Timestamp, suffixBit
log.Error("estimated logical part outside of max logical interval, please check ntp time",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
zap.Reflect("max-tso", maxTSO), errs.ZapError(errs.ErrLogicOverflow))
tsoCounter.WithLabelValues("precheck_logical_overflow", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().precheckLogicalOverflowEvent.Inc()
return false
}
return true
Expand Down Expand Up @@ -516,7 +527,7 @@ func (gta *GlobalTSOAllocator) getCurrentTSO(ctx context.Context) (*pdpb.Timesta

// Reset is used to reset the TSO allocator.
func (gta *GlobalTSOAllocator) Reset() {
tsoAllocatorRole.WithLabelValues(gta.timestampOracle.dcLocation).Set(0)
gta.tsoAllocatorRoleGauge.Set(0)
gta.timestampOracle.ResetTimestamp()
}

Expand Down Expand Up @@ -639,3 +650,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
}
}
}

func (gta *GlobalTSOAllocator) getMetrics() *tsoMetrics {
return gta.timestampOracle.metrics
}
4 changes: 2 additions & 2 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
// calculate the newly merged TSO to make sure it is greater than the original ones.
var mergedTS time.Time
for _, id := range mergeList {
ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(id))
ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(id))
if err != nil {
log.Error("failed to load the keyspace group TSO",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
Expand Down Expand Up @@ -1429,7 +1429,7 @@ func (kgm *KeyspaceGroupManager) deletedGroupCleaner() {
// TODO: support the Local TSO Allocator clean up.
err := kgm.tsoSvcStorage.DeleteTimestamp(
endpoint.TimestampPath(
endpoint.KeyspaceGroupTSPath(groupID),
endpoint.KeyspaceGroupGlobalTSPath(groupID),
),
)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() {
suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr})})
// Check if the TSO key is created.
testutil.Eventually(re, func() bool {
ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(1))
ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(1))
re.NoError(err)
return ts != typeutil.ZeroTime
})
// Delete keyspace group 1.
suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupDeleteEvent(1)})
// Check if the TSO key is deleted.
testutil.Eventually(re, func() bool {
ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(1))
ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(1))
re.NoError(err)
return ts == typeutil.ZeroTime
})
Expand All @@ -137,7 +137,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() {
re.NotContains(mgr.deletedGroups, mcsutils.DefaultKeyspaceGroupID)
mgr.RUnlock()
// Default keyspace group TSO key should NOT be deleted.
ts, err := mgr.legacySvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(mcsutils.DefaultKeyspaceGroupID))
ts, err := mgr.legacySvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(mcsutils.DefaultKeyspaceGroupID))
re.NoError(err)
re.NotEmpty(ts)

Expand Down
61 changes: 32 additions & 29 deletions pkg/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ package tso
import (
"context"
"fmt"
"path"
"runtime/trace"
"sync/atomic"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand All @@ -49,6 +49,8 @@ type LocalTSOAllocator struct {
// So it's not conflicted.
rootPath string
allocatorLeader atomic.Value // stored as *pdpb.Member
// pre-initialized metrics
tsoAllocatorRoleGauge prometheus.Gauge
}

// NewLocalTSOAllocator creates a new local TSO allocator.
Expand All @@ -57,32 +59,29 @@ func NewLocalTSOAllocator(
leadership *election.Leadership,
dcLocation string,
) Allocator {
// Construct the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// lta/{dc-location} in /pd/{cluster_id}/lta/{dc-location}/timestamp
// 2. for the non-default keyspace groups:
// {group}/lta/{dc-location} in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp
var tsPath string
if am.kgID == utils.DefaultKeyspaceGroupID {
tsPath = path.Join(localTSOAllocatorEtcdPrefix, dcLocation)
} else {
tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), localTSOAllocatorEtcdPrefix, dcLocation)
}
return &LocalTSOAllocator{
allocatorManager: am,
leadership: leadership,
timestampOracle: &timestampOracle{
client: leadership.GetClient(),
tsPath: tsPath,
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
maxResetTSGap: am.maxResetTSGap,
dcLocation: dcLocation,
tsoMux: &tsoObject{},
},
rootPath: leadership.GetLeaderKey(),
allocatorManager: am,
leadership: leadership,
timestampOracle: newLocalTimestampOracle(am, leadership, dcLocation),
rootPath: leadership.GetLeaderKey(),
tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), dcLocation),
}
}

func newLocalTimestampOracle(am *AllocatorManager, leadership *election.Leadership, dcLocation string) *timestampOracle {
oracle := &timestampOracle{
client: leadership.GetClient(),
keyspaceGroupID: am.kgID,
tsPath: endpoint.KeyspaceGroupLocalTSPath(localTSOAllocatorEtcdPrefix, am.kgID, dcLocation),
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
maxResetTSGap: am.maxResetTSGap,
dcLocation: dcLocation,
tsoMux: &tsoObject{},
metrics: newTSOMetrics(am.getGroupIDStr(), dcLocation),
}
return oracle
}

// GetTimestampPath returns the timestamp path in etcd.
Expand All @@ -100,7 +99,7 @@ func (lta *LocalTSOAllocator) GetDCLocation() string {

// Initialize will initialize the created local TSO allocator.
func (lta *LocalTSOAllocator) Initialize(suffix int) error {
tsoAllocatorRole.WithLabelValues(lta.timestampOracle.dcLocation).Set(1)
lta.tsoAllocatorRoleGauge.Set(1)
lta.timestampOracle.suffix = suffix
return lta.timestampOracle.SyncTimestamp(lta.leadership)
}
Expand All @@ -126,7 +125,7 @@ func (lta *LocalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundCh
func (lta *LocalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "LocalTSOAllocator.GenerateTSO").End()
if !lta.leadership.Check() {
tsoCounter.WithLabelValues("not_leader", lta.timestampOracle.dcLocation).Inc()
lta.getMetrics().notLeaderEvent.Inc()
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(
fmt.Sprintf("requested pd %s of %s allocator", errs.NotLeaderErr, lta.timestampOracle.dcLocation))
}
Expand All @@ -135,7 +134,7 @@ func (lta *LocalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pd

// Reset is used to reset the TSO allocator.
func (lta *LocalTSOAllocator) Reset() {
tsoAllocatorRole.WithLabelValues(lta.timestampOracle.dcLocation).Set(0)
lta.tsoAllocatorRoleGauge.Set(0)
lta.timestampOracle.ResetTimestamp()
}

Expand Down Expand Up @@ -260,3 +259,7 @@ func (lta *LocalTSOAllocator) WatchAllocatorLeader(serverCtx context.Context, al
lta.leadership.Watch(serverCtx, revision)
lta.unsetAllocatorLeader()
}

func (lta *LocalTSOAllocator) getMetrics() *tsoMetrics {
return lta.timestampOracle.metrics
}
Loading

0 comments on commit a7353bb

Please sign in to comment.