diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go
index f2381e9e7d8..d6a378704d8 100644
--- a/pkg/tso/allocator_manager.go
+++ b/pkg/tso/allocator_manager.go
@@ -274,6 +274,14 @@ func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLoc
     go am.allocatorLeaderLoop(parentCtx, localTSOAllocator)
 }
 
+// getGroupID returns the keyspace group ID of the allocator manager.
+func (am *AllocatorManager) getGroupID() uint32 {
+    if am == nil {
+        return 0
+    }
+    return am.kgID
+}
+
 // GetTimestampPath returns the timestamp path in etcd for the given DCLocation.
 func (am *AllocatorManager) GetTimestampPath(dcLocation string) string {
     if am == nil {
@@ -297,13 +305,13 @@ func (am *AllocatorManager) tsoAllocatorLoop() {
     defer am.svcLoopWG.Done()
 
     am.AllocatorDaemon(am.ctx)
-    log.Info("exit allocator loop")
+    log.Info("exit allocator loop", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
 }
 
 // close is used to shutdown TSO Allocator updating daemon.
 // tso service call this function to shutdown the loop here, but pd manages its own loop.
 func (am *AllocatorManager) close() {
-    log.Info("closing the allocator manager")
+    log.Info("closing the allocator manager", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
 
     if allocatorGroup, exist := am.getAllocatorGroup(GlobalDCLocation); exist {
         allocatorGroup.allocator.(*GlobalTSOAllocator).close()
@@ -312,7 +320,7 @@ func (am *AllocatorManager) close() {
     am.cancel()
     am.svcLoopWG.Wait()
 
-    log.Info("closed the allocator manager")
+    log.Info("closed the allocator manager", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
 }
 
 // GetMember returns the ElectionMember of this AllocatorManager.
@@ -327,6 +335,7 @@ func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error {
     serverID := am.member.ID()
     if err := am.checkDCLocationUpperLimit(dcLocation); err != nil {
         log.Error("check dc-location upper limit failed",
+            logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
             zap.Int("upper-limit", int(math.Pow(2, MaxSuffixBits))-1),
             zap.String("dc-location", dcLocation),
             zap.String("server-name", serverName),
@@ -345,12 +354,14 @@ func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error {
     }
     if !resp.Succeeded {
         log.Warn("write dc-location configuration into etcd failed",
+            logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
             zap.String("dc-location", dcLocation),
             zap.String("server-name", serverName),
             zap.Uint64("server-id", serverID))
         return errs.ErrEtcdTxnConflict.FastGenByArgs()
     }
     log.Info("write dc-location configuration into etcd",
+        logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
         zap.String("dc-location", dcLocation),
         zap.String("server-name", serverName),
         zap.Uint64("server-id", serverID))
@@ -393,6 +404,7 @@ func (am *AllocatorManager) GetClusterDCLocationsFromEtcd() (clusterDCLocations
         dcLocation := string(kv.Value)
         if err != nil {
             log.Warn("get server id and dcLocation from etcd failed, invalid server id",
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                 zap.Any("splitted-serverPath", serverPath),
                 zap.String("dc-location", dcLocation),
                 errs.ZapError(err))
@@ -427,7 +439,9 @@ func (am *AllocatorManager) CleanUpDCLocation() error {
     } else if !resp.Succeeded {
         return errs.ErrEtcdTxnConflict.FastGenByArgs()
     }
-    log.Info("delete the dc-location key previously written in etcd", zap.Uint64("server-id", serverID))
+    log.Info("delete the dc-location key previously written in etcd",
+        logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
+        zap.Uint64("server-id", serverID))
     go am.ClusterDCLocationChecker()
     return nil
 }
@@ -493,6 +507,7 @@ func (am *AllocatorManager) getLocalTSOAllocatorPath() string {
 func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) {
     defer logutil.LogPanic()
     defer log.Info("server is closed, return local tso allocator leader loop",
+        logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
         zap.String("dc-location", allocator.GetDCLocation()),
         zap.String("local-tso-allocator-name", am.member.Name()))
     for {
@@ -509,11 +524,13 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
         }
         if allocatorLeader != nil {
             log.Info("start to watch allocator leader",
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                 zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.GetDCLocation()), allocatorLeader),
                 zap.String("local-tso-allocator-name", am.member.Name()))
             // WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.
             allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev)
             log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                 zap.String("dc-location", allocator.GetDCLocation()))
         }
 
@@ -521,6 +538,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
         nextLeader, err := am.getNextLeaderID(allocator.GetDCLocation())
         if err != nil {
             log.Error("get next leader from etcd failed",
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                 zap.String("dc-location", allocator.GetDCLocation()),
                 errs.ZapError(err))
             time.Sleep(200 * time.Millisecond)
@@ -530,6 +548,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
         if nextLeader != 0 {
             if nextLeader != am.member.ID() {
                 log.Info("skip campaigning of the local tso allocator leader and check later",
+                    logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                     zap.String("server-name", am.member.Name()),
                     zap.Uint64("server-id", am.member.ID()),
                     zap.Uint64("next-leader-id", nextLeader))
@@ -544,6 +563,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
         ok, dcLocationInfo, err := am.getDCLocationInfoFromLeader(ctx, allocator.GetDCLocation())
         if err != nil {
             log.Error("get dc-location info from pd leader failed",
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                 zap.String("dc-location", allocator.GetDCLocation()),
                 errs.ZapError(err))
             // PD leader hasn't been elected out, wait for the campaign
@@ -554,6 +574,7 @@ func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *
         }
         if !ok || dcLocationInfo.Suffix <= 0 || dcLocationInfo.MaxTs == nil {
             log.Warn("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round",
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                 zap.String("dc-location", allocator.GetDCLocation()),
                 zap.Any("dc-location-info", dcLocationInfo),
                 zap.String("wait-duration", checkStep.String()))
@@ -590,6 +611,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
     isNextLeader bool,
 ) {
     log.Info("start to campaign local tso allocator leader",
+        logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
         zap.String("dc-location", allocator.GetDCLocation()),
         zap.Any("dc-location-info", dcLocationInfo),
         zap.String("name", am.member.Name()))
@@ -613,11 +635,13 @@ func (am *AllocatorManager) campaignAllocatorLeader(
     if err := allocator.CampaignAllocatorLeader(am.leaderLease, cmps...); err != nil {
         if err.Error() == errs.ErrEtcdTxnConflict.Error() {
             log.Info("failed to campaign local tso allocator leader due to txn conflict, another allocator may campaign successfully",
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                 zap.String("dc-location", allocator.GetDCLocation()),
                 zap.Any("dc-location-info", dcLocationInfo),
                 zap.String("name", am.member.Name()))
         } else {
             log.Error("failed to campaign local tso allocator leader due to etcd error",
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                 zap.String("dc-location", allocator.GetDCLocation()),
                 zap.Any("dc-location-info", dcLocationInfo),
                 zap.String("name", am.member.Name()),
@@ -633,16 +657,19 @@ func (am *AllocatorManager) campaignAllocatorLeader(
     // Maintain the Local TSO Allocator leader
     go allocator.KeepAllocatorLeader(ctx)
     log.Info("campaign local tso allocator leader ok",
+        logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
         zap.String("dc-location", allocator.GetDCLocation()),
         zap.Any("dc-location-info", dcLocationInfo),
         zap.String("name", am.member.Name()))
 
     log.Info("initialize the local TSO allocator",
+        logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
         zap.String("dc-location", allocator.GetDCLocation()),
         zap.Any("dc-location-info", dcLocationInfo),
         zap.String("name", am.member.Name()))
     if err := allocator.Initialize(int(dcLocationInfo.Suffix)); err != nil {
         log.Error("failed to initialize the local TSO allocator",
+            logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
             zap.String("dc-location", allocator.GetDCLocation()),
             zap.Any("dc-location-info", dcLocationInfo),
             errs.ZapError(err))
@@ -651,6 +678,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
     if dcLocationInfo.GetMaxTs().GetPhysical() != 0 {
         if err := allocator.WriteTSO(dcLocationInfo.GetMaxTs()); err != nil {
             log.Error("failed to write the max local TSO after member changed",
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                 zap.String("dc-location", allocator.GetDCLocation()),
                 zap.Any("dc-location-info", dcLocationInfo),
                 errs.ZapError(err))
@@ -662,6 +690,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
     // The next leader is me, delete it to finish campaigning
     am.deleteNextLeaderID(allocator.GetDCLocation())
     log.Info("local tso allocator leader is ready to serve",
+        logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
         zap.String("dc-location", allocator.GetDCLocation()),
         zap.Any("dc-location-info", dcLocationInfo),
         zap.String("name", am.member.Name()))
@@ -674,6 +703,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
         case <-leaderTicker.C:
             if !allocator.IsAllocatorLeader() {
                 log.Info("no longer a local tso allocator leader because lease has expired, local tso allocator leader will step down",
+                    logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                     zap.String("dc-location", allocator.GetDCLocation()),
                     zap.Any("dc-location-info", dcLocationInfo),
                     zap.String("name", am.member.Name()))
@@ -682,6 +712,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
         case <-ctx.Done():
             // Server is closed and it should return nil.
             log.Info("server is closed, reset the local tso allocator",
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                 zap.String("dc-location", allocator.GetDCLocation()),
                 zap.Any("dc-location-info", dcLocationInfo),
                 zap.String("name", am.member.Name()))
@@ -693,7 +724,7 @@ func (am *AllocatorManager) campaignAllocatorLeader(
 // AllocatorDaemon is used to update every allocator's TSO and check whether we have
 // any new local allocator that needs to be set up.
 func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
-    log.Info("entering into allocator daemon")
+    log.Info("entering into allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
 
     // allocatorPatroller should only work when enableLocalTSO is true to
     // set up the new Local TSO Allocator in time.
@@ -730,7 +761,7 @@ func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
         // PS: ClusterDCLocationChecker and PriorityChecker are time consuming and low frequent to run,
         // we should run them concurrently to speed up the progress.
         case <-ctx.Done():
-            log.Info("exit allocator daemon")
+            log.Info("exit allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
             return
         }
     }
@@ -757,17 +788,20 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
     case <-ag.ctx.Done():
         // Resetting the allocator will clear TSO in memory
         ag.allocator.Reset()
-        log.Info("exit the allocator update loop")
+        log.Info("exit the allocator update loop", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
         return
     default:
     }
     if !ag.leadership.Check() {
-        log.Info("allocator doesn't campaign leadership yet", zap.String("dc-location", ag.dcLocation))
+        log.Info("allocator doesn't campaign leadership yet",
+            logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
+            zap.String("dc-location", ag.dcLocation))
         time.Sleep(200 * time.Millisecond)
         return
     }
     if err := ag.allocator.UpdateTSO(); err != nil {
         log.Warn("failed to update allocator's timestamp",
+            logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
             zap.String("dc-location", ag.dcLocation),
             zap.String("name", am.member.Name()),
             errs.ZapError(err))
@@ -813,7 +847,9 @@ func (am *AllocatorManager) ClusterDCLocationChecker() {
     }
     newClusterDCLocations, err := am.GetClusterDCLocationsFromEtcd()
     if err != nil {
-        log.Error("get cluster dc-locations from etcd failed", errs.ZapError(err))
+        log.Error("get cluster dc-locations from etcd failed",
+            logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
+            errs.ZapError(err))
         return
     }
     am.mu.Lock()
@@ -844,7 +880,9 @@ func (am *AllocatorManager) ClusterDCLocationChecker() {
             suffix, err := am.getOrCreateLocalTSOSuffix(dcLocation)
             if err != nil {
                 log.Warn("get or create the local tso suffix failed",
-                    zap.String("dc-location", dcLocation), errs.ZapError(err))
+                    logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
+                    zap.String("dc-location", dcLocation),
+                    errs.ZapError(err))
                 continue
             }
             if suffix > am.mu.maxSuffix {
@@ -856,7 +894,9 @@ func (am *AllocatorManager) ClusterDCLocationChecker() {
         // Follower should check and update the am.mu.maxSuffix
         maxSuffix, err := am.getMaxLocalTSOSuffix()
         if err != nil {
-            log.Error("get the max local tso suffix from etcd failed", errs.ZapError(err))
+            log.Error("get the max local tso suffix from etcd failed",
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
+                errs.ZapError(err))
             // Rollback the new dc-locations we update before
             for _, dcLocation := range newDCLocations {
                 delete(am.mu.clusterDCLocations, dcLocation)
@@ -901,6 +941,7 @@ func (am *AllocatorManager) getOrCreateLocalTSOSuffix(dcLocation string) (int32,
     }
     if !txnResp.Succeeded {
         log.Warn("write local tso suffix into etcd failed",
+            logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
             zap.String("dc-location", dcLocation),
             zap.String("local-tso-suffix", localTSOSuffixValue),
             zap.String("server-name", am.member.Name()),
@@ -983,12 +1024,14 @@ func (am *AllocatorManager) PriorityChecker() {
         // find this allocator's dc-location isn't the same with server of dc-2 but is same with itself.
         if allocatorGroup.dcLocation != leaderServerDCLocation && allocatorGroup.dcLocation == myServerDCLocation {
             log.Info("try to move the local tso allocator",
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                 zap.Uint64("old-leader-id", leaderServerID),
                 zap.String("old-dc-location", leaderServerDCLocation),
                 zap.Uint64("next-leader-id", serverID),
                 zap.String("next-dc-location", myServerDCLocation))
             if err := am.transferLocalAllocator(allocatorGroup.dcLocation, am.member.ID()); err != nil {
                 log.Error("move the local tso allocator failed",
+                    logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
                     zap.Uint64("old-leader-id", leaderServerID),
                     zap.String("old-dc-location", leaderServerDCLocation),
                     zap.Uint64("next-leader-id", serverID),
@@ -1005,12 +1048,16 @@ func (am *AllocatorManager) PriorityChecker() {
         nextLeader, err := am.getNextLeaderID(allocatorGroup.dcLocation)
         if err != nil {
             log.Error("get next leader from etcd failed",
-                zap.String("dc-location", allocatorGroup.dcLocation), errs.ZapError(err))
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
+                zap.String("dc-location", allocatorGroup.dcLocation),
+                errs.ZapError(err))
             continue
         }
         // nextLeader is not empty and isn't same with the server ID, resign the leader
         if nextLeader != 0 && nextLeader != serverID {
-            log.Info("next leader key found, resign current leader", zap.Uint64("nextLeaderID", nextLeader))
+            log.Info("next leader key found, resign current leader",
+                logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
+                zap.Uint64("nextLeaderID", nextLeader))
             am.ResetAllocatorGroup(allocatorGroup.dcLocation)
         }
     }
@@ -1286,6 +1333,7 @@ func (am *AllocatorManager) setGRPCConn(newConn *grpc.ClientConn, addr string) {
     if _, ok := am.localAllocatorConn.clientConns[addr]; ok {
         newConn.Close()
         log.Debug("use old connection",
+            logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
             zap.String("target", newConn.Target()),
             zap.String("state", newConn.GetState().String()))
         return
@@ -1303,6 +1351,7 @@ func (am *AllocatorManager) transferLocalAllocator(dcLocation string, serverID u
     if err != nil {
         err = errs.ErrEtcdGrantLease.Wrap(err).GenWithStackByCause()
         log.Error("failed to grant the lease of the next leader key",
+            logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
             zap.String("dc-location", dcLocation),
             zap.Uint64("serverID", serverID),
             errs.ZapError(err))
@@ -1315,12 +1364,15 @@ func (am *AllocatorManager) transferLocalAllocator(dcLocation string, serverID u
     if err != nil {
         err = errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause()
         log.Error("failed to write next leader key into etcd",
+            logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
             zap.String("dc-location", dcLocation),
             zap.Uint64("serverID", serverID),
             errs.ZapError(err))
         return err
     }
     if !resp.Succeeded {
-        log.Warn("write next leader id into etcd unsuccessfully", zap.String("dc-location", dcLocation))
+        log.Warn("write next leader id into etcd unsuccessfully",
+            logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
+            zap.String("dc-location", dcLocation))
         return errs.ErrEtcdTxnConflict.GenWithStack("write next leader id into etcd unsuccessfully")
     }
     return nil
diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go
index 1d961fd1b95..284d7dc316a 100644
--- a/pkg/tso/global_allocator.go
+++ b/pkg/tso/global_allocator.go
@@ -133,6 +133,14 @@ func (gta *GlobalTSOAllocator) close() {
     gta.wg.Wait()
 }
 
+// getGroupID returns the keyspace group ID of the allocator.
+func (gta *GlobalTSOAllocator) getGroupID() uint32 {
+    if gta.am == nil {
+        return 0
+    }
+    return gta.am.getGroupID()
+}
+
 func (gta *GlobalTSOAllocator) setSyncRTT(rtt int64) {
     gta.syncRTT.Store(rtt)
     tsoGauge.WithLabelValues("global_tso_sync_rtt", gta.timestampOracle.dcLocation).Set(float64(rtt))
@@ -234,7 +242,9 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
         // 1. Estimate a MaxTS among all Local TSO Allocator leaders according to the RTT.
         estimatedMaxTSO, shouldRetry, err = gta.estimateMaxTS(count, suffixBits)
         if err != nil {
-            log.Error("global tso allocator estimates MaxTS failed", errs.ZapError(err))
+            log.Error("global tso allocator estimates MaxTS failed",
+                logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
+                errs.ZapError(err))
             continue
         }
         if shouldRetry {
@@ -247,7 +257,9 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
         // we need to validate it first before we write it into every Local TSO Allocator's memory.
         globalTSOResp = *estimatedMaxTSO
         if err = gta.SyncMaxTS(ctx, dcLocationMap, &globalTSOResp, skipCheck); err != nil {
-            log.Error("global tso allocator synchronizes MaxTS failed", errs.ZapError(err))
+            log.Error("global tso allocator synchronizes MaxTS failed",
+                logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
+                errs.ZapError(err))
             continue
         }
         // 3. If skipCheck is false and the maxTSO is bigger than estimatedMaxTSO,
@@ -271,7 +283,9 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
         // 4. Persist MaxTS into memory, and etcd if needed
         var currentGlobalTSO *pdpb.Timestamp
         if currentGlobalTSO, err = gta.getCurrentTSO(); err != nil {
-            log.Error("global tso allocator gets the current global tso in memory failed", errs.ZapError(err))
+            log.Error("global tso allocator gets the current global tso in memory failed",
+                logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
+                errs.ZapError(err))
             continue
         }
         if tsoutil.CompareTimestamp(currentGlobalTSO, &globalTSOResp) < 0 {
@@ -279,7 +293,9 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {
             // Update the Global TSO in memory
             if err = gta.timestampOracle.resetUserTimestamp(gta.member.GetLeadership(), tsoutil.GenerateTS(&globalTSOResp), true); err != nil {
                 tsoCounter.WithLabelValues("global_tso_persist_err", gta.timestampOracle.dcLocation).Inc()
-                log.Error("global tso allocator update the global tso in memory failed", errs.ZapError(err))
+                log.Error("global tso allocator update the global tso in memory failed",
+                    logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
+                    errs.ZapError(err))
                 continue
             }
         }
@@ -314,6 +330,7 @@ func (gta *GlobalTSOAllocator) precheckLogical(maxTSO *pdpb.Timestamp, suffixBit
     // Check if the logical part will reach the overflow condition after being differentiated.
     if caliLogical := gta.timestampOracle.calibrateLogical(maxTSO.Logical, suffixBits); caliLogical >= maxLogical {
         log.Error("estimated logical part outside of max logical interval, please check ntp time",
+            logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
             zap.Reflect("max-tso", maxTSO), errs.ZapError(errs.ErrLogicOverflow))
         tsoCounter.WithLabelValues("precheck_logical_overflow", gta.timestampOracle.dcLocation).Inc()
         return false
@@ -400,12 +417,14 @@ func (gta *GlobalTSOAllocator) SyncMaxTS(
             respCh <- syncMaxTSResp
             if syncMaxTSResp.err != nil {
                 log.Error("sync max ts rpc failed, got an error",
+                    logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
                     zap.String("local-allocator-leader-url", leaderConn.Target()),
                     errs.ZapError(err))
                 return
             }
             if syncMaxTSResp.rpcRes.GetHeader().GetError() != nil {
                 log.Error("sync max ts rpc failed, got an error",
+                    logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
                     zap.String("local-allocator-leader-url", leaderConn.Target()),
                     errs.ZapError(errors.New(syncMaxTSResp.rpcRes.GetHeader().GetError().String())))
                 return
@@ -456,7 +475,10 @@ func (gta *GlobalTSOAllocator) SyncMaxTS(
     // Check whether all dc-locations have been considered during the synchronization and retry once if any dc-location missed.
     if ok, unsyncedDCs := gta.checkSyncedDCs(dcLocationMap, syncedDCs); !ok {
         log.Info("unsynced dc-locations found, will retry",
-            zap.Bool("skip-check", skipCheck), zap.Strings("synced-DCs", syncedDCs), zap.Strings("unsynced-DCs", unsyncedDCs))
+            logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
+            zap.Bool("skip-check", skipCheck),
+            zap.Strings("synced-DCs", syncedDCs),
+            zap.Strings("unsynced-DCs", unsyncedDCs))
         if i < syncMaxRetryCount-1 {
             // maxTSO should remain the same.
             *maxTSO = originalMaxTSO
@@ -483,7 +505,10 @@ func (gta *GlobalTSOAllocator) checkSyncedDCs(dcLocationMap map[string]DCLocatio
             unsyncedDCs = append(unsyncedDCs, dcLocation)
         }
     }
-    log.Debug("check unsynced dc-locations", zap.Strings("unsynced-DCs", unsyncedDCs), zap.Strings("synced-DCs", syncedDCs))
+    log.Debug("check unsynced dc-locations",
+        logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
+        zap.Strings("unsynced-DCs", unsyncedDCs),
+        zap.Strings("synced-DCs", syncedDCs))
     return len(unsyncedDCs) == 0, unsyncedDCs
 }
 
@@ -508,7 +533,8 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() {
     for {
         select {
         case <-gta.ctx.Done():
-            log.Info("exit the global tso primary election loop")
+            log.Info("exit the global tso primary election loop",
+                logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0))
             return
         default:
         }
@@ -519,11 +545,13 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() {
         }
         if primary != nil {
             log.Info("start to watch the primary",
+                logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
                 zap.String("campaign-tso-primary-name", gta.member.Name()),
                 zap.Stringer("tso-primary", primary))
             // Watch will keep looping and never return unless the primary has changed.
             primary.Watch(gta.ctx)
-            log.Info("the tso primary has changed, try to re-campaign a primary")
+            log.Info("the tso primary has changed, try to re-campaign a primary",
+                logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0))
         }
 
         gta.campaignLeader()
@@ -531,16 +559,21 @@
 }
 
 func (gta *GlobalTSOAllocator) campaignLeader() {
-    log.Info("start to campaign the primary", zap.String("campaign-tso-primary-name", gta.member.Name()))
+    log.Info("start to campaign the primary",
+        logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
+        zap.String("campaign-tso-primary-name", gta.member.Name()))
     if err := gta.am.member.CampaignLeader(gta.am.leaderLease); err != nil {
         if errors.Is(err, errs.ErrEtcdTxnConflict) {
             log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully",
+                logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
                 zap.String("campaign-tso-primary-name", gta.member.Name()))
         } else if errors.Is(err, errs.ErrCheckCampaign) {
             log.Info("campaign tso primary meets error due to pre-check campaign failed, the tso keyspace group may be in split",
+                logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
                 zap.String("campaign-tso-primary-name", gta.member.Name()))
         } else {
             log.Error("campaign tso primary meets error due to etcd error",
+                logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
                 zap.String("campaign-tso-primary-name", gta.member.Name()),
                 errs.ZapError(err))
         }
         return
@@ -559,16 +592,22 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
     // maintain the the leadership, after this, TSO can be service.
     gta.member.KeepLeader(ctx)
-    log.Info("campaign tso primary ok", zap.String("campaign-tso-primary-name", gta.member.Name()))
+    log.Info("campaign tso primary ok",
+        logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
+        zap.String("campaign-tso-primary-name", gta.member.Name()))
 
     allocator, err := gta.am.GetAllocator(GlobalDCLocation)
     if err != nil {
-        log.Error("failed to get the global tso allocator", errs.ZapError(err))
+        log.Error("failed to get the global tso allocator",
+            logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
+            errs.ZapError(err))
         return
     }
 
     log.Info("initializing the global tso allocator")
     if err := allocator.Initialize(0); err != nil {
-        log.Error("failed to initialize the global tso allocator", errs.ZapError(err))
+        log.Error("failed to initialize the global tso allocator",
+            logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
+            errs.ZapError(err))
         return
     }
     defer func() {
@@ -583,7 +622,9 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
     // TODO: if enable-local-tso is true, check the cluster dc-location after the primary is elected
     // go gta.tsoAllocatorManager.ClusterDCLocationChecker()
 
-    log.Info("tso primary is ready to serve", zap.String("tso-primary-name", gta.member.Name()))
+    log.Info("tso primary is ready to serve",
+        logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
+        zap.String("tso-primary-name", gta.member.Name()))
 
     leaderTicker := time.NewTicker(mcsutils.LeaderTickInterval)
     defer leaderTicker.Stop()
@@ -592,12 +633,14 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
         select {
         case <-leaderTicker.C:
             if !gta.member.IsLeader() {
-                log.Info("no longer a primary because lease has expired, the tso primary will step down")
+                log.Info("no longer a primary because lease has expired, the tso primary will step down",
+                    logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0))
                 return
             }
         case <-ctx.Done():
             // Server is closed and it should return nil.
-            log.Info("exit leader campaign")
+            log.Info("exit leader campaign",
+                logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0))
             return
         }
     }
diff --git a/pkg/utils/logutil/log.go b/pkg/utils/logutil/log.go
index a46f47da891..abb6a2783a0 100644
--- a/pkg/utils/logutil/log.go
+++ b/pkg/utils/logutil/log.go
@@ -152,3 +152,13 @@ type stringer struct {
 }
 
 func (s stringer) String() string {
     return "?"
 }
+
+// CondUint32 constructs a field with the given key and value conditionally.
+// If the condition is true, it constructs a field with uint32 type; otherwise,
+// skip the field.
+func CondUint32(key string, val uint32, condition bool) zap.Field {
+    if condition {
+        return zap.Uint32(key, val)
+    }
+    return zap.Skip()
+}