Skip to content

Commit

Permalink
Aggregation process support for deny connections tracking (antrea-io#175
Browse files Browse the repository at this point in the history
)
  • Loading branch information
zyiou committed May 11, 2021
1 parent 9e98e27 commit 5099295
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 19 deletions.
41 changes: 29 additions & 12 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,7 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent
a.mutex.Lock()
defer a.mutex.Unlock()

correlationRequired := false
// Get Antrea flowType from the record that will help to determine if correlation
// is required or not.
if ieWithValue, exist := record.GetInfoElementWithValue("flowType"); exist {
if recordFlowType, ok := ieWithValue.Value.(uint8); ok {
// Correlation is required for only InterNode flow type defined in
// pkg/registry/registry.go.
if recordFlowType == registry.FlowTypeInterNode {
correlationRequired = true
}
}
}
correlationRequired := isCorrelationRequired(record)

aggregationRecord, exist := a.flowKeyRecordMap[*flowKey]
if exist {
Expand Down Expand Up @@ -267,6 +256,14 @@ func (a *AggregationProcess) correlateRecords(incomingRecord, existingRecord ent
}
existingIeWithValue.Value = ieWithValue.Value
}
case entities.Unsigned8:
if ieWithValue.Value != uint8(0) {
existingIeWithValue, _ := existingRecord.GetInfoElementWithValue(field)
if existingIeWithValue.Value != uint8(0) {
klog.Warningf("%v field should not have been filled in the existing record; existing value: %v and current value: %v", field, existingIeWithValue.Value, ieWithValue.Value)
}
existingIeWithValue.Value = ieWithValue.Value
}
case entities.Unsigned16:
if ieWithValue.Value != uint16(0) {
existingIeWithValue, _ := existingRecord.GetInfoElementWithValue(field)
Expand Down Expand Up @@ -621,3 +618,23 @@ func validateDataRecord(record entities.Record) bool {
}
return true
}

// isCorrelationRequired returns true when flowType is InterNode and
// egressNetworkPolicyRuleAction is not deny (drop/reject).
func isCorrelationRequired(record entities.Record) bool {
if ieWithValue, exist := record.GetInfoElementWithValue("flowType"); exist {
if recordFlowType, ok := ieWithValue.Value.(uint8); ok {
if recordFlowType == registry.FlowTypeInterNode {
if egressRuleActionIe, exist := record.GetInfoElementWithValue("egressNetworkPolicyRuleAction"); exist {
if egressRuleAction, ok := egressRuleActionIe.Value.(uint8); ok {
if egressRuleAction == registry.NetworkPolicyRuleActionDrop || egressRuleAction == registry.NetworkPolicyRuleActionReject {
return false
}
}
}
return true
}
}
}
return false
}
16 changes: 13 additions & 3 deletions pkg/intermediate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
"destinationClusterIPv4",
"destinationClusterIPv6",
"destinationServicePort",
"ingressNetworkPolicyRuleAction",
}
nonStatsElementList = []string{
"flowEndSeconds",
Expand Down Expand Up @@ -82,7 +83,8 @@ func createMsgwithTemplateSet(isIPv6 bool) *entities.Message {
}
ie10 := entities.NewInfoElementWithValue(entities.NewInfoElement("flowEndSeconds", 151, 14, 0, 4), nil)
ie11 := entities.NewInfoElementWithValue(entities.NewInfoElement("flowType", 137, 1, registry.AntreaEnterpriseID, 1), nil)
elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11)
ie12 := entities.NewInfoElementWithValue(entities.NewInfoElement("ingressNetworkPolicyRuleAction", 139, 1, registry.AntreaEnterpriseID, 1), nil)
elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12)
set.AddRecord(elements, 256)

