Skip to content

Commit

Permalink
ipv6 updates
Browse files Browse the repository at this point in the history
  • Loading branch information
mk01 committed Feb 15, 2019
1 parent 00824cd commit cdd3efe
Show file tree
Hide file tree
Showing 35 changed files with 4,998 additions and 2,446 deletions.
11 changes: 9 additions & 2 deletions build/image-assets/motd-kube-router.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#!/usr/bin/env sh

function ribInfo() {
OUT=$(gobgp global rib -a $1 | grep -v "not in table")
test -n "${OUT}" || return 0
echo "--- BGP Route Info ($1) ---"
echo "${OUT}"
}

echo "Welcome to kube-router on \"${NODE_NAME}\"!"
echo
echo "For debugging, the following tools are available:"
Expand Down Expand Up @@ -32,8 +39,8 @@ echo
echo "--- BGP Neighbors ---"
gobgp neighbor
echo
echo "--- BGP Route Info ---"
gobgp global rib
ribInfo ipv4
ribInfo ipv6
echo
echo "--- IPVS Services ---"
ipvsadm -ln
Expand Down
15 changes: 11 additions & 4 deletions pkg/cmd/kube-router.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"syscall"

"github.com/cloudnativelabs/kube-router/pkg/controllers"
"github.com/cloudnativelabs/kube-router/pkg/controllers/netpol"
"github.com/cloudnativelabs/kube-router/pkg/controllers/proxy"
"github.com/cloudnativelabs/kube-router/pkg/controllers/routing"
Expand Down Expand Up @@ -73,6 +74,12 @@ func CleanupConfigAndExit() {
nrc.Cleanup()
}

func (kr *KubeRouter) spawnController(healthCh chan *healthcheck.ControllerHeartbeat, stopCh chan struct{}, wg *sync.WaitGroup, cnt controllers.Controller) {
if err := cnt.Run(healthCh, stopCh, wg); err != nil {
glog.Errorf("Can't start controller: %s", err.Error())
}
}

