diff --git a/cmd/vgmanager/vgmanager.go b/cmd/vgmanager/vgmanager.go index 2c287e526..97fb10894 100644 --- a/cmd/vgmanager/vgmanager.go +++ b/cmd/vgmanager/vgmanager.go @@ -39,6 +39,7 @@ import ( "github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/lsblk" "github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/lvm" "github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/lvmd" + "github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/pvworker" "github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/wipefs" icsi "github.com/openshift/lvm-operator/v4/internal/csi" "github.com/spf13/cobra" @@ -192,6 +193,8 @@ func run(cmd *cobra.Command, _ []string, opts *Options) error { } } + hostLVM := lvm.NewDefaultHostLVM() + if err = (&vgmanager.Reconciler{ Client: mgr.GetClient(), EventRecorder: mgr.GetEventRecorderFor(vgmanager.ControllerName), @@ -200,11 +203,12 @@ func run(cmd *cobra.Command, _ []string, opts *Options) error { LSBLK: lsblk.NewDefaultHostLSBLK(), Wipefs: wipefs.NewDefaultHostWipefs(), Dmsetup: dmsetup.NewDefaultHostDmsetup(), - LVM: lvm.NewDefaultHostLVM(), + LVM: hostLVM, NodeName: nodeName, Namespace: operatorNamespace, Filters: filter.DefaultFilters, SymlinkResolveFn: filepath.EvalSymlinks, + ExtentMoveWorker: pvworker.NewAsyncExtentMover(hostLVM.MovePhysicalExtents), }).SetupWithManager(mgr); err != nil { return fmt.Errorf("unable to create controller VGManager: %w", err) } diff --git a/internal/controllers/lvmcluster/controller.go b/internal/controllers/lvmcluster/controller.go index dbf7cda8a..28a17fda8 100644 --- a/internal/controllers/lvmcluster/controller.go +++ b/internal/controllers/lvmcluster/controller.go @@ -241,7 +241,9 @@ func (r *Reconciler) reconcile(ctx context.Context, instance *lvmv1alpha1.LVMClu resourceSyncElapsedTime := time.Since(resourceSyncStart) if len(errs) > 0 { err := fmt.Errorf("LVMCluster's resources are not yet fully synchronized: %w", errors.Join(errs...)) - r.WarningEvent(ctx, instance, EventReasonErrorResourceReconciliationIncomplete, err) + if logger.V(1).Enabled() { + r.WarningEvent(ctx, instance, EventReasonErrorResourceReconciliationIncomplete, err) + } setResourcesAvailableConditionFalse(instance, err) statusErr := r.updateLVMClusterStatus(ctx, instance) if statusErr != nil { diff --git a/internal/controllers/vgmanager/controller.go b/internal/controllers/vgmanager/controller.go index 424fc5f69..ef966204f 100644 --- a/internal/controllers/vgmanager/controller.go +++ b/internal/controllers/vgmanager/controller.go @@ -18,6 +18,7 @@ package vgmanager import ( "context" + "errors" "fmt" "slices" "strconv" @@ -33,6 +34,7 @@ import ( "github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/lsblk" "github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/lvm" "github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/lvmd" + "github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/pvworker" "github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/wipefs" "k8s.io/utils/ptr" @@ -102,6 +104,7 @@ type Reconciler struct { Namespace string Filters filter.FilterSetup SymlinkResolveFn symlinkResolver.ResolveFn + ExtentMoveWorker pvworker.AsyncExtentMover } func (r *Reconciler) getFinalizer() string { @@ -821,7 +824,6 @@ func (r *Reconciler) ensurePVConsistency( } // first off move all extents from the reduced paths to the paths that are to be left in the VG - // TODO: This can take a (very) long time based on the data, change to make async logger.Info( "moving extents from reduction candidates to left pvs to ensure consistent data after path removal", "toReduce", @@ -829,24 +831,50 @@ func (r *Reconciler) ensurePVConsistency( "toLeaveInVG", toLeaveInVG, ) - r.NormalEventWithMinimumVerbosity(ctx, vg, EventReasonExtentMigrationStarted, - fmt.Sprintf( - "moving extents from reduction candidates (%s) to left pvs (%s) "+ - "to ensure consistent data after path removal", - strings.Join(toReduce, ", "), - strings.Join(toLeaveInVG, ", "), - ), 0) - if err := r.LVM.MoveExtentsBetweenPVs(ctx, toReduce, toLeaveInVG); err != nil { - return fmt.Errorf("could not ensure that physical extents in vg %q were moved from reduction candidates (%s) to left pvs (%s)", - vg.Name, strings.Join(toReduce, ", "), strings.Join(toLeaveInVG, ", ")) - } - r.NormalEventWithMinimumVerbosity(ctx, vg, EventReasonExtentMigrationFinished, - fmt.Sprintf( - "moved extents from reduction candidates (%s) to left pvs (%s) "+ - "to ensure consistent data after path removal", - strings.Join(toReduce, ", "), - strings.Join(toLeaveInVG, ", "), - ), 0) + + status := r.ExtentMoveWorker.Status() + + if status.Error() != nil { + if err := r.ExtentMoveWorker.Reset(); err != nil { + return fmt.Errorf("failed to reset pv move worker after error: %w", errors.Join(status.Error(), err)) + } + return fmt.Errorf("pv move worker is in error state, worker was reset: %w", status.Error()) + } + + if status.State() == pvworker.SyncStateSyncing { + currentFrom, currentTo := status.Operation() + return fmt.Errorf("waiting for pv move operation to finish for %q (moving extents from %s to %s)", vg.Name, + strings.Join(currentFrom, ", "), strings.Join(currentTo, ", ")) + } + + if status.State() == pvworker.SyncStateIdle { + r.NormalEventWithMinimumVerbosity(ctx, vg, EventReasonExtentMigrationStarted, + fmt.Sprintf( + "moving extents from reduction candidates (%s) to left pvs (%s) "+ + "to ensure consistent data after path removal", + strings.Join(toReduce, ", "), + strings.Join(toLeaveInVG, ", "), + ), 0) + if err := r.ExtentMoveWorker.Sync(toReduce, toLeaveInVG); err != nil { + return fmt.Errorf("failed to start asynchronous pv move: %w", err) + } + return fmt.Errorf("pv move worker is now syncing, waiting for completion") + } + + if status.State() == pvworker.SyncStateDone { + r.NormalEventWithMinimumVerbosity(ctx, vg, EventReasonExtentMigrationFinished, + fmt.Sprintf( + "moved extents from reduction candidates (%s) to left pvs (%s) "+ + "to ensure consistent data after path removal within %s", + strings.Join(toReduce, ", "), + strings.Join(toLeaveInVG, ", "), + r.ExtentMoveWorker.Status().SyncDuration(), + ), 0) + } + + if err := r.ExtentMoveWorker.Reset(); err != nil { + return fmt.Errorf("failed to reset pv move worker after successful move: %w", err) + } // now reduce the volume group if err := r.LVM.ReduceVG(ctx, lvmVG.Name, toReduce); err != nil { diff --git a/internal/controllers/vgmanager/lvm/lvm.go b/internal/controllers/vgmanager/lvm/lvm.go index e7a3e39c8..1ec760b95 100644 --- a/internal/controllers/vgmanager/lvm/lvm.go +++ b/internal/controllers/vgmanager/lvm/lvm.go @@ -120,7 +120,7 @@ type LVM interface { ExtendLV(ctx context.Context, lvName, vgName string, sizePercent int) error ActivateLV(ctx context.Context, lvName, vgName string) error DeleteLV(ctx context.Context, lvName, vgName string) error - MoveExtentsBetweenPVs(ctx context.Context, from []string, to []string) error + MovePhysicalExtents(ctx context.Context, from []string, to []string) error ReduceVG(ctx context.Context, name string, reduce []string) error } @@ -545,7 +545,7 @@ func (hlvm *HostLVM) ActivateLV(ctx context.Context, lvName, vgName string) erro return nil } -func (hlvm *HostLVM) MoveExtentsBetweenPVs(ctx context.Context, from []string, to []string) error { +func (hlvm *HostLVM) MovePhysicalExtents(ctx context.Context, from []string, to []string) error { if len(from) == 0 { return fmt.Errorf("failed to move extents between physical volumes: from list is empty") } @@ -566,6 +566,11 @@ func (hlvm *HostLVM) MoveExtentsBetweenPVs(ctx context.Context, from []string, t for _, from := range from { args := append([]string{from, "--atomic"}, to...) if err := hlvm.RunCommandAsHost(ctx, pvMoveCmd, args...); err != nil { + // if we already have the extents where they need to be, lvm + // will report an error, we skip them because for us that's still a success. + if strings.Contains(err.Error(), "No data to move") { + continue + } errs = errors.Join(errs, fmt.Errorf("failed to move extents between physical volumes: %w", err)) } } diff --git a/internal/controllers/vgmanager/lvm/mocks/mock_lvm.go b/internal/controllers/vgmanager/lvm/mocks/mock_lvm.go index 3c1fdbc71..611860bf3 100644 --- a/internal/controllers/vgmanager/lvm/mocks/mock_lvm.go +++ b/internal/controllers/vgmanager/lvm/mocks/mock_lvm.go @@ -768,11 +768,11 @@ func (_c *MockLVM_ListVGs_Call) RunAndReturn(run func(context.Context, bool) ([] } // MoveExtentsBetweenPVs provides a mock function with given fields: ctx, from, to -func (_m *MockLVM) MoveExtentsBetweenPVs(ctx context.Context, from []string, to []string) error { +func (_m *MockLVM) MovePhysicalExtents(ctx context.Context, from []string, to []string) error { ret := _m.Called(ctx, from, to) if len(ret) == 0 { - panic("no return value specified for MoveExtentsBetweenPVs") + panic("no return value specified for MovePhysicalExtents") } var r0 error @@ -785,7 +785,7 @@ func (_m *MockLVM) MoveExtentsBetweenPVs(ctx context.Context, from []string, to return r0 } -// MockLVM_MoveExtentsBetweenPVs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MoveExtentsBetweenPVs' +// MockLVM_MoveExtentsBetweenPVs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MovePhysicalExtents' type MockLVM_MoveExtentsBetweenPVs_Call struct { *mock.Call } @@ -795,7 +795,7 @@ type MockLVM_MoveExtentsBetweenPVs_Call struct { // - from []string // - to []string func (_e *MockLVM_Expecter) MoveExtentsBetweenPVs(ctx interface{}, from interface{}, to interface{}) *MockLVM_MoveExtentsBetweenPVs_Call { - return &MockLVM_MoveExtentsBetweenPVs_Call{Call: _e.mock.On("MoveExtentsBetweenPVs", ctx, from, to)} + return &MockLVM_MoveExtentsBetweenPVs_Call{Call: _e.mock.On("MovePhysicalExtents", ctx, from, to)} } func (_c *MockLVM_MoveExtentsBetweenPVs_Call) Run(run func(ctx context.Context, from []string, to []string)) *MockLVM_MoveExtentsBetweenPVs_Call { diff --git a/internal/controllers/vgmanager/pvworker/move_worker.go b/internal/controllers/vgmanager/pvworker/move_worker.go new file mode 100644 index 000000000..976b517b9 --- /dev/null +++ b/internal/controllers/vgmanager/pvworker/move_worker.go @@ -0,0 +1,131 @@ +package pvworker + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "time" +) + +var ErrWorkerAlreadySyncingPVs = errors.New("worker is already syncing PVs") + +type MoveExtentFn = func(ctx context.Context, from []string, to []string) error + +type AsyncExtentMover interface { + Sync(from, to []string) error + IsSyncing() bool + Status() *SyncStatus + Reset() error +} + +type SyncStatus struct { + state SyncState + err error + start time.Time + end time.Time + from, to []string +} + +func (s *SyncStatus) State() SyncState { + return s.state +} + +func (s *SyncStatus) Error() error { + return s.err +} + +func (s *SyncStatus) Operation() (from []string, to []string) { + return s.from, s.to +} + +// SyncDuration returns the duration of the sync operation since it started and until it finished. +func (s *SyncStatus) SyncDuration() time.Duration { + if s.end.IsZero() { + return time.Since(s.start) + } + return s.end.Sub(s.start) +} + +type SyncState string + +const ( + SyncStateSyncing SyncState = "syncing" + SyncStateError SyncState = "error" + SyncStateIdle SyncState = "idle" + SyncStateDone SyncState = "done" +) + +func NewAsyncExtentMover(move MoveExtentFn) AsyncExtentMover { + worker := &asyncExtentMover{ + syncStatus: atomic.Pointer[SyncStatus]{}, + pvMove: move, + } + worker.syncStatus.Store(&SyncStatus{state: SyncStateIdle}) + return worker +} + +type asyncExtentMover struct { + syncStatus atomic.Pointer[SyncStatus] + pvMove MoveExtentFn +} + +func (w *asyncExtentMover) Sync(from, to []string) error { + if w.IsSyncing() { + return ErrWorkerAlreadySyncingPVs + } + + if w.syncStatus.Load().state == SyncStateError { + return fmt.Errorf("cannot start new sync operation when previous one failed, "+ + "explicit reset is necessary: %w", w.syncStatus.Load().err) + } + + w.startSync(from, to) + go func(ctx context.Context, from, to []string) { + err := w.pvMove(ctx, from, to) + w.finishSync(err) + }(context.Background(), from, to) + + return nil +} + +func (w *asyncExtentMover) Status() *SyncStatus { + return w.syncStatus.Load() +} + +func (w *asyncExtentMover) IsSyncing() bool { + return w.syncStatus.Load().state == SyncStateSyncing +} + +func (w *asyncExtentMover) startSync(from, to []string) { + w.syncStatus.Store(&SyncStatus{ + state: SyncStateSyncing, + start: time.Now(), + from: from, + to: to, + }) +} + +func (w *asyncExtentMover) finishSync(err error) { + var state SyncState + if err != nil { + state = SyncStateError + } else { + state = SyncStateDone + } + status := w.syncStatus.Load() + status.err = err + status.state = state + status.end = time.Now() + w.syncStatus.Store(status) +} + +func (w *asyncExtentMover) Reset() error { + if w.IsSyncing() { + return fmt.Errorf("cannot reset running worker: %w", ErrWorkerAlreadySyncingPVs) + } + + w.syncStatus.Store(&SyncStatus{state: SyncStateIdle}) + + return nil +}