Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various bugfixes #1054

Merged
merged 16 commits into from
Jul 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions client/incus_instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,9 @@ func (r *ProtocolIncus) tryCreateInstance(req api.InstancesPost, urls []string,
operation := req.Source.Operation

// Forward targetOp to remote op
chConnect := make(chan error, 1)
chWait := make(chan error, 1)

go func() {
success := false
var errors []remoteOperationResult
Expand Down Expand Up @@ -665,13 +668,35 @@ func (r *ProtocolIncus) tryCreateInstance(req api.InstancesPost, urls []string,
break
}

if !success {
rop.err = remoteOperationError("Failed instance creation", errors)
if success {
chConnect <- nil
close(chConnect)
} else {
chConnect <- remoteOperationError("Failed instance creation", errors)
close(chConnect)

if op != nil {
_ = op.Cancel()
}
}
}()

if op != nil {
go func() {
chWait <- op.Wait()
close(chWait)
}()
}

go func() {
var err error

select {
case err = <-chConnect:
case err = <-chWait:
}

rop.err = err
close(rop.chDone)
}()

Expand Down
14 changes: 4 additions & 10 deletions cmd/incus/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,6 @@ func (c *cmdImageCopy) Run(cmd *cobra.Command, args []string) error {
return err
}

// Revert project for `sourceServer` which may have been overwritten
// by `--project` flag in `GetImageServer` method
remote := conf.Remotes[remoteName]
if remote.Protocol == "incus" && !remote.Public {
d, ok := sourceServer.(incus.InstanceServer)
if ok {
sourceServer = d.UseProject(remote.Project)
}
}

// Parse destination remote
resources, err := c.global.ParseServers(args[1])
if err != nil {
Expand All @@ -228,8 +218,12 @@ func (c *cmdImageCopy) Run(cmd *cobra.Command, args []string) error {
imageType = "virtual-machine"
}

// Set the correct project on target.
remote := conf.Remotes[resources[0].remote]
if c.flagTargetProject != "" {
destinationServer = destinationServer.UseProject(c.flagTargetProject)
} else if remote.Protocol == "incus" {
destinationServer = destinationServer.UseProject(remote.Project)
}

// Copy the image
Expand Down
5 changes: 5 additions & 0 deletions cmd/incusd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,11 @@ func (s *httpServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
}

func setCORSHeaders(rw http.ResponseWriter, req *http.Request, config *clusterConfig.Config) {
// Check if we have a working config.
if config == nil {
return
}

allowedOrigin := config.HTTPSAllowedOrigin()
origin := req.Header.Get("Origin")
if allowedOrigin != "" && origin != "" {
Expand Down
34 changes: 18 additions & 16 deletions cmd/incusd/instance_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,25 +225,27 @@ func instancePost(d *Daemon, r *http.Request) response.Response {
}

// Checks for running instances.
if inst.IsRunning() && (req.Pool != "" || req.Project != "" || target != "") {
// Stateless migrations need the instance stopped.
if !req.Live {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved statelessly"))
}
if inst.IsRunning() {
if req.Pool != "" || req.Project != "" || target != "" {
// Stateless migrations need the instance stopped.
if !req.Live {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved statelessly"))
}

// Storage pool changes require a stopped instance.
if req.Pool != "" {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved across storage pools"))
}
// Storage pool changes require a stopped instance.
if req.Pool != "" {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved across storage pools"))
}

// Project changes require a stopped instance.
if req.Project != "" {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved across projects"))
}
// Project changes require a stopped instance.
if req.Project != "" {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved across projects"))
}

// Name changes require a stopped instance.
if req.Name != "" {
return response.BadRequest(fmt.Errorf("Instance must be stopped to change their names"))
// Name changes require a stopped instance.
if req.Name != "" {
return response.BadRequest(fmt.Errorf("Instance must be stopped to change their names"))
}
}
} else {
// Clear Live flag if instance isn't running.
Expand Down
12 changes: 6 additions & 6 deletions cmd/incusd/migrate_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,16 @@ func (s *migrationSourceWs) Do(state *state.State, migrateOp *operations.Operati
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*10)
defer cancel()

l.Info("Waiting for migration control connection on source")
l.Debug("Waiting for migration control connection on source")

_, err := s.conns[api.SecretNameControl].WebSocket(ctx)
if err != nil {
return fmt.Errorf("Failed waiting for migration control connection on source: %w", err)
}

l.Info("Migration control connection established on source")
l.Debug("Migration control connection established on source")

defer l.Info("Migration channels disconnected on source")
defer l.Debug("Migration channels disconnected on source")
defer s.disconnect()

stateConnFunc := func(ctx context.Context) (io.ReadWriteCloser, error) {
Expand Down Expand Up @@ -215,16 +215,16 @@ func (c *migrationSink) Do(state *state.State, instOp *operationlock.InstanceOpe
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*10)
defer cancel()

l.Info("Waiting for migration control connection on target")
l.Debug("Waiting for migration control connection on target")

_, err := c.conns[api.SecretNameControl].WebSocket(ctx)
if err != nil {
return fmt.Errorf("Failed waiting for migration control connection on target: %w", err)
}

l.Info("Migration control connection established on target")
l.Debug("Migration control connection established on target")

defer l.Info("Migration channels disconnected on target")
defer l.Debug("Migration channels disconnected on target")

if c.push {
defer c.disconnect()
Expand Down
34 changes: 26 additions & 8 deletions cmd/incusd/storage_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,21 @@ func storagePoolDelete(d *Daemon, r *http.Request) response.Response {
}
}

// If the pool requires deactivation, go through it first.
if !clusterNotification && pool.Driver().Info().Remote && pool.Driver().Info().Deactivate {
err = notifier(func(client incus.InstanceServer) error {
_, _, err := client.GetServer()
if err != nil {
return err
}

return client.DeleteStoragePool(pool.Name())
})
if err != nil {
return response.SmartError(err)
}
}

if pool.LocalStatus() != api.StoragePoolStatusPending {
err = pool.Delete(clientType, nil)
if err != nil {
Expand All @@ -1024,15 +1039,18 @@ func storagePoolDelete(d *Daemon, r *http.Request) response.Response {
return response.EmptySyncResponse
}

// If we are clustered, also notify all other nodes.
err = notifier(func(client incus.InstanceServer) error {
_, _, err := client.GetServer()
if err != nil {
return err
}
// If clustered and dealing with a normal pool, notify all other nodes.
if !pool.Driver().Info().Remote || !pool.Driver().Info().Deactivate {
err = notifier(func(client incus.InstanceServer) error {
_, _, err := client.GetServer()
if err != nil {
return err
}

return client.DeleteStoragePool(pool.Name())
})
}

return client.DeleteStoragePool(pool.Name())
})
if err != nil {
return response.SmartError(err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/server/cluster/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Connect(address string, networkCert *localtls.CertInfo, serverCert *localtl
defer cancel()
err := EventListenerWait(ctx, address)
if err != nil {
return nil, fmt.Errorf("Missing event connection with target cluster member")
return nil, err
}
}

Expand Down
17 changes: 16 additions & 1 deletion internal/server/cluster/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"context"
"fmt"
"slices"
"sync"
"time"
Expand Down Expand Up @@ -115,6 +116,7 @@ var eventHubAddresses []string
// eventHubPushCh is a buffered channel of events with a capacity of 10.
var eventHubPushCh = make(chan api.Event, 10) // Buffer size to accommodate slow consumers before dropping events.

// eventHubPushChTimeout is a one second duration.
// NOTE(review): presumably the maximum time to wait when pushing onto eventHubPushCh; confirm against its users.
var eventHubPushChTimeout = time.Duration(time.Second)

// listeners holds the established event listener client for each cluster member, keyed by member address.
var listeners = map[string]*eventListenerClient{}

// listenersUnavailable records, per member address, whether the last listener update found the member unreachable.
// It is set to true when the connectivity check fails and reset to false once a listener is stored for the address.
// EventListenerWait returns an error straight away for an address marked true instead of waiting for a connection.
var listenersUnavailable = map[string]bool{}

// listenersNotify maps a notification channel to the member addresses its waiter is interested in.
// It is walked after a new listener is stored so that waiters on that member's address can be told it is connected.
var listenersNotify = map[chan struct{}][]string{}

// listenersLock is held around accesses to listeners and listenersUnavailable.
// NOTE(review): it appears to cover listenersNotify as well; confirm against the full file.
var listenersLock sync.Mutex

// listenersUpdateLock is a separate mutex.
// NOTE(review): presumably serialises whole listener update runs; confirm against EventsUpdateListeners.
var listenersUpdateLock sync.Mutex
Expand Down Expand Up @@ -149,6 +151,11 @@ func EventListenerWait(ctx context.Context, address string) error {
return nil
}

if listenersUnavailable[address] {
listenersLock.Unlock()
return fmt.Errorf("Server isn't ready yet")
}

listenAddresses := []string{address}

// Check if operating in event hub mode and if one of the event hub connections is available.
Expand Down Expand Up @@ -181,7 +188,11 @@ func EventListenerWait(ctx context.Context, address string) error {
case <-connected:
return nil
case <-ctx.Done():
return ctx.Err()
if ctx.Err() != nil {
return fmt.Errorf("Missing event connection with target cluster member")
}

return nil
}
}

Expand Down Expand Up @@ -306,6 +317,9 @@ func EventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster,
l := logger.AddContext(logger.Ctx{"local": localAddress, "remote": m.Address})

if !HasConnectivity(endpoints.NetworkCert(), serverCert(), m.Address, true) {
listenersLock.Lock()
listenersUnavailable[m.Address] = true
listenersLock.Unlock()
return
}

Expand All @@ -325,6 +339,7 @@ func EventsUpdateListeners(endpoints *endpoints.Endpoints, cluster *db.Cluster,

listenersLock.Lock()
listeners[m.Address] = listener
listenersUnavailable[m.Address] = false

// Indicate to any notifiers waiting for this member's address that it is connected.
for connected, notifyAddresses := range listenersNotify {
Expand Down
14 changes: 7 additions & 7 deletions internal/server/instance/drivers/driver_lxc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3397,7 +3397,7 @@ func (d *lxc) Freeze() error {

// Check if the CGroup is available
if !d.state.OS.CGInfo.Supports(cgroup.Freezer, cg) {
d.logger.Info("Unable to freeze container (lack of kernel support)", ctxMap)
d.logger.Warn("Unable to freeze container (lack of kernel support)", ctxMap)
return nil
}

Expand Down Expand Up @@ -3447,7 +3447,7 @@ func (d *lxc) Unfreeze() error {

// Check if the CGroup is available
if !d.state.OS.CGInfo.Supports(cgroup.Freezer, cg) {
d.logger.Info("Unable to unfreeze container (lack of kernel support)", ctxMap)
d.logger.Warn("Unable to unfreeze container (lack of kernel support)", ctxMap)
return nil
}

Expand Down Expand Up @@ -5500,8 +5500,8 @@ fi
}

func (d *lxc) MigrateSend(args instance.MigrateSendArgs) error {
d.logger.Info("Migration send starting")
defer d.logger.Info("Migration send stopped")
d.logger.Debug("Migration send starting")
defer d.logger.Debug("Migration send stopped")

// Wait for essential migration connections before negotiation.
connectionsCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
Expand Down Expand Up @@ -6087,8 +6087,8 @@ func (d *lxc) resetContainerDiskIdmap(srcIdmap *idmap.Set) error {
}

func (d *lxc) MigrateReceive(args instance.MigrateReceiveArgs) error {
d.logger.Info("Migration receive starting")
defer d.logger.Info("Migration receive stopped")
d.logger.Debug("Migration receive starting")
defer d.logger.Debug("Migration receive stopped")

// Wait for essential migration connections before negotiation.
connectionsCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
Expand Down Expand Up @@ -6811,7 +6811,7 @@ func (d *lxc) migrate(args *instance.CriuMigrationArgs) error {
if migrateErr != nil {
log, err2 := getCRIULogErrors(finalStateDir, prettyCmd)
if err2 == nil {
d.logger.Info("Failed migrating container", ctxMap)
d.logger.Warn("Failed migrating container", ctxMap)
migrateErr = fmt.Errorf("%s %s failed\n%s", args.Function, prettyCmd, log)
}

Expand Down
15 changes: 5 additions & 10 deletions internal/server/instance/drivers/driver_qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,10 +964,7 @@ func (d *qemu) restoreState(monitor *qmp.Monitor) error {
}

go func() {
_, err := io.Copy(pipeWrite, stateConn)
if err != nil {
d.logger.Warn("Failed reading from state connection", logger.Ctx{"err": err})
}
_, _ = io.Copy(pipeWrite, stateConn)

_ = pipeRead.Close()
_ = pipeWrite.Close()
Expand Down Expand Up @@ -4782,8 +4779,6 @@ func (d *qemu) Stop(stateful bool) error {
return err
}

d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceStopped.Event(d, nil))

op.Done(nil)
return nil
}
Expand Down Expand Up @@ -6426,8 +6421,8 @@ func (d *qemu) Export(w io.Writer, properties map[string]string, expiration time

// MigrateSend is not currently supported.
func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
d.logger.Info("Migration send starting")
defer d.logger.Info("Migration send stopped")
d.logger.Debug("Migration send starting")
defer d.logger.Debug("Migration send stopped")

// Check for stateful support.
if args.Live && util.IsFalseOrEmpty(d.expandedConfig["migration.stateful"]) {
Expand Down Expand Up @@ -7004,8 +6999,8 @@ func (d *qemu) migrateSendLive(pool storagePools.Pool, clusterMoveSourceName str
}

func (d *qemu) MigrateReceive(args instance.MigrateReceiveArgs) error {
d.logger.Info("Migration receive starting")
defer d.logger.Info("Migration receive stopped")
d.logger.Debug("Migration receive starting")
defer d.logger.Debug("Migration receive stopped")

// Wait for essential migration connections before negotiation.
connectionsCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
Expand Down
Loading
Loading