From 7b51d9c9b5d0f5118db4dc3a5217b5375aaa560f Mon Sep 17 00:00:00 2001 From: Vladislav Byrgazov <58589910+Ex4amp1e@users.noreply.github.com> Date: Sun, 19 May 2024 02:33:13 +0800 Subject: [PATCH] Fix memory leak in metrics chain element (#1626) * Fix memory leak in metrics chain element Signed-off-by: Vladislav Byrgazov * Fix lint issues Signed-off-by: Vladislav Byrgazov * Added check is opentelemetry enabled and fixed copyrights Signed-off-by: Vladislav Byrgazov * Fix metrics memory leak by storing temp connection data in metadata Signed-off-by: Vladislav Byrgazov * Added copyright Signed-off-by: Vladislav Byrgazov * Address review comments Signed-off-by: Vladislav Byrgazov * Fixed import Signed-off-by: Vladislav Byrgazov --------- Signed-off-by: Vladislav Byrgazov Co-authored-by: Vladislav Byrgazov --- pkg/networkservice/common/metrics/metadata.go | 14 +++++--- pkg/networkservice/common/metrics/server.go | 35 ++++++++++++------- .../common/metrics/server_test.go | 23 +++++++++--- pkg/tools/opentelemetry/README.md | 2 +- 4 files changed, 52 insertions(+), 22 deletions(-) diff --git a/pkg/networkservice/common/metrics/metadata.go b/pkg/networkservice/common/metrics/metadata.go index fb70598b3..e57319743 100644 --- a/pkg/networkservice/common/metrics/metadata.go +++ b/pkg/networkservice/common/metrics/metadata.go @@ -1,6 +1,7 @@ -// Copyright (c) 2022 Cisco and/or its affiliates. // Copyright (c) 2023 Nordix Foundation. // +// Copyright (c) 2022-2024 Cisco and/or its affiliates. +// // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -19,6 +20,7 @@ package metrics import ( "context" + "sync" "go.opentelemetry.io/otel/metric" @@ -26,9 +28,13 @@ import ( ) type keyType struct{} -type metricsMap = map[string]metric.Int64Counter -func loadOrStore(ctx context.Context, metrics metricsMap) (value metricsMap, ok bool) { +type metricsData struct { + counter map[string]metric.Int64Counter + previous sync.Map +} + +func loadOrStore(ctx context.Context, metrics *metricsData) (value *metricsData, ok bool) { rawValue, ok := metadata.Map(ctx, false).LoadOrStore(keyType{}, metrics) - return rawValue.(metricsMap), ok + return rawValue.(*metricsData), ok } diff --git a/pkg/networkservice/common/metrics/server.go b/pkg/networkservice/common/metrics/server.go index 954e7d460..178b0cc56 100644 --- a/pkg/networkservice/common/metrics/server.go +++ b/pkg/networkservice/common/metrics/server.go @@ -1,5 +1,6 @@ // Copyright (c) 2021-2022 Doc.ai and/or its affiliates. // Copyright (c) 2023 Nordix Foundation. +// Copyright (c) 2024 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -22,7 +23,6 @@ import ( "context" "fmt" "strconv" - "sync" "github.com/golang/protobuf/ptypes/empty" "github.com/networkservicemesh/api/pkg/api/networkservice" @@ -31,18 +31,20 @@ import ( "go.opentelemetry.io/otel/metric" "github.com/networkservicemesh/sdk/pkg/networkservice/core/next" + "github.com/networkservicemesh/sdk/pkg/tools/opentelemetry" ) type metricServer struct { - meter metric.Meter - previousMetrics sync.Map + meter metric.Meter } // NewServer returns a new metric server chain element func NewServer() networkservice.NetworkServiceServer { - return &metricServer{ - meter: otel.Meter(""), + var res = &metricServer{} + if opentelemetry.IsEnabled() { + res.meter = otel.Meter("") } + return res } func (t *metricServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) { @@ -51,7 +53,9 @@ func (t *metricServer) Request(ctx context.Context, request *networkservice.Netw return nil, err } - t.writeMetrics(ctx, conn.GetPath()) + if opentelemetry.IsEnabled() { + t.writeMetrics(ctx, conn.GetPath()) + } return conn, nil } @@ -61,7 +65,9 @@ func (t *metricServer) Close(ctx context.Context, conn *networkservice.Connectio return nil, err } - t.writeMetrics(ctx, conn.GetPath()) + if opentelemetry.IsEnabled() { + t.writeMetrics(ctx, conn.GetPath()) + } return &empty.Empty{}, nil } @@ -72,7 +78,10 @@ func (t *metricServer) writeMetrics(ctx context.Context, path *networkservice.Pa continue } - metrics, _ := loadOrStore(ctx, make(map[string]metric.Int64Counter)) + k := metricsData{ + counter: make(map[string]metric.Int64Counter), + } + metrics, _ := loadOrStore(ctx, &k) for metricName, metricValue := range pathSegment.Metrics { /* Works with integers only */ recVal, err := strconv.ParseInt(metricValue, 10, 64) @@ -81,7 +90,7 @@ func (t *metricServer) writeMetrics(ctx context.Context, path *networkservice.Pa } counterName := fmt.Sprintf("%s_%s", pathSegment.Name, metricName) - _, ok := metrics[metricName] + _, ok := metrics.counter[metricName] if !ok { var counter metric.Int64Counter @@ -89,7 +98,7 @@ func (t *metricServer) writeMetrics(ctx context.Context, path *networkservice.Pa if err != nil { continue } - metrics[metricName] = counter + metrics.counter[metricName] = counter } previousValueKey := fmt.Sprintf( @@ -98,17 +107,17 @@ func (t *metricServer) writeMetrics(ctx context.Context, path *networkservice.Pa path.GetPathSegments()[0].Id, ) var previousValueInt int64 - previousValue, ok := t.previousMetrics.Load(previousValueKey) + previousValue, ok := metrics.previous.Load(previousValueKey) if ok { previousValueInt, _ = strconv.ParseInt(previousValue.(string), 10, 64) } - metrics[metricName].Add( + metrics.counter[metricName].Add( ctx, recVal-previousValueInt, metric.WithAttributes(attribute.String("connection", path.GetPathSegments()[0].Id)), ) - t.previousMetrics.Store(previousValueKey, metricValue) + metrics.previous.Store(previousValueKey, metricValue) } } } diff --git a/pkg/networkservice/common/metrics/server_test.go b/pkg/networkservice/common/metrics/server_test.go index 9bffb40ef..0d33ca14b 100644 --- a/pkg/networkservice/common/metrics/server_test.go +++ b/pkg/networkservice/common/metrics/server_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2022 Cisco and/or its affiliates. +// Copyright (c) 2022-2024 Cisco and/or its affiliates. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,6 +18,8 @@ package metrics_test import ( "context" + "os" + "sync" "math/rand" "strconv" @@ -30,7 +32,6 @@ import ( "github.com/networkservicemesh/api/pkg/api/networkservice" - "github.com/networkservicemesh/sdk/pkg/networkservice/common/begin" "github.com/networkservicemesh/sdk/pkg/networkservice/common/metrics" "github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath" "github.com/networkservicemesh/sdk/pkg/networkservice/core/chain" @@ -38,18 +39,31 @@ import ( "github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata" ) +const ( + connectionCount = 1000 + telemetryEnv = "TELEMETRY" +) + func TestMetrics_Concurrency(t *testing.T) { t.Cleanup(func() { goleak.VerifyNone(t) }) + err := os.Setenv(telemetryEnv, "true") + if err != nil { + return + } + server := chain.NewNetworkServiceServer( - begin.NewServer(), metadata.NewServer(), updatepath.NewServer("testServer"), &metricsGeneratorServer{}, metrics.NewServer(), ) - for i := 0; i < 100; i++ { + + wg := new(sync.WaitGroup) + wg.Add(connectionCount) + for i := 0; i < connectionCount; i++ { go func(i int) { + defer wg.Done() req := &networkservice.NetworkServiceRequest{ Connection: &networkservice.Connection{Id: "nsc-" + strconv.Itoa(i)}, } @@ -57,6 +71,7 @@ func TestMetrics_Concurrency(t *testing.T) { require.NoError(t, err) }(i) } + wg.Wait() } type metricsGeneratorServer struct{} diff --git a/pkg/tools/opentelemetry/README.md b/pkg/tools/opentelemetry/README.md index 5311acad9..ac545eec7 100644 --- a/pkg/tools/opentelemetry/README.md +++ b/pkg/tools/opentelemetry/README.md @@ -26,7 +26,7 @@ The example exposes the following backends: After running docker-compose you can enable spans and metrics inside any test using the following code: ```Go log.EnableTracing(true) -os.Setenv("TELEMETRY", "opentelemetry") +os.Setenv("TELEMETRY", "true") spanExporter := opentelemetry.InitSpanExporter(ctx, "0.0.0.0:4317") metricExporter := opentelemetry.InitMetricExporter(ctx, "0.0.0.0:4317") o := opentelemetry.Init(ctx, spanExporter, metricExporter, "NSM")