Skip to content

Commit

Permalink
CherryPick: Replicas should be able to heal if replication is not ini…
Browse files Browse the repository at this point in the history
…tialised properly vitessio#10943 (vitessio#935)

* Replicas should be able to heal if replication is not initialised properly (vitessio#10943)

* feat: add code to also reset replication parameters in setReplicationSourceLocked when required

Signed-off-by: Manan Gupta <[email protected]>

* test: fix tests to reflect the change

Signed-off-by: Manan Gupta <[email protected]>

* feat: fix vtworker tests

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Aug 10, 2022
1 parent 119883f commit 8bc83c4
Show file tree
Hide file tree
Showing 18 changed files with 168 additions and 51 deletions.
1 change: 1 addition & 0 deletions go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ func (fmd *FakeMysqlDaemon) SetReplicationSource(ctx context.Context, host strin
if stopReplicationBefore {
cmds = append(cmds, "STOP SLAVE")
}
cmds = append(cmds, "RESET SLAVE ALL")
cmds = append(cmds, "FAKE SET MASTER")
if startReplicationAfter {
cmds = append(cmds, "START SLAVE")
Expand Down
8 changes: 8 additions & 0 deletions go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,14 @@ func (mysqld *Mysqld) SetReplicationSource(ctx context.Context, host string, por
if replicationStopBefore {
cmds = append(cmds, conn.StopReplicationCommand())
}
// The reset replication parameters commands make the instance forget the source host and port.
// This is required because sometimes MySQL gets stuck due to improper initialization of
// master info structure or related failures and throws errors like
// ERROR 1201 (HY000): Could not initialize master info structure; more error messages can be found in the MySQL error log
// These errors can only be resolved by resetting the replication parameters, otherwise START SLAVE fails.
// Therefore, we have elected to always reset the replication parameters whenever we try to set the source host and port,
// since there is no real overhead and it makes this function robust enough to also handle failures like these.
cmds = append(cmds, conn.ResetReplicationParametersCommands()...)
// If flag value is same as default, check deprecated flag value
if *replicationConnectRetry == 10*time.Second && *masterConnectRetry != *replicationConnectRetry {
*replicationConnectRetry = *masterConnectRetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,25 @@ func TestInitShardPrimary(t *testing.T) {

tablet2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// These come from InitShardPrimary
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
tablet2.FakeMysqlDaemon.SetReplicationSourceInputs = append(tablet2.FakeMysqlDaemon.SetReplicationSourceInputs, fmt.Sprintf("%v:%v", tablet1.Tablet.Hostname, tablet1.Tablet.MysqlPort))

tablet3.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -127,6 +131,7 @@ func TestInitShardPrimaryNoFormerPrimary(t *testing.T) {
tablet2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand All @@ -135,6 +140,7 @@ func TestInitShardPrimaryNoFormerPrimary(t *testing.T) {
tablet3.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"FAKE RESET ALL REPLICATION",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
10 changes: 8 additions & 2 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,8 +766,14 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
}
host := parent.Tablet.MysqlHostname
port := int(parent.Tablet.MysqlPort)
if status.SourceHost != host || status.SourcePort != port {
// This handles both changing the address and starting replication.
// We want to reset the replication parameters and set replication source again when forceStartReplication is provided
// because sometimes MySQL gets stuck due to improper initialization of master info structure or related failures and throws errors like
// ERROR 1201 (HY000): Could not initialize master info structure; more error messages can be found in the MySQL error log
// These errors can only be resolved by resetting the replication parameters, otherwise START SLAVE fails. So when this RPC
// gets called from VTOrc or replication manager to fix the replication in these cases with forceStartReplication, we should also
// reset the replication parameters and set the source host and port information again.
if status.SourceHost != host || status.SourcePort != port || forceStartReplication {
// This handles resetting the replication parameters, changing the address and then starting the replication.
if err := tm.MysqlDaemon.SetReplicationSource(ctx, host, port, wasReplicating, shouldbeReplicating); err != nil {
if err := tm.handleRelayLogError(err); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/tm_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ func TestCheckPrimaryShip(t *testing.T) {
fakeMysql := tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon)
fakeMysql.SetReplicationSourceInputs = append(fakeMysql.SetReplicationSourceInputs, fmt.Sprintf("%v:%v", otherTablet.MysqlHostname, otherTablet.MysqlPort))
fakeMysql.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
18 changes: 12 additions & 6 deletions go/vt/worker/multi_split_diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,39 +313,45 @@ func testMultiSplitDiff(t *testing.T, v3 bool) {

sourceRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs = append(sourceRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(sourcePrimary.Tablet))
sourceRdonly1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
sourceRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs = append(sourceRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(sourcePrimary.Tablet))
sourceRdonly2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}

leftRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs = append(leftRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(leftPrimary.Tablet))
leftRdonly1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
leftRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs = append(leftRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(leftPrimary.Tablet))
leftRdonly2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}

rightRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs = append(rightRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(rightPrimary.Tablet))
rightRdonly1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
rightRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs = append(rightRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(rightPrimary.Tablet))
rightRdonly2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
9 changes: 6 additions & 3 deletions go/vt/worker/split_clone_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ func (tc *splitCloneTestCase) setUpWithConcurrency(v3 bool, concurrency, writeQu
}
sourceRdonly.FakeMysqlDaemon.SetReplicationSourceInputs = append(sourceRdonly.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(sourcePrimary.Tablet))
sourceRdonly.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
Expand All @@ -237,7 +238,8 @@ func (tc *splitCloneTestCase) setUpWithConcurrency(v3 bool, concurrency, writeQu
rightRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs = append(rightRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(rightPrimary.Tablet))
rightRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs = append(rightRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(rightPrimary.Tablet))
leftReplica.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand All @@ -246,7 +248,8 @@ func (tc *splitCloneTestCase) setUpWithConcurrency(v3 bool, concurrency, writeQu
shqs := fakes.NewStreamHealthQueryService(destRdonly.Target())
shqs.AddDefaultHealthResponse()
destRdonly.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
12 changes: 8 additions & 4 deletions go/vt/worker/split_diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,26 +294,30 @@ func testSplitDiff(t *testing.T, v3 bool, destinationTabletType topodatapb.Table

sourceRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs = append(sourceRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(sourcePrimary.Tablet))
sourceRdonly1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
sourceRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs = append(sourceRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(sourcePrimary.Tablet))
sourceRdonly2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}

leftRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs = append(leftRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(leftPrimary.Tablet))
leftRdonly1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
leftRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs = append(leftRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(leftPrimary.Tablet))
leftRdonly2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
6 changes: 4 additions & 2 deletions go/vt/worker/vertical_split_clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ func TestVerticalSplitClone(t *testing.T) {
}
sourceRdonly.FakeMysqlDaemon.SetReplicationSourceInputs = append(sourceRdonly.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(sourcePrimary.Tablet))
sourceRdonly.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
Expand Down Expand Up @@ -173,7 +174,8 @@ func TestVerticalSplitClone(t *testing.T) {

destRdonly.FakeMysqlDaemon.SetReplicationSourceInputs = append(destRdonly.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(destPrimary.Tablet))
destRdonly.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
12 changes: 8 additions & 4 deletions go/vt/worker/vertical_split_diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,26 +187,30 @@ func TestVerticalSplitDiff(t *testing.T) {

sourceRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs = append(sourceRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(sourcePrimary.Tablet))
sourceRdonly1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
sourceRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs = append(sourceRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(sourcePrimary.Tablet))
sourceRdonly2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}

destRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs = append(destRdonly1.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(destPrimary.Tablet))
destRdonly1.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
destRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs = append(destRdonly2.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(destPrimary.Tablet))
destRdonly2.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
25 changes: 19 additions & 6 deletions go/vt/wrangler/testlib/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ func TestBackupRestore(t *testing.T) {
},
}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// This first set of STOP and START commands come from
Expand All @@ -150,6 +151,7 @@ func TestBackupRestore(t *testing.T) {
// These commands come from SetReplicationSource RPC called
// to set the correct primary and semi-sync after Backup has concluded
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -186,12 +188,14 @@ func TestBackupRestore(t *testing.T) {
},
}
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -243,6 +247,7 @@ func TestBackupRestore(t *testing.T) {
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -369,7 +374,8 @@ func TestBackupRestoreLagged(t *testing.T) {
}
sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// This first set of STOP and START commands come from
Expand All @@ -380,6 +386,7 @@ func TestBackupRestoreLagged(t *testing.T) {
// These commands come from SetReplicationSource RPC called
// to set the correct primary and semi-sync after Backup has concluded
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -437,12 +444,14 @@ func TestBackupRestoreLagged(t *testing.T) {
},
}
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -588,7 +597,8 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
}
sourceTablet.FakeMysqlDaemon.SetReplicationSourceInputs = []string{fmt.Sprintf("%s:%d", primary.Tablet.MysqlHostname, primary.Tablet.MysqlPort)}
sourceTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
// This first set of STOP and START commands come from
Expand All @@ -599,6 +609,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
// These commands come from SetReplicationSource RPC called
// to set the correct primary and semi-sync after Backup has concluded
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down Expand Up @@ -628,12 +639,14 @@ func TestRestoreUnreachablePrimary(t *testing.T) {
},
}
destTablet.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
"STOP SLAVE",
"RESET SLAVE ALL",
"FAKE SET SLAVE POSITION",
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/wrangler/testlib/copy_schema_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func copySchema(t *testing.T, useShardAsSource bool) {
sourceRdonly := NewFakeTablet(t, wr, "cell1", 1,
topodatapb.TabletType_RDONLY, sourceRdonlyDb, TabletKeyspaceShard(t, "ks", "-80"))
sourceRdonly.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
// These 2 statements come from tablet startup
// These 3 statements come from tablet startup
"RESET SLAVE ALL",
"FAKE SET MASTER",
"START SLAVE",
}
Expand Down
Loading

0 comments on commit 8bc83c4

Please sign in to comment.