Skip to content

Commit

Permalink
Fixed bugs in tso service watching loop.
Browse files Browse the repository at this point in the history
1. When re-watch a range, to continue from what left by the last watch, the revision is wresp.Header.Revision + 1 instead of wresp.Header.Revision, where wresp.Header.Revision is the revision indicated in the response of the last watch. Because of this bug, it was processing the same event endless as you can see from the log below.
2. In tso service watch loop in /Users/binshi/code/pingcap/my-pd/pkg/keyspace/tso_keyspace_group.go, If this is delete event, the json.Unmarshal(event.Kv.Value, s) will fail with the error "unexpected end of JSON input", so there is no way to get s.serviceAddr from the result of json.Unmarshal.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Apr 20, 2023
1 parent 5f99e0c commit 9f1dc94
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
25 changes: 20 additions & 5 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type GroupManager struct {
// TODO: add user kind with different balancer
// when we ensure where the correspondence between tso node and user kind will be found
nodesBalancer balancer.Balancer[string]
// serviceRegistryMap stores the mapping from the service registry key to the service address.
serviceRegistryMap map[string]string
}

// NewKeyspaceGroupManager creates a Manager of keyspace group related data.
Expand Down Expand Up @@ -131,6 +133,7 @@ func (m *GroupManager) Bootstrap() error {
// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.nodesBalancer = balancer.GenByPolicy[string](m.policy)
m.serviceRegistryMap = make(map[string]string)
m.wg.Add(1)
go m.startWatchLoop()
}
Expand Down Expand Up @@ -169,6 +172,7 @@ func (m *GroupManager) startWatchLoop() {
continue
}
m.nodesBalancer.Put(s.ServiceAddr)
m.serviceRegistryMap[string(item.Key)] = s.ServiceAddr
}
break
}
Expand Down Expand Up @@ -219,17 +223,28 @@ func (m *GroupManager) watchServiceAddrs(ctx context.Context, revision int64) (i
return revision, wresp.Err()
}
for _, event := range wresp.Events {
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(event.Kv.Value, s); err != nil {
log.Warn("failed to unmarshal service registry entry", zap.Error(err))
}
switch event.Type {
case clientv3.EventTypePut:
s := &discovery.ServiceRegistryEntry{}
if err := json.Unmarshal(event.Kv.Value, s); err != nil {
log.Warn("failed to unmarshal service registry entry",
zap.String("event-kv-key", string(event.Kv.Key)), zap.Error(err))
break
}
m.nodesBalancer.Put(s.ServiceAddr)
m.serviceRegistryMap[string(event.Kv.Key)] = s.ServiceAddr
case clientv3.EventTypeDelete:
m.nodesBalancer.Delete(s.ServiceAddr)
key := string(event.Kv.Key)
if serviceAddr, ok := m.serviceRegistryMap[key]; ok {
delete(m.serviceRegistryMap, key)
m.nodesBalancer.Delete(serviceAddr)
} else {
log.Warn("can't retrieve service addr from service registry map",
zap.String("event-kv-key", key))
}
}
}
revision = wresp.Header.Revision + 1
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
log.Warn("failed to unmarshal keyspace group",
zap.Uint32("keyspace-group-id", groupID),
zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause()))
break
}
kgm.updateKeyspaceGroup(group)
case clientv3.EventTypeDelete:
Expand All @@ -499,7 +500,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
}
}
}
revision = wresp.Header.Revision
revision = wresp.Header.Revision + 1
}

select {
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1820,7 +1820,7 @@ func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string
s.servicePrimaryMap.Delete(serviceName)
}
}
revision = wresp.Header.Revision
revision = wresp.Header.Revision + 1
}
}
}
Expand Down

0 comments on commit 9f1dc94

Please sign in to comment.