Add ingest stage to provide synthetic workload for benchmarks #395

Merged: 8 commits, Mar 28, 2023
Changes from 7 commits
8 changes: 8 additions & 0 deletions docs/operational-metrics.md
@@ -87,6 +87,14 @@ Each table below provides documentation for an exported flowlogs-pipeline operational metric
| **Labels** | stage |


### ingest_synthetic_flows_processed
| **Name** | ingest_synthetic_flows_processed |
|:---|:---|
| **Description** | Number of flow logs processed |
| **Type** | counter |
| **Labels** | stage |


### metrics_dropped
| **Name** | metrics_dropped |
|:---|:---|
1 change: 1 addition & 0 deletions pkg/api/api.go
@@ -21,6 +21,7 @@ const (
FileType = "file"
FileLoopType = "file_loop"
FileChunksType = "file_chunks"
SyntheticType = "synthetic"
CollectorType = "collector"
GRPCType = "grpc"
FakeType = "fake"
24 changes: 24 additions & 0 deletions pkg/api/ingest_synthetic.go
@@ -0,0 +1,24 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package api

type IngestSynthetic struct {
Connections int `yaml:"connections,omitempty" json:"connections,omitempty" doc:"number of connections to maintain"`
BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
FlowLogsPerMin int `yaml:"flowLogsPerMin,omitempty" json:"flowLogsPerMin,omitempty" doc:"the number of flow logs to send per minute"`
}
3 changes: 3 additions & 0 deletions pkg/config/config.go
@@ -90,6 +90,7 @@ type Ingest struct {
Collector *api.IngestCollector `yaml:"collector,omitempty" json:"collector,omitempty"`
Kafka *api.IngestKafka `yaml:"kafka,omitempty" json:"kafka,omitempty"`
GRPC *api.IngestGRPCProto `yaml:"grpc,omitempty" json:"grpc,omitempty"`
Synthetic *api.IngestSynthetic `yaml:"synthetic,omitempty" json:"synthetic,omitempty"`
}

type File struct {
@@ -153,6 +154,8 @@ func ParseConfig(opts Options) (ConfigFileStruct, error) {
return out, err
}
logrus.Debugf("metrics settings = %v ", out.MetricsSettings)
} else {
logrus.Errorf("metrics settings missing")
}

return out, nil
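For orientation, the new API struct and this config field combine as follows. This is a minimal, hypothetical wiring sketch built only from types visible in this diff; the stage name and parameter values are illustrative, not defaults from the PR:

```go
package main

import (
	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/config"
	"github.com/netobserv/flowlogs-pipeline/pkg/operational"
	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
)

func main() {
	params := config.StageParam{
		Name: "synthetic-ingest", // labels the stage's operational metrics
		Ingest: &config.Ingest{
			Type: api.SyntheticType, // "synthetic"
			Synthetic: &api.IngestSynthetic{
				Connections:    20,   // distinct connections to maintain
				BatchMaxLen:    10,   // flow logs forwarded per batch
				FlowLogsPerMin: 1000, // overall target rate
			},
		},
	}
	ingester, err := ingest.NewIngestSynthetic(operational.NewMetrics(&config.MetricsSettings{}), params)
	if err != nil {
		panic(err)
	}
	out := make(chan config.GenericMap)
	go ingester.Ingest(out) // emits batches until the global exit channel closes
	firstFlow := <-out      // entries arrive as config.GenericMap flow logs
	_ = firstFlow
}
```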
13 changes: 7 additions & 6 deletions pkg/pipeline/encode/prom_cache_test.go
@@ -20,6 +20,7 @@ import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)
@@ -109,12 +110,12 @@ func Test_Prom_Cache1(t *testing.T) {
promEncode, err := initProm(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)

entries = test.GenerateConnectionEntries(10)
entries = utils.GenerateConnectionFlowEntries(10)
require.Equal(t, 10, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 10, promEncode.mCache.GetCacheLen())

entries = test.GenerateConnectionEntries(40)
entries = utils.GenerateConnectionFlowEntries(40)
require.Equal(t, 40, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 30, promEncode.mCache.GetCacheLen())
@@ -129,12 +130,12 @@ func Test_Prom_Cache2(t *testing.T) {
promEncode, err := initProm(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)

entries = test.GenerateConnectionEntries(10)
entries = utils.GenerateConnectionFlowEntries(10)
require.Equal(t, 10, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 20, promEncode.mCache.GetCacheLen())

entries = test.GenerateConnectionEntries(40)
entries = utils.GenerateConnectionFlowEntries(40)
require.Equal(t, 40, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 30, promEncode.mCache.GetCacheLen())
@@ -149,12 +150,12 @@ func Test_Prom_Cache3(t *testing.T) {
promEncode, err := initProm(cfg.Parameters[0].Encode.Prom)
require.NoError(t, err)

entries = test.GenerateConnectionEntries(10)
entries = utils.GenerateConnectionFlowEntries(10)
require.Equal(t, 10, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 20, promEncode.mCache.GetCacheLen())

entries = test.GenerateConnectionEntries(40)
entries = utils.GenerateConnectionFlowEntries(40)
require.Equal(t, 40, len(entries))
encodeEntries(promEncode, entries)
require.Equal(t, 80, promEncode.mCache.GetCacheLen())
7 changes: 4 additions & 3 deletions pkg/pipeline/extract/conntrack/conntrack_test.go
@@ -29,6 +29,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/stretchr/testify/require"
)
@@ -901,15 +902,15 @@ func TestMaxConnections(t *testing.T) {
ct := extract.(*conntrackImpl)
require.Equal(t, 0, ct.connStore.len())

flowLogs := test.GenerateConnectionEntries(10)
flowLogs := utils.GenerateConnectionFlowEntries(10)
ct.Extract(flowLogs)
require.Equal(t, 10, ct.connStore.len())

flowLogs = test.GenerateConnectionEntries(20)
flowLogs = utils.GenerateConnectionFlowEntries(20)
ct.Extract(flowLogs)
require.Equal(t, 20, ct.connStore.len())

flowLogs = test.GenerateConnectionEntries(40)
flowLogs = utils.GenerateConnectionFlowEntries(40)
ct.Extract(flowLogs)
require.Equal(t, maxConnections, ct.connStore.len())
}
128 changes: 128 additions & 0 deletions pkg/pipeline/ingest/ingest_synthetic.go
@@ -0,0 +1,128 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ingest

import (
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)

type IngestSynthetic struct {
params api.IngestSynthetic
exitChan <-chan struct{}
metricsProcessed prometheus.Counter
Review comment (Collaborator): Please rename metricsProcessed here as well.
}

const (
defaultConnections = 100
defaultBatchLen = 10
defaultFlowLogsPerMin = 2000
)

var (
flowLogsProcessed = operational.DefineMetric(
"ingest_synthetic_flows_processed",
"Number of flow logs processed",
operational.TypeCounter,
"stage",
)
)

// Ingest generates flow logs according to provided parameters
func (ingestS *IngestSynthetic) Ingest(out chan<- config.GenericMap) {
log.Debugf("entering IngestSynthetic Ingest, params = %v", ingestS.params)
// get a list of flow log entries, one per desired connection
// these flow logs will be sent again and again to simulate ongoing traffic on those connections
flowLogs := utils.GenerateConnectionFlowEntries(ingestS.params.Connections)
nLogs := len(flowLogs)
next := 0

// compute time interval between batches; divide BatchMaxLen by FlowLogsPerMin and adjust the types
ticker := time.NewTicker(time.Duration(int(time.Minute*time.Duration(ingestS.params.BatchMaxLen)) / ingestS.params.FlowLogsPerMin))
Review comment (Collaborator): The computation of the interval between batches looks complex to me; I don't think I understand it. What does it mean to create a Duration from BatchMaxLen and multiply it by time.Minute?

    time.Duration(ingestS.params.BatchMaxLen)

Reply (Collaborator, author): Added a comment.

// loop forever
// on each iteration send BatchMaxLen flow logs from the array of flow logs.
// if necessary, send the contents of the flowLogs array multiple times (in sub-batches) to fill the number of BatchMaxLen flow logs needed
for {
select {
case <-ingestS.exitChan:
log.Debugf("exiting IngestSynthetic because of signal")
return
case <-ticker.C:
// flowsLeft designates the number out of BatchMaxLen that must still be sent in this batch
// next designates the next flow log entry to be sent
// remainder designates how many flow logs remain in the flowLogs array that can be sent on the next sub-batch.
flowsLeft := ingestS.params.BatchMaxLen
log.Debugf("flowsLeft = %d", flowsLeft)
subBatchLen := flowsLeft
for flowsLeft > 0 {
remainder := nLogs - next
if subBatchLen > remainder {
subBatchLen = remainder
}
log.Debugf("flowsLeft = %d, remainder = %d, subBatchLen = %d", flowsLeft, remainder, subBatchLen)
subBatch := flowLogs[next : next+subBatchLen]
ingestS.sendBatch(subBatch, out)
ingestS.metricsProcessed.Add(float64(subBatchLen))
flowsLeft -= subBatchLen
next += subBatchLen
if subBatchLen == remainder {
next = 0
subBatchLen = flowsLeft
}
}
Review comment (Collaborator): I explored a different approach to achieve the goal of this loop:
https://github.com/ronensc/flowlogs-pipeline/blob/4266dc2e507e89c007a521d6eaeb7bc34de64abd/pkg/pipeline/ingest/ingest_synthetic.go#L73-L81
Please let me know if it is indeed equivalent and if you think it is clearer. Since we loop over the sub-batch anyway in sendBatch(), I incorporated it into the loop and removed sendBatch().

Reply (Collaborator, author): Your suggestion is much simpler and cleaner. Done.
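For the record, a sketch of that simpler equivalent, reconstructed from the linked suggestion rather than copied from the merged commit; emitBatch is a hypothetical helper name, and the merged code inlines this logic in the ticker case:

```go
package ingest

import "github.com/netobserv/flowlogs-pipeline/pkg/config"

// emitBatch sends exactly batchMaxLen flow logs to out, cycling through the
// pre-generated entries with a wrapping index, and returns the updated index
// so the next tick resumes where this one left off.
func emitBatch(flowLogs []config.GenericMap, next, batchMaxLen int, out chan<- config.GenericMap) int {
	for i := 0; i < batchMaxLen; i++ {
		out <- flowLogs[next]
		next = (next + 1) % len(flowLogs)
	}
	return next
}
```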

}
}
}

func (ingestS *IngestSynthetic) sendBatch(flows []config.GenericMap, out chan<- config.GenericMap) {
for _, flow := range flows {
out <- flow
}
}

// NewIngestSynthetic creates a new ingester
func NewIngestSynthetic(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) {
log.Debugf("entering NewIngestSynthetic")
confIngestSynthetic := api.IngestSynthetic{}
if params.Ingest != nil && params.Ingest.Synthetic != nil {
confIngestSynthetic = *params.Ingest.Synthetic
}
if confIngestSynthetic.Connections == 0 {
confIngestSynthetic.Connections = defaultConnections
}
if confIngestSynthetic.FlowLogsPerMin == 0 {
confIngestSynthetic.FlowLogsPerMin = defaultFlowLogsPerMin
}
if confIngestSynthetic.BatchMaxLen == 0 {
confIngestSynthetic.BatchMaxLen = defaultBatchLen
}
log.Debugf("params = %v", confIngestSynthetic)

return &IngestSynthetic{
params: confIngestSynthetic,
exitChan: utils.ExitChannel(),
metricsProcessed: opMetrics.NewCounter(&flowLogsProcessed, params.Name),
}, nil
}
91 changes: 91 additions & 0 deletions pkg/pipeline/ingest/ingest_synthetic_test.go
@@ -0,0 +1,91 @@
/*
* Copyright (C) 2023 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ingest

import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/stretchr/testify/require"
)

func TestIngestSynthetic(t *testing.T) {
// check default values
params := config.StageParam{
Ingest: &config.Ingest{
Type: "synthetic",
Synthetic: &api.IngestSynthetic{},
},
}
ingest, err := NewIngestSynthetic(operational.NewMetrics(&config.MetricsSettings{}), params)
syn := ingest.(*IngestSynthetic)
require.NoError(t, err)
require.Equal(t, defaultBatchLen, syn.params.BatchMaxLen)
require.Equal(t, defaultConnections, syn.params.Connections)
require.Equal(t, defaultFlowLogsPerMin, syn.params.FlowLogsPerMin)

batchMaxLen := 3
connections := 20
flowLogsPerMin := 1000
synthetic := api.IngestSynthetic{
BatchMaxLen: batchMaxLen,
Connections: connections,
FlowLogsPerMin: flowLogsPerMin,
}
params = config.StageParam{
Ingest: &config.Ingest{
Type: "synthetic",
Synthetic: &synthetic,
},
}
ingest, err = NewIngestSynthetic(operational.NewMetrics(&config.MetricsSettings{}), params)
syn = ingest.(*IngestSynthetic)
require.NoError(t, err)
require.Equal(t, batchMaxLen, syn.params.BatchMaxLen)
require.Equal(t, connections, syn.params.Connections)
require.Equal(t, flowLogsPerMin, syn.params.FlowLogsPerMin)

// run the Ingest method in a separate thread
ingestOutput := make(chan config.GenericMap)
go syn.Ingest(ingestOutput)

type connection struct {
srcAddr string
dstAddr string
srcPort int
dstPort int
protocol int
}

// Start collecting flows from the ingester and ensure we have the specified number of distinct connections
connectionMap := make(map[connection]int)
for i := 0; i < (3 * connections); i++ {
Review comment (Collaborator): Is the hardcoded 3 the batchMaxLen? Or just an arbitrary number to make sure we have multiple flow logs per connection?

Reply (Collaborator, author): It's just to have (many) more flow logs than connections, and to verify that we accumulate the proper number of connections with multiple flow logs per connection.


flowEntry := <-ingestOutput
conn := connection{
srcAddr: flowEntry["SrcAddr"].(string),
dstAddr: flowEntry["DstAddr"].(string),
srcPort: flowEntry["SrcPort"].(int),
dstPort: flowEntry["DstPort"].(int),
protocol: flowEntry["Proto"].(int),
}
connectionMap[conn]++
}
require.Equal(t, connections, len(connectionMap))
}
2 changes: 2 additions & 0 deletions pkg/pipeline/pipeline_builder.go
@@ -330,6 +330,8 @@ func getIngester(opMetrics *operational.Metrics, params config.StageParam) (inge
switch params.Ingest.Type {
case api.FileType, api.FileLoopType, api.FileChunksType:
ingester, err = ingest.NewIngestFile(params)
case api.SyntheticType:
ingester, err = ingest.NewIngestSynthetic(opMetrics, params)
case api.CollectorType:
ingester, err = ingest.NewIngestCollector(opMetrics, params)
case api.KafkaType: