Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: pre-initialize the TSO metrics with the group label #7012

Merged
merged 5 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,18 +350,30 @@
return sb.String()
}

// KeyspaceGroupTSPath constructs the timestampOracle path prefix, which is:
// KeyspaceGroupGlobalTSPath constructs the timestampOracle path prefix for Global TSO, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
func KeyspaceGroupTSPath(groupID uint32) string {
func KeyspaceGroupGlobalTSPath(groupID uint32) string {
if groupID == utils.DefaultKeyspaceGroupID {
return ""
}
return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix)
}

// KeyspaceGroupLocalTSPath constructs the timestampOracle path prefix for Local TSO, which is:
// 1. for the default keyspace group:
// lta/{dc-location} in /pd/{cluster_id}/lta/{dc-location}/timestamp
// 2. for the non-default keyspace groups:
// {group}/lta/{dc-location} in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp
func KeyspaceGroupLocalTSPath(keyPrefix string, groupID uint32, dcLocation string) string {
if groupID == utils.DefaultKeyspaceGroupID {
return path.Join(keyPrefix, dcLocation)
}
return path.Join(fmt.Sprintf("%05d", groupID), keyPrefix, dcLocation)

Check warning on line 374 in pkg/storage/endpoint/key_path.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/endpoint/key_path.go#L374

Added line #L374 was not covered by tests
}

// TimestampPath returns the timestamp path for the given timestamp oracle path prefix.
func TimestampPath(tsPath string) string {
return path.Join(tsPath, TimestampKey)
Expand All @@ -374,7 +386,7 @@
// /ms/{cluster_id}/tso/{group}/gta/timestamp
func FullTimestampPath(clusterID uint64, groupID uint32) string {
rootPath := TSOSvcRootPath(clusterID)
tsPath := TimestampPath(KeyspaceGroupTSPath(groupID))
tsPath := TimestampPath(KeyspaceGroupGlobalTSPath(groupID))
if groupID == utils.DefaultKeyspaceGroupID {
rootPath = LegacyRootPath(clusterID)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@
return am.kgID
}

// getGroupIDStr returns the keyspace group ID of the allocator manager in string format.
func (am *AllocatorManager) getGroupIDStr() string {
if am == nil {
return "0"

Check warning on line 288 in pkg/tso/allocator_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/allocator_manager.go#L288

Added line #L288 was not covered by tests
}
return strconv.FormatUint(uint64(am.kgID), 10)
}

// GetTimestampPath returns the timestamp path in etcd for the given DCLocation.
func (am *AllocatorManager) GetTimestampPath(dcLocation string) string {
if am == nil {
Expand Down
65 changes: 40 additions & 25 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
Expand Down Expand Up @@ -82,6 +83,8 @@
// which is used to estimate the MaxTS in a Global TSO generation
// to reduce the gRPC network IO latency.
syncRTT atomic.Value // store as int64 milliseconds
// pre-initialized metrics
tsoAllocatorRoleGauge prometheus.Gauge
}

// NewGlobalTSOAllocator creates a new global TSO allocator.
Expand All @@ -92,20 +95,12 @@
) Allocator {
ctx, cancel := context.WithCancel(ctx)
gta := &GlobalTSOAllocator{
ctx: ctx,
cancel: cancel,
am: am,
member: am.member,
timestampOracle: &timestampOracle{
client: am.member.GetLeadership().GetClient(),
tsPath: endpoint.KeyspaceGroupTSPath(am.kgID),
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
maxResetTSGap: am.maxResetTSGap,
dcLocation: GlobalDCLocation,
tsoMux: &tsoObject{},
},
ctx: ctx,
cancel: cancel,
am: am,
member: am.member,
timestampOracle: newGlobalTimestampOracle(am),
tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), GlobalDCLocation),
}

if startGlobalLeaderLoop {
Expand All @@ -116,6 +111,22 @@
return gta
}

func newGlobalTimestampOracle(am *AllocatorManager) *timestampOracle {
oracle := &timestampOracle{
client: am.member.GetLeadership().GetClient(),
keyspaceGroupID: am.kgID,
tsPath: endpoint.KeyspaceGroupGlobalTSPath(am.kgID),
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
maxResetTSGap: am.maxResetTSGap,
dcLocation: GlobalDCLocation,
tsoMux: &tsoObject{},
metrics: newTSOMetrics(am.getGroupIDStr(), GlobalDCLocation),
}
return oracle
}

// close is used to shutdown the primary election loop.
// tso service call this function to shutdown the loop here, but pd manages its own loop.
func (gta *GlobalTSOAllocator) close() {
Expand All @@ -133,7 +144,7 @@

func (gta *GlobalTSOAllocator) setSyncRTT(rtt int64) {
gta.syncRTT.Store(rtt)
tsoGauge.WithLabelValues("global_tso_sync_rtt", gta.timestampOracle.dcLocation).Set(float64(rtt))
gta.getMetrics().globalTSOSyncRTTGauge.Set(float64(rtt))
}

func (gta *GlobalTSOAllocator) getSyncRTT() int64 {
Expand Down Expand Up @@ -172,7 +183,7 @@

// Initialize will initialize the created global TSO allocator.
func (gta *GlobalTSOAllocator) Initialize(int) error {
tsoAllocatorRole.WithLabelValues(gta.timestampOracle.dcLocation).Set(1)
gta.tsoAllocatorRoleGauge.Set(1)
// The suffix of a Global TSO should always be 0.
gta.timestampOracle.suffix = 0
return gta.timestampOracle.SyncTimestamp(gta.member.GetLeadership())
Expand Down Expand Up @@ -206,7 +217,7 @@
func (gta *GlobalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "GlobalTSOAllocator.GenerateTSO").End()
if !gta.member.GetLeadership().Check() {
tsoCounter.WithLabelValues("not_leader", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().notLeaderEvent.Inc()
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr))
}
// To check if we have any dc-location configured in the cluster
Expand Down Expand Up @@ -257,7 +268,7 @@
// 3. If skipCheck is false and the maxTSO is bigger than estimatedMaxTSO,
// we need to redo the setting phase with the bigger one and skip the check safely.
if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) > 0 {
tsoCounter.WithLabelValues("global_tso_sync", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().globalTSOSyncEvent.Inc()
*estimatedMaxTSO = globalTSOResp
// Re-add the count and check the overflow.
estimatedMaxTSO.Logical += int64(count)
Expand All @@ -270,7 +281,7 @@
}
// Is skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valid.
if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) == 0 {
tsoCounter.WithLabelValues("global_tso_estimate", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().globalTSOEstimateEvent.Inc()

Check warning on line 284 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L284

Added line #L284 was not covered by tests
}
// 4. Persist MaxTS into memory, and etcd if needed
var currentGlobalTSO *pdpb.Timestamp
Expand All @@ -281,10 +292,10 @@
continue
}
if tsoutil.CompareTimestamp(currentGlobalTSO, &globalTSOResp) < 0 {
tsoCounter.WithLabelValues("global_tso_persist", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().globalTSOPersistEvent.Inc()
// Update the Global TSO in memory
if err = gta.timestampOracle.resetUserTimestamp(ctx1, gta.member.GetLeadership(), tsoutil.GenerateTS(&globalTSOResp), true); err != nil {
tsoCounter.WithLabelValues("global_tso_persist_err", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().errGlobalTSOPersistEvent.Inc()

Check warning on line 298 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L298

Added line #L298 was not covered by tests
log.Error("global tso allocator update the global tso in memory failed",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
errs.ZapError(err))
Expand All @@ -293,15 +304,15 @@
}
// 5. Check leadership again before we returning the response.
if !gta.member.GetLeadership().Check() {
tsoCounter.WithLabelValues("not_leader_anymore", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().notLeaderAnymoreEvent.Inc()

Check warning on line 307 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L307

Added line #L307 was not covered by tests
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested %s anymore", errs.NotLeaderErr))
}
// 6. Calibrate the logical part to make the TSO unique globally by giving it a unique suffix in the whole cluster
globalTSOResp.Logical = gta.timestampOracle.calibrateLogical(globalTSOResp.GetLogical(), suffixBits)
globalTSOResp.SuffixBits = uint32(suffixBits)
return globalTSOResp, nil
}
tsoCounter.WithLabelValues("exceeded_max_retry", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().exceededMaxRetryEvent.Inc()

Check warning on line 315 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L315

Added line #L315 was not covered by tests
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("global tso allocator maximum number of retries exceeded")
}

Expand All @@ -324,7 +335,7 @@
log.Error("estimated logical part outside of max logical interval, please check ntp time",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
zap.Reflect("max-tso", maxTSO), errs.ZapError(errs.ErrLogicOverflow))
tsoCounter.WithLabelValues("precheck_logical_overflow", gta.timestampOracle.dcLocation).Inc()
gta.getMetrics().precheckLogicalOverflowEvent.Inc()

Check warning on line 338 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L338

Added line #L338 was not covered by tests
return false
}
return true
Expand Down Expand Up @@ -516,7 +527,7 @@

// Reset is used to reset the TSO allocator.
func (gta *GlobalTSOAllocator) Reset() {
tsoAllocatorRole.WithLabelValues(gta.timestampOracle.dcLocation).Set(0)
gta.tsoAllocatorRoleGauge.Set(0)
gta.timestampOracle.ResetTimestamp()
}

Expand Down Expand Up @@ -639,3 +650,7 @@
}
}
}

func (gta *GlobalTSOAllocator) getMetrics() *tsoMetrics {
return gta.timestampOracle.metrics
}
4 changes: 2 additions & 2 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
// calculate the newly merged TSO to make sure it is greater than the original ones.
var mergedTS time.Time
for _, id := range mergeList {
ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(id))
ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(id))
if err != nil {
log.Error("failed to load the keyspace group TSO",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
Expand Down Expand Up @@ -1429,7 +1429,7 @@ func (kgm *KeyspaceGroupManager) deletedGroupCleaner() {
// TODO: support the Local TSO Allocator clean up.
err := kgm.tsoSvcStorage.DeleteTimestamp(
endpoint.TimestampPath(
endpoint.KeyspaceGroupTSPath(groupID),
endpoint.KeyspaceGroupGlobalTSPath(groupID),
),
)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() {
suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr})})
// Check if the TSO key is created.
testutil.Eventually(re, func() bool {
ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(1))
ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(1))
re.NoError(err)
return ts != typeutil.ZeroTime
})
// Delete keyspace group 1.
suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupDeleteEvent(1)})
// Check if the TSO key is deleted.
testutil.Eventually(re, func() bool {
ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(1))
ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(1))
re.NoError(err)
return ts == typeutil.ZeroTime
})
Expand All @@ -137,7 +137,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() {
re.NotContains(mgr.deletedGroups, mcsutils.DefaultKeyspaceGroupID)
mgr.RUnlock()
// Default keyspace group TSO key should NOT be deleted.
ts, err := mgr.legacySvcStorage.LoadTimestamp(endpoint.KeyspaceGroupTSPath(mcsutils.DefaultKeyspaceGroupID))
ts, err := mgr.legacySvcStorage.LoadTimestamp(endpoint.KeyspaceGroupGlobalTSPath(mcsutils.DefaultKeyspaceGroupID))
re.NoError(err)
re.NotEmpty(ts)

Expand Down
61 changes: 32 additions & 29 deletions pkg/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
import (
"context"
"fmt"
"path"
"runtime/trace"
"sync/atomic"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand All @@ -49,6 +49,8 @@
// So it's not conflicted.
rootPath string
allocatorLeader atomic.Value // stored as *pdpb.Member
// pre-initialized metrics
tsoAllocatorRoleGauge prometheus.Gauge
}

// NewLocalTSOAllocator creates a new local TSO allocator.
Expand All @@ -57,32 +59,29 @@
leadership *election.Leadership,
dcLocation string,
) Allocator {
// Construct the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// lta/{dc-location} in /pd/{cluster_id}/lta/{dc-location}/timestamp
// 2. for the non-default keyspace groups:
// {group}/lta/{dc-location} in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp
var tsPath string
if am.kgID == utils.DefaultKeyspaceGroupID {
tsPath = path.Join(localTSOAllocatorEtcdPrefix, dcLocation)
} else {
tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), localTSOAllocatorEtcdPrefix, dcLocation)
}
return &LocalTSOAllocator{
allocatorManager: am,
leadership: leadership,
timestampOracle: &timestampOracle{
client: leadership.GetClient(),
tsPath: tsPath,
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
maxResetTSGap: am.maxResetTSGap,
dcLocation: dcLocation,
tsoMux: &tsoObject{},
},
rootPath: leadership.GetLeaderKey(),
allocatorManager: am,
leadership: leadership,
timestampOracle: newLocalTimestampOracle(am, leadership, dcLocation),
rootPath: leadership.GetLeaderKey(),
tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), dcLocation),
}
}

