Skip to content

Commit

Permalink
Merge branch 'master' into fix-unsafe-recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] committed Jun 28, 2023
2 parents c50eb1a + 6f0c63b commit f359fbd
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 46 deletions.
4 changes: 2 additions & 2 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ type tsoClient struct {
tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest
// dc-location -> deadline
tsDeadline sync.Map // Same as map[string]chan deadline
// dc-location -> *lastTSO
lastTSMap sync.Map // Same as map[string]*lastTSO
// dc-location -> *tsoInfo while the tsoInfo is the last TSO info
lastTSOInfoMap sync.Map // Same as map[string]*tsoInfo

checkTSDeadlineCh chan struct{}
checkTSODispatcherCh chan struct{}
Expand Down
85 changes: 49 additions & 36 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ type tsoDispatcher struct {
tsoBatchController *tsoBatchController
}

type lastTSO struct {
keyspaceGroupID uint32
physical int64
logical int64
type tsoInfo struct {
tsoServer string
reqKeyspaceGroupID uint32
respKeyspaceGroupID uint32
respReceivedAt time.Time
physical int64
logical int64
}

const (
Expand Down Expand Up @@ -709,62 +712,72 @@ func (c *tsoClient) processRequests(

requests := tbc.getCollectedRequests()
count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID(),
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID,
dcLocation, requests, tbc.batchStartTime)
if err != nil {
c.finishRequest(requests, 0, 0, 0, err)
return err
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
c.compareAndSwapTS(dcLocation, respKeyspaceGroupID, physical, firstLogical, suffixBits, count)
curTSOInfo := &tsoInfo{
tsoServer: stream.getServerAddr(),
reqKeyspaceGroupID: reqKeyspaceGroupID,
respKeyspaceGroupID: respKeyspaceGroupID,
respReceivedAt: time.Now(),
physical: physical,
logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits),
}
c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
return nil
}

func (c *tsoClient) compareAndSwapTS(
dcLocation string, respKeyspaceGroupID uint32,
physical, firstLogical int64, suffixBits uint32, count int64,
dcLocation string,
curTSOInfo *tsoInfo,
physical, firstLogical int64,
) {
largestLogical := tsoutil.AddLogical(firstLogical, count-1, suffixBits)
lastTSOInterface, loaded := c.lastTSMap.LoadOrStore(dcLocation, &lastTSO{
keyspaceGroupID: respKeyspaceGroupID,
physical: physical,
// Save the largest logical part here
logical: largestLogical,
})
val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo)
if !loaded {
return
}
lastTSOPointer := lastTSOInterface.(*lastTSO)
lastKeyspaceGroupID := lastTSOPointer.keyspaceGroupID
lastPhysical := lastTSOPointer.physical
lastLogical := lastTSOPointer.logical

if lastKeyspaceGroupID != respKeyspaceGroupID {
lastTSOInfo := val.(*tsoInfo)
if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID {
log.Info("[tso] keyspace group changed",
zap.String("dc-location", dcLocation),
zap.Uint32("old-group-id", lastKeyspaceGroupID),
zap.Uint32("new-group-id", respKeyspaceGroupID))
zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID))
}

// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10].
if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
panic(errors.Errorf(
"%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). "+
"last keyspace group: %d, keyspace in request: %d, "+
"keyspace group in request: %d, keyspace group in response: %d",
dcLocation, physical, firstLogical, lastPhysical, lastLogical,
lastKeyspaceGroupID, c.svcDiscovery.GetKeyspaceID(),
c.svcDiscovery.GetKeyspaceGroupID(), respKeyspaceGroupID))
// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
// last time.
if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
zap.String("last-tso-server", lastTSOInfo.tsoServer),
zap.String("cur-tso-server", curTSOInfo.tsoServer),
zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt),
)
}
lastTSOPointer.keyspaceGroupID = respKeyspaceGroupID
lastTSOPointer.physical = physical
// Same as above, we save the largest logical part here.
lastTSOPointer.logical = largestLogical
lastTSOInfo.tsoServer = curTSOInfo.tsoServer
lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID
lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID
lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt
lastTSOInfo.physical = curTSOInfo.physical
lastTSOInfo.logical = curTSOInfo.logical
}

func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
Expand Down
29 changes: 21 additions & 8 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ type tsoStreamBuilderFactory interface {
type pdTSOStreamBuilderFactory struct{}

func (f *pdTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder {
return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc)}
return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc), serverAddr: cc.Target()}
}

type tsoTSOStreamBuilderFactory struct{}

func (f *tsoTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder {
return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc)}
return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc), serverAddr: cc.Target()}
}

// TSO Stream Builder
Expand All @@ -51,7 +51,8 @@ type tsoStreamBuilder interface {
}

type pdTSOStreamBuilder struct {
client pdpb.PDClient
serverAddr string
client pdpb.PDClient
}

func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) {
Expand All @@ -61,13 +62,14 @@ func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFun
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &pdTSOStream{stream: stream}, nil
return &pdTSOStream{stream: stream, serverAddr: b.serverAddr}, nil
}
return nil, err
}

type tsoTSOStreamBuilder struct {
client tsopb.TSOClient
serverAddr string
client tsopb.TSOClient
}

func (b *tsoTSOStreamBuilder) build(
Expand All @@ -79,7 +81,7 @@ func (b *tsoTSOStreamBuilder) build(
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &tsoTSOStream{stream: stream}, nil
return &tsoTSOStream{stream: stream, serverAddr: b.serverAddr}, nil
}
return nil, err
}
Expand All @@ -98,6 +100,7 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha
// TSO Stream

type tsoStream interface {
getServerAddr() string
// processRequests processes TSO requests in streaming mode to get timestamps
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
Expand All @@ -106,7 +109,12 @@ type tsoStream interface {
}

type pdTSOStream struct {
stream pdpb.PD_TsoClient
serverAddr string
stream pdpb.PD_TsoClient
}

func (s *pdTSOStream) getServerAddr() string {
return s.serverAddr
}

func (s *pdTSOStream) processRequests(
Expand Down Expand Up @@ -155,7 +163,12 @@ func (s *pdTSOStream) processRequests(
}

type tsoTSOStream struct {
stream tsopb.TSO_TsoClient
serverAddr string
stream tsopb.TSO_TsoClient
}

func (s *tsoTSOStream) getServerAddr() string {
return s.serverAddr
}

func (s *tsoTSOStream) processRequests(
Expand Down

0 comments on commit f359fbd

Please sign in to comment.