diff --git a/docs/api.md b/docs/api.md
index cad853881..c0796d1fa 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -195,10 +195,13 @@ Following is the supported API format for specifying metrics aggregations:
  aggregates:
-         name: description of aggregation result
-         groupByKeys: list of fields on which to aggregate
-         operationType: sum, min, max, count, avg or raw_values
-         operationKey: internal field on which to perform the operation
+         defaultExpiryTime: default time duration of data aggregation to perform rules (default: 2 minutes)
+         rules: list of aggregation rules, each includes:
+                 name: description of aggregation result
+                 groupByKeys: list of fields on which to aggregate
+                 operationType: sum, min, max, count, avg or raw_values
+                 operationKey: internal field on which to perform the operation
+                 expiryTime: time interval over which to perform the operation
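
For illustration, an extract stage written against this re-nested format might look as follows (a sketch only: the rule name and duration values are illustrative, not taken from this change):

```yaml
extract:
  type: aggregates
  aggregates:
    # applies to any rule that does not set its own expiryTime
    defaultExpiryTime: 2m
    rules:
      - name: bandwidth_per_service   # illustrative rule name
        groupByKeys:
          - service
          - _RecordType
        operationType: sum
        operationKey: bytes
        expiryTime: 30s               # overrides defaultExpiryTime for this rule
```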
 
## Connection tracking API Following is the supported API format for specifying connection tracking: diff --git a/network_definitions/bandwidth_per_network_service.yaml b/network_definitions/bandwidth_per_network_service.yaml index 7b656d17f..845ce5de9 100644 --- a/network_definitions/bandwidth_per_network_service.yaml +++ b/network_definitions/bandwidth_per_network_service.yaml @@ -17,13 +17,15 @@ transform: type: add_service parameters: proto extract: + type: aggregates aggregates: - - name: bandwidth_network_service - groupByKeys: - - service - - _RecordType - operationType: sum - operationKey: bytes + rules: + - name: bandwidth_network_service + groupByKeys: + - service + - _RecordType + operationType: sum + operationKey: bytes encode: type: prom prom: diff --git a/network_definitions/bandwidth_per_src_dest_subnet.yaml b/network_definitions/bandwidth_per_src_dest_subnet.yaml index bed97fd7e..b78b07bc3 100644 --- a/network_definitions/bandwidth_per_src_dest_subnet.yaml +++ b/network_definitions/bandwidth_per_src_dest_subnet.yaml @@ -21,14 +21,16 @@ transform: type: add_subnet parameters: /24 extract: + type: aggregates aggregates: - - name: bandwidth_source_destination_subnet - groupByKeys: - - dstSubnet24 - - srcSubnet24 - - _RecordType - operationType: sum - operationKey: bytes + rules: + - name: bandwidth_source_destination_subnet + groupByKeys: + - dstSubnet24 + - srcSubnet24 + - _RecordType + operationType: sum + operationKey: bytes encode: type: prom prom: diff --git a/network_definitions/bandwidth_per_src_subnet.yaml b/network_definitions/bandwidth_per_src_subnet.yaml index b5bdc1357..4dc16398a 100644 --- a/network_definitions/bandwidth_per_src_subnet.yaml +++ b/network_definitions/bandwidth_per_src_subnet.yaml @@ -17,13 +17,15 @@ transform: type: add_subnet parameters: /16 extract: + type: aggregates aggregates: - - name: bandwidth_source_subnet - groupByKeys: - - srcSubnet - - _RecordType - operationType: sum - operationKey: bytes + rules: + - name: bandwidth_source_subnet + groupByKeys: + - srcSubnet + - _RecordType + operationType: sum + operationKey: bytes encode: type: prom prom: diff --git a/network_definitions/connection_length_histogram.yaml b/network_definitions/connection_length_histogram.yaml index 5e5309055..5d7a357e6 100644 --- a/network_definitions/connection_length_histogram.yaml +++ b/network_definitions/connection_length_histogram.yaml @@ -11,22 +11,24 @@ tags: - elephant - rate extract: + type: aggregates aggregates: - - name: connection_bytes_hist - groupByKeys: - - _RecordType - operationType: raw_values - operationKey: bytes_total - - name: connection_bytes_hist_AB - groupByKeys: - - _RecordType - operationType: raw_values - operationKey: bytes_AB - - name: connection_bytes_hist_BA - groupByKeys: - - _RecordType - operationType: raw_values - operationKey: bytes_BA + rules: + - name: connection_bytes_hist + groupByKeys: + - _RecordType + operationType: raw_values + operationKey: bytes_total + - name: connection_bytes_hist_AB + groupByKeys: + - _RecordType + operationType: raw_values + operationKey: bytes_AB + - name: connection_bytes_hist_BA + groupByKeys: + - _RecordType + operationType: raw_values + operationKey: bytes_BA encode: type: prom prom: diff --git a/network_definitions/connection_rate_per_dest_subnet.yaml b/network_definitions/connection_rate_per_dest_subnet.yaml index 1074df7e6..b1a0ca798 100644 --- a/network_definitions/connection_rate_per_dest_subnet.yaml +++ b/network_definitions/connection_rate_per_dest_subnet.yaml @@ -17,12 +17,13 @@ 
transform: extract: type: aggregates aggregates: - - name: dest_connection_subnet_count - groupByKeys: - - dstSubnet - - _RecordType - operationType: count - operationKey: isNewFlow + rules: + - name: dest_connection_subnet_count + groupByKeys: + - dstSubnet + - _RecordType + operationType: count + operationKey: isNewFlow encode: type: prom prom: diff --git a/network_definitions/connection_rate_per_src_subnet.yaml b/network_definitions/connection_rate_per_src_subnet.yaml index c94ac550c..d31437249 100644 --- a/network_definitions/connection_rate_per_src_subnet.yaml +++ b/network_definitions/connection_rate_per_src_subnet.yaml @@ -17,11 +17,12 @@ transform: extract: type: aggregates aggregates: - - name: src_connection_count - groupByKeys: - - srcSubnet - - _RecordType - operationType: count + rules: + - name: src_connection_count + groupByKeys: + - srcSubnet + - _RecordType + operationType: count encode: type: prom prom: diff --git a/network_definitions/connection_rate_per_tcp_flags.yaml b/network_definitions/connection_rate_per_tcp_flags.yaml index ec0a71eb6..f4814e70b 100644 --- a/network_definitions/connection_rate_per_tcp_flags.yaml +++ b/network_definitions/connection_rate_per_tcp_flags.yaml @@ -11,11 +11,12 @@ tags: extract: type: aggregates aggregates: - - name: TCPFlags_count - groupByKeys: - - TCPFlags - - _RecordType - operationType: count + rules: + - name: TCPFlags_count + groupByKeys: + - TCPFlags + - _RecordType + operationType: count encode: type: prom prom: diff --git a/network_definitions/connections_per_dst_as.yaml b/network_definitions/connections_per_dst_as.yaml index dcc315d20..7011557b9 100644 --- a/network_definitions/connections_per_dst_as.yaml +++ b/network_definitions/connections_per_dst_as.yaml @@ -12,11 +12,12 @@ tags: extract: type: aggregates aggregates: - - name: dst_as_connection_count - groupByKeys: - - dstAS - - _RecordType - operationType: count + rules: + - name: dst_as_connection_count + groupByKeys: + - dstAS + - _RecordType + operationType: count encode: type: prom prom: diff --git a/network_definitions/connections_per_src_as.yaml b/network_definitions/connections_per_src_as.yaml index 2b127c1fb..838050c8a 100644 --- a/network_definitions/connections_per_src_as.yaml +++ b/network_definitions/connections_per_src_as.yaml @@ -12,11 +12,12 @@ tags: extract: type: aggregates aggregates: - - name: src_as_connection_count - groupByKeys: - - srcAS - - _RecordType - operationType: count + rules: + - name: src_as_connection_count + groupByKeys: + - srcAS + - _RecordType + operationType: count encode: type: prom prom: diff --git a/network_definitions/count_per_src_dest_subnet.yaml b/network_definitions/count_per_src_dest_subnet.yaml index 6e845c50f..77af878a2 100644 --- a/network_definitions/count_per_src_dest_subnet.yaml +++ b/network_definitions/count_per_src_dest_subnet.yaml @@ -21,13 +21,15 @@ transform: type: add_subnet parameters: /24 extract: + type: aggregates aggregates: - - name: count_source_destination_subnet - groupByKeys: - - dstSubnet24 - - srcSubnet24 - - _RecordType - operationType: count + rules: + - name: count_source_destination_subnet + groupByKeys: + - dstSubnet24 + - srcSubnet24 + - _RecordType + operationType: count encode: type: prom prom: diff --git a/network_definitions/egress_bandwidth_per_dest_subnet.yaml b/network_definitions/egress_bandwidth_per_dest_subnet.yaml index 1ab39bb78..ad05f4b4f 100644 --- a/network_definitions/egress_bandwidth_per_dest_subnet.yaml +++ b/network_definitions/egress_bandwidth_per_dest_subnet.yaml @@ -17,13 
+17,15 @@ transform: type: add_subnet parameters: /16 extract: + type: aggregates aggregates: - - name: bandwidth_destination_subnet - groupByKeys: - - dstSubnet - - _RecordType - operationType: sum - operationKey: bytes + rules: + - name: bandwidth_destination_subnet + groupByKeys: + - dstSubnet + - _RecordType + operationType: sum + operationKey: bytes encode: type: prom prom: diff --git a/network_definitions/egress_bandwidth_per_namespace.yaml b/network_definitions/egress_bandwidth_per_namespace.yaml index 188588ac7..ffdf6afc5 100644 --- a/network_definitions/egress_bandwidth_per_namespace.yaml +++ b/network_definitions/egress_bandwidth_per_namespace.yaml @@ -16,14 +16,16 @@ transform: type: add_kubernetes parameters: srcK8S_labels extract: + type: aggregates aggregates: - - name: bandwidth_namespace - groupByKeys: - - srcK8S_Namespace - - srcK8S_Type - - _RecordType - operationType: sum - operationKey: bytes + rules: + - name: bandwidth_namespace + groupByKeys: + - srcK8S_Namespace + - srcK8S_Type + - _RecordType + operationType: sum + operationKey: bytes encode: type: prom prom: diff --git a/network_definitions/flows_length_histogram.yaml b/network_definitions/flows_length_histogram.yaml index 9ca59958d..e672bea1d 100644 --- a/network_definitions/flows_length_histogram.yaml +++ b/network_definitions/flows_length_histogram.yaml @@ -17,13 +17,15 @@ transform: type: add_if parameters: ">=0" extract: + type: aggregates aggregates: - - name: flows_bytes_hist - groupByKeys: - - all_Evaluate - - _RecordType - operationType: raw_values - operationKey: bytes + rules: + - name: flows_bytes_hist + groupByKeys: + - all_Evaluate + - _RecordType + operationType: raw_values + operationKey: bytes encode: type: prom prom: diff --git a/network_definitions/geo-location_rate_per_dest.yaml b/network_definitions/geo-location_rate_per_dest.yaml index f795eb859..086c52ada 100644 --- a/network_definitions/geo-location_rate_per_dest.yaml +++ b/network_definitions/geo-location_rate_per_dest.yaml @@ -18,11 +18,12 @@ transform: extract: type: aggregates aggregates: - - name: dest_connection_location_count - groupByKeys: - - dstLocation_CountryName - - _RecordType - operationType: count + rules: + - name: dest_connection_location_count + groupByKeys: + - dstLocation_CountryName + - _RecordType + operationType: count encode: type: prom prom: diff --git a/network_definitions/network_services_count.yaml b/network_definitions/network_services_count.yaml index 103e49d69..026129608 100644 --- a/network_definitions/network_services_count.yaml +++ b/network_definitions/network_services_count.yaml @@ -19,11 +19,12 @@ transform: extract: type: aggregates aggregates: - - name: dest_service_count - groupByKeys: - - service - - _RecordType - operationType: count + rules: + - name: dest_service_count + groupByKeys: + - service + - _RecordType + operationType: count encode: type: prom prom: diff --git a/pkg/api/api.go b/pkg/api/api.go index 9c82b9f60..a49f1e04c 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -54,18 +54,18 @@ const ( // Note: items beginning with doc: "## title" are top level items that get divided into sections inside api.md. 
type API struct { - PromEncode PromEncode `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"` - KafkaEncode EncodeKafka `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"` - S3Encode EncodeS3 `yaml:"s3" doc:"## S3 encode API\nFollowing is the supported API format for S3 encode:\n"` - IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"` - IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"` - IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"` - TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"` - TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"` - TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"` - WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"` - WriteStdout WriteStdout `yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"` - ExtractAggregate AggregateDefinition `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"` - ConnectionTracking ConnTrack `yaml:"conntrack" doc:"## Connection tracking API\nFollowing is the supported API format for specifying connection tracking:\n"` - ExtractTimebased ExtractTimebased `yaml:"timebased" doc:"## Time-based Filters API\nFollowing is the supported API format for specifying metrics time-based filters:\n"` + PromEncode PromEncode `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"` + KafkaEncode EncodeKafka `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"` + S3Encode EncodeS3 `yaml:"s3" doc:"## S3 encode API\nFollowing is the supported API format for S3 encode:\n"` + IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"` + IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"` + IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"` + TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"` + TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"` + TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"` + WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"` + WriteStdout WriteStdout 
`yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"` + ExtractAggregate Aggregates `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"` + ConnectionTracking ConnTrack `yaml:"conntrack" doc:"## Connection tracking API\nFollowing is the supported API format for specifying connection tracking:\n"` + ExtractTimebased ExtractTimebased `yaml:"timebased" doc:"## Time-based Filters API\nFollowing is the supported API format for specifying metrics time-based filters:\n"` } diff --git a/pkg/api/extract_aggregate.go b/pkg/api/extract_aggregate.go index e34c93472..a7b43e2ad 100644 --- a/pkg/api/extract_aggregate.go +++ b/pkg/api/extract_aggregate.go @@ -17,12 +17,19 @@ package api +type Aggregates struct { + DefaultExpiryTime Duration `yaml:"defaultExpiryTime,omitempty" json:"defaultExpiryTime,omitempty" doc:"default time duration of data aggregation to perform rules (default: 2 minutes)"` + Rules AggregateDefinitions `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of aggregation rules, each includes:"` +} + type AggregateBy []string type AggregateOperation string +type AggregateDefinitions []AggregateDefinition type AggregateDefinition struct { Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"description of aggregation result"` GroupByKeys AggregateBy `yaml:"groupByKeys,omitempty" json:"groupByKeys,omitempty" doc:"list of fields on which to aggregate"` OperationType AggregateOperation `yaml:"operationType,omitempty" json:"operationType,omitempty" doc:"sum, min, max, count, avg or raw_values"` OperationKey string `yaml:"operationKey,omitempty" json:"operationKey,omitempty" doc:"internal field on which to perform the operation"` + ExpiryTime Duration `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"time interval over which to perform the operation"` } diff --git a/pkg/confgen/confgen.go b/pkg/confgen/confgen.go index 61a2175b9..cb8a6152e 100644 --- a/pkg/confgen/confgen.go +++ b/pkg/confgen/confgen.go @@ -23,7 +23,6 @@ import ( "path/filepath" "github.com/netobserv/flowlogs-pipeline/pkg/api" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" log "github.com/sirupsen/logrus" "gopkg.in/yaml.v2" ) @@ -35,29 +34,29 @@ const ( ) type Definition struct { - FileName string - Description string - Details string - Usage string - Tags []string - TransformNetwork *api.TransformNetwork - AggregateDefinitions *aggregate.Definitions - ExtractTimebased *api.ExtractTimebased - PromEncode *api.PromEncode - Visualization *Visualization + FileName string + Description string + Details string + Usage string + Tags []string + TransformNetwork *api.TransformNetwork + Aggregates *api.Aggregates + ExtractTimebased *api.ExtractTimebased + PromEncode *api.PromEncode + Visualization *Visualization } type Definitions []Definition type ConfGen struct { - opts *Options - config *Config - transformRules api.NetworkTransformRules - aggregateDefinitions aggregate.Definitions - timebasedTopKs api.ExtractTimebased - promMetrics api.PromMetricsItems - visualizations Visualizations - definitions Definitions + opts *Options + config *Config + transformRules api.NetworkTransformRules + aggregates api.Aggregates + timebasedTopKs api.ExtractTimebased + promMetrics api.PromMetricsItems + visualizations Visualizations + definitions Definitions } type DefFile struct { @@ -190,14 +189,14 @@ func (cg *ConfGen) ParseDefinition(name string, bytes 
[]byte) error { } // parse extract - definition.AggregateDefinitions, definition.ExtractTimebased, err = cg.parseExtract(&defFile.Extract) + definition.Aggregates, definition.ExtractTimebased, err = cg.parseExtract(&defFile.Extract) if err != nil { log.Debugf("parseExtract err: %v ", err) return err } // parse encode - definition.PromEncode, err = cg.parseEncode(&defFile.Encode, len(*definition.AggregateDefinitions) > 0) + definition.PromEncode, err = cg.parseEncode(&defFile.Encode, len(definition.Aggregates.Rules) > 0) if err != nil { log.Debugf("parseEncode err: %v ", err) return err @@ -237,11 +236,11 @@ func getDefinitionFiles(rootPath string) []string { func NewConfGen(opts *Options) *ConfGen { return &ConfGen{ - opts: opts, - transformRules: api.NetworkTransformRules{}, - aggregateDefinitions: aggregate.Definitions{}, - definitions: Definitions{}, - visualizations: Visualizations{}, + opts: opts, + transformRules: api.NetworkTransformRules{}, + aggregates: api.Aggregates{}, + definitions: Definitions{}, + visualizations: Visualizations{}, } } diff --git a/pkg/confgen/confgen_test.go b/pkg/confgen/confgen_test.go index 0b9293c06..1df9a9af6 100644 --- a/pkg/confgen/confgen_test.go +++ b/pkg/confgen/confgen_test.go @@ -131,13 +131,13 @@ func Test_RunShortConfGen(t *testing.T) { }, out.Parameters[1].Transform.Network.Rules[0]) // Expects aggregates - require.Len(t, out.Parameters[2].Extract.Aggregates, 1) + require.Len(t, out.Parameters[2].Extract.Aggregates.Rules, 1) require.Equal(t, api.AggregateDefinition{ Name: "test_aggregates", GroupByKeys: api.AggregateBy{"service"}, OperationType: "sum", OperationKey: "test_operation_key", - }, out.Parameters[2].Extract.Aggregates[0]) + }, out.Parameters[2].Extract.Aggregates.Rules[0]) // Expects prom encode require.Len(t, out.Parameters[3].Encode.Prom.Metrics, 1) @@ -306,19 +306,19 @@ func Test_RunLongConfGen(t *testing.T) { }, out.Parameters[2].Transform.Network.Rules[0]) // Expects aggregates - require.Len(t, out.Parameters[3].Extract.Aggregates, 2) + require.Len(t, out.Parameters[3].Extract.Aggregates.Rules, 2) require.Equal(t, api.AggregateDefinition{ Name: "test_aggregates", GroupByKeys: api.AggregateBy{"service"}, OperationType: "sum", OperationKey: "test_operation_key", - }, out.Parameters[3].Extract.Aggregates[0]) + }, out.Parameters[3].Extract.Aggregates.Rules[0]) require.Equal(t, api.AggregateDefinition{ Name: "test_agg_histo", GroupByKeys: api.AggregateBy{"service"}, OperationType: "sum", OperationKey: "test_operation_key", - }, out.Parameters[3].Extract.Aggregates[1]) + }, out.Parameters[3].Extract.Aggregates.Rules[1]) // Expects prom encode; make sure type "histogram" is changed to "agg_histogram" require.Len(t, out.Parameters[4].Encode.Prom.Metrics, 2) @@ -365,13 +365,13 @@ func Test_GenerateTruncatedConfig(t *testing.T) { require.Len(t, params, 2) // Expects aggregates - require.Len(t, params[0].Extract.Aggregates, 1) + require.Len(t, params[0].Extract.Aggregates.Rules, 1) require.Equal(t, api.AggregateDefinition{ Name: "test_aggregates", GroupByKeys: api.AggregateBy{"service"}, OperationType: "sum", OperationKey: "test_operation_key", - }, params[0].Extract.Aggregates[0]) + }, params[0].Extract.Aggregates.Rules[0]) // Expects prom encode require.Len(t, params[1].Encode.Prom.Metrics, 1) diff --git a/pkg/confgen/dedup.go b/pkg/confgen/dedup.go index e09d19953..8a069d2d6 100644 --- a/pkg/confgen/dedup.go +++ b/pkg/confgen/dedup.go @@ -21,13 +21,12 @@ import ( "reflect" "github.com/netobserv/flowlogs-pipeline/pkg/api" - 
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" log "github.com/sirupsen/logrus" ) func (cg *ConfGen) dedupe() { cg.transformRules = dedupeNetworkTransformRules(cg.transformRules) - cg.aggregateDefinitions = dedupeAggregateDefinitions(cg.aggregateDefinitions) + cg.aggregates.Rules = dedupeAggregateDefinitions(cg.aggregates.Rules) } type void struct{} @@ -53,7 +52,7 @@ func dedupeNetworkTransformRules(rules api.NetworkTransformRules) api.NetworkTra // dedupeAggregateDefinitions is inefficient because we can't use a map to look for duplicates. // The reason is that aggregate.AggregateDefinition is not hashable due to its AggregateBy field which is a slice. -func dedupeAggregateDefinitions(aggregateDefinitions aggregate.Definitions) aggregate.Definitions { +func dedupeAggregateDefinitions(aggregateDefinitions api.AggregateDefinitions) api.AggregateDefinitions { var dedupeSlice []api.AggregateDefinition for i, aggregateDefinition := range aggregateDefinitions { if containsAggregateDefinitions(dedupeSlice, aggregateDefinition) { diff --git a/pkg/confgen/dedup_test.go b/pkg/confgen/dedup_test.go index 1f28a29f6..8795c163f 100644 --- a/pkg/confgen/dedup_test.go +++ b/pkg/confgen/dedup_test.go @@ -21,7 +21,6 @@ import ( "testing" "github.com/netobserv/flowlogs-pipeline/pkg/api" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" "github.com/stretchr/testify/require" ) @@ -43,14 +42,14 @@ func Test_dedupeNetworkTransformRules(t *testing.T) { } func Test_dedupeAggregateDefinitions(t *testing.T) { - slice := aggregate.Definitions{ + slice := api.AggregateDefinitions{ api.AggregateDefinition{Name: "n1", GroupByKeys: api.AggregateBy{"a", "b"}, OperationType: api.AggregateOperation("o1")}, api.AggregateDefinition{Name: "n1", GroupByKeys: api.AggregateBy{"a"}, OperationType: api.AggregateOperation("o1")}, api.AggregateDefinition{Name: "n2", GroupByKeys: api.AggregateBy{"a", "b"}, OperationType: api.AggregateOperation("o2")}, api.AggregateDefinition{Name: "n3", GroupByKeys: api.AggregateBy{"a", "b"}, OperationType: api.AggregateOperation("o3")}, api.AggregateDefinition{Name: "n2", GroupByKeys: api.AggregateBy{"a", "b"}, OperationType: api.AggregateOperation("o2")}, } - expected := aggregate.Definitions{ + expected := api.AggregateDefinitions{ api.AggregateDefinition{Name: "n1", GroupByKeys: api.AggregateBy{"a", "b"}, OperationType: api.AggregateOperation("o1")}, api.AggregateDefinition{Name: "n1", GroupByKeys: api.AggregateBy{"a"}, OperationType: api.AggregateOperation("o1")}, api.AggregateDefinition{Name: "n2", GroupByKeys: api.AggregateBy{"a", "b"}, OperationType: api.AggregateOperation("o2")}, diff --git a/pkg/confgen/doc.go b/pkg/confgen/doc.go index da2ae6ae8..0b6230203 100644 --- a/pkg/confgen/doc.go +++ b/pkg/confgen/doc.go @@ -24,7 +24,6 @@ import ( "strings" "github.com/netobserv/flowlogs-pipeline/pkg/api" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" ) func (cg *ConfGen) generateVisualizeText(vgs []VisualizationGrafana) string { @@ -49,7 +48,7 @@ func (cg *ConfGen) generatePromEncodeText(metrics api.PromMetricsItems) string { return section } -func (cg *ConfGen) generateOperationText(definitions aggregate.Definitions) string { +func (cg *ConfGen) generateOperationText(definitions api.AggregateDefinitions) string { section := "" for _, definition := range definitions { by := strings.Join(definition.GroupByKeys[:], ", ") @@ -72,7 +71,7 @@ func (cg *ConfGen) generateDoc(fileName string) error { labels := 
strings.Join(metric.Tags[:], ", ") // TODO: add support for multiple operations - operation := cg.generateOperationText(*metric.AggregateDefinitions) + operation := cg.generateOperationText(metric.Aggregates.Rules) expose := cg.generatePromEncodeText(metric.PromEncode.Metrics) visualize := cg.generateVisualizeText(metric.Visualization.Grafana) doc += fmt.Sprintf( diff --git a/pkg/confgen/extract.go b/pkg/confgen/extract.go index c8fe3e77c..0cccc7634 100644 --- a/pkg/confgen/extract.go +++ b/pkg/confgen/extract.go @@ -21,11 +21,10 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" - "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" log "github.com/sirupsen/logrus" ) -func (cg *ConfGen) parseExtract(extract *map[string]interface{}) (*aggregate.Definitions, *api.ExtractTimebased, error) { +func (cg *ConfGen) parseExtract(extract *map[string]interface{}) (*api.Aggregates, *api.ExtractTimebased, error) { var jsoniterJson = jsoniter.ConfigCompatibleWithStandardLibrary aggregateExtract := (*extract)["aggregates"] b, err := jsoniterJson.Marshal(&aggregateExtract) @@ -34,14 +33,14 @@ func (cg *ConfGen) parseExtract(extract *map[string]interface{}) (*aggregate.Def return nil, nil, err } - var jsonNetworkAggregate aggregate.Definitions + var jsonNetworkAggregate api.Aggregates err = config.JsonUnmarshalStrict(b, &jsonNetworkAggregate) if err != nil { log.Errorf("Unmarshal aggregate.Definitions err: %v ", err) return nil, nil, err } - cg.aggregateDefinitions = append(cg.aggregateDefinitions, jsonNetworkAggregate...) + cg.aggregates.Rules = append(cg.aggregates.Rules, jsonNetworkAggregate.Rules...) timebasedExtract, ok := (*extract)["timebased"] if !ok { diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index 7b80b4805..075ee2a55 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -46,8 +46,8 @@ func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct { }) } metricsNode := forkedNode - if len(cg.aggregateDefinitions) > 0 { - metricsNode = metricsNode.Aggregate("extract_aggregate", cg.aggregateDefinitions) + if len(cg.aggregates.Rules) > 0 { + metricsNode = metricsNode.Aggregate("extract_aggregate", cg.aggregates) if len(cg.timebasedTopKs.Rules) > 0 { metricsNode = metricsNode.ExtractTimebased("extract_timebased", api.ExtractTimebased{ Rules: cg.timebasedTopKs.Rules, @@ -85,7 +85,7 @@ func (cg *ConfGen) GenerateTruncatedConfig() []config.StageParam { case "transform_network": parameters[i] = config.NewTransformNetworkParams("transform_network", *cg.config.Transform.Network) case "extract_aggregate": - parameters[i] = config.NewAggregateParams("extract_aggregate", cg.aggregateDefinitions) + parameters[i] = config.NewAggregateParams("extract_aggregate", cg.aggregates) case "extract_timebased": parameters[i] = config.NewTimbasedParams("extract_timebased", cg.timebasedTopKs) case "encode_prom": diff --git a/pkg/config/config.go b/pkg/config/config.go index 838e3f191..6f01e56a3 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -108,10 +108,10 @@ type Transform struct { } type Extract struct { - Type string `yaml:"type" json:"type"` - Aggregates []api.AggregateDefinition `yaml:"aggregates,omitempty" json:"aggregates,omitempty"` - ConnTrack *api.ConnTrack `yaml:"conntrack,omitempty" json:"conntrack,omitempty"` - Timebased *api.ExtractTimebased 
`yaml:"timebased,omitempty" json:"timebased,omitempty"` + Type string `yaml:"type" json:"type"` + Aggregates *api.Aggregates `yaml:"aggregates,omitempty" json:"aggregates,omitempty"` + ConnTrack *api.ConnTrack `yaml:"conntrack,omitempty" json:"conntrack,omitempty"` + Timebased *api.ExtractTimebased `yaml:"timebased,omitempty" json:"timebased,omitempty"` } type Encode struct { diff --git a/pkg/config/pipeline_builder.go b/pkg/config/pipeline_builder.go index a5eb8866f..cb16c279b 100644 --- a/pkg/config/pipeline_builder.go +++ b/pkg/config/pipeline_builder.go @@ -96,7 +96,7 @@ func (b *PipelineBuilderStage) next(name string, param StageParam) PipelineBuild } // Aggregate chains the current stage with an aggregate stage and returns that new stage -func (b *PipelineBuilderStage) Aggregate(name string, aggs []api.AggregateDefinition) PipelineBuilderStage { +func (b *PipelineBuilderStage) Aggregate(name string, aggs api.Aggregates) PipelineBuilderStage { return b.next(name, NewAggregateParams(name, aggs)) } diff --git a/pkg/config/pipeline_builder_test.go b/pkg/config/pipeline_builder_test.go index 0b1007b28..3a78d09e5 100644 --- a/pkg/config/pipeline_builder_test.go +++ b/pkg/config/pipeline_builder_test.go @@ -113,11 +113,14 @@ func TestKafkaPromPipeline(t *testing.T) { pl = pl.ConnTrack("conntrack", api.ConnTrack{ KeyDefinition: api.KeyDefinition{}, }) - pl = pl.Aggregate("aggregate", []api.AggregateDefinition{{ + var timeDuration api.Duration + timeDuration.Duration = time.Duration(30 * time.Second) + pl = pl.Aggregate("aggregate", api.Aggregates{Rules: []api.AggregateDefinition{{ Name: "src_as_connection_count", GroupByKeys: api.AggregateBy{"srcAS"}, OperationType: "count", - }}) + ExpiryTime: timeDuration, + }}}) var expiryTimeDuration api.Duration expiryTimeDuration.Duration = time.Duration(50 * time.Second) pl = pl.EncodePrometheus("prom", api.PromEncode{ @@ -159,7 +162,7 @@ func TestKafkaPromPipeline(t *testing.T) { b, err = json.Marshal(params[3]) require.NoError(t, err) - require.JSONEq(t, `{"name":"aggregate","extract":{"type":"aggregates","aggregates":[{"name":"src_as_connection_count","groupByKeys":["srcAS"],"operationType":"count"}]}}`, string(b)) + require.JSONEq(t, `{"name":"aggregate","extract":{"type":"aggregates","aggregates":{"defaultExpiryTime":"0s","rules":[{"name":"src_as_connection_count","groupByKeys":["srcAS"],"operationType":"count","expiryTime":"30s"}]}}}`, string(b)) b, err = json.Marshal(params[4]) require.NoError(t, err) diff --git a/pkg/config/stage_params.go b/pkg/config/stage_params.go index 799ca4f8b..b2795c958 100644 --- a/pkg/config/stage_params.go +++ b/pkg/config/stage_params.go @@ -33,8 +33,8 @@ func NewKafkaParams(name string, ingest api.IngestKafka) StageParam { return StageParam{Name: name, Ingest: &Ingest{Type: api.KafkaType, Kafka: &ingest}} } -func NewAggregateParams(name string, aggs []api.AggregateDefinition) StageParam { - return StageParam{Name: name, Extract: &Extract{Type: api.AggregateType, Aggregates: aggs}} +func NewAggregateParams(name string, aggs api.Aggregates) StageParam { + return StageParam{Name: name, Extract: &Extract{Type: api.AggregateType, Aggregates: &aggs}} } func NewTransformGenericParams(name string, gen api.TransformGeneric) StageParam { diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index b45bdbaed..6c820682c 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -43,23 +43,22 @@ parameters: extract: type: aggregates aggregates: - - name: 
bandwidth_sum - groupByKeys: - - service - operationType: sum - operationKey: bytes - - - name: bandwidth_count - groupByKeys: - - service - operationType: count - operationKey: - - - name: bandwidth_raw_values - groupByKeys: - - service - operationType: raw_values - operationKey: bytes + rules: + - name: bandwidth_sum + groupByKeys: + - service + operationType: sum + operationKey: bytes + - name: bandwidth_count + groupByKeys: + - service + operationType: count + operationKey: + - name: bandwidth_raw_values + groupByKeys: + - service + operationType: raw_values + operationKey: bytes - name: encode encode: type: prom @@ -73,14 +72,12 @@ parameters: valueKey: recent_count labels: - service - - name: bytes_sum type: counter filter: {key: name, value: bandwidth_sum} valueKey: recent_op_value labels: - service - - name: bytes_histogram type: agg_histogram filter: {key: name, value: bandwidth_raw_values} diff --git a/pkg/pipeline/extract/aggregate/aggregates.go b/pkg/pipeline/extract/aggregate/aggregates.go index d0c25f209..775b90baf 100644 --- a/pkg/pipeline/extract/aggregate/aggregates.go +++ b/pkg/pipeline/extract/aggregate/aggregates.go @@ -27,15 +27,15 @@ import ( log "github.com/sirupsen/logrus" ) -var defaultExpiryTime = 10 * time.Minute +var defaultExpiryTime = 2 * time.Minute +var cleanupLoopTime = 2 * time.Minute type Aggregates struct { - Aggregates []Aggregate - expiryTime time.Duration + Aggregates []Aggregate + cleanupLoopTime time.Duration + defaultExpiryTime time.Duration } -type Definitions []api.AggregateDefinition - func (aggregates *Aggregates) Evaluate(entries []config.GenericMap) error { for _, aggregate := range aggregates.Aggregates { err := aggregate.Evaluate(entries) @@ -59,11 +59,15 @@ func (aggregates *Aggregates) GetMetrics() []config.GenericMap { } func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefinition) []Aggregate { + expiryTime := aggregateDefinition.ExpiryTime + if expiryTime.Duration == 0 { + expiryTime.Duration = defaultExpiryTime + } aggregate := Aggregate{ Definition: aggregateDefinition, cache: utils.NewTimedCache(0, nil), mutex: &sync.Mutex{}, - expiryTime: aggregates.expiryTime, + expiryTime: expiryTime.Duration, } appendedAggregates := append(aggregates.Aggregates, aggregate) @@ -72,7 +76,7 @@ func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefi func (aggregates *Aggregates) cleanupExpiredEntriesLoop() { - ticker := time.NewTicker(aggregates.expiryTime) + ticker := time.NewTicker(aggregates.cleanupLoopTime) go func() { for { select { @@ -93,12 +97,16 @@ func (aggregates *Aggregates) cleanupExpiredEntries() { } } -func NewAggregatesFromConfig(definitions []api.AggregateDefinition) (Aggregates, error) { +func NewAggregatesFromConfig(aggConfig *api.Aggregates) (Aggregates, error) { aggregates := Aggregates{ - expiryTime: defaultExpiryTime, + cleanupLoopTime: cleanupLoopTime, + defaultExpiryTime: aggConfig.DefaultExpiryTime.Duration, + } + if aggregates.defaultExpiryTime == 0 { + aggregates.defaultExpiryTime = defaultExpiryTime } - for _, aggregateDefinition := range definitions { + for _, aggregateDefinition := range aggConfig.Rules { aggregates.Aggregates = aggregates.AddAggregate(aggregateDefinition) } diff --git a/pkg/pipeline/extract/aggregate/aggregates_test.go b/pkg/pipeline/extract/aggregate/aggregates_test.go index ea32d17b8..f5a8bf2db 100644 --- a/pkg/pipeline/extract/aggregate/aggregates_test.go +++ b/pkg/pipeline/extract/aggregate/aggregates_test.go @@ -36,12 +36,13 @@ parameters: 
extract: type: aggregates aggregates: - - Name: "Avg by src and dst IP's" - GroupByKeys: - - "dstIP" - - "srcIP" - OperationType: "avg" - OperationKey: "value" + rules: + - Name: "Avg by src and dst IP's" + GroupByKeys: + - "dstIP" + - "srcIP" + OperationType: "avg" + OperationKey: "value" ` v, cfg := test.InitConfig(t, yamlConfig) require.NotNil(t, v) @@ -62,6 +63,7 @@ func Test_NewAggregatesFromConfig(t *testing.T) { func Test_CleanupExpiredEntriesLoop(t *testing.T) { defaultExpiryTime = 4 * time.Second // expiration after 4 seconds + cleanupLoopTime = 4 * time.Second // clean up after 4 seconds aggregates := initAggregates(t) expectedAggregate := GetMockAggregate() require.Equal(t, expectedAggregate.Definition, aggregates.Aggregates[0].Definition) diff --git a/pkg/pipeline/extract/extract_aggregate.go b/pkg/pipeline/extract/extract_aggregate.go index 29b32042c..ed2ed740b 100644 --- a/pkg/pipeline/extract/extract_aggregate.go +++ b/pkg/pipeline/extract/extract_aggregate.go @@ -18,7 +18,6 @@ package extract import ( - "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" log "github.com/sirupsen/logrus" @@ -43,11 +42,7 @@ func (ea *ExtractAggregate) Extract(entries []config.GenericMap) []config.Generi // NewExtractAggregate creates a new extractor func NewExtractAggregate(params config.StageParam) (Extractor, error) { log.Debugf("entering NewExtractAggregate") - aggConfig := []api.AggregateDefinition{} - if params.Extract != nil { - aggConfig = params.Extract.Aggregates - } - aggregates, err := aggregate.NewAggregatesFromConfig(aggConfig) + aggregates, err := aggregate.NewAggregatesFromConfig(params.Extract.Aggregates) if err != nil { log.Errorf("error in NewAggregatesFromConfig: %v", err) return nil, err diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index cfe85e3b0..8b1825d86 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -39,41 +39,37 @@ parameters: extract: type: aggregates aggregates: - - name: bandwidth_count - groupByKeys: - - service - operationType: count - operationKey: "" - - - name: bandwidth_sum - groupByKeys: - - service - operationType: sum - operationKey: bytes - - - name: bandwidth_max - groupByKeys: - - service - operationType: max - operationKey: bytes - - - name: bandwidth_min - groupByKeys: - - service - operationType: min - operationKey: bytes - - - name: bandwidth_avg - groupByKeys: - - service - operationType: avg - operationKey: bytes - - - name: bandwidth_raw_values - groupByKeys: - - service - operationType: raw_values - operationKey: bytes + rules: + - name: bandwidth_count + groupByKeys: + - service + operationType: count + operationKey: "" + - name: bandwidth_sum + groupByKeys: + - service + operationType: sum + operationKey: bytes + - name: bandwidth_max + groupByKeys: + - service + operationType: max + operationKey: bytes + - name: bandwidth_min + groupByKeys: + - service + operationType: min + operationKey: bytes + - name: bandwidth_avg + groupByKeys: + - service + operationType: avg + operationKey: bytes + - name: bandwidth_raw_values + groupByKeys: + - service + operationType: raw_values + operationKey: bytes ` var err error diff --git a/pkg/test/e2e/pipline/flp-config.yaml b/pkg/test/e2e/pipline/flp-config.yaml index 48dfee4e6..2435ff45f 100644 --- a/pkg/test/e2e/pipline/flp-config.yaml +++ 
b/pkg/test/e2e/pipline/flp-config.yaml @@ -81,84 +81,85 @@ data: type: network - extract: aggregates: - - name: bandwidth_network_service - groupByKeys: - - service - operationType: sum - operationKey: bytes - - name: bandwidth_source_destination_subnet - groupByKeys: - - dstSubnet24 - - srcSubnet24 - operationType: sum - operationKey: bytes - - name: bandwidth_source_subnet - groupByKeys: - - srcSubnet - operationType: sum - operationKey: bytes - - name: dest_connection_subnet_count - groupByKeys: - - dstSubnet - operationType: sum - operationKey: isNewFlow - - name: src_connection_count - groupByKeys: - - srcSubnet - operationType: count - operationKey: "" - - name: TCPFlags_count - groupByKeys: - - TCPFlags - operationType: count - operationKey: "" - - name: dst_as_connection_count - groupByKeys: - - dstAS - operationType: count - operationKey: "" - - name: src_as_connection_count - groupByKeys: - - srcAS - operationType: count - operationKey: "" - - name: count_source_destination_subnet - groupByKeys: - - dstSubnet24 - - srcSubnet24 - operationType: count - operationKey: "" - - name: bandwidth_destination_subnet - groupByKeys: - - dstSubnet - operationType: sum - operationKey: bytes - - name: bandwidth_namespace - groupByKeys: - - srcK8S_Namespace - - srcK8S_Type - operationType: sum - operationKey: bytes - - name: dest_connection_location_count - groupByKeys: - - dstLocation_CountryName - operationType: count - operationKey: "" - - name: mice_count - groupByKeys: - - mice_Evaluate - operationType: count - operationKey: "" - - name: elephant_count - groupByKeys: - - elephant_Evaluate - operationType: count - operationKey: "" - - name: dest_service_count - groupByKeys: - - service - operationType: count - operationKey: "" + rules: + - name: bandwidth_network_service + groupByKeys: + - service + operationType: sum + operationKey: bytes + - name: bandwidth_source_destination_subnet + groupByKeys: + - dstSubnet24 + - srcSubnet24 + operationType: sum + operationKey: bytes + - name: bandwidth_source_subnet + groupByKeys: + - srcSubnet + operationType: sum + operationKey: bytes + - name: dest_connection_subnet_count + groupByKeys: + - dstSubnet + operationType: sum + operationKey: isNewFlow + - name: src_connection_count + groupByKeys: + - srcSubnet + operationType: count + operationKey: "" + - name: TCPFlags_count + groupByKeys: + - TCPFlags + operationType: count + operationKey: "" + - name: dst_as_connection_count + groupByKeys: + - dstAS + operationType: count + operationKey: "" + - name: src_as_connection_count + groupByKeys: + - srcAS + operationType: count + operationKey: "" + - name: count_source_destination_subnet + groupByKeys: + - dstSubnet24 + - srcSubnet24 + operationType: count + operationKey: "" + - name: bandwidth_destination_subnet + groupByKeys: + - dstSubnet + operationType: sum + operationKey: bytes + - name: bandwidth_namespace + groupByKeys: + - srcK8S_Namespace + - srcK8S_Type + operationType: sum + operationKey: bytes + - name: dest_connection_location_count + groupByKeys: + - dstLocation_CountryName + operationType: count + operationKey: "" + - name: mice_count + groupByKeys: + - mice_Evaluate + operationType: count + operationKey: "" + - name: elephant_count + groupByKeys: + - elephant_Evaluate + operationType: count + operationKey: "" + - name: dest_service_count + groupByKeys: + - service + operationType: count + operationKey: "" type: aggregates name: extract_aggregate - encode: diff --git a/pkg/test/network_defs.go b/pkg/test/network_defs.go index 
70c2eb860..8003acb36 100644 --- a/pkg/test/network_defs.go +++ b/pkg/test/network_defs.go @@ -72,12 +72,14 @@ transform: type: add_service parameters: proto extract: + type: aggregates aggregates: - - name: test_aggregates - groupByKeys: - - service - operationType: sum - operationKey: test_operation_key + rules: + - name: test_aggregates + groupByKeys: + - service + operationType: sum + operationKey: test_operation_key encode: type: prom prom: @@ -109,12 +111,14 @@ tags: - test - label extract: + type: aggregates aggregates: - - name: test_agg_histo - groupByKeys: - - service - operationType: sum - operationKey: test_operation_key + rules: + - name: test_agg_histo + groupByKeys: + - service + operationType: sum + operationKey: test_operation_key encode: type: prom prom:
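
Condensed from the bandwidth_per_network_service.yaml hunk above, the schema migration for a network definition's extract section amounts to the following before/after shape (indentation is illustrative):

```yaml
# before: aggregates was a bare list of rule definitions
extract:
  aggregates:
    - name: bandwidth_network_service
      groupByKeys:
        - service
        - _RecordType
      operationType: sum
      operationKey: bytes
---
# after: the stage type is explicit and rules are nested under aggregates.rules
extract:
  type: aggregates
  aggregates:
    rules:
      - name: bandwidth_network_service
        groupByKeys:
          - service
          - _RecordType
        operationType: sum
        operationKey: bytes
```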