Skip to content

Commit

Permalink
ip-reconciler: also account for the pods IP addresses
Browse files Browse the repository at this point in the history
Only mark a pod as "alive" when the pod's annotations feature the
IP being de-allocated.

This makes the reconciler binary *dependent* on multus, which adds
these `network-status` annotations into the pod.

Signed-off-by: Miguel Duarte Barroso <[email protected]>
  • Loading branch information
maiqueb committed Jul 19, 2021
1 parent 71472dd commit 84966c3
Show file tree
Hide file tree
Showing 4 changed files with 311 additions and 39 deletions.
60 changes: 51 additions & 9 deletions cmd/reconciler/ip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/json"
"fmt"
"net"
"strings"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/dougbtv/whereabouts/pkg/api/v1alpha1"
"github.com/dougbtv/whereabouts/pkg/reconciler"
"github.com/dougbtv/whereabouts/pkg/types"
multusv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -20,10 +22,11 @@ import (

var _ = Describe("Whereabouts IP reconciler", func() {
const (
ipRange = "10.10.10.0/16"
namespace = "testns"
networkName = "net1"
podName = "pod1"
firstIPInRange = "10.10.10.1"
ipRange = "10.10.10.0/16"
namespace = "testns"
networkName = "net1"
podName = "pod1"
)

var (
Expand All @@ -41,7 +44,8 @@ var _ = Describe("Whereabouts IP reconciler", func() {

BeforeEach(func() {
var err error
pod, err = k8sClientSet.CoreV1().Pods(namespace).Create(generatePod(namespace, podName, networkName))
pod, err = k8sClientSet.CoreV1().Pods(namespace).Create(
generatePod(namespace, podName, ipInNetwork{ip: firstIPInRange, networkName: networkName}))
Expect(err).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -97,13 +101,18 @@ var _ = Describe("Whereabouts IP reconciler", func() {
deadPodIndex = 0
livePodIndex = 1
numberOfPods = 2
secondIPInRange = "10.10.10.2"
)

var pods []v1.Pod

BeforeEach(func() {
ips := []string{firstIPInRange, secondIPInRange}
for i := 0; i < numberOfPods; i++ {
pod := generatePod(namespace, fmt.Sprintf("pod%d", i+1), networkName)
pod := generatePod(namespace, fmt.Sprintf("pod%d", i+1), ipInNetwork{
ip: ips[i],
networkName: networkName,
})
if i == livePodIndex {
_, err := k8sClientSet.CoreV1().Pods(namespace).Create(pod)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -189,13 +198,17 @@ func generateIPPoolSpec(ipRange string, namespace string, poolName string, podNa
}
}

func generatePod(namespace string, podName string, networks ...string) *v1.Pod {
networkAnnotations := map[string]string{"k8s.v1.cni.cncf.io/networks": strings.Join(networks, ",")}
type ipInNetwork struct {
ip string
networkName string
}

func generatePod(namespace string, podName string, ipNetworks ...ipInNetwork) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: namespace,
Annotations: networkAnnotations,
Annotations: generatePodAnnotations(ipNetworks...),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
Expand All @@ -208,3 +221,32 @@ func generatePod(namespace string, podName string, networks ...string) *v1.Pod {
},
}
}

func generatePodAnnotations(ipNetworks ...ipInNetwork) map[string]string {
var networks []string
for _, ipNetworkInfo := range ipNetworks {
networks = append(networks, ipNetworkInfo.networkName)
}
networkAnnotations := map[string]string{
reconciler.MultusNetworkAnnotation: strings.Join(networks, ","),
reconciler.MultusNetworkStatusAnnotation: generatePodNetworkStatusAnnotation(ipNetworks...),
}
return networkAnnotations
}

func generatePodNetworkStatusAnnotation(ipNetworks ...ipInNetwork) string {
var networkStatus []multusv1.NetworkStatus
for i, ipNetworkInfo := range ipNetworks {
networkStatus = append(networkStatus, multusv1.NetworkStatus{
Name: ipNetworkInfo.networkName,
Interface: fmt.Sprintf("net%d", i + 1),
IPs: []string{ipNetworkInfo.ip},
})
}
networkStatusStr, err := json.Marshal(networkStatus)
if err != nil {
return ""
}

return string(networkStatusStr)
}
59 changes: 29 additions & 30 deletions pkg/reconciler/iploop.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"github.com/dougbtv/whereabouts/pkg/storage"
"github.com/dougbtv/whereabouts/pkg/storage/kubernetes"
"github.com/dougbtv/whereabouts/pkg/types"
v1 "k8s.io/api/core/v1"
)

type ReconcileLooper struct {
cancelFunc func()
ctx context.Context
k8sClient kubernetes.Client
livePodRefs []string
livePods map[string]podWrapper
orphanedIPs []OrphanedIPReservations
}

Expand All @@ -33,17 +34,17 @@ func NewReconcileLooper(kubeConfigPath string) (*ReconcileLooper, error) {
logging.Debugf("successfully read the kubernetes configuration file located at: %s", kubeConfigPath)

ctx, cancel := context.WithTimeout(context.Background(), storage.RequestTimeout)
podRefs, err := getPodRefs(*k8sClient)
pods, err := k8sClient.ListPods()
if err != nil {
cancel()
return nil, err
}

looper := &ReconcileLooper{
cancelFunc: cancel,
ctx: ctx,
k8sClient: *k8sClient,
livePodRefs: podRefs,
cancelFunc: cancel,
ctx: ctx,
k8sClient: *k8sClient,
livePods: indexPods(pods) ,
}

if err := looper.findOrphanedIPsPerPool(); err != nil {
Expand All @@ -52,19 +53,6 @@ func NewReconcileLooper(kubeConfigPath string) (*ReconcileLooper, error) {
return looper, nil
}

func getPodRefs(k8sClient kubernetes.Client) ([]string, error) {
pods, err := k8sClient.ListPods()
if err != nil {
return nil, err
}

var podRefs []string
for _, pod := range pods {
podRefs = append(podRefs, fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName()))
}
return podRefs, err
}

func (rl *ReconcileLooper) findOrphanedIPsPerPool() error {
ipPools, err := rl.k8sClient.ListIPPools(rl.ctx)
if err != nil {
Expand All @@ -75,15 +63,15 @@ func (rl *ReconcileLooper) findOrphanedIPsPerPool() error {
orphanIP := OrphanedIPReservations{
Pool: pool,
}
for _, allocation := range pool.Allocations() {
logging.Debugf("the IP reservation: %s", allocation)
if allocation.PodRef == "" {
_ = logging.Errorf("pod ref missing for Allocations: %s", allocation)
for _, ipReservation := range pool.Allocations() {
logging.Debugf("the IP reservation: %s", ipReservation)
if ipReservation.PodRef == "" {
_ = logging.Errorf("pod ref missing for Allocations: %s", ipReservation)
continue
}
if !rl.isPodAlive(allocation.PodRef) {
logging.Debugf("pod ref %s is not listed in the live pods list", allocation.PodRef)
orphanIP.Allocations = append(orphanIP.Allocations, allocation)
if !rl.isPodAlive(ipReservation) {
logging.Debugf("pod ref %s is not listed in the live pods list", ipReservation.PodRef)
orphanIP.Allocations = append(orphanIP.Allocations, ipReservation)
}
}
if len(orphanIP.Allocations) > 0 {
Expand All @@ -94,15 +82,26 @@ func (rl *ReconcileLooper) findOrphanedIPsPerPool() error {
return nil
}

func (rl ReconcileLooper) isPodAlive(podRef string) bool {
for _, livePodRef := range rl.livePodRefs {
if podRef == livePodRef {
return true
func (rl ReconcileLooper) isPodAlive(allocation types.IPReservation) bool {
for livePodRef, livePod := range rl.livePods {
if allocation.PodRef == livePodRef {
livePodIPs := livePod.ips
logging.Debugf(
"pod reference %s matches allocation; Allocation IP: %s; PodIPs: %s",
livePodRef,
allocation.IP.String(),
livePodIPs)
_, isFound := livePodIPs[allocation.IP.String()]
return isFound
}
}
return false
}

func composePodRef(pod v1.Pod) string {
return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())
}

func (rl ReconcileLooper) ReconcileIPPools() ([]types.IPReservation, error) {
defer rl.cancelFunc()

Expand Down
62 changes: 62 additions & 0 deletions pkg/reconciler/wrappedPod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package reconciler

import (
"encoding/json"

"github.com/dougbtv/whereabouts/pkg/logging"
k8snetworkplumbingwgv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"

v1 "k8s.io/api/core/v1"
)

const (
multusInterfaceNamePrefix = "net"
multusPrefixSize = len(multusInterfaceNamePrefix)
MultusNetworkAnnotation = "k8s.v1.cni.cncf.io/networks"
MultusNetworkStatusAnnotation = "k8s.v1.cni.cncf.io/networks-status"
)

type podWrapper struct {
ips map[string]void
}

type void struct {}

func wrapPod(pod v1.Pod) *podWrapper {
return &podWrapper{
ips: getFlatIPSet(pod),
}
}

func indexPods(podList []v1.Pod) map[string]podWrapper {
podMap := map[string]podWrapper{}

for _, pod := range podList {
wrappedPod := wrapPod(pod)
if wrappedPod != nil {
podMap[composePodRef(pod)] = *wrappedPod
}
}
return podMap
}

func getFlatIPSet(pod v1.Pod) map[string]void {
var empty void
ipSet := map[string]void{}
networkStatusAnnotationValue := []byte(pod.Annotations[MultusNetworkStatusAnnotation])
var networkStatusList []k8snetworkplumbingwgv1.NetworkStatus
if err := json.Unmarshal(networkStatusAnnotationValue, &networkStatusList); err != nil {
_ = logging.Errorf("could not parse network annotation %s for pod: %s; error: %v", networkStatusAnnotationValue, composePodRef(pod), err)
return ipSet
}

for _, network := range networkStatusList {
// we're only after multus secondary interfaces
if network.Interface[:multusPrefixSize] == multusInterfaceNamePrefix {
for _, ip := range network.IPs {
ipSet[ip] = empty
}
}
}
return ipSet
}
Loading

0 comments on commit 84966c3

Please sign in to comment.