Skip to content

Commit

Permalink
make expiryTime in encode_prom into a Duration (#411)
Browse files Browse the repository at this point in the history
* made expirtyTime in encode_prom into a Duration

* fixed test

* addressed reviewer comments
  • Loading branch information
KalmanMeth authored Mar 28, 2023
1 parent 9af6a7c commit 99a2598
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Following is the supported API format for prometheus encode:
labels: labels to be associated with the metric
buckets: histogram buckets
prefix: prefix added to each metric name
expiryTime: seconds of no-flow to wait before deleting prometheus data item
expiryTime: time duration of no-flow to wait before deleting prometheus data item
maxMetrics: maximum number of metrics to report (default: unlimited)
</pre>
## Kafka encode API
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type PromTLSConf struct {
type PromEncode struct {
Metrics PromMetricsItems `yaml:"metrics,omitempty" json:"metrics,omitempty" doc:"list of prometheus metric definitions, each includes:"`
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix added to each metric name"`
ExpiryTime int `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"seconds of no-flow to wait before deleting prometheus data item"`
ExpiryTime Duration `yaml:"expiryTime,omitempty" json:"expiryTime,omitempty" doc:"time duration of no-flow to wait before deleting prometheus data item"`
MaxMetrics int `yaml:"maxMetrics,omitempty" json:"maxMetrics,omitempty" doc:"maximum number of metrics to report (default: unlimited)"`
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/config/pipeline_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package config
import (
"encoding/json"
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -117,6 +118,8 @@ func TestKafkaPromPipeline(t *testing.T) {
GroupByKeys: api.AggregateBy{"srcAS"},
OperationType: "count",
}})
var expiryTimeDuration api.Duration
expiryTimeDuration.Duration = time.Duration(50 * time.Second)
pl = pl.EncodePrometheus("prom", api.PromEncode{
Metrics: api.PromMetricsItems{{
Name: "connections_per_source_as",
Expand All @@ -129,7 +132,8 @@ func TestKafkaPromPipeline(t *testing.T) {
Labels: []string{"by", "aggregate"},
Buckets: []float64{},
}},
Prefix: "flp_",
Prefix: "flp_",
ExpiryTime: expiryTimeDuration,
})
stages := pl.GetStages()
require.Len(t, stages, 5)
Expand Down Expand Up @@ -159,7 +163,7 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[4])
require.NoError(t, err)
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"name","value":"src_as_connection_count"},"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b))
require.JSONEq(t, `{"name":"prom","encode":{"type":"prom","prom":{"expiryTime":"50s", "metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"name","value":"src_as_connection_count"},"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"prefix":"flp_"}}}`, string(b))
}

func TestForkPipeline(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/aggregate_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ parameters:
type: prom
prom:
prefix: test_
expiryTime: 1
expiryTime: 1s
metrics:
- name: flow_count
type: counter
Expand Down
10 changes: 5 additions & 5 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
log "github.com/sirupsen/logrus"
)

const defaultExpiryTime = 2 * time.Minute
const defaultExpiryTime = time.Duration(2 * time.Minute)

type gaugeInfo struct {
gauge *prometheus.GaugeVec
Expand Down Expand Up @@ -267,9 +267,9 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
cfg = *params.Encode.Prom
}

expiryTime := time.Duration(cfg.ExpiryTime) * time.Second
if expiryTime == 0 {
expiryTime = defaultExpiryTime
expiryTime := cfg.ExpiryTime
if expiryTime.Duration == 0 {
expiryTime.Duration = defaultExpiryTime
}
log.Debugf("expiryTime = %v", expiryTime)

Expand Down Expand Up @@ -348,7 +348,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
gauges: gauges,
histos: histos,
aggHistos: aggHistos,
expiryTime: expiryTime,
expiryTime: expiryTime.Duration,
mCache: putils.NewTimedCache(cfg.MaxMetrics, mChacheLenMetric),
mChacheLenMetric: mChacheLenMetric,
exitChan: putils.ExitChannel(),
Expand Down
15 changes: 10 additions & 5 deletions pkg/pipeline/encode/encode_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ parameters:
type: prom
prom:
prefix: test_
expiryTime: 1
expiryTime: 1s
metrics:
- name: Bytes
type: gauge
Expand Down Expand Up @@ -146,10 +146,11 @@ func Test_CustomMetric(t *testing.T) {
"packets": 2,
"latency": 0.2,
}}

var expiryTimeDuration api.Duration
expiryTimeDuration.Duration = time.Duration(60 * time.Second)
params := api.PromEncode{
Prefix: "test_",
ExpiryTime: 60,
ExpiryTime: expiryTimeDuration,
Metrics: []api.PromMetricsItem{{
Name: "bytes_total",
Type: "counter",
Expand Down Expand Up @@ -229,9 +230,11 @@ func Test_MetricTTL(t *testing.T) {
"bytes": 12,
}}

var expiryTimeDuration api.Duration
expiryTimeDuration.Duration = time.Duration(1 * time.Second)
params := api.PromEncode{
Prefix: "test_",
ExpiryTime: 1,
ExpiryTime: expiryTimeDuration,
Metrics: []api.PromMetricsItem{{
Name: "bytes_total",
Type: "counter",
Expand Down Expand Up @@ -281,9 +284,11 @@ func hundredFlows() []config.GenericMap {
}

func BenchmarkPromEncode(b *testing.B) {
var expiryTimeDuration api.Duration
expiryTimeDuration.Duration = time.Duration(60 * time.Second)
params := api.PromEncode{
Prefix: "test_",
ExpiryTime: 60,
ExpiryTime: expiryTimeDuration,
Metrics: []api.PromMetricsItem{{
Name: "bytes_total",
Type: "counter",
Expand Down

0 comments on commit 99a2598

Please sign in to comment.