Skip to content

Commit

Permalink
fix servicegraph bug(orgin open-telemetry#32019)
Browse files Browse the repository at this point in the history
  • Loading branch information
Frapschen committed Mar 29, 2024
1 parent de91cb1 commit c65b860
Show file tree
Hide file tree
Showing 5 changed files with 419 additions and 31 deletions.
10 changes: 6 additions & 4 deletions connector/servicegraphconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -395,15 +396,16 @@ func (p *serviceGraphConnector) onExpire(e *store.Edge) {
}

func (p *serviceGraphConnector) aggregateMetricsForEdge(e *store.Edge) {
metricKey := p.buildMetricKey(e.ClientService, e.ServerService, string(e.ConnectionType), e.Dimensions)
metricKey := p.buildMetricKey(e.ClientService, e.ServerService, string(e.ConnectionType), strconv.FormatBool(e.Failed), e.Dimensions)
dimensions := buildDimensions(e)

p.seriesMutex.Lock()
defer p.seriesMutex.Unlock()
p.updateSeries(metricKey, dimensions)
p.updateCountMetrics(metricKey)
if e.Failed {
p.updateErrorMetrics(metricKey)
} else {
p.updateCountMetrics(metricKey)
}
p.updateDurationMetrics(metricKey, e.ServerLatencySec, e.ClientLatencySec)
}
Expand Down Expand Up @@ -602,9 +604,9 @@ func (p *serviceGraphConnector) collectServerLatencyMetrics(ilm pmetric.ScopeMet
return nil
}

func (p *serviceGraphConnector) buildMetricKey(clientName, serverName, connectionType string, edgeDimensions map[string]string) string {
func (p *serviceGraphConnector) buildMetricKey(clientName, serverName, connectionType string, failed string, edgeDimensions map[string]string) string {
var metricKey strings.Builder
metricKey.WriteString(clientName + metricKeySeparator + serverName + metricKeySeparator + connectionType)
metricKey.WriteString(clientName + metricKeySeparator + serverName + metricKeySeparator + connectionType + metricKeySeparator + failed)

for _, dimName := range p.config.Dimensions {
dim, ok := edgeDimensions[dimName]
Expand Down
93 changes: 66 additions & 27 deletions connector/servicegraphconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.uber.org/zap/zaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
)

func TestConnectorStart(t *testing.T) {
Expand Down Expand Up @@ -65,32 +68,68 @@ func TestConnectorShutdown(t *testing.T) {
}

func TestConnectorConsume(t *testing.T) {
// Prepare
cfg := &Config{
Dimensions: []string{"some-attribute", "non-existing-attribute"},
Store: StoreConfig{MaxItems: 10},
}

set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)
conn := newConnector(set, cfg)
conn.metricsConsumer = newMockMetricsExporter()

assert.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))

// Test & verify
td := buildSampleTrace(t, "val")
// The assertion is part of verifyHappyCaseMetrics func.
assert.NoError(t, conn.ConsumeTraces(context.Background(), td))

// Force collection
conn.store.Expire()
md, err := conn.buildMetrics()
assert.NoError(t, err)
verifyHappyCaseMetrics(t, md)

// Shutdown the connector
assert.NoError(t, conn.Shutdown(context.Background()))
t.Run("test common case", func(t *testing.T) {
// Prepare
cfg := &Config{
Dimensions: []string{"some-attribute", "non-existing-attribute"},
Store: StoreConfig{MaxItems: 10},
}

set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)
conn := newConnector(set, cfg)
conn.metricsConsumer = newMockMetricsExporter()

assert.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))

// Test & verify
td := buildSampleTrace(t, "val")
// The assertion is part of verifyHappyCaseMetrics func.
assert.NoError(t, conn.ConsumeTraces(context.Background(), td))

// Force collection
conn.store.Expire()
md, err := conn.buildMetrics()
assert.NoError(t, err)
verifyHappyCaseMetrics(t, md)

// Shutdown the connector
assert.NoError(t, conn.Shutdown(context.Background()))
})
t.Run("test fix failed label not work", func(t *testing.T) {
cfg := &Config{
Store: StoreConfig{MaxItems: 10},
}
set := componenttest.NewNopTelemetrySettings()
set.Logger = zaptest.NewLogger(t)
conn := newConnector(set, cfg)
conn.metricsConsumer = newMockMetricsExporter()

assert.NoError(t, conn.Start(context.Background(), componenttest.NewNopHost()))
defer require.NoError(t, conn.Shutdown(context.Background()))

// this trace simulate two services' trace: foo, bar
// foo called bar three times, two success, one failed
td, err := golden.ReadTraces("testdata/failed-label-not-work-simple-trace.yaml")
assert.NoError(t, err)
assert.NoError(t, conn.ConsumeTraces(context.Background(), td))

// Force collection
conn.store.Expire()
actualMetrics, err := conn.buildMetrics()
assert.NoError(t, err)

// Verify
expectedMetrics, err := golden.ReadMetrics("testdata/failed-label-not-work-expect-metrics.yaml")
assert.NoError(t, err)

err = pmetrictest.CompareMetrics(expectedMetrics, actualMetrics,
pmetrictest.IgnoreMetricDataPointsOrder(),
pmetrictest.IgnoreResourceMetricsOrder(),
pmetrictest.IgnoreResourceMetricsOrder(), pmetrictest.IgnoreStartTimestamp(),
pmetrictest.IgnoreTimestamp())
require.NoError(t, err)
})
}

func verifyHappyCaseMetrics(t *testing.T, md pmetric.Metrics) {
Expand Down Expand Up @@ -267,7 +306,7 @@ func TestUpdateDurationMetrics(t *testing.T) {
Dimensions: []string{},
},
}
metricKey := p.buildMetricKey("foo", "bar", "", map[string]string{})
metricKey := p.buildMetricKey("foo", "bar", "", "false", map[string]string{})

testCases := []struct {
caseStr string
Expand Down
9 changes: 9 additions & 0 deletions connector/servicegraphconnector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/connector/servi
go 1.21

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.97.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.97.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.97.1-0.20240327181407-1038b67c85a0
go.opentelemetry.io/collector/config/configtelemetry v0.97.1-0.20240327181407-1038b67c85a0
Expand Down Expand Up @@ -47,6 +49,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.97.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
Expand Down Expand Up @@ -102,3 +105,9 @@ retract (
v0.76.1
v0.65.0
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
Loading

0 comments on commit c65b860

Please sign in to comment.