Merge pull request #1073 from stgraber/main
Improve lifecycle events on evacuation
hallyn committed Aug 5, 2024
2 parents 6b7104e + 58a0f1a commit 51413bd
Showing 9 changed files with 131 additions and 46 deletions.
20 changes: 15 additions & 5 deletions cmd/incusd/api_cluster_evacuation.go
@@ -21,6 +21,7 @@ import (
dbCluster "github.com/lxc/incus/v6/internal/server/db/cluster"
"github.com/lxc/incus/v6/internal/server/db/operationtype"
"github.com/lxc/incus/v6/internal/server/instance"
"github.com/lxc/incus/v6/internal/server/lifecycle"
"github.com/lxc/incus/v6/internal/server/operations"
"github.com/lxc/incus/v6/internal/server/response"
"github.com/lxc/incus/v6/internal/server/scriptlet"
@@ -147,6 +148,11 @@ func evacuateClusterMember(ctx context.Context, s *state.State, gateway *cluster
networkShutdown(s)

reverter.Success()

if mode != "heal" {
s.Events.SendLifecycle(api.ProjectDefaultName, lifecycle.ClusterMemberEvacuated.Event(name, op.Requestor(), nil))
}

return nil
}

@@ -458,6 +464,9 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
}

reverter.Success()

s.Events.SendLifecycle(api.ProjectDefaultName, lifecycle.ClusterMemberRestored.Event(originName, op.Requestor(), nil))

return nil
}

@@ -675,6 +684,8 @@ func autoHealClusterTask(d *Daemon) (task.Func, task.Schedule) {
}

func healClusterMember(d *Daemon, op *operations.Operation, name string) error {
s := d.State()

logger.Info("Starting cluster healing", logger.Ctx{"server": name})
defer logger.Info("Completed cluster healing", logger.Ctx{"server": name})

@@ -744,15 +755,14 @@ func healClusterMember(d *Daemon, op *operations.Operation, name string) error {
// Attempt up to 5 evacuations.
var err error
for i := 0; i < 5; i++ {
err = evacuateClusterMember(context.Background(), d.State(), d.gateway, op, name, "heal", nil, migrateFunc)
err = evacuateClusterMember(context.Background(), s, d.gateway, op, name, "heal", nil, migrateFunc)
if err == nil {
s.Events.SendLifecycle(api.ProjectDefaultName, lifecycle.ClusterMemberHealed.Event(name, op.Requestor(), nil))

return nil
}
}

if err != nil {
logger.Error("Failed to heal cluster member", logger.Ctx{"server": name, "err": err})
}

logger.Error("Failed to heal cluster member", logger.Ctx{"server": name, "err": err})
return err
}
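
Taken together, the changes in this file mean that a direct evacuation emits a ClusterMemberEvacuated event, a restore emits ClusterMemberRestored, and the auto-heal path suppresses the per-evacuation event and emits a single ClusterMemberHealed once one of its retries succeeds. Below is a minimal, self-contained Go sketch of that control flow; the names memberEvent, sendLifecycle, evacuate and heal are illustrative stand-ins, not Incus internals.

// Sketch of the evacuation/heal lifecycle pattern from api_cluster_evacuation.go.
// All names here are illustrative; only the control flow mirrors the diff above.
package main

import "fmt"

type memberEvent struct {
	action string
	member string
}

// sendLifecycle stands in for s.Events.SendLifecycle(...).
func sendLifecycle(ev memberEvent) {
	fmt.Printf("lifecycle: %s (%s)\n", ev.action, ev.member)
}

// evacuate stands in for evacuateClusterMember: it only announces the
// evacuation when it was requested directly, because the healing path
// reports its own event after a successful retry.
func evacuate(member, mode string) error {
	// ... migrate or stop instances, shut down networking ...

	if mode != "heal" {
		sendLifecycle(memberEvent{action: "evacuated", member: member})
	}

	return nil
}

// heal stands in for healClusterMember: up to five evacuation attempts and a
// single "healed" event on the first success.
func heal(member string) error {
	var err error
	for i := 0; i < 5; i++ {
		err = evacuate(member, "heal")
		if err == nil {
			sendLifecycle(memberEvent{action: "healed", member: member})
			return nil
		}
	}

	// Mirrors the diff: log the failure once, unconditionally, and return it.
	fmt.Printf("failed to heal cluster member %s: %v\n", member, err)
	return err
}

func main() {
	_ = evacuate("server01", "")
	_ = heal("server02")
}
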
2 changes: 1 addition & 1 deletion internal/server/instance/drivers/driver_common.go
@@ -888,7 +888,7 @@ func (d *common) onStopOperationSetup(target string) (*operationlock.InstanceOpe
// If there is another ongoing operation that isn't in our inheritable list, wait until that has finished
// before proceeding to run the hook.
op := operationlock.Get(d.Project().Name, d.Name())
if op != nil && !op.ActionMatch(operationlock.ActionStart, operationlock.ActionRestart, operationlock.ActionStop, operationlock.ActionRestore) {
if op != nil && !op.ActionMatch(operationlock.ActionStart, operationlock.ActionRestart, operationlock.ActionStop, operationlock.ActionRestore, operationlock.ActionMigrate) {
d.logger.Debug("Waiting for existing operation lock to finish before running hook", logger.Ctx{"action": op.Action()})
_ = op.Wait(context.Background())
op = nil
49 changes: 40 additions & 9 deletions internal/server/instance/drivers/driver_lxc.go
@@ -2881,7 +2881,7 @@ func (d *lxc) Stop(stateful bool) error {
}

// Setup a new operation
op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionStop, []operationlock.Action{operationlock.ActionRestart, operationlock.ActionRestore}, false, true)
op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionStop, []operationlock.Action{operationlock.ActionRestart, operationlock.ActionRestore, operationlock.ActionMigrate}, false, true)
if err != nil {
if errors.Is(err, operationlock.ErrNonReusuableSucceeded) {
// An existing matching operation has now succeeded, return.
@@ -5503,34 +5503,46 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {
d.logger.Debug("Migration send starting")
defer d.logger.Debug("Migration send stopped")

// Setup a new operation.
op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionMigrate, nil, false, true)
if err != nil {
return err
}

// Wait for essential migration connections before negotiation.
connectionsCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

filesystemConn, err := args.FilesystemConn(connectionsCtx)
if err != nil {
op.Done(err)
return err
}

var stateConn io.ReadWriteCloser
if args.Live {
stateConn, err = args.StateConn(connectionsCtx)
if err != nil {
op.Done(err)
return err
}
}

pool, err := storagePools.LoadByInstance(d.state, d)
if err != nil {
return fmt.Errorf("Failed loading instance: %w", err)
err := fmt.Errorf("Failed loading instance: %w", err)
op.Done(err)
return err
}

// The refresh argument passed to MigrationTypes() is always set to false here.
// The migration source/sender doesn't need to care whether or not it's doing a refresh as the migration
// sink/receiver will know this, and adjust the migration types accordingly.
poolMigrationTypes := pool.MigrationTypes(storagePools.InstanceContentType(d), false, args.Snapshots)
if len(poolMigrationTypes) == 0 {
return fmt.Errorf("No source migration types available")
err := fmt.Errorf("No source migration types available")
op.Done(err)
return err
}

// Convert the pool's migration type options to an offer header to target.
@@ -5560,7 +5572,9 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {
// Add idmap info to source header for containers.
idmapset, err := d.DiskIdmap()
if err != nil {
return fmt.Errorf("Failed getting container disk idmap: %w", err)
err := fmt.Errorf("Failed getting container disk idmap: %w", err)
op.Done(err)
return err
} else if idmapset != nil {
offerHeader.Idmap = make([]*migration.IDMapType, 0, len(idmapset.Entries))
for _, ctnIdmap := range idmapset.Entries {
@@ -5578,7 +5592,9 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {

srcConfig, err := pool.GenerateInstanceBackupConfig(d, args.Snapshots, d.op)
if err != nil {
return fmt.Errorf("Failed generating instance migration config: %w", err)
err := fmt.Errorf("Failed generating instance migration config: %w", err)
op.Done(err)
return err
}

// If we are copying snapshots, retrieve a list of snapshots from source volume.
@@ -5596,23 +5612,29 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {
d.logger.Debug("Sending migration offer to target")
err = args.ControlSend(offerHeader)
if err != nil {
return fmt.Errorf("Failed sending migration offer: %w", err)
err := fmt.Errorf("Failed sending migration offer: %w", err)
op.Done(err)
return err
}

// Receive response from target.
d.logger.Debug("Waiting for migration offer response from target")
respHeader := &migration.MigrationHeader{}
err = args.ControlReceive(respHeader)
if err != nil {
return fmt.Errorf("Failed receiving migration offer response: %w", err)
err := fmt.Errorf("Failed receiving migration offer response: %w", err)
op.Done(err)
return err
}

d.logger.Debug("Got migration offer response from target")

// Negotiated migration types.
migrationTypes, err := localMigration.MatchTypes(respHeader, migration.MigrationFSType_RSYNC, poolMigrationTypes)
if err != nil {
return fmt.Errorf("Failed to negotiate migration type: %w", err)
err := fmt.Errorf("Failed to negotiate migration type: %w", err)
op.Done(err)
return err
}

volSourceArgs := &localMigration.VolumeSourceArgs{
@@ -5945,7 +5967,16 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {
}
}

return err
if err != nil {
op.Done(err)
return err
}

op.Done(nil)

d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceMigrated.Event(d, nil))

return nil
}
}

45 changes: 36 additions & 9 deletions internal/server/instance/drivers/driver_qemu.go
@@ -721,7 +721,7 @@ func (d *qemu) onStop(target string) error {
// Log and emit lifecycle if not user triggered.
if op.GetInstanceInitiated() {
d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceShutdown.Event(d, nil))
} else {
} else if op.Action() != operationlock.ActionMigrate {
d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceStopped.Event(d, nil))
}

@@ -4749,7 +4749,7 @@ func (d *qemu) Stop(stateful bool) error {
// Don't allow reuse when creating a new stop operation. This prevents other operations from intefering.
// Allow reuse of a reusable ongoing stop operation as Shutdown() may be called first, which allows reuse
// of its operations. This allow for Stop() to inherit from Shutdown() where instance is stuck.
op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionStop, []operationlock.Action{operationlock.ActionRestart, operationlock.ActionRestore}, false, true)
op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionStop, []operationlock.Action{operationlock.ActionRestart, operationlock.ActionRestore, operationlock.ActionMigrate}, false, true)
if err != nil {
if errors.Is(err, operationlock.ErrNonReusuableSucceeded) {
// An existing matching operation has now succeeded, return.
@@ -6429,18 +6429,27 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
return fmt.Errorf("Live migration requires migration.stateful to be set to true")
}

// Setup a new operation.
op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionMigrate, nil, false, true)
if err != nil {
return err
}

// Wait for essential migration connections before negotiation.
connectionsCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

filesystemConn, err := args.FilesystemConn(connectionsCtx)
if err != nil {
op.Done(err)
return err
}

pool, err := storagePools.LoadByInstance(d.state, d)
if err != nil {
return fmt.Errorf("Failed loading instance: %w", err)
err := fmt.Errorf("Failed loading instance: %w", err)
op.Done(err)
return err
}

// The refresh argument passed to MigrationTypes() is always set
@@ -6449,7 +6458,9 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
// this, and adjust the migration types accordingly.
poolMigrationTypes := pool.MigrationTypes(storagePools.InstanceContentType(d), false, args.Snapshots)
if len(poolMigrationTypes) == 0 {
return fmt.Errorf("No source migration types available")
err := fmt.Errorf("No source migration types available")
op.Done(err)
return err
}

// Convert the pool's migration type options to an offer header to target.
@@ -6463,15 +6474,19 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
// For VMs, send block device size hint in offer header so that target can create the volume the same size.
blockSize, err := storagePools.InstanceDiskBlockSize(pool, d, d.op)
if err != nil {
return fmt.Errorf("Failed getting source disk size: %w", err)
err := fmt.Errorf("Failed getting source disk size: %w", err)
op.Done(err)
return err
}

d.logger.Debug("Set migration offer volume size", logger.Ctx{"blockSize": blockSize})
offerHeader.VolumeSize = &blockSize

srcConfig, err := pool.GenerateInstanceBackupConfig(d, args.Snapshots, d.op)
if err != nil {
return fmt.Errorf("Failed generating instance migration config: %w", err)
err := fmt.Errorf("Failed generating instance migration config: %w", err)
op.Done(err)
return err
}

// If we are copying snapshots, retrieve a list of snapshots from source volume.
@@ -6497,23 +6512,29 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
d.logger.Debug("Sending migration offer to target")
err = args.ControlSend(offerHeader)
if err != nil {
return fmt.Errorf("Failed sending migration offer header: %w", err)
err := fmt.Errorf("Failed sending migration offer header: %w", err)
op.Done(err)
return err
}

// Receive response from target.
d.logger.Debug("Waiting for migration offer response from target")
respHeader := &migration.MigrationHeader{}
err = args.ControlReceive(respHeader)
if err != nil {
return fmt.Errorf("Failed receiving migration offer response: %w", err)
err := fmt.Errorf("Failed receiving migration offer response: %w", err)
op.Done(err)
return err
}

d.logger.Debug("Got migration offer response from target")

// Negotiated migration types.
migrationTypes, err := localMigration.MatchTypes(respHeader, migration.MigrationFSType_RSYNC, poolMigrationTypes)
if err != nil {
return fmt.Errorf("Failed to negotiate migration type: %w", err)
err := fmt.Errorf("Failed to negotiate migration type: %w", err)
op.Done(err)
return err
}

volSourceArgs := &localMigration.VolumeSourceArgs{
@@ -6549,6 +6570,7 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
if args.Live && respHeader.Criu != nil && *respHeader.Criu == migration.CRIUType_VM_QEMU {
stateConn, err = args.StateConn(connectionsCtx)
if err != nil {
op.Done(err)
return err
}
}
@@ -6643,9 +6665,14 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
{
err := g.Wait()
if err != nil {
op.Done(err)
return err
}

op.Done(nil)

d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceMigrated.Event(d, nil))

return nil
}
}
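
Both MigrateSend implementations above follow the same bracketing pattern: create a migration operation before any work, report every early-return error to it with op.Done(err), and only after a fully successful send call op.Done(nil) and emit InstanceMigrated. Here is a rough, self-contained sketch of that pattern, with migrationOp standing in for the real operationlock.InstanceOperation:

// Self-contained sketch of the "operation bracketing" used by both
// MigrateSend implementations above. migrationOp is illustrative only.
package main

import (
	"errors"
	"fmt"
)

type migrationOp struct {
	name string
}

// Done records the outcome of the tracked migration, mirroring op.Done(err).
func (o *migrationOp) Done(err error) {
	if err != nil {
		fmt.Printf("migration of %s failed: %v\n", o.name, err)
		return
	}

	fmt.Printf("migration of %s completed\n", o.name)
}

func migrateSend(name string, connect func() error, transfer func() error) error {
	// Create the operation before doing any work, as the drivers do with
	// operationlock.CreateWaitGet(..., ActionMigrate, ...).
	op := &migrationOp{name: name}

	// Every failure path tells the operation about the error before returning.
	if err := connect(); err != nil {
		err = fmt.Errorf("failed waiting for migration connections: %w", err)
		op.Done(err)
		return err
	}

	if err := transfer(); err != nil {
		op.Done(err)
		return err
	}

	// Success: close the operation, then announce the lifecycle event.
	op.Done(nil)
	fmt.Printf("lifecycle: instance %s migrated\n", name)

	return nil
}

func main() {
	ok := func() error { return nil }
	_ = migrateSend("c1", ok, ok)
	_ = migrateSend("c2", ok, func() error { return errors.New("rsync failed") })
}

The point of the pattern is that anything waiting on the operation, including the stop hooks, sees a single, consistently reported outcome for the whole migration.
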
3 changes: 3 additions & 0 deletions internal/server/instance/operationlock/operationlock.go
@@ -33,6 +33,9 @@ const ActionUpdate Action = "update"
// ActionDelete for deleting an instance.
const ActionDelete Action = "delete"

// ActionMigrate for migrating an instance.
const ActionMigrate Action = "migrate"

// ErrNonReusuableSucceeded is returned when no operation is created due to having to wait for a matching
// non-reusuable operation that has now completed successfully.
var ErrNonReusuableSucceeded error = fmt.Errorf("A matching non-reusable operation has now succeeded")
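
The new ActionMigrate constant is what the drivers add to their inheritable-action lists in Stop() and onStopOperationSetup(), so that a stop happening as part of a migration is treated as part of that migration rather than as a competing operation. Below is a self-contained sketch of the matching logic, using stand-in types rather than the real operationlock package; the before/after lists reflect the change shown above, while the comments about prior behaviour are an interpretation rather than text from the commit.

// Illustrative sketch of how an action-typed operation behaves when Stop runs
// during a migration. Action and lock are stand-ins for the operationlock types.
package main

import "fmt"

type Action string

const (
	ActionRestart Action = "restart"
	ActionRestore Action = "restore"
	ActionMigrate Action = "migrate"
)

type lock struct {
	action Action
}

// ActionMatch reports whether the ongoing operation is one of the given actions,
// in the spirit of (*InstanceOperation).ActionMatch.
func (l *lock) ActionMatch(actions ...Action) bool {
	for _, a := range actions {
		if l.action == a {
			return true
		}
	}

	return false
}

func main() {
	// An instance that is being migrated holds a "migrate" operation.
	ongoing := &lock{action: ActionMigrate}

	// Inheritable action lists for Stop, before and after this commit.
	before := []Action{ActionRestart, ActionRestore}
	after := []Action{ActionRestart, ActionRestore, ActionMigrate}

	fmt.Println("stop may reuse the ongoing operation (old list):", ongoing.ActionMatch(before...)) // false
	fmt.Println("stop may reuse the ongoing operation (new list):", ongoing.ActionMatch(after...))  // true
}
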
11 changes: 7 additions & 4 deletions internal/server/lifecycle/cluster_member.go
@@ -10,10 +10,13 @@ type ClusterMemberAction string

// All supported lifecycle events for cluster members.
const (
ClusterMemberAdded = ClusterMemberAction(api.EventLifecycleClusterMemberAdded)
ClusterMemberRemoved = ClusterMemberAction(api.EventLifecycleClusterMemberRemoved)
ClusterMemberUpdated = ClusterMemberAction(api.EventLifecycleClusterMemberUpdated)
ClusterMemberRenamed = ClusterMemberAction(api.EventLifecycleClusterMemberRenamed)
ClusterMemberAdded = ClusterMemberAction(api.EventLifecycleClusterMemberAdded)
ClusterMemberEvacuated = ClusterMemberAction(api.EventLifecycleClusterMemberEvacuated)
ClusterMemberHealed = ClusterMemberAction(api.EventLifecycleClusterMemberHealed)
ClusterMemberRemoved = ClusterMemberAction(api.EventLifecycleClusterMemberRemoved)
ClusterMemberRenamed = ClusterMemberAction(api.EventLifecycleClusterMemberRenamed)
ClusterMemberRestored = ClusterMemberAction(api.EventLifecycleClusterMemberRestored)
ClusterMemberUpdated = ClusterMemberAction(api.EventLifecycleClusterMemberUpdated)
)

// Event creates the lifecycle event for an action on a cluster member.