Skip to content

Commit

Permalink
Bin/pick multi timeline fixes (tikv#128)
Browse files Browse the repository at this point in the history
* *: add group id to error logs (tikv#6695)

close tikv#6685

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* tools: add keepalive for pd-tso-bench (tikv#6699)

close tikv#6681

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* Add more debugging info to time fallback log. (tikv#6700)

ref tikv#5895

Add more debugging info to time fallback log.
[2023/06/27 10:50:54.196 -07:00] [PANIC] [tso_dispatcher.go:764] ["[tso] timestamp fallback"] 
[dc-location=global] [keyspace=4294967295]
[last-ts="(1687888254152, 1)"] [cur-ts="(1687888254052, 2)"] 
[last-tso-server=127.0.0.1:3380] [cur-tso-server=127.0.0.1:3380]
[last-keyspace-group-in-request=0] [cur-keyspace-group-in-request=0] 
[last-keyspace-group-in-response=0] [cur-keyspace-group-in-response=0] 
[last-response-received-at=2023/06/27 10:50:54.195 -07:00]
[cur-response-received-at=2023/06/27 10:50:54.196 -07:00]

Signed-off-by: Bin Shi <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* keyspace: refine the split and merge details (tikv#6707)

close tikv#6686, close tikv#6698

- Adjust the merge operation order.
- Add some logs.
- Refine the code.

Signed-off-by: JmPotato <[email protected]>

---------

Signed-off-by: JmPotato <[email protected]>
Co-authored-by: Ryan Leung <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Co-authored-by: lhy1024 <[email protected]>
Co-authored-by: JmPotato <[email protected]>
  • Loading branch information
5 people authored Jun 29, 2023
1 parent d1aaaf1 commit 3b9ca3f
Show file tree
Hide file tree
Showing 12 changed files with 389 additions and 144 deletions.
1 change: 1 addition & 0 deletions client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func BuildForwardContext(ctx context.Context, addr string) context.Context {
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string, tlsCfg *tlsutil.TLSConfig, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(addr)
if ok {
// TODO: check the connection state.
return conn.(*grpc.ClientConn), nil
}
tlsConfig, err := tlsCfg.ToTLSConfig()
Expand Down
4 changes: 2 additions & 2 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ type tsoClient struct {
tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest
// dc-location -> deadline
tsDeadline sync.Map // Same as map[string]chan deadline
// dc-location -> *lastTSO
lastTSMap sync.Map // Same as map[string]*lastTSO
// dc-location -> *tsoInfo while the tsoInfo is the last TSO info
lastTSOInfoMap sync.Map // Same as map[string]*tsoInfo

checkTSDeadlineCh chan struct{}
checkTSODispatcherCh chan struct{}
Expand Down
85 changes: 49 additions & 36 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ type tsoDispatcher struct {
tsoBatchController *tsoBatchController
}

type lastTSO struct {
keyspaceGroupID uint32
physical int64
logical int64
type tsoInfo struct {
tsoServer string
reqKeyspaceGroupID uint32
respKeyspaceGroupID uint32
respReceivedAt time.Time
physical int64
logical int64
}

const (
Expand Down Expand Up @@ -709,62 +712,72 @@ func (c *tsoClient) processRequests(

requests := tbc.getCollectedRequests()
count := int64(len(requests))
reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID()
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID(),
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID,
dcLocation, requests, tbc.batchStartTime)
if err != nil {
c.finishRequest(requests, 0, 0, 0, err)
return err
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
c.compareAndSwapTS(dcLocation, respKeyspaceGroupID, physical, firstLogical, suffixBits, count)
curTSOInfo := &tsoInfo{
tsoServer: stream.getServerAddr(),
reqKeyspaceGroupID: reqKeyspaceGroupID,
respKeyspaceGroupID: respKeyspaceGroupID,
respReceivedAt: time.Now(),
physical: physical,
logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits),
}
c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
return nil
}

func (c *tsoClient) compareAndSwapTS(
dcLocation string, respKeyspaceGroupID uint32,
physical, firstLogical int64, suffixBits uint32, count int64,
dcLocation string,
curTSOInfo *tsoInfo,
physical, firstLogical int64,
) {
largestLogical := tsoutil.AddLogical(firstLogical, count-1, suffixBits)
lastTSOInterface, loaded := c.lastTSMap.LoadOrStore(dcLocation, &lastTSO{
keyspaceGroupID: respKeyspaceGroupID,
physical: physical,
// Save the largest logical part here
logical: largestLogical,
})
val, loaded := c.lastTSOInfoMap.LoadOrStore(dcLocation, curTSOInfo)
if !loaded {
return
}
lastTSOPointer := lastTSOInterface.(*lastTSO)
lastKeyspaceGroupID := lastTSOPointer.keyspaceGroupID
lastPhysical := lastTSOPointer.physical
lastLogical := lastTSOPointer.logical

if lastKeyspaceGroupID != respKeyspaceGroupID {
lastTSOInfo := val.(*tsoInfo)
if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID {
log.Info("[tso] keyspace group changed",
zap.String("dc-location", dcLocation),
zap.Uint32("old-group-id", lastKeyspaceGroupID),
zap.Uint32("new-group-id", respKeyspaceGroupID))
zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID))
}

// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10].
if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
panic(errors.Errorf(
"%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). "+
"last keyspace group: %d, keyspace in request: %d, "+
"keyspace group in request: %d, keyspace group in response: %d",
dcLocation, physical, firstLogical, lastPhysical, lastLogical,
lastKeyspaceGroupID, c.svcDiscovery.GetKeyspaceID(),
c.svcDiscovery.GetKeyspaceGroupID(), respKeyspaceGroupID))
// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
// last time.
if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
zap.String("last-tso-server", lastTSOInfo.tsoServer),
zap.String("cur-tso-server", curTSOInfo.tsoServer),
zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt),
)
}
lastTSOPointer.keyspaceGroupID = respKeyspaceGroupID
lastTSOPointer.physical = physical
// Same as above, we save the largest logical part here.
lastTSOPointer.logical = largestLogical
lastTSOInfo.tsoServer = curTSOInfo.tsoServer
lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID
lastTSOInfo.respKeyspaceGroupID = curTSOInfo.respKeyspaceGroupID
lastTSOInfo.respReceivedAt = curTSOInfo.respReceivedAt
lastTSOInfo.physical = curTSOInfo.physical
lastTSOInfo.logical = curTSOInfo.logical
}

func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
Expand Down
29 changes: 21 additions & 8 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ type tsoStreamBuilderFactory interface {
type pdTSOStreamBuilderFactory struct{}

func (f *pdTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder {
return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc)}
return &pdTSOStreamBuilder{client: pdpb.NewPDClient(cc), serverAddr: cc.Target()}
}

type tsoTSOStreamBuilderFactory struct{}

func (f *tsoTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBuilder {
return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc)}
return &tsoTSOStreamBuilder{client: tsopb.NewTSOClient(cc), serverAddr: cc.Target()}
}

// TSO Stream Builder
Expand All @@ -51,7 +51,8 @@ type tsoStreamBuilder interface {
}

type pdTSOStreamBuilder struct {
client pdpb.PDClient
serverAddr string
client pdpb.PDClient
}

func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) {
Expand All @@ -61,13 +62,14 @@ func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFun
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &pdTSOStream{stream: stream}, nil
return &pdTSOStream{stream: stream, serverAddr: b.serverAddr}, nil
}
return nil, err
}

type tsoTSOStreamBuilder struct {
client tsopb.TSOClient
serverAddr string
client tsopb.TSOClient
}

func (b *tsoTSOStreamBuilder) build(
Expand All @@ -79,7 +81,7 @@ func (b *tsoTSOStreamBuilder) build(
stream, err := b.client.Tso(ctx)
done <- struct{}{}
if err == nil {
return &tsoTSOStream{stream: stream}, nil
return &tsoTSOStream{stream: stream, serverAddr: b.serverAddr}, nil
}
return nil, err
}
Expand All @@ -98,6 +100,7 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha
// TSO Stream

type tsoStream interface {
getServerAddr() string
// processRequests processes TSO requests in streaming mode to get timestamps
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
Expand All @@ -106,7 +109,12 @@ type tsoStream interface {
}

type pdTSOStream struct {
stream pdpb.PD_TsoClient
serverAddr string
stream pdpb.PD_TsoClient
}

func (s *pdTSOStream) getServerAddr() string {
return s.serverAddr
}

func (s *pdTSOStream) processRequests(
Expand Down Expand Up @@ -155,7 +163,12 @@ func (s *pdTSOStream) processRequests(
}

type tsoTSOStream struct {
stream tsopb.TSO_TsoClient
serverAddr string
stream tsopb.TSO_TsoClient
}

func (s *tsoTSOStream) getServerAddr() string {
return s.serverAddr
}

func (s *tsoTSOStream) processRequests(
Expand Down
12 changes: 6 additions & 6 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,9 +511,9 @@ func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.Key
// Changing the state of default keyspace is not allowed.
if name == utils.DefaultKeyspaceName {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(errModifyDefault),
zap.Error(ErrModifyDefaultKeyspace),
)
return nil, errModifyDefault
return nil, ErrModifyDefaultKeyspace
}
var meta *keyspacepb.KeyspaceMeta
err := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
Expand Down Expand Up @@ -563,9 +563,9 @@ func (manager *Manager) UpdateKeyspaceStateByID(id uint32, newState keyspacepb.K
// Changing the state of default keyspace is not allowed.
if id == utils.DefaultKeyspaceID {
log.Warn("[keyspace] failed to update keyspace config",
zap.Error(errModifyDefault),
zap.Error(ErrModifyDefaultKeyspace),
)
return nil, errModifyDefault
return nil, ErrModifyDefaultKeyspace
}
var meta *keyspacepb.KeyspaceMeta
var err error
Expand Down Expand Up @@ -698,10 +698,10 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
return errors.Errorf("default keyspace group %d not found", utils.DefaultKeyspaceGroupID)
}
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
return ErrKeyspaceGroupInSplit(utils.DefaultKeyspaceGroupID)
}
if defaultKeyspaceGroup.IsMerging() {
return ErrKeyspaceGroupInMerging
return ErrKeyspaceGroupInMerging(utils.DefaultKeyspaceGroupID)
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, maxEtcdTxnOps)
if err != nil {
Expand Down
Loading

0 comments on commit 3b9ca3f

Please sign in to comment.