Skip to content

Commit

Permalink
initial framework from ipfixlookupprocessor
Browse files Browse the repository at this point in the history
  • Loading branch information
fizzers123 committed Dec 23, 2023
1 parent 8186605 commit 7458013
Show file tree
Hide file tree
Showing 15 changed files with 1,857 additions and 0 deletions.
1 change: 1 addition & 0 deletions processor/ipfixlookupprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
149 changes: 149 additions & 0 deletions processor/ipfixlookupprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# IPFIX Lookup Processor
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Distributions | [contrib]|
| Issues |[![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aconnector%2Fipfix%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aconnector%2Fipfix) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aconnector%2Fipfix%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aconnector%2Fipfix) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib

## Supported Pipeline Types

| [Exporter Pipeline Type] | [Receiver Pipeline Type] | [Stability Level] |
| ------------------------ | ------------------------ | ----------------- |
| traces | traces | [development] |

[Exporter Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type
[Receiver Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type
[Stability Level]: https://github.com/open-telemetry/opentelemetry-collector#stability-levels
<!-- end autogenerated section -->

[ElasticSearch]: https://www.elastic.co/elasticsearch/

The `ipfixlookup` processor can be used to inject IPFIX spans into existing traces.

## Prerequisits

You need to store your Netflow/IPFIX logs in [ElasticSearch] to use this processor. Please check the [Netflow Integration](https://docs.elastic.co/en/integrations/netflow) for more information.

### Default Configuration

The `ipfixlookup` processorr will require the following minimum configuration.


For example, in the following configuration, the processor will connect to the specified [ElasticSearch] instance and search for IPFIX/Netflow events in the specified time window.
```yaml
processors:
groupbytrace:
wait_duration: 100s
num_traces: 1000
num_workers: 2
ipfix_lookup:
elastic_search:
connection:
addresses:
- https://<elastic-search-address>:9200/
username: elastic
password: <password>
certificate_fingerprint: <certificate fingerprint>
timing:
lookup_window: 25

service:
pipelines:
traces:
receivers: [otlp]
processors: [groupbytrace, ipfix_lookup]
exporters: [otlp/jaeger, debug]
```
### Custom lookup fields
Optionally, you can specify the fields the processor will look up in [ElasticSearch] and match within the spans
| Full Path Configuration | Description |
| -------------------------------------------- | --------------------------------------------------------------------------------------------- |
| `ipfix_lookup.query_parameters` | Parameters used for querying the IPFIX lookup processor. |
| `ipfix_lookup.base_query.field_name` | The name of the field used in the base query. |
| `ipfix_lookup.base_query.field_value` | The value of the field used in the base query. |
| `ipfix_lookup.device_identifier` | The field used to identify the device in IPFIX records. |
| `ipfix_lookup.lookup_fields.source_ip` | Field representing the source IP address. |
| `ipfix_lookup.lookup_fields.source_port` | Field representing the source port. |
| `ipfix_lookup.lookup_fields.destination_ip` | Field representing the destination IP address. |
| `ipfix_lookup.lookup_fields.destination_port`| Field representing the destination port. |
| `ipfix_lookup.span_attribute_fields` | Fields to be added as attributes to the new span. In [gjson](https://gjson.dev/) format. |
| `ipfix_lookup.spans.source_ips` | Fields representing the source IP address of the span. |
| `ipfix_lookup.spans.source_ports` | Fields representing the source port of the span. |
| `ipfix_lookup.spans.destination_ip_and_port` | Fields representing the destination IP and port of the span. Like: `192.168.10.10:443` |
| `ipfix_lookup.spans.destination_ips` | Fields representing the destination IP address of the span. |
| `ipfix_lookup.spans.destination_ports` | Fields representing the destination port of the span. |

The configuration below shows the default values:


```yaml
processors:
groupbytrace:
wait_duration: 10s
num_traces: 1000
num_workers: 2
ipfix_lookup:
query_parameters:
base_query:
field_name: input.type
field_value: netflow
device_identifier: "fields.observer\\.ip.0"
lookup_fields:
source_ip: source.ip
source_port: source.port
destination_ip: destination.ip
destination_port: destination.port
span_attribute_fields:
- "@this"
- "fields.event\\.duration.0"
- "fields.observer\\.ip.0"
- "fields.source\\.ip.0"
- "fields.source\\.port.0"
- "fields.destination\\.ip.0"
- "fields.destination\\.port.0"
- "fields.netflow\\.ip_next_hop_ipv4_address"
spans:
span_fields:
source_ips:
- net.peer.ip
- net.peer.name
- src.ip
source_ports:
- net.peer.port
- src.port
destination_ip_and_port:
- http.host
destination_ips:
- dst.ip
- net.peer.name
destination_ports:
- dst.port
```

### Timings:

This processor is responsible for looking through the spans in each trace. If the IP and port quartet (`source.ip, source.port, destination.ip, destination.port`) are found in a span, the corresponding flow is looked up in ElasticSearch. When flows are found, a new span is added to the trace, and the trace is exported.

![CorrelationUnitv3 drawio](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/15678530/5bb8e7de-c254-4991-871d-05c9c6d6f3f6)


The timing configuration is needed because there is an ingest delay in any large distributed search engine. Because of this, the processor must wait a bit before the search can be started. This delay can be defined in the `processors.groupbytrace.wait_duration` value. Afterwards, the search can be started. The time window that will be searched can be configured in the` processors.ipfix_lookup.timing.lookup_window`. To keep the processor simple, the lookup_window is added before the start timestamp and after the end timestamp. This way, the chance that the Netflow/IPFIX records leading or being caused by this span is found is maximized.


# Example screenshot
Example of a working implementation:
![finnal-implementation](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/15678530/37036d33-07f1-4c9e-bdea-7834a5e01015)
(The network was intentionally slowed down for this screenshot)




13 changes: 13 additions & 0 deletions processor/ipfixlookupprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ipfixlookupprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/ipfixlookupprocessor"

// Config for the processor
type Config struct {
// TODO
}

func (c *Config) Validate() error {
return nil
}
4 changes: 4 additions & 0 deletions processor/ipfixlookupprocessor/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ipfixlookupprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/ipfixlookupprocessor"
35 changes: 35 additions & 0 deletions processor/ipfixlookupprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ipfixlookupprocessor

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
)

const (
// this is the name used to refer to the processor in the config.yaml
typeStr = "ipfixLookup"
)

func NewFactory() processor.Factory {

return processor.NewFactory(
typeStr,
createDefaultConfig,
processor.WithTraces(createTracesToTracesProcessor, component.StabilityLevelAlpha))
}

func createDefaultConfig() component.Config {
return &Config{}
}

func createTracesToTracesProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) {
c := newProcessor(params.Logger, cfg)
c.tracesConsumer = nextConsumer
return c, nil
}
40 changes: 40 additions & 0 deletions processor/ipfixlookupprocessor/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ipfixlookupprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"
)

func TestFactory_Type(t *testing.T) {
factory := NewFactory()
assert.Equal(t, factory.Type(), component.Type(typeStr))
}

func TestFactory_CreateDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.Equal(t, cfg, &Config{})
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}

