Skip to content

Commit

Permalink
NETOBSERV-1208 & NETOBSERV-1233 Aggregators skip missing fields (#470)
Browse files Browse the repository at this point in the history
* report missing aggregator option

* filter toGenericMap instead of aggregator

* skip bytes / packets zeros
  • Loading branch information
jpinsonneau authored Aug 7, 2023
1 parent 31e9d82 commit 5912edf
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 40 deletions.
1 change: 1 addition & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ Following is the supported API format for specifying connection tracking:
last: last
splitAB: When true, 2 output fields will be created. One for A->B and one for B->A flows.
input: The input field to base the operation on. When omitted, 'name' is used
reportMissing: When true, missing input will produce MissingFieldError metric and error logs
scheduling: list of timeouts and intervals to apply per selector
selector: key-value map to match against connection fields to apply this scheduling
endConnectionTimeout: duration of time to wait from the last flow log to end a connection
Expand Down
9 changes: 5 additions & 4 deletions pkg/api/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ type ConnTrackHash struct {
}

type OutputField struct {
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"output field name"`
Operation string `yaml:"operation,omitempty" json:"operation,omitempty" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"`
SplitAB bool `yaml:"splitAB,omitempty" json:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"`
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"output field name"`
Operation string `yaml:"operation,omitempty" json:"operation,omitempty" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"`
SplitAB bool `yaml:"splitAB,omitempty" json:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"`
ReportMissing bool `yaml:"reportMissing,omitempty" json:"reportMissing,omitempty" doc:"When true, missing input will produce MissingFieldError metric and error logs"`
}

type ConnTrackOperationEnum struct {
Expand Down
10 changes: 8 additions & 2 deletions pkg/pipeline/decode/decode_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,11 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
}
out := config.GenericMap{
"FlowDirection": int(flow.Direction.Number()),
"Bytes": flow.Bytes,
"SrcAddr": ipToStr(flow.Network.GetSrcAddr()),
"DstAddr": ipToStr(flow.Network.GetDstAddr()),
"SrcMac": macToStr(flow.DataLink.GetSrcMac()),
"DstMac": macToStr(flow.DataLink.GetDstMac()),
"Etype": flow.EthProtocol,
"Packets": flow.Packets,
"Duplicate": flow.Duplicate,
"Proto": flow.Transport.GetProtocol(),
"TimeFlowStartMs": flow.TimeFlowStart.AsTime().UnixMilli(),
Expand All @@ -56,6 +54,14 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
"AgentIP": ipToStr(flow.AgentIp),
}

if flow.Bytes != 0 {
out["Bytes"] = flow.Bytes
}

if flow.Packets != 0 {
out["Packets"] = flow.Packets
}

proto := flow.Transport.GetProtocol()
if proto == syscall.IPPROTO_ICMP || proto == syscall.IPPROTO_ICMPV6 {
out["IcmpType"] = flow.GetIcmpType()
Expand Down
28 changes: 18 additions & 10 deletions pkg/pipeline/extract/conntrack/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ type aggregator interface {
}

type aggregateBase struct {
inputField string
outputField string
splitAB bool
initVal interface{}
metrics *metricsType
inputField string
outputField string
splitAB bool
initVal interface{}
metrics *metricsType
reportMissing bool
}

type aSum struct{ aggregateBase }
Expand All @@ -64,7 +65,7 @@ func newAggregator(of api.OutputField, metrics *metricsType) (aggregator, error)
} else {
inputField = of.Name
}
aggBase := aggregateBase{inputField: inputField, outputField: of.Name, splitAB: of.SplitAB, metrics: metrics}
aggBase := aggregateBase{inputField: inputField, outputField: of.Name, splitAB: of.SplitAB, metrics: metrics, reportMissing: of.ReportMissing}
var agg aggregator
switch of.Operation {
case api.ConnTrackOperationName("Sum"):
Expand Down Expand Up @@ -109,10 +110,15 @@ func (agg *aggregateBase) getOutputField(d direction) string {
func (agg *aggregateBase) getInputFieldValue(flowLog config.GenericMap) (float64, error) {
rawValue, ok := flowLog[agg.inputField]
if !ok {
if agg.metrics != nil {
agg.metrics.aggregatorErrors.WithLabelValues("MissingFieldError", agg.inputField).Inc()
// return an error only when explicitly requested, since FLP skips empty fields by default to reduce storage size
if agg.reportMissing {
if agg.metrics != nil {
agg.metrics.aggregatorErrors.WithLabelValues("MissingFieldError", agg.inputField).Inc()
}
return 0, fmt.Errorf("missing field %v", agg.inputField)
}
return 0, fmt.Errorf("missing field %v", agg.inputField)
// fall back to 0 without an error
return 0, nil
}
floatValue, err := utils.ConvertToFloat64(rawValue)
if err != nil {
Expand Down Expand Up @@ -185,5 +191,7 @@ func (cp *aFirst) update(conn connection, flowLog config.GenericMap, d direction
}

func (cp *aLast) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
conn.updateAggValue(cp.outputField, flowLog[cp.inputField])
if flowLog[cp.inputField] != nil {
conn.updateAggValue(cp.outputField, flowLog[cp.inputField])
}
}
101 changes: 82 additions & 19 deletions pkg/pipeline/extract/conntrack/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,52 +65,52 @@ func TestNewAggregator_Valid(t *testing.T) {
{
name: "Default SplitAB",
outputField: api.OutputField{Name: "MyAgg", Operation: "sum"},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil, false}},
},
{
name: "Default input",
outputField: api.OutputField{Name: "MyAgg", Operation: "sum", SplitAB: true},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, float64(0), nil}},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", true, float64(0), nil, false}},
},
{
name: "Custom input",
outputField: api.OutputField{Name: "MyAgg", Operation: "sum", Input: "MyInput"},
expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, float64(0), nil}},
expected: &aSum{aggregateBase{"MyInput", "MyAgg", false, float64(0), nil, false}},
},
{
name: "OperationType sum",
outputField: api.OutputField{Name: "MyAgg", Operation: "sum"},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}},
name: "OperationType sum with errors",
outputField: api.OutputField{Name: "MyAgg", Operation: "sum", ReportMissing: true},
expected: &aSum{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil, true}},
},
{
name: "OperationType count",
outputField: api.OutputField{Name: "MyAgg", Operation: "count"},
expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil}},
name: "OperationType count with errors",
outputField: api.OutputField{Name: "MyAgg", Operation: "count", ReportMissing: true},
expected: &aCount{aggregateBase{"MyAgg", "MyAgg", false, float64(0), nil, true}},
},
{
name: "OperationType max",
outputField: api.OutputField{Name: "MyAgg", Operation: "max"},
expected: &aMax{aggregateBase{"MyAgg", "MyAgg", false, -math.MaxFloat64, nil}},
expected: &aMax{aggregateBase{"MyAgg", "MyAgg", false, -math.MaxFloat64, nil, false}},
},
{
name: "OperationType min",
outputField: api.OutputField{Name: "MyAgg", Operation: "min"},
expected: &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64, nil}},
expected: &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64, nil, false}},
},
{
name: "Default first",
outputField: api.OutputField{Name: "MyCp", Operation: "first"},
expected: &aFirst{aggregateBase{"MyCp", "MyCp", false, nil, nil}},
expected: &aFirst{aggregateBase{"MyCp", "MyCp", false, nil, nil, false}},
},
{
name: "Custom input first",
outputField: api.OutputField{Name: "MyCp", Operation: "first", Input: "MyInput"},
expected: &aFirst{aggregateBase{"MyInput", "MyCp", false, nil, nil}},
expected: &aFirst{aggregateBase{"MyInput", "MyCp", false, nil, nil, false}},
},
{
name: "Default last",
outputField: api.OutputField{Name: "MyCp", Operation: "last"},
expected: &aLast{aggregateBase{"MyCp", "MyCp", false, nil, nil}},
expected: &aLast{aggregateBase{"MyCp", "MyCp", false, nil, nil, false}},
},
}