message := entities.NewMessage(true)
Expand Down Expand Up @@ -118,11 +120,13 @@ func createDataMsgForSrc(t *testing.T, isIPv6 bool, isIntraNode bool, isUpdatedR
antreaFlowType := new(bytes.Buffer)
flowEndReason := new(bytes.Buffer)
tcpState := new(bytes.Buffer)
ingressNetworkPolicyRuleAction := new(bytes.Buffer)

util.Encode(srcPort, binary.BigEndian, uint16(1234))
util.Encode(dstPort, binary.BigEndian, uint16(5678))
util.Encode(proto, binary.BigEndian, uint8(6))
util.Encode(svcPort, binary.BigEndian, uint16(4739))
util.Encode(ingressNetworkPolicyRuleAction, binary.BigEndian, registry.NetworkPolicyRuleActionNoAction)
srcPod.WriteString("pod1")
if !isIntraNode {
dstPod.WriteString("")
Expand Down Expand Up @@ -175,8 +179,9 @@ func createDataMsgForSrc(t *testing.T, isIPv6 bool, isIntraNode bool, isUpdatedR
ie12 := entities.NewInfoElementWithValue(tmpElement, flowEndReason)
tmpElement, _ = registry.GetInfoElement("tcpState", registry.AntreaEnterpriseID)
ie13 := entities.NewInfoElementWithValue(tmpElement, tcpState)
ie14 := entities.NewInfoElementWithValue(entities.NewInfoElement("ingressNetworkPolicyRuleAction", 139, 1, registry.AntreaEnterpriseID, 1), ingressNetworkPolicyRuleAction)

elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13)
elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14)
// Add all elements in statsElements.
for _, element := range statsElementList {
var e *entities.InfoElement
Expand Down Expand Up @@ -241,10 +246,12 @@ func createDataMsgForDst(t *testing.T, isIPv6 bool, isIntraNode bool, isUpdatedR
antreaFlowType := new(bytes.Buffer)
flowEndReason := new(bytes.Buffer)
tcpState := new(bytes.Buffer)
ingressNetworkPolicyRuleAction := new(bytes.Buffer)

util.Encode(srcPort, binary.BigEndian, uint16(1234))
util.Encode(dstPort, binary.BigEndian, uint16(5678))
util.Encode(proto, binary.BigEndian, uint8(6))
util.Encode(ingressNetworkPolicyRuleAction, binary.BigEndian, registry.NetworkPolicyRuleActionDrop)
if !isIntraNode {
util.Encode(svcPort, binary.BigEndian, uint16(0))
srcPod.WriteString("")
Expand Down Expand Up @@ -300,8 +307,9 @@ func createDataMsgForDst(t *testing.T, isIPv6 bool, isIntraNode bool, isUpdatedR
ie12 := entities.NewInfoElementWithValue(tmpElement, flowEndReason)
tmpElement, _ = registry.GetInfoElement("tcpState", registry.AntreaEnterpriseID)
ie13 := entities.NewInfoElementWithValue(tmpElement, tcpState)
ie14 := entities.NewInfoElementWithValue(entities.NewInfoElement("ingressNetworkPolicyRuleAction", 139, 1, registry.AntreaEnterpriseID, 1), ingressNetworkPolicyRuleAction)

elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13)
elements = append(elements, ie1, ie2, ie3, ie4, ie5, ie6, ie7, ie8, ie9, ie10, ie11, ie12, ie13, ie14)
// Add all elements in statsElements.
for _, element := range statsElementList {
var e *entities.InfoElement
Expand Down Expand Up @@ -695,6 +703,8 @@ func runAggregationAndCheckResult(t *testing.T, ap *AggregationProcess, srcRecor
assert.Equal(t, net.IP{0xc0, 0xa8, 0x0, 0x1}, ieWithValue.Value)
ieWithValue, _ = aggRecord.Record.GetInfoElementWithValue("destinationServicePort")
assert.Equal(t, uint16(4739), ieWithValue.Value)
ieWithValue, _ = aggRecord.Record.GetInfoElementWithValue("ingressNetworkPolicyRuleAction")
assert.Equal(t, registry.NetworkPolicyRuleActionDrop, ieWithValue.Value)
for _, e := range nonStatsElementList {
ieWithValue, _ = aggRecord.Record.GetInfoElementWithValue(e)
expectedIE, _ := dstRecordLatest.GetInfoElementWithValue(e)
Expand Down
7 changes: 4 additions & 3 deletions pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ const (

// enum for ingressNetworkPolicyRuleAction and egressNetworkPolicyRuleAction field in Antrea registry.
const (
NetworkPolicyRuleActionAllow = uint8(1)
NetworkPolicyRuleActionDrop = uint8(2)
NetworkPolicyRuleActionReject = uint8(3)
NetworkPolicyRuleActionNoAction = uint8(0)
NetworkPolicyRuleActionAllow = uint8(1)
NetworkPolicyRuleActionDrop = uint8(2)
NetworkPolicyRuleActionReject = uint8(3)
)

// enum for flowEndReason field in IANA registry.
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/registry_antrea.csv
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ ElementID,Name,Abstract Data Type,Data Type Semantics,Status,Description,Units,R
136,tcpState,string,,current,,,,,,,,56506,
137,flowType,unsigned8,,current,The type of flow is based on the location of the source and destination Pods. Supported Types(uint8 value): FlowTypeIntraNode(1) FlowTypeInterNode(2) FlowTypeToExternal(3) and FlowTypeFromExternal(4),,,,,,,56506,
138,tcpStatePrevList,string,,current,,,,,,,,56506,
139,ingressNetworkPolicyRuleAction,unsigned8,,current,Supported Actions(uint8 value): NetworkPolicyRuleActionAllow(1) NetworkPolicyRuleActionDrop(2) NetworkPolicyRuleActionReject(3),,,,,,,56506,
139,ingressNetworkPolicyRuleAction,unsigned8,,current,Supported Actions(uint8 value): NetworkPolicyRuleActionNoAction(0) NetworkPolicyRuleActionAllow(1) NetworkPolicyRuleActionDrop(2) NetworkPolicyRuleActionReject(3),,,,,,,56506,
140,egressNetworkPolicyRuleAction,unsigned8,,current,Supported Actions(uint8 value): NetworkPolicyRuleActionAllow(1) NetworkPolicyRuleActionDrop(2) NetworkPolicyRuleActionReject(3),,,,,,,56506,

0 comments on commit 5099295

Please sign in to comment.