Skip to content

Commit

Permalink
Merge pull request #82 from cloudnativelabs/network-policy-ga
Browse files Browse the repository at this point in the history
support for network policy GA
  • Loading branch information
murali-reddy authored Jul 28, 2017
2 parents 8a3d6c0 + b4c063e commit c0a14e8
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 52 deletions.
156 changes: 133 additions & 23 deletions app/controllers/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/janeczku/go-ipset/ipset"
"k8s.io/client-go/kubernetes"
apiv1 "k8s.io/client-go/pkg/api/v1"
apiextensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
networking "k8s.io/client-go/pkg/apis/networking/v1"
)

// Network policy controller provides an ingress firewall for the pods as per the defined network policies.
Expand All @@ -34,10 +36,11 @@ import (
// dropped by the rule in the pod chain, if there is no match.

type NetworkPolicyController struct {
nodeIP net.IP
nodeHostName string
mu sync.Mutex
syncPeriod time.Duration
nodeIP net.IP
nodeHostName string
mu sync.Mutex
syncPeriod time.Duration
v1NetworkPolicy bool

// list of all active network policies expressed as networkPolicyInfo
networkPoliciesInfo *[]networkPolicyInfo
Expand Down Expand Up @@ -94,7 +97,7 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro
glog.Infof("Performing periodic syn of the iptables to reflect network policies")
err := npc.Sync()
if err != nil {
glog.Errorf("Error during periodic sync: ", err)
glog.Errorf("Error during periodic sync: " + err.Error())
}
} else {
continue
Expand Down Expand Up @@ -122,7 +125,6 @@ func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
}

func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) {
glog.Infof("Received network policy update namspace:%s policy name:%s", networkPolicyUpdate.NetworkPolicy.Namespace, networkPolicyUpdate.NetworkPolicy.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
err := npc.Sync()
if err != nil {
Expand All @@ -134,6 +136,12 @@ func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *w
}

func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) {

// namespace (and annotations on it) has no significance in GA ver of network policy
if npc.v1NetworkPolicy {
return
}

glog.Infof("Received namesapce update namspace:%s", namespaceUpdate.Namespace.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
err := npc.Sync()
Expand Down Expand Up @@ -162,24 +170,33 @@ func (npc *NetworkPolicyController) Sync() error {
glog.Infof("sync iptables took %v", time.Since(start))
}()

npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo()
if err != nil {
return errors.New("Aborting sync. Failed to build network policies: %s" + err.Error())
if npc.v1NetworkPolicy {
npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo()
if err != nil {
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
}

} else {
npc.networkPoliciesInfo, err = buildBetaNetworkPoliciesInfo()
if err != nil {
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
}

}

activePolicyChains, err := npc.syncNetworkPolicyChains()
if err != nil {
return errors.New("Aborting sync. Failed to sync network policy chains: %s" + err.Error())
return errors.New("Aborting sync. Failed to sync network policy chains: " + err.Error())
}

activePodFwChains, err := npc.syncPodFirewallChains()
if err != nil {
return errors.New("Aborting sync. Failed to sync pod firewalls: %s" + err.Error())
return errors.New("Aborting sync. Failed to sync pod firewalls: " + err.Error())
}

err = cleanupStaleRules(activePolicyChains, activePodFwChains)
if err != nil {
return errors.New("Aborting sync. Failed to cleanup stale iptable rules: %s" + err.Error())
return errors.New("Aborting sync. Failed to cleanup stale iptable rules: " + err.Error())
}

return nil
Expand Down Expand Up @@ -334,19 +351,19 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er
}

// loop through the pods running on the node which has default ingress to be denied
podsOnNodeInfo, err := getPodsRunningOnNode(npc.nodeIP.String())
firewallEnabledPods, err := npc.getFirewallEnabledPods(npc.nodeIP.String())
if err != nil {
return nil, err
}
for _, pod := range *podsOnNodeInfo {
for _, pod := range *firewallEnabledPods {

// below condition occurs when we get trasient update while removing or adding pod
// subseqent update will do the correct action
if len(pod.ip) == 0 || pod.ip == "" {
continue
}

// ensure pod specfic firewall chain exist for all the pods running on this node
// ensure pod specfic firewall chain exist for all the pods that need ingress firewall
podFwChainName := podFirewallChainName(pod.namespace, pod.name)
err = iptablesCmdHandler.NewChain("filter", podFwChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
Expand Down Expand Up @@ -557,20 +574,47 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool) er
return nil
}

func getPodsRunningOnNode(nodeIp string) (*map[string]podInfo, error) {
func (npc *NetworkPolicyController) getFirewallEnabledPods(nodeIp string) (*map[string]podInfo, error) {

nodePods := make(map[string]podInfo)

for _, pod := range watchers.PodWatcher.List() {
if strings.Compare(pod.Status.HostIP, nodeIp) != 0 {
continue
}
default_policy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace)
if err != nil {
return nil, fmt.Errorf("Failed to get the namespace default ingress policy %s", err.Error())
}
if strings.Compare(default_policy, "DefaultDeny") != 0 {
continue
if npc.v1NetworkPolicy {
podNeedsFirewall := false
for _, policy_obj := range watchers.NetworkPolicyWatcher.List() {
policy, _ := policy_obj.(*networking.NetworkPolicy)
if policy.Namespace != pod.ObjectMeta.Namespace {
continue
}
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace,
policy.Spec.PodSelector.MatchLabels)
if err != nil {
return nil, fmt.Errorf("Failed to get the pods %s", err.Error())
}
for _, matchingPod := range matchingPods {
if matchingPod.ObjectMeta.Name == pod.ObjectMeta.Name {
podNeedsFirewall = true
break
}
}
if podNeedsFirewall {
break
}
}
if !podNeedsFirewall {
continue
}
} else {
default_policy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace)
if err != nil {
return nil, fmt.Errorf("Failed to get the namespace default ingress policy %s", err.Error())
}
if strings.Compare(default_policy, "DefaultDeny") != 0 {
continue
}
}
nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
name: pod.ObjectMeta.Name,
Expand All @@ -584,7 +628,66 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {

NetworkPolicies := make([]networkPolicyInfo, 0)

for _, policy := range watchers.NetworkPolicyWatcher.List() {
for _, policy_obj := range watchers.NetworkPolicyWatcher.List() {

policy, ok := policy_obj.(*networking.NetworkPolicy)
if !ok {
return nil, fmt.Errorf("Failed to convert")
}
newPolicy := networkPolicyInfo{
name: policy.Name,
namespace: policy.Namespace,
labels: policy.Spec.PodSelector.MatchLabels,
}
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
newPolicy.destPods = make(map[string]podInfo)
newPolicy.ingressRules = make([]ingressRule, 0)
if err == nil {
for _, matchingPod := range matchingPods {
newPolicy.destPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP,
name: matchingPod.ObjectMeta.Name,
namespace: matchingPod.ObjectMeta.Namespace,
labels: matchingPod.ObjectMeta.Labels}
}
}

for _, specIngressRule := range policy.Spec.Ingress {
ingressRule := ingressRule{}

ingressRule.ports = make([]protocolAndPort, 0)
for _, port := range specIngressRule.Ports {
protocolAndPort := protocolAndPort{protocol: string(*port.Protocol), port: port.Port.String()}
ingressRule.ports = append(ingressRule.ports, protocolAndPort)
}

ingressRule.srcPods = make([]podInfo, 0)
for _, peer := range specIngressRule.From {
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels)
if err == nil {
for _, matchingPod := range matchingPods {
ingressRule.srcPods = append(ingressRule.srcPods,
podInfo{ip: matchingPod.Status.PodIP,
name: matchingPod.ObjectMeta.Name,
namespace: matchingPod.ObjectMeta.Namespace,
labels: matchingPod.ObjectMeta.Labels})
}
}
}
newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule)
}
NetworkPolicies = append(NetworkPolicies, newPolicy)
}

return &NetworkPolicies, nil
}

func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {

NetworkPolicies := make([]networkPolicyInfo, 0)

for _, policy_obj := range watchers.NetworkPolicyWatcher.List() {

policy, _ := policy_obj.(*apiextensions.NetworkPolicy)
newPolicy := networkPolicyInfo{
name: policy.Name,
namespace: policy.Namespace,
Expand Down Expand Up @@ -781,6 +884,13 @@ func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options

npc.syncPeriod = config.IPTablesSyncPeriod

npc.v1NetworkPolicy = true
v, _ := clientset.Discovery().ServerVersion()
minorVer, _ := strconv.Atoi(v.Minor)
if v.Major == "1" && minorVer < 7 {
npc.v1NetworkPolicy = false
}

node, err := utils.GetNodeObject(clientset, config.HostnameOverride)
if err != nil {
return nil, err
Expand Down
59 changes: 30 additions & 29 deletions app/watchers/network_policy_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ package watchers

import (
"reflect"
"strconv"
"time"

"github.com/cloudnativelabs/kube-router/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
apiextensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
networking "k8s.io/client-go/pkg/apis/networking/v1"
cache "k8s.io/client-go/tools/cache"
)

type NetworkPolicyUpdate struct {
NetworkPolicy *apiextensions.NetworkPolicy
NetworkPolicy interface{}
Op Operation
}

Expand All @@ -33,28 +35,16 @@ type NetworkPolicyUpdatesHandler interface {
}

func (npw *networkPolicyWatcher) networkPolicyAddEventHandler(obj interface{}) {
policy, ok := obj.(*apiextensions.NetworkPolicy)
if !ok {
return
}
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: ADD, NetworkPolicy: policy})
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: ADD, NetworkPolicy: obj})
}

func (npw *networkPolicyWatcher) networkPolicyDeleteEventHandler(obj interface{}) {
policy, ok := obj.(*apiextensions.NetworkPolicy)
if !ok {
return
}
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: REMOVE, NetworkPolicy: policy})
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: REMOVE, NetworkPolicy: obj})
}

