Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add network policy related metadata to flow records #2163

Merged
merged 1 commit into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions build/yamls/elk-flow-collector/logstash/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,22 @@ def filter(event)
event.remove("[ipfix][egressNetworkPolicyNamespace]")
event.set("[ipfix][egressNetworkPolicyNamespace]", "N/A")
end
ingressNetworkPolicyType = event.get("[ipfix][ingressNetworkPolicyType]")
if ingressNetworkPolicyType == 1
event.set("[ipfix][ingressNetworkPolicyTypeStr]", "K8s NetworkPolicy")
elsif ingressNetworkPolicyType == 2
event.set("[ipfix][ingressNetworkPolicyTypeStr]", "Antrea NetworkPolicy")
elsif ingressNetworkPolicyType == 3
event.set("[ipfix][ingressNetworkPolicyTypeStr]", "Antrea ClusterNetworkPolicy")
end
egressNetworkPolicyType = event.get("[ipfix][egressNetworkPolicyType]")
if egressNetworkPolicyType == 1
event.set("[ipfix][egressNetworkPolicyTypeStr]", "K8s NetworkPolicy")
elsif egressNetworkPolicyType == 2
event.set("[ipfix][egressNetworkPolicyTypeStr]", "Antrea NetworkPolicy")
elsif egressNetworkPolicyType == 3
event.set("[ipfix][egressNetworkPolicyTypeStr]", "Antrea ClusterNetworkPolicy")
end
key = event.get("[ipfix][flowKey]")
if @@time_map.has_key?(key)
t = DateTime.strptime(event.get("[ipfix][flowEndSeconds]").to_s, '%Y-%m-%dT%H:%M:%S').to_time.to_i
Expand Down
13 changes: 13 additions & 0 deletions build/yamls/elk-flow-collector/logstash/ipfix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4070,6 +4070,12 @@
113:
- :string
- :egressNetworkPolicyNamespace
115:
- :uint8
- :ingressNetworkPolicyType
118:
- :uint8
- :egressNetworkPolicyType
120:
- :uint64
- :packetTotalCountFromSourceNode
Expand Down Expand Up @@ -4130,3 +4136,10 @@
140:
- :uint8
- :egressNetworkPolicyRuleAction
141:
- :string
- :ingressNetworkPolicyRuleName
142:
- :string
- :egressNetworkPolicyRuleName

44 changes: 24 additions & 20 deletions docs/network-flow-visibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,26 +163,30 @@ the flow. All the IEs used by the Antrea Flow Exporter are listed below:

#### IEs from Antrea IE Registry

| IPFIX Information Element | Enterprise ID | Field ID | Type |
|-------------------------------|---------------|----------|-------------|
| sourcePodNamespace | 56506 | 100 | string |
| sourcePodName | 56506 | 101 | string |
| destinationPodNamespace | 56506 | 102 | string |
| destinationPodName | 56506 | 103 | string |
| sourceNodeName | 56506 | 104 | string |
| destinationNodeName | 56506 | 105 | string |
| destinationClusterIPv4 | 56506 | 106 | ipv4Address |
| destinationClusterIPv6 | 56506 | 107 | ipv6Address |
| destinationServicePort | 56506 | 108 | unsigned16 |
| destinationServicePortName | 56506 | 109 | string |
| ingressNetworkPolicyName | 56506 | 110 | string |
| ingressNetworkPolicyNamespace | 56506 | 111 | string |
| egressNetworkPolicyName | 56506 | 112 | string |
| egressNetworkPolicyNamespace | 56506 | 113 | string |
| tcpState | 56506 | 136 | string |
| flowType | 56506 | 137 | unsigned8 |
| ingressNetworkPolicyRuleAction| 56506 | 139 | unsigned8 |
| egressNetworkPolicyRuleAction | 56506 | 140 | unsigned8 |
| IPFIX Information Element | Enterprise ID | Field ID | Type |
|----------------------------------|---------------|----------|-------------|
| sourcePodNamespace | 56506 | 100 | string |
| sourcePodName | 56506 | 101 | string |
| destinationPodNamespace | 56506 | 102 | string |
| destinationPodName | 56506 | 103 | string |
| sourceNodeName | 56506 | 104 | string |
| destinationNodeName | 56506 | 105 | string |
| destinationClusterIPv4 | 56506 | 106 | ipv4Address |
| destinationClusterIPv6 | 56506 | 107 | ipv6Address |
| destinationServicePort | 56506 | 108 | unsigned16 |
| destinationServicePortName | 56506 | 109 | string |
| ingressNetworkPolicyName | 56506 | 110 | string |
| ingressNetworkPolicyNamespace | 56506 | 111 | string |
| ingressNetworkPolicyType | 56506 | 115 | unsigned8 |
| ingressNetworkPolicyRuleName | 56506 | 141 | string |
| egressNetworkPolicyName | 56506 | 112 | string |
| egressNetworkPolicyNamespace | 56506 | 113 | string |
| egressNetworkPolicyType | 56506 | 118 | unsigned8 |
| egressNetworkPolicyRuleName | 56506 | 142 | string |
| ingressNetworkPolicyRuleAction | 56506 | 139 | unsigned8 |
| egressNetworkPolicyRuleAction | 56506 | 140 | unsigned8 |
| tcpState | 56506 | 136 | string |
| flowType | 56506 | 137 | unsigned8 |

