Improve lifecycle events on evacuation #1073

Merged 12 commits on Aug 5, 2024
20 changes: 15 additions & 5 deletions cmd/incusd/api_cluster_evacuation.go
@@ -21,6 +21,7 @@ import (
dbCluster "github.com/lxc/incus/v6/internal/server/db/cluster"
"github.com/lxc/incus/v6/internal/server/db/operationtype"
"github.com/lxc/incus/v6/internal/server/instance"
"github.com/lxc/incus/v6/internal/server/lifecycle"
"github.com/lxc/incus/v6/internal/server/operations"
"github.com/lxc/incus/v6/internal/server/response"
"github.com/lxc/incus/v6/internal/server/scriptlet"
@@ -147,6 +148,11 @@ func evacuateClusterMember(ctx context.Context, s *state.State, gateway *cluster
networkShutdown(s)

reverter.Success()

if mode != "heal" {
s.Events.SendLifecycle(api.ProjectDefaultName, lifecycle.ClusterMemberEvacuated.Event(name, op.Requestor(), nil))
}

return nil
}

@@ -458,6 +464,9 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response {
}

reverter.Success()

+ s.Events.SendLifecycle(api.ProjectDefaultName, lifecycle.ClusterMemberRestored.Event(originName, op.Requestor(), nil))
+
return nil
}

@@ -675,6 +684,8 @@ func autoHealClusterTask(d *Daemon) (task.Func, task.Schedule) {
}

func healClusterMember(d *Daemon, op *operations.Operation, name string) error {
+ s := d.State()
+
logger.Info("Starting cluster healing", logger.Ctx{"server": name})
defer logger.Info("Completed cluster healing", logger.Ctx{"server": name})

@@ -744,15 +755,14 @@ func healClusterMember(d *Daemon, op *operations.Operation, name string) error {
// Attempt up to 5 evacuations.
var err error
for i := 0; i < 5; i++ {
- err = evacuateClusterMember(context.Background(), d.State(), d.gateway, op, name, "heal", nil, migrateFunc)
+ err = evacuateClusterMember(context.Background(), s, d.gateway, op, name, "heal", nil, migrateFunc)
if err == nil {
+ s.Events.SendLifecycle(api.ProjectDefaultName, lifecycle.ClusterMemberHealed.Event(name, op.Requestor(), nil))
+
return nil
}
}

- if err != nil {
- logger.Error("Failed to heal cluster member", logger.Ctx{"server": name, "err": err})
- }
-
+ logger.Error("Failed to heal cluster member", logger.Ctx{"server": name, "err": err})
return err
}
2 changes: 1 addition & 1 deletion internal/server/instance/drivers/driver_common.go
@@ -888,7 +888,7 @@ func (d *common) onStopOperationSetup(target string) (*operationlock.InstanceOpe
// If there is another ongoing operation that isn't in our inheritable list, wait until that has finished
// before proceeding to run the hook.
op := operationlock.Get(d.Project().Name, d.Name())
- if op != nil && !op.ActionMatch(operationlock.ActionStart, operationlock.ActionRestart, operationlock.ActionStop, operationlock.ActionRestore) {
+ if op != nil && !op.ActionMatch(operationlock.ActionStart, operationlock.ActionRestart, operationlock.ActionStop, operationlock.ActionRestore, operationlock.ActionMigrate) {
d.logger.Debug("Waiting for existing operation lock to finish before running hook", logger.Ctx{"action": op.Action()})
_ = op.Wait(context.Background())
op = nil
49 changes: 40 additions & 9 deletions internal/server/instance/drivers/driver_lxc.go
@@ -2881,7 +2881,7 @@ func (d *lxc) Stop(stateful bool) error {
}

// Setup a new operation
- op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionStop, []operationlock.Action{operationlock.ActionRestart, operationlock.ActionRestore}, false, true)
+ op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionStop, []operationlock.Action{operationlock.ActionRestart, operationlock.ActionRestore, operationlock.ActionMigrate}, false, true)
if err != nil {
if errors.Is(err, operationlock.ErrNonReusuableSucceeded) {
// An existing matching operation has now succeeded, return.
@@ -5503,34 +5503,46 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {
d.logger.Debug("Migration send starting")
defer d.logger.Debug("Migration send stopped")

+ // Setup a new operation.
+ op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionMigrate, nil, false, true)
+ if err != nil {
+ return err
+ }
+
// Wait for essential migration connections before negotiation.
connectionsCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

filesystemConn, err := args.FilesystemConn(connectionsCtx)
if err != nil {
+ op.Done(err)
return err
}

var stateConn io.ReadWriteCloser
if args.Live {
stateConn, err = args.StateConn(connectionsCtx)
if err != nil {
+ op.Done(err)
return err
}
}

pool, err := storagePools.LoadByInstance(d.state, d)
if err != nil {
return fmt.Errorf("Failed loading instance: %w", err)
err := fmt.Errorf("Failed loading instance: %w", err)
op.Done(err)
return err
}

// The refresh argument passed to MigrationTypes() is always set to false here.
// The migration source/sender doesn't need to care whether or not it's doing a refresh as the migration
// sink/receiver will know this, and adjust the migration types accordingly.
poolMigrationTypes := pool.MigrationTypes(storagePools.InstanceContentType(d), false, args.Snapshots)
if len(poolMigrationTypes) == 0 {
return fmt.Errorf("No source migration types available")
err := fmt.Errorf("No source migration types available")
op.Done(err)
return err
}

// Convert the pool's migration type options to an offer header to target.
@@ -5560,7 +5572,9 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {
// Add idmap info to source header for containers.
idmapset, err := d.DiskIdmap()
if err != nil {
return fmt.Errorf("Failed getting container disk idmap: %w", err)
err := fmt.Errorf("Failed getting container disk idmap: %w", err)
op.Done(err)
return err
} else if idmapset != nil {
offerHeader.Idmap = make([]*migration.IDMapType, 0, len(idmapset.Entries))
for _, ctnIdmap := range idmapset.Entries {
@@ -5578,7 +5592,9 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {

srcConfig, err := pool.GenerateInstanceBackupConfig(d, args.Snapshots, d.op)
if err != nil {
return fmt.Errorf("Failed generating instance migration config: %w", err)
err := fmt.Errorf("Failed generating instance migration config: %w", err)
op.Done(err)
return err
}

// If we are copying snapshots, retrieve a list of snapshots from source volume.
@@ -5596,23 +5612,29 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {
d.logger.Debug("Sending migration offer to target")
err = args.ControlSend(offerHeader)
if err != nil {
return fmt.Errorf("Failed sending migration offer: %w", err)
err := fmt.Errorf("Failed sending migration offer: %w", err)
op.Done(err)
return err
}

// Receive response from target.
d.logger.Debug("Waiting for migration offer response from target")
respHeader := &migration.MigrationHeader{}
err = args.ControlReceive(respHeader)
if err != nil {
return fmt.Errorf("Failed receiving migration offer response: %w", err)
err := fmt.Errorf("Failed receiving migration offer response: %w", err)
op.Done(err)
return err
}

d.logger.Debug("Got migration offer response from target")

// Negotiated migration types.
migrationTypes, err := localMigration.MatchTypes(respHeader, migration.MigrationFSType_RSYNC, poolMigrationTypes)
if err != nil {
return fmt.Errorf("Failed to negotiate migration type: %w", err)
err := fmt.Errorf("Failed to negotiate migration type: %w", err)
op.Done(err)
return err
}

volSourceArgs := &localMigration.VolumeSourceArgs{
@@ -5945,7 +5967,16 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {
}
}

- return err
+ if err != nil {
+ op.Done(err)
+ return err
+ }
+
+ op.Done(nil)
+
+ d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceMigrated.Event(d, nil))
+
+ return nil
}
}

45 changes: 36 additions & 9 deletions internal/server/instance/drivers/driver_qemu.go
@@ -721,7 +721,7 @@ func (d *qemu) onStop(target string) error {
// Log and emit lifecycle if not user triggered.
if op.GetInstanceInitiated() {
d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceShutdown.Event(d, nil))
- } else {
+ } else if op.Action() != operationlock.ActionMigrate {
d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceStopped.Event(d, nil))
}

@@ -4749,7 +4749,7 @@ func (d *qemu) Stop(stateful bool) error {
// Don't allow reuse when creating a new stop operation. This prevents other operations from intefering.
// Allow reuse of a reusable ongoing stop operation as Shutdown() may be called first, which allows reuse
// of its operations. This allow for Stop() to inherit from Shutdown() where instance is stuck.
- op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionStop, []operationlock.Action{operationlock.ActionRestart, operationlock.ActionRestore}, false, true)
+ op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionStop, []operationlock.Action{operationlock.ActionRestart, operationlock.ActionRestore, operationlock.ActionMigrate}, false, true)
if err != nil {
if errors.Is(err, operationlock.ErrNonReusuableSucceeded) {
// An existing matching operation has now succeeded, return.
@@ -6429,18 +6429,27 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
return fmt.Errorf("Live migration requires migration.stateful to be set to true")
}

+ // Setup a new operation.
+ op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionMigrate, nil, false, true)
+ if err != nil {
+ return err
+ }
+
// Wait for essential migration connections before negotiation.
connectionsCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

filesystemConn, err := args.FilesystemConn(connectionsCtx)
if err != nil {
+ op.Done(err)
return err
}

pool, err := storagePools.LoadByInstance(d.state, d)
if err != nil {
return fmt.Errorf("Failed loading instance: %w", err)
err := fmt.Errorf("Failed loading instance: %w", err)
op.Done(err)
return err
}

// The refresh argument passed to MigrationTypes() is always set
@@ -6449,7 +6458,9 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
// this, and adjust the migration types accordingly.
poolMigrationTypes := pool.MigrationTypes(storagePools.InstanceContentType(d), false, args.Snapshots)
if len(poolMigrationTypes) == 0 {
return fmt.Errorf("No source migration types available")
err := fmt.Errorf("No source migration types available")
op.Done(err)
return err
}

// Convert the pool's migration type options to an offer header to target.
@@ -6463,15 +6474,19 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
// For VMs, send block device size hint in offer header so that target can create the volume the same size.
blockSize, err := storagePools.InstanceDiskBlockSize(pool, d, d.op)
if err != nil {
return fmt.Errorf("Failed getting source disk size: %w", err)
err := fmt.Errorf("Failed getting source disk size: %w", err)
op.Done(err)
return err
}

d.logger.Debug("Set migration offer volume size", logger.Ctx{"blockSize": blockSize})
offerHeader.VolumeSize = &blockSize

srcConfig, err := pool.GenerateInstanceBackupConfig(d, args.Snapshots, d.op)
if err != nil {
return fmt.Errorf("Failed generating instance migration config: %w", err)
err := fmt.Errorf("Failed generating instance migration config: %w", err)
op.Done(err)
return err
}

// If we are copying snapshots, retrieve a list of snapshots from source volume.
@@ -6497,23 +6512,29 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
d.logger.Debug("Sending migration offer to target")
err = args.ControlSend(offerHeader)
if err != nil {
return fmt.Errorf("Failed sending migration offer header: %w", err)
err := fmt.Errorf("Failed sending migration offer header: %w", err)
op.Done(err)
return err
}

// Receive response from target.
d.logger.Debug("Waiting for migration offer response from target")
respHeader := &migration.MigrationHeader{}
err = args.ControlReceive(respHeader)
if err != nil {
return fmt.Errorf("Failed receiving migration offer response: %w", err)
err := fmt.Errorf("Failed receiving migration offer response: %w", err)
op.Done(err)
return err
}

d.logger.Debug("Got migration offer response from target")

// Negotiated migration types.
migrationTypes, err := localMigration.MatchTypes(respHeader, migration.MigrationFSType_RSYNC, poolMigrationTypes)
if err != nil {
return fmt.Errorf("Failed to negotiate migration type: %w", err)
err := fmt.Errorf("Failed to negotiate migration type: %w", err)
op.Done(err)
return err
}

volSourceArgs := &localMigration.VolumeSourceArgs{
@@ -6549,6 +6570,7 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
if args.Live && respHeader.Criu != nil && *respHeader.Criu == migration.CRIUType_VM_QEMU {
stateConn, err = args.StateConn(connectionsCtx)
if err != nil {
+ op.Done(err)
return err
}
}
@@ -6643,9 +6665,14 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
{
err := g.Wait()
if err != nil {
+ op.Done(err)
return err
}

+ op.Done(nil)
+
+ d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceMigrated.Event(d, nil))
+
return nil
}
}
3 changes: 3 additions & 0 deletions internal/server/instance/operationlock/operationlock.go
@@ -33,6 +33,9 @@ const ActionUpdate Action = "update"
// ActionDelete for deleting an instance.
const ActionDelete Action = "delete"

+ // ActionMigrate for migrating an instance.
+ const ActionMigrate Action = "migrate"
+
// ErrNonReusuableSucceeded is returned when no operation is created due to having to wait for a matching
// non-reusuable operation that has now completed successfully.
var ErrNonReusuableSucceeded error = fmt.Errorf("A matching non-reusable operation has now succeeded")
11 changes: 7 additions & 4 deletions internal/server/lifecycle/cluster_member.go
@@ -10,10 +10,13 @@ type ClusterMemberAction string

// All supported lifecycle events for cluster members.
const (
- ClusterMemberAdded = ClusterMemberAction(api.EventLifecycleClusterMemberAdded)
- ClusterMemberRemoved = ClusterMemberAction(api.EventLifecycleClusterMemberRemoved)
- ClusterMemberUpdated = ClusterMemberAction(api.EventLifecycleClusterMemberUpdated)
- ClusterMemberRenamed = ClusterMemberAction(api.EventLifecycleClusterMemberRenamed)
+ ClusterMemberAdded = ClusterMemberAction(api.EventLifecycleClusterMemberAdded)
+ ClusterMemberEvacuated = ClusterMemberAction(api.EventLifecycleClusterMemberEvacuated)
+ ClusterMemberHealed = ClusterMemberAction(api.EventLifecycleClusterMemberHealed)
+ ClusterMemberRemoved = ClusterMemberAction(api.EventLifecycleClusterMemberRemoved)
+ ClusterMemberRenamed = ClusterMemberAction(api.EventLifecycleClusterMemberRenamed)
+ ClusterMemberRestored = ClusterMemberAction(api.EventLifecycleClusterMemberRestored)
+ ClusterMemberUpdated = ClusterMemberAction(api.EventLifecycleClusterMemberUpdated)
)

// Event creates the lifecycle event for an action on a cluster member.