Skip to content

Commit

Permalink
used shared informers for events and listing resources
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewsykim committed Apr 8, 2018
1 parent fadfc41 commit 1007c5a
Show file tree
Hide file tree
Showing 6 changed files with 310 additions and 227 deletions.
2 changes: 1 addition & 1 deletion app/controllers/metrics_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (mc *MetricsController) Run(healthChan chan<- *ControllerHeartbeat, stopCh
}

// NewMetricsController returns new MetricController object
func NewMetricsController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*MetricsController, error) {
func NewMetricsController(clientset kubernetes.Interface, config *options.KubeRouterConfig) (*MetricsController, error) {
mc := MetricsController{}
mc.MetricsPath = config.MetricsPath
mc.MetricsPort = config.MetricsPort
Expand Down
212 changes: 134 additions & 78 deletions app/controllers/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package controllers
import (
"crypto/sha256"
"encoding/base32"
"encoding/json"
"errors"
"fmt"
"net"
Expand All @@ -13,15 +12,18 @@ import (
"time"

"github.com/cloudnativelabs/kube-router/app/options"
"github.com/cloudnativelabs/kube-router/app/watchers"
"github.com/cloudnativelabs/kube-router/utils"
"github.com/coreos/go-iptables/iptables"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"

api "k8s.io/api/core/v1"
apiextensions "k8s.io/api/extensions/v1beta1"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)

const (
Expand Down Expand Up @@ -51,6 +53,14 @@ type NetworkPolicyController struct {
// list of all active network policies expressed as networkPolicyInfo
networkPoliciesInfo *[]networkPolicyInfo
ipSetHandler *utils.IPSet

podLister cache.Indexer
npLister cache.Indexer
nsLister cache.Indexer

PodEventHandler cache.ResourceEventHandler
NamespaceEventHandler cache.ResourceEventHandler
NetworkPolicyEventHandler cache.ResourceEventHandler
}

// internal structure to represent a network policy
Expand Down Expand Up @@ -120,16 +130,12 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *ControllerHeartbeat,
default:
}

if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
glog.V(1).Info("Performing periodic sync of iptables to reflect network policies")
err := npc.Sync()
if err != nil {
glog.Errorf("Error during periodic sync: " + err.Error())
} else {
sendHeartBeat(healthChan, "NPC")
}
glog.V(1).Info("Performing periodic sync of iptables to reflect network policies")
err := npc.Sync()
if err != nil {
glog.Errorf("Error during periodic sync: " + err.Error())
} else {
continue
sendHeartBeat(healthChan, "NPC")
}

select {
Expand All @@ -142,46 +148,37 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *ControllerHeartbeat,
}

// OnPodUpdate handles updates to pods from the Kubernetes api server
func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
glog.V(2).Infof("Received pod update namespace:%s pod name:%s", podUpdate.Pod.Namespace, podUpdate.Pod.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on pod update: %s", err)
}
} else {
glog.V(2).Infof("Received pod update, but controller not in sync")
func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
pod := obj.(*api.Pod)
glog.V(2).Infof("Received pod update namespace:%s pod name:%s", pod.Namespace, pod.Name)

err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on pod update: %s", err)
}
}

// OnNetworkPolicyUpdate handles updates to network policy from the kubernetes api server
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) {
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on network policy update: %s", err)
}
} else {
glog.V(2).Info("Received network policy update, but controller not in sync")
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) {
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on network policy update: %s", err)
}
}

// OnNamespaceUpdate handles updates to namespace from kubernetes api server
func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) {

func (npc *NetworkPolicyController) OnNamespaceUpdate(obj interface{}) {
// namespace (and annotations on it) has no significance in GA ver of network policy
if npc.v1NetworkPolicy {
return
}

glog.V(2).Infof("Received namespace update namespace:%s", namespaceUpdate.Namespace.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on namespace update: %s", err)
}
} else {
glog.V(2).Info("Received namespace update, but controller not in sync")
namespace := obj.(*api.Namespace)
glog.V(2).Infof("Received update for namespace: %s", namespace.Name)

err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing on namespace update: %s", err)
}
}

