Skip to content

Commit

Permalink
support for network policy GA
Browse files Browse the repository at this point in the history
with this refactoring support for network policy V1 (or GA) is added.
Changes are backward compatible so beta network policy semantics
are still available for k8s ver 1.6.* and less

Fixes #16
  • Loading branch information
Murali Reddy committed Jul 28, 2017
1 parent 8a3d6c0 commit b4c063e
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 52 deletions.
156 changes: 133 additions & 23 deletions app/controllers/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/janeczku/go-ipset/ipset"
"k8s.io/client-go/kubernetes"
apiv1 "k8s.io/client-go/pkg/api/v1"
apiextensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
networking "k8s.io/client-go/pkg/apis/networking/v1"
)

// Network policy controller provides an ingress firewall for the pods as per the defined network policies.
Expand All @@ -34,10 +36,11 @@ import (
// dropped by the rule in the pod chain, if there is no match.

type NetworkPolicyController struct {
nodeIP net.IP
nodeHostName string
mu sync.Mutex
syncPeriod time.Duration
nodeIP net.IP
nodeHostName string
mu sync.Mutex
syncPeriod time.Duration
v1NetworkPolicy bool

// list of all active network policies expressed as networkPolicyInfo
networkPoliciesInfo *[]networkPolicyInfo
Expand Down Expand Up @@ -94,7 +97,7 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro
glog.Infof("Performing periodic syn of the iptables to reflect network policies")
err := npc.Sync()
if err != nil {
glog.Errorf("Error during periodic sync: ", err)
glog.Errorf("Error during periodic sync: " + err.Error())
}
} else {
continue
Expand Down Expand Up @@ -122,7 +125,6 @@ func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
}

func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) {
glog.Infof("Received network policy update namspace:%s policy name:%s", networkPolicyUpdate.NetworkPolicy.Namespace, networkPolicyUpdate.NetworkPolicy.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
err := npc.Sync()
if err != nil {
Expand All @@ -134,6 +136,12 @@ func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *w
}

func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) {

// namespace (and annotations on it) has no significance in GA ver of network policy
if npc.v1NetworkPolicy {
return
}

glog.Infof("Received namesapce update namspace:%s", namespaceUpdate.Namespace.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
err := npc.Sync()
Expand Down Expand Up @@ -162,24 +170,33 @@ func (npc *NetworkPolicyController) Sync() error {
glog.Infof("sync iptables took %v", time.Since(start))
}()

npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo()
if err != nil {
return errors.New("Aborting sync. Failed to build network policies: %s" + err.Error())
if npc.v1NetworkPolicy {
npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo()
if err != nil {
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
}

} else {
npc.networkPoliciesInfo, err = buildBetaNetworkPoliciesInfo()
if err != nil {
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
}

}

activePolicyChains, err := npc.syncNetworkPolicyChains()
if err != nil {
return errors.New("Aborting sync. Failed to sync network policy chains: %s" + err.Error())
return errors.New("Aborting sync. Failed to sync network policy chains: " + err.Error())
}

activePodFwChains, err := npc.syncPodFirewallChains()
if err != nil {
return errors.New("Aborting sync. Failed to sync pod firewalls: %s" + err.Error())
return errors.New("Aborting sync. Failed to sync pod firewalls: " + err.Error())
}

err = cleanupStaleRules(activePolicyChains, activePodFwChains)
if err != nil {
return errors.New("Aborting sync. Failed to cleanup stale iptable rules: %s" + err.Error())
return errors.New("Aborting sync. Failed to cleanup stale iptable rules: " + err.Error())
}

return nil
Expand Down Expand Up @@ -334,19 +351,19 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er
}

// loop through the pods running on the node which has default ingress to be denied
podsOnNodeInfo, err := getPodsRunningOnNode(npc.nodeIP.String())
firewallEnabledPods, err := npc.getFirewallEnabledPods(npc.nodeIP.String())
if err != nil {
return nil, err
}
for _, pod := range *podsOnNodeInfo {
for _, pod := range *firewallEnabledPods {

// below condition occurs when we get trasient update while removing or adding pod
// subseqent update will do the correct action
if len(pod.ip) == 0 || pod.ip == "" {
continue
}

// ensure pod specfic firewall chain exist for all the pods running on this node
// ensure pod specfic firewall chain exist for all the pods that need ingress firewall
podFwChainName := podFirewallChainName(pod.namespace, pod.name)
err = iptablesCmdHandler.NewChain("filter", podFwChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
Expand Down Expand Up @@ -557,20 +574,47 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool) er
return nil
}

func getPodsRunningOnNode(nodeIp string) (*map[string]podInfo, error) {
func (npc *NetworkPolicyController) getFirewallEnabledPods(nodeIp string) (*map[string]podInfo, error) {

nodePods := make(map[string]podInfo)

for _, pod := range watchers.PodWatcher.List() {
if strings.Compare(pod.Status.HostIP, nodeIp) != 0 {
continue
}
default_policy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace)
if err != nil {
return nil, fmt.Errorf("Failed to get the namespace default ingress policy %s", err.Error())
}
if strings.Compare(default_policy, "DefaultDeny") != 0 {
continue
if npc.v1NetworkPolicy {
podNeedsFirewall := false
for _, policy_obj := range watchers.NetworkPolicyWatcher.List() {
policy, _ := policy_obj.(*networking.NetworkPolicy)
if policy.Namespace != pod.ObjectMeta.Namespace {
continue
}
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace,
policy.Spec.PodSelector.MatchLabels)
if err != nil {
return nil, fmt.Errorf("Failed to get the pods %s", err.Error())
}
for _, matchingPod := range matchingPods {
if matchingPod.ObjectMeta.Name == pod.ObjectMeta.Name {
podNeedsFirewall = true
break
}
}
if podNeedsFirewall {
break
}
}
if !podNeedsFirewall {
continue
}
} else {
default_policy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace)
if err != nil {
return nil, fmt.Errorf("Failed to get the namespace default ingress policy %s", err.Error())
}
if strings.Compare(default_policy, "DefaultDeny") != 0 {
continue
}
}
nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
name: pod.ObjectMeta.Name,
Expand All @@ -584,7 +628,66 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {

NetworkPolicies := make([]networkPolicyInfo, 0)

for _, policy := range watchers.NetworkPolicyWatcher.List() {
for _, policy_obj := range watchers.NetworkPolicyWatcher.List() {

policy, ok := policy_obj.(*networking.NetworkPolicy)
if !ok {
return nil, fmt.Errorf("Failed to convert")
}
newPolicy := networkPolicyInfo{
name: policy.Name,
namespace: policy.Namespace,
labels: policy.Spec.PodSelector.MatchLabels,
}
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
newPolicy.destPods = make(map[string]podInfo)
newPolicy.ingressRules = make([]ingressRule, 0)
if err == nil {
for _, matchingPod := range matchingPods {
newPolicy.destPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP,
name: matchingPod.ObjectMeta.Name,
namespace: matchingPod.ObjectMeta.Namespace,
labels: matchingPod.ObjectMeta.Labels}
}
}

for _, specIngressRule := range policy.Spec.Ingress {
ingressRule := ingressRule{}

ingressRule.ports = make([]protocolAndPort, 0)
for _, port := range specIngressRule.Ports {
protocolAndPort := protocolAndPort{protocol: string(*port.Protocol), port: port.Port.String()}
ingressRule.ports = append(ingressRule.ports, protocolAndPort)
}

ingressRule.srcPods = make([]podInfo, 0)
for _, peer := range specIngressRule.From {
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels)
if err == nil {
for _, matchingPod := range matchingPods {
ingressRule.srcPods = append(ingressRule.srcPods,
podInfo{ip: matchingPod.Status.PodIP,
name: matchingPod.ObjectMeta.Name,
namespace: matchingPod.ObjectMeta.Namespace,
labels: matchingPod.ObjectMeta.Labels})
}
}
}
newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule)
}
NetworkPolicies = append(NetworkPolicies, newPolicy)
}

return &NetworkPolicies, nil
}

func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {

NetworkPolicies := make([]networkPolicyInfo, 0)

for _, policy_obj := range watchers.NetworkPolicyWatcher.List() {

policy, _ := policy_obj.(*apiextensions.NetworkPolicy)
newPolicy := networkPolicyInfo{
name: policy.Name,
namespace: policy.Namespace,
Expand Down Expand Up @@ -781,6 +884,13 @@ func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options

npc.syncPeriod = config.IPTablesSyncPeriod

npc.v1NetworkPolicy = true
v, _ := clientset.Discovery().ServerVersion()
minorVer, _ := strconv.Atoi(v.Minor)
if v.Major == "1" && minorVer < 7 {
npc.v1NetworkPolicy = false
}

node, err := utils.GetNodeObject(clientset, config.HostnameOverride)
if err != nil {
return nil, err
Expand Down
59 changes: 30 additions & 29 deletions app/watchers/network_policy_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ package watchers

import (
"reflect"
"strconv"
"time"

"github.com/cloudnativelabs/kube-router/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
apiextensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
networking "k8s.io/client-go/pkg/apis/networking/v1"
cache "k8s.io/client-go/tools/cache"
)

type NetworkPolicyUpdate struct {
NetworkPolicy *apiextensions.NetworkPolicy
NetworkPolicy interface{}
Op Operation
}

Expand All @@ -33,28 +35,16 @@ type NetworkPolicyUpdatesHandler interface {
}

func (npw *networkPolicyWatcher) networkPolicyAddEventHandler(obj interface{}) {
policy, ok := obj.(*apiextensions.NetworkPolicy)
if !ok {
return
}
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: ADD, NetworkPolicy: policy})
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: ADD, NetworkPolicy: obj})
}

func (npw *networkPolicyWatcher) networkPolicyDeleteEventHandler(obj interface{}) {
policy, ok := obj.(*apiextensions.NetworkPolicy)
if !ok {
return
}
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: REMOVE, NetworkPolicy: policy})
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: REMOVE, NetworkPolicy: obj})
}

