diff --git a/client/tso_client.go b/client/tso_client.go index e0b0579bbfca..35d9388c72b3 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -83,8 +83,8 @@ type tsoClient struct { tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest // dc-location -> deadline tsDeadline sync.Map // Same as map[string]chan deadline - // dc-location -> *lastTSO - lastTSMap sync.Map // Same as map[string]*lastTSO + // dc-location -> *tsoInfo while the tsoInfo is the last TSO info + lastTSOInfoMap sync.Map // Same as map[string]*tsoInfo checkTSDeadlineCh chan struct{} checkTSODispatcherCh chan struct{} diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index c1e94f0f2308..37bea8db9e54 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -40,10 +40,13 @@ type tsoDispatcher struct { tsoBatchController *tsoBatchController } -type lastTSO struct { - keyspaceGroupID uint32 - physical int64 - logical int64 +type tsoInfo struct { + tsoServer string + reqKeyspaceGroupID uint32 + respKeyspaceGroupID uint32 + respReceivedAt time.Time + physical int64 + logical int64 } const ( @@ -709,8 +712,9 @@ func (c *tsoClient) processRequests( requests := tbc.getCollectedRequests() count := int64(len(requests)) + reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID() respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests( - c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID(), + c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID, dcLocation, requests, tbc.batchStartTime) if err != nil { c.finishRequest(requests, 0, 0, 0, err) @@ -718,53 +722,62 @@ func (c *tsoClient) processRequests( } // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits) - c.compareAndSwapTS(dcLocation, respKeyspaceGroupID, physical, firstLogical, suffixBits, count) + curTSOInfo := &tsoInfo{ + tsoServer: stream.getServerAddr(), + reqKeyspaceGroupID: reqKeyspaceGroupID, + respKeyspaceGroupID: respKeyspaceGroupID, + respReceivedAt: time.Now(), + physical: physical, + logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits), + } + c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical) c.finishRequest(requests, physical, firstLogical, suffixBits, nil) return nil } func (c *tsoClient) compareAndSwapTS( - dcLocation string, respKeyspaceGroupID uint32, - physical, firstLogical int64, suffixBits uint32, count int64, + dcLocation string, + curTSOInfo *tsoInfo, + physical, firstLogical int64, ) { - largestLogical := tsoutil.AddLogical(firstLogical, count-1, suffixBits) - lastTSOInterface, loaded := c.lastTSMap.LoadOrStore(dcLocation, &lastTSO{ - keyspaceGroupID: respKeyspaceGroupID, - physical: physical, - // Save the largest logical part here - logical: largestLogical, - }) + val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo) if !loaded { return } - lastTSOPointer := lastTSOInterface.(*lastTSO) - lastKeyspaceGroupID := lastTSOPointer.keyspaceGroupID - lastPhysical := lastTSOPointer.physical - lastLogical := lastTSOPointer.logical - - if lastKeyspaceGroupID != respKeyspaceGroupID { + lastTSOInfo := val.(*tsoInfo) + if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { log.Info("[tso] keyspace group changed", zap.String("dc-location", dcLocation), - zap.Uint32("old-group-id", lastKeyspaceGroupID), - zap.Uint32("new-group-id", respKeyspaceGroupID)) + zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID), + zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID)) } // The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then - // all TSOs we get will be [6, 7, 8, 9, 10]. - if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) { - panic(errors.Errorf( - "%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). "+ - "last keyspace group: %d, keyspace in request: %d, "+ - "keyspace group in request: %d, keyspace group in response: %d", - dcLocation, physical, firstLogical, lastPhysical, lastLogical, - lastKeyspaceGroupID, c.svcDiscovery.GetKeyspaceID(), - c.svcDiscovery.GetKeyspaceGroupID(), respKeyspaceGroupID)) + // all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned + // last time. + if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { + log.Panic("[tso] timestamp fallback", + zap.String("dc-location", dcLocation), + zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()), + zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), + zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)), + zap.String("last-tso-server", lastTSOInfo.tsoServer), + zap.String("cur-tso-server", curTSOInfo.tsoServer), + zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID), + zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), + zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), + zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt), + ) } - lastTSOPointer.keyspaceGroupID = respKeyspaceGroupID - lastTSOPointer.physical = physical - // Same as above, we save the largest logical part here. - lastTSOPointer.logical = largestLogical + lastTSOInfo.tsoServer = curTSOInfo.tsoServer + lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID + lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID + lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt + lastTSOInfo.physical = curTSOInfo.physical + lastTSOInfo.logical = curTSOInfo.logical } func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) { diff --git a/client/tso_stream.go b/client/tso_stream.go index aaabbb1712e1..892512d85594 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -35,13 +35,13 @@ type tsoStreamBuilderFactory interface { type pdTSOStreamBuilderFactory struct{} func (f *pdTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder { - return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc)} + return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc), serverAddr: cc.Target()} } type tsoTSOStreamBuilderFactory struct{} func (f *tsoTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder { - return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc)} + return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc), serverAddr: cc.Target()} } // TSO Stream Builder @@ -51,7 +51,8 @@ type tsoStreamBuilder interface { } type pdTSOStreamBuilder struct { - client pdpb.PDClient + serverAddr string + client pdpb.PDClient } func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) { @@ -61,13 +62,14 @@ func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFun stream, err := b.client.Tso(ctx) done <- struct{}{} if err == nil { - return &pdTSOStream{stream: stream}, nil + return &pdTSOStream{stream: stream, serverAddr: b.serverAddr}, nil } return nil, err } type tsoTSOStreamBuilder struct { - client tsopb.TSOClient + serverAddr string + client tsopb.TSOClient } func (b *tsoTSOStreamBuilder) build( @@ -79,7 +81,7 @@ func (b *tsoTSOStreamBuilder) build( stream, err := b.client.Tso(ctx) done <- struct{}{} if err == nil { - return &tsoTSOStream{stream: stream}, nil + return &tsoTSOStream{stream: stream, serverAddr: b.serverAddr}, nil } return nil, err } @@ -98,6 +100,7 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha // TSO Stream type tsoStream interface { + getServerAddr() string // processRequests processes TSO requests in streaming mode to get timestamps processRequests( clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, @@ -106,7 +109,12 @@ type tsoStream interface { } type pdTSOStream struct { - stream pdpb.PD_TsoClient + serverAddr string + stream pdpb.PD_TsoClient +} + +func (s *pdTSOStream) getServerAddr() string { + return s.serverAddr } func (s *pdTSOStream) processRequests( @@ -155,7 +163,12 @@ func (s *pdTSOStream) processRequests( } type tsoTSOStream struct { - stream tsopb.TSO_TsoClient + serverAddr string + stream tsopb.TSO_TsoClient +} + +func (s *tsoTSOStream) getServerAddr() string { + return s.serverAddr } func (s *tsoTSOStream) processRequests(