Skip to content

Commit

Permalink
Allow Node access all local Pods to perform probes (#104)
Browse files Browse the repository at this point in the history
Kubernetes Nodes need to perform liveness and readiness probes which
might be an HTTP and TCP request, it requires NetworkPolicy
implementation always allow a Node access all its local Pods.

This patch appends the Node gateway IP which will be used when a Node
accesses Pods to the FromAddresses of the CompletedRule to achieve it.
  • Loading branch information
tnqn authored Nov 18, 2019
1 parent 6e33a61 commit fe07367
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func run(o *Options) error {
ofClient,
nodeConfig)

networkPolicyController := networkpolicy.NewNetworkPolicyController(antreaClient, ofClient, ifaceStore, nodeConfig.Name)
networkPolicyController := networkpolicy.NewNetworkPolicyController(antreaClient, ofClient, ifaceStore, nodeConfig.Name, nodeConfig.IP.String())

cniServer := cniserver.New(
o.config.CNISocket,
Expand Down
21 changes: 16 additions & 5 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ type ruleCache struct {
rules cache.Indexer
// dirtyRuleHandler is a callback that is run upon finding a rule out-of-sync.
dirtyRuleHandler func(string)

// defaultFromAddresses is a list of addresses which will be in the FromAddresses of
// all Ingress rules.
defaultFromAddresses []string
}

// ruleKeyFunc knows how to get key of a *rule.
Expand Down Expand Up @@ -129,16 +133,17 @@ func policyIndexFunc(obj interface{}) ([]string, error) {
}

// newRuleCache returns a new *ruleCache.
func newRuleCache(dirtyRuleHandler func(string)) *ruleCache {
func newRuleCache(dirtyRuleHandler func(string), defaultFromAddresses []string) *ruleCache {
rules := cache.NewIndexer(
ruleKeyFunc,
cache.Indexers{addressGroupIndex: addressGroupIndexFunc, appliedToGroupIndex: appliedToGroupIndexFunc, policyIndex: policyIndexFunc},
)
return &ruleCache{
podSetByGroup: make(map[string]podSet),
addressSetByGroup: make(map[string]sets.String),
rules: rules,
dirtyRuleHandler: dirtyRuleHandler,
podSetByGroup: make(map[string]podSet),
addressSetByGroup: make(map[string]sets.String),
rules: rules,
dirtyRuleHandler: dirtyRuleHandler,
defaultFromAddresses: defaultFromAddresses,
}
}

Expand Down Expand Up @@ -311,6 +316,12 @@ func (c *ruleCache) GetCompletedRule(ruleID string) (completedRule *CompletedRul
var fromAddresses, toAddresses sets.String
if r.Direction == v1beta1.DirectionIn {
fromAddresses, completed = c.unionAddressGroups(r.From.AddressGroups)

if completed {
for _, address := range c.defaultFromAddresses {
fromAddresses.Insert(address)
}
}
} else {
toAddresses, completed = c.unionAddressGroups(r.To.AddressGroups)
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/agent/controller/networkpolicy/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestRuleCacheAddAddressGroup(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
recorder := newDirtyRuleRecorder()
c := newRuleCache(recorder.Record)
c := newRuleCache(recorder.Record, []string{"192.168.1.1"})
for _, rule := range tt.rules {
c.rules.Add(rule)
}
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestRuleCacheAddAppliedToGroup(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
recorder := newDirtyRuleRecorder()
c := newRuleCache(recorder.Record)
c := newRuleCache(recorder.Record, []string{"192.168.1.1"})
for _, rule := range tt.rules {
c.rules.Add(rule)
}
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestRuleCacheAddNetworkPolicy(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
recorder := newDirtyRuleRecorder()
c := newRuleCache(recorder.Record)
c := newRuleCache(recorder.Record, []string{"192.168.1.1"})
c.AddNetworkPolicy(tt.args)
actualRules := c.rules.List()
if !assert.ElementsMatch(t, tt.expectedRules, actualRules) {
Expand Down Expand Up @@ -371,7 +371,7 @@ func TestRuleCacheDeleteNetworkPolicy(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
recorder := newDirtyRuleRecorder()
c := newRuleCache(recorder.Record)
c := newRuleCache(recorder.Record, []string{"192.168.1.1"})
for _, rule := range tt.rules {
c.rules.Add(rule)
}
Expand All @@ -389,8 +389,8 @@ func TestRuleCacheDeleteNetworkPolicy(t *testing.T) {
}

func TestRuleCacheGetCompletedRule(t *testing.T) {
addressGroup1 := sets.NewString("1.1.1.1", "1.1.1.2")
addressGroup2 := sets.NewString("1.1.1.2", "1.1.1.3")
addressGroup1 := sets.NewString("1.1.1.1", "1.1.1.2", "192.168.1.1")
addressGroup2 := sets.NewString("1.1.1.2", "1.1.1.3", "192.168.1.1")
appliedToGroup1 := newPodSet(v1beta1.PodReference{"pod1", "ns1"}, v1beta1.PodReference{"pod2", "ns1"})
appliedToGroup2 := newPodSet(v1beta1.PodReference{"pod2", "ns1"}, v1beta1.PodReference{"pod3", "ns1"})
rule1 := &rule{
Expand Down Expand Up @@ -460,7 +460,7 @@ func TestRuleCacheGetCompletedRule(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
recorder := newDirtyRuleRecorder()
c := newRuleCache(recorder.Record)
c := newRuleCache(recorder.Record, []string{"192.168.1.1"})
c.addressSetByGroup["addressGroup1"] = addressGroup1
c.addressSetByGroup["addressGroup2"] = addressGroup2
c.podSetByGroup["appliedToGroup1"] = appliedToGroup1
Expand Down Expand Up @@ -543,7 +543,7 @@ func TestRuleCachePatchAppliedToGroup(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
recorder := newDirtyRuleRecorder()
c := newRuleCache(recorder.Record)
c := newRuleCache(recorder.Record, []string{"192.168.1.1"})
c.podSetByGroup = tt.podSetByGroup
for _, rule := range tt.rules {
c.rules.Add(rule)
Expand Down Expand Up @@ -623,7 +623,7 @@ func TestRuleCachePatchAddressGroup(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
recorder := newDirtyRuleRecorder()
c := newRuleCache(recorder.Record)
c := newRuleCache(recorder.Record, []string{"192.168.1.1"})
c.addressSetByGroup = tt.addressSetByGroup
for _, rule := range tt.rules {
c.rules.Add(rule)
Expand Down Expand Up @@ -699,7 +699,7 @@ func TestRuleCacheUpdateNetworkPolicy(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
recorder := newDirtyRuleRecorder()
c := newRuleCache(recorder.Record)
c := newRuleCache(recorder.Record, []string{"192.168.1.1"})
for _, rule := range tt.rules {
c.rules.Add(rule)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,15 @@ type Controller struct {
}

// NewNetworkPolicyController returns a new *Controller.
func NewNetworkPolicyController(antreaClient versioned.Interface, ofClient openflow.Client, ifaceStore agent.InterfaceStore, nodeName string) *Controller {
func NewNetworkPolicyController(antreaClient versioned.Interface, ofClient openflow.Client, ifaceStore agent.InterfaceStore, nodeName string, gatewayIP string) *Controller {
c := &Controller{
antreaClient: antreaClient,
nodeName: nodeName,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "networkpolicyrule"),
reconciler: newReconciler(ofClient, ifaceStore),
}
c.ruleCache = newRuleCache(c.enqueueRule)
// Set Node gateway IP as the defaultFromAddresses so that Node to Pod traffic will always be allowed.
c.ruleCache = newRuleCache(c.enqueueRule, []string{gatewayIP})
return c
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
"github.com/vmware-tanzu/antrea/pkg/client/clientset/versioned/fake"
)

const gatewayIP = "10.10.10.1"

func newTestController() (*Controller, *fake.Clientset, *mockReconciler) {
clientset := &fake.Clientset{}

controller := NewNetworkPolicyController(clientset, nil, nil, "node1")
controller := NewNetworkPolicyController(clientset, nil, nil, "node1", gatewayIP)
reconciler := newMockReconciler()
controller.reconciler = reconciler
return controller, clientset, reconciler
Expand Down Expand Up @@ -103,7 +105,7 @@ func TestAddSingleGroupRule(t *testing.T) {
services := []v1beta1.Service{{Protocol: &protocolTCP, Port: &port}}
desiredRule := &CompletedRule{
rule: &rule{Direction: v1beta1.DirectionIn, Services: services},
FromAddresses: sets.NewString("1.1.1.1", "2.2.2.2"),
FromAddresses: sets.NewString("1.1.1.1", "2.2.2.2", gatewayIP),
ToAddresses: sets.NewString(),
Pods: newPodSet(v1beta1.PodReference{"pod1", "ns1"}),
}
Expand Down Expand Up @@ -166,7 +168,7 @@ func TestAddMultipleGroupsRule(t *testing.T) {
services := []v1beta1.Service{{Protocol: &protocolTCP, Port: &port}}
desiredRule := &CompletedRule{
rule: &rule{Direction: v1beta1.DirectionIn, Services: services},
FromAddresses: sets.NewString("1.1.1.1", "2.2.2.2", "3.3.3.3"),
FromAddresses: sets.NewString("1.1.1.1", "2.2.2.2", "3.3.3.3", gatewayIP),
ToAddresses: sets.NewString(),
Pods: newPodSet(v1beta1.PodReference{"pod1", "ns1"}, v1beta1.PodReference{"pod2", "ns2"}),
}
Expand Down

0 comments on commit fe07367

Please sign in to comment.