func (npw *networkPolicyWatcher) networkPolicyUpdateEventHandler(oldObj, newObj interface{}) {
policy, ok := newObj.(*apiextensions.NetworkPolicy)
if !ok {
return
}
if !reflect.DeepEqual(newObj, oldObj) {
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: UPDATE, NetworkPolicy: policy})
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: UPDATE, NetworkPolicy: newObj})
}
}

Expand All @@ -64,13 +54,8 @@ func (npw *networkPolicyWatcher) RegisterHandler(handler NetworkPolicyUpdatesHan
}))
}

func (npw *networkPolicyWatcher) List() []*apiextensions.NetworkPolicy {
obj_list := npw.networkPolicyLister.List()
np_instances := make([]*apiextensions.NetworkPolicy, len(obj_list))
for i, ins := range obj_list {
np_instances[i] = ins.(*apiextensions.NetworkPolicy)
}
return np_instances
func (npw *networkPolicyWatcher) List() []interface{} {
return npw.networkPolicyLister.List()
}

func (npw *networkPolicyWatcher) HasSynced() bool {
Expand All @@ -91,13 +76,29 @@ func StartNetworkPolicyWatcher(clientset *kubernetes.Clientset, resyncPeriod tim
}

npw.clientset = clientset

v1NetworkPolicy := true
v, _ := clientset.Discovery().ServerVersion()
minorVer, _ := strconv.Atoi(v.Minor)
if v.Major == "1" && minorVer < 7 {
v1NetworkPolicy = false
}

npw.broadcaster = utils.NewBroadcaster()
lw := cache.NewListWatchFromClient(clientset.Extensions().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer(
lw,
&apiextensions.NetworkPolicy{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
var lw *cache.ListWatch
if v1NetworkPolicy {
lw = cache.NewListWatchFromClient(clientset.Networking().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer(
lw, &networking.NetworkPolicy{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
} else {
lw = cache.NewListWatchFromClient(clientset.Extensions().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer(
lw, &apiextensions.NetworkPolicy{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
}
networkPolicyStopCh = make(chan struct{})
go npw.networkPolicyController.Run(networkPolicyStopCh)
return &npw, nil
Expand Down

0 comments on commit b4c063e

Please sign in to comment.