Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some clustering issues #1039

Merged
merged 5 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion cmd/incusd/api_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/pem"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -51,6 +52,7 @@ import (
apiScriptlet "github.com/lxc/incus/v6/shared/api/scriptlet"
"github.com/lxc/incus/v6/shared/logger"
"github.com/lxc/incus/v6/shared/osarch"
"github.com/lxc/incus/v6/shared/subprocess"
localtls "github.com/lxc/incus/v6/shared/tls"
"github.com/lxc/incus/v6/shared/util"
"github.com/lxc/incus/v6/shared/validate"
Expand Down Expand Up @@ -3082,7 +3084,7 @@ func clusterNodeStatePost(d *Daemon, r *http.Request) response.Response {
Live: live,
}

err := migrateInstance(ctx, s, inst, req, sourceMemberInfo, targetMemberInfo, op)
err := migrateInstance(ctx, s, inst, req, sourceMemberInfo, targetMemberInfo, "", op)
if err != nil {
return fmt.Errorf("Failed to migrate instance %q in project %q: %w", inst.Name(), inst.Project().Name, err)
}
Expand Down Expand Up @@ -4567,6 +4569,16 @@ func autoHealClusterTask(d *Daemon) (task.Func, task.Schedule) {
continue
}

// As an extra safety net, make sure the dead system doesn't still respond on the network.
hostAddress, _, err := net.SplitHostPort(member.Address)
if err == nil {
_, err := subprocess.RunCommand("ping", "-w1", "-c1", "-n", "-q", hostAddress)
if err == nil {
// Server isn't fully dead, not risking auto-healing.
continue
}
}

offlineMembers = append(offlineMembers, member)
}
}
Expand Down
44 changes: 43 additions & 1 deletion cmd/incusd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,49 @@ func allowPermission(objectType auth.ObjectType, entitlement auth.Entitlement, m
return projectName
}

objectName, err := auth.ObjectFromRequest(r, objectType, expandProject, expandFingerprint, muxVars...)
// Expansion function for volume location.
expandVolumeLocation := func(projectName string, poolName string, volumeTypeName string, volumeName string) string {
// The location field is only relevant in clusters.
if !d.serverClustered {
return ""
}

var err error
var nodes []db.NodeInfo
var poolID int64

// Convert the volume type name to our internal integer representation.
volumeType, err := storagePools.VolumeTypeNameToDBType(volumeTypeName)
if err != nil {
return ""
}

// Get the server list for the volume.
err = d.db.Cluster.Transaction(r.Context(), func(ctx context.Context, tx *db.ClusterTx) error {
poolID, err = tx.GetStoragePoolID(ctx, poolName)
if err != nil {
return err
}

nodes, err = tx.GetStorageVolumeNodes(ctx, poolID, projectName, volumeName, volumeType)
if err != nil {
return err
}

return nil
})
if err != nil {
return ""
}

if len(nodes) != 1 {
return ""
}

return nodes[0].Name
}

objectName, err := auth.ObjectFromRequest(r, objectType, expandProject, expandFingerprint, expandVolumeLocation, muxVars...)
if err != nil {
return response.InternalError(fmt.Errorf("Failed to create authentication object: %w", err))
}
Expand Down
42 changes: 35 additions & 7 deletions cmd/incusd/instance_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"net/url"
"strings"

"github.com/gorilla/mux"

Expand Down Expand Up @@ -363,6 +364,12 @@ func instancePost(d *Daemon, r *http.Request) response.Response {
}
}

// If the user requested a specific server group, make sure we can have it recorded.
var targetGroupName string
if strings.HasPrefix(target, "@") {
targetGroupName = strings.TrimPrefix(target, "@")
}

