[latest-17.0](vitessio#2855): CherryPick(vitessio#13623): Improvements to PRS (vitessio#2862)

* backport of 2855

* feat: fix e2e tests

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix e2e tests again

Signed-off-by: Manan Gupta <[email protected]>

---------

Signed-off-by: Manan Gupta <[email protected]>
Co-authored-by: Manan Gupta <[email protected]>
planetscale-actions-bot and GuptaManan100 authored Aug 2, 2023
1 parent e003277 commit 7acb5b0
Showing 8 changed files with 487 additions and 29 deletions.
22 changes: 19 additions & 3 deletions examples/backups/restart_tablets.sh
@@ -35,9 +35,9 @@ for i in 300 301 302; do
done
sleep 5

# Wait for all the replica tablets to be in the serving state before initiating
# InitShardPrimary. This is essential, since we want the RESTORE phase to be
# complete before we start InitShardPrimary, otherwise we end up reading the
# Wait for all the tablets to be in the serving state before initiating
# PlannedReparentShard. This is essential, since we want the RESTORE phase to be
# complete before we start PlannedReparentShard, otherwise we end up reading the
# tablet type as RESTORE and not setting semi-sync, which leads to the primary
# hanging on writes.
totalTime=600
@@ -50,13 +50,29 @@ for i in 101 201 301; do
done
done

for i in 102 202 302; do
while [ $totalTime -gt 0 ]; do
status=$(curl "http://$hostname:15$i/debug/status_details")
echo "$status" | grep "RDONLY: Serving" && break
totalTime=$((totalTime-1))
sleep 0.1
done
done

# Check that all the replica tablets have reached REPLICA: Serving state
for i in 101 201 301; do
status=$(curl "http://$hostname:15$i/debug/status_details")
echo "$status" | grep "REPLICA: Serving" && continue
echo "tablet-$i did not reach REPLICA: Serving state. Exiting due to failure."
exit 1
done
# Check that all the rdonly tablets have reached RDONLY: Serving state
for i in 102 202 302; do
status=$(curl "http://$hostname:15$i/debug/status_details")
echo "$status" | grep "RDONLY: Serving" && continue
echo "tablet-$i did not reach RDONLY: Serving state. Exiting due to failure."
exit 1
done

vtctldclient PlannedReparentShard commerce/0 --new-primary "zone1-100"
vtctldclient PlannedReparentShard customer/-80 --new-primary "zone1-200"
16 changes: 8 additions & 8 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
@@ -123,9 +123,10 @@ func TestReparentReplicaOffline(t *testing.T) {
// Perform a graceful reparent operation.
out, err := utils.PrsWithTimeout(t, clusterInstance, tablets[1], false, "", "31s")
require.Error(t, err)
assert.True(t, utils.SetReplicationSourceFailed(tablets[3], out))

utils.CheckPrimaryTablet(t, clusterInstance, tablets[1])
// Assert that PRS failed
assert.Contains(t, out, "rpc error: code = DeadlineExceeded desc")
utils.CheckPrimaryTablet(t, clusterInstance, tablets[0])
}

func TestReparentAvoid(t *testing.T) {
@@ -155,7 +156,7 @@ func TestReparentAvoid(t *testing.T) {
utils.StopTablet(t, tablets[0], true)
out, err := utils.PrsAvoid(t, clusterInstance, tablets[1])
require.Error(t, err)
assert.Contains(t, out, "cannot find a tablet to reparent to in the same cell as the current primary")
assert.Contains(t, out, "rpc error: code = DeadlineExceeded desc = latest balancer error")
utils.ValidateTopology(t, clusterInstance, false)
utils.CheckPrimaryTablet(t, clusterInstance, tablets[1])
}
@@ -275,17 +276,16 @@ func TestReparentWithDownReplica(t *testing.T) {
// Perform a graceful reparent operation. It will fail as one tablet is down.
out, err := utils.Prs(t, clusterInstance, tablets[1])
require.Error(t, err)
assert.True(t, utils.SetReplicationSourceFailed(tablets[2], out))

// insert data into the new primary, check the connected replica work
insertVal := utils.ConfirmReplication(t, tablets[1], []*cluster.Vttablet{tablets[0], tablets[3]})
assert.Contains(t, out, fmt.Sprintf("TabletManager.PrimaryStatus on %s error", tablets[2].Alias))
// insert data into the old primary, check that the connected replicas work. The primary tablet shouldn't have changed.
insertVal := utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[3]})

// restart mysql on the old replica, should still be connecting to the old primary
tablets[2].MysqlctlProcess.InitMysql = false
err = tablets[2].MysqlctlProcess.Start()
require.NoError(t, err)

// Use the same PlannedReparentShard command to fix up the tablet.
// Use the same PlannedReparentShard command to promote the new primary.
_, err = utils.Prs(t, clusterInstance, tablets[1])
require.NoError(t, err)

30 changes: 30 additions & 0 deletions go/vt/vtctl/grpcvtctldserver/server_slow_test.go
@@ -402,6 +402,21 @@ func TestPlannedReparentShardSlow(t *testing.T) {
Error: nil,
},
},
// This is only needed to verify reachability, so empty results are fine.
PrimaryStatusResults: map[string]struct {
Status *replicationdatapb.PrimaryStatus
Error error
}{
"zone1-0000000200": {
Status: &replicationdatapb.PrimaryStatus{},
},
"zone1-0000000101": {
Status: &replicationdatapb.PrimaryStatus{},
},
"zone1-0000000100": {
Status: &replicationdatapb.PrimaryStatus{},
},
},
PrimaryPositionResults: map[string]struct {
Position string
Error error
@@ -505,6 +520,21 @@ func TestPlannedReparentShardSlow(t *testing.T) {
Error: nil,
},
},
// This is only needed to verify reachability, so empty results are fine.
PrimaryStatusResults: map[string]struct {
Status *replicationdatapb.PrimaryStatus
Error error
}{
"zone1-0000000200": {
Status: &replicationdatapb.PrimaryStatus{},
},
"zone1-0000000101": {
Status: &replicationdatapb.PrimaryStatus{},
},
"zone1-0000000100": {
Status: &replicationdatapb.PrimaryStatus{},
},
},
PrimaryPositionResults: map[string]struct {
Position string
Error error
15 changes: 15 additions & 0 deletions go/vt/vtctl/grpcvtctldserver/server_test.go
@@ -6253,6 +6253,21 @@ func TestPlannedReparentShard(t *testing.T) {
Error: nil,
},
},
// This is only needed to verify reachability, so empty results are fine.
PrimaryStatusResults: map[string]struct {
Status *replicationdatapb.PrimaryStatus
Error error
}{
"zone1-0000000200": {
Status: &replicationdatapb.PrimaryStatus{},
},
"zone1-0000000101": {
Status: &replicationdatapb.PrimaryStatus{},
},
"zone1-0000000100": {
Status: &replicationdatapb.PrimaryStatus{},
},
},
PrimaryPositionResults: map[string]struct {
Position string
Error error
31 changes: 31 additions & 0 deletions go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go
@@ -281,6 +281,11 @@ type TabletManagerClient struct {
Position *replicationdatapb.Status
Error error
}
PrimaryStatusDelays map[string]time.Duration
PrimaryStatusResults map[string]struct {
Status *replicationdatapb.PrimaryStatus
Error error
}
RestoreFromBackupResults map[string]struct {
Events []*logutilpb.Event
EventInterval time.Duration
@@ -874,6 +879,32 @@ func (fake *TabletManagerClient) ReplicationStatus(ctx context.Context, tablet *
return nil, assert.AnError
}

// PrimaryStatus is part of the tmclient.TabletManagerClient interface.
func (fake *TabletManagerClient) PrimaryStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.PrimaryStatus, error) {
if fake.PrimaryStatusResults == nil {
return nil, assert.AnError
}

key := topoproto.TabletAliasString(tablet.Alias)

if fake.PrimaryStatusDelays != nil {
if delay, ok := fake.PrimaryStatusDelays[key]; ok {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(delay):
// proceed to results
}
}
}

if result, ok := fake.PrimaryStatusResults[key]; ok {
return result.Status, result.Error
}

return nil, assert.AnError
}

type backupRestoreStreamAdapter struct {
*grpcshim.BidiStream
ch chan *logutilpb.Event
Expand Down
27 changes: 25 additions & 2 deletions go/vt/vtctl/reparentutil/planned_reparenter.go
@@ -22,6 +22,7 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/event"
@@ -518,6 +519,11 @@ func (pr *PlannedReparenter) reparentShardLocked(
return err
}

err = pr.verifyAllTabletsReachable(ctx, tabletMap)
if err != nil {
return err
}

// Check invariants that PlannedReparentShard depends on.
if isNoop, err := pr.preflightChecks(ctx, ev, keyspace, shard, tabletMap, &opts); err != nil {
return err
@@ -572,12 +578,12 @@ func (pr *PlannedReparenter) reparentShardLocked(
// inserted in the new primary's journal, so we can use it below to check
// that all the replicas have attached to new primary successfully.
switch {
case currentPrimary == nil && ev.ShardInfo.PrimaryAlias == nil:
case currentPrimary == nil && ev.ShardInfo.PrimaryTermStartTime == nil:
// Case (1): no primary has been elected ever. Initialize
// the primary-elect tablet
reparentJournalPos, err = pr.performInitialPromotion(ctx, ev.NewPrimary, opts)
needsRefresh = true
case currentPrimary == nil && ev.ShardInfo.PrimaryAlias != nil:
case currentPrimary == nil && ev.ShardInfo.PrimaryTermStartTime != nil:
// Case (2): no clear current primary. Try to find a safe promotion
// candidate, and promote to it.
err = pr.performPotentialPromotion(ctx, keyspace, shard, ev.NewPrimary, tabletMap, opts)
@@ -713,3 +719,20 @@ func (pr *PlannedReparenter) reparentTablets(

return nil
}

// verifyAllTabletsReachable verifies that all the tablets are reachable when running PRS.
func (pr *PlannedReparenter) verifyAllTabletsReachable(ctx context.Context, tabletMap map[string]*topo.TabletInfo) error {
// Create a cancellable context for the entire set of RPCs to verify reachability.
verifyCtx, verifyCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer verifyCancel()

errorGroup, groupCtx := errgroup.WithContext(verifyCtx)
for _, info := range tabletMap {
tablet := info.Tablet
errorGroup.Go(func() error {
_, err := pr.tmc.PrimaryStatus(groupCtx, tablet)
return err
})
}
return errorGroup.Wait()
}
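As a design note, `errgroup.WithContext` cancels the group context as soon as any probe returns an error, so a single unreachable tablet fails the reachability check quickly and PRS aborts before attempting any promotion. A minimal, self-contained sketch of that behavior follows; the tablet names and the `probe` helper are generic stand-ins for illustration, not Vitess APIs:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// probe stands in for a cheap per-tablet RPC such as PrimaryStatus.
func probe(ctx context.Context, name string, healthy bool) error {
	if !healthy {
		return errors.New(name + " is unreachable")
	}
	select {
	case <-ctx.Done():
		// Cancelled because another probe already failed (or the deadline hit).
		return ctx.Err()
	case <-time.After(50 * time.Millisecond):
		return nil
	}
}

func main() {
	// Bounded overall budget, like topo.RemoteOperationTimeout in the real check.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	eg, groupCtx := errgroup.WithContext(ctx)
	tablets := map[string]bool{"zone1-100": true, "zone1-101": true, "zone1-102": false}
	for name, healthy := range tablets {
		name, healthy := name, healthy // capture loop variables (pre-Go 1.22)
		eg.Go(func() error { return probe(groupCtx, name, healthy) })
	}

	// Wait returns the first non-nil error; the caller aborts the reparent.
	fmt.Println("reachability check:", eg.Wait())
}
```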