Merge pull request #13257 from mihalicyn/vm_instance_cpu_auto_pinning
VM: Instance's CPU auto rebalancing/pinning
tomponline authored Apr 5, 2024
2 parents d65ad02 + 801e031 commit 7e48a31
Showing 4 changed files with 101 additions and 53 deletions.
50 changes: 19 additions & 31 deletions lxd/devices.go
@@ -390,26 +390,26 @@ func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst insta
}
}

// deviceTaskBalance is used to balance the CPU load across containers running on a host.
// deviceTaskBalance is used to balance the CPU load across instances running on a host.
// It first checks if CGroup support is available and returns if it isn't.
// It then retrieves the effective CPU list (the CPUs that are guaranteed to be online) and isolates any isolated CPUs.
// After that, it loads all instances of containers running on the node and iterates through them.
// After that, it loads all instances running on the node and iterates through them.
//
// For each container, it checks its CPU limits and determines whether it is pinned to specific CPUs or can use the load-balancing mechanism.
// For each instance, it checks its CPU limits and determines whether it is pinned to specific CPUs or can use the load-balancing mechanism.
// If it is pinned, the function adds it to the fixedInstances map with the CPU numbers it is pinned to.
// If not, the container will be included in the load-balancing calculation,
// If not, the instance will be included in the load-balancing calculation,
// and the number of CPUs it can use is determined by taking the minimum of its assigned CPUs and the available CPUs. Note that if
// NUMA placement is enabled (`limits.cpu.nodes` is not empty), we apply a similar load-balancing logic to the `fixedInstances` map
// with a constraint being the number of vCPUs and the CPU pool being the CPUs pinned to a set of NUMA nodes.
//
// Next, the function balance the CPU usage by iterating over all the CPUs and dividing the containers into those that
// are pinned to a specific CPU and those that are load-balanced. For the pinned containers,
// Next, the function balance the CPU usage by iterating over all the CPUs and dividing the instances into those that
// are pinned to a specific CPU and those that are load-balanced. For the pinned instances,
// it adds them to the pinning map with the CPU number it's pinned to.
// For the load-balanced containers, it sorts the available CPUs based on their usage count and assigns them to containers
// For the load-balanced instances, it sorts the available CPUs based on their usage count and assigns them to instances
// in ascending order until the required number of CPUs have been assigned.
// Finally, the pinning map is used to set the new CPU pinning for each container, updating it to the new balanced state.
// Finally, the pinning map is used to set the new CPU pinning for each instance, updating it to the new balanced state.
//
// Overall, this function ensures that the CPU resources of the host are utilized effectively amongst all the containers running on it.
// Overall, this function ensures that the CPU resources of the host are utilized effectively amongst all the instances running on it.
func deviceTaskBalance(s *state.State) {
min := func(x, y int) int {
if x < y {
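
The doc comment above is the heart of the change: instead of only balancing containers, the scheduler now balances every instance, sorting CPUs by how many instances already occupy them and handing the least-loaded ones to each load-balanced instance. A minimal, self-contained sketch of that selection step (the usage counts and helper names are illustrative, not LXD's internals):

```go
package main

import (
	"fmt"
	"sort"
)

// cpuLoad tracks how many instances are currently assigned to a CPU.
type cpuLoad struct {
	id    int
	count int
}

// pickLeastLoaded returns the n least-used CPUs, mimicking the
// "sort by usage count, assign in ascending order" step described above.
func pickLeastLoaded(usage map[int]int, n int) []int {
	loads := make([]cpuLoad, 0, len(usage))
	for id, count := range usage {
		loads = append(loads, cpuLoad{id: id, count: count})
	}

	// Least-used CPUs first; break ties by CPU id for determinism.
	sort.Slice(loads, func(i, j int) bool {
		if loads[i].count != loads[j].count {
			return loads[i].count < loads[j].count
		}
		return loads[i].id < loads[j].id
	})

	if n > len(loads) {
		n = len(loads)
	}

	picked := make([]int, 0, n)
	for _, l := range loads[:n] {
		picked = append(picked, l.id)
	}

	return picked
}

func main() {
	// CPU 1 already hosts two pinned instances, CPU 0 hosts one, CPUs 2-3 are free.
	usage := map[int]int{0: 1, 1: 2, 2: 0, 3: 0}

	// An instance allowed two CPUs gets the two least-loaded ones.
	fmt.Println(pickLeastLoaded(usage, 2)) // [2 3]
}
```
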
@@ -465,7 +465,7 @@ func deviceTaskBalance(s *state.State) {
}

// Iterate through the instances
instances, err := instance.LoadNodeAll(s, instancetype.Container)
instances, err := instance.LoadNodeAll(s, instancetype.Any)
if err != nil {
logger.Error("Problem loading instances list", logger.Ctx{"err": err})
return
@@ -511,8 +511,8 @@ func deviceTaskBalance(s *state.State) {
cpulimit = effectiveCpus
}

// Check that the container is running.
// We use InitPID here rather than IsRunning because this task is triggered during the container's
// Check that the instance is running.
// We use InitPID here rather than IsRunning because this task can be triggered during the container's
// onStart hook, which is during the time that the start lock is held, which causes IsRunning to
// return false (because the container hasn't fully started yet) but it is sufficiently started to
// have its cgroup CPU limits set.
Expand All @@ -531,16 +531,16 @@ func deviceTaskBalance(s *state.State) {
}
} else {
// Pinned
containerCpus, err := resources.ParseCpuset(cpulimit)
instanceCpus, err := resources.ParseCpuset(cpulimit)
if err != nil {
return
}

if len(numaCpus) > 0 {
logger.Warnf("The pinned CPUs: %v, override the NUMA configuration with the CPUs: %v", containerCpus, numaCpus)
logger.Warnf("The pinned CPUs: %v, override the NUMA configuration with the CPUs: %v", instanceCpus, numaCpus)
}

fillFixedInstances(fixedInstances, c, cpus, containerCpus, len(containerCpus), false)
fillFixedInstances(fixedInstances, c, cpus, instanceCpus, len(instanceCpus), false)
}
}
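
In the pinned branch above, `limits.cpu` carries a cpuset-style string (for example "0-2,7") rather than a CPU count, and `resources.ParseCpuset` expands it before `fillFixedInstances` records the pinning. A simplified stand-in for that expansion, for illustration only (not LXD's actual parser):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCpuset expands a cpuset string such as "0-2,7" into [0 1 2 7].
// Illustrative only; LXD has its own parser in the resources package.
func parseCpuset(set string) ([]int, error) {
	var cpus []int

	for _, chunk := range strings.Split(set, ",") {
		low, high, isRange := strings.Cut(chunk, "-")

		start, err := strconv.Atoi(strings.TrimSpace(low))
		if err != nil {
			return nil, fmt.Errorf("invalid cpuset %q: %w", set, err)
		}

		end := start
		if isRange {
			end, err = strconv.Atoi(strings.TrimSpace(high))
			if err != nil {
				return nil, fmt.Errorf("invalid cpuset %q: %w", set, err)
			}
		}

		for cpu := start; cpu <= end; cpu++ {
			cpus = append(cpus, cpu)
		}
	}

	return cpus, nil
}

func main() {
	cpus, err := parseCpuset("0-2,7")
	if err != nil {
		panic(err)
	}

	fmt.Println(cpus) // [0 1 2 7]
}
```
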

@@ -561,7 +561,7 @@ func deviceTaskBalance(s *state.State) {
for cpu, ctns := range fixedInstances {
c, ok := usage[cpu]
if !ok {
logger.Errorf("Internal error: container using unavailable cpu")
logger.Errorf("Internal error: instance using unavailable cpu")
continue
}

@@ -603,22 +603,10 @@ func deviceTaskBalance(s *state.State) {
}

// Set the new pinning
for ctn, set := range pinning {
// Confirm the container didn't just stop
if ctn.InitPID() <= 0 {
continue
}

sort.Strings(set)
cg, err := ctn.CGroup()
if err != nil {
logger.Error("balance: Unable to get cgroup struct", logger.Ctx{"name": ctn.Name(), "err": err, "value": strings.Join(set, ",")})
continue
}

err = cg.SetCpuset(strings.Join(set, ","))
for inst, set := range pinning {
err = inst.SetAffinity(set)
if err != nil {
logger.Error("balance: Unable to set cpuset", logger.Ctx{"name": ctn.Name(), "err": err, "value": strings.Join(set, ",")})
logger.Error("Error setting CPU affinity for the instance", logger.Ctx{"project": inst.Project().Name, "instance": inst.Name(), "err": err})
}
}
}
34 changes: 31 additions & 3 deletions lxd/instance/drivers/driver_lxc.go
@@ -4185,7 +4185,6 @@ func (d *lxc) Update(args db.InstanceArgs, userRequested bool) error {
d.release()
d.cConfig = false
_, _ = d.initLXC(true)
cgroup.TaskSchedulerTrigger("container", d.name, "changed")
}
}()

@@ -4411,6 +4410,8 @@ func (d *lxc) Update(args db.InstanceArgs, userRequested bool) error {
}
}

cpuLimitWasChanged := false

// Apply the live changes
if isRunning {
cc, err := d.initLXC(false)
@@ -4665,8 +4666,7 @@ func (d *lxc) Update(args db.InstanceArgs, userRequested bool) error {
}
}
} else if key == "limits.cpu" || key == "limits.cpu.nodes" {
// Trigger a scheduler re-run
cgroup.TaskSchedulerTrigger("container", d.name, "changed")
cpuLimitWasChanged = true
} else if key == "limits.cpu.priority" || key == "limits.cpu.allowance" {
// Skip if no cpu CGroup
if !d.state.OS.CGInfo.Supports(cgroup.CPU, cg) {
@@ -4868,6 +4868,11 @@ func (d *lxc) Update(args db.InstanceArgs, userRequested bool) error {
// Success, update the closure to mark that the changes should be kept.
undoChanges = false

if cpuLimitWasChanged {
// Trigger a scheduler re-run
cgroup.TaskSchedulerTrigger("container", d.name, "changed")
}

if userRequested {
if d.isSnapshot {
d.state.Events.SendLifecycle(d.project.Name, lifecycle.InstanceSnapshotUpdated.Event(d, nil))
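
Both drivers now record `cpuLimitWasChanged` while applying live changes and only fire the scheduler once the update can no longer be reverted, so a failed `Update` no longer triggers a spurious rebalance. A generic sketch of that flag-then-trigger pattern (function and parameter names here are illustrative, not LXD's API):

```go
package main

import "fmt"

// applyUpdate sketches deferring a side effect (the scheduler re-run) until
// every live change has been applied successfully.
func applyUpdate(changedKeys []string, applyChange func(string) error, triggerRebalance func()) error {
	cpuLimitWasChanged := false

	for _, key := range changedKeys {
		if err := applyChange(key); err != nil {
			// The update will be reverted, so do not ask for a rebalance.
			return err
		}

		if key == "limits.cpu" || key == "limits.cpu.nodes" {
			cpuLimitWasChanged = true
		}
	}

	// Only signal the scheduler once the changes are known to stick.
	if cpuLimitWasChanged {
		triggerRebalance()
	}

	return nil
}

func main() {
	err := applyUpdate(
		[]string{"limits.memory", "limits.cpu"},
		func(key string) error { fmt.Println("applied", key); return nil },
		func() { fmt.Println("scheduler re-run requested") },
	)
	if err != nil {
		fmt.Println("update failed:", err)
	}
}
```
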
@@ -8193,6 +8198,29 @@ func (d *lxc) CGroup() (*cgroup.CGroup, error) {
return d.cgroup(cc, true)
}

// SetAffinity sets affinity in the container according with a set provided.
func (d *lxc) SetAffinity(set []string) error {
sort.Strings(set)
affinitySet := strings.Join(set, ",")

// Confirm the container didn't just stop
if d.InitPID() <= 0 {
return nil
}

cg, err := d.CGroup()
if err != nil {
return fmt.Errorf("Unable to get cgroup struct: %w", err)
}

err = cg.SetCpuset(affinitySet)
if err != nil {
return fmt.Errorf("Unable to set cgroup cpuset to %q: %w", affinitySet, err)
}

return nil
}

func (d *lxc) cgroup(cc *liblxc.Container, running bool) (*cgroup.CGroup, error) {
if cc == nil {
return nil, fmt.Errorf("Container not initialized for cgroup")
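
For containers, `SetAffinity` sorts the set, joins it into a comma-separated cpuset string and writes it through the instance's cgroup. A standalone sketch of the equivalent operation against a cgroup2 hierarchy directly, assuming a path layout such as /sys/fs/cgroup/<group>/cpuset.cpus (LXD goes through its cgroup abstraction rather than writing files like this):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
)

// setCgroupCpuset writes a cpuset string such as "0,2,5" into the group's
// cpuset.cpus file. This mirrors what cg.SetCpuset does conceptually; the
// cgroup2 path layout here is an assumption, not LXD's actual plumbing.
func setCgroupCpuset(group string, cpus []string) error {
	sort.Strings(cpus)
	value := strings.Join(cpus, ",")

	path := filepath.Join("/sys/fs/cgroup", group, "cpuset.cpus")
	if err := os.WriteFile(path, []byte(value), 0644); err != nil {
		return fmt.Errorf("Unable to set cpuset to %q: %w", value, err)
	}

	return nil
}

func main() {
	// Pin everything in a hypothetical "lxc.payload.c1" group to CPUs 0, 2 and 5.
	if err := setCgroupCpuset("lxc.payload.c1", []string{"2", "0", "5"}); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```
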
69 changes: 50 additions & 19 deletions lxd/instance/drivers/driver_qemu.go
@@ -1704,27 +1704,11 @@ func (d *qemu) start(stateful bool, op *operationlock.InstanceOperation) error {
return err
}
}
} else {
// Confirm nothing weird is going on.
if len(cpuInfo.vcpus) != len(pids) {
err = fmt.Errorf("QEMU has less vCPUs than configured")
op.Done(err)
return err
}

for i, pid := range pids {
set := unix.CPUSet{}
set.Set(int(cpuInfo.vcpus[uint64(i)]))

// Apply the pin.
err := unix.SchedSetaffinity(pid, &set)
if err != nil {
op.Done(err)
return err
}
}
}

// Trigger a rebalance
cgroup.TaskSchedulerTrigger("virtual-machine", d.name, "started")

// Run monitor hooks from devices.
for _, monHook := range monHooks {
err = monHook(monitor)
@@ -4930,6 +4914,9 @@ func (d *qemu) Stop(stateful bool) error {
return err
}

// Trigger a rebalance
cgroup.TaskSchedulerTrigger("virtual-machine", d.name, "stopped")

return nil
}
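
With the inline pinning removed from `start()`, VM lifecycle transitions now just notify the shared scheduler ("started" and "stopped" in the hunks above) and the rebalance happens asynchronously in one place. A toy sketch of how such notifications could be coalesced into a single rebalance pass, purely illustrative and not how `cgroup.TaskSchedulerTrigger` is actually implemented:

```go
package main

import (
	"fmt"
	"time"
)

// scheduler coalesces trigger notifications so that a burst of instance
// start/stop events results in a single rebalance pass. Illustrative only.
type scheduler struct {
	events    chan string
	rebalance func()
}

func newScheduler(rebalance func()) *scheduler {
	s := &scheduler{events: make(chan string, 16), rebalance: rebalance}

	go func() {
		for reason := range s.events {
			// Give closely spaced events a chance to arrive together.
			time.Sleep(100 * time.Millisecond)

			// Drain anything queued in the meantime.
			for len(s.events) > 0 {
				<-s.events
			}

			fmt.Println("rebalancing after:", reason)
			s.rebalance()
		}
	}()

	return s
}

// Trigger mirrors the shape of calls such as
// cgroup.TaskSchedulerTrigger("virtual-machine", name, "started").
func (s *scheduler) Trigger(srcType, name, action string) {
	s.events <- fmt.Sprintf("%s %s %s", srcType, name, action)
}

func main() {
	s := newScheduler(func() { fmt.Println("running a deviceTaskBalance-style pass") })

	s.Trigger("virtual-machine", "vm1", "started")
	s.Trigger("virtual-machine", "vm2", "stopped")

	time.Sleep(500 * time.Millisecond)
}
```
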

@@ -5591,6 +5578,8 @@ func (d *qemu) Update(args db.InstanceArgs, userRequested bool) error {
return err
}

cpuLimitWasChanged := false

if isRunning {
// Only certain keys can be changed on a running VM.
liveUpdateKeys := []string{
@@ -5669,6 +5658,8 @@ func (d *qemu) Update(args db.InstanceArgs, userRequested bool) error {
if err != nil {
return fmt.Errorf("Failed updating cpu limit: %w", err)
}

cpuLimitWasChanged = true
} else if key == "limits.memory" {
err = d.updateMemoryLimit(value)
if err != nil {
@@ -5792,6 +5783,11 @@ func (d *qemu) Update(args db.InstanceArgs, userRequested bool) error {
// Changes have been applied and recorded, do not revert if an error occurs from here.
revert.Success()

if cpuLimitWasChanged {
// Trigger a scheduler re-run
cgroup.TaskSchedulerTrigger("virtual-machine", d.name, "changed")
}

if isRunning {
// Send devlxd notifications only for user.* key changes
for _, key := range changedConfig {
@@ -7464,6 +7460,41 @@ func (d *qemu) CGroup() (*cgroup.CGroup, error) {
return nil, instance.ErrNotImplemented
}

// SetAffinity sets affinity for QEMU processes according with a set provided.
func (d *qemu) SetAffinity(set []string) error {
monitor, err := qmp.Connect(d.monitorPath(), qemuSerialChardevName, d.getMonitorEventHandler())
if err != nil {
// this is not an error path, really. Instance can be stopped, for example.
d.logger.Debug("Failed connecting to the QMP monitor. Instance is not running?", logger.Ctx{"name": d.Name(), "err": err})
return nil
}

// Get the list of PIDs from the VM.
pids, err := monitor.GetCPUs()
if err != nil {
return fmt.Errorf("Failed to get VM instance's QEMU process list: %w", err)
}

// Confirm nothing weird is going on.
if len(set) != len(pids) {
return fmt.Errorf("QEMU has less vCPUs (%v) than configured (%v)", pids, set)
}

for i, pid := range pids {
affinitySet := unix.CPUSet{}
cpuCoreIndex, _ := strconv.Atoi(set[i])
affinitySet.Set(cpuCoreIndex)

// Apply the pin.
err := unix.SchedSetaffinity(pid, &affinitySet)
if err != nil {
return fmt.Errorf("Failed to set QEMU process affinity: %w", err)
}
}

return nil
}

// FileSFTPConn returns a connection to the agent SFTP endpoint.
func (d *qemu) FileSFTPConn() (net.Conn, error) {
// VMs, unlike containers, cannot perform file operations if not running and using the lxd-agent.
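
The QEMU driver pins each vCPU's host thread with sched_setaffinity(2) via golang.org/x/sys/unix instead of touching cgroups. A standalone sketch of that call (the PIDs and CPU numbers are made up; unlike the code above, the sketch also checks the strconv.Atoi error):

```go
package main

import (
	"fmt"
	"strconv"

	"golang.org/x/sys/unix"
)

// pinThread pins a single host thread (for a VM, one QEMU vCPU thread)
// to the CPU named by cpu, e.g. "3".
func pinThread(pid int, cpu string) error {
	core, err := strconv.Atoi(cpu)
	if err != nil {
		return fmt.Errorf("Invalid CPU number %q: %w", cpu, err)
	}

	var set unix.CPUSet
	set.Zero()
	set.Set(core)

	// Apply the pin; a pid of zero would mean the calling thread.
	if err := unix.SchedSetaffinity(pid, &set); err != nil {
		return fmt.Errorf("Failed to set affinity of PID %d to CPU %d: %w", pid, core, err)
	}

	return nil
}

func main() {
	// Hypothetical vCPU thread PIDs paired with the CPUs chosen by the balancer.
	pids := []int{12345, 12346}
	cpus := []string{"2", "3"}

	for i, pid := range pids {
		if err := pinThread(pid, cpus[i]); err != nil {
			fmt.Println(err)
		}
	}
}
```
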
1 change: 1 addition & 0 deletions lxd/instance/instance_interface.go
@@ -102,6 +102,7 @@ type Instance interface {
// Live configuration.
CGroup() (*cgroup.CGroup, error)
VolatileSet(changes map[string]string) error
SetAffinity(set []string) error

// File handling.
FileSFTPConn() (net.Conn, error)
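
Adding `SetAffinity` to the `Instance` interface is what lets `deviceTaskBalance` treat containers and VMs uniformly: the container driver translates the set into a cgroup cpuset, while the QEMU driver pins vCPU threads directly. A minimal sketch of that polymorphic call site (the `Pinner` interface and the two toy drivers are illustrative stand-ins for `instance.Instance` and the lxc/qemu drivers):

```go
package main

import "fmt"

// Pinner is a stand-in for the SetAffinity part of instance.Instance.
type Pinner interface {
	Name() string
	SetAffinity(set []string) error
}

// container and vm are toy stand-ins for the lxc and qemu drivers.
type container struct{ name string }

func (c *container) Name() string { return c.name }
func (c *container) SetAffinity(set []string) error {
	fmt.Printf("%s: writing cpuset %v to the cgroup\n", c.name, set)
	return nil
}

type vm struct{ name string }

func (v *vm) Name() string { return v.name }
func (v *vm) SetAffinity(set []string) error {
	fmt.Printf("%s: pinning QEMU vCPU threads to %v\n", v.name, set)
	return nil
}

func main() {
	// The balancer only sees the interface, never the concrete driver.
	pinning := map[Pinner][]string{
		&container{name: "c1"}: {"0", "1"},
		&vm{name: "vm1"}:       {"2", "3"},
	}

	for inst, set := range pinning {
		if err := inst.SetAffinity(set); err != nil {
			fmt.Printf("Error setting CPU affinity for %s: %v\n", inst.Name(), err)
		}
	}
}
```
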