func (npw *networkPolicyWatcher) networkPolicyUpdateEventHandler(oldObj, newObj interface{}) {
policy, ok := newObj.(*apiextensions.NetworkPolicy)
if !ok {
return
}
if !reflect.DeepEqual(newObj, oldObj) {
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: UPDATE, NetworkPolicy: policy})
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: UPDATE, NetworkPolicy: newObj})
}
}

Expand All @@ -64,13 +54,8 @@ func (npw *networkPolicyWatcher) RegisterHandler(handler NetworkPolicyUpdatesHan
}))
}

func (npw *networkPolicyWatcher) List() []*apiextensions.NetworkPolicy {
obj_list := npw.networkPolicyLister.List()
np_instances := make([]*apiextensions.NetworkPolicy, len(obj_list))
for i, ins := range obj_list {
np_instances[i] = ins.(*apiextensions.NetworkPolicy)
}
return np_instances
func (npw *networkPolicyWatcher) List() []interface{} {
return npw.networkPolicyLister.List()
}

func (npw *networkPolicyWatcher) HasSynced() bool {
Expand All @@ -91,13 +76,29 @@ func StartNetworkPolicyWatcher(clientset *kubernetes.Clientset, resyncPeriod tim
}

npw.clientset = clientset

v1NetworkPolicy := true
v, _ := clientset.Discovery().ServerVersion()
minorVer, _ := strconv.Atoi(v.Minor)
if v.Major == "1" && minorVer < 7 {
v1NetworkPolicy = false
}

npw.broadcaster = utils.NewBroadcaster()
lw := cache.NewListWatchFromClient(clientset.Extensions().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer(
lw,
&apiextensions.NetworkPolicy{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
var lw *cache.ListWatch
if v1NetworkPolicy {
lw = cache.NewListWatchFromClient(clientset.Networking().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer(
lw, &networking.NetworkPolicy{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
} else {
lw = cache.NewListWatchFromClient(clientset.Extensions().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer(
lw, &apiextensions.NetworkPolicy{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
}
networkPolicyStopCh = make(chan struct{})
go npw.networkPolicyController.Run(networkPolicyStopCh)
return &npw, nil
Expand Down

0 comments on commit c0a14e8

Please sign in to comment.