Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: move compression.go into confighttp.go #4651

Merged
merged 13 commits into from
Jan 13, 2022
Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
## 💡 Enhancements 💡

- Remove expand cases that cannot happen with config.Map (#4649)
- Move `compression.go` into `confighttp.go` to internalize functions in `compression.go` file. (#4651)
- create `configcompression` package to manage compression methods in `confighttp` and `configgrpc`

## v0.42.0 Beta

Expand Down
49 changes: 49 additions & 0 deletions config/configcompression/compressionType.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configcompression // import "go.opentelemetry.io/collector/config/configcompression"

import "fmt"

type CompressionType string

const (
Gzip CompressionType = "gzip"
Zlib CompressionType = "zlib"
Deflate CompressionType = "deflate"
Snappy CompressionType = "snappy"
Zstd CompressionType = "zstd"
None CompressionType = "none"
hyunuk marked this conversation as resolved.
Show resolved Hide resolved
Empty CompressionType = ""
)

func IsCompressed(compressionType CompressionType) bool {
return compressionType != Empty && compressionType != None
}

func (ct *CompressionType) UnmarshalText(in []byte) error {
switch typ := CompressionType(in); typ {
case Gzip,
Zlib,
Deflate,
Snappy,
Zstd,
None,
Empty:
*ct = typ
return nil
default:
return fmt.Errorf("unsupported compression type %q", typ)
}
}
83 changes: 83 additions & 0 deletions config/configcompression/compressionType_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configcompression

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestUnmarshalText(t *testing.T) {
tests := []struct {
name string
compressionName []byte
shouldError bool
}{
{
name: "ValidGzip",
compressionName: []byte("gzip"),
shouldError: false,
},
{
name: "ValidZlib",
compressionName: []byte("zlib"),
shouldError: false,
},
{
name: "ValidDeflate",
compressionName: []byte("deflate"),
shouldError: false,
},
{
name: "ValidSnappy",
compressionName: []byte("snappy"),
shouldError: false,
},
{
name: "ValidZstd",
compressionName: []byte("zstd"),
shouldError: false,
},
{
name: "ValidEmpty",
compressionName: []byte(""),
shouldError: false,
},
{
name: "ValidNone",
compressionName: []byte("none"),
shouldError: false,
},
{
name: "Invalid",
compressionName: []byte("ggip"),
shouldError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
temp := None
err := temp.UnmarshalText(tt.compressionName)
if tt.shouldError {
assert.Error(t, err)
return
}
require.NoError(t, err)
assert.Equal(t, temp, CompressionType(tt.compressionName))
})
}
}
13 changes: 7 additions & 6 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/internal/middleware"
Expand Down Expand Up @@ -66,7 +67,7 @@ type GRPCClientSettings struct {
Endpoint string `mapstructure:"endpoint"`

// The compression key for supported compression types within collector.
Compression middleware.CompressionType `mapstructure:"compression"`
Compression configcompression.CompressionType `mapstructure:"compression"`

// TLSSetting struct exposes TLS client configuration.
TLSSetting configtls.TLSClientSetting `mapstructure:"tls,omitempty"`
Expand Down Expand Up @@ -177,7 +178,7 @@ func (gcs *GRPCClientSettings) isSchemeHTTPS() bool {
// ToDialOptions maps configgrpc.GRPCClientSettings to a slice of dial options for gRPC.
func (gcs *GRPCClientSettings) ToDialOptions(host component.Host, settings component.TelemetrySettings) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
if gcs.Compression != middleware.CompressionEmpty && gcs.Compression != middleware.CompressionNone {
if configcompression.IsCompressed(gcs.Compression) {
cp, err := getGRPCCompressionName(gcs.Compression)
if err != nil {
return nil, err
Expand Down Expand Up @@ -357,13 +358,13 @@ func (gss *GRPCServerSettings) ToServerOption(host component.Host, settings comp
}

// getGRPCCompressionName returns compression name registered in grpc.
func getGRPCCompressionName(compressionType middleware.CompressionType) (string, error) {
func getGRPCCompressionName(compressionType configcompression.CompressionType) (string, error) {
switch compressionType {
case middleware.CompressionGzip:
case configcompression.Gzip:
return gzip.Name, nil
case middleware.CompressionSnappy:
case configcompression.Snappy:
return snappy.Name, nil
case middleware.CompressionZstd:
case configcompression.Zstd:
return zstd.Name, nil
default:
return "", fmt.Errorf("unsupported compression type %q", compressionType)
Expand Down
8 changes: 4 additions & 4 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/internal/middleware"
"go.opentelemetry.io/collector/model/otlpgrpc"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
)
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestAllGrpcClientSettings(t *testing.T) {
"test": "test",
},
Endpoint: "localhost:1234",
Compression: middleware.CompressionGzip,
Compression: configcompression.Gzip,
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestAllGrpcClientSettings(t *testing.T) {
"test": "test",
},
Endpoint: "localhost:1234",
Compression: middleware.CompressionSnappy,
Compression: configcompression.Snappy,
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestAllGrpcClientSettings(t *testing.T) {
"test": "test",
},
Endpoint: "localhost:1234",
Compression: middleware.CompressionZstd,
Compression: configcompression.Zstd,
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,42 @@
// Copyright The OpenTelemetry Authors
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package middleware // import "go.opentelemetry.io/collector/internal/middleware"
// This file contains helper functions regarding compression/decompression for confighttp.

package confighttp

import (
"bytes"
"compress/gzip"
"compress/zlib"
"fmt"
"io"
"net/http"

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
)

type CompressionType string

const (
headerContentEncoding = "Content-Encoding"
CompressionGzip CompressionType = "gzip"
CompressionZlib CompressionType = "zlib"
CompressionDeflate CompressionType = "deflate"
CompressionSnappy CompressionType = "snappy"
CompressionZstd CompressionType = "zstd"
CompressionNone CompressionType = "none"
CompressionEmpty CompressionType = ""
"go.opentelemetry.io/collector/config/configcompression"
)

type CompressRoundTripper struct {
type compressRoundTripper struct {
RoundTripper http.RoundTripper
compressionType CompressionType
compressionType configcompression.CompressionType
writer func(*bytes.Buffer) (io.WriteCloser, error)
}

func NewCompressRoundTripper(rt http.RoundTripper, compressionType CompressionType) *CompressRoundTripper {
return &CompressRoundTripper{
func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.CompressionType) *compressRoundTripper {
return &compressRoundTripper{
RoundTripper: rt,
compressionType: compressionType,
writer: writerFactory(compressionType),
Expand All @@ -55,29 +45,29 @@ func NewCompressRoundTripper(rt http.RoundTripper, compressionType CompressionTy

// writerFactory defines writer field in CompressRoundTripper.
// The validity of input is already checked when NewCompressRoundTripper was called in confighttp,
func writerFactory(compressionType CompressionType) func(*bytes.Buffer) (io.WriteCloser, error) {
func writerFactory(compressionType configcompression.CompressionType) func(*bytes.Buffer) (io.WriteCloser, error) {
switch compressionType {
case CompressionGzip:
case configcompression.Gzip:
return func(buf *bytes.Buffer) (io.WriteCloser, error) {
return gzip.NewWriter(buf), nil
}
case CompressionSnappy:
case configcompression.Snappy:
return func(buf *bytes.Buffer) (io.WriteCloser, error) {
return snappy.NewBufferedWriter(buf), nil
}
case CompressionZstd:
case configcompression.Zstd:
return func(buf *bytes.Buffer) (io.WriteCloser, error) {
return zstd.NewWriter(buf)
}
case CompressionZlib, CompressionDeflate:
case configcompression.Zlib, configcompression.Deflate:
return func(buf *bytes.Buffer) (io.WriteCloser, error) {
return zlib.NewWriter(buf), nil
}
}
return nil
}

func (r *CompressRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
func (r *compressRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
if req.Header.Get(headerContentEncoding) != "" {
// If the header already specifies a content encoding then skip compression
// since we don't want to compress it again. This is a safeguard that normally
Expand Down Expand Up @@ -121,29 +111,25 @@ func (r *CompressRoundTripper) RoundTrip(req *http.Request) (*http.Response, err
return r.RoundTripper.RoundTrip(cReq)
}

func (r *CompressRoundTripper) CompressionType() CompressionType {
return r.compressionType
}

type ErrorHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)
type errorHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)

type decompressor struct {
errorHandler ErrorHandler
errorHandler
}

type DecompressorOption func(d *decompressor)
type decompressorOption func(d *decompressor)

func WithErrorHandler(e ErrorHandler) DecompressorOption {
func withErrorHandlerForDecompressor(e errorHandler) decompressorOption {
return func(d *decompressor) {
d.errorHandler = e
}
}

// HTTPContentDecompressor is a middleware that offloads the task of handling compressed
// HTTP requests by identifying the compression format in the "Content-Encoding" header and re-writing
// httpContentDecompressor offloads the task of handling compressed HTTP requests
// by identifying the compression format in the "Content-Encoding" header and re-writing
// request body so that the handlers further in the chain can work on decompressed data.
// It supports gzip and deflate/zlib compression.
func HTTPContentDecompressor(h http.Handler, opts ...DecompressorOption) http.Handler {
func httpContentDecompressor(h http.Handler, opts ...decompressorOption) http.Handler {
d := &decompressor{}
for _, o := range opts {
o(d)
Expand Down Expand Up @@ -197,19 +183,3 @@ func newBodyReader(r *http.Request) (io.ReadCloser, error) {
func defaultErrorHandler(w http.ResponseWriter, _ *http.Request, errMsg string, statusCode int) {
http.Error(w, errMsg, statusCode)
}

func (ct *CompressionType) UnmarshalText(in []byte) error {
switch typ := CompressionType(in); typ {
case CompressionGzip,
CompressionZlib,
CompressionDeflate,
CompressionSnappy,
CompressionZstd,
CompressionNone,
CompressionEmpty:
*ct = typ
return nil
default:
return fmt.Errorf("unsupported compression type %q", typ)
}
}
Loading