From f431ee289b8cfb3c7c10eff3add5abe730165642 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sat, 10 Jun 2017 15:23:03 +0000 Subject: [PATCH 01/16] Refactor: create config in main function --- prog/kube-peers/main.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/prog/kube-peers/main.go b/prog/kube-peers/main.go index a0b9cd85f5..75d7a6703f 100644 --- a/prog/kube-peers/main.go +++ b/prog/kube-peers/main.go @@ -10,16 +10,8 @@ import ( "k8s.io/client-go/rest" ) -func getKubePeers() ([]string, error) { - config, err := rest.InClusterConfig() - if err != nil { - return nil, err - } - c, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - nodeList, err := c.Core().Nodes().List(api.ListOptions{}) +func getKubePeers(c *kubernetes.Clientset) ([]string, error) { + nodeList, err := c.Nodes().List(api.ListOptions{}) if err != nil { return nil, err } @@ -50,7 +42,15 @@ func getKubePeers() ([]string, error) { } func main() { - peers, err := getKubePeers() + config, err := rest.InClusterConfig() + if err != nil { + log.Fatalf("Could not get cluster config: %v", err) + } + c, err := kubernetes.NewForConfig(config) + if err != nil { + log.Fatalf("Could not make Kubernetes connection: %v", err) + } + peers, err := getKubePeers(c) if err != nil { log.Fatalf("Could not get peers: %v", err) } From ed8d92cfc62a7f9f58d785ea739fb5767522d9ff Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sun, 11 Jun 2017 20:32:44 +0000 Subject: [PATCH 02/16] RBAC to read and write ConfigMaps --- prog/weave-kube/weave-daemonset-k8s-1.6.yaml | 38 ++++++++++++++++++++ prog/weave-kube/weave-daemonset-k8s-1.7.yaml | 38 ++++++++++++++++++++ 2 files changed, 76 insertions(+) diff --git a/prog/weave-kube/weave-daemonset-k8s-1.6.yaml b/prog/weave-kube/weave-daemonset-k8s-1.6.yaml index 243facef2e..e7b0e18dcf 100644 --- a/prog/weave-kube/weave-daemonset-k8s-1.6.yaml +++ b/prog/weave-kube/weave-daemonset-k8s-1.6.yaml @@ -47,6 +47,44 @@ items: - kind: ServiceAccount name: weave-net namespace: kube-system + - apiVersion: rbac.authorization.k8s.io/v1beta1 + kind: Role + metadata: + name: weave-net-kube-peer + namespace: kube-system + labels: + name: weave-net-kube-peer + rules: + - apiGroups: + - '' + resources: + - configmaps + resourceNames: + - weave-net + verbs: + - get + - update + - apiGroups: + - '' + resources: + - configmaps + verbs: + - create + - apiVersion: rbac.authorization.k8s.io/v1beta1 + kind: RoleBinding + metadata: + name: weave-net-kube-peer + namespace: kube-system + labels: + name: weave-net-kube-peer + roleRef: + kind: Role + name: weave-net-kube-peer + apiGroup: rbac.authorization.k8s.io + subjects: + - kind: ServiceAccount + name: weave-net + namespace: kube-system - apiVersion: extensions/v1beta1 kind: DaemonSet metadata: diff --git a/prog/weave-kube/weave-daemonset-k8s-1.7.yaml b/prog/weave-kube/weave-daemonset-k8s-1.7.yaml index 4cb9f35126..8100b60849 100644 --- a/prog/weave-kube/weave-daemonset-k8s-1.7.yaml +++ b/prog/weave-kube/weave-daemonset-k8s-1.7.yaml @@ -55,6 +55,44 @@ items: - kind: ServiceAccount name: weave-net namespace: kube-system + - apiVersion: rbac.authorization.k8s.io/v1beta1 + kind: Role + metadata: + name: weave-net + namespace: kube-system + labels: + name: weave-net + rules: + - apiGroups: + - '' + resources: + - configmaps + resourceNames: + - weave-net + verbs: + - get + - update + - apiGroups: + - '' + resources: + - configmaps + verbs: + - create + - apiVersion: rbac.authorization.k8s.io/v1beta1 + kind: 
RoleBinding + metadata: + name: weave-net + namespace: kube-system + labels: + name: weave-net + roleRef: + kind: Role + name: weave-net + apiGroup: rbac.authorization.k8s.io + subjects: + - kind: ServiceAccount + name: weave-net + namespace: kube-system - apiVersion: extensions/v1beta1 kind: DaemonSet metadata: From 71b39489da365a47c10af2de468cf9f64b958138 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sat, 10 Jun 2017 19:05:39 +0000 Subject: [PATCH 03/16] Maintain a list of peers in a Kubernetes annotation This is prework for a feature which will remove 'dead' peers --- prog/kube-peers/annotations.go | 114 +++++++++++++++++++++++++++++++++ prog/kube-peers/main.go | 65 +++++++++++++++++-- prog/weave-kube/launch.sh | 2 + 3 files changed, 174 insertions(+), 7 deletions(-) create mode 100644 prog/kube-peers/annotations.go diff --git a/prog/kube-peers/annotations.go b/prog/kube-peers/annotations.go new file mode 100644 index 0000000000..e343769c48 --- /dev/null +++ b/prog/kube-peers/annotations.go @@ -0,0 +1,114 @@ +package main + +import ( + "encoding/json" + + "github.com/pkg/errors" + + v1 "k8s.io/api/core/v1" + kubeErrors "k8s.io/apimachinery/pkg/api/errors" + api "k8s.io/apimachinery/pkg/apis/meta/v1" + kubernetes "k8s.io/client-go/kubernetes" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" +) + +type configMapAnnotations struct { + Name string + Namespace string + Client corev1client.ConfigMapsGetter + cm *v1.ConfigMap +} + +func newConfigMapAnnotations(ns string, name string, client *kubernetes.Clientset) *configMapAnnotations { + return &configMapAnnotations{ + Namespace: ns, + Name: name, + Client: client.CoreV1(), + } +} + +type peerList struct { + Peers []peerInfo +} + +type peerInfo struct { + PeerName string // Weave internal unique ID + Name string // Kubernetes node name +} + +func (pl peerList) contains(peerName string) bool { + for _, peer := range pl.Peers { + if peer.PeerName == peerName { + return true + } + } + return false +} + +func (pl *peerList) add(peerName string, name string) { + pl.Peers = append(pl.Peers, peerInfo{PeerName: peerName, Name: name}) +} + +const ( + // KubePeersAnnotationKey is the default annotation key + KubePeersAnnotationKey = "kube-peers.weave.works/peers" +) + +func (cml *configMapAnnotations) Init() error { + for { + // Since it's potentially racy to GET, then CREATE if not found, we wrap in a check loop + // so that if the configmap is created after our GET but before or CREATE, we'll gracefully + // re-try to get the configmap. 
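The Init retry described in the comment above continues below. As an aside on the storage format this patch introduces: the peer list is kept as JSON under the annotation key defined above. A minimal sketch of the payload, using only the types in this file; the peer name (weave peer names look like MAC addresses) and node name are illustrative:

// Sketch only: the annotation value that UpdatePeerList ends up storing.
func examplePeerListPayload() (key, value string, err error) {
	list := peerList{}
	list.add("ce:31:e0:06:45:1a", "node-1")
	payload, err := json.Marshal(list)
	if err != nil {
		return "", "", err
	}
	// payload is {"Peers":[{"PeerName":"ce:31:e0:06:45:1a","Name":"node-1"}]}
	return KubePeersAnnotationKey, string(payload), nil
}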
+ var err error + cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Get(cml.Name, api.GetOptions{}) + if err != nil { + if !kubeErrors.IsNotFound(err) { + return errors.Wrapf(err, "Unable to fetch ConfigMap %s/%s", cml.Namespace, cml.Name) + } + cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Create(&v1.ConfigMap{ + ObjectMeta: api.ObjectMeta{ + Name: cml.Name, + Namespace: cml.Namespace, + }, + }) + if err != nil { + if kubeErrors.IsAlreadyExists(err) { + continue + } + return errors.Wrapf(err, "Unable to create ConfigMap %s/%s", cml.Namespace, cml.Name) + } + } + break + } + if cml.cm.Annotations == nil { + cml.cm.Annotations = make(map[string]string) + } + return nil +} + +func (cml *configMapAnnotations) GetPeerList() (*peerList, error) { + var record peerList + if cml.cm == nil { + return nil, errors.New("endpoint not initialized, call Init first") + } + if recordBytes, found := cml.cm.Annotations[KubePeersAnnotationKey]; found { + if err := json.Unmarshal([]byte(recordBytes), &record); err != nil { + return nil, err + } + } + return &record, nil +} + +// Update will update and existing annotation on a given resource. +func (cml *configMapAnnotations) UpdatePeerList(list peerList) error { + if cml.cm == nil { + return errors.New("endpoint not initialized, call Init first") + } + recordBytes, err := json.Marshal(list) + if err != nil { + return err + } + cml.cm.Annotations[KubePeersAnnotationKey] = string(recordBytes) + cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm) + return err +} diff --git a/prog/kube-peers/main.go b/prog/kube-peers/main.go index 75d7a6703f..b966e86bc2 100644 --- a/prog/kube-peers/main.go +++ b/prog/kube-peers/main.go @@ -1,6 +1,7 @@ package main import ( + "flag" "fmt" "log" "net" @@ -10,12 +11,18 @@ import ( "k8s.io/client-go/rest" ) -func getKubePeers(c *kubernetes.Clientset) ([]string, error) { - nodeList, err := c.Nodes().List(api.ListOptions{}) +type nodeInfo struct { + name string + addr string +} + +// return the IP addresses of all nodes in the cluster +func getKubePeers(c *kubernetes.Clientset) ([]nodeInfo, error) { + nodeList, err := c.CoreV1().Nodes().List(api.ListOptions{}) if err != nil { return nil, err } - addresses := make([]string, 0, len(nodeList.Items)) + addresses := make([]nodeInfo, 0, len(nodeList.Items)) for _, peer := range nodeList.Items { var internalIP, externalIP string for _, addr := range peer.Status.Addresses { @@ -33,15 +40,51 @@ func getKubePeers(c *kubernetes.Clientset) ([]string, error) { // Fallback for cases where a Node has an ExternalIP but no InternalIP if internalIP != "" { - addresses = append(addresses, internalIP) + addresses = append(addresses, nodeInfo{name: peer.Name, addr: internalIP}) } else if externalIP != "" { - addresses = append(addresses, externalIP) + addresses = append(addresses, nodeInfo{name: peer.Name, addr: externalIP}) } } return addresses, nil } +const ( + configMapName = "weave-net" + configMapNamespace = "kube-system" +) + +// update the list of all peers that have gone through this code path +func addMyselfToPeerList(c *kubernetes.Clientset, peerName, name string) (*peerList, error) { + cml := newConfigMapAnnotations(configMapNamespace, configMapName, c) + if err := cml.Init(); err != nil { + return nil, err + } + list, err := cml.GetPeerList() + if err != nil { + return nil, err + } + log.Println("Fetched existing peer list", list) + if !list.contains(peerName) { + list.add(peerName, name) + log.Println("Storing new peer list", list) + err = cml.UpdatePeerList(*list) + if err 
!= nil { + return nil, err + } + } + return list, nil +} + func main() { + var ( + justReclaim bool + peerName string + nodeName string + ) + flag.StringVar(&peerName, "peer-name", "unknown", "name of this Weave Net peer") + flag.StringVar(&nodeName, "node-name", "unknown", "name of this Kubernetes node") + flag.Parse() + config, err := rest.InClusterConfig() if err != nil { log.Fatalf("Could not get cluster config: %v", err) @@ -54,7 +97,15 @@ func main() { if err != nil { log.Fatalf("Could not get peers: %v", err) } - for _, addr := range peers { - fmt.Println(addr) + if justReclaim { + log.Println("Checking if any peers need to be reclaimed") + _, err := addMyselfToPeerList(c, peerName, nodeName) + if err != nil { + log.Fatalf("Could not get peer list: %v", err) + } + return + } + for _, node := range peers { + fmt.Println(node.addr) } } diff --git a/prog/weave-kube/launch.sh b/prog/weave-kube/launch.sh index b44222bc4e..9111fbe1bd 100755 --- a/prog/weave-kube/launch.sh +++ b/prog/weave-kube/launch.sh @@ -108,6 +108,8 @@ post_start_actions() { export HOST_ROOT /home/weave/weave --local setup-cni + /home/weave/kube-peers -reclaim -node-name="$HOSTNAME" -peer-name="$(cat /sys/class/net/weave/address)" + # Expose the weave network so host processes can communicate with pods /home/weave/weave --local expose $WEAVE_EXPOSE_IP } From 497dddabc110e99e9a126e17a1aba87bcc7d6e17 Mon Sep 17 00:00:00 2001 From: Brice Fernandes Date: Tue, 31 Oct 2017 14:11:02 +0000 Subject: [PATCH 04/16] Improve config locking by using optimistic locking strategy Loop using jitter until the update succeeds --- prog/kube-peers/annotations.go | 36 +++++++++++++++++++++++++++++++--- prog/kube-peers/main.go | 32 ++++++++++++++++-------------- 2 files changed, 50 insertions(+), 18 deletions(-) diff --git a/prog/kube-peers/annotations.go b/prog/kube-peers/annotations.go index e343769c48..1cdca11da1 100644 --- a/prog/kube-peers/annotations.go +++ b/prog/kube-peers/annotations.go @@ -2,12 +2,15 @@ package main import ( "encoding/json" + "log" + "time" "github.com/pkg/errors" v1 "k8s.io/api/core/v1" kubeErrors "k8s.io/apimachinery/pkg/api/errors" api "k8s.io/apimachinery/pkg/apis/meta/v1" + wait "k8s.io/apimachinery/pkg/util/wait" kubernetes "k8s.io/client-go/kubernetes" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" ) @@ -50,6 +53,9 @@ func (pl *peerList) add(peerName string, name string) { } const ( + retryPeriod = time.Second * 2 + jitterFactor = 1.0 + // KubePeersAnnotationKey is the default annotation key KubePeersAnnotationKey = "kube-peers.weave.works/peers" ) @@ -99,7 +105,6 @@ func (cml *configMapAnnotations) GetPeerList() (*peerList, error) { return &record, nil } -// Update will update and existing annotation on a given resource. func (cml *configMapAnnotations) UpdatePeerList(list peerList) error { if cml.cm == nil { return errors.New("endpoint not initialized, call Init first") @@ -108,7 +113,32 @@ func (cml *configMapAnnotations) UpdatePeerList(list peerList) error { if err != nil { return err } - cml.cm.Annotations[KubePeersAnnotationKey] = string(recordBytes) - cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm) + cm := cml.cm + cm.Annotations[KubePeersAnnotationKey] = string(recordBytes) + cm, err = cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm) + if err == nil { + cml.cm = cm + } + return err +} + +// Loop with jitter, fetching the cml data and calling f() until it +// doesn't get an optimistic locking conflict. +// If it succeeds or gets any other kind of error, stop the loop. 
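The patch's JitterUntil-based implementation follows. For comparison only, client-go also ships a helper for this retry-on-conflict pattern; a minimal sketch, assuming the vendored client-go provides k8s.io/client-go/util/retry and that this sits in the same package as configMapAnnotations (this patch does not use it):

// Sketch only: the same optimistic-locking loop via client-go's helper.
package main

import "k8s.io/client-go/util/retry"

func loopUpdateWithRetryHelper(cml *configMapAnnotations, f func() error) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Re-fetch the ConfigMap so each attempt sees the latest ResourceVersion.
		if err := cml.Init(); err != nil {
			return err
		}
		// A Conflict error from f triggers another attempt; success or
		// any other error ends the loop.
		return f()
	})
}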
+func (cml *configMapAnnotations) LoopUpdate(f func() error) error { + stop := make(chan struct{}) + var err error + wait.JitterUntil(func() { + if err = cml.Init(); err != nil { + close(stop) + return + } + err = f() + if err != nil && kubeErrors.IsConflict(err) { + log.Printf("Optimistic locking conflict: trying again: %s", err) + return + } + close(stop) + }, retryPeriod, jitterFactor, true, stop) return err } diff --git a/prog/kube-peers/main.go b/prog/kube-peers/main.go index b966e86bc2..4f5bab30c5 100644 --- a/prog/kube-peers/main.go +++ b/prog/kube-peers/main.go @@ -56,23 +56,25 @@ const ( // update the list of all peers that have gone through this code path func addMyselfToPeerList(c *kubernetes.Clientset, peerName, name string) (*peerList, error) { cml := newConfigMapAnnotations(configMapNamespace, configMapName, c) - if err := cml.Init(); err != nil { - return nil, err - } - list, err := cml.GetPeerList() - if err != nil { - return nil, err - } - log.Println("Fetched existing peer list", list) - if !list.contains(peerName) { - list.add(peerName, name) - log.Println("Storing new peer list", list) - err = cml.UpdatePeerList(*list) + var list *peerList + err := cml.LoopUpdate(func() error { + var err error + list, err = cml.GetPeerList() if err != nil { - return nil, err + return err } - } - return list, nil + log.Println("Fetched existing peer list", list) + if !list.contains(peerName) { + list.add(peerName, name) + log.Println("Storing new peer list", list) + err = cml.UpdatePeerList(*list) + if err != nil { + return err + } + } + return nil + }) + return list, err } func main() { From 267bd8b51dec3ca927ba8b3aa96257dbf5c152d9 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 13 Nov 2017 14:05:11 +0000 Subject: [PATCH 05/16] Pseudo-code to clean up dead peers --- prog/kube-peers/main.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/prog/kube-peers/main.go b/prog/kube-peers/main.go index 4f5bab30c5..90572c2d3e 100644 --- a/prog/kube-peers/main.go +++ b/prog/kube-peers/main.go @@ -77,12 +77,41 @@ func addMyselfToPeerList(c *kubernetes.Clientset, peerName, name string) (*peerL return list, err } +// For each of those peers that is no longer listed as a node by +// Kubernetes, remove it from Weave IPAM +func reclaimRemovedPeers(apl *peerList, nodes []nodeInfo) error { + // TODO + // Outline of function: + // 1. Compare peers stored in the peerList against all peers reported by k8s now. + // 2. Loop for each X in the first set and not in the second - we wish to remove X from our data structures + // 3. Check if there is an existing annotation with key X + // 4. If annotation already contains my identity, ok; + // 5. If non-existent, write an annotation with key X and contents "my identity" + // 6. If step 4 or 5 succeeded, rmpeer X + // 7aa. Remove any annotations Z* that have contents X + // 7a. Remove X from peerList + // 7b. Remove annotation with key X + // 8. If step 5 failed due to optimistic lock conflict, stop: someone else is handling X + // 9. Go back to step 1 until there is no difference between the two sets + + // Step 3-5 is to protect against two simultaneous rmpeers of X + // Step 4 is to pick up again after a restart between step 5 and step 7b + // If the peer doing the reclaim disappears between steps 5 and 7a, then someone will clean it up in step 7aa + // If peer doing the reclaim disappears forever between 7a and 7b then we get a dangling annotation + // This should be sufficiently rare that we don't care. 
+ + // Question: Should we narrow step 2 by checking against Weave Net IPAM? + // i.e. If peer X owns any address space and is marked unreachable, we want to rmpeer X + return nil +} + func main() { var ( justReclaim bool peerName string nodeName string ) + flag.BoolVar(&justReclaim, "reclaim", false, "reclaim IP space from dead peers") flag.StringVar(&peerName, "peer-name", "unknown", "name of this Weave Net peer") flag.StringVar(&nodeName, "node-name", "unknown", "name of this Kubernetes node") flag.Parse() @@ -105,6 +134,10 @@ func main() { if err != nil { log.Fatalf("Could not get peer list: %v", err) } + err = reclaimRemovedPeers(list, peers) + if err != nil { + log.Fatalf("Error while reclaiming space: %v", err) + } return } for _, node := range peers { From a552f194b42d99de83d5f6bf2e6a8c286610b731 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 20 Jul 2017 18:13:30 +0000 Subject: [PATCH 06/16] Code to reclaim IP address space from removed peers --- api/ipam.go | 6 ++ prog/kube-peers/annotations.go | 58 ++++++++++++++++--- prog/kube-peers/main.go | 101 +++++++++++++++++++++++++-------- 3 files changed, 134 insertions(+), 31 deletions(-) diff --git a/api/ipam.go b/api/ipam.go index ea25b618e3..4ba34ac96d 100644 --- a/api/ipam.go +++ b/api/ipam.go @@ -62,6 +62,12 @@ func (client *Client) ReleaseIPsFor(ID string) error { return err } +// release all IP space owned by a peer +func (client *Client) RmPeer(peerName string) (string, error) { + result, err := client.httpVerb("DELETE", fmt.Sprintf("/peer/%s", peerName), nil) + return result, err +} + func (client *Client) DefaultSubnet() (*net.IPNet, error) { cidr, err := client.httpVerb("GET", fmt.Sprintf("/ipinfo/defaultsubnet"), nil) if err != nil { diff --git a/prog/kube-peers/annotations.go b/prog/kube-peers/annotations.go index 1cdca11da1..7c9064bea5 100644 --- a/prog/kube-peers/annotations.go +++ b/prog/kube-peers/annotations.go @@ -36,7 +36,7 @@ type peerList struct { type peerInfo struct { PeerName string // Weave internal unique ID - Name string // Kubernetes node name + NodeName string // Kubernetes node name } func (pl peerList) contains(peerName string) bool { @@ -49,7 +49,17 @@ func (pl peerList) contains(peerName string) bool { } func (pl *peerList) add(peerName string, name string) { - pl.Peers = append(pl.Peers, peerInfo{PeerName: peerName, Name: name}) + pl.Peers = append(pl.Peers, peerInfo{PeerName: peerName, NodeName: name}) +} + +func (pl *peerList) remove(peerNameToRemove string) { + for i := 0; i < len(pl.Peers); { + if pl.Peers[i].PeerName == peerNameToRemove { + pl.Peers = append(pl.Peers[:i], pl.Peers[i+1:]...) 
+ } else { + i++ + } + } } const ( @@ -106,16 +116,50 @@ func (cml *configMapAnnotations) GetPeerList() (*peerList, error) { } func (cml *configMapAnnotations) UpdatePeerList(list peerList) error { - if cml.cm == nil { - return errors.New("endpoint not initialized, call Init first") - } recordBytes, err := json.Marshal(list) if err != nil { return err } + return cml.UpdateAnnotation(KubePeersAnnotationKey, string(recordBytes)) +} + +func (cml *configMapAnnotations) UpdateAnnotation(key, value string) error { + if cml.cm == nil { + return errors.New("endpoint not initialized, call Init first") + } + cm := cml.cm + cm.Annotations[key] = value + cm, err := cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm) + if err == nil { + cml.cm = cm + } + return err +} + +func (cml *configMapAnnotations) RemoveAnnotation(key string) error { + if cml.cm == nil { + return errors.New("endpoint not initialized, call Init first") + } cm := cml.cm - cm.Annotations[KubePeersAnnotationKey] = string(recordBytes) - cm, err = cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm) + delete(cm.Annotations, key) + cm, err := cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm) + if err == nil { + cml.cm = cm + } + return err +} + +func (cml *configMapAnnotations) RemoveAnnotationsWithValue(valueToRemove string) error { + if cml.cm == nil { + return errors.New("endpoint not initialized, call Init first") + } + cm := cml.cm + for key, value := range cm.Annotations { + if value == valueToRemove { + delete(cm.Annotations, key) + } + } + cm, err := cml.Client.ConfigMaps(cml.Namespace).Update(cml.cm) if err == nil { cml.cm = cm } diff --git a/prog/kube-peers/main.go b/prog/kube-peers/main.go index 90572c2d3e..12e74fae3b 100644 --- a/prog/kube-peers/main.go +++ b/prog/kube-peers/main.go @@ -5,10 +5,14 @@ import ( "fmt" "log" "net" + "os" api "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + + weaveapi "github.com/weaveworks/weave/api" + "github.com/weaveworks/weave/common" ) type nodeInfo struct { @@ -54,8 +58,7 @@ const ( ) // update the list of all peers that have gone through this code path -func addMyselfToPeerList(c *kubernetes.Clientset, peerName, name string) (*peerList, error) { - cml := newConfigMapAnnotations(configMapNamespace, configMapName, c) +func addMyselfToPeerList(cml *configMapAnnotations, c *kubernetes.Clientset, peerName, name string) (*peerList, error) { var list *peerList err := cml.LoopUpdate(func() error { var err error @@ -79,27 +82,75 @@ func addMyselfToPeerList(c *kubernetes.Clientset, peerName, name string) (*peerL // For each of those peers that is no longer listed as a node by // Kubernetes, remove it from Weave IPAM -func reclaimRemovedPeers(apl *peerList, nodes []nodeInfo) error { - // TODO - // Outline of function: - // 1. Compare peers stored in the peerList against all peers reported by k8s now. - // 2. Loop for each X in the first set and not in the second - we wish to remove X from our data structures - // 3. Check if there is an existing annotation with key X - // 4. If annotation already contains my identity, ok; - // 5. If non-existent, write an annotation with key X and contents "my identity" - // 6. If step 4 or 5 succeeded, rmpeer X - // 7aa. Remove any annotations Z* that have contents X - // 7a. Remove X from peerList - // 7b. Remove annotation with key X - // 8. If step 5 failed due to optimistic lock conflict, stop: someone else is handling X - // 9. 
Go back to step 1 until there is no difference between the two sets +func reclaimRemovedPeers(weave *weaveapi.Client, cml *configMapAnnotations, nodes []nodeInfo, myPeerName string) error { + for { + // 1. Compare peers stored in the peerList against all peers reported by k8s now. + storedPeerList, err := cml.GetPeerList() + if err != nil { + return err + } + peerMap := make(map[string]peerInfo, len(storedPeerList.Peers)) + for _, peer := range storedPeerList.Peers { + peerMap[peer.NodeName] = peer + } + for _, node := range nodes { + delete(peerMap, node.name) + } + log.Println("Nodes that have disappeared:", peerMap) + if len(peerMap) == 0 { + break + } + // 2. Loop for each X in the first set and not in the second - we wish to remove X from our data structures + for _, peer := range peerMap { + common.Log.Debugln("Preparing to remove disappeared peer", peer) + okToRemove := false + // 3. Check if there is an existing annotation with key X + if existingAnnotation, found := cml.cm.Annotations[peer.PeerName]; found { + common.Log.Debugln("Existing annotation", existingAnnotation) + // 4. If annotation already contains my identity, ok; + if existingAnnotation == myPeerName { + okToRemove = true + } + } else { + // 5. If non-existent, write an annotation with key X and contents "my identity" + common.Log.Debugln("Noting I plan to remove ", peer.PeerName) + if err := cml.UpdateAnnotation(peer.PeerName, myPeerName); err == nil { + okToRemove = true + } + } + if okToRemove { + // 6. If step 4 or 5 succeeded, rmpeer X + result, err := weave.RmPeer(peer.PeerName) + if err != nil { + return err + } + log.Println("rmpeer of", peer.PeerName, ":", result) + cml.LoopUpdate(func() error { + // 7aa. Remove any annotations Z* that have contents X + if err := cml.RemoveAnnotationsWithValue(peer.PeerName); err != nil { + return err + } + // 7a. Remove X from peerList + storedPeerList.remove(peer.PeerName) + if err := cml.UpdatePeerList(*storedPeerList); err != nil { + return err + } + // 7b. Remove annotation with key X + return cml.RemoveAnnotation(peer.PeerName) + }) + common.Log.Debugln("Finished removal of ", peer.PeerName) + } + // 8. If step 5 failed due to optimistic lock conflict, stop: someone else is handling X - // Step 3-5 is to protect against two simultaneous rmpeers of X - // Step 4 is to pick up again after a restart between step 5 and step 7b - // If the peer doing the reclaim disappears between steps 5 and 7a, then someone will clean it up in step 7aa - // If peer doing the reclaim disappears forever between 7a and 7b then we get a dangling annotation - // This should be sufficiently rare that we don't care. + // Step 3-5 is to protect against two simultaneous rmpeers of X + // Step 4 is to pick up again after a restart between step 5 and step 7b + // If the peer doing the reclaim disappears between steps 5 and 7a, then someone will clean it up in step 7aa + // If peer doing the reclaim disappears forever between 7a and 7b then we get a dangling annotation + // This should be sufficiently rare that we don't care. + } + // 9. Go back to step 1 until there is no difference between the two sets + } // Question: Should we narrow step 2 by checking against Weave Net IPAM? // i.e. 
If peer X owns any address space and is marked unreachable, we want to rmpeer X return nil @@ -129,12 +180,14 @@ func main() { log.Fatalf("Could not get peers: %v", err) } if justReclaim { - log.Println("Checking if any peers need to be reclaimed") - _, err := addMyselfToPeerList(c, peerName, nodeName) + cml := newConfigMapAnnotations(configMapNamespace, configMapName, c) + common.Log.Infoln("Adding myself to peer list") + _, err := addMyselfToPeerList(cml, c, peerName, nodeName) if err != nil { log.Fatalf("Could not get peer list: %v", err) } - err = reclaimRemovedPeers(list, peers) + weave := weaveapi.NewClient(os.Getenv("WEAVE_HTTP_ADDR"), common.Log) + err = reclaimRemovedPeers(weave, cml, peers, peerName) if err != nil { log.Fatalf("Error while reclaiming space: %v", err) } From 6a0e39e5eee7fcdd622fffe6545481ca10e94f08 Mon Sep 17 00:00:00 2001 From: Brice Fernandes Date: Tue, 31 Oct 2017 15:35:53 +0000 Subject: [PATCH 07/16] Minor refactor for clarity --- prog/kube-peers/main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/prog/kube-peers/main.go b/prog/kube-peers/main.go index 12e74fae3b..6589e74aee 100644 --- a/prog/kube-peers/main.go +++ b/prog/kube-peers/main.go @@ -59,6 +59,7 @@ const ( // update the list of all peers that have gone through this code path func addMyselfToPeerList(cml *configMapAnnotations, c *kubernetes.Clientset, peerName, name string) (*peerList, error) { + common.Log.Infoln("Adding myself to peer list") var list *peerList err := cml.LoopUpdate(func() error { var err error @@ -181,11 +182,12 @@ func main() { } if justReclaim { cml := newConfigMapAnnotations(configMapNamespace, configMapName, c) - common.Log.Infoln("Adding myself to peer list") + _, err := addMyselfToPeerList(cml, c, peerName, nodeName) if err != nil { log.Fatalf("Could not get peer list: %v", err) } + weave := weaveapi.NewClient(os.Getenv("WEAVE_HTTP_ADDR"), common.Log) err = reclaimRemovedPeers(weave, cml, peers, peerName) if err != nil { From cb0c9e5fba1a00a69bcac52ca5a038ad5ed1b219 Mon Sep 17 00:00:00 2001 From: Brice Fernandes Date: Tue, 31 Oct 2017 14:52:14 +0000 Subject: [PATCH 08/16] Document: Add file-level docstrings for kube-peers --- prog/kube-peers/annotations.go | 7 +++++++ prog/kube-peers/main.go | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/prog/kube-peers/annotations.go b/prog/kube-peers/annotations.go index 7c9064bea5..fae131e4ce 100644 --- a/prog/kube-peers/annotations.go +++ b/prog/kube-peers/annotations.go @@ -1,3 +1,10 @@ +/* +In order to keep track of active weave peers, we use annotations on the Kubernetes cluster. + +Kubernetes uses etcd to distribute and synchronise these annotations so we don't have to. + +This module deals with operations on the peerlist backed by Kubernetes' annotation mechanism. +*/ package main import ( diff --git a/prog/kube-peers/main.go b/prog/kube-peers/main.go index 6589e74aee..81bfda9da0 100644 --- a/prog/kube-peers/main.go +++ b/prog/kube-peers/main.go @@ -1,3 +1,8 @@ +/* +Package main deals with weave-net peers on the cluster. 
+ +This involves peer management, such as getting the latest peers or removing defunct peers from the cluster +*/ package main import ( From fd2f91e2eeef12121a7f1d0d36f5b4c500c27a9c Mon Sep 17 00:00:00 2001 From: Brice Fernandes Date: Tue, 31 Oct 2017 14:53:15 +0000 Subject: [PATCH 09/16] Refactor: Improve naming --- prog/kube-peers/annotations.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/prog/kube-peers/annotations.go b/prog/kube-peers/annotations.go index fae131e4ce..dd8b2db8cd 100644 --- a/prog/kube-peers/annotations.go +++ b/prog/kube-peers/annotations.go @@ -23,17 +23,17 @@ import ( ) type configMapAnnotations struct { - Name string - Namespace string - Client corev1client.ConfigMapsGetter - cm *v1.ConfigMap + ConfigMapName string + Namespace string + Client corev1client.ConfigMapsGetter + cm *v1.ConfigMap } -func newConfigMapAnnotations(ns string, name string, client *kubernetes.Clientset) *configMapAnnotations { +func newConfigMapAnnotations(ns string, configMapName string, clientset *kubernetes.Clientset) *configMapAnnotations { return &configMapAnnotations{ - Namespace: ns, - Name: name, - Client: client.CoreV1(), + Namespace: ns, + ConfigMapName: configMapName, + Client: clientset.CoreV1(), } } @@ -83,14 +83,14 @@ func (cml *configMapAnnotations) Init() error { // so that if the configmap is created after our GET but before or CREATE, we'll gracefully // re-try to get the configmap. var err error - cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Get(cml.Name, api.GetOptions{}) + cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Get(cml.ConfigMapName, api.GetOptions{}) if err != nil { if !kubeErrors.IsNotFound(err) { - return errors.Wrapf(err, "Unable to fetch ConfigMap %s/%s", cml.Namespace, cml.Name) + return errors.Wrapf(err, "Unable to fetch ConfigMap %s/%s", cml.Namespace, cml.ConfigMapName) } cml.cm, err = cml.Client.ConfigMaps(cml.Namespace).Create(&v1.ConfigMap{ ObjectMeta: api.ObjectMeta{ - Name: cml.Name, + Name: cml.ConfigMapName, Namespace: cml.Namespace, }, }) @@ -98,7 +98,7 @@ func (cml *configMapAnnotations) Init() error { if kubeErrors.IsAlreadyExists(err) { continue } - return errors.Wrapf(err, "Unable to create ConfigMap %s/%s", cml.Namespace, cml.Name) + return errors.Wrapf(err, "Unable to create ConfigMap %s/%s", cml.Namespace, cml.ConfigMapName) } } break From 3afb8d3ddfdba0e9c66ededcce7b2c72ba190806 Mon Sep 17 00:00:00 2001 From: Brice Fernandes Date: Tue, 31 Oct 2017 15:35:22 +0000 Subject: [PATCH 10/16] Add log messages to detect dying peer edge case --- prog/kube-peers/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/prog/kube-peers/main.go b/prog/kube-peers/main.go index 81bfda9da0..062a93ac60 100644 --- a/prog/kube-peers/main.go +++ b/prog/kube-peers/main.go @@ -138,9 +138,11 @@ func reclaimRemovedPeers(weave *weaveapi.Client, cml *configMapAnnotations, node } // 7a. Remove X from peerList storedPeerList.remove(peer.PeerName) + common.Log.Infoln("Removing peer ", peer.PeerName, ". Expecting to remove linked annotation next.") if err := cml.UpdatePeerList(*storedPeerList); err != nil { return err } + common.Log.Infoln("Removing annotation ", peer.PeerName) // 7b. 
Remove annotation with key X return cml.RemoveAnnotation(peer.PeerName) }) From 70a13c9c1231fa2fd2f8f62b316be8bd95de1657 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 8 Nov 2017 11:27:39 +0000 Subject: [PATCH 11/16] Make kube-peers logging consistent and add -log-level flag Also tag kube-peer log messages with "[kube-peers]": in a standard deployment they end up interspersed with weaver log messages so this makes it easier to pick them out. --- prog/kube-peers/main.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/prog/kube-peers/main.go b/prog/kube-peers/main.go index 062a93ac60..a4a41fa91b 100644 --- a/prog/kube-peers/main.go +++ b/prog/kube-peers/main.go @@ -8,7 +8,6 @@ package main import ( "flag" "fmt" - "log" "net" "os" @@ -64,7 +63,6 @@ const ( // update the list of all peers that have gone through this code path func addMyselfToPeerList(cml *configMapAnnotations, c *kubernetes.Clientset, peerName, name string) (*peerList, error) { - common.Log.Infoln("Adding myself to peer list") var list *peerList err := cml.LoopUpdate(func() error { var err error @@ -72,10 +70,8 @@ func addMyselfToPeerList(cml *configMapAnnotations, c *kubernetes.Clientset, pee if err != nil { return err } - log.Println("Fetched existing peer list", list) if !list.contains(peerName) { list.add(peerName, name) - log.Println("Storing new peer list", list) err = cml.UpdatePeerList(*list) if err != nil { return err @@ -83,6 +79,7 @@ func addMyselfToPeerList(cml *configMapAnnotations, c *kubernetes.Clientset, pee } return nil }) + common.Log.Infoln("[kube-peers] Added myself to peer list", list) return list, err } @@ -102,24 +99,24 @@ func reclaimRemovedPeers(weave *weaveapi.Client, cml *configMapAnnotations, node for _, node := range nodes { delete(peerMap, node.name) } - log.Println("Nodes that have disappeared:", peerMap) + common.Log.Debugln("[kube-peers] Nodes that have disappeared:", peerMap) if len(peerMap) == 0 { break } // 2. Loop for each X in the first set and not in the second - we wish to remove X from our data structures for _, peer := range peerMap { - common.Log.Debugln("Preparing to remove disappeared peer", peer) + common.Log.Debugln("[kube-peers] Preparing to remove disappeared peer", peer) okToRemove := false // 3. Check if there is an existing annotation with key X if existingAnnotation, found := cml.cm.Annotations[peer.PeerName]; found { - common.Log.Debugln("Existing annotation", existingAnnotation) + common.Log.Debugln("[kube-peers] Existing annotation", existingAnnotation) // 4. If annotation already contains my identity, ok; if existingAnnotation == myPeerName { okToRemove = true } } else { // 5. If non-existent, write an annotation with key X and contents "my identity" - common.Log.Debugln("Noting I plan to remove ", peer.PeerName) + common.Log.Debugln("[kube-peers] Noting I plan to remove ", peer.PeerName) if err := cml.UpdateAnnotation(peer.PeerName, myPeerName); err == nil { okToRemove = true } @@ -127,10 +124,10 @@ func reclaimRemovedPeers(weave *weaveapi.Client, cml *configMapAnnotations, node if okToRemove { // 6. If step 4 or 5 succeeded, rmpeer X result, err := weave.RmPeer(peer.PeerName) + common.Log.Infoln("[kube-peers] rmpeer of", peer.PeerName, ":", result) if err != nil { return err } - log.Println("rmpeer of", peer.PeerName, ":", result) cml.LoopUpdate(func() error { // 7aa. 
Remove any annotations Z* that have contents X if err := cml.RemoveAnnotationsWithValue(peer.PeerName); err != nil { @@ -138,15 +135,15 @@ func reclaimRemovedPeers(weave *weaveapi.Client, cml *configMapAnnotations, node } // 7a. Remove X from peerList storedPeerList.remove(peer.PeerName) - common.Log.Infoln("Removing peer ", peer.PeerName, ". Expecting to remove linked annotation next.") + common.Log.Infoln("[kube-peers] Removing peer ", peer.PeerName, ". Expecting to remove linked annotation next.") if err := cml.UpdatePeerList(*storedPeerList); err != nil { return err } - common.Log.Infoln("Removing annotation ", peer.PeerName) + common.Log.Infoln("[kube-peers] Removing annotation ", peer.PeerName) // 7b. Remove annotation with key X return cml.RemoveAnnotation(peer.PeerName) }) - common.Log.Debugln("Finished removal of ", peer.PeerName) + common.Log.Debugln("[kube-peers] Finished removal of ", peer.PeerName) } // 8. If step 5 failed due to optimistic lock conflict, stop: someone else is handling X @@ -169,36 +166,39 @@ func main() { justReclaim bool peerName string nodeName string + logLevel string ) flag.BoolVar(&justReclaim, "reclaim", false, "reclaim IP space from dead peers") flag.StringVar(&peerName, "peer-name", "unknown", "name of this Weave Net peer") flag.StringVar(&nodeName, "node-name", "unknown", "name of this Kubernetes node") + flag.StringVar(&logLevel, "log-level", "info", "logging level (debug, info, warning, error)") flag.Parse() + common.SetLogLevel(logLevel) config, err := rest.InClusterConfig() if err != nil { - log.Fatalf("Could not get cluster config: %v", err) + common.Log.Fatalf("[kube-peers] Could not get cluster config: %v", err) } c, err := kubernetes.NewForConfig(config) if err != nil { - log.Fatalf("Could not make Kubernetes connection: %v", err) + common.Log.Fatalf("[kube-peers] Could not make Kubernetes connection: %v", err) } peers, err := getKubePeers(c) if err != nil { - log.Fatalf("Could not get peers: %v", err) + common.Log.Fatalf("[kube-peers] Could not get peers: %v", err) } if justReclaim { cml := newConfigMapAnnotations(configMapNamespace, configMapName, c) _, err := addMyselfToPeerList(cml, c, peerName, nodeName) if err != nil { - log.Fatalf("Could not get peer list: %v", err) + common.Log.Fatalf("[kube-peers] Could not update peer list: %v", err) } weave := weaveapi.NewClient(os.Getenv("WEAVE_HTTP_ADDR"), common.Log) err = reclaimRemovedPeers(weave, cml, peers, peerName) if err != nil { - log.Fatalf("Error while reclaiming space: %v", err) + common.Log.Fatalf("[kube-peers] Error while reclaiming space: %v", err) } return } From 3501ad45e6458c8c4ec7252a5885813d770a55c5 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 8 Nov 2017 13:38:05 +0000 Subject: [PATCH 12/16] Cope with a Kubernetes node being deleted and coming back Add features to weaver and kube-peers to find the peer name and check if it is in the list, then in launch.sh delete persisted data if not. 
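For illustration, the recovery decision described above sketched in Go; the real check is the launch.sh change below, and the helper name and dbPath parameter here are illustrative:

package main

import (
	"os"
	"os/exec"
)

// cleanPersistenceIfRemoved mirrors the new launch.sh step: a peer whose
// name is missing from the stored peer list may already have been
// reclaimed (rmpeer'd) by another node, so its old persisted data must
// not be reused.
func cleanPersistenceIfRemoved(peerName, dbPath string) error {
	// kube-peers -check-peer-exists exits 0 when peerName is in the
	// annotation-backed list, non-zero (9) when it is not.
	check := exec.Command("/home/weave/kube-peers", "-check-peer-exists", "-peer-name="+peerName)
	if err := check.Run(); err == nil {
		return nil // still listed: keep the persisted data
	}
	// Not listed: delete the persisted DB so the node rejoins as a
	// brand-new peer instead of resurrecting a removed identity.
	if err := os.Remove(dbPath); err != nil && !os.IsNotExist(err) {
		return err
	}
	return nil
}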
--- prog/kube-peers/main.go | 26 ++++++++++++++++++++++++++ prog/weave-kube/launch.sh | 23 +++++++++++++++++++++-- prog/weaver/main.go | 9 +++++++-- 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/prog/kube-peers/main.go b/prog/kube-peers/main.go index a4a41fa91b..3b03f437b2 100644 --- a/prog/kube-peers/main.go +++ b/prog/kube-peers/main.go @@ -83,6 +83,18 @@ func addMyselfToPeerList(cml *configMapAnnotations, c *kubernetes.Clientset, pee return list, err } +func checkIamInPeerList(cml *configMapAnnotations, c *kubernetes.Clientset, peerName string) (bool, error) { + if err := cml.Init(); err != nil { + return false, err + } + list, err := cml.GetPeerList() + if err != nil { + return false, err + } + common.Log.Debugf("[kube-peers] Checking peer %q against list %v", peerName, list) + return list.contains(peerName), nil +} + // For each of those peers that is no longer listed as a node by // Kubernetes, remove it from Weave IPAM func reclaimRemovedPeers(weave *weaveapi.Client, cml *configMapAnnotations, nodes []nodeInfo, myPeerName string) error { @@ -164,11 +176,13 @@ func reclaimRemovedPeers(weave *weaveapi.Client, cml *configMapAnnotations, node func main() { var ( justReclaim bool + justCheck bool peerName string nodeName string logLevel string ) flag.BoolVar(&justReclaim, "reclaim", false, "reclaim IP space from dead peers") + flag.BoolVar(&justCheck, "check-peer-exists", false, "return success if peer name is stored in annotation") flag.StringVar(&peerName, "peer-name", "unknown", "name of this Weave Net peer") flag.StringVar(&nodeName, "node-name", "unknown", "name of this Kubernetes node") flag.StringVar(&logLevel, "log-level", "info", "logging level (debug, info, warning, error)") @@ -183,6 +197,18 @@ func main() { if err != nil { common.Log.Fatalf("[kube-peers] Could not make Kubernetes connection: %v", err) } + if justCheck { + cml := newConfigMapAnnotations(configMapNamespace, configMapName, c) + exists, err := checkIamInPeerList(cml, c, peerName) + if err != nil { + common.Log.Fatalf("[kube-peers] Could not check peer list: %v", err) + } + if exists { + os.Exit(0) + } else { + os.Exit(9) + } + } peers, err := getKubePeers(c) if err != nil { common.Log.Fatalf("[kube-peers] Could not get peers: %v", err) diff --git a/prog/weave-kube/launch.sh b/prog/weave-kube/launch.sh index 9111fbe1bd..36db07bb9f 100755 --- a/prog/weave-kube/launch.sh +++ b/prog/weave-kube/launch.sh @@ -30,6 +30,7 @@ HTTP_ADDR=${WEAVE_HTTP_ADDR:-127.0.0.1:6784} STATUS_ADDR=${WEAVE_STATUS_ADDR:-0.0.0.0:6782} HOST_ROOT=${HOST_ROOT:-/host} CONN_LIMIT=${CONN_LIMIT:-30} +DB_PREFIX=${DB_PREFIX:-/weavedb/weave-net} # Check if the IP range overlaps anything existing on the host /usr/bin/weaveutil netcheck $IPALLOC_RANGE weave @@ -87,6 +88,23 @@ if [ -z "$IPALLOC_INIT" ]; then IPALLOC_INIT="consensus=$(peer_count $KUBE_PEERS)" fi +# Find out what peer name we will be using +PEERNAME=$(/home/weave/weaver $EXTRA_ARGS --print-peer-name --host-root=$HOST_ROOT --db-prefix="$DB_PREFIX") +if [ -z "$PEERNAME" ]; then + echo "Unable to get peer name" >&2 + exit 2 +fi + +# If this peer name is not stored in the list, either we are a +# brand-new peer or we were removed by another peer while temporarily +# absent. In order to avoid a CRDT clash for the latter case, clean up +if ! 
/home/weave/kube-peers -check-peer-exists -peer-name="$PEERNAME" ; then + if [ -f ${DB_PREFIX}data.db ]; then + echo "Peer not in list; removing persisted data" >&2 + rm -f ${DB_PREFIX}data.db + fi +fi + post_start_actions() { # Wait for weave process to become responsive while true ; do @@ -108,7 +126,7 @@ post_start_actions() { export HOST_ROOT /home/weave/weave --local setup-cni - /home/weave/kube-peers -reclaim -node-name="$HOSTNAME" -peer-name="$(cat /sys/class/net/weave/address)" + /home/weave/kube-peers -reclaim -node-name="$HOSTNAME" -peer-name="$PEERNAME" # Expose the weave network so host processes can communicate with pods /home/weave/weave --local expose $WEAVE_EXPOSE_IP @@ -117,9 +135,10 @@ post_start_actions() { post_start_actions & /home/weave/weaver $EXTRA_ARGS --port=6783 $(router_bridge_opts) \ + --name="$PEERNAME" \ --host-root=$HOST_ROOT \ --http-addr=$HTTP_ADDR --status-addr=$STATUS_ADDR --docker-api='' --no-dns \ - --db-prefix="/weavedb/weave-net" \ + --db-prefix="$DB_PREFIX" \ --ipalloc-range=$IPALLOC_RANGE $NICKNAME_ARG \ --ipalloc-init $IPALLOC_INIT \ --conn-limit=$CONN_LIMIT \ diff --git a/prog/weaver/main.go b/prog/weaver/main.go index 321917fba3..05926a4d7a 100644 --- a/prog/weaver/main.go +++ b/prog/weaver/main.go @@ -131,6 +131,7 @@ func main() { var ( justVersion bool + justPeerName bool config mesh.Config bridgeConfig weavenet.BridgeConfig networkConfig weave.NetworkConfig @@ -163,6 +164,7 @@ func main() { ) mflag.BoolVar(&justVersion, []string{"-version"}, false, "print version and exit") + mflag.BoolVar(&justPeerName, []string{"-print-peer-name"}, false, "print peer name and exit") mflag.StringVar(&config.Host, []string{"-host"}, "", "router host") mflag.IntVar(&config.Port, []string{"-port"}, mesh.Port, "router port") mflag.IntVar(&protocolMinVersion, []string{"-min-protocol-version"}, mesh.ProtocolMinVersion, "minimum weave protocol version") @@ -230,6 +232,11 @@ func main() { fmt.Printf("weave %s\n", version) os.Exit(0) } + name := peerName(routerName, bridgeConfig.WeaveBridgeName, dbPrefix, hostRoot) + if justPeerName { + fmt.Printf("%s\n", name) + os.Exit(0) + } peers = mflag.Args() if resume && len(peers) > 0 { @@ -282,8 +289,6 @@ func main() { } } - name := peerName(routerName, bridgeConfig.WeaveBridgeName, dbPrefix, hostRoot) - bridgeConfig.Mac = name.String() bridgeConfig.Port = config.Port bridgeType, err := weavenet.EnsureBridge(procPath, &bridgeConfig, Log) From c67bc704e13ba6aa0c974a6a3154fc62030b5e8c Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 8 Nov 2017 11:29:33 +0000 Subject: [PATCH 13/16] Temporarily raise kube-peers log level to debug --- prog/weave-kube/launch.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/prog/weave-kube/launch.sh b/prog/weave-kube/launch.sh index 36db07bb9f..f91a0d1604 100755 --- a/prog/weave-kube/launch.sh +++ b/prog/weave-kube/launch.sh @@ -98,7 +98,7 @@ fi # If this peer name is not stored in the list, either we are a # brand-new peer or we were removed by another peer while temporarily # absent. In order to avoid a CRDT clash for the latter case, clean up -if ! /home/weave/kube-peers -check-peer-exists -peer-name="$PEERNAME" ; then +if ! 
/home/weave/kube-peers -check-peer-exists -peer-name="$PEERNAME" -log-level=debug ; then if [ -f ${DB_PREFIX}data.db ]; then echo "Peer not in list; removing persisted data" >&2 rm -f ${DB_PREFIX}data.db @@ -126,7 +126,7 @@ post_start_actions() { export HOST_ROOT /home/weave/weave --local setup-cni - /home/weave/kube-peers -reclaim -node-name="$HOSTNAME" -peer-name="$PEERNAME" + /home/weave/kube-peers -reclaim -node-name="$HOSTNAME" -peer-name="$PEERNAME" -log-level=debug # Expose the weave network so host processes can communicate with pods /home/weave/weave --local expose $WEAVE_EXPOSE_IP From 49ed40a3019676fa13250e31352fad271e5cd99d Mon Sep 17 00:00:00 2001 From: Brice Fernandes Date: Mon, 13 Nov 2017 13:13:51 +0000 Subject: [PATCH 14/16] Add test function to recover clean output from command run on remote host --- test/config.sh | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/test/config.sh b/test/config.sh index 6ed34cf4a8..0653c7553c 100644 --- a/test/config.sh +++ b/test/config.sh @@ -104,6 +104,13 @@ run_on() { remote $host $SSH $host "$@" } +get_command_output_on() { + host=$1 + shift 1 + [ -z "$DEBUG" ] || greyly echo "Running on $host: $@" >&2 + $SSH $host "$@" +} + docker_on() { host=$1 shift 1 From 987ac98fbfc7b226080ae1fd5872da87a4ef8c4d Mon Sep 17 00:00:00 2001 From: Brice Fernandes Date: Mon, 13 Nov 2017 14:23:32 +0000 Subject: [PATCH 15/16] Add up command to integration tests to make launching cluster easier --- test/run-integration-tests.sh | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/test/run-integration-tests.sh b/test/run-integration-tests.sh index 26ba7c0cb7..546ea0819b 100755 --- a/test/run-integration-tests.sh +++ b/test/run-integration-tests.sh @@ -354,6 +354,13 @@ function main() { exit $status ;; + up) # Setup a test environment without actually doing any testing. + provision on "$PROVIDER" + configure "$ssh_user" "$ssh_hosts" "${ssh_port:-22}" "$ssh_id_file" + "$DIR/setup.sh" + echo_export_hosts + ;; + provision) provision on "$PROVIDER" echo_export_hosts From 90f4f7b62cd6570eb8de10f154208d6682246e95 Mon Sep 17 00:00:00 2001 From: Brice Fernandes Date: Fri, 10 Nov 2017 12:27:50 +0000 Subject: [PATCH 16/16] Add test to ensure that weave recovers unreachable IPs on launch --- ...vers_unreachable_ips_on_relaunch_3_test.sh | 117 ++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100755 test/870_weave_recovers_unreachable_ips_on_relaunch_3_test.sh diff --git a/test/870_weave_recovers_unreachable_ips_on_relaunch_3_test.sh b/test/870_weave_recovers_unreachable_ips_on_relaunch_3_test.sh new file mode 100755 index 0000000000..19d22592bb --- /dev/null +++ b/test/870_weave_recovers_unreachable_ips_on_relaunch_3_test.sh @@ -0,0 +1,117 @@ +#! /bin/bash + +. 
"$(dirname "$0")/config.sh" + +function howmany { echo $#; } + +# +# Test vars +# +TOKEN=112233.4455667788990000 +HOST1IP=$($SSH $HOST1 "getent hosts $HOST1 | cut -f 1 -d ' '") +NUM_HOSTS=$(howmany $HOSTS) +SUCCESS="$(( $NUM_HOSTS * ($NUM_HOSTS-1) )) established" +KUBECTL="sudo kubectl --kubeconfig /etc/kubernetes/admin.conf" +KUBE_PORT=6443 +IMAGE=weaveworks/network-tester:latest + +if [ -n "$COVERAGE" ]; then + COVERAGE_ARGS="env:\\n - name: EXTRA_ARGS\\n value: \"-test.coverprofile=/home/weave/cover.prof --\"" +else + COVERAGE_ARGS="env:" +fi + +# +# Utility functions +# +function teardown_kubernetes_cluster { + greyly echo "Tearing down kubernetes cluster" + for host in $HOSTS; do + run_on $host "sudo kubeadm reset && sudo rm -r -f /opt/cni/bin/*weave*" + done +} + +function setup_kubernetes_cluster { + teardown_kubernetes_cluster; + + greyly echo "Setting up kubernetes cluster" + + # kubeadm init upgrades to latest Kubernetes version by default, therefore we try to lock the version using the below option: + k8s_version="$(run_on $HOST1 "kubelet --version" | grep -oP "(?<=Kubernetes )v[\d\.\-beta]+")" + k8s_version_option="$([[ "$k8s_version" > "v1.6" ]] && echo "kubernetes-version" || echo "use-kubernetes-version")" + + for host in $HOSTS; do + if [ $host = $HOST1 ] ; then + run_on $host "sudo systemctl start kubelet && sudo kubeadm init --$k8s_version_option=$k8s_version --token=$TOKEN" + else + run_on $host "sudo systemctl start kubelet && sudo kubeadm join --token=$TOKEN $HOST1IP:$KUBE_PORT" + fi + done + + # Ensure Kubernetes uses locally built container images and inject code coverage environment variable (or do nothing depending on $COVERAGE): + sed -e "s%imagePullPolicy: Always%imagePullPolicy: Never%" \ + -e "s%env:%$COVERAGE_ARGS%" \ + "$(dirname "$0")/../prog/weave-kube/weave-daemonset-k8s-1.7.yaml" | run_on "$HOST1" "$KUBECTL apply -n kube-system -f -" +} + +function force_drop_node { + greyly echo "Dropping node $1 with 'sudo kubectl delete node'" + local target=$(echo "$1" | awk -F"." '{print $1}') + run_on $HOST1 "$KUBECTL delete node $target" +} + +function weave_connected { + run_on $HOST1 "curl -sS http://127.0.0.1:6784/status | grep \"$SUCCESS\"" +} + +function unreachable_ip_addresses_count { + # Note Well: This will return 0 if weave is not running at all. + local host=$1 + local count=$(get_command_output_on $host "sudo weave status ipam | grep unreachable | wc -l") + echo $count +} + +function relaunch_weave_pod { + local target=$(echo "$1" | awk -F"." '{print $1}') + + # This is a pretty complex jq query. Is there a simpler way to + # get the weave-net pod name back using kubectl templates? 
+    local QUERY="$KUBECTL get pods -n kube-system -o json | jq -r \".items | map(select(.spec.nodeName == \\\"$target\\\")) | map(select(.metadata.labels.name == \\\"weave-net\\\")) | map(.metadata.name) | .[]\""
+    local podname=$(get_command_output_on $HOST1 $QUERY) # Run on master node
+    run_on $HOST1 "$KUBECTL get pods -n kube-system \"$podname\" -o yaml | $KUBECTL replace --force -f -"
+}
+
+#
+# Suite
+#
+function main {
+    local IPAM_RECOVER_DELAY=15
+
+    start_suite "Test weave-net deallocates from IPAM on node failure";
+
+    setup_kubernetes_cluster;
+
+    # Need to wait until all pods have come up
+    assert_raises "wait_for_x weave_connected 'pods to be running weave net'"
+
+    greyly echo "Checking unreachable IPs"
+    assert "unreachable_ip_addresses_count $HOST1" "0";
+    assert "unreachable_ip_addresses_count $HOST2" "0";
+    assert "unreachable_ip_addresses_count $HOST3" "0";
+
+    force_drop_node $HOST2;
+
+    relaunch_weave_pod $HOST3;
+
+    sleep $IPAM_RECOVER_DELAY;
+
+    greyly echo "Checking unreachable IPs"
+    assert "unreachable_ip_addresses_count $HOST1" "0";
+    assert "unreachable_ip_addresses_count $HOST3" "0";
+
+    teardown_kubernetes_cluster;
+
+    end_suite;
+}
+
+main
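For reference, the reclaim step this test exercises ultimately comes down to a single call against the local weave router, as wired up in prog/kube-peers/main.go earlier in this series. A minimal sketch; the peer name is illustrative:

package main

import (
	"fmt"
	"os"

	weaveapi "github.com/weaveworks/weave/api"
	"github.com/weaveworks/weave/common"
)

func main() {
	// Talk to the weave router over its local HTTP API, as kube-peers does.
	weave := weaveapi.NewClient(os.Getenv("WEAVE_HTTP_ADDR"), common.Log)
	// RmPeer issues DELETE /peer/<name>, releasing all IPAM space owned
	// by the dead peer, which is what lets the unreachable count in
	// weave status ipam drop back to zero.
	result, err := weave.RmPeer("ce:31:e0:06:45:1a") // illustrative peer name
	if err != nil {
		common.Log.Fatalf("rmpeer failed: %v", err)
	}
	fmt.Println("rmpeer:", result)
}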