Cherry-pick c1603c0 with conflicts
vitess-bot[bot] authored and vitess-bot committed Oct 18, 2024
1 parent 1ae91e3 commit f5a8c10
Showing 2 changed files with 184 additions and 2 deletions.
10 changes: 8 additions & 2 deletions go/vt/discovery/healthcheck.go
@@ -323,7 +323,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
		healthy:            make(map[KeyspaceShardTabletType][]*TabletHealth),
		subscribers:        make(map[chan *TabletHealth]struct{}),
		cellAliases:        make(map[string]string),
-		loadTabletsTrigger: make(chan struct{}),
+		loadTabletsTrigger: make(chan struct{}, 1),
	}
	var topoWatchers []*TopologyWatcher
	var filter TabletFilter
@@ -516,7 +516,13 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
	if prevTarget.TabletType == topodata.TabletType_PRIMARY {
		if primaries := hc.healthData[oldTargetKey]; len(primaries) == 0 {
			log.Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
-			hc.loadTabletsTrigger <- struct{}{}
+			// We want to trigger a loadTablets call, but if the channel is not empty,
+			// a trigger is already scheduled and we don't need another one.
+			// This also prevents the deadlock described in https://github.com/vitessio/vitess/issues/16994.
+			select {
+			case hc.loadTabletsTrigger <- struct{}{}:
+			default:
+			}
		}
	}
}
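The fix above combines a buffered channel of capacity 1 with a non-blocking send, so repeated triggers coalesce into at most one pending signal and the sender can never block while holding the health check's mutex. A minimal standalone sketch of that pattern (names here are illustrative, not from the Vitess code):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Capacity 1: at most one trigger can ever be pending.
	trigger := make(chan struct{}, 1)

	// Consumer: drains the channel and performs the (slow) reload.
	go func() {
		for range trigger {
			fmt.Println("reloading tablets")
			time.Sleep(10 * time.Millisecond)
		}
	}()

	// Producers: the non-blocking send never deadlocks, even while the
	// consumer is busy. Triggers that arrive while one is already pending
	// are dropped, which is safe because a single reload services them all.
	for i := 0; i < 100; i++ {
		select {
		case trigger <- struct{}{}:
		default: // a trigger is already queued; skip
		}
	}
	time.Sleep(50 * time.Millisecond) // give the consumer time to run
}
```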
176 changes: 176 additions & 0 deletions go/vt/discovery/topology_watcher_test.go
@@ -27,6 +27,7 @@ import (
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/test/utils"
querypb "vitess.io/vitess/go/vt/proto/query"

"vitess.io/vitess/go/vt/logutil"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@@ -630,3 +631,178 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {

	tw.Stop()
}
<<<<<<< HEAD
=======

func TestNewFilterByTabletTags(t *testing.T) {
	// A filter with no required tags includes every tablet.
	filter := NewFilterByTabletTags(nil)
	assert.True(t, filter.IsIncluded(&topodatapb.Tablet{}))

	tags := map[string]string{
		"instance_type": "i3.xlarge",
		"some_key":      "some_value",
	}
	filter = NewFilterByTabletTags(tags)

	assert.False(t, filter.IsIncluded(&topodatapb.Tablet{
		Tags: nil,
	}))
	assert.False(t, filter.IsIncluded(&topodatapb.Tablet{
		Tags: map[string]string{},
	}))
	assert.False(t, filter.IsIncluded(&topodatapb.Tablet{
		Tags: map[string]string{
			"instance_type": "i3.xlarge",
		},
	}))
	assert.True(t, filter.IsIncluded(&topodatapb.Tablet{
		Tags: tags,
	}))
}
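The assertions above pin down the filter's semantics: a tablet is included only when it carries every required tag with a matching value, and an empty requirement matches everything. A rough re-implementation of an `IsIncluded` check with those semantics (hypothetical names, not the Vitess source):

```go
package main

import "fmt"

// filterByTags includes a tablet only when it carries every required tag
// with an exactly matching value; an empty requirement matches everything.
type filterByTags struct {
	required map[string]string
}

func (f *filterByTags) isIncluded(tags map[string]string) bool {
	for key, want := range f.required {
		if got, ok := tags[key]; !ok || got != want {
			return false // a required tag is missing or has the wrong value
		}
	}
	return true
}

func main() {
	f := &filterByTags{required: map[string]string{
		"instance_type": "i3.xlarge",
		"some_key":      "some_value",
	}}
	fmt.Println(f.isIncluded(nil))                                             // false
	fmt.Println(f.isIncluded(map[string]string{"instance_type": "i3.xlarge"})) // false: partial match
	fmt.Println(f.isIncluded(map[string]string{
		"instance_type": "i3.xlarge",
		"some_key":      "some_value",
	})) // true
}
```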

func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
	ctx := utils.LeakCheckContext(t)

	ts, factory := memorytopo.NewServerAndFactory(ctx, "aa")
	defer ts.Close()
	fhc := NewFakeHealthCheck(nil)
	defer fhc.Close()
	topologyWatcherOperations.ZeroAll()
	counts := topologyWatcherOperations.Counts()
	tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5)
	defer tw.Stop()

	// Force fallback to getting tablets individually.
	factory.AddOperationError(memorytopo.List, ".*", topo.NewError(topo.NoImplementation, "List not supported"))

	counts = checkOpCounts(t, counts, map[string]int64{})
	checkChecksum(t, tw, 0)

	// Add a tablet to the topology.
	tablet1 := &topodatapb.Tablet{
		Alias: &topodatapb.TabletAlias{
			Cell: "aa",
			Uid:  0,
		},
		Hostname: "host1",
		PortMap: map[string]int32{
			"vt": 123,
		},
		Keyspace: "keyspace",
		Shard:    "shard",
	}
	require.NoError(t, ts.CreateTablet(ctx, tablet1), "CreateTablet failed for %v", tablet1.Alias)

	tw.loadTablets()
	counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "AddTablet": 1})
	checkChecksum(t, tw, 3238442862)

	// Check the tablet is returned by GetAllTablets().
	allTablets := fhc.GetAllTablets()
	key1 := TabletToMapKey(tablet1)
	assert.Len(t, allTablets, 1)
	assert.Contains(t, allTablets, key1)
	assert.True(t, proto.Equal(tablet1, allTablets[key1]))

	// Add a second tablet to the topology.
	tablet2 := &topodatapb.Tablet{
		Alias: &topodatapb.TabletAlias{
			Cell: "aa",
			Uid:  2,
		},
		Hostname: "host2",
		PortMap: map[string]int32{
			"vt": 789,
		},
		Keyspace: "keyspace",
		Shard:    "shard",
	}
	require.NoError(t, ts.CreateTablet(ctx, tablet2), "CreateTablet failed for %v", tablet2.Alias)

	// Cause the Get for the first tablet to fail.
	factory.AddOperationError(memorytopo.Get, "tablets/aa-0000000000/Tablet", errors.New("fake error"))

	// Ensure that a topo Get error results in a partial results error. If not, the rest of this test is invalid.
	_, err := ts.GetTabletsByCell(ctx, "aa", &topo.GetTabletsByCellOptions{})
	require.ErrorContains(t, err, "partial result")

	// Now force the error during loadTablets.
	tw.loadTablets()
	checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "AddTablet": 1})
	checkChecksum(t, tw, 2762153755)

	// Ensure the first tablet is still returned by GetAllTablets() and the second tablet has been added.
	allTablets = fhc.GetAllTablets()
	key2 := TabletToMapKey(tablet2)
	assert.Len(t, allTablets, 2)
	assert.Contains(t, allTablets, key1)
	assert.Contains(t, allTablets, key2)
	assert.True(t, proto.Equal(tablet1, allTablets[key1]))
	assert.True(t, proto.Equal(tablet2, allTablets[key2]))
}
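The test above pins down an invariant of loadTablets: when the topo server returns only a partial result, tablets that failed to load must not be dropped from the health check. A simplified, hypothetical sketch of that invariant (`reconcile` and `tablet` are illustrative names, not the real loadTablets code):

```go
package main

import "fmt"

type tablet struct {
	alias string
	port  int32
}

// reconcile applies a fetched tablet listing to the set of known tablets.
// On a partial result it only adds or replaces what was fetched and never
// removes known tablets; only a complete listing may prune missing ones.
func reconcile(known, fetched map[string]tablet, partial bool) {
	for key, t := range fetched {
		known[key] = t
	}
	if partial {
		return // keep existing entries rather than dropping live tablets
	}
	for key := range known {
		if _, ok := fetched[key]; !ok {
			delete(known, key)
		}
	}
}

func main() {
	known := map[string]tablet{"aa-0": {alias: "aa-0", port: 123}}
	// Partial fetch: aa-0 errored out and only aa-2 came back.
	reconcile(known, map[string]tablet{"aa-2": {alias: "aa-2", port: 789}}, true)
	fmt.Println(len(known)) // 2: aa-0 was kept, aa-2 was added
}
```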

// TestDeadlockBetweenTopologyWatcherAndHealthCheck tests the possibility of a deadlock
// between the topology watcher and the health check.
// The issue https://github.com/vitessio/vitess/issues/16994 has more details on the deadlock.
func TestDeadlockBetweenTopologyWatcherAndHealthCheck(t *testing.T) {
	ctx := utils.LeakCheckContext(t)

	// Create a new memory topo server and a health check instance.
	ts, _ := memorytopo.NewServerAndFactory(ctx, "zone-1")
	hc := NewHealthCheck(ctx, time.Hour, time.Hour, ts, "zone-1", "", nil)
	defer hc.Close()
	defer hc.topoWatchers[0].Stop()

	// Add a tablet to the topology.
	tablet1 := &topodatapb.Tablet{
		Alias: &topodatapb.TabletAlias{
			Cell: "zone-1",
			Uid:  100,
		},
		Type:     topodatapb.TabletType_REPLICA,
		Hostname: "host1",
		PortMap: map[string]int32{
			"grpc": 123,
		},
		Keyspace: "keyspace",
		Shard:    "shard",
	}
	err := ts.CreateTablet(ctx, tablet1)
	require.NoError(t, err)
	// Run the first loadTablets call to ensure the tablet is present in the topology watcher.
	hc.topoWatchers[0].loadTablets()

	// We want to run updateHealth with arguments that always
	// make it trigger loadTablets.
	th := &TabletHealth{
		Tablet: tablet1,
		Target: &querypb.Target{
			Keyspace:   "keyspace",
			Shard:      "shard",
			TabletType: topodatapb.TabletType_REPLICA,
		},
	}
	prevTarget := &querypb.Target{
		Keyspace:   "keyspace",
		Shard:      "shard",
		TabletType: topodatapb.TabletType_PRIMARY,
	}

	// If we run updateHealth often enough, we hit the deadlock: the
	// topology watcher tries to replace the tablet in the health check,
	// but the health check already holds its mutex because it is running
	// updateHealth, and updateHealth itself is stuck trying to send on
	// the shared channel.
	for i := 0; i < 10; i++ {
		// Update the port of the tablet so that when updateHealth asks the
		// topology watcher to refresh the tablets, it finds an update and
		// tries to replace it.
		_, err = ts.UpdateTabletFields(ctx, tablet1.Alias, func(t *topodatapb.Tablet) error {
			t.PortMap["testing_port"] = int32(i + 1)
			return nil
		})
		require.NoError(t, err)
		hc.updateHealth(th, prevTarget, false, false)
	}
}
>>>>>>> c1603c0c66 (Fix deadlock between health check and topology watcher (#16995))
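For reference, the deadlock this test exercises reduces to two goroutines, a mutex, and an unbuffered channel: one side holds the lock while sending, the other needs the lock before it can receive. A minimal sketch that deliberately deadlocks (illustrative names, not the Vitess types; the Go runtime aborts it with "all goroutines are asleep - deadlock!"):

```go
package main

import "sync"

func main() {
	var mu sync.Mutex              // stands in for the health check's mutex
	trigger := make(chan struct{}) // unbuffered, like the channel before the fix

	mu.Lock() // "updateHealth" is running and holds the lock

	// "Topology watcher": busy replacing a tablet, which needs the health
	// check's lock, before it gets back to waiting for the next trigger.
	go func() {
		mu.Lock() // blocks: updateHealth still holds the lock
		mu.Unlock()
		<-trigger // never reached while the send below is stuck
	}()

	// "updateHealth": still holding the lock, it sends the trigger.
	// Nothing can receive until the lock is released, so both goroutines
	// wait on each other forever.
	trigger <- struct{}{}
	mu.Unlock()
}
```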