### Supported capabilities

Expand Down
12 changes: 10 additions & 2 deletions pkg/agent/controller/networkpolicy/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/contiv/libOpenflow/openflow13"
"github.com/contiv/libOpenflow/protocol"
"github.com/contiv/ofnet/ofctrl"
"github.com/vmware/go-ipfix/pkg/registry"
"gopkg.in/natefinch/lumberjack.v2"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -380,8 +381,10 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {

// For K8s NetworkPolicy implicit drop action, we cannot get name/namespace.
if tableID == openflow.IngressDefaultTable {
denyConn.IngressNetworkPolicyType = registry.PolicyTypeK8sNetworkPolicy
denyConn.IngressNetworkPolicyRuleAction = flowexporter.RuleActionToUint8(disposition)
} else if tableID == openflow.EgressDefaultTable {
denyConn.EgressNetworkPolicyType = registry.PolicyTypeK8sNetworkPolicy
denyConn.EgressNetworkPolicyRuleAction = flowexporter.RuleActionToUint8(disposition)
} else { // Get name and namespace for Antrea Network Policy or Antrea Cluster Network Policy
// Set match to corresponding ingress/egress reg according to disposition
Expand All @@ -391,18 +394,23 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
return fmt.Errorf("error when obtaining rule id from reg: %v", err)
}
policy := c.GetNetworkPolicyByRuleFlowID(ruleID)
rule := c.GetRuleByFlowID(ruleID)

if policy == nil {
if policy == nil || rule == nil {
// Default drop by K8s NetworkPolicy
klog.V(4).Infof("Cannot find NetworkPolicy that has ruleID %v", ruleID)
klog.V(4).Infof("Cannot find NetworkPolicy or rule that has ruleID %v", ruleID)
} else {
if tableID == openflow.AntreaPolicyIngressRuleTable {
denyConn.IngressNetworkPolicyName = policy.Name
denyConn.IngressNetworkPolicyNamespace = policy.Namespace
denyConn.IngressNetworkPolicyType = flowexporter.PolicyTypeToUint8(policy.Type)
denyConn.IngressNetworkPolicyRuleName = rule.Name
denyConn.IngressNetworkPolicyRuleAction = flowexporter.RuleActionToUint8(disposition)
} else if tableID == openflow.AntreaPolicyEgressRuleTable {
denyConn.EgressNetworkPolicyName = policy.Name
denyConn.EgressNetworkPolicyNamespace = policy.Namespace
denyConn.EgressNetworkPolicyType = flowexporter.PolicyTypeToUint8(policy.Type)
denyConn.EgressNetworkPolicyRuleName = rule.Name
denyConn.EgressNetworkPolicyRuleAction = flowexporter.RuleActionToUint8(disposition)
}
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/agent/flowexporter/connections/conntrack_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,25 +158,31 @@ func (cs *ConntrackConnectionStore) addNetworkPolicyMetadata(conn *flowexporter.
egressOfID := binary.LittleEndian.Uint32(conn.Labels[4:8])
if ingressOfID != 0 {
policy := cs.networkPolicyQuerier.GetNetworkPolicyByRuleFlowID(ingressOfID)
if policy == nil {
rule := cs.networkPolicyQuerier.GetRuleByFlowID(ingressOfID)
if policy == nil || rule == nil {
// This should not happen because the rule flow ID to rule mapping is
// preserved for max(5s, flowPollInterval) even after the rule deletion.
klog.Warningf("Cannot find NetworkPolicy that has rule with ingressOfID %v", ingressOfID)
klog.Warningf("Cannot find NetworkPolicy or rule with ingressOfID %v", ingressOfID)
} else {
conn.IngressNetworkPolicyName = policy.Name
conn.IngressNetworkPolicyNamespace = policy.Namespace
conn.IngressNetworkPolicyType = flowexporter.PolicyTypeToUint8(policy.Type)
conn.IngressNetworkPolicyRuleName = rule.Name
heanlan marked this conversation as resolved.
Show resolved Hide resolved
conn.IngressNetworkPolicyRuleAction = registry.NetworkPolicyRuleActionAllow
}
}
if egressOfID != 0 {
policy := cs.networkPolicyQuerier.GetNetworkPolicyByRuleFlowID(egressOfID)
if policy == nil {
rule := cs.networkPolicyQuerier.GetRuleByFlowID(egressOfID)
if policy == nil || rule == nil {
// This should not happen because the rule flow ID to rule mapping is
// preserved for max(5s, flowPollInterval) even after the rule deletion.
klog.Warningf("Cannot find NetworkPolicy that has rule with egressOfID %v", egressOfID)
klog.Warningf("Cannot find NetworkPolicy or rule with egressOfID %v", egressOfID)
} else {
conn.EgressNetworkPolicyName = policy.Name
conn.EgressNetworkPolicyNamespace = policy.Namespace
conn.EgressNetworkPolicyType = flowexporter.PolicyTypeToUint8(policy.Type)
conn.EgressNetworkPolicyRuleName = rule.Name
conn.EgressNetworkPolicyRuleAction = registry.NetworkPolicyRuleActionAllow
}
}
Expand Down
69 changes: 69 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package connections

import (
"encoding/binary"
"fmt"
"net"
"strings"
Expand All @@ -36,11 +37,58 @@ import (
interfacestoretest "antrea.io/antrea/pkg/agent/interfacestore/testing"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow"
ofclient "antrea.io/antrea/pkg/agent/openflow"
proxytest "antrea.io/antrea/pkg/agent/proxy/testing"
agenttypes "antrea.io/antrea/pkg/agent/types"
cpv1beta "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
secv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
queriertest "antrea.io/antrea/pkg/querier/testing"
k8sproxy "antrea.io/antrea/third_party/proxy"
)

var (
np1 = cpv1beta.NetworkPolicyReference{
Type: cpv1beta.K8sNetworkPolicy,
Namespace: "foo",
Name: "bar",
UID: "uid1",
}
np2 = cpv1beta.NetworkPolicyReference{
Type: cpv1beta.AntreaNetworkPolicy,
Namespace: "foo",
Name: "baz",
UID: "uid2",
}
rule1 = agenttypes.PolicyRule{
Direction: cpv1beta.DirectionIn,
From: []agenttypes.Address{},
To: []agenttypes.Address{},
Service: []cpv1beta.Service{},
Action: nil,
Priority: nil,
Name: "",
FlowID: uint32(0),
TableID: ofclient.IngressRuleTable,
PolicyRef: &np1,
EnableLogging: false,
}
priority = uint16(50000)
action = secv1alpha1.RuleActionAllow
rule2 = agenttypes.PolicyRule{
Direction: cpv1beta.DirectionOut,
From: []agenttypes.Address{},
To: []agenttypes.Address{},
Service: []cpv1beta.Service{},
Action: &action,
Priority: &priority,
Name: "allow",
FlowID: uint32(0),
TableID: ofclient.EgressRuleTable,
PolicyRef: &np2,
EnableLogging: false,
}
)

func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -200,6 +248,27 @@ func TestConntrackConnectionStore_AddOrUpdateConn(t *testing.T) {
serviceStr := fmt.Sprintf("%s:%d/%s", expConn.DestinationServiceAddress.String(), expConn.DestinationServicePort, protocol)
mockProxier.EXPECT().GetServiceByIP(serviceStr).Return(servicePortName, true)
expConn.DestinationServicePortName = servicePortName.String()
case 4:
// Tests NetworkPolicy mapping.
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.FlowKey.SourceAddress.String()).Return(nil, false)
mockIfaceStore.EXPECT().GetInterfaceByIP(expConn.FlowKey.SourceAddress.String()).Return(nil, false)

ingressOfID := binary.LittleEndian.Uint32(test.flow.Labels[:4])
npQuerier.EXPECT().GetNetworkPolicyByRuleFlowID(ingressOfID).Return(&np1)
npQuerier.EXPECT().GetRuleByFlowID(ingressOfID).Return(&rule1)
expConn.IngressNetworkPolicyName = np1.Name
expConn.IngressNetworkPolicyNamespace = np1.Namespace
expConn.IngressNetworkPolicyType = flowexporter.PolicyTypeToUint8(np1.Type)
expConn.IngressNetworkPolicyRuleName = rule1.Name

egressOfID := binary.LittleEndian.Uint32(test.flow.Labels[4:8])
npQuerier.EXPECT().GetNetworkPolicyByRuleFlowID(egressOfID).Return(&np2)
npQuerier.EXPECT().GetRuleByFlowID(egressOfID).Return(&rule2)
expConn.EgressNetworkPolicyName = np2.Name
expConn.EgressNetworkPolicyNamespace = np2.Namespace
expConn.EgressNetworkPolicyType = flowexporter.PolicyTypeToUint8(np2.Type)
expConn.EgressNetworkPolicyRuleName = rule2.Name
expConn.EgressNetworkPolicyRuleAction = flowexporter.RuleActionToUint8(string(*rule2.Action))
}
conntrackConnStore.AddOrUpdateConn(&test.flow)
actualConn, ok := conntrackConnStore.GetConnByKey(flowTuple)
Expand Down
20 changes: 20 additions & 0 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,13 @@ var (
"destinationServicePortName",
"ingressNetworkPolicyName",
"ingressNetworkPolicyNamespace",
"ingressNetworkPolicyType",
"ingressNetworkPolicyRuleName",
"ingressNetworkPolicyRuleAction",
"egressNetworkPolicyName",
"egressNetworkPolicyNamespace",
"egressNetworkPolicyType",
"egressNetworkPolicyRuleName",
"egressNetworkPolicyRuleAction",
"tcpState",
"flowType",
Expand Down Expand Up @@ -537,12 +541,20 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
ie.Value = record.Conn.IngressNetworkPolicyName
case "ingressNetworkPolicyNamespace":
ie.Value = record.Conn.IngressNetworkPolicyNamespace
case "ingressNetworkPolicyType":
ie.Value = record.Conn.IngressNetworkPolicyType
case "ingressNetworkPolicyRuleName":
ie.Value = record.Conn.IngressNetworkPolicyRuleName
case "ingressNetworkPolicyRuleAction":
ie.Value = record.Conn.IngressNetworkPolicyRuleAction
case "egressNetworkPolicyName":
ie.Value = record.Conn.EgressNetworkPolicyName
case "egressNetworkPolicyNamespace":
ie.Value = record.Conn.EgressNetworkPolicyNamespace
case "egressNetworkPolicyType":
ie.Value = record.Conn.EgressNetworkPolicyType
case "egressNetworkPolicyRuleName":
ie.Value = record.Conn.EgressNetworkPolicyRuleName
case "egressNetworkPolicyRuleAction":
ie.Value = record.Conn.EgressNetworkPolicyRuleAction
case "tcpState":
Expand Down Expand Up @@ -655,12 +667,20 @@ func (exp *flowExporter) addDenyConnToSet(conn *flowexporter.Connection, flowEnd
ie.Value = conn.IngressNetworkPolicyName
case "ingressNetworkPolicyNamespace":
ie.Value = conn.IngressNetworkPolicyNamespace
case "ingressNetworkPolicyType":
ie.Value = conn.IngressNetworkPolicyType
case "ingressNetworkPolicyRuleName":
ie.Value = conn.IngressNetworkPolicyRuleName
case "ingressNetworkPolicyRuleAction":
ie.Value = conn.IngressNetworkPolicyRuleAction
case "egressNetworkPolicyName":
ie.Value = conn.EgressNetworkPolicyName
case "egressNetworkPolicyNamespace":
ie.Value = conn.EgressNetworkPolicyNamespace
case "egressNetworkPolicyType":
ie.Value = conn.EgressNetworkPolicyType
case "egressNetworkPolicyRuleName":
ie.Value = conn.EgressNetworkPolicyRuleName
case "egressNetworkPolicyRuleAction":
ie.Value = conn.EgressNetworkPolicyRuleAction
case "tcpState":
Expand Down
9 changes: 9 additions & 0 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
ipfixentitiestesting "github.com/vmware/go-ipfix/pkg/entities/testing"
"github.com/vmware/go-ipfix/pkg/registry"
ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"

"antrea.io/antrea/pkg/agent/flowexporter"
Expand Down Expand Up @@ -244,6 +245,10 @@ func getElemList(ianaIE []string, antreaIE []string) []*ipfixentities.InfoElemen
elemList[i] = ipfixentities.NewInfoElementWithValue(ie.Element, "")
case "ingressNetworkPolicyName", "ingressNetworkPolicyNamespace", "egressNetworkPolicyName", "egressNetworkPolicyNamespace":
elemList[i] = ipfixentities.NewInfoElementWithValue(ie.Element, "")
case "ingressNetworkPolicyRuleName", "egressNetworkPolicyRuleName":
elemList[i] = ipfixentities.NewInfoElementWithValue(ie.Element, "")
case "ingressNetworkPolicyType", "egressNetworkPolicyType", "ingressNetworkPolicyRuleAction", "egressNetworkPolicyRuleAction":
elemList[i] = ipfixentities.NewInfoElementWithValue(ie.Element, uint8(0))
}
}
return elemList
Expand Down Expand Up @@ -274,8 +279,12 @@ func getConnection(isIPv6 bool, isPresent bool, statusFlag uint32, protoID uint8
DestinationPodName: "",
IngressNetworkPolicyName: "",
IngressNetworkPolicyNamespace: "",
IngressNetworkPolicyType: registry.PolicyTypeK8sNetworkPolicy,
IngressNetworkPolicyRuleName: "",
EgressNetworkPolicyName: "np",
EgressNetworkPolicyNamespace: "np-ns",
EgressNetworkPolicyType: registry.PolicyTypeK8sNetworkPolicy,
EgressNetworkPolicyRuleName: "",
DestinationServicePortName: "service",
TCPState: tcpState,
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/flowexporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ type Connection struct {
DestinationServicePort uint16
IngressNetworkPolicyName string
IngressNetworkPolicyNamespace string
IngressNetworkPolicyType uint8
IngressNetworkPolicyRuleName string
IngressNetworkPolicyRuleAction uint8
EgressNetworkPolicyName string
EgressNetworkPolicyNamespace string
EgressNetworkPolicyType uint8
EgressNetworkPolicyRuleName string
EgressNetworkPolicyRuleAction uint8
TCPState string
// fields specific to deny connections
Expand Down
Loading