moved add_if and add_regex_if to transform_filter from transform_network
KalmanMeth committed Jul 26, 2023
1 parent 7513aa9 commit 7ae760f
Showing 8 changed files with 173 additions and 194 deletions.
38 changes: 5 additions & 33 deletions README.md
@@ -389,10 +389,8 @@ Using `remove_entry_if_not_equal` will remove the entry if the specified field e

1. Resolve subnet from IP addresses
1. Resolve known network service names from port numbers and protocols
1. Perform simple mathematical transformations on field values
1. Compute geo-location from IP addresses
1. Resolve kubernetes information from IP addresses
1. Perform regex operations on field values

Example configuration:

@@ -408,15 +406,6 @@ parameters:
output: srcSubnet
type: add_subnet
parameters: /24
- input: value
output: value_smaller_than10
type: add_if
parameters: <10
- input: value
output: dir
type: add_if
parameters: ==1
assignee: in
- input: dstPort
output: service
type: add_service
@@ -427,36 +416,25 @@ parameters:
- input: srcIP
output: srcK8S
type: add_kubernetes
- input: srcSubnet
output: match-10.0
type: add_regex_if
parameters: 10.0.*
```

The first rule `add_subnet` generates a new field named `srcSubnet` with the
The rule `add_subnet` generates a new field named `srcSubnet` with the
subnet of `srcIP` calculated based on prefix length from the `parameters` field

The second `add_if` generates a new field named `value_smaller_than10` that contains
the contents of the `value` field for entries that satisfy the condition specified
in the `parameters` variable (smaller than 10 in the example above). In addition, the
field `value_smaller_than10_Evaluate` with value `true` is added to all satisfied
entries. if `assignee` field is set, then on satified parmater i.e. if parameter evalutes true then
`output` value will get value of `assignee` key.

The third rule `add_service` generates a new field named `service` with the known network
The rule `add_service` generates a new field named `service` with the known network
service name of `dstPort` port and `protocol` protocol. Unrecognized ports are ignored
> Note: `protocol` can be either network protocol name or number
>
> Note: optionally supports custom network services resolution by defining configuration parameters
> `servicesFile` and `protocolsFile` with paths to custom services/protocols files respectively

The fourth rule `add_location` generates new fields with the geo-location information retrieved
The rule `add_location` generates new fields with the geo-location information retrieved
from DB [ip2location](https://lite.ip2location.com/) based on `dstIP` IP.
All the geo-location fields will be named by appending `output` value
(`dstLocation` in the example above) to their names in the [ip2location](https://lite.ip2location.com/ DB
(e.g., `CountryName`, `CountryLongName`, `RegionName`, `CityName` , `Longitude` and `Latitude`)

The fifth rule `add_kubernetes` generates new fields with kubernetes information by
The rule `add_kubernetes` generates new fields with kubernetes information by
matching the `input` value (`srcIP` in the example above) with kubernetes `nodes`, `pods` and `services` IPs.
All the kubernetes fields will be named by appending `output` value
(`srcK8S` in the example above) to the kubernetes metadata field names
@@ -470,13 +448,7 @@ will be generated, and named by appending `parameters` value to the label keys.
> 2. using `KUBECONFIG` environment variable
> 3. using local `~/.kube/config`

The sixth rule `add_regex_if` generates a new field named `match-10.0` that contains
the contents of the `srcSubnet` field for entries that match regex expression specified
in the `parameters` variable. In addition, the field `match-10.0_Matched` with
value `true` is added to all matched entries


> Note: above example describes all available transform network `Type` options
> Note: above example describes the most common available transform network `Type` options

> Note: above transform is essential for the `aggregation` phase

7 changes: 5 additions & 2 deletions docs/api.md
@@ -131,14 +131,19 @@ Following is the supported API format for filter transformations:
filter:
rules: list of filter rules, each includes:
input: entry input field
output: entry output field
type: (enum) one of the following:
remove_field: removes the field from the entry
remove_entry_if_exists: removes the entry if the field exists
remove_entry_if_doesnt_exist: removes the entry if the field does not exist
remove_entry_if_equal: removes the entry if the field value equals specified value
remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
add_field_if_doesnt_exist: adds a field to the entry if the field does not exist
add_field_if: add output field set to assignee if input field satisfies criteria from parameters field
add_regex_if: add output field if input field satisfies regex pattern from parameters field
value: specified value of input field:
parameters: parameters specific to type
assignee: value needs to assign to output field
</pre>
## Transform Network API
Following is the supported API format for network transformations:
@@ -149,8 +154,6 @@ Following is the supported API format for network transformations:
input: entry input field
output: entry output field
type: (enum) one of the following:
add_regex_if: add output field if input field satisfies regex pattern from parameters field
add_if: add output field if input field satisfies criteria from parameters field
add_subnet: add output subnet field from input field and prefix length from parameters field
add_location: add output location fields from input
add_service: add output network service field from input port and parameters protocol field
11 changes: 8 additions & 3 deletions pkg/api/transform_filter.go
@@ -28,14 +28,19 @@ type TransformFilterOperationEnum struct {
RemoveEntryIfEqual string `yaml:"remove_entry_if_equal" json:"remove_entry_if_equal" doc:"removes the entry if the field value equals specified value"`
RemoveEntryIfNotEqual string `yaml:"remove_entry_if_not_equal" json:"remove_entry_if_not_equal" doc:"removes the entry if the field value does not equal specified value"`
AddFieldIfDoesntExist string `yaml:"add_field_if_doesnt_exist" json:"add_field_if_doesnt_exist" doc:"adds a field to the entry if the field does not exist"`
AddFieldIf string `yaml:"add_field_if" json:"add_field_if" doc:"add output field set to assignee if input field satisfies criteria from parameters field"`
AddRegExIf string `yaml:"add_regex_if" json:"add_regex_if" doc:"add output field if input field satisfies regex pattern from parameters field"`
}

func TransformFilterOperationName(operation string) string {
return GetEnumName(TransformFilterOperationEnum{}, operation)
}

type TransformFilterRule struct {
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformFilterOperationEnum" doc:"one of the following:"`
Value interface{} `yaml:"value,omitempty" json:"value,omitempty" doc:"specified value of input field:"`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformFilterOperationEnum" doc:"one of the following:"`
Value interface{} `yaml:"value,omitempty" json:"value,omitempty" doc:"specified value of input field:"`
Parameters string `yaml:"parameters,omitempty" json:"parameters,omitempty" doc:"parameters specific to type"`
Assignee string `yaml:"assignee,omitempty" json:"assignee,omitempty" doc:"value needs to assign to output field"`
}
4 changes: 0 additions & 4 deletions pkg/api/transform_network.go
@@ -39,8 +39,6 @@ func (tn *TransformNetwork) GetServiceFiles() (string, string) {
}

const (
OpAddRegexIf = "add_regex_if"
OpAddIf = "add_if"
OpAddSubnet = "add_subnet"
OpAddLocation = "add_location"
OpAddService = "add_service"
@@ -50,8 +48,6 @@ const (
)

type TransformNetworkOperationEnum struct {
AddRegExIf string `yaml:"add_regex_if" json:"add_regex_if" doc:"add output field if input field satisfies regex pattern from parameters field"`
AddIf string `yaml:"add_if" json:"add_if" doc:"add output field if input field satisfies criteria from parameters field"`
AddSubnet string `yaml:"add_subnet" json:"add_subnet" doc:"add output subnet field from input field and prefix length from parameters field"`
AddLocation string `yaml:"add_location" json:"add_location" doc:"add output location fields from input"`
AddService string `yaml:"add_service" json:"add_service" doc:"add output network service field from input port and parameters protocol field"`
29 changes: 29 additions & 0 deletions pkg/pipeline/transform/transform_filter.go
@@ -18,6 +18,10 @@
package transform

import (
"fmt"
"regexp"

"github.com/Knetic/govaluate"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/sirupsen/logrus"
@@ -62,6 +66,31 @@ func (f *Filter) Transform(entry config.GenericMap) (config.GenericMap, bool) {
if _, ok := entry[rule.Input]; !ok {
outputEntry[rule.Input] = rule.Value
}
case api.TransformFilterOperationName("AddRegExIf"):
matched, err := regexp.MatchString(rule.Parameters, fmt.Sprintf("%s", outputEntry[rule.Input]))
if err != nil {
continue
}
if matched {
outputEntry[rule.Output] = outputEntry[rule.Input]
outputEntry[rule.Output+"_Matched"] = true
}
case api.TransformFilterOperationName("AddFieldIf"):
expressionString := fmt.Sprintf("val %s", rule.Parameters)
expression, err := govaluate.NewEvaluableExpression(expressionString)
if err != nil {
log.Warningf("Can't evaluate AddIf rule: %+v expression: %v. err %v", rule, expressionString, err)
continue
}
result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntry[rule.Input]})
if evaluateErr == nil && result.(bool) {
if rule.Assignee != "" {
outputEntry[rule.Output] = rule.Assignee
} else {
outputEntry[rule.Output] = outputEntry[rule.Input]
}
outputEntry[rule.Output+"_Evaluate"] = true
}
default:
tlog.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule)
}
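
The `add_regex_if` case above can be exercised outside the pipeline with the standard library alone. The following is a minimal sketch, not the project's code: `addRegexIf` and the plain `map[string]interface{}` entry are hypothetical stand-ins for the `Filter.Transform` case and `config.GenericMap`. It illustrates two properties of the calls used in the diff: `regexp.MatchString` is unanchored, so a pattern such as `10.0.*` also matches inside `110.0.0.0/24`, and formatting a non-string value with `%s` produces a formatting-error string rather than the number. The sketch uses `%v` for that reason, which is a deliberate difference from the diff.

```go
package main

import (
	"fmt"
	"regexp"
)

// addRegexIf is a hypothetical stand-in for the add_regex_if case: it copies
// entry[input] to entry[output] and sets output+"_Matched" when pattern matches.
func addRegexIf(entry map[string]interface{}, input, output, pattern string) {
	// %v is used here (the diff uses %s) so non-string values format cleanly.
	matched, err := regexp.MatchString(pattern, fmt.Sprintf("%v", entry[input]))
	if err != nil || !matched {
		return // invalid pattern or no match: leave the entry untouched
	}
	entry[output] = entry[input]
	entry[output+"_Matched"] = true
}

func main() {
	e := map[string]interface{}{"subnetSrcIP": "10.0.0.0/24"}
	addRegexIf(e, "subnetSrcIP", "match-10.0.*", "10.0.*")
	fmt.Println(e["match-10.0.*"], e["match-10.0.*_Matched"]) // 10.0.0.0/24 true

	// Unanchored matching: "10.0.*" is found inside "110.0.0.0/24",
	// while an anchored, escaped pattern is not.
	e2 := map[string]interface{}{"subnetSrcIP": "110.0.0.0/24"}
	addRegexIf(e2, "subnetSrcIP", "loose", "10.0.*")
	addRegexIf(e2, "subnetSrcIP", "strict", `^10\.0\.`)
	fmt.Println(e2["loose_Matched"], e2["strict_Matched"]) // true <nil>

	// %s applied to a non-string value yields a formatting-error string.
	var port interface{} = 443
	fmt.Println(fmt.Sprintf("%s", port)) // %!s(int=443)
}
```

Anchoring the pattern in the rule's `parameters` (for example `^10\.0\.`) is one way to avoid unintended substring matches; whether that is desirable depends on how the rule is meant to be used.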
126 changes: 126 additions & 0 deletions pkg/pipeline/transform/transform_filter_test.go
@@ -20,6 +20,7 @@ package transform
import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
@@ -230,3 +231,128 @@ func InitNewTransformFilter(t *testing.T, configFile string) Transformer {
require.NoError(t, err)
return newTransform
}

func Test_Transform_AddIfScientificNotation(t *testing.T) {
newNetworkFilter := Filter{
Rules: []api.TransformFilterRule{
{
Input: "value",
Output: "bigger_than_10",
Type: "add_field_if",
Parameters: ">10",
},
{
Input: "value",
Output: "smaller_than_10",
Type: "add_field_if",
Parameters: "<10",
},
{
Input: "value",
Output: "dir",
Assignee: "in",
Type: "add_field_if",
Parameters: "==1",
},
{
Input: "value",
Output: "dir",
Assignee: "out",
Type: "add_field_if",
Parameters: "==0",
},
{
Input: "value",
Output: "not_one",
Assignee: "true",
Type: "add_field_if",
Parameters: "!=1",
},
{
Input: "value",
Output: "not_one",
Assignee: "false",
Type: "add_field_if",
Parameters: "==1",
},
},
}

var entry config.GenericMap
entry = config.GenericMap{
"value": 1.2345e67,
}
output, ok := newNetworkFilter.Transform(entry)
require.True(t, ok)
require.Equal(t, true, output["bigger_than_10_Evaluate"])
require.Equal(t, 1.2345e67, output["bigger_than_10"])
require.Equal(t, "true", output["not_one"])

entry = config.GenericMap{
"value": 1.2345e-67,
}
output, ok = newNetworkFilter.Transform(entry)
require.True(t, ok)
require.Equal(t, true, output["smaller_than_10_Evaluate"])
require.Equal(t, 1.2345e-67, output["smaller_than_10"])
require.Equal(t, "true", output["not_one"])

entry = config.GenericMap{
"value": 1,
}
output, ok = newNetworkFilter.Transform(entry)
require.True(t, ok)
require.Equal(t, true, output["dir_Evaluate"])
require.Equal(t, "in", output["dir"])
require.Equal(t, "false", output["not_one"])

entry = config.GenericMap{
"value": 0,
}
output, ok = newNetworkFilter.Transform(entry)
require.True(t, ok)
require.Equal(t, true, output["dir_Evaluate"])
require.Equal(t, "out", output["dir"])
require.Equal(t, "true", output["not_one"])
}

func Test_TransformFilterDependentRulesAddRegExIf(t *testing.T) {
var yamlConfig = []byte(`
log-level: debug
pipeline:
- name: transform1
- name: write1
follows: transform1
parameters:
- name: transform1
transform:
type: filter
filter:
rules:
- input: subnetSrcIP
type: add_field_if_doesnt_exist
value: 10.0.0.0/24
- input: subnetSrcIP
output: match-10.0.*
type: add_regex_if
parameters: 10.0.*
- input: subnetSrcIP
output: match-11.0.*
type: add_regex_if
parameters: 11.0.*
- name: write1
write:
type: stdout
`)
newNetworkFilter := InitNewTransformFilter(t, string(yamlConfig)).(*Filter)
require.NotNil(t, newNetworkFilter)

entry := test.GetIngestMockEntry(false)
output, ok := newNetworkFilter.Transform(entry)
require.True(t, ok)

require.Equal(t, "10.0.0.1", output["srcIP"])
require.Equal(t, "10.0.0.0/24", output["subnetSrcIP"])
require.Equal(t, "10.0.0.0/24", output["match-10.0.*"])
require.NotEqual(t, "10.0.0.0/24", output["match-11.0.*"])
}
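
The `add_field_if` behaviour that `Test_Transform_AddIfScientificNotation` checks (copy the input value unless `assignee` is set, and always add `<output>_Evaluate`) can be sketched without the pipeline. This is an illustrative sketch under stated assumptions: the real code evaluates `"val " + parameters` with govaluate, whereas the hypothetical `compare` helper below is a hand-rolled substitute that only understands one comparison operator followed by a number. `addFieldIf` and the plain map entry are likewise hypothetical names, and the sketch only handles `float64` inputs.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// compare is a hypothetical stand-in for the govaluate expression "val <params>".
// It accepts only an operator followed by a number, e.g. "<10" or "==1".
func compare(val float64, params string) (bool, error) {
	// Two-character operators are listed first so ">=" is not read as ">".
	for _, op := range []string{"==", "!=", ">=", "<=", ">", "<"} {
		if !strings.HasPrefix(params, op) {
			continue
		}
		rhs, err := strconv.ParseFloat(strings.TrimSpace(params[len(op):]), 64)
		if err != nil {
			return false, err
		}
		switch op {
		case "==":
			return val == rhs, nil
		case "!=":
			return val != rhs, nil
		case ">=":
			return val >= rhs, nil
		case "<=":
			return val <= rhs, nil
		case ">":
			return val > rhs, nil
		case "<":
			return val < rhs, nil
		}
	}
	return false, fmt.Errorf("unsupported parameters %q", params)
}

// addFieldIf mirrors the shape of the add_field_if case: on a satisfied
// condition the output gets the assignee if set, otherwise the input value.
func addFieldIf(entry map[string]interface{}, input, output, params, assignee string) {
	val, ok := entry[input].(float64) // assumption: only float64 inputs are handled
	if !ok {
		return
	}
	if hit, err := compare(val, params); err != nil || !hit {
		return
	}
	if assignee != "" {
		entry[output] = assignee
	} else {
		entry[output] = entry[input]
	}
	entry[output+"_Evaluate"] = true
}

func main() {
	e := map[string]interface{}{"value": 1.0}
	addFieldIf(e, "value", "dir", "==1", "in")
	addFieldIf(e, "value", "dir", "==0", "out") // not satisfied, "dir" stays "in"
	addFieldIf(e, "value", "smaller_than_10", "<10", "")
	fmt.Println(e["dir"], e["smaller_than_10"], e["dir_Evaluate"]) // in 1 true
}
```

Because several rules may share one `output` name, as the `dir` and `not_one` rules in the test do, the last satisfied rule determines the final value.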
27 changes: 0 additions & 27 deletions pkg/pipeline/transform/transform_network.go
@@ -21,11 +21,9 @@ import (
"fmt"
"net"
"os"
"regexp"
"strconv"
"time"

"github.com/Knetic/govaluate"
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
@@ -56,31 +54,6 @@ func (n *Network) Transform(inputEntry config.GenericMap) (config.GenericMap, bo
// TODO: for efficiency and maintainability, maybe each case in the switch below should be an individual implementation of Transformer
for _, rule := range n.Rules {
switch rule.Type {
case api.OpAddRegexIf:
matched, err := regexp.MatchString(rule.Parameters, fmt.Sprintf("%s", outputEntry[rule.Input]))
if err != nil {
continue
}
if matched {
outputEntry[rule.Output] = outputEntry[rule.Input]
outputEntry[rule.Output+"_Matched"] = true
}
case api.OpAddIf:
expressionString := fmt.Sprintf("val %s", rule.Parameters)
expression, err := govaluate.NewEvaluableExpression(expressionString)
if err != nil {
log.Warningf("Can't evaluate AddIf rule: %+v expression: %v. err %v", rule, expressionString, err)
continue
}
result, evaluateErr := expression.Evaluate(map[string]interface{}{"val": outputEntry[rule.Input]})
if evaluateErr == nil && result.(bool) {
if rule.Assignee != "" {
outputEntry[rule.Output] = rule.Assignee
} else {
outputEntry[rule.Output] = outputEntry[rule.Input]
}
outputEntry[rule.Output+"_Evaluate"] = true
}
case api.OpAddSubnet:
_, ipv4Net, err := net.ParseCIDR(fmt.Sprintf("%v%s", outputEntry[rule.Input], rule.Parameters))
if err != nil {

0 comments on commit 7ae760f
