
Commit

addressed reviewer comments
KalmanMeth committed Mar 28, 2023
1 parent 90b35fd commit 4d59cae
Showing 2 changed files with 34 additions and 27 deletions.
docs/operational-metrics.md: 2 changes (1 addition & 1 deletion)
@@ -82,7 +82,7 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
### ingest_synthetic_flows_processed
| **Name** | ingest_synthetic_flows_processed |
|:---|:---|
-| **Description** | Number of metrics processed |
+| **Description** | Number of flow logs processed |
| **Type** | counter |
| **Labels** | stage |

pkg/pipeline/ingest/ingest_synthetic.go: 59 changes (33 additions & 26 deletions)
@@ -41,48 +41,55 @@ const (
)

var (
-	metricsProcessed = operational.DefineMetric(
+	flowLogsProcessed = operational.DefineMetric(
		"ingest_synthetic_flows_processed",
-		"Number of metrics processed",
+		"Number of flow logs processed",
		operational.TypeCounter,
		"stage",
	)
)

-// IngestSynthetic Ingest generates flow logs according to provided parameters
+// Ingest generates flow logs according to provided parameters
func (ingestS *IngestSynthetic) Ingest(out chan<- config.GenericMap) {
	log.Debugf("entering IngestSynthetic Ingest, params = %v", ingestS.params)
	// get a list of flow log entries, one per desired connection
	// these flow logs will be sent again and again to simulate ongoing traffic on those connections
	flowLogs := utils.GenerateConnectionFlowEntries(ingestS.params.Connections)
	nLogs := len(flowLogs)
	next := 0

-	// compute time interval between batches
+	// compute time interval between batches; divide BatchMaxLen by FlowLogsPerMin and adjust the types
	ticker := time.NewTicker(time.Duration(int(time.Minute*time.Duration(ingestS.params.BatchMaxLen)) / ingestS.params.FlowLogsPerMin))

+	// loop forever
+	// on each iteration send BatchMaxLen flow logs from the array of flow logs.
+	// if necessary, send the contents of the flowLogs array multiple times (in sub-batches) to fill the number of BatchMaxLen flow logs needed
	for {
		select {
		case <-ingestS.exitChan:
			log.Debugf("exiting IngestSynthetic because of signal")
			return
		case <-ticker.C:
+			// flowsLeft designates the number out of BatchMaxLen that must still be sent in this batch
+			// next designates the next flow log entry to be sent
+			// remainder designates how many flow logs remain in the flowLogs array that can be sent on the next sub-batch.
			flowsLeft := ingestS.params.BatchMaxLen
			log.Debugf("flowsLeft = %d", flowsLeft)
-			batchLen := flowsLeft
+			subBatchLen := flowsLeft
			for flowsLeft > 0 {
				remainder := nLogs - next
-				if batchLen > remainder {
-					batchLen = remainder
+				if subBatchLen > remainder {
+					subBatchLen = remainder
				}
-				log.Debugf("flowsLeft = %d, remainder = %d, batchLen = %d", flowsLeft, remainder, batchLen)
-				batch := flowLogs[next : next+batchLen]
-				ingestS.sendBatch(batch, out)
-				ingestS.metricsProcessed.Add(float64(batchLen))
-				flowsLeft -= batchLen
-				next += batchLen
-				if batchLen == remainder {
+				log.Debugf("flowsLeft = %d, remainder = %d, subBatchLen = %d", flowsLeft, remainder, subBatchLen)
+				subBatch := flowLogs[next : next+subBatchLen]
+				ingestS.sendBatch(subBatch, out)
+				ingestS.metricsProcessed.Add(float64(subBatchLen))
+				flowsLeft -= subBatchLen
+				next += subBatchLen
+				if subBatchLen == remainder {
					next = 0
-					batchLen = flowsLeft
+					subBatchLen = flowsLeft
				}
			}
		}
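The ticker expression in the hunk above compresses the batch-rate arithmetic into one line: one minute's worth of nanoseconds is multiplied by BatchMaxLen and divided by FlowLogsPerMin, giving the interval between batches. The standalone sketch below is illustrative only; the two parameter values are assumptions, not the package defaults.

```go
// Minimal sketch of the inter-batch interval arithmetic used in Ingest above.
// The parameter values are illustrative assumptions, not the package defaults.
package main

import (
	"fmt"
	"time"
)

func main() {
	batchMaxLen := 100     // flow logs sent per tick (assumed)
	flowLogsPerMin := 2000 // target overall rate (assumed)

	// (1 minute * 100) / 2000 = 3s, i.e. 20 batches of 100 logs each per minute.
	interval := time.Duration(int(time.Minute*time.Duration(batchMaxLen)) / flowLogsPerMin)
	fmt.Println(interval) // prints "3s"
}
```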
@@ -98,24 +105,24 @@ func (ingestS *IngestSynthetic) sendBatch(flows []config.GenericMap, out chan<-
// NewIngestSynthetic create a new ingester
func NewIngestSynthetic(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) {
	log.Debugf("entering NewIngestSynthetic")
-	jsonIngestSynthetic := api.IngestSynthetic{}
+	confIngestSynthetic := api.IngestSynthetic{}
	if params.Ingest != nil || params.Ingest.Synthetic != nil {
-		jsonIngestSynthetic = *params.Ingest.Synthetic
+		confIngestSynthetic = *params.Ingest.Synthetic
	}
-	if jsonIngestSynthetic.Connections == 0 {
-		jsonIngestSynthetic.Connections = defaultConnections
+	if confIngestSynthetic.Connections == 0 {
+		confIngestSynthetic.Connections = defaultConnections
	}
-	if jsonIngestSynthetic.FlowLogsPerMin == 0 {
-		jsonIngestSynthetic.FlowLogsPerMin = defaultFlowLogsPerMin
+	if confIngestSynthetic.FlowLogsPerMin == 0 {
+		confIngestSynthetic.FlowLogsPerMin = defaultFlowLogsPerMin
	}
-	if jsonIngestSynthetic.BatchMaxLen == 0 {
-		jsonIngestSynthetic.BatchMaxLen = defaultBatchLen
+	if confIngestSynthetic.BatchMaxLen == 0 {
+		confIngestSynthetic.BatchMaxLen = defaultBatchLen
	}
-	log.Debugf("params = %v", jsonIngestSynthetic)
+	log.Debugf("params = %v", confIngestSynthetic)

	return &IngestSynthetic{
-		params:           jsonIngestSynthetic,
+		params:           confIngestSynthetic,
		exitChan:         utils.ExitChannel(),
-		metricsProcessed: opMetrics.NewCounter(&metricsProcessed, params.Name),
+		metricsProcessed: opMetrics.NewCounter(&flowLogsProcessed, params.Name),
	}, nil
}
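For reference, the wrap-around bookkeeping in Ingest (flowsLeft, next, remainder, subBatchLen) can be exercised on its own. The sketch below strips out the channel send and the counter and uses assumed sizes to trace which slices of flowLogs would be sent during a single tick when the array is shorter than BatchMaxLen.

```go
// Minimal trace of the sub-batch wrap-around logic from Ingest, with assumed sizes.
// It only prints the slice bounds that sendBatch would receive during one tick.
package main

import "fmt"

func main() {
	nLogs := 4        // len(flowLogs); assumed
	batchMaxLen := 10 // ingestS.params.BatchMaxLen; assumed
	next := 0

	flowsLeft := batchMaxLen
	subBatchLen := flowsLeft
	for flowsLeft > 0 {
		remainder := nLogs - next
		if subBatchLen > remainder {
			subBatchLen = remainder
		}
		fmt.Printf("send flowLogs[%d:%d] (%d logs)\n", next, next+subBatchLen, subBatchLen)
		flowsLeft -= subBatchLen
		next += subBatchLen
		if subBatchLen == remainder {
			next = 0
			subBatchLen = flowsLeft
		}
	}
	// Output:
	// send flowLogs[0:4] (4 logs)
	// send flowLogs[0:4] (4 logs)
	// send flowLogs[0:2] (2 logs)
}
```

With these assumed sizes, the four synthetic connection entries are replayed two and a half times to fill the batch of ten.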
