Skip to content

Commit

Permalink
feat: async migration
Browse files Browse the repository at this point in the history
Signed-off-by: Jakob Möller <[email protected]>
  • Loading branch information
jakobmoellerdev committed Aug 27, 2024
1 parent 0729895 commit cbf1fac
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 27 deletions.
6 changes: 5 additions & 1 deletion cmd/vgmanager/vgmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/lsblk"
"github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/lvm"
"github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/lvmd"
"github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/pvworker"
"github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/wipefs"
icsi "github.com/openshift/lvm-operator/v4/internal/csi"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -192,6 +193,8 @@ func run(cmd *cobra.Command, _ []string, opts *Options) error {
}
}

hostLVM := lvm.NewDefaultHostLVM()

if err = (&vgmanager.Reconciler{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(vgmanager.ControllerName),
Expand All @@ -200,11 +203,12 @@ func run(cmd *cobra.Command, _ []string, opts *Options) error {
LSBLK: lsblk.NewDefaultHostLSBLK(),
Wipefs: wipefs.NewDefaultHostWipefs(),
Dmsetup: dmsetup.NewDefaultHostDmsetup(),
LVM: lvm.NewDefaultHostLVM(),
LVM: hostLVM,
NodeName: nodeName,
Namespace: operatorNamespace,
Filters: filter.DefaultFilters,
SymlinkResolveFn: filepath.EvalSymlinks,
ExtentMoveWorker: pvworker.NewAsyncExtentMover(hostLVM.MovePhysicalExtents),
}).SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to create controller VGManager: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion internal/controllers/lvmcluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ func (r *Reconciler) reconcile(ctx context.Context, instance *lvmv1alpha1.LVMClu
resourceSyncElapsedTime := time.Since(resourceSyncStart)
if len(errs) > 0 {
err := fmt.Errorf("LVMCluster's resources are not yet fully synchronized: %w", errors.Join(errs...))
r.WarningEvent(ctx, instance, EventReasonErrorResourceReconciliationIncomplete, err)
if logger.V(1).Enabled() {
r.WarningEvent(ctx, instance, EventReasonErrorResourceReconciliationIncomplete, err)
}
setResourcesAvailableConditionFalse(instance, err)
statusErr := r.updateLVMClusterStatus(ctx, instance)
if statusErr != nil {
Expand Down
66 changes: 47 additions & 19 deletions internal/controllers/vgmanager/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vgmanager

import (
"context"
"errors"
"fmt"
"slices"
"strconv"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/lsblk"
"github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/lvm"
"github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/lvmd"
"github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/pvworker"
"github.com/openshift/lvm-operator/v4/internal/controllers/vgmanager/wipefs"
"k8s.io/utils/ptr"

Expand Down Expand Up @@ -102,6 +104,7 @@ type Reconciler struct {
Namespace string
Filters filter.FilterSetup
SymlinkResolveFn symlinkResolver.ResolveFn
ExtentMoveWorker pvworker.AsyncExtentMover
}

func (r *Reconciler) getFinalizer() string {
Expand Down Expand Up @@ -821,32 +824,57 @@ func (r *Reconciler) ensurePVConsistency(
}

// first off move all extents from the reduced paths to the paths that are to be left in the VG
// TODO: This can take a (very) long time based on the data, change to make async
logger.Info(
"moving extents from reduction candidates to left pvs to ensure consistent data after path removal",
"toReduce",
toReduce,
"toLeaveInVG",
toLeaveInVG,
)
r.NormalEventWithMinimumVerbosity(ctx, vg, EventReasonExtentMigrationStarted,
fmt.Sprintf(
"moving extents from reduction candidates (%s) to left pvs (%s) "+
"to ensure consistent data after path removal",
strings.Join(toReduce, ", "),
strings.Join(toLeaveInVG, ", "),
), 0)
if err := r.LVM.MoveExtentsBetweenPVs(ctx, toReduce, toLeaveInVG); err != nil {
return fmt.Errorf("could not ensure that physical extents in vg %q were moved from reduction candidates (%s) to left pvs (%s)",
vg.Name, strings.Join(toReduce, ", "), strings.Join(toLeaveInVG, ", "))
}
r.NormalEventWithMinimumVerbosity(ctx, vg, EventReasonExtentMigrationFinished,
fmt.Sprintf(
"moved extents from reduction candidates (%s) to left pvs (%s) "+
"to ensure consistent data after path removal",
strings.Join(toReduce, ", "),
strings.Join(toLeaveInVG, ", "),
), 0)

status := r.ExtentMoveWorker.Status()

if status.Error() != nil {
if err := r.ExtentMoveWorker.Reset(); err != nil {
return fmt.Errorf("failed to reset pv move worker after error: %w", errors.Join(status.Error(), err))
}
return fmt.Errorf("pv move worker is in error state, worker was reset: %w", status.Error())
}

if status.State() == pvworker.SyncStateSyncing {
currentFrom, currentTo := status.Operation()
return fmt.Errorf("waiting for pv move operation to finish for %q (moving extents from %s to %s)", vg.Name,
strings.Join(currentFrom, ", "), strings.Join(currentTo, ", "))
}

if status.State() == pvworker.SyncStateIdle {
r.NormalEventWithMinimumVerbosity(ctx, vg, EventReasonExtentMigrationStarted,
fmt.Sprintf(
"moving extents from reduction candidates (%s) to left pvs (%s) "+
"to ensure consistent data after path removal",
strings.Join(toReduce, ", "),
strings.Join(toLeaveInVG, ", "),
), 0)
if err := r.ExtentMoveWorker.Sync(toReduce, toLeaveInVG); err != nil {
return fmt.Errorf("failed to start asynchronous pv move: %w", err)
}
return fmt.Errorf("pv move worker is now syncing, waiting for completion")
}

if status.State() == pvworker.SyncStateDone {
r.NormalEventWithMinimumVerbosity(ctx, vg, EventReasonExtentMigrationFinished,
fmt.Sprintf(
"moved extents from reduction candidates (%s) to left pvs (%s) "+
"to ensure consistent data after path removal within %s",
strings.Join(toReduce, ", "),
strings.Join(toLeaveInVG, ", "),
r.ExtentMoveWorker.Status().SyncDuration(),
), 0)
}

if err := r.ExtentMoveWorker.Reset(); err != nil {
return fmt.Errorf("failed to reset pv move worker after successful move: %w", err)
}

// now reduce the volume group
if err := r.LVM.ReduceVG(ctx, lvmVG.Name, toReduce); err != nil {
Expand Down
9 changes: 7 additions & 2 deletions internal/controllers/vgmanager/lvm/lvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type LVM interface {
ExtendLV(ctx context.Context, lvName, vgName string, sizePercent int) error
ActivateLV(ctx context.Context, lvName, vgName string) error
DeleteLV(ctx context.Context, lvName, vgName string) error
MoveExtentsBetweenPVs(ctx context.Context, from []string, to []string) error
MovePhysicalExtents(ctx context.Context, from []string, to []string) error
ReduceVG(ctx context.Context, name string, reduce []string) error
}

Expand Down Expand Up @@ -545,7 +545,7 @@ func (hlvm *HostLVM) ActivateLV(ctx context.Context, lvName, vgName string) erro
return nil
}

func (hlvm *HostLVM) MoveExtentsBetweenPVs(ctx context.Context, from []string, to []string) error {
func (hlvm *HostLVM) MovePhysicalExtents(ctx context.Context, from []string, to []string) error {
if len(from) == 0 {
return fmt.Errorf("failed to move extents between physical volumes: from list is empty")
}
Expand All @@ -566,6 +566,11 @@ func (hlvm *HostLVM) MoveExtentsBetweenPVs(ctx context.Context, from []string, t
for _, from := range from {
args := append([]string{from, "--atomic"}, to...)
if err := hlvm.RunCommandAsHost(ctx, pvMoveCmd, args...); err != nil {
// if we already have the extents where they need to be, lvm
// will report an error, we skip them because for us that's still a success.
if strings.Contains(err.Error(), "No data to move") {
continue
}
errs = errors.Join(errs, fmt.Errorf("failed to move extents between physical volumes: %w", err))
}
}
Expand Down
8 changes: 4 additions & 4 deletions internal/controllers/vgmanager/lvm/mocks/mock_lvm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

131 changes: 131 additions & 0 deletions internal/controllers/vgmanager/pvworker/move_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package pvworker

import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"
)

// ErrWorkerAlreadySyncingPVs signals that the worker is busy with a running
// move. Sync returns it directly and Reset returns it wrapped, so callers
// should match it with errors.Is.
var ErrWorkerAlreadySyncingPVs = errors.New("worker is already syncing PVs")

// MoveExtentFn is the function an AsyncExtentMover executes in the background.
// It receives the from and to lists that were handed to Sync and reports the
// outcome of the move through its error result.
type MoveExtentFn = func(ctx context.Context, from []string, to []string) error

// AsyncExtentMover runs a single extent move in the background and lets callers
// poll its progress instead of blocking until the move has finished.
type AsyncExtentMover interface {
// Sync starts a move from the given sources to the given targets and returns
// without waiting for it to complete. The implementation in this package
// refuses to start while a move is running (ErrWorkerAlreadySyncingPVs) or
// while a previous failure has not been cleared with Reset.
Sync(from, to []string) error
// IsSyncing reports whether a move is currently running.
IsSyncing() bool
// Status returns the current status of the worker.
Status() *SyncStatus
// Reset puts the worker back into the idle state, discarding the recorded
// result of the last move. The implementation in this package fails with a
// wrapped ErrWorkerAlreadySyncingPVs while a move is running.
Reset() error
}

// SyncStatus describes one extent move: which state it is in, the paths it
// operates on, when it started and ended, and the error it produced, if any.
type SyncStatus struct {
	state    SyncState
	err      error
	start    time.Time
	end      time.Time
	from, to []string
}

// State returns the lifecycle state recorded in this status.
func (s *SyncStatus) State() SyncState {
	return s.state
}

// Error returns the error recorded for the move, or nil if none was recorded.
// Note that it returns an error value rather than a string, so *SyncStatus
// does not implement the built-in error interface.
func (s *SyncStatus) Error() error {
	return s.err
}

// Operation returns the source and target lists of the move this status
// belongs to. Both are nil for a status that has no move attached.
func (s *SyncStatus) Operation() (from []string, to []string) {
	return s.from, s.to
}

// SyncDuration returns how long the move has been running.
//
//   - If the move never started (no start time recorded), it returns 0.
//   - If the move started but has not ended yet, it returns the time elapsed
//     since the start.
//   - Otherwise it returns the time between start and end.
func (s *SyncStatus) SyncDuration() time.Duration {
	// Without this guard an idle status would report the time elapsed since
	// the zero time.Time, which saturates to a meaningless maximum duration.
	if s.start.IsZero() {
		return 0
	}
	if s.end.IsZero() {
		return time.Since(s.start)
	}
	return s.end.Sub(s.start)
}

// SyncState names the lifecycle phase of an extent move.
type SyncState string

const (
	// SyncStateSyncing marks a move that is currently running.
	SyncStateSyncing SyncState = "syncing"
	// SyncStateError marks a move that finished with an error.
	SyncStateError SyncState = "error"
	// SyncStateIdle marks a worker that has no move attached.
	SyncStateIdle SyncState = "idle"
	// SyncStateDone marks a move that finished without an error.
	SyncStateDone SyncState = "done"
)

// NewAsyncExtentMover returns an AsyncExtentMover that executes move in the
// background whenever Sync is called. The returned worker starts out idle.
func NewAsyncExtentMover(move MoveExtentFn) AsyncExtentMover {
	mover := new(asyncExtentMover)
	mover.pvMove = move
	// Seed the status pointer so that Status never hands out nil.
	mover.syncStatus.Store(&SyncStatus{state: SyncStateIdle})
	return mover
}

// asyncExtentMover is the AsyncExtentMover implementation backed by a single
// background goroutine per move.
//
// Every *SyncStatus stored in syncStatus is treated as immutable once it has
// been published: state changes always store a new value instead of modifying
// the current one, so a snapshot obtained through Status can be read without
// synchronisation while the background goroutine is running.
type asyncExtentMover struct {
	syncStatus atomic.Pointer[SyncStatus]
	pvMove     MoveExtentFn
}

// Sync starts moving extents from the given sources to the given targets in a
// background goroutine and returns immediately.
//
// It returns ErrWorkerAlreadySyncingPVs if a move is already running, and an
// error wrapping the previous failure if the last move failed and Reset has
// not been called since. A nil return means the move was started; its outcome
// is observed through Status.
func (w *asyncExtentMover) Sync(from, to []string) error {
	if err := w.startSync(from, to); err != nil {
		return err
	}

	// The interface carries no context, so the move cannot be cancelled by
	// the caller and runs until pvMove returns.
	go func(ctx context.Context, from, to []string) {
		w.finishSync(w.pvMove(ctx, from, to))
	}(context.Background(), from, to)

	return nil
}

// Status returns the current status snapshot. The returned value is never
// modified after it has been handed out.
func (w *asyncExtentMover) Status() *SyncStatus {
	return w.syncStatus.Load()
}

// IsSyncing reports whether a move is currently running.
func (w *asyncExtentMover) IsSyncing() bool {
	return w.syncStatus.Load().state == SyncStateSyncing
}

// startSync atomically transitions the worker into the syncing state for the
// given operation. It fails without changing anything if the worker is
// already syncing or still holds an unacknowledged error.
func (w *asyncExtentMover) startSync(from, to []string) error {
	next := &SyncStatus{
		state: SyncStateSyncing,
		start: time.Now(),
		from:  from,
		to:    to,
	}
	for {
		current := w.syncStatus.Load()
		switch current.state {
		case SyncStateSyncing:
			return ErrWorkerAlreadySyncingPVs
		case SyncStateError:
			return fmt.Errorf("cannot start new sync operation when previous one failed, "+
				"explicit reset is necessary: %w", current.err)
		}
		// Compare-and-swap guarantees that of several concurrent callers only
		// one wins the transition; the losers re-evaluate the new state.
		if w.syncStatus.CompareAndSwap(current, next) {
			return nil
		}
	}
}

// finishSync records the outcome of the running move: SyncStateError together
// with err if err is non-nil, SyncStateDone otherwise.
func (w *asyncExtentMover) finishSync(err error) {
	state := SyncStateDone
	if err != nil {
		state = SyncStateError
	}

	// Work on a copy: the current value may be in the hands of Status callers
	// and must not be written to. While the worker is syncing neither Sync
	// nor Reset replaces the status, so a plain Store cannot lose an update.
	finished := *w.syncStatus.Load()
	finished.err = err
	finished.state = state
	finished.end = time.Now()
	w.syncStatus.Store(&finished)
}

// Reset discards the result of the last move and puts the worker back into
// the idle state. It returns an error wrapping ErrWorkerAlreadySyncingPVs if a
// move is still running.
func (w *asyncExtentMover) Reset() error {
	idle := &SyncStatus{state: SyncStateIdle}
	for {
		current := w.syncStatus.Load()
		if current.state == SyncStateSyncing {
			return fmt.Errorf("cannot reset running worker: %w", ErrWorkerAlreadySyncingPVs)
		}
		// Only replace the exact status that was inspected, so a move started
		// concurrently is never wiped out.
		if w.syncStatus.CompareAndSwap(current, idle) {
			return nil
		}
	}
}

0 comments on commit cbf1fac

Please sign in to comment.