// Check that we're not requested to move to the same location we're currently on.
if target != "" && targetMemberInfo.Name == inst.Location() {
return response.BadRequest(fmt.Errorf("Requested target server is the same as current server"))
Expand Down Expand Up @@ -395,7 +402,7 @@ func instancePost(d *Daemon, r *http.Request) response.Response {

// Setup the instance move operation.
run := func(op *operations.Operation) error {
return migrateInstance(context.TODO(), s, inst, req, sourceMemberInfo, targetMemberInfo, op)
return migrateInstance(context.TODO(), s, inst, req, sourceMemberInfo, targetMemberInfo, targetGroupName, op)
}

resources := map[string][]api.URL{}
Expand Down Expand Up @@ -445,7 +452,7 @@ func instancePost(d *Daemon, r *http.Request) response.Response {
}

// Perform the server-side migration.
func migrateInstance(ctx context.Context, s *state.State, inst instance.Instance, req api.InstancePost, sourceMemberInfo *db.NodeInfo, targetMemberInfo *db.NodeInfo, op *operations.Operation) error {
func migrateInstance(ctx context.Context, s *state.State, inst instance.Instance, req api.InstancePost, sourceMemberInfo *db.NodeInfo, targetMemberInfo *db.NodeInfo, targetGroupName string, op *operations.Operation) error {
// Load the instance storage pool.
sourcePool, err := storagePools.LoadByInstance(s, inst)
if err != nil {
Expand Down Expand Up @@ -496,6 +503,14 @@ func migrateInstance(ctx context.Context, s *state.State, inst instance.Instance
}
}

// Record the new group name if needed.
if targetGroupName != "" {
err = inst.VolatileSet(map[string]string{"volatile.cluster.group": targetGroupName})
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -810,14 +825,14 @@ func migrateInstance(ctx context.Context, s *state.State, inst instance.Instance
}

// Wait for the migration to complete.
err = destOp.Wait()
err = sourceOp.Wait(context.Background())
if err != nil {
return fmt.Errorf("Instance move to destination failed: %w", err)
return fmt.Errorf("Instance move to destination failed on source: %w", err)
}

err = sourceOp.Wait(context.Background())
err = destOp.Wait()
if err != nil {
return fmt.Errorf("Instance move to destination failed on source: %w", err)
return fmt.Errorf("Instance move to destination failed: %w", err)
}

// Update the database post-migration.
Expand All @@ -828,12 +843,25 @@ func migrateInstance(ctx context.Context, s *state.State, inst instance.Instance
return fmt.Errorf("Failed updating cluster member to %q for instance %q: %w", targetMemberInfo.Name, inst.Name(), err)
}

// Restore the original value of "volatile.apply_template".
id, err := dbCluster.GetInstanceID(ctx, tx.Tx(), inst.Project().Name, inst.Name())
if err != nil {
return fmt.Errorf("Failed to get ID of moved instance: %w", err)
}

// Set the cluster group record if needed.
if targetGroupName != "" {
err = tx.DeleteInstanceConfigKey(ctx, id, "volatile.cluster.group")
if err != nil {
return fmt.Errorf("Failed to remove volatile.cluster.group config key: %w", err)
}

err = tx.CreateInstanceConfig(ctx, int(id), map[string]string{"volatile.cluster.group": targetGroupName})
if err != nil {
return fmt.Errorf("Failed to set volatile.apply_template config key: %w", err)
}
}

// Restore the original value of "volatile.apply_template".
err = tx.DeleteInstanceConfigKey(ctx, id, "volatile.apply_template")
if err != nil {
return fmt.Errorf("Failed to remove volatile.apply_template config key: %w", err)
Expand Down
12 changes: 8 additions & 4 deletions internal/server/auth/authorization_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func NewObject(objectType ObjectType, projectName string, identifierElements ...
// Mux vars must be provided in the order that they are found in the endpoint path. If the object
// requires a project name, this is taken from the project query parameter unless the URL begins
// with /1.0/projects.
func ObjectFromRequest(r *http.Request, objectType ObjectType, expandProject func(string) string, expandFingerprint func(string, string) string, muxVars ...string) (Object, error) {
func ObjectFromRequest(r *http.Request, objectType ObjectType, expandProject func(string) string, expandFingerprint func(string, string) string, expandVolumeLocation func(string, string, string, string) string, muxVars ...string) (Object, error) {
// Shortcut for server objects which don't require any arguments.
if objectType == ObjectTypeServer {
return ObjectServer(), nil
Expand Down Expand Up @@ -196,11 +196,15 @@ func ObjectFromRequest(r *http.Request, objectType ObjectType, expandProject fun

if muxVar == "location" {
// Special handling for the location which is not present as a real mux var.
if location == "" {
continue
if location != "" {
muxValue = location
} else if objectType == ObjectTypeStorageVolume {
muxValue = expandVolumeLocation(projectName, vars["poolName"], vars["type"], vars["volumeName"])
}

muxValue = location
if muxValue == "" {
continue
}
} else {
muxValue, err = url.PathUnescape(vars[muxVar])
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/server/instance/drivers/driver_qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -6415,7 +6415,7 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {

// Check for stateful support.
if args.Live && util.IsFalseOrEmpty(d.expandedConfig["migration.stateful"]) {
return fmt.Errorf("Stateful migration requires migration.stateful to be set to true")
return fmt.Errorf("Live migration requires migration.stateful to be set to true")
}

// Wait for essential migration connections before negotiation.
Expand Down
Loading