Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing backup_pitr flaky tests via wait-for loop on topo reads #13781

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vtctlbackup

import (
"bufio"
"context"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -52,7 +53,8 @@ const (
XtraBackup = iota
BuiltinBackup
Mysqlctld
timeout = time.Duration(60 * time.Second)
timeout = time.Duration(60 * time.Second)
topoConsistencyTimeout = 20 * time.Second
)

var (
Expand Down Expand Up @@ -1077,12 +1079,13 @@ func terminateRestore(t *testing.T) {

func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, replicaIndex int) (backups []string) {
replica := getReplica(t, replicaIndex)
numBackups := len(waitForNumBackups(t, -1))

err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica.Alias)
require.Nil(t, err)

backups, err = localCluster.ListBackups(shardKsName)
require.Nil(t, err)
backups = waitForNumBackups(t, numBackups+1)
require.NotEmpty(t, backups)

verifyTabletBackupStats(t, replica.VttabletProcess.GetVars())

Expand Down Expand Up @@ -1203,7 +1206,38 @@ func TestReplicaFullBackup(t *testing.T, replicaIndex int) (manifest *mysqlctl.B
return readManifestFile(t, backupLocation)
}

// waitForNumBackups waits for GetBackups to list exactly the given expected number.
// If expectNumBackups < 0 then any response is considered valid
func waitForNumBackups(t *testing.T, expectNumBackups int) []string {
ctx, cancel := context.WithTimeout(context.Background(), topoConsistencyTimeout)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
backups, err := localCluster.ListBackups(shardKsName)
require.NoError(t, err)
if expectNumBackups < 0 {
// any result is valid
return backups
}
if len(backups) == expectNumBackups {
// what we waited for
return backups
}
assert.Less(t, len(backups), expectNumBackups)
select {
case <-ctx.Done():
assert.Failf(t, ctx.Err().Error(), "expected %d backups, got %d", expectNumBackups, len(backups))
return nil
case <-ticker.C:
}
}
}

func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) {
numBackups := len(waitForNumBackups(t, -1))
incrementalFromPosArg := "auto"
if !incrementalFromPos.IsZero() {
incrementalFromPosArg = replication.EncodePosition(incrementalFromPos)
Expand All @@ -1216,8 +1250,9 @@ func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incre
}
require.NoErrorf(t, err, "output: %v", output)

backups, err := localCluster.ListBackups(shardKsName)
require.NoError(t, err)
backups := waitForNumBackups(t, numBackups+1)
require.NotEmptyf(t, backups, "output: %v", output)

verifyTabletBackupStats(t, replica.VttabletProcess.GetVars())
backupName = backups[len(backups)-1]
backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backupName
Expand Down
24 changes: 20 additions & 4 deletions go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
{
name: "first incremental backup",
},
{
name: "fail1",
expectError: "no binary logs to backup",
},
{
name: "fail2",
expectError: "no binary logs to backup",
},
{
name: "make writes, succeed",
writeBeforeBackup: true,
Expand Down Expand Up @@ -170,10 +178,10 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
if tc.writeBeforeBackup {
InsertRowOnPrimary(t, "")
}
// we wait for 1 second because backups are written to a directory named after the current timestamp,
// we wait for >1 second because backups are written to a directory named after the current timestamp,
// in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this
// is only ever a problem in this end-to-end test, not in production.
// Also, we gie the replica a chance to catch up.
// Also, we give the replica a chance to catch up.
time.Sleep(postWriteSleepDuration)
// randomly flush binary logs 0, 1 or 2 times
FlushBinaryLogsOnReplica(t, 0, rand.Intn(3))
Expand Down Expand Up @@ -295,6 +303,14 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
{
name: "first incremental backup",
},
{
name: "fail1",
expectError: "no binary logs to backup",
},
{
name: "fail2",
expectError: "no binary logs to backup",
},
{
name: "make writes, succeed",
writeBeforeBackup: true,
Expand Down Expand Up @@ -333,10 +349,10 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
if tc.writeBeforeBackup {
insertRowOnPrimary(t, "")
}
// we wait for 1 second because backups are written to a directory named after the current timestamp,
// we wait for >1 second because backups are written to a directory named after the current timestamp,
// in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this
// is only ever a problem in this end-to-end test, not in production.
// Also, we gie the replica a chance to catch up.
// Also, we give the replica a chance to catch up.
time.Sleep(postWriteSleepDuration)
waitForReplica(t, 0)
rowsBeforeBackup := ReadRowsFromReplica(t, 0)
Expand Down
1 change: 1 addition & 0 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func Backup(ctx context.Context, params BackupParams) error {
if err != nil {
return vterrors.Wrap(err, "StartBackup failed")
}
params.Logger.Infof("Starting backup %v", bh.Name())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We really want the backup process to print out the name of the backup. It's so useful. I'd also look into formalizing it into something that is machine readable rather than human readable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shlomi-noach Would this have potential to create (a lot) of logging noise?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dbussink it's a single additional line per backup. A full backup usually has hundreds of log lines. An incremental backup usually has a 5-6 log lines. One extra line is nothing.


// Scope stats to selected backup engine.
beParams := params.Copy()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
return false, vterrors.Wrapf(err, "reading timestamps from binlog files %v", binaryLogsToBackup)
}
if resp.FirstTimestampBinlog == "" || resp.LastTimestampBinlog == "" {
return false, vterrors.Wrapf(err, "empty binlog name in response. Request=%v, Response=%v", req, resp)
return false, vterrors.Errorf(vtrpc.Code_ABORTED, "empty binlog name in response. Request=%v, Response=%v", req, resp)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a logical error. The previous vterrors.Wrapf used to return nil, because err was nil. This change ensures to return a non-nil error.

}
incrDetails := &IncrementalBackupDetails{
FirstTimestamp: FormatRFC3339(logutil.ProtoToTime(resp.FirstTimestamp)),
Expand Down
Loading