feat: remove sending primary position from PRS and ERS into SetReplicationSource RPC

Signed-off-by: Manan Gupta <[email protected]>
GuptaManan100 committed Sep 26, 2024
1 parent 66f0b6d commit 38983b7
Showing 22 changed files with 643 additions and 740 deletions.
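At its core the commit narrows one method on the tmclient.TabletManagerClient interface: the primaryPosition string argument, and the matching PrimaryPosition field on the RPC request, are dropped. A sketch of the change, reconstructed from the hunks below (only the affected method is shown):

```go
// Before: callers passed both a wait position and the primary's position.
SetReplicationSource(ctx context.Context, tablet *topodatapb.Tablet,
	parent *topodatapb.TabletAlias, timeCreatedNS int64,
	waitPosition, primaryPosition string,
	forceStartReplication bool, semiSync bool, heartbeatInterval float64) error

// After: the primaryPosition argument is gone; everything else is unchanged.
SetReplicationSource(ctx context.Context, tablet *topodatapb.Tablet,
	parent *topodatapb.TabletAlias, timeCreatedNS int64,
	waitPosition string,
	forceStartReplication bool, semiSync bool, heartbeatInterval float64) error
```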
1,211 changes: 600 additions & 611 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go

Large diffs are not rendered by default.

44 changes: 0 additions & 44 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion go/vt/vtcombo/tablet_map.go
@@ -982,7 +982,7 @@ func (itmc *internalTabletManagerClient) UndoDemotePrimary(context.Context, *top
return fmt.Errorf("not implemented in vtcombo")
}

- func (itmc *internalTabletManagerClient) SetReplicationSource(context.Context, *topodatapb.Tablet, *topodatapb.TabletAlias, int64, string, string, bool, bool, float64) error {
+ func (itmc *internalTabletManagerClient) SetReplicationSource(context.Context, *topodatapb.Tablet, *topodatapb.TabletAlias, int64, string, bool, bool, float64) error {
return fmt.Errorf("not implemented in vtcombo")
}

2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctldserver/server.go
@@ -3475,7 +3475,7 @@ func (s *VtctldServer) ReparentTablet(ctx context.Context, req *vtctldatapb.Repa
return nil, err
}

- if err = s.tmc.SetReplicationSource(ctx, tablet.Tablet, shard.PrimaryAlias, 0, "", "", false, reparentutil.IsReplicaSemiSync(durability, shardPrimary.Tablet, tablet.Tablet), 0); err != nil {
+ if err = s.tmc.SetReplicationSource(ctx, tablet.Tablet, shard.PrimaryAlias, 0, "", false, reparentutil.IsReplicaSemiSync(durability, shardPrimary.Tablet, tablet.Tablet), 0); err != nil {
return nil, err
}

2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctldserver/testutil/test_tmclient.go
@@ -1171,7 +1171,7 @@ func (fake *TabletManagerClient) RunHealthCheck(ctx context.Context, tablet *top
}

// SetReplicationSource is part of the tmclient.TabletManagerClient interface.
- func (fake *TabletManagerClient) SetReplicationSource(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition, primaryPosition string, forceStartReplication bool, semiSync bool, heartbeatInterval float64) error {
+ func (fake *TabletManagerClient) SetReplicationSource(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartReplication bool, semiSync bool, heartbeatInterval float64) error {
if fake.SetReplicationSourceResults == nil {
return assert.AnError
}
28 changes: 12 additions & 16 deletions go/vt/vtctl/reparentutil/emergency_reparenter.go
@@ -153,7 +153,6 @@ func (erp *EmergencyReparenter) reparentShardLocked(ctx context.Context, ev *eve
tabletMap map[string]*topo.TabletInfo
validCandidates map[string]replication.Position
intermediateSource *topodatapb.Tablet
- latestPosition replication.Position
validCandidateTablets []*topodatapb.Tablet
validReplacementCandidates []*topodatapb.Tablet
betterCandidate *topodatapb.Tablet
@@ -231,7 +230,7 @@ func (erp *EmergencyReparenter) reparentShardLocked(ctx context.Context, ev *eve
// Here we also check for split brain scenarios and check that the selected replica must be more advanced than all the other valid candidates.
// We fail in case there is a split brain detected.
// The validCandidateTablets list is sorted by the replication positions with ties broken by promotion rules.
- intermediateSource, latestPosition, validCandidateTablets, err = erp.findMostAdvanced(validCandidates, tabletMap, opts)
+ intermediateSource, validCandidateTablets, err = erp.findMostAdvanced(validCandidates, tabletMap, opts)
if err != nil {
return err
}
@@ -267,7 +266,7 @@ func (erp *EmergencyReparenter) reparentShardLocked(ctx context.Context, ev *eve
// we do not promote the tablet or change the shard record. We only change the replication for all the other tablets
// it also returns the list of the tablets that started replication successfully including itself part of the validCandidateTablets list.
// These are the candidates that we can use to find a replacement.
- validReplacementCandidates, err = erp.promoteIntermediateSource(ctx, ev, intermediateSource, latestPosition, tabletMap, stoppedReplicationSnapshot.statusMap, validCandidateTablets, opts)
+ validReplacementCandidates, err = erp.promoteIntermediateSource(ctx, ev, intermediateSource, tabletMap, stoppedReplicationSnapshot.statusMap, validCandidateTablets, opts)
if err != nil {
return err
}
@@ -303,7 +302,7 @@ func (erp *EmergencyReparenter) reparentShardLocked(ctx context.Context, ev *eve
// Since the new primary tablet belongs to the validCandidateTablets list, we no longer need any additional constraint checks

// Final step is to promote our primary candidate
- _, err = erp.reparentReplicas(ctx, ev, newPrimary, latestPosition, tabletMap, stoppedReplicationSnapshot.statusMap, opts, false /* intermediateReparent */)
+ _, err = erp.reparentReplicas(ctx, ev, newPrimary, tabletMap, stoppedReplicationSnapshot.statusMap, opts, false /* intermediateReparent */)
if err != nil {
return err
}
@@ -382,18 +381,18 @@ func (erp *EmergencyReparenter) findMostAdvanced(
validCandidates map[string]replication.Position,
tabletMap map[string]*topo.TabletInfo,
opts EmergencyReparentOptions,
- ) (*topodatapb.Tablet, replication.Position, []*topodatapb.Tablet, error) {
+ ) (*topodatapb.Tablet, []*topodatapb.Tablet, error) {
erp.logger.Infof("started finding the intermediate source")
// convert the valid candidates into a list so that we can use it for sorting
validTablets, tabletPositions, err := getValidCandidatesAndPositionsAsList(validCandidates, tabletMap)
if err != nil {
- return nil, replication.Position{}, nil, err
+ return nil, nil, err
}

// sort the tablets for finding the best intermediate source in ERS
err = sortTabletsForReparent(validTablets, tabletPositions, nil, opts.durability)
if err != nil {
- return nil, replication.Position{}, nil, err
+ return nil, nil, err
}
for _, tablet := range validTablets {
erp.logger.Infof("finding intermediate source - sorted replica: %v", tablet.Alias)
@@ -407,7 +406,7 @@ func (erp *EmergencyReparenter) findMostAdvanced(
// superset of all the other valid positions. If that is not the case, then we have a split brain scenario, and we should cancel the ERS
for i, position := range tabletPositions {
if !winningPosition.AtLeast(position) {
- return nil, replication.Position{}, nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "split brain detected between servers - %v and %v", winningPrimaryTablet.Alias, validTablets[i].Alias)
+ return nil, nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "split brain detected between servers - %v and %v", winningPrimaryTablet.Alias, validTablets[i].Alias)
}
}

@@ -417,20 +416,20 @@ func (erp *EmergencyReparenter) findMostAdvanced(
requestedPrimaryAlias := topoproto.TabletAliasString(opts.NewPrimaryAlias)
pos, ok := validCandidates[requestedPrimaryAlias]
if !ok {
- return nil, replication.Position{}, nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "requested primary elect %v has errant GTIDs", requestedPrimaryAlias)
+ return nil, nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "requested primary elect %v has errant GTIDs", requestedPrimaryAlias)
}
// if the requested tablet is as advanced as the most advanced tablet, then we can just use it for promotion.
// otherwise, we should let it catchup to the most advanced tablet and not change the intermediate source
if pos.AtLeast(winningPosition) {
requestedPrimaryInfo, isFound := tabletMap[requestedPrimaryAlias]
if !isFound {
- return nil, replication.Position{}, nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "candidate %v not found in the tablet map; this an impossible situation", requestedPrimaryAlias)
+ return nil, nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "candidate %v not found in the tablet map; this an impossible situation", requestedPrimaryAlias)
}
winningPrimaryTablet = requestedPrimaryInfo.Tablet
}
}

- return winningPrimaryTablet, winningPosition, validTablets, nil
+ return winningPrimaryTablet, validTablets, nil
}

// promoteIntermediateSource reparents all the other tablets to start replicating from the intermediate source.
@@ -439,7 +438,6 @@ func (erp *EmergencyReparenter) promoteIntermediateSource(
ctx context.Context,
ev *events.Reparent,
source *topodatapb.Tablet,
- latestPosition replication.Position,
tabletMap map[string]*topo.TabletInfo,
statusMap map[string]*replicationdatapb.StopReplicationStatus,
validCandidateTablets []*topodatapb.Tablet,
@@ -454,7 +452,7 @@ func (erp *EmergencyReparenter) promoteIntermediateSource(

// we reparent all the other valid tablets to start replication from our new source
// we wait for all the replicas so that we can choose a better candidate from the ones that started replication later
- reachableTablets, err := erp.reparentReplicas(ctx, ev, source, latestPosition, validTabletMap, statusMap, opts, true /* intermediateReparent */)
+ reachableTablets, err := erp.reparentReplicas(ctx, ev, source, validTabletMap, statusMap, opts, true /* intermediateReparent */)
if err != nil {
return nil, err
}
@@ -480,7 +478,6 @@ func (erp *EmergencyReparenter) reparentReplicas(
ctx context.Context,
ev *events.Reparent,
newPrimaryTablet *topodatapb.Tablet,
- latestPosition replication.Position,
tabletMap map[string]*topo.TabletInfo,
statusMap map[string]*replicationdatapb.StopReplicationStatus,
opts EmergencyReparentOptions,
@@ -543,7 +540,6 @@ func (erp *EmergencyReparenter) reparentReplicas(
return nil
}

- latestPosStr := replication.EncodePosition(latestPosition)
handleReplica := func(alias string, ti *topo.TabletInfo) {
defer replWg.Done()
erp.logger.Infof("setting new primary on replica %v", alias)
@@ -561,7 +557,7 @@ func (erp *EmergencyReparenter) reparentReplicas(
forceStart = fs
}

- err := erp.tmc.SetReplicationSource(replCtx, ti.Tablet, newPrimaryTablet.Alias, 0, "", latestPosStr, forceStart, IsReplicaSemiSync(opts.durability, newPrimaryTablet, ti.Tablet), 0)
+ err := erp.tmc.SetReplicationSource(replCtx, ti.Tablet, newPrimaryTablet.Alias, 0, "", forceStart, IsReplicaSemiSync(opts.durability, newPrimaryTablet, ti.Tablet), 0)
if err != nil {
err = vterrors.Wrapf(err, "tablet %v SetReplicationSource failed: %v", alias, err)
rec.RecordError(err)
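The hunks above keep the split-brain check intact: after sorting, the winning candidate's position must be AtLeast (a superset of) every other valid candidate's position, or the ERS is aborted. A toy, self-contained illustration of that superset semantics — Position and AtLeast here are simplified stand-ins for vitess's replication.Position, not the real types:

```go
package main

import "fmt"

// Position is a toy GTID set; vitess's replication.Position is far richer.
type Position map[string]bool

// AtLeast reports whether p contains every GTID in other,
// mirroring the superset semantics findMostAdvanced relies on.
func (p Position) AtLeast(other Position) bool {
	for gtid := range other {
		if !p[gtid] {
			return false
		}
	}
	return true
}

func main() {
	winning := Position{"g1": true, "g2": true, "g3": true}
	candidates := []Position{
		{"g1": true, "g2": true}, // strictly behind the winner: fine
		{"g1": true, "g4": true}, // has a GTID the winner lacks: split brain
	}
	for i, pos := range candidates {
		if !winning.AtLeast(pos) {
			fmt.Printf("split brain detected with candidate %d\n", i)
		}
	}
}
```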
10 changes: 5 additions & 5 deletions go/vt/vtctl/reparentutil/emergency_reparenter_test.go
@@ -2328,7 +2328,7 @@ func TestEmergencyReparenter_promotionOfNewPrimary(t *testing.T) {
tt.emergencyReparentOps.durability = durability

erp := NewEmergencyReparenter(ts, tt.tmc, logger)
- _, err := erp.reparentReplicas(ctx, ev, tabletInfo.Tablet, replication.Position{}, tt.tabletMap, tt.statusMap, tt.emergencyReparentOps, false)
+ _, err := erp.reparentReplicas(ctx, ev, tabletInfo.Tablet, tt.tabletMap, tt.statusMap, tt.emergencyReparentOps, false)
if tt.shouldErr {
assert.Error(t, err)
assert.Contains(t, err.Error(), tt.errShouldContain)
@@ -2976,7 +2976,7 @@ func TestEmergencyReparenter_findMostAdvanced(t *testing.T) {
erp := NewEmergencyReparenter(nil, nil, logutil.NewMemoryLogger())

test.emergencyReparentOps.durability = durability
- winningTablet, _, _, err := erp.findMostAdvanced(test.validCandidates, test.tabletMap, test.emergencyReparentOps)
+ winningTablet, _, err := erp.findMostAdvanced(test.validCandidates, test.tabletMap, test.emergencyReparentOps)
if test.err != "" {
assert.Error(t, err)
assert.Contains(t, err.Error(), test.err)
@@ -3502,7 +3502,7 @@ func TestEmergencyReparenter_reparentReplicas(t *testing.T) {
tt.emergencyReparentOps.durability = durability

erp := NewEmergencyReparenter(ts, tt.tmc, logger)
- _, err := erp.reparentReplicas(ctx, ev, tabletInfo.Tablet, replication.Position{}, tt.tabletMap, tt.statusMap, tt.emergencyReparentOps, false /* intermediateReparent */)
+ _, err := erp.reparentReplicas(ctx, ev, tabletInfo.Tablet, tt.tabletMap, tt.statusMap, tt.emergencyReparentOps, false /* intermediateReparent */)
if tt.shouldErr {
assert.Error(t, err)
assert.Contains(t, err.Error(), tt.errShouldContain)
@@ -4077,7 +4077,7 @@ func TestEmergencyReparenter_promoteIntermediateSource(t *testing.T) {
tt.emergencyReparentOps.durability = durability

erp := NewEmergencyReparenter(ts, tt.tmc, logger)
- res, err := erp.promoteIntermediateSource(ctx, ev, tabletInfo.Tablet, replication.Position{}, tt.tabletMap, tt.statusMap, tt.validCandidateTablets, tt.emergencyReparentOps)
+ res, err := erp.promoteIntermediateSource(ctx, ev, tabletInfo.Tablet, tt.tabletMap, tt.statusMap, tt.validCandidateTablets, tt.emergencyReparentOps)
if tt.shouldErr {
assert.Error(t, err)
assert.Contains(t, err.Error(), tt.errShouldContain)
@@ -4368,7 +4368,7 @@ func TestParentContextCancelled(t *testing.T) {
time.Sleep(time.Second)
cancel()
}()
- _, err = erp.reparentReplicas(ctx, ev, tabletMap[newPrimaryTabletAlias].Tablet, replication.Position{}, tabletMap, statusMap, emergencyReparentOps, true)
+ _, err = erp.reparentReplicas(ctx, ev, tabletMap[newPrimaryTabletAlias].Tablet, tabletMap, statusMap, emergencyReparentOps, true)
require.NoError(t, err)
}

8 changes: 2 additions & 6 deletions go/vt/vtctl/reparentutil/planned_reparenter.go
@@ -255,7 +255,7 @@ func (pr *PlannedReparenter) performGracefulPromotion(
setSourceCtx, setSourceCancel := context.WithTimeout(ctx, opts.WaitReplicasTimeout)
defer setSourceCancel()

- if err := pr.tmc.SetReplicationSource(setSourceCtx, primaryElect, currentPrimary.Alias, 0, snapshotPos, snapshotPos, true, IsReplicaSemiSync(opts.durability, currentPrimary.Tablet, primaryElect), 0); err != nil {
+ if err := pr.tmc.SetReplicationSource(setSourceCtx, primaryElect, currentPrimary.Alias, 0, snapshotPos, true, IsReplicaSemiSync(opts.durability, currentPrimary.Tablet, primaryElect), 0); err != nil {
return vterrors.Wrapf(err, "replication on primary-elect %v did not catch up in time; replication must be healthy to perform PlannedReparent", primaryElectAliasStr)
}

@@ -660,10 +660,6 @@ func (pr *PlannedReparenter) reparentTablets(
primaryElectAliasStr := topoproto.TabletAliasString(ev.NewPrimary.Alias)
replicasWg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
- latestPos, err := pr.tmc.PrimaryPosition(ctx, ev.NewPrimary)
- if err != nil {
- 	return vterrors.Wrapf(err, "failed PrimaryPosition(primary=%v): %v", primaryElectAliasStr, err)
- }

// Point all replicas at the new primary and check that they receive the
// reparent journal entry, proving that they are replicating from the new
@@ -689,7 +685,7 @@
// that it needs to start replication after transitioning from
// PRIMARY => REPLICA.
forceStartReplication := false
- if err := pr.tmc.SetReplicationSource(replCtx, tablet, ev.NewPrimary.Alias, reparentJournalTimestamp, "", latestPos, forceStartReplication, IsReplicaSemiSync(opts.durability, ev.NewPrimary, tablet), 0); err != nil {
+ if err := pr.tmc.SetReplicationSource(replCtx, tablet, ev.NewPrimary.Alias, reparentJournalTimestamp, "", forceStartReplication, IsReplicaSemiSync(opts.durability, ev.NewPrimary, tablet), 0); err != nil {
rec.RecordError(vterrors.Wrapf(err, "tablet %v failed to SetReplicationSource(%v): %v", alias, primaryElectAliasStr, err))
}
}(alias, tabletInfo.Tablet)
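With the PrimaryPosition fetch gone, reparentTablets (above) simply fans SetReplicationSource out to all replicas concurrently and relies on the reparent journal entry, per the comment in the hunk, to prove they follow the new primary. A minimal sketch of that fan-out shape — the aliases and the no-op RPC are hypothetical stand-ins for vitess's sync.WaitGroup plus concurrency.AllErrorRecorder pattern:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	replicas := []string{"zone1-101", "zone1-102", "zone1-103"} // hypothetical aliases

	var wg sync.WaitGroup
	var mu sync.Mutex
	var errs []error

	// Stand-in for tmc.SetReplicationSource; always succeeds here.
	setReplicationSource := func(alias string) error { return nil }

	for _, alias := range replicas {
		wg.Add(1)
		go func(alias string) {
			defer wg.Done()
			if err := setReplicationSource(alias); err != nil {
				mu.Lock() // collect errors centrally, like AllErrorRecorder
				errs = append(errs, fmt.Errorf("tablet %v failed to SetReplicationSource: %w", alias, err))
				mu.Unlock()
			}
		}(alias)
	}
	wg.Wait()
	fmt.Printf("reparented %d replicas, %d errors\n", len(replicas), len(errs))
}
```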
2 changes: 1 addition & 1 deletion go/vt/vtctl/reparentutil/replication.go
@@ -194,7 +194,7 @@ func SetReplicationSource(ctx context.Context, ts *topo.Server, tmc tmclient.Tab
}

isSemiSync := IsReplicaSemiSync(durability, shardPrimary.Tablet, tablet)
- return tmc.SetReplicationSource(ctx, tablet, shardPrimary.Alias, 0, "", "", false, isSemiSync, 0)
+ return tmc.SetReplicationSource(ctx, tablet, shardPrimary.Alias, 0, "", false, isSemiSync, 0)
}

// replicationSnapshot stores the status maps and the tablets that were reachable
2 changes: 1 addition & 1 deletion go/vt/vtorc/logic/tablet_discovery.go
@@ -315,7 +315,7 @@ func resetReplicationParameters(ctx context.Context, tablet *topodatapb.Tablet)
func setReplicationSource(ctx context.Context, replica *topodatapb.Tablet, primary *topodatapb.Tablet, semiSync bool, heartbeatInterval float64) error {
tmcCtx, tmcCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer tmcCancel()
- return tmc.SetReplicationSource(tmcCtx, replica, primary.Alias, 0, "", "", true, semiSync, heartbeatInterval)
+ return tmc.SetReplicationSource(tmcCtx, replica, primary.Alias, 0, "", true, semiSync, heartbeatInterval)
}

// shardPrimary finds the primary of the given keyspace-shard by reading the vtorc backend
2 changes: 1 addition & 1 deletion go/vt/vttablet/faketmclient/fake_client.go
@@ -350,7 +350,7 @@ func (client *FakeTabletManagerClient) UndoDemotePrimary(ctx context.Context, ta
}

// SetReplicationSource is part of the tmclient.TabletManagerClient interface.
- func (client *FakeTabletManagerClient) SetReplicationSource(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition, primaryPosition string, forceStartReplication bool, semiSync bool, heartbeatInterval float64) error {
+ func (client *FakeTabletManagerClient) SetReplicationSource(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartReplication bool, semiSync bool, heartbeatInterval float64) error {
return nil
}

3 changes: 1 addition & 2 deletions go/vt/vttablet/grpctmclient/client.go
@@ -1130,7 +1130,7 @@ func (client *Client) ResetReplicationParameters(ctx context.Context, tablet *to
}

// SetReplicationSource is part of the tmclient.TabletManagerClient interface.
- func (client *Client) SetReplicationSource(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition, primaryPosition string, forceStartReplication bool, semiSync bool, heartbeatInterval float64) error {
+ func (client *Client) SetReplicationSource(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartReplication bool, semiSync bool, heartbeatInterval float64) error {
c, closer, err := client.dialer.dial(ctx, tablet)
if err != nil {
return err
@@ -1141,7 +1141,6 @@ func (client *Client) SetReplicationSource(ctx context.Context, tablet *topodata
Parent: parent,
TimeCreatedNs: timeCreatedNS,
WaitPosition: waitPosition,
- PrimaryPosition: primaryPosition,
ForceStartReplication: forceStartReplication,
SemiSync: semiSync,
HeartbeatInterval: heartbeatInterval,
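On the wire, the same field disappears from the request message; the regenerated tabletmanagerdata files at the top of the commit account for most of the 643/740 line count. A sketch of how the gRPC client now builds the request, per the hunk above — the request type's package alias is assumed from vitess conventions, and the surrounding method body is elided:

```go
// Sketch of the request after this commit; PrimaryPosition is no longer a field.
req := &tabletmanagerdatapb.SetReplicationSourceRequest{
	Parent:                parent,
	TimeCreatedNs:         timeCreatedNS,
	WaitPosition:          waitPosition,
	ForceStartReplication: forceStartReplication,
	SemiSync:              semiSync,
	HeartbeatInterval:     heartbeatInterval,
}
```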