Expand All @@ -132,6 +132,7 @@ func TestAddField_and_Update(t *testing.T) {
{Name: "maxFlowLogBytes", Operation: "max", Input: "Bytes"},
{Name: "FirstFlowDirection", Operation: "first", Input: "FlowDirection"},
{Name: "LastFlowDirection", Operation: "last", Input: "FlowDirection"},
{Name: "PktDropLatestDropCause", Operation: "last", Input: "PktDropLatestDropCause"},
}
var aggs []aggregator
for _, of := range ofs {
Expand All @@ -158,21 +159,67 @@ func TestAddField_and_Update(t *testing.T) {
name: "flowLog 1",
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirA, 100, 10, false),
direction: dirAB,
expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(0), "Packets": float64(10), "maxFlowLogBytes": float64(100), "minFlowLogBytes": float64(100), "numFlowLogs": float64(1), "FirstFlowDirection": 0, "LastFlowDirection": 0},
expected: map[string]interface{}{
"Bytes_AB": float64(100),
"Bytes_BA": float64(0),
"Packets": float64(10),
"maxFlowLogBytes": float64(100),
"minFlowLogBytes": float64(100),
"numFlowLogs": float64(1),
"FirstFlowDirection": 0,
"LastFlowDirection": 0,
"PktDropLatestDropCause": nil,
},
},
{
name: "flowLog 2",
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirB, 200, 20, false),
flowLog: config.GenericMap{"SrcAddr": ipA, "DstAddr": ipB, "Bytes": 100, "FlowDirection": flowDirA, "PktDropLatestDropCause": "SKB_DROP_REASON_NO_SOCKET"},
direction: dirAB,
expected: map[string]interface{}{
"Bytes_AB": float64(200), // updated bytes count
"Bytes_BA": float64(0),
"Packets": float64(10),
"maxFlowLogBytes": float64(100),
"minFlowLogBytes": float64(100),
"numFlowLogs": float64(2), // updated flow count
"FirstFlowDirection": 0,
"LastFlowDirection": 0,
"PktDropLatestDropCause": "SKB_DROP_REASON_NO_SOCKET", // added drop cause
},
},
{
name: "flowLog 3",
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirB, 300, 20, false),
direction: dirBA,
expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(200), "Packets": float64(30), "maxFlowLogBytes": float64(200), "minFlowLogBytes": float64(100), "numFlowLogs": float64(2), "FirstFlowDirection": 0, "LastFlowDirection": 1},
expected: map[string]interface{}{
"Bytes_AB": float64(200),
"Bytes_BA": float64(300), // updated reverse direction byte count
"Packets": float64(30),
"maxFlowLogBytes": float64(300), // updated max bytes from any direction
"minFlowLogBytes": float64(100),
"numFlowLogs": float64(3), // updated count
"FirstFlowDirection": 0,
"LastFlowDirection": 1,
"PktDropLatestDropCause": "SKB_DROP_REASON_NO_SOCKET", // missing field is kept to its last available value
},
},
}

