Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Snappy and Zstd for confighttp.go #4441

Merged
merged 28 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6748970
feat: add snappy and zstd into confighttp
Nov 16, 2021
f0833b6
fix: format the code
Nov 16, 2021
887ffae
fix: refactored. tests wip
Nov 17, 2021
e0694f7
feat: add tests
Nov 18, 2021
8949b66
fix: fmt and lint
Nov 18, 2021
b6afa64
fix: modified readme, removed decompression support, and removed comp…
Nov 19, 2021
de97fad
Merge branch 'main' of github.com:open-o11y/opentelemetry-collector i…
Nov 19, 2021
0451e77
fix: add compression on otlphttpexporter/readme
Nov 19, 2021
224c7bd
chore: add changelog
Nov 19, 2021
3b2ee17
Merge branch 'main' of github.com:open-o11y/opentelemetry-collector i…
Nov 23, 2021
54a4bd5
fix: applied changes from code review
Nov 23, 2021
0e6512d
fix: comments and changelog
Nov 24, 2021
266b9d2
Merge branch 'main' of github.com:open-o11y/opentelemetry-collector i…
Nov 25, 2021
6c7a407
fix: applied changes based on the code review
Nov 25, 2021
ea57a71
fix: changed commend on changelog
Nov 26, 2021
5b01b57
Merge branch 'main' of github.com:open-o11y/opentelemetry-collector i…
Nov 30, 2021
9520e31
fix: changed map to switch case
Nov 30, 2021
026d903
Merge branch 'main' of github.com:open-o11y/opentelemetry-collector i…
Dec 2, 2021
a8ad39d
Merge branch 'main' of github.com:open-o11y/opentelemetry-collector i…
Dec 2, 2021
7029a57
fix: move CompressionType to middleware to remove duplicated value
Dec 2, 2021
7624e86
Merge branch 'main' of github.com:open-o11y/opentelemetry-collector i…
Dec 5, 2021
4dbff50
fix: implement UnmarshalText for CompressionType
Dec 5, 2021
9ef7473
Merge branch 'main' of github.com:open-o11y/opentelemetry-collector i…
Dec 7, 2021
aeb1af9
fix: lint error
Dec 7, 2021
92b8e98
fix: remove recommandation for compression type from readme
Dec 8, 2021
ea0f1de
Merge branch 'main' of github.com:open-o11y/opentelemetry-collector i…
Dec 9, 2021
128258c
chore: changelog
Dec 9, 2021
dabd47d
Merge branch 'main' of github.com:open-o11y/opentelemetry-collector i…
Dec 10, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
- `configtls`: TLS 1.2 is the new default mininum version (#4503)
- `confighttp`: `ToServer` now accepts a `component.Host`, in line with gRPC's counterpart (#4514)

## 💡 Enhancements 💡

- `confighttp`: add client-side compression support. (#4441)
- Each exporter should remove `compression` field if they have and should use `confighttp.HTTPClientSettings`

## 🧰 Bug fixes 🧰

- Fix handling of corrupted records by persistent buffer (experimental) (#4475)
Expand Down
4 changes: 4 additions & 0 deletions config/confighttp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ README](../configtls/README.md).
- [`read_buffer_size`](https://golang.org/pkg/net/http/#Transport)
- [`timeout`](https://golang.org/pkg/net/http/#Client)
- [`write_buffer_size`](https://golang.org/pkg/net/http/#Transport)
- `compression`: Compression type to use among `zstd`, `snappy`, `gzip`, `zlib`, and `deflate`.
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
- `zstd` and `snappy` are recommended.
- `none` will be treated as uncompressed, and any other inputs will cause an error.
hyunuk marked this conversation as resolved.
Show resolved Hide resolved
- [`max_idle_conns`](https://golang.org/pkg/net/http/#Transport)
- [`max_idle_conns_per_host`](https://golang.org/pkg/net/http/#Transport)
- [`max_conns_per_host`](https://golang.org/pkg/net/http/#Transport)
Expand All @@ -38,6 +41,7 @@ exporter:
headers:
test1: "value1"
"test 2": "value 2"
compression: zstd
```

## Server Configuration
Expand Down
9 changes: 9 additions & 0 deletions config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type HTTPClientSettings struct {
// Auth configuration for outgoing HTTP calls.
Auth *configauth.Authentication `mapstructure:"auth,omitempty"`

// The compression key for supported compression types within collector.
Compression middleware.CompressionType `mapstructure:"compression"`
hyunuk marked this conversation as resolved.
Show resolved Hide resolved

// MaxIdleConns is used to set a limit to the maximum idle HTTP connections the client can keep open.
// There's an already set value, and we want to override it only if an explicit value provided
MaxIdleConns *int `mapstructure:"max_idle_conns"`
Expand Down Expand Up @@ -133,6 +136,12 @@ func (hcs *HTTPClientSettings) ToClient(ext map[config.ComponentID]component.Ext
}
}

// Compress the body using specified compression methods if non-empty string is provided.
// Supporting gzip, zlib, deflate, snappy, and zstd; none is treated as uncompressed.
if hcs.Compression != middleware.CompressionEmpty && hcs.Compression != middleware.CompressionNone {
clientTransport = middleware.NewCompressRoundTripper(clientTransport, hcs.Compression)
}

if hcs.Auth != nil {
if ext == nil {
return nil, fmt.Errorf("extensions configuration not found")
Expand Down
57 changes: 49 additions & 8 deletions config/confighttp/confighttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/internal/middleware"
)

type customRoundTripper struct {
Expand Down Expand Up @@ -71,6 +72,43 @@ func TestAllHTTPClientSettings(t *testing.T) {
MaxConnsPerHost: &maxConnsPerHost,
IdleConnTimeout: &idleConnTimeout,
CustomRoundTripper: func(next http.RoundTripper) (http.RoundTripper, error) { return next, nil },
Compression: "",
},
shouldError: false,
},
{
name: "all_valid_settings_with_none_compression",
settings: HTTPClientSettings{
Endpoint: "localhost:1234",
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
ReadBufferSize: 1024,
WriteBufferSize: 512,
MaxIdleConns: &maxIdleConns,
MaxIdleConnsPerHost: &maxIdleConnsPerHost,
MaxConnsPerHost: &maxConnsPerHost,
IdleConnTimeout: &idleConnTimeout,
CustomRoundTripper: func(next http.RoundTripper) (http.RoundTripper, error) { return next, nil },
Compression: "none",
},
shouldError: false,
},
{
name: "all_valid_settings_with_gzip_compression",
settings: HTTPClientSettings{
Endpoint: "localhost:1234",
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
ReadBufferSize: 1024,
WriteBufferSize: 512,
MaxIdleConns: &maxIdleConns,
MaxIdleConnsPerHost: &maxIdleConnsPerHost,
MaxConnsPerHost: &maxConnsPerHost,
IdleConnTimeout: &idleConnTimeout,
CustomRoundTripper: func(next http.RoundTripper) (http.RoundTripper, error) { return next, nil },
Compression: "gzip",
},
shouldError: false,
},
hyunuk marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -97,14 +135,17 @@ func TestAllHTTPClientSettings(t *testing.T) {
return
}
assert.NoError(t, err)
transport := client.Transport.(*http.Transport)
assert.EqualValues(t, 1024, transport.ReadBufferSize)
assert.EqualValues(t, 512, transport.WriteBufferSize)
assert.EqualValues(t, 50, transport.MaxIdleConns)
assert.EqualValues(t, 40, transport.MaxIdleConnsPerHost)
assert.EqualValues(t, 45, transport.MaxConnsPerHost)
assert.EqualValues(t, 30*time.Second, transport.IdleConnTimeout)

switch transport := client.Transport.(type) {
case *http.Transport:
assert.EqualValues(t, 1024, transport.ReadBufferSize)
assert.EqualValues(t, 512, transport.WriteBufferSize)
assert.EqualValues(t, 50, transport.MaxIdleConns)
assert.EqualValues(t, 40, transport.MaxIdleConnsPerHost)
assert.EqualValues(t, 45, transport.MaxConnsPerHost)
assert.EqualValues(t, 30*time.Second, transport.IdleConnTimeout)
case *middleware.CompressRoundTripper:
assert.EqualValues(t, "gzip", transport.CompressionType())
}
})
}
}
Expand Down
4 changes: 0 additions & 4 deletions exporter/otlphttpexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,10 @@ The following settings can be optionally configured:
- `ca_file` path to the CA cert. For a client this verifies the server certificate. Should only be used if `insecure` is set to false.
- `cert_file` path to the TLS cert to use for TLS required connections. Should only be used if `insecure` is set to false.
- `key_file` path to the TLS key to use for TLS required connections. Should only be used if `insecure` is set to false.

- `compression` (default = none): Compression type to use (only gzip is supported today)

- `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client
- `read_buffer_size` (default = 0): ReadBufferSize for HTTP client.
- `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client.


Example:

```yaml
Expand Down
4 changes: 0 additions & 4 deletions exporter/otlphttpexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ type Config struct {

// The URL to send logs to. If omitted the Endpoint + "/v1/logs" will be used.
LogsEndpoint string `mapstructure:"logs_endpoint"`

// The compression key for supported compression types within
// collector. Currently the only supported mode is `gzip`.
Compression string `mapstructure:"compression"`
}

var _ config.Exporter = (*Config)(nil)
Expand Down
2 changes: 1 addition & 1 deletion exporter/otlphttpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestLoadConfig(t *testing.T) {
ReadBufferSize: 123,
WriteBufferSize: 345,
Timeout: time.Second * 10,
Compression: "gzip",
},
Compression: "gzip",
})
}
11 changes: 0 additions & 11 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"net/url"
"runtime"
"strconv"
"strings"
"time"

"go.uber.org/zap"
Expand All @@ -34,10 +33,8 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/middleware"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/model/pdata"
)
Expand Down Expand Up @@ -89,14 +86,6 @@ func (e *exporter) start(_ context.Context, host component.Host) error {
if err != nil {
return err
}

if e.config.Compression != "" {
if strings.ToLower(e.config.Compression) == configgrpc.CompressionGzip {
client.Transport = middleware.NewCompressRoundTripper(client.Transport)
} else {
return fmt.Errorf("unsupported compression type %q", e.config.Compression)
}
}
e.client = client
return nil
}
Expand Down
57 changes: 0 additions & 57 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,63 +134,6 @@ func TestTraceRoundTrip(t *testing.T) {
}
}

func TestCompressionOptions(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

tests := []struct {
name string
baseURL string
compression string
err bool
}{
{
name: "no compression",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "",
},
{
name: "gzip",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "gzip",
},
{
name: "incorrect compression",
baseURL: fmt.Sprintf("http://%s", addr),
compression: "gzip2",
err: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sink := new(consumertest.TracesSink)
startTracesReceiver(t, addr, sink)

factory := NewFactory()
cfg := createExporterConfig(test.baseURL, factory.CreateDefaultConfig())
cfg.Compression = test.compression
exp, _ := factory.CreateTracesExporter(context.Background(), componenttest.NewNopExporterCreateSettings(), cfg)
err := exp.Start(context.Background(), componenttest.NewNopHost())
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
if test.err {
require.Error(t, err)
return
}

td := testdata.GenerateTracesOneSpan()
assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
require.Eventually(t, func() bool {
return sink.SpanCount() > 0
}, 1*time.Second, 10*time.Millisecond)
allTraces := sink.AllTraces()
require.Len(t, allTraces, 1)
assert.EqualValues(t, td, allTraces[0])
})
}
}

func TestMetricsError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.4.0
github.com/cenkalti/backoff/v4 v4.1.2
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/klauspost/compress v1.13.6
github.com/knadh/koanf v1.3.3
github.com/magiconair/properties v1.8.5
github.com/mitchellh/mapstructure v1.4.3
Expand Down Expand Up @@ -50,9 +52,7 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
Expand Down
Loading