Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Bug 1850057: update etcd followers first, use bfq on control plane #1946

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/huandu/xstrings v1.2.0 // indirect
github.com/imdario/mergo v0.3.9
github.com/json-iterator/go v1.1.10 // indirect
github.com/magiconair/properties v1.8.1 // indirect
github.com/opencontainers/go-digest v1.0.0
github.com/openshift/api v3.9.1-0.20191111211345-a27ff30ebf09+incompatible
Expand All @@ -55,8 +56,11 @@ require (
github.com/ultraware/funlen v0.0.2 // indirect
github.com/vincent-petithory/dataurl v0.0.0-20160330182126-9a301d65acbb
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/appengine v1.6.1 // indirect
google.golang.org/grpc v1.26.0
k8s.io/api v0.18.3
k8s.io/apiextensions-apiserver v0.18.0
k8s.io/apimachinery v0.18.3
Expand Down
87 changes: 12 additions & 75 deletions go.sum

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions manifests/machineconfigcontroller/events-clusterrole.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# ClusterRole granting permission to create and patch Events, used by the
# machine-config-controller so it can emit events about pool/node state.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: machine-config-controller-events
  # NOTE(review): ClusterRole is cluster-scoped, so the API server ignores
  # this namespace field — confirm whether the manifest renderer requires it.
  namespace: {{.TargetNamespace}}
rules:
- apiGroups: [""]
  resources: ["events"]
  verbs: ["create", "patch"]
12 changes: 12 additions & 0 deletions manifests/machineconfigcontroller/events-rolebinding-default.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Binds the events ClusterRole to the machine-config-controller service
# account within the "default" namespace.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: machine-config-controller-events
  namespace: default
roleRef:
  # apiGroup is required on roleRef in rbac.authorization.k8s.io/v1;
  # the API server rejects a binding that omits it.
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: machine-config-controller-events
subjects:
- kind: ServiceAccount
  namespace: {{.TargetNamespace}}
  name: machine-config-controller
12 changes: 12 additions & 0 deletions manifests/machineconfigcontroller/events-rolebinding-target.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Binds the events ClusterRole to the machine-config-controller service
# account within the operator's target namespace.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: machine-config-controller-events
  namespace: {{.TargetNamespace}}
roleRef:
  # apiGroup is required on roleRef in rbac.authorization.k8s.io/v1;
  # the API server rejects a binding that omits it.
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: machine-config-controller-events
subjects:
- kind: ServiceAccount
  namespace: {{.TargetNamespace}}
  name: machine-config-controller
81 changes: 67 additions & 14 deletions pkg/controller/node/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,16 @@ func (ctrl *Controller) addNode(obj interface{}) {
}
}

// logPool logs an informational message prefixed with the pool name so the
// MCC log is a useful central aggregate of per-pool state changes.
func (ctrl *Controller) logPool(pool *mcfgv1.MachineConfigPool, format string, args ...interface{}) {
	glog.Infof("Pool %s: %s", pool.Name, fmt.Sprintf(format, args...))
}

// logPoolNode logs an informational message prefixed with both the pool and
// node names.
func (ctrl *Controller) logPoolNode(pool *mcfgv1.MachineConfigPool, node *corev1.Node, format string, args ...interface{}) {
	glog.Infof("Pool %s: node %s: %s", pool.Name, node.Name, fmt.Sprintf(format, args...))
}