func TestNewFactory(t *testing.T) {
factory := NewFactory()
conn, err := factory.CreateTracesProcessor(
context.Background(),
processortest.NewNopCreateSettings(),
factory.CreateDefaultConfig(),
consumertest.NewNop(),
)

assert.NoError(t, err)
assert.NotNil(t, conn)
}
66 changes: 66 additions & 0 deletions processor/ipfixlookupprocessor/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/processor/ipfixlookupprocessor

go 1.20

require (
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.88.1-0.20231026220224-6405e152a2d9
go.opentelemetry.io/collector/consumer v0.88.1-0.20231026220224-6405e152a2d9
go.opentelemetry.io/collector/pdata v1.0.0
go.opentelemetry.io/collector/processor v0.88.0
go.uber.org/zap v1.26.0
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.91.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector v0.88.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.0.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.0.0-20231106091314-ad6fa27ad929
go.opentelemetry.io/collector/config/configtelemetry v0.88.1-0.20231026220224-6405e152a2d9 // indirect
go.opentelemetry.io/collector/confmap v0.88.1-0.20231026220224-6405e152a2d9 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0017.0.20231026220224-6405e152a2d9 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter => ../../internal/filter

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl => ../../pkg/ottl

retract (
v0.76.2
v0.76.1
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
Loading

0 comments on commit 7458013

Please sign in to comment.