diff --git a/cmd/incusd/api_cluster_evacuation.go b/cmd/incusd/api_cluster_evacuation.go index b8442cb05c1..50753ec519d 100644 --- a/cmd/incusd/api_cluster_evacuation.go +++ b/cmd/incusd/api_cluster_evacuation.go @@ -21,6 +21,7 @@ import ( dbCluster "github.com/lxc/incus/v6/internal/server/db/cluster" "github.com/lxc/incus/v6/internal/server/db/operationtype" "github.com/lxc/incus/v6/internal/server/instance" + "github.com/lxc/incus/v6/internal/server/lifecycle" "github.com/lxc/incus/v6/internal/server/operations" "github.com/lxc/incus/v6/internal/server/response" "github.com/lxc/incus/v6/internal/server/scriptlet" @@ -147,6 +148,11 @@ func evacuateClusterMember(ctx context.Context, s *state.State, gateway *cluster networkShutdown(s) reverter.Success() + + if mode != "heal" { + s.Events.SendLifecycle(api.ProjectDefaultName, lifecycle.ClusterMemberEvacuated.Event(name, op.Requestor(), nil)) + } + return nil } @@ -458,6 +464,9 @@ func restoreClusterMember(d *Daemon, r *http.Request) response.Response { } reverter.Success() + + s.Events.SendLifecycle(api.ProjectDefaultName, lifecycle.ClusterMemberRestored.Event(originName, op.Requestor(), nil)) + return nil } @@ -675,6 +684,8 @@ func autoHealClusterTask(d *Daemon) (task.Func, task.Schedule) { } func healClusterMember(d *Daemon, op *operations.Operation, name string) error { + s := d.State() + logger.Info("Starting cluster healing", logger.Ctx{"server": name}) defer logger.Info("Completed cluster healing", logger.Ctx{"server": name}) @@ -744,15 +755,14 @@ func healClusterMember(d *Daemon, op *operations.Operation, name string) error { // Attempt up to 5 evacuations. var err error for i := 0; i < 5; i++ { - err = evacuateClusterMember(context.Background(), d.State(), d.gateway, op, name, "heal", nil, migrateFunc) + err = evacuateClusterMember(context.Background(), s, d.gateway, op, name, "heal", nil, migrateFunc) if err == nil { + s.Events.SendLifecycle(api.ProjectDefaultName, lifecycle.ClusterMemberHealed.Event(name, op.Requestor(), nil)) + return nil } } - if err != nil { - logger.Error("Failed to heal cluster member", logger.Ctx{"server": name, "err": err}) - } - + logger.Error("Failed to heal cluster member", logger.Ctx{"server": name, "err": err}) return err } diff --git a/internal/server/instance/drivers/driver_common.go b/internal/server/instance/drivers/driver_common.go index faef632ccc8..b3142dd44d4 100644 --- a/internal/server/instance/drivers/driver_common.go +++ b/internal/server/instance/drivers/driver_common.go @@ -888,7 +888,7 @@ func (d *common) onStopOperationSetup(target string) (*operationlock.InstanceOpe // If there is another ongoing operation that isn't in our inheritable list, wait until that has finished // before proceeding to run the hook. op := operationlock.Get(d.Project().Name, d.Name()) - if op != nil && !op.ActionMatch(operationlock.ActionStart, operationlock.ActionRestart, operationlock.ActionStop, operationlock.ActionRestore) { + if op != nil && !op.ActionMatch(operationlock.ActionStart, operationlock.ActionRestart, operationlock.ActionStop, operationlock.ActionRestore, operationlock.ActionMigrate) { d.logger.Debug("Waiting for existing operation lock to finish before running hook", logger.Ctx{"action": op.Action()}) _ = op.Wait(context.Background()) op = nil diff --git a/internal/server/instance/drivers/driver_lxc.go b/internal/server/instance/drivers/driver_lxc.go index 79d7883b14c..04b348a7912 100644 --- a/internal/server/instance/drivers/driver_lxc.go +++ b/internal/server/instance/drivers/driver_lxc.go @@ -2881,7 +2881,7 @@ func (d *lxc) Stop(stateful bool) error { } // Setup a new operation - op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionStop, []operationlock.Action{operationlock.ActionRestart, operationlock.ActionRestore}, false, true) + op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionStop, []operationlock.Action{operationlock.ActionRestart, operationlock.ActionRestore, operationlock.ActionMigrate}, false, true) if err != nil { if errors.Is(err, operationlock.ErrNonReusuableSucceeded) { // An existing matching operation has now succeeded, return. @@ -5503,12 +5503,19 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error { d.logger.Debug("Migration send starting") defer d.logger.Debug("Migration send stopped") + // Setup a new operation. + op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionMigrate, nil, false, true) + if err != nil { + return err + } + // Wait for essential migration connections before negotiation. connectionsCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() filesystemConn, err := args.FilesystemConn(connectionsCtx) if err != nil { + op.Done(err) return err } @@ -5516,13 +5523,16 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error { if args.Live { stateConn, err = args.StateConn(connectionsCtx) if err != nil { + op.Done(err) return err } } pool, err := storagePools.LoadByInstance(d.state, d) if err != nil { - return fmt.Errorf("Failed loading instance: %w", err) + err := fmt.Errorf("Failed loading instance: %w", err) + op.Done(err) + return err } // The refresh argument passed to MigrationTypes() is always set to false here. @@ -5530,7 +5540,9 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error { // sink/receiver will know this, and adjust the migration types accordingly. poolMigrationTypes := pool.MigrationTypes(storagePools.InstanceContentType(d), false, args.Snapshots) if len(poolMigrationTypes) == 0 { - return fmt.Errorf("No source migration types available") + err := fmt.Errorf("No source migration types available") + op.Done(err) + return err } // Convert the pool's migration type options to an offer header to target. @@ -5560,7 +5572,9 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error { // Add idmap info to source header for containers. idmapset, err := d.DiskIdmap() if err != nil { - return fmt.Errorf("Failed getting container disk idmap: %w", err) + err := fmt.Errorf("Failed getting container disk idmap: %w", err) + op.Done(err) + return err } else if idmapset != nil { offerHeader.Idmap = make([]*migration.IDMapType, 0, len(idmapset.Entries)) for _, ctnIdmap := range idmapset.Entries { @@ -5578,7 +5592,9 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error { srcConfig, err := pool.GenerateInstanceBackupConfig(d, args.Snapshots, d.op) if err != nil { - return fmt.Errorf("Failed generating instance migration config: %w", err) + err := fmt.Errorf("Failed generating instance migration config: %w", err) + op.Done(err) + return err } // If we are copying snapshots, retrieve a list of snapshots from source volume. @@ -5596,7 +5612,9 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error { d.logger.Debug("Sending migration offer to target") err = args.ControlSend(offerHeader) if err != nil { - return fmt.Errorf("Failed sending migration offer: %w", err) + err := fmt.Errorf("Failed sending migration offer: %w", err) + op.Done(err) + return err } // Receive response from target. @@ -5604,7 +5622,9 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error { respHeader := &migration.MigrationHeader{} err = args.ControlReceive(respHeader) if err != nil { - return fmt.Errorf("Failed receiving migration offer response: %w", err) + err := fmt.Errorf("Failed receiving migration offer response: %w", err) + op.Done(err) + return err } d.logger.Debug("Got migration offer response from target") @@ -5612,7 +5632,9 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error { // Negotiated migration types. migrationTypes, err := localMigration.MatchTypes(respHeader, migration.MigrationFSType_RSYNC, poolMigrationTypes) if err != nil { - return fmt.Errorf("Failed to negotiate migration type: %w", err) + err := fmt.Errorf("Failed to negotiate migration type: %w", err) + op.Done(err) + return err } volSourceArgs := &localMigration.VolumeSourceArgs{ @@ -5945,7 +5967,16 @@ func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error { } } - return err + if err != nil { + op.Done(err) + return err + } + + op.Done(nil) + + d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceMigrated.Event(d, nil)) + + return nil } } diff --git a/internal/server/instance/drivers/driver_qemu.go b/internal/server/instance/drivers/driver_qemu.go index 08319b58ea5..f83c77c0e4d 100644 --- a/internal/server/instance/drivers/driver_qemu.go +++ b/internal/server/instance/drivers/driver_qemu.go @@ -721,7 +721,7 @@ func (d *qemu) onStop(target string) error { // Log and emit lifecycle if not user triggered. if op.GetInstanceInitiated() { d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceShutdown.Event(d, nil)) - } else { + } else if op.Action() != operationlock.ActionMigrate { d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceStopped.Event(d, nil)) } @@ -4749,7 +4749,7 @@ func (d *qemu) Stop(stateful bool) error { // Don't allow reuse when creating a new stop operation. This prevents other operations from intefering. // Allow reuse of a reusable ongoing stop operation as Shutdown() may be called first, which allows reuse // of its operations. This allow for Stop() to inherit from Shutdown() where instance is stuck. - op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionStop, []operationlock.Action{operationlock.ActionRestart, operationlock.ActionRestore}, false, true) + op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionStop, []operationlock.Action{operationlock.ActionRestart, operationlock.ActionRestore, operationlock.ActionMigrate}, false, true) if err != nil { if errors.Is(err, operationlock.ErrNonReusuableSucceeded) { // An existing matching operation has now succeeded, return. @@ -6429,18 +6429,27 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error { return fmt.Errorf("Live migration requires migration.stateful to be set to true") } + // Setup a new operation. + op, err := operationlock.CreateWaitGet(d.Project().Name, d.Name(), operationlock.ActionMigrate, nil, false, true) + if err != nil { + return err + } + // Wait for essential migration connections before negotiation. connectionsCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() filesystemConn, err := args.FilesystemConn(connectionsCtx) if err != nil { + op.Done(err) return err } pool, err := storagePools.LoadByInstance(d.state, d) if err != nil { - return fmt.Errorf("Failed loading instance: %w", err) + err := fmt.Errorf("Failed loading instance: %w", err) + op.Done(err) + return err } // The refresh argument passed to MigrationTypes() is always set @@ -6449,7 +6458,9 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error { // this, and adjust the migration types accordingly. poolMigrationTypes := pool.MigrationTypes(storagePools.InstanceContentType(d), false, args.Snapshots) if len(poolMigrationTypes) == 0 { - return fmt.Errorf("No source migration types available") + err := fmt.Errorf("No source migration types available") + op.Done(err) + return err } // Convert the pool's migration type options to an offer header to target. @@ -6463,7 +6474,9 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error { // For VMs, send block device size hint in offer header so that target can create the volume the same size. blockSize, err := storagePools.InstanceDiskBlockSize(pool, d, d.op) if err != nil { - return fmt.Errorf("Failed getting source disk size: %w", err) + err := fmt.Errorf("Failed getting source disk size: %w", err) + op.Done(err) + return err } d.logger.Debug("Set migration offer volume size", logger.Ctx{"blockSize": blockSize}) @@ -6471,7 +6484,9 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error { srcConfig, err := pool.GenerateInstanceBackupConfig(d, args.Snapshots, d.op) if err != nil { - return fmt.Errorf("Failed generating instance migration config: %w", err) + err := fmt.Errorf("Failed generating instance migration config: %w", err) + op.Done(err) + return err } // If we are copying snapshots, retrieve a list of snapshots from source volume. @@ -6497,7 +6512,9 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error { d.logger.Debug("Sending migration offer to target") err = args.ControlSend(offerHeader) if err != nil { - return fmt.Errorf("Failed sending migration offer header: %w", err) + err := fmt.Errorf("Failed sending migration offer header: %w", err) + op.Done(err) + return err } // Receive response from target. @@ -6505,7 +6522,9 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error { respHeader := &migration.MigrationHeader{} err = args.ControlReceive(respHeader) if err != nil { - return fmt.Errorf("Failed receiving migration offer response: %w", err) + err := fmt.Errorf("Failed receiving migration offer response: %w", err) + op.Done(err) + return err } d.logger.Debug("Got migration offer response from target") @@ -6513,7 +6532,9 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error { // Negotiated migration types. migrationTypes, err := localMigration.MatchTypes(respHeader, migration.MigrationFSType_RSYNC, poolMigrationTypes) if err != nil { - return fmt.Errorf("Failed to negotiate migration type: %w", err) + err := fmt.Errorf("Failed to negotiate migration type: %w", err) + op.Done(err) + return err } volSourceArgs := &localMigration.VolumeSourceArgs{ @@ -6549,6 +6570,7 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error { if args.Live && respHeader.Criu != nil && *respHeader.Criu == migration.CRIUType_VM_QEMU { stateConn, err = args.StateConn(connectionsCtx) if err != nil { + op.Done(err) return err } } @@ -6643,9 +6665,14 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error { { err := g.Wait() if err != nil { + op.Done(err) return err } + op.Done(nil) + + d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceMigrated.Event(d, nil)) + return nil } } diff --git a/internal/server/instance/operationlock/operationlock.go b/internal/server/instance/operationlock/operationlock.go index c4c60ee3113..5c7f1f1173e 100644 --- a/internal/server/instance/operationlock/operationlock.go +++ b/internal/server/instance/operationlock/operationlock.go @@ -33,6 +33,9 @@ const ActionUpdate Action = "update" // ActionDelete for deleting an instance. const ActionDelete Action = "delete" +// ActionMigrate for migrating an instance. +const ActionMigrate Action = "migrate" + // ErrNonReusuableSucceeded is returned when no operation is created due to having to wait for a matching // non-reusuable operation that has now completed successfully. var ErrNonReusuableSucceeded error = fmt.Errorf("A matching non-reusable operation has now succeeded") diff --git a/internal/server/lifecycle/cluster_member.go b/internal/server/lifecycle/cluster_member.go index 64b1c214826..4c2245cf221 100644 --- a/internal/server/lifecycle/cluster_member.go +++ b/internal/server/lifecycle/cluster_member.go @@ -10,10 +10,13 @@ type ClusterMemberAction string // All supported lifecycle events for cluster members. const ( - ClusterMemberAdded = ClusterMemberAction(api.EventLifecycleClusterMemberAdded) - ClusterMemberRemoved = ClusterMemberAction(api.EventLifecycleClusterMemberRemoved) - ClusterMemberUpdated = ClusterMemberAction(api.EventLifecycleClusterMemberUpdated) - ClusterMemberRenamed = ClusterMemberAction(api.EventLifecycleClusterMemberRenamed) + ClusterMemberAdded = ClusterMemberAction(api.EventLifecycleClusterMemberAdded) + ClusterMemberEvacuated = ClusterMemberAction(api.EventLifecycleClusterMemberEvacuated) + ClusterMemberHealed = ClusterMemberAction(api.EventLifecycleClusterMemberHealed) + ClusterMemberRemoved = ClusterMemberAction(api.EventLifecycleClusterMemberRemoved) + ClusterMemberRenamed = ClusterMemberAction(api.EventLifecycleClusterMemberRenamed) + ClusterMemberRestored = ClusterMemberAction(api.EventLifecycleClusterMemberRestored) + ClusterMemberUpdated = ClusterMemberAction(api.EventLifecycleClusterMemberUpdated) ) // Event creates the lifecycle event for an action on a cluster member. diff --git a/internal/server/lifecycle/instance.go b/internal/server/lifecycle/instance.go index 583e80b227d..51d83032fe8 100644 --- a/internal/server/lifecycle/instance.go +++ b/internal/server/lifecycle/instance.go @@ -18,25 +18,26 @@ type InstanceAction string // All supported lifecycle events for instances. const ( + InstanceConsole = InstanceAction(api.EventLifecycleInstanceConsole) + InstanceConsoleReset = InstanceAction(api.EventLifecycleInstanceConsoleReset) + InstanceConsoleRetrieved = InstanceAction(api.EventLifecycleInstanceConsoleRetrieved) InstanceCreated = InstanceAction(api.EventLifecycleInstanceCreated) - InstanceStarted = InstanceAction(api.EventLifecycleInstanceStarted) - InstanceStopped = InstanceAction(api.EventLifecycleInstanceStopped) - InstanceShutdown = InstanceAction(api.EventLifecycleInstanceShutdown) - InstanceRestarted = InstanceAction(api.EventLifecycleInstanceRestarted) + InstanceDeleted = InstanceAction(api.EventLifecycleInstanceDeleted) + InstanceExec = InstanceAction(api.EventLifecycleInstanceExec) + InstanceFileDeleted = InstanceAction(api.EventLifecycleInstanceFileDeleted) + InstanceFilePushed = InstanceAction(api.EventLifecycleInstanceFilePushed) + InstanceFileRetrieved = InstanceAction(api.EventLifecycleInstanceFileRetrieved) + InstanceMigrated = InstanceAction(api.EventLifecycleInstanceMigrated) InstancePaused = InstanceAction(api.EventLifecycleInstancePaused) InstanceReady = InstanceAction(api.EventLifecycleInstanceReady) - InstanceResumed = InstanceAction(api.EventLifecycleInstanceResumed) - InstanceRestored = InstanceAction(api.EventLifecycleInstanceRestored) - InstanceDeleted = InstanceAction(api.EventLifecycleInstanceDeleted) InstanceRenamed = InstanceAction(api.EventLifecycleInstanceRenamed) + InstanceRestarted = InstanceAction(api.EventLifecycleInstanceRestarted) + InstanceRestored = InstanceAction(api.EventLifecycleInstanceRestored) + InstanceResumed = InstanceAction(api.EventLifecycleInstanceResumed) + InstanceShutdown = InstanceAction(api.EventLifecycleInstanceShutdown) + InstanceStarted = InstanceAction(api.EventLifecycleInstanceStarted) + InstanceStopped = InstanceAction(api.EventLifecycleInstanceStopped) InstanceUpdated = InstanceAction(api.EventLifecycleInstanceUpdated) - InstanceExec = InstanceAction(api.EventLifecycleInstanceExec) - InstanceConsole = InstanceAction(api.EventLifecycleInstanceConsole) - InstanceConsoleRetrieved = InstanceAction(api.EventLifecycleInstanceConsoleRetrieved) - InstanceConsoleReset = InstanceAction(api.EventLifecycleInstanceConsoleReset) - InstanceFileRetrieved = InstanceAction(api.EventLifecycleInstanceFileRetrieved) - InstanceFilePushed = InstanceAction(api.EventLifecycleInstanceFilePushed) - InstanceFileDeleted = InstanceAction(api.EventLifecycleInstanceFileDeleted) ) // Event creates the lifecycle event for an action on an instance. diff --git a/internal/server/request/request.go b/internal/server/request/request.go index 8079a69be7a..3667035adc9 100644 --- a/internal/server/request/request.go +++ b/internal/server/request/request.go @@ -42,6 +42,12 @@ func CreateRequestor(r *http.Request) *api.EventLifecycleRequestor { requestor.Address = val } + // Strip port from address. + host, _, err := net.SplitHostPort(requestor.Address) + if err == nil { + requestor.Address = host + } + return requestor } diff --git a/shared/api/event_lifecycle.go b/shared/api/event_lifecycle.go index b0e29a97b34..4706e1cd762 100644 --- a/shared/api/event_lifecycle.go +++ b/shared/api/event_lifecycle.go @@ -13,8 +13,11 @@ const ( EventLifecycleClusterGroupRenamed = "cluster-group-renamed" EventLifecycleClusterGroupUpdated = "cluster-group-updated" EventLifecycleClusterMemberAdded = "cluster-member-added" + EventLifecycleClusterMemberEvacuated = "cluster-member-evacuated" + EventLifecycleClusterMemberHealed = "cluster-member-healed" EventLifecycleClusterMemberRemoved = "cluster-member-removed" EventLifecycleClusterMemberRenamed = "cluster-member-renamed" + EventLifecycleClusterMemberRestored = "cluster-member-restored" EventLifecycleClusterMemberUpdated = "cluster-member-updated" EventLifecycleClusterTokenCreated = "cluster-token-created" EventLifecycleConfigUpdated = "config-updated" @@ -48,6 +51,7 @@ const ( EventLifecycleInstanceMetadataTemplateDeleted = "instance-metadata-template-deleted" EventLifecycleInstanceMetadataTemplateRetrieved = "instance-metadata-template-retrieved" EventLifecycleInstanceMetadataUpdated = "instance-metadata-updated" + EventLifecycleInstanceMigrated = "instance-migrated" EventLifecycleInstancePaused = "instance-paused" EventLifecycleInstanceReady = "instance-ready" EventLifecycleInstanceRenamed = "instance-renamed" @@ -98,9 +102,6 @@ const ( EventLifecycleProjectDeleted = "project-deleted" EventLifecycleProjectRenamed = "project-renamed" EventLifecycleProjectUpdated = "project-updated" - EventLifecycleStoragePoolCreated = "storage-pool-created" - EventLifecycleStoragePoolDeleted = "storage-pool-deleted" - EventLifecycleStoragePoolUpdated = "storage-pool-updated" EventLifecycleStorageBucketBackupCreated = "storage-bucket-backup-created" EventLifecycleStorageBucketBackupDeleted = "storage-bucket-backup-deleted" EventLifecycleStorageBucketBackupRenamed = "storage-bucket-backup-renamed" @@ -111,11 +112,14 @@ const ( EventLifecycleStorageBucketKeyDeleted = "storage-bucket-key-deleted" EventLifecycleStorageBucketKeyUpdated = "storage-bucket-key-updated" EventLifecycleStorageBucketUpdated = "storage-bucket-updated" - EventLifecycleStorageVolumeCreated = "storage-volume-created" + EventLifecycleStoragePoolCreated = "storage-pool-created" + EventLifecycleStoragePoolDeleted = "storage-pool-deleted" + EventLifecycleStoragePoolUpdated = "storage-pool-updated" EventLifecycleStorageVolumeBackupCreated = "storage-volume-backup-created" EventLifecycleStorageVolumeBackupDeleted = "storage-volume-backup-deleted" EventLifecycleStorageVolumeBackupRenamed = "storage-volume-backup-renamed" EventLifecycleStorageVolumeBackupRetrieved = "storage-volume-backup-retrieved" + EventLifecycleStorageVolumeCreated = "storage-volume-created" EventLifecycleStorageVolumeDeleted = "storage-volume-deleted" EventLifecycleStorageVolumeRenamed = "storage-volume-renamed" EventLifecycleStorageVolumeRestored = "storage-volume-restored"