Skip to content

Commit

Permalink
sumologicexporter: refactor sender (#32592)
Browse files Browse the repository at this point in the history
**Description:**

Refactor sender in order to prepare for adding OTLP formats

Reduces size of #32315

**Link to tracking Issue:** #31479

**Testing:** 

Minimal changes to unit tests to ensure that everything works like
before

**Documentation:**

N/A

---------

Signed-off-by: Dominik Rosiek <[email protected]>
  • Loading branch information
sumo-drosiek authored Apr 23, 2024
1 parent bbbec13 commit ce71dc3
Show file tree
Hide file tree
Showing 14 changed files with 519 additions and 198 deletions.
6 changes: 3 additions & 3 deletions cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -679,14 +679,14 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.19.0 // indirect
golang.org/x/tools v0.20.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
gonum.org/v1/gonum v0.15.0 // indirect
google.golang.org/api v0.173.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -700,15 +700,15 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
golang.org/x/mod v0.16.0 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.19.0 // indirect
golang.org/x/tools v0.20.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
gonum.org/v1/gonum v0.15.0 // indirect
google.golang.org/api v0.173.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions exporter/sumologicexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type sumologicexporter struct {

foundSumologicExtension bool
sumologicExtension *sumologicextension.SumologicExtension

id component.ID
}

func initExporter(cfg *Config, settings component.TelemetrySettings) (*sumologicexporter, error) {
Expand Down Expand Up @@ -295,6 +297,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) err
}
logsURL, metricsURL, tracesURL := se.getDataURLs()
sdr := newSender(
se.logger,
se.config,
se.getHTTPClient(),
se.filter,
Expand All @@ -305,6 +308,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) err
logsURL,
tracesURL,
se.graphiteFormatter,
se.id,
)

// Iterate over ResourceLogs
Expand Down Expand Up @@ -392,6 +396,7 @@ func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Met
}
logsURL, metricsURL, tracesURL := se.getDataURLs()
sdr := newSender(
se.logger,
se.config,
se.getHTTPClient(),
se.filter,
Expand All @@ -402,6 +407,7 @@ func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Met
logsURL,
tracesURL,
se.graphiteFormatter,
se.id,
)

// Iterate over ResourceMetrics
Expand Down
14 changes: 7 additions & 7 deletions exporter/sumologicexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestAllFailed(t *testing.T) {
logs := logRecordsToLogs(exampleTwoLogs())

err := test.exp.pushLogsData(context.Background(), logs)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")

var partial consumererror.Logs
require.True(t, errors.As(err, &partial))
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestPartiallyFailed(t *testing.T) {
expected := logRecordsToLogs(records[:1])

err = test.exp.pushLogsData(context.Background(), logs)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")

var partial consumererror.Logs
require.True(t, errors.As(err, &partial))
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestPushFailedBatch(t *testing.T) {
}

err := test.exp.pushLogsData(context.Background(), logs)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")
}

func TestAllMetricsSuccess(t *testing.T) {
Expand Down Expand Up @@ -272,7 +272,7 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1
})

err := test.exp.pushMetricsData(context.Background(), metrics)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")

var partial consumererror.Metrics
require.True(t, errors.As(err, &partial))
Expand Down Expand Up @@ -309,7 +309,7 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1
expected := metricPairToMetrics(records[:1])

err := test.exp.pushMetricsData(context.Background(), metrics)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")

var partial consumererror.Metrics
require.True(t, errors.As(err, &partial))
Expand Down Expand Up @@ -375,7 +375,7 @@ gauge_metric_name{foo="bar",key2="value2",remote_name="156955",url="http://anoth
expected := metricPairToMetrics(records[:1])

err = test.exp.pushMetricsData(context.Background(), metrics)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")

var partial consumererror.Metrics
require.True(t, errors.As(err, &partial))
Expand Down Expand Up @@ -418,7 +418,7 @@ func TestPushMetricsFailedBatch(t *testing.T) {
}

err := test.exp.pushMetricsData(context.Background(), metrics)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")
}

func TestGetSignalUrl(t *testing.T) {
Expand Down
63 changes: 49 additions & 14 deletions exporter/sumologicexporter/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,81 @@
package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter"

import (
"fmt"
"sort"
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
"golang.org/x/exp/slices"
)

// fields represents metadata
type fields struct {
orig pcommon.Map
replacer *strings.Replacer
orig pcommon.Map
initialized bool
}

func newFields(attrMap pcommon.Map) fields {
return fields{
orig: attrMap,
replacer: strings.NewReplacer(",", "_", "=", ":", "\n", "_"),
orig: attrMap,
initialized: true,
}
}

// string returns fields as ordered key=value string with `, ` as separator
func (f fields) string() string {
if !f.initialized {
return ""
}

returnValue := make([]string, 0, f.orig.Len())

f.orig.Range(func(k string, v pcommon.Value) bool {
// Don't add source related attributes to fields as they are handled separately
// and are added to the payload either as special HTTP headers or as resources
// attributes.
if k == attributeKeySourceCategory || k == attributeKeySourceHost || k == attributeKeySourceName {
return true
}

sv := v.AsString()

// Skip empty field
if len(sv) == 0 {
return true
}

key := []byte(k)
f.sanitizeField(key)
value := []byte(sv)
f.sanitizeField(value)
sb := strings.Builder{}
sb.Grow(len(key) + len(value) + 1)
sb.Write(key)
sb.WriteRune('=')
sb.Write(value)

returnValue = append(
returnValue,
fmt.Sprintf(
"%s=%s",
f.sanitizeField(k),
f.sanitizeField(v.AsString()),
),
sb.String(),
)
return true
})
sort.Strings(returnValue)
slices.Sort(returnValue)

return strings.Join(returnValue, ", ")
}

// sanitizeFields sanitize field (key or value) to be correctly parsed by sumologic receiver
func (f fields) sanitizeField(fld string) string {
return f.replacer.Replace(fld)
// It modifies the field in place.
func (f fields) sanitizeField(fld []byte) {
for i := 0; i < len(fld); i++ {
switch fld[i] {
case ',':
fld[i] = '_'
case '=':
fld[i] = ':'
case '\n':
fld[i] = '_'
default:
}
}
}
Loading

0 comments on commit ce71dc3

Please sign in to comment.