Add ingest stage to provide synthetic workload for benchmarks #395

Merged: 8 commits, Mar 28, 2023
8 changes: 8 additions & 0 deletions docs/operational-metrics.md
@@ -87,6 +87,14 @@ Each table below provides documentation for an exported flowlogs-pipeline operational metric
| **Labels** | stage |


### ingest_synthetic_flows_processed
| **Name** | ingest_synthetic_flows_processed |
|:---|:---|
| **Description** | Number of flow logs processed |
| **Type** | counter |
| **Labels** | stage |


### metrics_dropped
| **Name** | metrics_dropped |
|:---|:---|
1 change: 1 addition & 0 deletions pkg/api/api.go
@@ -21,6 +21,7 @@ const (
FileType = "file"
FileLoopType = "file_loop"
FileChunksType = "file_chunks"
SyntheticType = "synthetic"
CollectorType = "collector"
GRPCType = "grpc"
FakeType = "fake"
24 changes: 24 additions & 0 deletions pkg/api/ingest_synthetic.go
@@ -0,0 +1,24 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package api

type IngestSynthetic struct {
Connections int `yaml:"connections,omitempty" json:"connections,omitempty" doc:"number of connections to maintain"`
BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
FlowLogsPerMin int `yaml:"flowLogsPerMin,omitempty" json:"flowLogsPerMin,omitempty" doc:"the number of flow logs to send per minute"`
}
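For reference, a minimal sketch of how this stage might be configured (the synthetic field names come from the struct above; the surrounding pipeline/parameters layout and the stdout writer follow flowlogs-pipeline's usual config conventions and are assumptions, not part of this diff):

```yaml
pipeline:
  - name: ingest
  - name: write
    follows: ingest
parameters:
  - name: ingest
    ingest:
      type: synthetic
      synthetic:
        connections: 100      # distinct connections to simulate
        batchMaxLen: 10       # flow logs forwarded per batch
        flowLogsPerMin: 2000  # overall generation rate
  - name: write
    write:
      type: stdout
```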
3 changes: 3 additions & 0 deletions pkg/config/config.go
@@ -90,6 +90,7 @@ type Ingest struct {
Collector *api.IngestCollector `yaml:"collector,omitempty" json:"collector,omitempty"`
Kafka *api.IngestKafka `yaml:"kafka,omitempty" json:"kafka,omitempty"`
GRPC *api.IngestGRPCProto `yaml:"grpc,omitempty" json:"grpc,omitempty"`
Synthetic *api.IngestSynthetic `yaml:"synthetic,omitempty" json:"synthetic,omitempty"`
}

type File struct {
@@ -153,6 +154,8 @@ func ParseConfig(opts Options) (ConfigFileStruct, error) {
return out, err
}
logrus.Debugf("metrics settings = %v ", out.MetricsSettings)
} else {
logrus.Errorf("metrics settings missing")
}

return out, nil
13 changes: 7 additions & 6 deletions pkg/pipeline/encode/prom_cache_test.go
@@ -20,6 +20,7 @@ import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)
@@ -109,12 +110,12 @@ func Test_Prom_Cache1(t *testing.T) {
promEncode, err := initProm(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)

entries = test.GenerateConnectionEntries(10)
entries = utils.GenerateConnectionFlowEntries(10)
require.Equal(t, 10, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 10, promEncode.mCache.GetCacheLen())

entries = test.GenerateConnectionEntries(40)
entries = utils.GenerateConnectionFlowEntries(40)
require.Equal(t, 40, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 30, promEncode.mCache.GetCacheLen())
@@ -129,12 +130,12 @@ func Test_Prom_Cache2(t *testing.T) {
promEncode, err := initProm(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)

entries = test.GenerateConnectionEntries(10)
entries = utils.GenerateConnectionFlowEntries(10)
require.Equal(t, 10, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 20, promEncode.mCache.GetCacheLen())

entries = test.GenerateConnectionEntries(40)
entries = utils.GenerateConnectionFlowEntries(40)
require.Equal(t, 40, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 30, promEncode.mCache.GetCacheLen())
@@ -149,12 +150,12 @@ func Test_Prom_Cache3(t *testing.T) {
promEncode, err := initProm(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)

entries = test.GenerateConnectionEntries(10)
entries = utils.GenerateConnectionFlowEntries(10)
require.Equal(t, 10, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 20, promEncode.mCache.GetCacheLen())

entries = test.GenerateConnectionEntries(40)
entries = utils.GenerateConnectionFlowEntries(40)
require.Equal(t, 40, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 80, promEncode.mCache.GetCacheLen())
7 changes: 4 additions & 3 deletions pkg/pipeline/extract/conntrack/conntrack_test.go
@@ -29,6 +29,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)
@@ -901,15 +902,15 @@ func TestMaxConnections(t *testing.T) {
ct := extract.(*conntrackImpl)
require.Equal(t, 0, ct.connStore.len())

flowLogs := test.GenerateConnectionEntries(10)
flowLogs := utils.GenerateConnectionFlowEntries(10)
ct.Extract(flowLogs)
require.Equal(t, 10, ct.connStore.len())

flowLogs = test.GenerateConnectionEntries(20)
flowLogs = utils.GenerateConnectionFlowEntries(20)
ct.Extract(flowLogs)
require.Equal(t, 20, ct.connStore.len())

flowLogs = test.GenerateConnectionEntries(40)
flowLogs = utils.GenerateConnectionFlowEntries(40)
ct.Extract(flowLogs)
require.Equal(t, maxConnections, ct.connStore.len())
}
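GenerateConnectionFlowEntries (moved from pkg/test to pkg/pipeline/utils so that non-test code, such as the new ingest stage below, can use it) is not shown in this diff. A minimal sketch of what such a generator could look like, assuming only the GenericMap keys the tests read back (SrcAddr, DstAddr, SrcPort, DstPort, Proto); the real helper may differ:

```go
package utils

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

// GenerateConnectionFlowEntries returns n flow-log entries, each with a
// distinct 5-tuple so that each entry represents a separate connection.
func GenerateConnectionFlowEntries(n int) []config.GenericMap {
	entries := make([]config.GenericMap, 0, n)
	for i := 0; i < n; i++ {
		entries = append(entries, config.GenericMap{
			"SrcAddr": fmt.Sprintf("10.0.%d.%d", i/256, i%256), // unique per i
			"DstAddr": "11.0.0.1",
			"SrcPort": 1000 + i%1000,
			"DstPort": 80,
			"Proto":   6, // TCP
		})
	}
	return entries
}
```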
107 changes: 107 additions & 0 deletions pkg/pipeline/ingest/ingest_synthetic.go
@@ -0,0 +1,107 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ingest

import (
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)

type IngestSynthetic struct {
params api.IngestSynthetic
exitChan <-chan struct{}
flowLogsProcessed prometheus.Counter
}

const (
defaultConnections = 100
defaultBatchLen = 10
defaultFlowLogsPerMin = 2000
)

var (
flowLogsProcessed = operational.DefineMetric(
"ingest_synthetic_flows_processed",
"Number of flow logs processed",
operational.TypeCounter,
"stage",
)
)

// Ingest generates flow logs according to provided parameters
func (ingestS *IngestSynthetic) Ingest(out chan<- config.GenericMap) {
log.Debugf("entering IngestSynthetic Ingest, params = %v", ingestS.params)
// get a list of flow log entries, one per desired connection
// these flow logs will be sent again and again to simulate ongoing traffic on those connections
flowLogs := utils.GenerateConnectionFlowEntries(ingestS.params.Connections)
nLogs := len(flowLogs)
next := 0

// compute time interval between batches; divide BatchMaxLen by FlowLogsPerMin and adjust the types
ticker := time.NewTicker(time.Duration(int(time.Minute*time.Duration(ingestS.params.BatchMaxLen)) / ingestS.params.FlowLogsPerMin))
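// Illustration (not in the original diff): with the defaults BatchMaxLen = 10
// and FlowLogsPerMin = 2000, this evaluates to (10 * time.Minute) / 2000 =
// 600s / 2000 = 300ms per batch, i.e. 200 batches of 10 flow logs per minute,
// which is 2000 flow logs per minute as requested.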
Review comment (Collaborator):
The computation of the interval between batches looks complex to me; I don't think I understand it. What does it mean to create a Duration from BatchMaxLen, as in time.Duration(ingestS.params.BatchMaxLen), and multiply it by time.Minute?

Reply (Collaborator, author):
Added a comment.

// loop forever
for {
select {
case <-ingestS.exitChan:
log.Debugf("exiting IngestSynthetic because of signal")
return
case <-ticker.C:
log.Debugf("sending a batch of %d flow logs from index %d", ingestS.params.BatchMaxLen, next)
for i := 0; i < ingestS.params.BatchMaxLen; i++ {
out <- flowLogs[next]
ingestS.flowLogsProcessed.Inc()
next++
if next >= nLogs {
next = 0
}
}
}
}
}

// NewIngestSynthetic creates a new ingester
func NewIngestSynthetic(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) {
log.Debugf("entering NewIngestSynthetic")
confIngestSynthetic := api.IngestSynthetic{}
if params.Ingest != nil && params.Ingest.Synthetic != nil {
confIngestSynthetic = *params.Ingest.Synthetic
}
if confIngestSynthetic.Connections == 0 {
confIngestSynthetic.Connections = defaultConnections
}
if confIngestSynthetic.FlowLogsPerMin == 0 {
confIngestSynthetic.FlowLogsPerMin = defaultFlowLogsPerMin
}
if confIngestSynthetic.BatchMaxLen == 0 {
confIngestSynthetic.BatchMaxLen = defaultBatchLen
}
log.Debugf("params = %v", confIngestSynthetic)

return &IngestSynthetic{
params: confIngestSynthetic,
exitChan: utils.ExitChannel(),
flowLogsProcessed: opMetrics.NewCounter(&flowLogsProcessed, params.Name),
}, nil
}
91 changes: 91 additions & 0 deletions pkg/pipeline/ingest/ingest_synthetic_test.go
@@ -0,0 +1,91 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ingest

import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/stretchr/testify/require"
)

func TestIngestSynthetic(t *testing.T) {
// check default values
params := config.StageParam{
Ingest: &config.Ingest{
Type: "synthetic",
Synthetic: &api.IngestSynthetic{},
},
}
ingest, err := NewIngestSynthetic(operational.NewMetrics(&config.MetricsSettings{}), params)
syn := ingest.(*IngestSynthetic)
require.NoError(t, err)
require.Equal(t, defaultBatchLen, syn.params.BatchMaxLen)
require.Equal(t, defaultConnections, syn.params.Connections)
require.Equal(t, defaultFlowLogsPerMin, syn.params.FlowLogsPerMin)

batchMaxLen := 3
connections := 20
flowLogsPerMin := 1000
synthetic := api.IngestSynthetic{
BatchMaxLen: batchMaxLen,
Connections: connections,
FlowLogsPerMin: flowLogsPerMin,
}
params = config.StageParam{
Ingest: &config.Ingest{
Type: "synthetic",
Synthetic: &synthetic,
},
}
ingest, err = NewIngestSynthetic(operational.NewMetrics(&config.MetricsSettings{}), params)
syn = ingest.(*IngestSynthetic)
require.NoError(t, err)
require.Equal(t, batchMaxLen, syn.params.BatchMaxLen)
require.Equal(t, connections, syn.params.Connections)
require.Equal(t, flowLogsPerMin, syn.params.FlowLogsPerMin)

// run the Ingest method in a separate thread
ingestOutput := make(chan config.GenericMap)
go syn.Ingest(ingestOutput)

type connection struct {
srcAddr string
dstAddr string
srcPort int
dstPort int
protocol int
}

// Start collecting flows from the ingester and ensure we have the specified number of distinct connections
connectionMap := make(map[connection]int)
for i := 0; i < (3 * connections); i++ {
Review comment (Collaborator):
Is the hardcoded 3 the batchMaxLen, or just an arbitrary number to make sure we have multiple flow logs per connection?

Reply (Collaborator, author):
It's just to have (many) more flow logs than connections, and to verify that we accumulate the proper number of connections with multiple flow logs per connection.

flowEntry := <-ingestOutput
conn := connection{
srcAddr: flowEntry["SrcAddr"].(string),
dstAddr: flowEntry["DstAddr"].(string),
srcPort: flowEntry["SrcPort"].(int),
dstPort: flowEntry["DstPort"].(int),
protocol: flowEntry["Proto"].(int),
}
connectionMap[conn]++
}
require.Equal(t, connections, len(connectionMap))
}
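As a rough timing check (an illustration using the interval formula from the ingest stage, not something asserted by the test): with batchMaxLen = 3 and flowLogsPerMin = 1000, the ticker fires every (3 * time.Minute) / 1000 = 180ms, so the 3 * connections = 60 flows collected above arrive in 20 batches, about 3.6 seconds in total.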
2 changes: 2 additions & 0 deletions pkg/pipeline/pipeline_builder.go
@@ -330,6 +330,8 @@ func getIngester(opMetrics *operational.Metrics, params config.StageParam) (inge
switch params.Ingest.Type {
case api.FileType, api.FileLoopType, api.FileChunksType:
ingester, err = ingest.NewIngestFile(params)
case api.SyntheticType:
ingester, err = ingest.NewIngestSynthetic(opMetrics, params)
case api.CollectorType:
ingester, err = ingest.NewIngestCollector(opMetrics, params)
case api.KafkaType: