Skip to content

Commit

Permalink
Block Pod traffic until its NetworkPolicies are realized
Browse files Browse the repository at this point in the history
Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn committed Nov 13, 2023
1 parent af1e32d commit 6831d57
Show file tree
Hide file tree
Showing 21 changed files with 726 additions and 192 deletions.
7 changes: 4 additions & 3 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,9 @@ func run(o *Options) error {
// after rule deletion.
asyncRuleDeleteInterval := o.pollInterval
antreaPolicyEnabled := features.DefaultFeatureGate.Enabled(features.AntreaPolicy)
// In Antrea agent, status manager will automatically be enabled if
// In Antrea agent, status report will automatically be enabled if
// AntreaPolicy feature is enabled.
statusManagerEnabled := antreaPolicyEnabled
statusReportEnabled := antreaPolicyEnabled

var auditLoggerOptions *networkpolicy.AuditLoggerOptions
auditLoggerOptions = &networkpolicy.AuditLoggerOptions{
Expand All @@ -458,6 +458,7 @@ func run(o *Options) error {
}
networkPolicyController, err := networkpolicy.NewNetworkPolicyController(
antreaClientProvider,
localPodInformer,
ofClient,
ifaceStore,
nodeKey,
Expand All @@ -468,7 +469,7 @@ func run(o *Options) error {
antreaPolicyEnabled,
l7NetworkPolicyEnabled,
o.enableAntreaProxy,
statusManagerEnabled,
statusReportEnabled,
multicastEnabled,
auditLoggerOptions,
asyncRuleDeleteInterval,
Expand Down
44 changes: 27 additions & 17 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ func (c *ruleCache) getNetworkPolicy(uid string) *v1beta.NetworkPolicy {
return policy
}

func (c *ruleCache) getAppliedNetworkPolicies(pod, namespace string, npFilter *querier.NetworkPolicyQueryFilter) []v1beta.NetworkPolicy {
// getAppliedNetworkPolicyRules returns the rules applied to the given Pod.
func (c *ruleCache) getAppliedNetworkPolicyRules(namespace, pod string) map[string]*rule {
var groups []string
memberPod := &v1beta.GroupMember{Pod: &v1beta.PodReference{Name: pod, Namespace: namespace}}
c.appliedToSetLock.RLock()
Expand All @@ -254,24 +255,33 @@ func (c *ruleCache) getAppliedNetworkPolicies(pod, namespace string, npFilter *q
}
c.appliedToSetLock.RUnlock()

rules := map[string]*rule{}
for _, group := range groups {
objs, _ := c.rules.ByIndex(appliedToGroupIndex, group)
for _, obj := range objs {
rule := obj.(*rule)
rules[rule.ID] = rule
}
}
return rules
}

func (c *ruleCache) getAppliedNetworkPolicies(pod, namespace string, npFilter *querier.NetworkPolicyQueryFilter) []v1beta.NetworkPolicy {
var policies []v1beta.NetworkPolicy
policyKeys := sets.New[string]()
for _, group := range groups {
rules, _ := c.rules.ByIndex(appliedToGroupIndex, group)
for _, ruleObj := range rules {
rule := ruleObj.(*rule)
if policyKeys.Has(string(rule.PolicyUID)) {
continue
}
np := c.getNetworkPolicy(string(rule.PolicyUID))
// The Policy might be removed during the query.
if np == nil {
continue
}
if c.networkPolicyMatchFilter(npFilter, np) {
policies = append(policies, *np)
policyKeys.Insert(string(rule.PolicyUID))
}
rules := c.getAppliedNetworkPolicyRules(namespace, pod)
for _, rule := range rules {
if policyKeys.Has(string(rule.PolicyUID)) {
continue
}
np := c.getNetworkPolicy(string(rule.PolicyUID))
// The Policy might be removed during the query.
if np == nil {
continue
}
if c.networkPolicyMatchFilter(npFilter, np) {
policies = append(policies, *np)
policyKeys.Insert(string(rule.PolicyUID))
}
}
return policies
Expand Down
68 changes: 68 additions & 0 deletions pkg/agent/controller/networkpolicy/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"antrea.io/antrea/pkg/querier"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -794,6 +795,73 @@ func TestRuleCacheDeleteNetworkPolicy(t *testing.T) {
}
}

func TestRuleCacheGetAppliedNetworkPolicies(t *testing.T) {
c, _, _, _ := newFakeRuleCache()
policies := []*v1beta2.NetworkPolicy{
{
ObjectMeta: metav1.ObjectMeta{UID: "policy1", Name: "policy1"},
Rules: []v1beta2.NetworkPolicyRule{
{
Direction: v1beta2.DirectionIn,
From: v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1"}},
},
{
Direction: v1beta2.DirectionOut,
To: v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1"}},
},
},
AppliedToGroups: []string{"appliedToGroup1"},
SourceRef: &v1beta2.NetworkPolicyReference{
Type: v1beta2.K8sNetworkPolicy,
Namespace: "ns1",
Name: "name1",
UID: "policy1",
},
},
{
ObjectMeta: metav1.ObjectMeta{UID: "policy2", Name: "policy2"},
Rules: []v1beta2.NetworkPolicyRule{
{
Direction: v1beta2.DirectionIn,
From: v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup2"}},
},
},
AppliedToGroups: []string{"appliedToGroup1", "appliedToGroup2"},
SourceRef: &v1beta2.NetworkPolicyReference{
Type: v1beta2.K8sNetworkPolicy,
Namespace: "ns2",
Name: "name2",
UID: "policy2",
},
},
}
appliedToGroups := []*v1beta2.AppliedToGroup{
{
ObjectMeta: metav1.ObjectMeta{Name: "appliedToGroup1"},
GroupMembers: []v1beta2.GroupMember{*newAppliedToGroupMemberPod("pod1", "ns1")},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "appliedToGroup2"},
GroupMembers: []v1beta2.GroupMember{*newAppliedToGroupMemberPod("pod1", "ns1"), *newAppliedToGroupMemberPod("pod2", "ns2")},
},
}
c.ReplaceNetworkPolicies(policies)
c.ReplaceAppliedToGroups(appliedToGroups)
assert.Len(t, c.getAppliedNetworkPolicyRules("ns1", "pod1"), 3)
assert.Len(t, c.getAppliedNetworkPolicyRules("ns2", "pod2"), 1)
assert.Len(t, c.getAppliedNetworkPolicyRules("ns1", "non-existing-pod"), 0)
assert.Len(t, c.getAppliedNetworkPolicyRules("non-existing-ns", "pod1"), 0)
assert.Len(t, c.getAppliedNetworkPolicies("pod1", "ns1", &querier.NetworkPolicyQueryFilter{}), 2)
assert.Len(t, c.getAppliedNetworkPolicies("pod1", "ns1", &querier.NetworkPolicyQueryFilter{Name: "policy1"}), 1)
assert.Len(t, c.getAppliedNetworkPolicies("pod1", "ns1", &querier.NetworkPolicyQueryFilter{SourceName: "name1"}), 1)
assert.Len(t, c.getAppliedNetworkPolicies("pod1", "ns1", &querier.NetworkPolicyQueryFilter{Namespace: "ns2"}), 1)
assert.Len(t, c.getAppliedNetworkPolicies("pod1", "ns1", &querier.NetworkPolicyQueryFilter{SourceType: v1beta2.K8sNetworkPolicy}), 2)
assert.Len(t, c.getAppliedNetworkPolicies("pod1", "ns1", &querier.NetworkPolicyQueryFilter{SourceType: v1beta2.AntreaClusterNetworkPolicy}), 0)
assert.Len(t, c.getAppliedNetworkPolicies("pod2", "ns2", &querier.NetworkPolicyQueryFilter{}), 1)
assert.Len(t, c.getAppliedNetworkPolicies("non-existing-pod", "ns1", &querier.NetworkPolicyQueryFilter{}), 0)
assert.Len(t, c.getAppliedNetworkPolicies("pod1", "non-existing-ns", &querier.NetworkPolicyQueryFilter{}), 0)
}

func TestRuleCacheGetCompletedRule(t *testing.T) {
addressGroup1 := v1beta2.NewGroupMemberSet(newAddressGroupMember("1.1.1.1"), newAddressGroupMember("1.1.1.2"))
addressGroup2 := v1beta2.NewGroupMemberSet(newAddressGroupMember("1.1.1.3"), newAddressGroupMember("1.1.1.2"))
Expand Down
6 changes: 2 additions & 4 deletions pkg/agent/controller/networkpolicy/fqdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,8 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer
ipv6Enabled: v6Enabled,
gwPort: gwPort,
}
if controller.ofClient != nil {
if err := controller.ofClient.NewDNSPacketInConjunction(dnsInterceptRuleID); err != nil {
return nil, fmt.Errorf("failed to install flow for DNS response interception: %w", err)
}
if err := controller.ofClient.NewDNSPacketInConjunction(dnsInterceptRuleID); err != nil {
return nil, fmt.Errorf("failed to install flow for DNS response interception: %w", err)
}
if dnsServerOverride != "" {
klog.InfoS("DNS server override provided by user", "dnsServer", dnsServerOverride)
Expand Down
Loading

0 comments on commit 6831d57

Please sign in to comment.