Skip to content

Commit

Permalink
Fix memory leak in metrics chain element (#1626)
Browse files Browse the repository at this point in the history
* Fix memory leak in metrics chain element

Signed-off-by: Vladislav Byrgazov <[email protected]>

* Fix lint issues

Signed-off-by: Vladislav Byrgazov <[email protected]>

* Added check is opentelemetry enabled and fixed copyrights

Signed-off-by: Vladislav Byrgazov <[email protected]>

* Fix metrics memory leak by storing temp connection data in metadata

Signed-off-by: Vladislav Byrgazov <[email protected]>

* Added copyright

Signed-off-by: Vladislav Byrgazov <[email protected]>

* Address review comments

Signed-off-by: Vladislav Byrgazov <[email protected]>

* Fixed import

Signed-off-by: Vladislav Byrgazov <[email protected]>

---------

Signed-off-by: Vladislav Byrgazov <[email protected]>
Co-authored-by: Vladislav Byrgazov <[email protected]>
  • Loading branch information
Ex4amp1e and Vladislav Byrgazov authored May 18, 2024
1 parent 3b79590 commit 7b51d9c
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 22 deletions.
14 changes: 10 additions & 4 deletions pkg/networkservice/common/metrics/metadata.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2023 Nordix Foundation.
//
// Copyright (c) 2022-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -19,16 +20,21 @@ package metrics

import (
"context"
"sync"

"go.opentelemetry.io/otel/metric"

"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
)

type keyType struct{}
type metricsMap = map[string]metric.Int64Counter

func loadOrStore(ctx context.Context, metrics metricsMap) (value metricsMap, ok bool) {
type metricsData struct {
counter map[string]metric.Int64Counter
previous sync.Map
}

func loadOrStore(ctx context.Context, metrics *metricsData) (value *metricsData, ok bool) {
rawValue, ok := metadata.Map(ctx, false).LoadOrStore(keyType{}, metrics)
return rawValue.(metricsMap), ok
return rawValue.(*metricsData), ok
}
35 changes: 22 additions & 13 deletions pkg/networkservice/common/metrics/server.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) 2021-2022 Doc.ai and/or its affiliates.
// Copyright (c) 2023 Nordix Foundation.
// Copyright (c) 2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -22,7 +23,6 @@ import (
"context"
"fmt"
"strconv"
"sync"

"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"
Expand All @@ -31,18 +31,20 @@ import (
"go.opentelemetry.io/otel/metric"

"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/tools/opentelemetry"
)

type metricServer struct {
meter metric.Meter
previousMetrics sync.Map
meter metric.Meter
}

// NewServer returns a new metric server chain element
func NewServer() networkservice.NetworkServiceServer {
return &metricServer{
meter: otel.Meter(""),
var res = &metricServer{}
if opentelemetry.IsEnabled() {
res.meter = otel.Meter("")
}
return res
}

func (t *metricServer) Request(ctx context.Context, request *networkservice.NetworkServiceRequest) (*networkservice.Connection, error) {
Expand All @@ -51,7 +53,9 @@ func (t *metricServer) Request(ctx context.Context, request *networkservice.Netw
return nil, err
}

t.writeMetrics(ctx, conn.GetPath())
if opentelemetry.IsEnabled() {
t.writeMetrics(ctx, conn.GetPath())
}
return conn, nil
}

Expand All @@ -61,7 +65,9 @@ func (t *metricServer) Close(ctx context.Context, conn *networkservice.Connectio
return nil, err
}

t.writeMetrics(ctx, conn.GetPath())
if opentelemetry.IsEnabled() {
t.writeMetrics(ctx, conn.GetPath())
}
return &empty.Empty{}, nil
}

Expand All @@ -72,7 +78,10 @@ func (t *metricServer) writeMetrics(ctx context.Context, path *networkservice.Pa
continue
}

metrics, _ := loadOrStore(ctx, make(map[string]metric.Int64Counter))
k := metricsData{
counter: make(map[string]metric.Int64Counter),
}
metrics, _ := loadOrStore(ctx, &k)
for metricName, metricValue := range pathSegment.Metrics {
/* Works with integers only */
recVal, err := strconv.ParseInt(metricValue, 10, 64)
Expand All @@ -81,15 +90,15 @@ func (t *metricServer) writeMetrics(ctx context.Context, path *networkservice.Pa
}

counterName := fmt.Sprintf("%s_%s", pathSegment.Name, metricName)
_, ok := metrics[metricName]
_, ok := metrics.counter[metricName]
if !ok {
var counter metric.Int64Counter

counter, err = t.meter.Int64Counter(counterName)
if err != nil {
continue
}
metrics[metricName] = counter
metrics.counter[metricName] = counter
}

previousValueKey := fmt.Sprintf(
Expand All @@ -98,17 +107,17 @@ func (t *metricServer) writeMetrics(ctx context.Context, path *networkservice.Pa
path.GetPathSegments()[0].Id,
)
var previousValueInt int64
previousValue, ok := t.previousMetrics.Load(previousValueKey)
previousValue, ok := metrics.previous.Load(previousValueKey)
if ok {
previousValueInt, _ = strconv.ParseInt(previousValue.(string), 10, 64)
}

metrics[metricName].Add(
metrics.counter[metricName].Add(
ctx,
recVal-previousValueInt,
metric.WithAttributes(attribute.String("connection", path.GetPathSegments()[0].Id)),
)
t.previousMetrics.Store(previousValueKey, metricValue)
metrics.previous.Store(previousValueKey, metricValue)
}
}
}
Expand Down
23 changes: 19 additions & 4 deletions pkg/networkservice/common/metrics/server_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco and/or its affiliates.
// Copyright (c) 2022-2024 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,6 +18,8 @@ package metrics_test

import (
"context"
"os"
"sync"

"math/rand"
"strconv"
Expand All @@ -30,33 +32,46 @@ import (

"github.com/networkservicemesh/api/pkg/api/networkservice"

"github.com/networkservicemesh/sdk/pkg/networkservice/common/begin"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/metrics"
"github.com/networkservicemesh/sdk/pkg/networkservice/common/updatepath"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/chain"
"github.com/networkservicemesh/sdk/pkg/networkservice/core/next"
"github.com/networkservicemesh/sdk/pkg/networkservice/utils/metadata"
)

const (
connectionCount = 1000
telemetryEnv = "TELEMETRY"
)

func TestMetrics_Concurrency(t *testing.T) {
t.Cleanup(func() { goleak.VerifyNone(t) })

err := os.Setenv(telemetryEnv, "true")
if err != nil {
return
}

server := chain.NewNetworkServiceServer(
begin.NewServer(),
metadata.NewServer(),
updatepath.NewServer("testServer"),
&metricsGeneratorServer{},
metrics.NewServer(),
)
for i := 0; i < 100; i++ {

wg := new(sync.WaitGroup)
wg.Add(connectionCount)
for i := 0; i < connectionCount; i++ {
go func(i int) {
defer wg.Done()
req := &networkservice.NetworkServiceRequest{
Connection: &networkservice.Connection{Id: "nsc-" + strconv.Itoa(i)},
}
_, err := server.Request(context.Background(), req)
require.NoError(t, err)
}(i)
}
wg.Wait()
}

type metricsGeneratorServer struct{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tools/opentelemetry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ The example exposes the following backends:
After running docker-compose you can enable spans and metrics inside any test using the following code:
```Go
log.EnableTracing(true)
os.Setenv("TELEMETRY", "opentelemetry")
os.Setenv("TELEMETRY", "true")
spanExporter := opentelemetry.InitSpanExporter(ctx, "0.0.0.0:4317")
metricExporter := opentelemetry.InitMetricExporter(ctx, "0.0.0.0:4317")
o := opentelemetry.Init(ctx, spanExporter, metricExporter, "NSM")
Expand Down

0 comments on commit 7b51d9c

Please sign in to comment.