func newLocalTimestampOracle(am *AllocatorManager, leadership *election.Leadership, dcLocation string) *timestampOracle {
oracle := &timestampOracle{
client: leadership.GetClient(),
keyspaceGroupID: am.kgID,
tsPath: endpoint.KeyspaceGroupLocalTSPath(localTSOAllocatorEtcdPrefix, am.kgID, dcLocation),
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
maxResetTSGap: am.maxResetTSGap,
dcLocation: dcLocation,
tsoMux: &tsoObject{},
metrics: newTSOMetrics(am.getGroupIDStr(), dcLocation),
}
return oracle
}

// GetTimestampPath returns the timestamp path in etcd.
Expand All @@ -100,7 +99,7 @@

// Initialize will initialize the created local TSO allocator.
func (lta *LocalTSOAllocator) Initialize(suffix int) error {
tsoAllocatorRole.WithLabelValues(lta.timestampOracle.dcLocation).Set(1)
lta.tsoAllocatorRoleGauge.Set(1)
lta.timestampOracle.suffix = suffix
return lta.timestampOracle.SyncTimestamp(lta.leadership)
}
Expand All @@ -126,7 +125,7 @@
func (lta *LocalTSOAllocator) GenerateTSO(ctx context.Context, count uint32) (pdpb.Timestamp, error) {
defer trace.StartRegion(ctx, "LocalTSOAllocator.GenerateTSO").End()
if !lta.leadership.Check() {
tsoCounter.WithLabelValues("not_leader", lta.timestampOracle.dcLocation).Inc()
lta.getMetrics().notLeaderEvent.Inc()

Check warning on line 128 in pkg/tso/local_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/local_allocator.go#L128

Added line #L128 was not covered by tests
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(
fmt.Sprintf("requested pd %s of %s allocator", errs.NotLeaderErr, lta.timestampOracle.dcLocation))
}
Expand All @@ -135,7 +134,7 @@

// Reset is used to reset the TSO allocator.
func (lta *LocalTSOAllocator) Reset() {
tsoAllocatorRole.WithLabelValues(lta.timestampOracle.dcLocation).Set(0)
lta.tsoAllocatorRoleGauge.Set(0)
lta.timestampOracle.ResetTimestamp()
}

Expand Down Expand Up @@ -260,3 +259,7 @@
lta.leadership.Watch(serverCtx, revision)
lta.unsetAllocatorLeader()
}

func (lta *LocalTSOAllocator) getMetrics() *tsoMetrics {
return lta.timestampOracle.metrics

Check warning on line 264 in pkg/tso/local_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/local_allocator.go#L264

Added line #L264 was not covered by tests
}
Loading
Loading