// Run starts the controllers and waits forever till we get SIGINT or SIGTERM
func (kr *KubeRouter) Run() error {
var err error
Expand Down Expand Up @@ -118,7 +125,7 @@ func (kr *KubeRouter) Run() error {
return errors.New("Failed to create metrics controller: " + err.Error())
}
wg.Add(1)
go mc.Run(healthChan, stopCh, &wg)
go kr.spawnController(healthChan, stopCh, &wg, mc)

} else if kr.Config.MetricsPort > 65535 {
glog.Errorf("Metrics port must be over 0 and under 65535, given port: %d", kr.Config.MetricsPort)
Expand All @@ -139,7 +146,7 @@ func (kr *KubeRouter) Run() error {
npInformer.AddEventHandler(npc.NetworkPolicyEventHandler)

wg.Add(1)
go npc.Run(healthChan, stopCh, &wg)
go kr.spawnController(healthChan, stopCh, &wg, npc)
}

if kr.Config.RunRouter {
Expand All @@ -153,7 +160,7 @@ func (kr *KubeRouter) Run() error {
epInformer.AddEventHandler(nrc.EndpointsEventHandler)

wg.Add(1)
go nrc.Run(healthChan, stopCh, &wg)
go kr.spawnController(healthChan, stopCh, &wg, nrc)
}

if kr.Config.RunServiceProxy {
Expand All @@ -167,7 +174,7 @@ func (kr *KubeRouter) Run() error {
epInformer.AddEventHandler(nsc.EndpointsEventHandler)

wg.Add(1)
go nsc.Run(healthChan, stopCh, &wg)
go kr.spawnController(healthChan, stopCh, &wg, nsc)
}

// Handle SIGINT and SIGTERM
Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
@@ -1 +1,12 @@
package controllers

import (
"sync"

"github.com/cloudnativelabs/kube-router/pkg/healthcheck"
)

type Controller interface {
GetName() string
Run(chan<- *healthcheck.ControllerHeartbeat, <-chan struct{}, *sync.WaitGroup) error
}
52 changes: 30 additions & 22 deletions pkg/controllers/netpol/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"

"github.com/cloudnativelabs/kube-router/pkg/utils/net-tools"
api "k8s.io/api/core/v1"
apiextensions "k8s.io/api/extensions/v1beta1"
networking "k8s.io/api/networking/v1"
Expand All @@ -31,6 +32,8 @@ import (
)

const (
CONTROLLER_NAME = "Policy controller"

networkPolicyAnnotation = "net.beta.kubernetes.io/network-policy"
kubePodFirewallChainPrefix = "KUBE-POD-FW-"
kubeNetworkPolicyChainPrefix = "KUBE-NWPLCY-"
Expand Down Expand Up @@ -61,7 +64,7 @@ type NetworkPolicyController struct {

// list of all active network policies expressed as networkPolicyInfo
networkPoliciesInfo *[]networkPolicyInfo
ipSetHandler *utils.IPSet
ipSetHandler *netutils.IPSet

podLister cache.Indexer
npLister cache.Indexer
Expand Down Expand Up @@ -132,8 +135,12 @@ func newProtocolAndPort(protocol string, port *intstr.IntOrString) protocolAndPo
return protocolAndPort{protocol: protocol, port: strPort}
}

func (npc *NetworkPolicyController) GetName() string {
return CONTROLLER_NAME
}

// Run runs forver till we receive notification on stopCh
func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) {
func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.ControllerHeartbeat, stopCh <-chan struct{}, wg *sync.WaitGroup) error {
t := time.NewTicker(npc.syncPeriod)
defer t.Stop()
defer wg.Done()
Expand All @@ -145,7 +152,7 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.Controlle
select {
case <-stopCh:
glog.Info("Shutting down network policies controller")
return
return nil
default:
}

Expand All @@ -161,10 +168,11 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *healthcheck.Controlle
select {
case <-stopCh:
glog.Infof("Shutting down network policies controller")
return
return nil
case <-t.C:
}
}
return nil
}

// OnPodUpdate handles updates to pods from the Kubernetes api server
Expand Down Expand Up @@ -297,14 +305,14 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(version string) (map

// create a ipset for all destination pod ip's matched by the policy spec PodSelector
targetDestPodIpSetName := policyDestinationPodIpSetName(policy.namespace, policy.name)
targetDestPodIpSet, err := npc.ipSetHandler.Create(targetDestPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
targetDestPodIpSet, err := npc.ipSetHandler.Create(targetDestPodIpSetName, netutils.TypeHashIP, netutils.OptionTimeout, "0")
if err != nil {
return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error())
}

// create a ipset for all source pod ip's matched by the policy spec PodSelector
targetSourcePodIpSetName := policySourcePodIpSetName(policy.namespace, policy.name)
targetSourcePodIpSet, err := npc.ipSetHandler.Create(targetSourcePodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
targetSourcePodIpSet, err := npc.ipSetHandler.Create(targetSourcePodIpSetName, netutils.TypeHashIP, netutils.OptionTimeout, "0")
if err != nil {
return nil, nil, fmt.Errorf("failed to create ipset: %s", err.Error())
}
Expand All @@ -317,11 +325,11 @@ func (npc *NetworkPolicyController) syncNetworkPolicyChains(version string) (map
currnetPodIps = append(currnetPodIps, ip)
}

err = targetSourcePodIpSet.Refresh(currnetPodIps, utils.OptionTimeout, "0")
err = targetSourcePodIpSet.Refresh(currnetPodIps, netutils.OptionTimeout, "0")
if err != nil {
glog.Errorf("failed to refresh targetSourcePodIpSet: " + err.Error())
}
err = targetDestPodIpSet.Refresh(currnetPodIps, utils.OptionTimeout, "0")
err = targetDestPodIpSet.Refresh(currnetPodIps, netutils.OptionTimeout, "0")
if err != nil {
glog.Errorf("failed to refresh targetDestPodIpSet: " + err.Error())
}
Expand Down Expand Up @@ -364,7 +372,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo

if len(ingressRule.srcPods) != 0 {
srcPodIpSetName := policyIndexedSourcePodIpSetName(policy.namespace, policy.name, i)
srcPodIpSet, err := npc.ipSetHandler.Create(srcPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
srcPodIpSet, err := npc.ipSetHandler.Create(srcPodIpSetName, netutils.TypeHashIP, netutils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
Expand All @@ -375,7 +383,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo
for _, pod := range ingressRule.srcPods {
ingressRuleSrcPodIps = append(ingressRuleSrcPodIps, pod.ip)
}
err = srcPodIpSet.Refresh(ingressRuleSrcPodIps, utils.OptionTimeout, "0")
err = srcPodIpSet.Refresh(ingressRuleSrcPodIps, netutils.OptionTimeout, "0")
if err != nil {
glog.Errorf("failed to refresh srcPodIpSet: " + err.Error())
}
Expand Down Expand Up @@ -457,7 +465,7 @@ func (npc *NetworkPolicyController) processIngressRules(policy networkPolicyInfo

if len(ingressRule.srcIPBlocks) != 0 {
srcIpBlockIpSetName := policyIndexedSourceIpBlockIpSetName(policy.namespace, policy.name, i)
srcIpBlockIpSet, err := npc.ipSetHandler.Create(srcIpBlockIpSetName, utils.TypeHashNet, utils.OptionTimeout, "0")
srcIpBlockIpSet, err := npc.ipSetHandler.Create(srcIpBlockIpSetName, netutils.TypeHashNet, netutils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
Expand Down Expand Up @@ -527,7 +535,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,

if len(egressRule.dstPods) != 0 {
dstPodIpSetName := policyIndexedDestinationPodIpSetName(policy.namespace, policy.name, i)
dstPodIpSet, err := npc.ipSetHandler.Create(dstPodIpSetName, utils.TypeHashIP, utils.OptionTimeout, "0")
dstPodIpSet, err := npc.ipSetHandler.Create(dstPodIpSetName, netutils.TypeHashIP, netutils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
Expand All @@ -538,7 +546,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
for _, pod := range egressRule.dstPods {
egressRuleDstPodIps = append(egressRuleDstPodIps, pod.ip)
}
dstPodIpSet.Refresh(egressRuleDstPodIps, utils.OptionTimeout, "0")
dstPodIpSet.Refresh(egressRuleDstPodIps, netutils.OptionTimeout, "0")

if len(egressRule.ports) != 0 {
// case where 'ports' details and 'from' details specified in the egress rule
Expand Down Expand Up @@ -616,7 +624,7 @@ func (npc *NetworkPolicyController) processEgressRules(policy networkPolicyInfo,
}
if len(egressRule.dstIPBlocks) != 0 {
dstIpBlockIpSetName := policyIndexedDestinationIpBlockIpSetName(policy.namespace, policy.name, i)
dstIpBlockIpSet, err := npc.ipSetHandler.Create(dstIpBlockIpSetName, utils.TypeHashNet, utils.OptionTimeout, "0")
dstIpBlockIpSet, err := npc.ipSetHandler.Create(dstIpBlockIpSetName, netutils.TypeHashNet, netutils.OptionTimeout, "0")
if err != nil {
return fmt.Errorf("failed to create ipset: %s", err.Error())
}
Expand Down Expand Up @@ -901,13 +909,13 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains, activePolicyIPSets

cleanupPodFwChains := make([]string, 0)
cleanupPolicyChains := make([]string, 0)
cleanupPolicyIPSets := make([]*utils.Set, 0)
cleanupPolicyIPSets := make([]*netutils.Set, 0)

iptablesCmdHandler, err := iptables.New()
if err != nil {
glog.Fatalf("failed to initialize iptables command executor due to %s", err.Error())
}
ipsets, err := utils.NewIPSet(false)
ipsets, err := netutils.NewIPSet()
if err != nil {
glog.Fatalf("failed to create ipsets command executor due to %s", err.Error())
}
Expand Down Expand Up @@ -1324,15 +1332,15 @@ func (npc *NetworkPolicyController) evalIPBlockPeer(peer networking.NetworkPolic
ipBlock := make([][]string, 0)
if peer.PodSelector == nil && peer.NamespaceSelector == nil && peer.IPBlock != nil {
if cidr := peer.IPBlock.CIDR; strings.HasSuffix(cidr, "/0") {
ipBlock = append(ipBlock, []string{"0.0.0.0/1", utils.OptionTimeout, "0"}, []string{"128.0.0.0/1", utils.OptionTimeout, "0"})
ipBlock = append(ipBlock, []string{"0.0.0.0/1", netutils.OptionTimeout, "0"}, []string{"128.0.0.0/1", netutils.OptionTimeout, "0"})
} else {
ipBlock = append(ipBlock, []string{cidr, utils.OptionTimeout, "0"})
ipBlock = append(ipBlock, []string{cidr, netutils.OptionTimeout, "0"})
}
for _, except := range peer.IPBlock.Except {
if strings.HasSuffix(except, "/0") {
ipBlock = append(ipBlock, []string{"0.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch}, []string{"128.0.0.0/1", utils.OptionTimeout, "0", utils.OptionNoMatch})
ipBlock = append(ipBlock, []string{"0.0.0.0/1", netutils.OptionTimeout, "0", netutils.OptionNoMatch}, []string{"128.0.0.0/1", netutils.OptionTimeout, "0", netutils.OptionNoMatch})
} else {
ipBlock = append(ipBlock, []string{except, utils.OptionTimeout, "0", utils.OptionNoMatch})
ipBlock = append(ipBlock, []string{except, netutils.OptionTimeout, "0", netutils.OptionNoMatch})
}
}
}
Expand Down Expand Up @@ -1524,7 +1532,7 @@ func (npc *NetworkPolicyController) Cleanup() {
}

// delete all ipsets
ipset, err := utils.NewIPSet(false)
ipset, err := netutils.NewIPSet()
if err != nil {
glog.Errorf("Failed to clean up ipsets: " + err.Error())
}
Expand Down Expand Up @@ -1629,7 +1637,7 @@ func NewNetworkPolicyController(clientset kubernetes.Interface,
}
npc.nodeIP = nodeIP

ipset, err := utils.NewIPSet(false)
ipset, err := netutils.NewIPSet()
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit cdd3efe

Please sign in to comment.