Skip to content

Commit

Permalink
Remove the healthcheck-based implementation of the buffering logic in vtgate.
Browse files Browse the repository at this point in the history
The associated flag has been deprecated and will be removed in the next version.

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Jul 20, 2023
1 parent 60b50b1 commit f9489c5
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 67 deletions.
11 changes: 0 additions & 11 deletions go/vt/vtgate/buffer/buffer_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,6 @@ type failover func(buf *Buffer, tablet *topodatapb.Tablet, keyspace, shard strin
func testAllImplementations(t *testing.T, runTest func(t *testing.T, fail failover)) {
t.Helper()

t.Run("HealthCheck", func(t *testing.T) {
t.Helper()
runTest(t, func(buf *Buffer, tablet *topodatapb.Tablet, keyspace, shard string, now time.Time) {
buf.ProcessPrimaryHealth(&discovery.TabletHealth{
Tablet: tablet,
Target: &query.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_PRIMARY},
PrimaryTermStartTime: now.Unix(),
})
})
})

t.Run("KeyspaceEvent", func(t *testing.T) {
t.Helper()
runTest(t, func(buf *Buffer, tablet *topodatapb.Tablet, keyspace, shard string, now time.Time) {
Expand Down
64 changes: 18 additions & 46 deletions go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ var (
// CellsToWatch is the list of cells the healthcheck operates over. If it is empty, only the local cell is watched
CellsToWatch string

bufferImplementation = "keyspace_events"
// Deprecated: bufferImplementationDeprecated and the associated buffer_implementation flag are to be removed in Vitess 19.
bufferImplementationDeprecated = "deprecated"

initialTabletTimeout = 30 * time.Second
// retryCount is the number of times a query will be retried on error
retryCount = 2
Expand All @@ -57,7 +59,7 @@ var (
func init() {
servenv.OnParseFor("vtgate", func(fs *pflag.FlagSet) {
fs.StringVar(&CellsToWatch, "cells_to_watch", "", "comma-separated list of cells for watching tablets")
fs.StringVar(&bufferImplementation, "buffer_implementation", "keyspace_events", "Allowed values: healthcheck (legacy implementation), keyspace_events (default)")
fs.StringVar(&bufferImplementationDeprecated, "buffer_implementation", "deprecated", "DEPRECATED: will be deleted in Vitess 19")
fs.DurationVar(&initialTabletTimeout, "gateway_initial_tablet_timeout", 30*time.Second, "At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type")
fs.IntVar(&retryCount, "retry-count", 2, "retry count")
})
Expand Down Expand Up @@ -118,55 +120,25 @@ func (gw *TabletGateway) setupBuffering(ctx context.Context) {
cfg := buffer.NewConfigFromFlags()
gw.buffer = buffer.New(cfg)

switch bufferImplementation {
case "healthcheck":
// subscribe to healthcheck updates so that buffer can be notified if needed
// we run this in a separate goroutine so that normal processing doesn't need to block
hcChan := gw.hc.Subscribe()
bufferCtx, bufferCancel := context.WithCancel(ctx)

go func(ctx context.Context, c chan *discovery.TabletHealth, buffer *buffer.Buffer) {
defer bufferCancel()

for {
select {
case <-ctx.Done():
return
case result := <-hcChan:
if result == nil {
return
}
if result.Target.TabletType == topodatapb.TabletType_PRIMARY {
buffer.ProcessPrimaryHealth(result)
}
}
}
}(bufferCtx, hcChan, gw.buffer)

case "keyspace_events":
gw.kev = discovery.NewKeyspaceEventWatcher(ctx, gw.srvTopoServer, gw.hc, gw.localCell)
ksChan := gw.kev.Subscribe()
bufferCtx, bufferCancel := context.WithCancel(ctx)
gw.kev = discovery.NewKeyspaceEventWatcher(ctx, gw.srvTopoServer, gw.hc, gw.localCell)
ksChan := gw.kev.Subscribe()
bufferCtx, bufferCancel := context.WithCancel(ctx)

go func(ctx context.Context, c chan *discovery.KeyspaceEvent, buffer *buffer.Buffer) {
defer bufferCancel()
go func(ctx context.Context, c chan *discovery.KeyspaceEvent, buffer *buffer.Buffer) {
defer bufferCancel()

for {
select {
case <-ctx.Done():
for {
select {
case <-ctx.Done():
return
case result := <-ksChan:
if result == nil {
return
case result := <-ksChan:
if result == nil {
return
}
buffer.HandleKeyspaceEvent(result)
}
buffer.HandleKeyspaceEvent(result)
}
}(bufferCtx, ksChan, gw.buffer)

default:
log.Exitf("unknown buffering implementation for TabletGateway: %q", bufferImplementation)
}
}
}(bufferCtx, ksChan, gw.buffer)
}

// QueryServiceByAlias satisfies the Gateway interface
Expand Down
6 changes: 0 additions & 6 deletions go/vt/vtgate/tabletgateway_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@ import (
// TestGatewayBufferingWhenPrimarySwitchesServingState tests that the buffering mechanism buffers queries when a primary goes to a non-serving state and
// stops buffering when the primary is healthy again.
func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) {
bufferImplementation = "keyspace_events"
buffer.SetBufferingModeInTestingEnv(true)
defer func() {
buffer.SetBufferingModeInTestingEnv(false)
bufferImplementation = "healthcheck"
}()

keyspace := "ks1"
Expand Down Expand Up @@ -119,11 +117,9 @@ func TestGatewayBufferingWhenPrimarySwitchesServingState(t *testing.T) {
// TestGatewayBufferingWhileReparenting is used to test that the buffering mechanism buffers the queries when a PRS happens
// the healthchecks that happen during a PRS are simulated in this test
func TestGatewayBufferingWhileReparenting(t *testing.T) {
bufferImplementation = "keyspace_events"
buffer.SetBufferingModeInTestingEnv(true)
defer func() {
buffer.SetBufferingModeInTestingEnv(false)
bufferImplementation = "healthcheck"
}()

keyspace := "ks1"
Expand Down Expand Up @@ -249,11 +245,9 @@ outer:
// This is inconsistent and we want to fail properly. This scenario used to panic since no error and no results were
// returned.
func TestInconsistentStateDetectedBuffering(t *testing.T) {
bufferImplementation = "keyspace_events"
buffer.SetBufferingModeInTestingEnv(true)
defer func() {
buffer.SetBufferingModeInTestingEnv(false)
bufferImplementation = "healthcheck"
}()

keyspace := "ks1"
Expand Down
12 changes: 8 additions & 4 deletions go/vt/vtgate/tabletgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func TestTabletGatewayBeginExecute(t *testing.T) {

func TestTabletGatewayShuffleTablets(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)
tg := NewTabletGateway(context.Background(), hc, nil, "local")
ts := &fakeTopoServer{}
tg := NewTabletGateway(context.Background(), hc, ts, "local")

ts1 := &discovery.TabletHealth{
Tablet: topo.NewTablet(1, "cell1", "host1"),
Expand Down Expand Up @@ -154,7 +155,8 @@ func TestTabletGatewayReplicaTransactionError(t *testing.T) {
TabletType: tabletType,
}
hc := discovery.NewFakeHealthCheck(nil)
tg := NewTabletGateway(context.Background(), hc, nil, "cell")
ts := &fakeTopoServer{}
tg := NewTabletGateway(context.Background(), hc, ts, "cell")

_ = hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil)
_, err := tg.Execute(context.Background(), target, "query", nil, 1, 0, nil)
Expand All @@ -174,7 +176,8 @@ func testTabletGatewayGeneric(t *testing.T, f func(tg *TabletGateway, target *qu
TabletType: tabletType,
}
hc := discovery.NewFakeHealthCheck(nil)
tg := NewTabletGateway(context.Background(), hc, nil, "cell")
ts := &fakeTopoServer{}
tg := NewTabletGateway(context.Background(), hc, ts, "cell")

// no tablet
want := []string{"target: ks.0.replica", `no healthy tablet available for 'keyspace:"ks" shard:"0" tablet_type:REPLICA`}
Expand Down Expand Up @@ -241,7 +244,8 @@ func testTabletGatewayTransact(t *testing.T, f func(tg *TabletGateway, target *q
TabletType: tabletType,
}
hc := discovery.NewFakeHealthCheck(nil)
tg := NewTabletGateway(context.Background(), hc, nil, "cell")
ts := &fakeTopoServer{}
tg := NewTabletGateway(context.Background(), hc, ts, "cell")

// retry error - no retry
sc1 := hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil)
Expand Down

0 comments on commit f9489c5

Please sign in to comment.