Skip to content

Commit

Permalink
Backport of vitessio#12194 (#43)
Browse files Browse the repository at this point in the history
This is a backport of vitessio#12194
  • Loading branch information
brirams authored Feb 3, 2023
1 parent 2409b02 commit 35afe6e
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 12 deletions.
11 changes: 8 additions & 3 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,6 @@ func (tw *TopologyWatcher) loadTablets() {
log.Errorf("cannot get tablet for alias %v: %v", alias, err)
return
}
if !(tw.tabletFilter == nil || tw.tabletFilter.IsIncluded(tablet.Tablet)) {
return
}
tw.mu.Lock()
aliasStr := topoproto.TabletAliasString(alias)
newTablets[aliasStr] = &tabletInfo{
Expand All @@ -215,6 +212,10 @@ func (tw *TopologyWatcher) loadTablets() {
tw.mu.Lock()

for alias, newVal := range newTablets {
if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) {
continue
}

// trust the alias from topo and add it if it doesn't exist
if val, ok := tw.tablets[alias]; !ok {
tw.tabletRecorder.AddTablet(newVal.tablet)
Expand All @@ -233,6 +234,10 @@ func (tw *TopologyWatcher) loadTablets() {
}

for _, val := range tw.tablets {
if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(val.tablet) {
continue
}

if _, ok := newTablets[val.alias]; !ok {
tw.tabletRecorder.RemoveTablet(val.tablet)
topologyWatcherOperations.Add(topologyWatcherOpRemoveTablet, 1)
Expand Down
126 changes: 117 additions & 9 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ limitations under the License.
package discovery

import (
"context"
"math/rand"
"testing"
"time"

"context"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/logutil"
Expand All @@ -44,19 +45,14 @@ func checkOpCounts(t *testing.T, prevCounts, deltas map[string]int64) map[string
newVal = 0
}

if newVal != prevVal+delta {
t.Errorf("expected %v to increase by %v, got %v -> %v", key, delta, prevVal, newVal)
}
assert.Equal(t, newVal, prevVal+delta, "expected %v to increase by %v, got %v -> %v", key, delta, prevVal, newVal)
}
return newCounts
}

func checkChecksum(t *testing.T, tw *TopologyWatcher, want uint32) {
t.Helper()
got := tw.TopoChecksum()
if want != got {
t.Errorf("want checksum %v got %v", want, got)
}
assert.Equal(t, want, tw.TopoChecksum())
}

func TestCellTabletsWatcher(t *testing.T) {
Expand Down Expand Up @@ -460,3 +456,115 @@ func TestFilterByKeyspace(t *testing.T) {
}
}
}

// TestFilterByKeypsaceSkipsIgnoredTablets confirms a bug fix for the case when a TopologyWatcher
// has a FilterByKeyspace TabletFilter configured along with refreshKnownTablets turned off. We want
// to ensure that the TopologyWatcher:
// - does not continuosly call GetTablets for tablets that do not satisfy the filter
// - does not add or remove these filtered out tablets from the its healtcheck
func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {
ts := memorytopo.NewServer("aa")
fhc := NewFakeHealthCheck(nil)
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
f := NewFilterByKeyspace(testKeyspacesToWatch)
tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5)

counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)

// Add a tablet from a tracked keyspace to the topology.
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 0,
},
Hostname: "host1",
PortMap: map[string]int32{
"vt": 123,
},
Keyspace: "ks1",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(context.Background(), tablet))

tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

// Check tablet is reported by HealthCheck
allTablets := fhc.GetAllTablets()
key := TabletToMapKey(tablet)
assert.Contains(t, allTablets, key)
assert.True(t, proto.Equal(tablet, allTablets[key]))

// Add a second tablet to the topology that should get filtered out by the keyspace filter
tablet2 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "aa",
Uid: 2,
},
Hostname: "host2",
PortMap: map[string]int32{
"vt": 789,
},
Keyspace: "ks3",
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(context.Background(), tablet2))

tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1})
checkChecksum(t, tw, 2762153755)

// Check the new tablet is NOT reported by HealthCheck.
allTablets = fhc.GetAllTablets()
assert.Len(t, allTablets, 1)
key = TabletToMapKey(tablet2)
assert.NotContains(t, allTablets, key)

// Load the tablets again to show that when refreshKnownTablets is disabled,
// only the list is read from the topo and the checksum doesn't change
tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1})
checkChecksum(t, tw, 2762153755)

// With refreshKnownTablets set to false, changes to the port map for the same tablet alias
// should not be reflected in the HealtCheck state
_, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error {
t.PortMap["vt"] = 456
return nil
})
require.NoError(t, err)

tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1})
checkChecksum(t, tw, 2762153755)

allTablets = fhc.GetAllTablets()
assert.Len(t, allTablets, 1)
origKey := TabletToMapKey(tablet)
tabletWithNewPort := proto.Clone(tablet).(*topodatapb.Tablet)
tabletWithNewPort.PortMap["vt"] = 456
keyWithNewPort := TabletToMapKey(tabletWithNewPort)
assert.Contains(t, allTablets, origKey)
assert.NotContains(t, allTablets, keyWithNewPort)

// Remove the tracked tablet from the topo and check that it is detected as being gone.
require.NoError(t, ts.DeleteTablet(context.Background(), tablet.Alias))

tw.loadTablets()
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1})
checkChecksum(t, tw, 789108290)
assert.Empty(t, fhc.GetAllTablets())

// Remove ignored tablet and check that we didn't try to remove it from the health check
require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias))

tw.loadTablets()
checkOpCounts(t, counts, map[string]int64{"ListTablets": 1})
checkChecksum(t, tw, 0)
assert.Empty(t, fhc.GetAllTablets())

tw.Stop()
}

0 comments on commit 35afe6e

Please sign in to comment.