Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replicas should be able to heal if replication is not initialised properly #10943

Merged
merged 2 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ func (fmd *FakeMysqlDaemon) SetReplicationSource(ctx context.Context, host strin
if stopReplicationBefore {
cmds = append(cmds, "STOP SLAVE")
}
cmds = append(cmds, "RESET SLAVE ALL")
cmds = append(cmds, "FAKE SET MASTER")
if startReplicationAfter {
cmds = append(cmds, "START SLAVE")
Expand Down
8 changes: 8 additions & 0 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,14 @@ func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, por
if replicationStopBefore {
cmds = append(cmds, conn.StopReplicationCommand())
}
// The reset replication parameters commands make the instance forget its source host and port.
// This is required because MySQL sometimes gets stuck due to improper initialization of the
// master info structure or related failures, and throws errors like
// ERROR 1201 (HY000): Could not initialize master info structure; more error messages can be found in the MySQL error log
// These errors can only be resolved by resetting the replication parameters; otherwise START SLAVE fails.
// Therefore, we always reset the replication parameters whenever we set the source host and port:
// there is no real overhead, and it makes this function robust enough to handle failures like these.
cmds = append(cmds, conn.ResetReplicationParametersCommands()...)
// If flag value is same as default, check deprecated flag value
if *replicationConnectRetry == 10*time.Second && *masterConnectRetry != *replicationConnectRetry {
*replicationConnectRetry = *masterConnectRetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,25 @@ func TestInitShardPrimary(t *testing.T) {

tablet2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// These come from InitShardPrimary
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
tablet2.FakeMysqlDaemon.SetReplicationSourceInputs = append(tablet2.FakeMysqlDaemon.SetReplicationSourceInputs, fmt.Sprintf("%v:%v", tablet1.Tablet.Hostname, tablet1.Tablet.MysqlPort))

tablet3.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -127,6 +131,7 @@ func TestInitShardPrimaryNoFormerPrimary(t *testing.T) {
tablet2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand All @@ -135,6 +140,7 @@ func TestInitShardPrimaryNoFormerPrimary(t *testing.T) {
tablet3.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
10 changes: 8 additions & 2 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,14 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
}
host := parent.Tablet.MysqlHostname
port := int(parent.Tablet.MysqlPort)
if status.SourceHost != host || status.SourcePort != port {
// This handles both changing the address and starting replication.
// We want to reset the replication parameters and set replication source again when forceStartReplication is provided
// because sometimes MySQL gets stuck due to improper initialization of master info structure or related failures and throws errors like
// ERROR 1201 (HY000): Could not initialize master info structure; more error messages can be found in the MySQL error log
// These errors can only be resolved by resetting the replication parameters, otherwise START SLAVE fails. So when this RPC
// gets called from VTOrc or replication manager to fix the replication in these cases with forceStartReplication, we should also
// reset the replication parameters and set the source port information again.
if status.SourceHost != host || status.SourcePort != port || forceStartReplication {
// This handles resetting the replication parameters, changing the address and then starting the replication.
if err := tm.MysqlDaemon.SetReplicationSource(ctx, host, port, wasReplicating, shouldbeReplicating); err != nil {
if err := tm.handleRelayLogError(err); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/tm_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ func TestCheckPrimaryShip(t *testing.T) {
fakeMysql := tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon)
fakeMysql.SetReplicationSourceInputs = append(fakeMysql.SetReplicationSourceInputs, fmt.Sprintf("%v:%v", otherTablet.MysqlHostname, otherTablet.MysqlPort))
fakeMysql.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
25 changes: 19 additions & 6 deletions go/vt/wrangler/testlib/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
},
}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// This first set of STOP and START commands come from
Expand All @@ -202,6 +203,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
// These commands come from SetReplicationSource RPC called
// to set the correct primary and semi-sync after Backup has concluded
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -238,12 +240,14 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
},
}
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -296,6 +300,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error {
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -423,7 +428,8 @@ func TestBackupRestoreLagged(t *testing.T) {
}
sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// This first set of STOP and START commands come from
Expand All @@ -434,6 +440,7 @@ func TestBackupRestoreLagged(t *testing.T) {
// These commands come from SetReplicationSource RPC called
// to set the correct primary and semi-sync after Backup has concluded
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -491,12 +498,14 @@ func TestBackupRestoreLagged(t *testing.T) {
},
}
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -642,7 +651,8 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
}
sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// This first set of STOP and START commands come from
Expand All @@ -653,6 +663,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
// These commands come from SetReplicationSource RPC called
// to set the correct primary and semi-sync after Backup has concluded
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -682,12 +693,14 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
},
}
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/wrangler/testlib/copy_schema_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func copySchema(t *testing.T, useShardAsSource bool) {
sourceRdonly := NewFakeTablet(t, wr, "cell1", 1,
topodatapb.TabletType_RDONLY, sourceRdonlyDb, TabletKeyspaceShard(t, "ks", "-80"))
sourceRdonly.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
13 changes: 10 additions & 3 deletions go/vt/wrangler/testlib/emergency_reparent_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,13 @@ func TestEmergencyReparentShard(t *testing.T) {
goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica1.FakeMysqlDaemon.WaitPrimaryPositions, goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition)
goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet))
goodReplica1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE IO_THREAD",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand All @@ -167,9 +169,11 @@ func TestEmergencyReparentShard(t *testing.T) {
goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions = append(goodReplica2.FakeMysqlDaemon.WaitPrimaryPositions, goodReplica2.FakeMysqlDaemon.CurrentSourceFilePosition)
goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet), topoproto.MysqlAddr(oldPrimary.Tablet))
goodReplica2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
}
goodReplica2.StartActionLoop(t, wr)
Expand Down Expand Up @@ -232,6 +236,7 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) {
newPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"STOP SLAVE IO_THREAD",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"CREATE DATABASE IF NOT EXISTS _vt",
Expand Down Expand Up @@ -267,11 +272,13 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) {
moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions = append(moreAdvancedReplica.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.CurrentSourceFilePosition)
newPrimary.FakeMysqlDaemon.WaitPrimaryPositions = append(newPrimary.FakeMysqlDaemon.WaitPrimaryPositions, moreAdvancedReplica.FakeMysqlDaemon.CurrentPrimaryPosition)
moreAdvancedReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE IO_THREAD",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
14 changes: 11 additions & 3 deletions go/vt/wrangler/testlib/external_reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestTabletExternallyReparentedBasic(t *testing.T) {

oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand Down Expand Up @@ -170,6 +171,7 @@ func TestTabletExternallyReparentedToReplica(t *testing.T) {
// primary is still good to go.
oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand Down Expand Up @@ -248,6 +250,7 @@ func TestTabletExternallyReparentedWithDifferentMysqlPort(t *testing.T) {

oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand All @@ -260,7 +263,8 @@ func TestTabletExternallyReparentedWithDifferentMysqlPort(t *testing.T) {
// TabletActionReplicaWasRestarted and point to the new mysql port
goodReplica.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(oldPrimary.Tablet))
goodReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -335,6 +339,7 @@ func TestTabletExternallyReparentedContinueOnUnexpectedPrimary(t *testing.T) {

oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand All @@ -347,7 +352,8 @@ func TestTabletExternallyReparentedContinueOnUnexpectedPrimary(t *testing.T) {
// TabletActionReplicaWasRestarted and point to a bad host
goodReplica.FakeMysqlDaemon.SetReplicationSourceInputs = append(goodReplica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(oldPrimary.Tablet))
goodReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -418,6 +424,7 @@ func TestTabletExternallyReparentedRerun(t *testing.T) {

oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs = append(oldPrimary.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(newPrimary.Tablet))
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START Replica",
}
Expand All @@ -430,7 +437,8 @@ func TestTabletExternallyReparentedRerun(t *testing.T) {
// On the good replica, we will respond to
// TabletActionReplicaWasRestarted.
goodReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/wrangler/testlib/permissions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,8 @@ func TestPermissions(t *testing.T) {
}
replica.FakeMysqlDaemon.SetReplicationSourceInputs = append(replica.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet))
replica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
Loading