diff --git a/go/ioutil/writer.go b/go/ioutil/writer.go index 4aac07ba501..759ae1b7ab6 100644 --- a/go/ioutil/writer.go +++ b/go/ioutil/writer.go @@ -22,6 +22,7 @@ wrappers around WriteCloser and Writer. package ioutil import ( + "bytes" "io" "time" ) @@ -87,3 +88,16 @@ func NewMeteredWriter(tw io.Writer, fns ...func(int, time.Duration)) MeteredWrit func (tw *meteredWriter) Write(p []byte) (int, error) { return tw.meter.measure(tw.Writer.Write, p) } + +// BytesBufferWriter implements io.WriteCloser using an in-memory buffer. +type BytesBufferWriter struct { + *bytes.Buffer +} + +func (m BytesBufferWriter) Close() error { + return nil +} + +func NewBytesBufferWriter() BytesBufferWriter { + return BytesBufferWriter{bytes.NewBuffer(nil)} +} diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index bc14823ddcd..ef62f0ccb85 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -185,6 +185,9 @@ type FakeMysqlDaemon struct { // SemiSyncReplicaEnabled represents the state of rpl_semi_sync_replica_enabled. SemiSyncReplicaEnabled bool + // GlobalReadLock is used to test if a lock has been acquired already or not + GlobalReadLock bool + // TimeoutHook is a func that can be called at the beginning of // any method to fake a timeout. // All a test needs to do is make it { return context.DeadlineExceeded }. @@ -772,10 +775,20 @@ func (fmd *FakeMysqlDaemon) HostMetrics(ctx context.Context, cnf *Mycnf) (*mysql // AcquireGlobalReadLock is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) AcquireGlobalReadLock(ctx context.Context) error { - return errors.New("not implemented") + if fmd.GlobalReadLock { + return errors.New("lock already acquired") + } + + fmd.GlobalReadLock = true + return nil } // ReleaseGlobalReadLock is part of the MysqlDaemon interface. func (fmd *FakeMysqlDaemon) ReleaseGlobalReadLock(ctx context.Context) error { - return errors.New("not implemented") + if fmd.GlobalReadLock { + fmd.GlobalReadLock = false + return nil + } + + return errors.New("no read locks acquired yet") } diff --git a/go/vt/mysqlctl/mysqlshellbackupengine.go b/go/vt/mysqlctl/mysqlshellbackupengine.go index 0c41056b621..b7405ce7eaa 100644 --- a/go/vt/mysqlctl/mysqlshellbackupengine.go +++ b/go/vt/mysqlctl/mysqlshellbackupengine.go @@ -56,6 +56,8 @@ var ( // disable redo logging and double write buffer mysqlShellSpeedUpRestore = false + mysqlShellBackupBinaryName = "mysqlsh" + // use when checking if we need to create the directory on the local filesystem or not. knownObjectStoreParams = []string{"s3BucketName", "osBucketName", "azureContainerName"} @@ -107,8 +109,8 @@ type MySQLShellBackupEngine struct { } const ( - mysqlShellBackupBinaryName = "mysqlsh" mysqlShellBackupEngineName = "mysqlshell" + mysqlShellLockMessage = "Global read lock has been released" ) func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (result BackupResult, finalErr error) { @@ -152,6 +154,11 @@ func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params Back } lockAcquired := time.Now() // we will report how long we hold the lock for + // we need to release the global read lock in case the backup fails to start and + // the lock wasn't released by releaseReadLock() yet. context might be expired, + // so we pass a new one. + defer func() { _ = params.Mysqld.ReleaseGlobalReadLock(context.Background()) }() + posBeforeBackup, err := params.Mysqld.PrimaryPosition(ctx) if err != nil { return BackupUnusable, vterrors.Wrap(err, "failed to fetch position") @@ -184,6 +191,7 @@ func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params Back // Get exit status. if err := cmd.Wait(); err != nil { + pipeWriter.Close() // make sure we close the writer so the goroutines above will complete. return BackupUnusable, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed") } @@ -503,7 +511,7 @@ func releaseReadLock(ctx context.Context, reader io.Reader, params BackupParams, if !released { - if !strings.Contains(line, "Global read lock has been released") { + if !strings.Contains(line, mysqlShellLockMessage) { continue } released = true @@ -521,6 +529,10 @@ func releaseReadLock(ctx context.Context, reader io.Reader, params BackupParams, if err := scanner.Err(); err != nil { params.Logger.Errorf("error reading from reader: %v", err) } + + if !released { + params.Logger.Errorf("could not release global lock earlier") + } } func cleanupMySQL(ctx context.Context, params RestoreParams, shouldDeleteUsers bool) error { diff --git a/go/vt/mysqlctl/mysqlshellbackupengine_test.go b/go/vt/mysqlctl/mysqlshellbackupengine_test.go index a0d0c8d7a1d..67f27b5382e 100644 --- a/go/vt/mysqlctl/mysqlshellbackupengine_test.go +++ b/go/vt/mysqlctl/mysqlshellbackupengine_test.go @@ -18,13 +18,16 @@ package mysqlctl import ( "context" + "encoding/json" "fmt" + "os" "path" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/ioutil" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/logutil" @@ -307,3 +310,120 @@ func TestCleanupMySQL(t *testing.T) { } } + +// this is a helper to write files in a temporary directory +func generateTestFile(t *testing.T, name, contents string) { + f, err := os.OpenFile(name, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0700) + require.NoError(t, err) + defer f.Close() + _, err = f.WriteString(contents) + require.NoError(t, err) + require.NoError(t, f.Close()) +} + +// This tests if we are properly releasing the global read lock we acquire +// during ExecuteBackup(), even if the backup didn't succeed. +func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { + originalLocation := mysqlShellBackupLocation + originalBinary := mysqlShellBackupBinaryName + mysqlShellBackupLocation = "logical" + mysqlShellBackupBinaryName = path.Join(t.TempDir(), "test.sh") + + defer func() { // restore the original values. + mysqlShellBackupLocation = originalLocation + mysqlShellBackupBinaryName = originalBinary + }() + + logger := logutil.NewMemoryLogger() + fakedb := fakesqldb.New(t) + defer fakedb.Close() + mysql := NewFakeMysqlDaemon(fakedb) + defer mysql.Close() + + be := &MySQLShellBackupEngine{} + params := BackupParams{ + TabletAlias: "test", + Logger: logger, + Mysqld: mysql, + } + bs := FakeBackupStorage{ + StartBackupReturn: FakeBackupStorageStartBackupReturn{}, + } + + t.Run("lock released if we see the mysqlsh lock being acquired", func(t *testing.T) { + logger.Clear() + manifestBuffer := ioutil.NewBytesBufferWriter() + bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{ + Dir: t.TempDir(), + AddFileReturn: FakeBackupHandleAddFileReturn{WriteCloser: manifestBuffer}, + } + + // this simulates mysql shell completing without any issues. + generateTestFile(t, mysqlShellBackupBinaryName, fmt.Sprintf("#!/bin/bash\n>&2 echo %s", mysqlShellLockMessage)) + + bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name()) + require.NoError(t, err) + + _, err = be.ExecuteBackup(context.Background(), params, bh) + require.NoError(t, err) + require.False(t, mysql.GlobalReadLock) // lock must be released. + + // check the manifest is valid. + var manifest MySQLShellBackupManifest + err = json.Unmarshal(manifestBuffer.Bytes(), &manifest) + require.NoError(t, err) + + require.Equal(t, mysqlShellBackupEngineName, manifest.BackupMethod) + + // did we notice the lock was release and did we release it ours as well? + require.Contains(t, logger.String(), "global read lock released after", + "failed to release the global lock after mysqlsh") + }) + + t.Run("lock released if when we don't see mysqlsh released it", func(t *testing.T) { + mysql.GlobalReadLock = false // clear lock status. + logger.Clear() + manifestBuffer := ioutil.NewBytesBufferWriter() + bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{ + Dir: t.TempDir(), + AddFileReturn: FakeBackupHandleAddFileReturn{WriteCloser: manifestBuffer}, + } + + // this simulates mysqlshell completing, but we don't see the message that is released its lock. + generateTestFile(t, mysqlShellBackupBinaryName, "#!/bin/bash\nexit 0") + + bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name()) + require.NoError(t, err) + + // in this case the backup was successful, but even if we didn't see mysqlsh release its lock + // we make sure it is released at the end. + _, err = be.ExecuteBackup(context.Background(), params, bh) + require.NoError(t, err) + require.False(t, mysql.GlobalReadLock) // lock must be released. + + // make sure we are at least logging the lock wasn't able to be released earlier. + require.Contains(t, logger.String(), "could not release global lock earlier", + "failed to log error message when unable to release lock during backup") + }) + + t.Run("lock released when backup fails", func(t *testing.T) { + mysql.GlobalReadLock = false // clear lock status. + logger.Clear() + manifestBuffer := ioutil.NewBytesBufferWriter() + bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{ + Dir: t.TempDir(), + AddFileReturn: FakeBackupHandleAddFileReturn{WriteCloser: manifestBuffer}, + } + + // this simulates the backup process failing. + generateTestFile(t, mysqlShellBackupBinaryName, "#!/bin/bash\nexit 1") + + bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name()) + require.NoError(t, err) + + _, err = be.ExecuteBackup(context.Background(), params, bh) + require.ErrorContains(t, err, "mysqlshell failed") + require.False(t, mysql.GlobalReadLock) // lock must be released. + }) + +}