func (ctrl *Controller) updateNode(old, cur interface{}) {
oldNode := old.(*corev1.Node)
curNode := cur.(*corev1.Node)
Expand Down Expand Up @@ -441,31 +451,37 @@ func (ctrl *Controller) updateNode(old, cur interface{}) {
if oldReady != newReady {
changed = true
if newReadyErr != nil {
glog.Infof("Pool %s: node %s is now reporting unready: %v", pool.Name, curNode.Name, newReadyErr)
ctrl.logPoolNode(pool, curNode, "Reporting unready: %v", newReadyErr)
} else {
glog.Infof("Pool %s: node %s is now reporting ready", pool.Name, curNode.Name)
ctrl.logPoolNode(pool, curNode, "Reporting ready")
}
}

// Specifically log when a node has completed an update so the MCC logs are a useful central aggregate of state changes
if oldNode.Annotations[daemonconsts.CurrentMachineConfigAnnotationKey] != oldNode.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] &&
isNodeDone(curNode) {
glog.Infof("Pool %s: node %s has completed update to %s", pool.Name, curNode.Name, curNode.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey])
ctrl.logPoolNode(pool, curNode, "Completed update to %s", curNode.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey])
changed = true
} else {
annos := []string{
daemonconsts.CurrentMachineConfigAnnotationKey,
daemonconsts.DesiredMachineConfigAnnotationKey,
daemonconsts.MachineConfigDaemonStateAnnotationKey,
daemonconsts.UpdateDisruptionScoreAnnotationKey,
}
for _, anno := range annos {
if oldNode.Annotations[anno] != curNode.Annotations[anno] {
glog.Infof("Pool %s: node %s changed %s = %s", pool.Name, curNode.Name, anno, curNode.Annotations[anno])
newValue := curNode.Annotations[anno]
if oldNode.Annotations[anno] != newValue {
ctrl.logPoolNode(pool, curNode, "changed annotation %s = %s", anno, newValue)
changed = true
// For the control plane, emit events for these since they're important
if pool.Name == masterPoolName {
ctrl.eventRecorder.Eventf(pool, corev1.EventTypeNormal, "AnnotationChange", "Node %s now has %s=%s", curNode.Name, anno, newValue)
}
}
}
if !reflect.DeepEqual(oldNode.Labels, curNode.Labels) {
glog.Infof("Pool %s: node %s changed labels", pool.Name, curNode.Name)
ctrl.logPoolNode(pool, curNode, "changed labels")
changed = true
}
}
Expand Down Expand Up @@ -742,7 +758,7 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error {

candidates, capacity := getAllCandidateMachines(pool, nodes, maxunavail)
if len(candidates) > 0 {
glog.Infof("Pool %s: %d candidate nodes for update, capacity: %d", pool.Name, len(candidates), capacity)
ctrl.logPool(pool, "%d candidate nodes for update, capacity: %d", len(candidates), capacity)
if err := ctrl.updateCandidateMachines(pool, candidates, capacity); err != nil {
if syncErr := ctrl.syncStatusOnly(pool); syncErr != nil {
return goerrs.Wrapf(err, "error setting desired machine config annotation for pool %q, sync error: %v", pool.Name, syncErr)
Expand Down Expand Up @@ -783,7 +799,6 @@ func (ctrl *Controller) getNodesForPool(pool *mcfgv1.MachineConfigPool) ([]*core
}

func (ctrl *Controller) setDesiredMachineConfigAnnotation(nodeName, currentConfig string) error {
glog.Infof("Setting node %s to desired config %s", nodeName, currentConfig)
return clientretry.RetryOnConflict(nodeUpdateBackoff, func() error {
oldNode, err := ctrl.kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -861,15 +876,43 @@ func getCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*corev1.
return nodes[:capacity]
}

// getCurrentEtcdLeader is not yet implemented
// getCurrentEtcdLeader looks at the candidate nodes (should be control plane)
// at the "update disruption score" key which is currently just a function
// of the etcd leader, but in the future we might change to something beyond
// just 0/1 and/or expand beyond the control plane.
//
// It returns an error if no candidate carries the annotation at all, or if
// no candidate reports itself as the leader.
func (ctrl *Controller) getCurrentEtcdLeader(candidates []*corev1.Node) (*corev1.Node, error) {
	var leader *corev1.Node
	foundAnno := false
	key := daemonconsts.UpdateDisruptionScoreAnnotationKey
	for _, node := range candidates {
		v, ok := node.Annotations[key]
		if !ok {
			continue
		}
		switch v {
		case "1":
			if leader != nil {
				// Should not happen; keep the first one we saw.
				glog.Warningf("Multiple etcd leaders, also found %s", node.Name)
			} else {
				leader = node
			}
		case "0":
			// Not the leader, but the node did report the annotation.
		default:
			// Unknown score value: warn and treat as if unannotated.
			glog.Warningf("Unknown %s %s: %s", node.Name, key, v)
			continue
		}
		foundAnno = true
	}
	if !foundAnno {
		// Error strings are lowercase per Go convention (staticcheck ST1005).
		return nil, fmt.Errorf("didn't find annotation %s on any candidate", key)
	}
	if leader != nil {
		// Take me to your leader
		return leader, nil
	}
	return nil, fmt.Errorf("no leader found via annotation %s", key)
}

// filterControlPlaneCandidateNodes adjusts the candidates and capacity specifically
// for the control plane, e.g. based on which node is the etcd leader at the time.
// nolint:unparam
func (ctrl *Controller) filterControlPlaneCandidateNodes(candidates []*corev1.Node, capacity uint) ([]*corev1.Node, uint, error) {
func (ctrl *Controller) filterControlPlaneCandidateNodes(pool *mcfgv1.MachineConfigPool, candidates []*corev1.Node, capacity uint) ([]*corev1.Node, uint, error) {
if len(candidates) <= 1 {
return candidates, capacity, nil
}
Expand All @@ -880,6 +923,8 @@ func (ctrl *Controller) filterControlPlaneCandidateNodes(candidates []*corev1.No
var newCandidates []*corev1.Node
for _, node := range candidates {
if node == etcdLeader {
// For now make this an event so we know it's working, even though it's more of a non-event
ctrl.eventRecorder.Eventf(pool, corev1.EventTypeNormal, "DeferringEtcdLeaderUpdate", "Deferring update of etcd leader %s", node.Name)
glog.Infof("Deferring update of etcd leader: %s", node.Name)
continue
}
Expand All @@ -892,23 +937,31 @@ func (ctrl *Controller) filterControlPlaneCandidateNodes(candidates []*corev1.No
func (ctrl *Controller) updateCandidateMachines(pool *mcfgv1.MachineConfigPool, candidates []*corev1.Node, capacity uint) error {
if pool.Name == masterPoolName {
var err error
candidates, capacity, err = ctrl.filterControlPlaneCandidateNodes(candidates, capacity)
candidates, capacity, err = ctrl.filterControlPlaneCandidateNodes(pool, candidates, capacity)
if err != nil {
return err
}
// In practice right now these counts will be 1 but let's stay general to support 5 etcd nodes in the future
glog.Infof("Pool %s: filtered to %d candidate nodes for update, capacity: %d", pool.Name, len(candidates), capacity)
ctrl.logPool(pool, "filtered to %d candidate nodes for update, capacity: %d", len(candidates), capacity)
}
if capacity < uint(len(candidates)) {
// Arbitrarily pick the first N candidates; no attempt at sorting.
// Perhaps later we allow admins to weight somehow, or do something more intelligent.
candidates = candidates[:capacity]
}
targetConfig := pool.Spec.Configuration.Name
for _, node := range candidates {
if err := ctrl.setDesiredMachineConfigAnnotation(node.Name, pool.Spec.Configuration.Name); err != nil {
ctrl.logPool(pool, "Setting node %s target to %s", node.Name, targetConfig)
if err := ctrl.setDesiredMachineConfigAnnotation(node.Name, targetConfig); err != nil {
return goerrs.Wrapf(err, "setting desired config for node %s", node.Name)
}
}
if len(candidates) == 1 {
candidate := candidates[0]
ctrl.eventRecorder.Eventf(pool, corev1.EventTypeNormal, "SetDesiredConfig", "Targeted node %s to config %s", candidate.Name, targetConfig)
} else {
ctrl.eventRecorder.Eventf(pool, corev1.EventTypeNormal, "SetDesiredConfig", "Set target for %d nodes to config %s", targetConfig)
}
return nil
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/node/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ func (ctrl *Controller) syncStatusOnly(pool *mcfgv1.MachineConfigPool) error {
newPool := pool
newPool.Status = newStatus
_, err = ctrl.client.MachineconfigurationV1().MachineConfigPools().UpdateStatus(context.TODO(), newPool, metav1.UpdateOptions{})
if pool.Spec.Configuration.Name != newPool.Spec.Configuration.Name {
ctrl.eventRecorder.Eventf(pool, corev1.EventTypeNormal, "Updating", "Pool %s now targeting %s", pool.Name, newPool.Spec.Configuration.Name)
}
if pool.Status.Configuration.Name != newPool.Status.Configuration.Name {
ctrl.eventRecorder.Eventf(pool, corev1.EventTypeNormal, "Completed", "Pool %s has completed update to %s", pool.Name, newPool.Status.Configuration.Name)
}
return err
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/daemon/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const (
MachineConfigDaemonStateAnnotationKey = "machineconfiguration.openshift.io/state"
// OpenShiftOperatorManagedLabel is used to filter out kube objects that don't need to be synced by the MCO
OpenShiftOperatorManagedLabel = "openshift.io/operator-managed"
// UpdateDisruptionScoreAnnotationKey is the MCD's estimate of node update disruption; currently
// it is 1 for the etcd leader on control plane nodes, and 0 otherwise.
UpdateDisruptionScoreAnnotationKey = "machineconfiguration.openshift.io/uds"
// MachineConfigDaemonStateWorking is set by daemon when it is applying an update.
MachineConfigDaemonStateWorking = "Working"
// MachineConfigDaemonStateDone is set by daemon when it is done applying an update.
Expand Down
92 changes: 92 additions & 0 deletions pkg/daemon/controlplane.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package daemon

// This file provides changes that we make to the control plane
// only.

import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"

"github.com/golang/glog"
"github.com/pkg/errors"
)

// setRootDeviceSchedulerBFQ switches to the `bfq` I/O scheduler
// for the root block device to better share I/O between etcd
// and other processes. See
// https://github.com/openshift/machine-config-operator/issues/1897
// Note this is the current systemd default in Fedora, but not RHEL8,
// except for NVMe devices.
//
// Returns nil if bfq is already active; errors are wrapped with the
// sysfs path being touched so failures are diagnosable from the log.
func setRootDeviceSchedulerBFQ() error {
	const sched = "bfq"

	rootDevSysfs, err := getRootBlockDeviceSysfs()
	if err != nil {
		return err
	}

	schedulerPath := filepath.Join(rootDevSysfs, "/queue/scheduler")
	schedulerContentsBuf, err := ioutil.ReadFile(schedulerPath)
	if err != nil {
		return errors.Wrapf(err, "reading %s", schedulerPath)
	}
	// The sysfs file lists every available scheduler, with the active one
	// in brackets, e.g. "mq-deadline kyber [bfq] none".
	schedulerContents := string(schedulerContentsBuf)
	if strings.Contains(schedulerContents, fmt.Sprintf("[%s]", sched)) {
		glog.Infof("Device %s already uses scheduler %s", rootDevSysfs, sched)
		return nil
	}

	f, err := os.OpenFile(schedulerPath, os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		return errors.Wrapf(err, "opening %s", schedulerPath)
	}
	defer f.Close()
	if _, err := f.Write([]byte(sched)); err != nil {
		return errors.Wrapf(err, "writing %s to %s", sched, schedulerPath)
	}
	glog.Infof("Set root blockdev %s to use scheduler %v", rootDevSysfs, sched)

	return nil
}

// updateOstreeObjectSync enables "per-object-fsync" which helps avoid
// latency spikes for etcd; see https://github.com/ostreedev/ostree/pull/2152
func updateOstreeObjectSync() error {
	cmd := exec.Command("ostree", "--repo=/sysroot/ostree/repo", "config", "set", "core.per-object-fsync", "true")
	if err := cmd.Run(); err != nil {
		return errors.Wrapf(err, "Failed to set per-object-fsync for ostree")
	}
	return nil
}

// initializeControlPlane performs setup for the node that should
// only occur on the control plane. Currently this switches the IO
// scheduler and starts a goroutine acting as a small controller
// for reflecting the etcd leader status in the node object to help
// the MCC coordinate control plane updates.
//
// Returns an error if either the scheduler switch or the ostree
// per-object-fsync configuration fails; the leader-watch goroutine is
// only started after both succeed.
func (dn *Daemon) initializeControlPlane() error {
	// Prefer bfq on the root device so etcd I/O shares fairly with other writers.
	if err := setRootDeviceSchedulerBFQ(); err != nil {
		return err
	}
	// Reduce ostree-induced fsync latency spikes that can disturb etcd.
	if err := updateOstreeObjectSync(); err != nil {
		return err
	}
	// NOTE(review): called synchronously (no `go`); presumably it arranges
	// its own background work keyed off stopCh — confirm in ioniceEtcd.
	ioniceEtcd(dn.stopCh)
	// Small controller loop: mirror etcd-leadership changes delivered on the
	// channel into the node object, until the daemon's stop channel closes.
	go func() {
		c := watchCurrentEtcdLeader(dn.stopCh)
		for {
			select {
			case leader := <-c:
				glog.Infof("node is etcd leader: %v", leader)
				dn.nodeWriter.SetEtcdLeader(dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name, leader)
			case <-dn.stopCh:
				return
			}
		}
	}()
	return nil
}
52 changes: 52 additions & 0 deletions pkg/daemon/coreos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package daemon

// This file provides routines that apply on Fedora CoreOS style systems,
// including derivatives like RHEL CoreOS.

import (
"fmt"
"os"
"path/filepath"

"github.com/pkg/errors"
)

// byLabel returns the udev generated symlink to the block device with the given label
func byLabel(label string) string {
	return "/dev/disk/by-label/" + label
}

// getParentDeviceSysfs returns e.g. /sys/devices/pci0000:00/0000:00:05.0/virtio2/block/vda from /dev/vda4, though
// it can be more complex than that with e.g. NVMe.
func getParentDeviceSysfs(device string) (string, error) {
	target, err := os.Readlink(device)
	if err != nil {
		return "", errors.Wrapf(err, "reading %s", device)
	}
	link := fmt.Sprintf("/sys/class/block/%s", filepath.Base(target))
	resolved, err := filepath.EvalSymlinks(link)
	if err != nil {
		return "", errors.Wrapf(err, "parsing %s", link)
	}
	// A partition directory contains a "partition" file; in that case the
	// whole-disk device is the parent directory.
	if _, err := os.Stat(filepath.Join(resolved, "partition")); err == nil {
		resolved = filepath.Dir(resolved)
	}
	return resolved, nil
}

// getRootBlockDeviceSysfs returns the path to the block
// device backing the root partition on a FCOS system.
// It probes the LUKS `crypt_rootfs` label first, then the plain `root`
// label, and errors if neither device node exists.
func getRootBlockDeviceSysfs() (string, error) {
	// Check for the `crypt_rootfs` label; this exists for RHCOS >= 4.3 but <= 4.6.
	// See https://github.com/openshift/enhancements/blob/master/enhancements/rhcos/automated-policy-based-disk-encryption.md
	luksRoot := byLabel("crypt_rootfs")
	if _, err := os.Stat(luksRoot); err == nil {
		return getParentDeviceSysfs(luksRoot)
	}
	// This is what we expect on FCOS and RHCOS <= 4.2
	root := byLabel("root")
	if _, err := os.Stat(root); err == nil {
		return getParentDeviceSysfs(root)
	}
	// Name both candidate labels: the original error only mentioned the plain
	// root path, hiding the LUKS case from anyone debugging the failure.
	return "", fmt.Errorf("failed to find %s or %s", luksRoot, root)
}
Loading