Fix to prevent stopping buffering prematurely #17013

Open
wants to merge 8 commits into base: main
62 changes: 62 additions & 0 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
@@ -19,7 +19,9 @@ package newfeaturetest
import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

@@ -177,3 +179,63 @@ func TestERSWithWriteInPromoteReplica(t *testing.T) {
_, err := utils.Ers(clusterInstance, tablets[3], "60s", "30s")
require.NoError(t, err, "ERS should not fail even if there is a sidecardb change")
}

func TestBufferingWithMultipleDisruptions(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := utils.SetupShardedReparentCluster(t, "semi_sync")
defer utils.TeardownCluster(clusterInstance)

// Stop all VTOrc instances, so that they don't interfere with the test.
for _, vtorc := range clusterInstance.VTOrcProcesses {
err := vtorc.TearDown()
require.NoError(t, err)
}

// Start by reparenting all the shards to the first tablet.
keyspace := clusterInstance.Keyspaces[0]
shards := keyspace.Shards
for _, shard := range shards {
err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shard.Name, shard.Vttablets[0].Alias)
require.NoError(t, err)
}

// We simulate the start of an external reparent, or a PRS where the healthcheck update from the tablet
// gets lost in transit to vtgate, by setting the primary read-only. This is also why we needed to shut
// down all the VTOrc instances, so that they don't fix it.
utils.RunSQL(context.Background(), t, "set global read_only=1", shards[0].Vttablets[0])
utils.RunSQL(context.Background(), t, "set global read_only=1", shards[1].Vttablets[0])

wg := sync.WaitGroup{}
rowCount := 10
vtParams := clusterInstance.GetVTParams(keyspace.Name)
// We now spawn writes from a bunch of goroutines.
// The ones going to shards 1 and 2 should block, since
// those shards are in the midst of a reparenting operation (as seen by the buffering code).
for i := 1; i <= rowCount; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
conn, err := mysql.Connect(context.Background(), &vtParams)
if err != nil {
return
}
defer conn.Close()
_, err = conn.ExecuteFetch(utils.GetInsertQuery(i), 0, false)
require.NoError(t, err)
}(i)
}

// Now, run a PRS call on the last shard. This shouldn't unbuffer the queries that are buffered for shards 1 and 2
// since the disruption on the two shards hasn't stopped.
err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[2].Name, shards[2].Vttablets[1].Alias)
require.NoError(t, err)
// We wait a second just to make sure the PRS changes are processed by the buffering logic in vtgate.
time.Sleep(1 * time.Second)
// Finally, we make the two disrupted shards healthy again by running PRS.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[0].Name, shards[0].Vttablets[1].Alias)
require.NoError(t, err)
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[1].Name, shards[1].Vttablets[1].Alias)
require.NoError(t, err)
// Wait for all the writes to have succeeded.
wg.Wait()
}
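
If you also wanted to assert that the buffered writes actually landed, a minimal follow-up check (not part of this PR; it reuses the helpers and connection parameters from the test above, and assumes all rowCount inserts connected successfully) could be appended before the test returns:

	// Hypothetical extra assertion: read the rows back and count them.
	conn, err := mysql.Connect(context.Background(), &vtParams)
	require.NoError(t, err)
	defer conn.Close()
	res, err := conn.ExecuteFetch(utils.GetSelectionQuery(), rowCount+1, false)
	require.NoError(t, err)
	require.Len(t, res.Rows, rowCount)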
47 changes: 47 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
@@ -74,6 +74,53 @@ func SetupRangeBasedCluster(ctx context.Context, t *testing.T) *cluster.LocalProcessCluster {
return setupCluster(ctx, t, ShardName, []string{cell1}, []int{2}, "semi_sync")
}

// SetupShardedReparentCluster is used to set up a sharded cluster for reparent testing.
func SetupShardedReparentCluster(t *testing.T, durability string) *cluster.LocalProcessCluster {
clusterInstance := cluster.NewCluster(cell1, Hostname)
// Start topo server
err := clusterInstance.StartTopo()
require.NoError(t, err)

clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--lock_tables_timeout", "5s",
// Fast health checks help find corner cases.
"--health_check_interval", "1s",
"--track_schema_versions=true",
"--queryserver_enable_online_ddl=false")
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
"--enable_buffer",
// Long timeout in case failover is slow.
"--buffer_window", "10m",
"--buffer_max_failover_duration", "10m",
"--buffer_min_time_between_failovers", "20m",
)

// Start keyspace
keyspace := &cluster.Keyspace{
Name: KeyspaceName,
SchemaSQL: sqlSchema,
VSchema: `{"sharded": true, "vindexes": {"hash_index": {"type": "hash"}}, "tables": {"vt_insert_test": {"column_vindexes": [{"column": "id", "name": "hash_index"}]}}}`,
DurabilityPolicy: durability,
}
err = clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false)
require.NoError(t, err)

// Start Vtgate
err = clusterInstance.StartVtgate()
require.NoError(t, err)
return clusterInstance
}

// GetInsertQuery returns an insert query that inserts a row with the given index.
func GetInsertQuery(idx int) string {
return fmt.Sprintf(insertSQL, idx, idx)
}

// GetSelectionQuery returns a select query to read back the data.
func GetSelectionQuery() string {
return `select * from vt_insert_test`
}
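
For reference, insertSQL is a format string defined elsewhere in this package and is not shown in this diff; a plausible shape, matching the two %d arguments above (an assumption, not the actual constant), is:

// Hypothetical definition of the format string used by GetInsertQuery.
const insertSQL = "insert into vt_insert_test(id, msg) values (%d, 'test %d')"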

// TeardownCluster is used to tear down the reparent cluster. When
// run in a CI environment -- which is considered true when the
// "CI" env variable is set to "true" -- the teardown also removes
15 changes: 15 additions & 0 deletions go/vt/discovery/fake_healthcheck.go
@@ -172,6 +172,21 @@ func (fhc *FakeHealthCheck) SetTabletType(tablet *topodatapb.Tablet, tabletType topodatapb.TabletType) {
item.ts.Target.TabletType = tabletType
}

// SetPrimaryTimestamp sets the primary term start timestamp for the given tablet.
func (fhc *FakeHealthCheck) SetPrimaryTimestamp(tablet *topodatapb.Tablet, timestamp int64) {
if fhc.ch == nil {
return
}
fhc.mu.Lock()
defer fhc.mu.Unlock()
key := TabletToMapKey(tablet)
item, isPresent := fhc.items[key]
if !isPresent {
return
}
item.ts.PrimaryTermStartTime = timestamp
}

// Unsubscribe is not implemented.
func (fhc *FakeHealthCheck) Unsubscribe(c chan *TabletHealth) {
}
69 changes: 66 additions & 3 deletions go/vt/discovery/keyspace_events.go
@@ -178,8 +178,12 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool {
}

type shardState struct {
target *querypb.Target
serving bool
// waitForReparent is used to tell the keyspace event watcher
// that this shard should be marked serving only after a reparent
// operation has succeeded.
waitForReparent bool
externallyReparented int64
currentPrimary *topodatapb.TabletAlias
}
@@ -368,8 +372,34 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) {
// if the shard went from serving to not serving, or the other way around, the keyspace
// is undergoing an availability event
if sstate.serving != th.Serving {
kss.consistent = false
switch {
case th.Serving && sstate.waitForReparent:
// While waiting for a reparent, if we receive a serving primary,
// we should check if the primary term start time is greater than the externally reparented time.
// We mark the shard serving only if it is. This is required so that we don't prematurely stop
// buffering for PRS, or TabletExternallyReparented, after seeing a serving healthcheck from the
// same old primary tablet that has already been turned read-only.
if th.PrimaryTermStartTime > sstate.externallyReparented {
sstate.waitForReparent = false
sstate.serving = true
}
case th.Serving && !sstate.waitForReparent:
sstate.serving = true
case !th.Serving:
sstate.serving = false
}
}
if !th.Serving {
// Once we have seen a non-serving primary healthcheck, there is no need for us to explicitly wait
// for a reparent to happen. We use waitForReparent to ensure that we don't prematurely stop
// buffering when we receive a serving healthcheck from the primary that is being demoted.
// However, if we receive a non-serving check, then we know that we won't receive any more serving
// healthchecks until the reparent finishes. Specifically, this helps us when PRS fails, but
// stops gracefully because the new candidate couldn't get caught up in time. In this case, we promote
// the previous primary back. Without turning off waitForReparent here, we wouldn't be able to stop
// buffering for that case.
sstate.waitForReparent = false
}

// if the primary for this shard has been externally reparented, we're undergoing a failover,
@@ -784,3 +814,36 @@ func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context,
}
}
}

// MarkShardNotServing marks the given shard not serving.
// We use this when we start buffering for a given shard. This helps
// coordinate between the sharding logic and the keyspace event watcher.
// We also take in a boolean to tell us whether this error is because
// a reparent is ongoing. If it is, we also mark the shard to wait for a reparent.
// The return value indicates whether the shard was found and successfully marked not serving.
func (kew *KeyspaceEventWatcher) MarkShardNotServing(ctx context.Context, keyspace string, shard string, isReparentErr bool) bool {
kss := kew.getKeyspaceStatus(ctx, keyspace)
if kss == nil {
// Only happens if the keyspace was deleted.
return false
}
kss.mu.Lock()
defer kss.mu.Unlock()
sstate := kss.shards[shard]
if sstate == nil {
// This only happens if the shard is deleted, or if
// the keyspace event watcher hasn't seen the shard at all.
return false
}
// Mark the keyspace inconsistent and the shard not serving.
kss.consistent = false
sstate.serving = false
if isReparentErr {
// If the error was triggered because a reparent operation has started.
// We mark the shard to wait for a reparent to finish before marking it serving.
// This is required to prevent premature stopping of buffering if we receive
// a serving healthcheck from a primary that is being demoted.
sstate.waitForReparent = true
}
return true
}
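
Taken together, the fix boils down to a small state machine on the shard's serving flag. Here is a minimal, self-contained sketch of that logic (the type and function names are illustrative, not from this diff), useful for reasoning about the stale-primary case the PR guards against:

package main

import "fmt"

// shard is a hypothetical, pared-down stand-in for shardState.
type shard struct {
	serving              bool
	waitForReparent      bool
	externallyReparented int64 // when buffering for the reparent started
}

// onPrimaryHealth mirrors the switch in onHealthCheck above: a serving
// healthcheck only re-marks the shard serving if we are not waiting for a
// reparent, or if the primary term started after the reparent we wait on.
func (s *shard) onPrimaryHealth(serving bool, primaryTermStartTime int64) {
	switch {
	case serving && s.waitForReparent:
		if primaryTermStartTime > s.externallyReparented {
			s.waitForReparent = false
			s.serving = true
		}
	case serving:
		s.serving = true
	default:
		s.serving = false
	}
	if !serving {
		// A non-serving check means no more serving healthchecks will arrive
		// until the reparent finishes, so stop insisting on one. This covers
		// the failed-PRS case where the old primary is promoted back.
		s.waitForReparent = false
	}
}

func main() {
	s := &shard{waitForReparent: true, externallyReparented: 100}
	s.onPrimaryHealth(true, 50) // stale serving check from the demoted primary
	fmt.Println(s.serving)      // false: buffering correctly continues
	s.onPrimaryHealth(true, 200) // first serving check from the new primary
	fmt.Println(s.serving)      // true: buffering can stop
}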