conn := NewConnBuilder(nil).Build()
for _, agg := range aggs {
agg.addField(conn)
}
expectedInits := map[string]interface{}{"Bytes_AB": float64(0), "Bytes_BA": float64(0), "Packets": float64(0), "maxFlowLogBytes": float64(-math.MaxFloat64), "minFlowLogBytes": float64(math.MaxFloat64), "numFlowLogs": float64(0), "FirstFlowDirection": nil, "LastFlowDirection": nil}
expectedInits := map[string]interface{}{
"Bytes_AB": float64(0),
"Bytes_BA": float64(0),
"Packets": float64(0),
"maxFlowLogBytes": float64(-math.MaxFloat64),
"minFlowLogBytes": float64(math.MaxFloat64),
"numFlowLogs": float64(0),
"FirstFlowDirection": nil,
"LastFlowDirection": nil,
"PktDropLatestDropCause": nil,
}
require.Equal(t, expectedInits, conn.(*connType).aggFields)

for i, test := range table {
Expand All @@ -188,7 +235,7 @@ func TestAddField_and_Update(t *testing.T) {
func TestMissingFieldError(t *testing.T) {
test.ResetPromRegistry()
metrics := newMetrics(opMetrics)
agg, err := newAggregator(api.OutputField{Name: "Bytes", Operation: "sum", SplitAB: true}, metrics)
agg, err := newAggregator(api.OutputField{Name: "Bytes", Operation: "sum", SplitAB: true, ReportMissing: true}, metrics)
require.NoError(t, err)

conn := NewConnBuilder(metrics).Build()
Expand All @@ -201,6 +248,22 @@ func TestMissingFieldError(t *testing.T) {
require.Contains(t, exposed, `conntrack_aggregator_errors{error="MissingFieldError",field="Bytes"} 1`)
}

// TestSkipMissingFieldError verifies that an aggregator built without
// ReportMissing does not expose a MissingFieldError metric when the flow log
// has no entry for its input field.
func TestSkipMissingFieldError(t *testing.T) {
	test.ResetPromRegistry()
	m := newMetrics(opMetrics)

	// ReportMissing is intentionally left at its zero value (false).
	of := api.OutputField{Name: "Bytes", Operation: "sum", SplitAB: true}
	sumAgg, err := newAggregator(of, m)
	require.NoError(t, err)

	c := NewConnBuilder(m).Build()
	sumAgg.addField(c)

	// An empty flow log carries no "Bytes" entry for the aggregator to read.
	sumAgg.update(c, config.GenericMap{}, dirAB, true)

	require.NotContains(t, test.ReadExposedMetrics(t), `conntrack_aggregator_errors{error="MissingFieldError",field="Bytes"}`)
}

func TestFloat64ConversionError(t *testing.T) {
test.ResetPromRegistry()
metrics := newMetrics(opMetrics)
Expand Down
5 changes: 4 additions & 1 deletion pkg/pipeline/extract/conntrack/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package conntrack

import (
"fmt"
"reflect"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/utils"
Expand Down Expand Up @@ -100,7 +101,9 @@ func (c *connType) getNextHeartbeatTime() time.Time {
func (c *connType) toGenericMap() config.GenericMap {
gm := config.GenericMap{}
for k, v := range c.aggFields {
gm[k] = v
if v != nil && (reflect.TypeOf(v).Kind() != reflect.Float64 || v.(float64) != 0) {
gm[k] = v
}
}

// In case of a conflict between the keys and the aggFields / cpFields, the keys should prevail.
Expand Down
20 changes: 16 additions & 4 deletions pkg/pipeline/extract/conntrack/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,26 @@ func newMockRecordConnAB(srcIP string, srcPort int, dstIP string, dstPort int, p
"DstAddr": dstIP,
"DstPort": dstPort,
"Proto": protocol,
"Bytes_AB": bytesAB,
"Bytes_BA": bytesBA,
"Packets_AB": packetsAB,
"Packets_BA": packetsBA,
"numFlowLogs": numFlowLogs,
api.IsFirstFieldName: false,
},
}

if bytesAB != 0 {
mock.record["Bytes_AB"] = bytesAB
}

if bytesBA != 0 {
mock.record["Bytes_BA"] = bytesBA
}

// Each packet counter is guarded by its own value (not by the byte counter),
// matching how the byte counters above are handled: a zero counter is left
// out of the mocked record, a non-zero one is always present.
if packetsAB != 0 {
	mock.record["Packets_AB"] = packetsAB
}

if packetsBA != 0 {
	mock.record["Packets_BA"] = packetsBA
}
return mock
}

Expand Down

0 comments on commit 5912edf

Please sign in to comment.