Expand All @@ -204,13 +201,13 @@ func (npc *NetworkPolicyController) Sync() error {
glog.V(1).Info("Starting periodic sync of iptables")

if npc.v1NetworkPolicy {
npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo()
npc.networkPoliciesInfo, err = npc.buildNetworkPoliciesInfo()
if err != nil {
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
}
} else {
// TODO remove the Beta support
npc.networkPoliciesInfo, err = buildBetaNetworkPoliciesInfo()
npc.networkPoliciesInfo, err = npc.buildBetaNetworkPoliciesInfo()
if err != nil {
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
}
Expand Down Expand Up @@ -948,7 +945,9 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets
func (npc *NetworkPolicyController) getIngressNetworkPolicyEnabledPods(nodeIp string) (*map[string]podInfo, error) {
nodePods := make(map[string]podInfo)

for _, pod := range watchers.PodWatcher.List() {
for _, obj := range npc.podLister.List() {
pod := obj.(*api.Pod)

if strings.Compare(pod.Status.HostIP, nodeIp) != 0 {
continue
}
Expand All @@ -975,7 +974,9 @@ func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(nodeIp str

nodePods := make(map[string]podInfo)

for _, pod := range watchers.PodWatcher.List() {
for _, obj := range npc.podLister.List() {
pod := obj.(*api.Pod)

if strings.Compare(pod.Status.HostIP, nodeIp) != 0 {
continue
}
Expand All @@ -997,11 +998,11 @@ func (npc *NetworkPolicyController) getEgressNetworkPolicyEnabledPods(nodeIp str
return &nodePods, nil
}

func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
func (npc *NetworkPolicyController) buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {

NetworkPolicies := make([]networkPolicyInfo, 0)

for _, policyObj := range watchers.NetworkPolicyWatcher.List() {
for _, policyObj := range npc.npLister.List() {

policy, ok := policyObj.(*networking.NetworkPolicy)
if !ok {
Expand Down Expand Up @@ -1042,7 +1043,7 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
}
}

matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
newPolicy.targetPods = make(map[string]podInfo)
if err == nil {
for _, matchingPod := range matchingPods {
Expand Down Expand Up @@ -1094,15 +1095,15 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
for _, peer := range specIngressRule.From {
// spec must have either of PodSelector or NamespaceSelector
if peer.PodSelector != nil {
matchingPods, err = watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace,
matchingPods, err = npc.ListPodsByNamespaceAndLabels(policy.Namespace,
peer.PodSelector.MatchLabels)
} else if peer.NamespaceSelector != nil {
namespaces, err := watchers.NamespaceWatcher.ListByLabels(peer.NamespaceSelector.MatchLabels)
namespaces, err := npc.ListNamespaceByLabels(peer.NamespaceSelector.MatchLabels)
if err != nil {
return nil, errors.New("Failed to build network policies info due to " + err.Error())
}
for _, namespace := range namespaces {
namespacePods, err := watchers.PodWatcher.ListByNamespaceAndLabels(namespace.Name, nil)
namespacePods, err := npc.ListPodsByNamespaceAndLabels(namespace.Name, nil)
if err != nil {
return nil, errors.New("Failed to build network policies info due to " + err.Error())
}
Expand Down Expand Up @@ -1155,15 +1156,15 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
for _, peer := range specEgressRule.To {
// spec must have either of PodSelector or NamespaceSelector
if peer.PodSelector != nil {
matchingPods, err = watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace,
matchingPods, err = npc.ListPodsByNamespaceAndLabels(policy.Namespace,
peer.PodSelector.MatchLabels)
} else if peer.NamespaceSelector != nil {
namespaces, err := watchers.NamespaceWatcher.ListByLabels(peer.NamespaceSelector.MatchLabels)
namespaces, err := npc.ListNamespaceByLabels(peer.NamespaceSelector.MatchLabels)
if err != nil {
return nil, errors.New("Failed to build network policies info due to " + err.Error())
}
for _, namespace := range namespaces {
namespacePods, err := watchers.PodWatcher.ListByNamespaceAndLabels(namespace.Name, nil)
namespacePods, err := npc.ListPodsByNamespaceAndLabels(namespace.Name, nil)
if err != nil {
return nil, errors.New("Failed to build network policies info due to " + err.Error())
}
Expand Down Expand Up @@ -1192,19 +1193,37 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
return &NetworkPolicies, nil
}

func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
func (npc *NetworkPolicyController) ListPodsByNamespaceAndLabels(namespace string, labelsToMatch labels.Set) (ret []*api.Pod, err error) {
podLister := listers.NewPodLister(npc.podLister)
allMatchedNameSpacePods, err := podLister.Pods(namespace).List(labelsToMatch.AsSelector())
if err != nil {
return nil, err
}
return allMatchedNameSpacePods, nil
}

func (npc *NetworkPolicyController) ListNamespaceByLabels(set labels.Set) ([]*api.Namespace, error) {
namespaceLister := listers.NewNamespaceLister(npc.npLister)
matchedNamespaces, err := namespaceLister.List(set.AsSelector())
if err != nil {
return nil, err
}
return matchedNamespaces, nil
}

func (npc *NetworkPolicyController) buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {

NetworkPolicies := make([]networkPolicyInfo, 0)

for _, policyObj := range watchers.NetworkPolicyWatcher.List() {
for _, policyObj := range npc.npLister.List() {

policy, _ := policyObj.(*apiextensions.NetworkPolicy)
newPolicy := networkPolicyInfo{
name: policy.Name,
namespace: policy.Namespace,
labels: policy.Spec.PodSelector.MatchLabels,
}
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
newPolicy.targetPods = make(map[string]podInfo)
newPolicy.ingressRules = make([]ingressRule, 0)
if err == nil {
Expand All @@ -1227,7 +1246,7 @@ func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {

ingressRule.srcPods = make([]podInfo, 0)
for _, peer := range specIngressRule.From {
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels)
matchingPods, err := npc.ListPodsByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels)
if err == nil {
for _, matchingPod := range matchingPods {
ingressRule.srcPods = append(ingressRule.srcPods,
Expand All @@ -1246,25 +1265,6 @@ func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
return &NetworkPolicies, nil
}

func getNameSpaceDefaultPolicy(namespace string) (string, error) {
for _, nspw := range watchers.NamespaceWatcher.List() {
if strings.Compare(namespace, nspw.Name) == 0 {
networkPolicy, ok := nspw.ObjectMeta.Annotations[networkPolicyAnnotation]
var annot map[string]map[string]string
if ok {
err := json.Unmarshal([]byte(networkPolicy), &annot)
if err == nil {
return annot["ingress"]["isolation"], nil
}
glog.Errorf("Skipping invalid network-policy for namespace \"%s\": %s", namespace, err)
return "DefaultAllow", errors.New("Invalid NetworkPolicy")
}
return "DefaultAllow", nil
}
}
return "", errors.New("Failed to get the default ingress policy for the namespace: " + namespace)
}

func podFirewallChainName(namespace, podName string) string {
hash := sha256.Sum256([]byte(namespace + podName))
encoded := base32.StdEncoding.EncodeToString(hash[:])
Expand Down Expand Up @@ -1385,8 +1385,59 @@ func (npc *NetworkPolicyController) Cleanup() {
glog.Infof("Successfully cleaned the iptables configuration done by kube-router")
}

func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
npc.OnPodUpdate(obj)

},
UpdateFunc: func(oldObj, newObj interface{}) {
npc.OnPodUpdate(newObj)

},
DeleteFunc: func(obj interface{}) {
npc.OnPodUpdate(obj)
},
}
}

func (npc *NetworkPolicyController) newNamespaceEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
npc.OnNamespaceUpdate(obj)

},
UpdateFunc: func(oldObj, newObj interface{}) {
npc.OnNamespaceUpdate(newObj)

},
DeleteFunc: func(obj interface{}) {
npc.OnNamespaceUpdate(obj)

},
}
}

func (npc *NetworkPolicyController) newNetworkPolicyEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
npc.OnNetworkPolicyUpdate(obj)

},
UpdateFunc: func(oldObj, newObj interface{}) {
npc.OnNetworkPolicyUpdate(newObj)
},
DeleteFunc: func(obj interface{}) {
npc.OnNetworkPolicyUpdate(obj)

},
}
}

// NewNetworkPolicyController returns new NetworkPolicyController object
func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options.KubeRouterConfig) (*NetworkPolicyController, error) {
func NewNetworkPolicyController(clientset kubernetes.Interface,
config *options.KubeRouterConfig, podInformer cache.SharedIndexInformer,
npInformer cache.SharedIndexInformer, nsInformer cache.SharedIndexInformer) (*NetworkPolicyController, error) {
npc := NetworkPolicyController{}

if config.MetricsEnabled {
Expand Down Expand Up @@ -1427,9 +1478,14 @@ func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options
}
npc.ipSetHandler = ipset

watchers.PodWatcher.RegisterHandler(&npc)
watchers.NetworkPolicyWatcher.RegisterHandler(&npc)
watchers.NamespaceWatcher.RegisterHandler(&npc)
npc.podLister = podInformer.GetIndexer()
npc.PodEventHandler = npc.newPodEventHandler()

npc.nsLister = nsInformer.GetIndexer()
npc.NamespaceEventHandler = npc.newNetworkPolicyEventHandler()

npc.npLister = npInformer.GetIndexer()
npc.NetworkPolicyEventHandler = npc.newNetworkPolicyEventHandler()

return &npc, nil
}
Loading

0 comments on commit 1007c5a

Please sign in to comment.