Skip to content

Commit

Permalink
Block a Pod's IP packets until its NetworkPolicies are realized
Browse files Browse the repository at this point in the history
In the previous implementation, traffic from/to a Pod could bypass the
NetworkPolicies applied to the Pod for a short time window when the
agent restarts, because realizing NetworkPolicies and enabling
forwarding are asynchronous.

However, we can't wait for all NetworkPolicies to be realized before
enabling forwarding of OVS, because in some cases the former depends on
the latter: for example, when proxyAll is enabled or when the Node is a
Windows Node, control-plane communication relies on the forwarding of
OVS.

This patch takes a more fine-grained approach: block a Pod's IP packets
in NetworkPolicy's entry tables until its NetworkPolicies are realized.
This granularity leaves the Node and the hostNetwork Pods' traffic
untouched and makes the realization issue of a Pod's NetworkPolicies
affect the Pod's IP packets only.

The following changes are made to implement the approach:
1. EgressSecurityClassifierTable is now always required. (Previously
   it was only required for ExternalNode, not K8sNode.)
2. One flow with low priority dropping traffic from local Pods is
   installed in EgressSecurityClassifierTable, and one flow with low
   priority dropping traffic to local Pods is installed in
   IngressSecurityClassifierTable.
3. When a Pod's NetworkPolicies are fully realized for the first time,
   one flow with normal priority allowing traffic from this Pod is
   installed in EgressSecurityClassifierTable to override the above drop
   action, and one flow in IngressSecurityClassifierTable does the same
   for traffic to this Pod.

Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn committed Nov 15, 2023
1 parent 7b624d3 commit f8f0271
Show file tree
Hide file tree
Showing 23 changed files with 877 additions and 206 deletions.
7 changes: 4 additions & 3 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,9 @@ func run(o *Options) error {
// after rule deletion.
asyncRuleDeleteInterval := o.pollInterval
antreaPolicyEnabled := features.DefaultFeatureGate.Enabled(features.AntreaPolicy)
// In Antrea agent, status manager will automatically be enabled if
// In Antrea agent, status report will automatically be enabled if
// AntreaPolicy feature is enabled.
statusManagerEnabled := antreaPolicyEnabled
statusReportEnabled := antreaPolicyEnabled

var auditLoggerOptions *networkpolicy.AuditLoggerOptions
auditLoggerOptions = &networkpolicy.AuditLoggerOptions{
Expand All @@ -457,6 +457,7 @@ func run(o *Options) error {
}
networkPolicyController, err := networkpolicy.NewNetworkPolicyController(
antreaClientProvider,
localPodInformer,
ofClient,
ifaceStore,
nodeKey,
Expand All @@ -467,7 +468,7 @@ func run(o *Options) error {
antreaPolicyEnabled,
l7NetworkPolicyEnabled,
o.enableAntreaProxy,
statusManagerEnabled,
statusReportEnabled,
multicastEnabled,
auditLoggerOptions,
asyncRuleDeleteInterval,
Expand Down
44 changes: 27 additions & 17 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ func (c *ruleCache) getNetworkPolicy(uid string) *v1beta.NetworkPolicy {
return policy
}

func (c *ruleCache) getAppliedNetworkPolicies(pod, namespace string, npFilter *querier.NetworkPolicyQueryFilter) []v1beta.NetworkPolicy {
// getAppliedNetworkPolicyRules returns the rules applied to the given Pod.
func (c *ruleCache) getAppliedNetworkPolicyRules(namespace, pod string) map[string]*rule {
var groups []string
memberPod := &v1beta.GroupMember{Pod: &v1beta.PodReference{Name: pod, Namespace: namespace}}
c.appliedToSetLock.RLock()
Expand All @@ -254,24 +255,33 @@ func (c *ruleCache) getAppliedNetworkPolicies(pod, namespace string, npFilter *q
}
c.appliedToSetLock.RUnlock()

rules := map[string]*rule{}
for _, group := range groups {
objs, _ := c.rules.ByIndex(appliedToGroupIndex, group)
for _, obj := range objs {
rule := obj.(*rule)
rules[rule.ID] = rule
}
}
return rules
}

func (c *ruleCache) getAppliedNetworkPolicies(pod, namespace string, npFilter *querier.NetworkPolicyQueryFilter) []v1beta.NetworkPolicy {
var policies []v1beta.NetworkPolicy
policyKeys := sets.New[string]()
for _, group := range groups {
rules, _ := c.rules.ByIndex(appliedToGroupIndex, group)
for _, ruleObj := range rules {
rule := ruleObj.(*rule)
if policyKeys.Has(string(rule.PolicyUID)) {
continue
}
np := c.getNetworkPolicy(string(rule.PolicyUID))
// The Policy might be removed during the query.
if np == nil {
continue
}
if c.networkPolicyMatchFilter(npFilter, np) {
policies = append(policies, *np)
policyKeys.Insert(string(rule.PolicyUID))
}
rules := c.getAppliedNetworkPolicyRules(namespace, pod)
for _, rule := range rules {
if policyKeys.Has(string(rule.PolicyUID)) {
continue
}
np := c.getNetworkPolicy(string(rule.PolicyUID))
// The Policy might be removed during the query.
if np == nil {
continue
}
if c.networkPolicyMatchFilter(npFilter, np) {
policies = append(policies, *np)
policyKeys.Insert(string(rule.PolicyUID))
}
}
return policies
Expand Down
68 changes: 68 additions & 0 deletions pkg/agent/controller/networkpolicy/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/apis/controlplane/v1beta2"
"antrea.io/antrea/pkg/querier"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
)
Expand Down Expand Up @@ -794,6 +795,73 @@ func TestRuleCacheDeleteNetworkPolicy(t *testing.T) {
}
}

// TestRuleCacheGetAppliedNetworkPolicies exercises the two per-Pod lookups of
// the rule cache: getAppliedNetworkPolicyRules and getAppliedNetworkPolicies.
//
// Fixture layout:
//   - pod1/ns1 is a member of appliedToGroup1 and appliedToGroup2.
//   - pod2/ns2 is a member of appliedToGroup2 only.
//   - policy1 carries two rules and is applied to appliedToGroup1.
//   - policy2 carries one rule and is applied to both groups.
func TestRuleCacheGetAppliedNetworkPolicies(t *testing.T) {
	rc, _, _, _ := newFakeRuleCache()

	// policy1: one ingress rule and one egress rule, applied to appliedToGroup1.
	policy1 := &v1beta2.NetworkPolicy{
		ObjectMeta: metav1.ObjectMeta{UID: "policy1", Name: "policy1"},
		Rules: []v1beta2.NetworkPolicyRule{
			{
				Direction: v1beta2.DirectionIn,
				From:      v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1"}},
			},
			{
				Direction: v1beta2.DirectionOut,
				To:        v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup1"}},
			},
		},
		AppliedToGroups: []string{"appliedToGroup1"},
		SourceRef: &v1beta2.NetworkPolicyReference{
			Type:      v1beta2.K8sNetworkPolicy,
			Namespace: "ns1",
			Name:      "name1",
			UID:       "policy1",
		},
	}
	// policy2: a single ingress rule, applied to both appliedTo groups.
	policy2 := &v1beta2.NetworkPolicy{
		ObjectMeta: metav1.ObjectMeta{UID: "policy2", Name: "policy2"},
		Rules: []v1beta2.NetworkPolicyRule{
			{
				Direction: v1beta2.DirectionIn,
				From:      v1beta2.NetworkPolicyPeer{AddressGroups: []string{"addressGroup2"}},
			},
		},
		AppliedToGroups: []string{"appliedToGroup1", "appliedToGroup2"},
		SourceRef: &v1beta2.NetworkPolicyReference{
			Type:      v1beta2.K8sNetworkPolicy,
			Namespace: "ns2",
			Name:      "name2",
			UID:       "policy2",
		},
	}

	// appliedToGroup1 contains pod1 only; appliedToGroup2 contains pod1 and pod2.
	group1 := &v1beta2.AppliedToGroup{
		ObjectMeta:   metav1.ObjectMeta{Name: "appliedToGroup1"},
		GroupMembers: []v1beta2.GroupMember{*newAppliedToGroupMemberPod("pod1", "ns1")},
	}
	group2 := &v1beta2.AppliedToGroup{
		ObjectMeta: metav1.ObjectMeta{Name: "appliedToGroup2"},
		GroupMembers: []v1beta2.GroupMember{
			*newAppliedToGroupMemberPod("pod1", "ns1"),
			*newAppliedToGroupMemberPod("pod2", "ns2"),
		},
	}

	rc.ReplaceNetworkPolicies([]*v1beta2.NetworkPolicy{policy1, policy2})
	rc.ReplaceAppliedToGroups([]*v1beta2.AppliedToGroup{group1, group2})

	// Rule lookups. Note the argument order here is (namespace, pod).
	// pod1 is expected to see three rules and pod2 one; presumably that is
	// policy1's two rules plus policy2's one for pod1, and policy2's one for pod2.
	assert.Len(t, rc.getAppliedNetworkPolicyRules("ns1", "pod1"), 3)
	assert.Len(t, rc.getAppliedNetworkPolicyRules("ns2", "pod2"), 1)
	// An unknown Pod name or an unknown Namespace must yield no rules.
	assert.Len(t, rc.getAppliedNetworkPolicyRules("ns1", "non-existing-pod"), 0)
	assert.Len(t, rc.getAppliedNetworkPolicyRules("non-existing-ns", "pod1"), 0)

	// Policy lookups. Note the argument order here is (pod, namespace).
	// An empty filter returns every policy applied to the Pod.
	assert.Len(t, rc.getAppliedNetworkPolicies("pod1", "ns1", &querier.NetworkPolicyQueryFilter{}), 2)
	// Each of the following filters is expected to narrow pod1's two policies
	// down to the stated count.
	assert.Len(t, rc.getAppliedNetworkPolicies("pod1", "ns1", &querier.NetworkPolicyQueryFilter{Name: "policy1"}), 1)
	assert.Len(t, rc.getAppliedNetworkPolicies("pod1", "ns1", &querier.NetworkPolicyQueryFilter{SourceName: "name1"}), 1)
	assert.Len(t, rc.getAppliedNetworkPolicies("pod1", "ns1", &querier.NetworkPolicyQueryFilter{Namespace: "ns2"}), 1)
	assert.Len(t, rc.getAppliedNetworkPolicies("pod1", "ns1", &querier.NetworkPolicyQueryFilter{SourceType: v1beta2.K8sNetworkPolicy}), 2)
	assert.Len(t, rc.getAppliedNetworkPolicies("pod1", "ns1", &querier.NetworkPolicyQueryFilter{SourceType: v1beta2.AntreaClusterNetworkPolicy}), 0)
	assert.Len(t, rc.getAppliedNetworkPolicies("pod2", "ns2", &querier.NetworkPolicyQueryFilter{}), 1)
	// An unknown Pod name or an unknown Namespace must yield no policies.
	assert.Len(t, rc.getAppliedNetworkPolicies("non-existing-pod", "ns1", &querier.NetworkPolicyQueryFilter{}), 0)
	assert.Len(t, rc.getAppliedNetworkPolicies("pod1", "non-existing-ns", &querier.NetworkPolicyQueryFilter{}), 0)
}

func TestRuleCacheGetCompletedRule(t *testing.T) {
addressGroup1 := v1beta2.NewGroupMemberSet(newAddressGroupMember("1.1.1.1"), newAddressGroupMember("1.1.1.2"))
addressGroup2 := v1beta2.NewGroupMemberSet(newAddressGroupMember("1.1.1.3"), newAddressGroupMember("1.1.1.2"))
Expand Down
6 changes: 2 additions & 4 deletions pkg/agent/controller/networkpolicy/fqdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,8 @@ func newFQDNController(client openflow.Client, allocator *idAllocator, dnsServer
ipv6Enabled: v6Enabled,
gwPort: gwPort,
}
if controller.ofClient != nil {
if err := controller.ofClient.NewDNSPacketInConjunction(dnsInterceptRuleID); err != nil {
return nil, fmt.Errorf("failed to install flow for DNS response interception: %w", err)
}
if err := controller.ofClient.NewDNSPacketInConjunction(dnsInterceptRuleID); err != nil {
return nil, fmt.Errorf("failed to install flow for DNS response interception: %w", err)
}
if dnsServerOverride != "" {
klog.InfoS("DNS server override provided by user", "dnsServer", dnsServerOverride)
Expand Down
Loading

0 comments on commit f8f0271

Please sign in to comment.