diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go
index 5f4c78f6afa..cb1bbc1b4dd 100644
--- a/go/vt/discovery/healthcheck.go
+++ b/go/vt/discovery/healthcheck.go
@@ -323,7 +323,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
 		healthy:            make(map[KeyspaceShardTabletType][]*TabletHealth),
 		subscribers:        make(map[chan *TabletHealth]struct{}),
 		cellAliases:        make(map[string]string),
-		loadTabletsTrigger: make(chan struct{}),
+		loadTabletsTrigger: make(chan struct{}, 1),
 	}
 	var topoWatchers []*TopologyWatcher
 	var filter TabletFilter
@@ -516,7 +516,13 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
 	if prevTarget.TabletType == topodata.TabletType_PRIMARY {
 		if primaries := hc.healthData[oldTargetKey]; len(primaries) == 0 {
 			log.Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
-			hc.loadTabletsTrigger <- struct{}{}
+			// We want to trigger a loadTablets call, but if the channel is not empty
+			// then a trigger is already scheduled, so we don't need to trigger another one.
+			// This also prevents the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
+			select {
+			case hc.loadTabletsTrigger <- struct{}{}:
+			default:
+			}
 		}
 	}
 }
diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go
index 3ac567acef8..0397b1d624b 100644
--- a/go/vt/discovery/topology_watcher_test.go
+++ b/go/vt/discovery/topology_watcher_test.go
@@ -27,6 +27,7 @@ import (
 	"google.golang.org/protobuf/proto"
 
 	"vitess.io/vitess/go/test/utils"
+	querypb "vitess.io/vitess/go/vt/proto/query"
 
 	"vitess.io/vitess/go/vt/logutil"
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@@ -630,3 +631,175 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) {
 
 	tw.Stop()
 }
+
+func TestNewFilterByTabletTags(t *testing.T) {
+	// no required tags == true
+	filter := NewFilterByTabletTags(nil)
+	assert.True(t, filter.IsIncluded(&topodatapb.Tablet{}))
+
+	tags := map[string]string{
+		"instance_type": "i3.xlarge",
+		"some_key":      "some_value",
+	}
+	filter = NewFilterByTabletTags(tags)
+
+	assert.False(t, filter.IsIncluded(&topodatapb.Tablet{
+		Tags: nil,
+	}))
+	assert.False(t, filter.IsIncluded(&topodatapb.Tablet{
+		Tags: map[string]string{},
+	}))
+	assert.False(t, filter.IsIncluded(&topodatapb.Tablet{
+		Tags: map[string]string{
+			"instance_type": "i3.xlarge",
+		},
+	}))
+	assert.True(t, filter.IsIncluded(&topodatapb.Tablet{
+		Tags: tags,
+	}))
+}
+
+func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
+	ctx := utils.LeakCheckContext(t)
+
+	ts, factory := memorytopo.NewServerAndFactory(ctx, "aa")
+	defer ts.Close()
+	fhc := NewFakeHealthCheck(nil)
+	defer fhc.Close()
+	topologyWatcherOperations.ZeroAll()
+	counts := topologyWatcherOperations.Counts()
+	tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5)
+	defer tw.Stop()
+
+	// Force fallback to getting tablets individually.
+	factory.AddOperationError(memorytopo.List, ".*", topo.NewError(topo.NoImplementation, "List not supported"))
+
+	counts = checkOpCounts(t, counts, map[string]int64{})
+	checkChecksum(t, tw, 0)
+
+	// Add a tablet to the topology.
+	tablet1 := &topodatapb.Tablet{
+		Alias: &topodatapb.TabletAlias{
+			Cell: "aa",
+			Uid:  0,
+		},
+		Hostname: "host1",
+		PortMap: map[string]int32{
+			"vt": 123,
+		},
+		Keyspace: "keyspace",
+		Shard:    "shard",
+	}
+	require.NoError(t, ts.CreateTablet(ctx, tablet1), "CreateTablet failed for %v", tablet1.Alias)
+
+	tw.loadTablets()
+	counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "AddTablet": 1})
+	checkChecksum(t, tw, 3238442862)
+
+	// Check the tablet is returned by GetAllTablets().
+	allTablets := fhc.GetAllTablets()
+	key1 := TabletToMapKey(tablet1)
+	assert.Len(t, allTablets, 1)
+	assert.Contains(t, allTablets, key1)
+	assert.True(t, proto.Equal(tablet1, allTablets[key1]))
+
+	// Add a second tablet to the topology.
+	tablet2 := &topodatapb.Tablet{
+		Alias: &topodatapb.TabletAlias{
+			Cell: "aa",
+			Uid:  2,
+		},
+		Hostname: "host2",
+		PortMap: map[string]int32{
+			"vt": 789,
+		},
+		Keyspace: "keyspace",
+		Shard:    "shard",
+	}
+	require.NoError(t, ts.CreateTablet(ctx, tablet2), "CreateTablet failed for %v", tablet2.Alias)
+
+	// Cause the Get for the first tablet to fail.
+	factory.AddOperationError(memorytopo.Get, "tablets/aa-0000000000/Tablet", errors.New("fake error"))
+
+	// Ensure that a topo Get error results in a partial result error. If not, the rest of this test is invalid.
+	_, err := ts.GetTabletsByCell(ctx, "aa", &topo.GetTabletsByCellOptions{})
+	require.ErrorContains(t, err, "partial result")
+
+	// Now force the error during loadTablets.
+	tw.loadTablets()
+	checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "AddTablet": 1})
+	checkChecksum(t, tw, 2762153755)
+
+	// Ensure the first tablet is still returned by GetAllTablets() and the second tablet has been added.
+	allTablets = fhc.GetAllTablets()
+	key2 := TabletToMapKey(tablet2)
+	assert.Len(t, allTablets, 2)
+	assert.Contains(t, allTablets, key1)
+	assert.Contains(t, allTablets, key2)
+	assert.True(t, proto.Equal(tablet1, allTablets[key1]))
+	assert.True(t, proto.Equal(tablet2, allTablets[key2]))
+}
+
+// TestDeadlockBetweenTopologyWatcherAndHealthCheck tests the possibility of a deadlock
+// between the topology watcher and the health check.
+// The issue https://github.com/vitessio/vitess/issues/16994 has more details on the deadlock.
+func TestDeadlockBetweenTopologyWatcherAndHealthCheck(t *testing.T) {
+	ctx := utils.LeakCheckContext(t)
+
+	// Create a new memory topo server and a health check instance.
+	ts, _ := memorytopo.NewServerAndFactory(ctx, "zone-1")
+	hc := NewHealthCheck(ctx, time.Hour, time.Hour, ts, "zone-1", "", nil)
+	defer hc.Close()
+	defer hc.topoWatchers[0].Stop()
+
+	// Add a tablet to the topology.
+	tablet1 := &topodatapb.Tablet{
+		Alias: &topodatapb.TabletAlias{
+			Cell: "zone-1",
+			Uid:  100,
+		},
+		Type:     topodatapb.TabletType_REPLICA,
+		Hostname: "host1",
+		PortMap: map[string]int32{
+			"grpc": 123,
+		},
+		Keyspace: "keyspace",
+		Shard:    "shard",
+	}
+	err := ts.CreateTablet(ctx, tablet1)
+	// Run the first loadTablets call to ensure the tablet is present in the topology watcher.
+	hc.topoWatchers[0].loadTablets()
+	require.NoError(t, err)
+
+	// We want to run updateHealth with arguments that always
+	// make it trigger loadTablets.
+	th := &TabletHealth{
+		Tablet: tablet1,
+		Target: &querypb.Target{
+			Keyspace:   "keyspace",
+			Shard:      "shard",
+			TabletType: topodatapb.TabletType_REPLICA,
+		},
+	}
+	prevTarget := &querypb.Target{
+		Keyspace:   "keyspace",
+		Shard:      "shard",
+		TabletType: topodatapb.TabletType_PRIMARY,
+	}
+
+	// If we run the updateHealth function often enough, then we
+	// will see the deadlock where the topology watcher is trying to replace
+	// the tablet in the health check, but the health check has already acquired
+	// the mutex because it is calling updateHealth.
+	// updateHealth itself will be stuck trying to send on the shared channel.
+	for i := 0; i < 10; i++ {
+		// Update the port of the tablet so that when updateHealth asks the topo watcher to
+		// refresh the tablets, it finds an update and tries to replace it.
+		_, err = ts.UpdateTabletFields(ctx, tablet1.Alias, func(t *topodatapb.Tablet) error {
+			t.PortMap["testing_port"] = int32(i + 1)
+			return nil
+		})
+		require.NoError(t, err)
+		hc.updateHealth(th, prevTarget, false, false)
+	}
+}
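
The regression test above exercises the shape of the deadlock reported in https://github.com/vitessio/vitess/issues/16994: updateHealth holds the health check mutex while doing a blocking send on the unbuffered loadTabletsTrigger channel, and the only receiver, the topology watcher reload path, is itself blocked waiting for that same mutex. The sketch below is a minimal standalone illustration of that shape, not Vitess code; the names (trigger, producer, consumer) are invented for the example, and running it prints the "deadlocked" branch after the one-second timeout.

package main

import (
	"fmt"
	"sync"
	"time"
)

// A standalone sketch of the deadlock shape: a producer holds a mutex
// while doing a blocking send on an unbuffered channel, and the only
// consumer needs that same mutex before it can receive again.
func main() {
	var mu sync.Mutex
	trigger := make(chan struct{}) // unbuffered: a send blocks until someone receives

	// Consumer: handles one trigger, then must take the mutex before it
	// can loop back and receive the next one.
	go func() {
		for range trigger {
			mu.Lock() // stuck here while the producer holds mu
			mu.Unlock()
		}
	}()

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Producer: the rough equivalent of updateHealth holding the
		// health check mutex while triggering a tablet reload.
		mu.Lock()
		defer mu.Unlock()
		trigger <- struct{}{} // received immediately
		trigger <- struct{}{} // blocks forever: the consumer is waiting on mu
	}()

	select {
	case <-done:
		fmt.Println("no deadlock")
	case <-time.After(time.Second):
		fmt.Println("deadlocked: producer blocked on send, consumer blocked on mutex")
	}
}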
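The healthcheck.go hunk removes that hazard with a common coalescing-trigger pattern: give the channel capacity 1 and send through a select with a default arm, so the send never blocks and at most one reload stays pending no matter how many times it is requested. Below is a standalone sketch of the same pattern under assumed names (reloader, requestReload, doReload), not the Vitess implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

// reloader coalesces reload requests: capacity 1 means at most one
// request can be pending; extra requests while one is pending are dropped.
type reloader struct {
	trigger chan struct{}
}

func newReloader() *reloader {
	return &reloader{trigger: make(chan struct{}, 1)}
}

// requestReload never blocks: if a reload is already pending, the
// default arm is taken and this request is coalesced into it.
func (r *reloader) requestReload() {
	select {
	case r.trigger <- struct{}{}:
	default:
	}
}

// run drains the trigger channel and performs the reloads until done is closed.
func (r *reloader) run(done <-chan struct{}, doReload func()) {
	for {
		select {
		case <-r.trigger:
			doReload()
		case <-done:
			return
		}
	}
}

func main() {
	r := newReloader()
	done := make(chan struct{})

	var mu sync.Mutex
	reloads := 0
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		r.run(done, func() {
			mu.Lock()
			reloads++
			mu.Unlock()
			time.Sleep(10 * time.Millisecond) // simulate a slow reload
		})
	}()

	// Fire many triggers in a burst; they coalesce instead of blocking.
	for i := 0; i < 100; i++ {
		r.requestReload()
	}
	time.Sleep(100 * time.Millisecond)
	close(done)
	wg.Wait()

	mu.Lock()
	fmt.Println("reloads performed:", reloads) // far fewer than 100
	mu.Unlock()
}

Because the send can never block, it stays safe to call even while holding locks, which is exactly the situation updateHealth is in.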