Skip to content

Commit

Permalink
[confighttp] Allow compression list for a server to be overridden (#1…
Browse files Browse the repository at this point in the history
…0295)


---------

Signed-off-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
jpkrohling authored Jun 13, 2024
1 parent 6ca551e commit 725e869
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 51 deletions.
18 changes: 18 additions & 0 deletions .chloggen/jpkroehling_allow-compression-list-to-be-overridden.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: confighttp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Allow the compression list to be overridden

# One or more tracking issues or pull requests related to the change
issues: [10295]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Allows Collector administrators to control which compression algorithms to enable for HTTP-based receivers.
2 changes: 2 additions & 0 deletions config/confighttp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ will not be enabled.
not set, browsers use a default of 5 seconds.
- `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md)
- `max_request_body_size`: configures the maximum allowed body size in bytes for a single request. Default: `0` (no restriction)
- `compression_algorithms`: configures the list of compression algorithms the server can accept. Default: ["", "gzip", "zstd", "zlib", "snappy", "deflate"]
- [`tls`](../configtls/README.md)
- [`auth`](../configauth/README.md)

Expand All @@ -98,6 +99,7 @@ receivers:
- Example-Header
max_age: 7200
endpoint: 0.0.0.0:55690
compression_algorithms: ["", "gzip"]
processors:
attributes:
actions:
Expand Down
106 changes: 58 additions & 48 deletions config/confighttp/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,53 @@ type compressRoundTripper struct {
compressor *compressor
}

var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){
"": func(io.ReadCloser) (io.ReadCloser, error) {
// Not a compressed payload. Nothing to do.
return nil, nil
},
"gzip": func(body io.ReadCloser) (io.ReadCloser, error) {
gr, err := gzip.NewReader(body)
if err != nil {
return nil, err
}
return gr, nil
},
"zstd": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zstd.NewReader(
body,
// Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
// for our use-case (a server accepting decoding http requests).
// Disabling async improves performance (I benchmarked it previously when working
// on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
zstd.WithDecoderConcurrency(1),
)
if err != nil {
return nil, err
}
return zr.IOReadCloser(), nil
},
"zlib": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zlib.NewReader(body)
if err != nil {
return nil, err
}
return zr, nil
},
"snappy": func(body io.ReadCloser) (io.ReadCloser, error) {
sr := snappy.NewReader(body)
sb := new(bytes.Buffer)
_, err := io.Copy(sb, sr)
if err != nil {
return nil, err
}
if err = body.Close(); err != nil {
return nil, err
}
return io.NopCloser(sb), nil
},
}

func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type) (*compressRoundTripper, error) {
encoder, err := newCompressor(compressionType)
if err != nil {
Expand Down Expand Up @@ -77,64 +124,27 @@ type decompressor struct {
// by identifying the compression format in the "Content-Encoding" header and re-writing
// request body so that the handlers further in the chain can work on decompressed data.
// It supports gzip and deflate/zlib compression.
func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler {
func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), enableDecoders []string, decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler {
errHandler := defaultErrorHandler
if eh != nil {
errHandler = eh
}

enabled := map[string]func(body io.ReadCloser) (io.ReadCloser, error){}
for _, dec := range enableDecoders {
enabled[dec] = availableDecoders[dec]

if dec == "deflate" {
enabled["deflate"] = availableDecoders["zlib"]
}
}

d := &decompressor{
maxRequestBodySize: maxRequestBodySize,
errHandler: errHandler,
base: h,
decoders: map[string]func(body io.ReadCloser) (io.ReadCloser, error){
"": func(io.ReadCloser) (io.ReadCloser, error) {
// Not a compressed payload. Nothing to do.
return nil, nil
},
"gzip": func(body io.ReadCloser) (io.ReadCloser, error) {
gr, err := gzip.NewReader(body)
if err != nil {
return nil, err
}
return gr, nil
},
"zstd": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zstd.NewReader(
body,
// Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
// for our use-case (a server accepting decoding http requests).
// Disabling async improves performance (I benchmarked it previously when working
// on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
zstd.WithDecoderConcurrency(1),
)
if err != nil {
return nil, err
}
return zr.IOReadCloser(), nil
},
"zlib": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zlib.NewReader(body)
if err != nil {
return nil, err
}
return zr, nil
},
"snappy": func(body io.ReadCloser) (io.ReadCloser, error) {
sr := snappy.NewReader(body)
sb := new(bytes.Buffer)
_, err := io.Copy(sb, sr)
if err != nil {
return nil, err
}
if err = body.Close(); err != nil {
return nil, err
}
return io.NopCloser(sb), nil
},
},
decoders: enabled,
}
d.decoders["deflate"] = d.decoders["zlib"]

for key, dec := range decoders {
d.decoders[key] = dec
Expand Down
29 changes: 27 additions & 2 deletions config/confighttp/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestHTTPCustomDecompression(t *testing.T) {
return io.NopCloser(strings.NewReader("decompressed body")), nil
},
}
srv := httptest.NewServer(httpContentDecompressor(handler, defaultMaxRequestBodySize, defaultErrorHandler, decoders))
srv := httptest.NewServer(httpContentDecompressor(handler, defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms, decoders))

t.Cleanup(srv.Close)

Expand Down Expand Up @@ -253,7 +253,7 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
require.NoError(t, err, "failed to read request body: %v", err)
assert.EqualValues(t, testBody, string(body))
w.WriteHeader(http.StatusOK)
}), defaultMaxRequestBodySize, defaultErrorHandler, noDecoders))
}), defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms, noDecoders))
t.Cleanup(srv.Close)

req, err := http.NewRequest(http.MethodGet, srv.URL, tt.reqBody)
Expand Down Expand Up @@ -349,6 +349,31 @@ func TestHTTPContentCompressionRequestBodyCloseError(t *testing.T) {
require.Error(t, err)
}

func TestOverrideCompressionList(t *testing.T) {
// prepare
configuredDecoders := []string{"none", "zlib"}

srv := httptest.NewServer(httpContentDecompressor(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
}), defaultMaxRequestBodySize, defaultErrorHandler, configuredDecoders, nil))
t.Cleanup(srv.Close)

req, err := http.NewRequest(http.MethodGet, srv.URL, compressSnappy(t, []byte("123decompressed body")))
require.NoError(t, err, "failed to create request to test handler")
req.Header.Set("Content-Encoding", "snappy")

client := http.Client{}

// test
res, err := client.Do(req)
require.NoError(t, err)

// verify
assert.Equal(t, http.StatusBadRequest, res.StatusCode, "test handler returned unexpected status code ")
_, err = io.ReadAll(res.Body)
require.NoError(t, res.Body.Close(), "failed to close request body: %v", err)
}

func compressGzip(t testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
gw := gzip.NewWriter(&buf)
Expand Down
10 changes: 9 additions & 1 deletion config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

const headerContentEncoding = "Content-Encoding"
const defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB
var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate"}

// ClientConfig defines settings for creating an HTTP client.
type ClientConfig struct {
Expand Down Expand Up @@ -283,6 +284,9 @@ type ServerConfig struct {
// Additional headers attached to each HTTP response sent to the client.
// Header values are opaque since they may be sensitive.
ResponseHeaders map[string]configopaque.String `mapstructure:"response_headers"`

// CompressionAlgorithms configures the list of compression algorithms the server can accept. Default: ["", "gzip", "zstd", "zlib", "snappy", "deflate"]
CompressionAlgorithms []string `mapstructure:"compression_algorithms"`
}

// ToListener creates a net.Listener.
Expand Down Expand Up @@ -348,7 +352,11 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin
hss.MaxRequestBodySize = defaultMaxRequestBodySize
}

handler = httpContentDecompressor(handler, hss.MaxRequestBodySize, serverOpts.errHandler, serverOpts.decoders)
if hss.CompressionAlgorithms == nil {
hss.CompressionAlgorithms = defaultCompressionAlgorithms
}

handler = httpContentDecompressor(handler, hss.MaxRequestBodySize, serverOpts.errHandler, hss.CompressionAlgorithms, serverOpts.decoders)

if hss.MaxRequestBodySize > 0 {
handler = maxRequestBodySizeInterceptor(handler, hss.MaxRequestBodySize)
Expand Down

0 comments on commit 725e869

Please sign in to comment.