Skip to content

Commit

Permalink
Better fix
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <[email protected]>
  • Loading branch information
frouioui committed Sep 26, 2024
1 parent 4925139 commit 2df7055
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ func (be *BuiltinBackupEngine) backupFiles(
}
defer sema.Release(1)

// First check if we have any error, if we have, there is no point trying to restore this file.
// First check if we have any error, if we have, there is no point trying backing up this file.
// We check for errors before checking if the context is canceled on purpose, if there was an
// error, the context would have been canceled already.
if bh.HasErrors() {
Expand Down Expand Up @@ -779,7 +779,7 @@ func (bp *backupPipe) HashString() string {
return hex.EncodeToString(bp.crc32.Sum(nil))
}

func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger, restore bool) {
func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger, restore bool) {
messageStr := "restoring "
if !restore {
messageStr = "backing up "
Expand All @@ -788,6 +788,9 @@ func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger
defer tick.Stop()
for {
select {
case <-ctx.Done():
logger.Infof("Canceled %q", bp.filename)
return
case <-bp.done:
logger.Infof("Completed %s %q", messageStr, bp.filename)
return
Expand Down Expand Up @@ -830,7 +833,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
}

br := newBackupReader(fe.Name, fi.Size(), timedSource)
go br.ReportProgress(builtinBackupProgress, params.Logger, false /*restore*/)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, false /*restore*/)

// Open the destination file for writing, and a buffer.
params.Logger.Infof("Backing up file: %v", fe.Name)
Expand Down Expand Up @@ -1029,43 +1032,53 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
sema := semaphore.NewWeighted(int64(params.Concurrency))
rec := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}

ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()

for i := range fes {
wg.Add(1)
go func(i int) {
defer wg.Done()
fe := &fes[i]
// Wait until we are ready to go, return if we encounter an error
acqErr := sema.Acquire(ctx, 1)
acqErr := sema.Acquire(ctxCancel, 1)
if acqErr != nil {
log.Errorf("Unable to acquire semaphore needed to restore file: %s, err: %s", fe.Name, acqErr.Error())
rec.RecordError(acqErr)
cancel()
return
}
defer sema.Release(1)

// First check if we have any error, if we have, there is no point trying to restore this file.
// We check for errors before checking if the context is canceled on purpose, if there was an
// error, the context would have been canceled already.
if rec.HasErrors() {
params.Logger.Infof("Failed to restore files due to error.")
return
}

// Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might
// end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66,
// which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces
// unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check.
select {
case <-ctx.Done():
case <-ctxCancel.Done():
log.Errorf("Context canceled or timed out during %q restore", fe.Name)
rec.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled"))
return
default:
}

if rec.HasErrors() {
params.Logger.Infof("Failed to restore files due to error.")
return
}

fe.ParentPath = createdDir
// And restore the file.
name := fmt.Sprintf("%v", i)
params.Logger.Infof("Copying file %v: %v", name, fe.Name)
err := be.restoreFile(ctx, params, bh, fe, bm, name)
err := be.restoreFile(ctxCancel, params, bh, fe, bm, name)
if err != nil {
rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fe.Name))
cancel()
}
}(i)
}
Expand Down Expand Up @@ -1095,7 +1108,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
}()

br := newBackupReader(name, 0, timedSource)
go br.ReportProgress(builtinBackupProgress, params.Logger, true /*restore*/)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, true)
var reader io.Reader = br

// Open the destination file for writing.
Expand Down

0 comments on commit 2df7055

Please sign in to comment.