diff --git a/MAINTAINERS.md b/MAINTAINERS.md index e118c3c00c..f8dec0d164 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -27,15 +27,15 @@ We also have some nice souls that help triaging issues and PRs. See [here](https Full list of triage persons is displayed below: -| Name | Slack | GitHub | Company | -|---------------|------------------|------------------------------------------------|----------| -| Adrien Fillon | `@Adrien F` | [@adrien-f](https://github.com/adrien-f) | | -| Ian Billett | `@billett` | [@ianbillett](https://github.com/ianbillett) | Red Hat | -| Martin Chodur | `@FUSAKLA` | [@fusakla](https://github.com/fusakla) | | -| Michael Dai | `@jojohappy` | [@jojohappy](https://github.com/jojohappy) | | -| Xiang Dai | `@daixiang0` | [@daixiang0](https://github.com/daixiang0) | | -| Wiard van Rij | `@wiard van Rij` | [@wiardvanrij](https://github.com/wiardvanrij) | Fullstaq | -| Jimmie Han | `@hanjm` | [@hanjm](https://github.com/hanjm) | Tencent | +| Name | Slack | GitHub | Company | +|---------------|------------------|------------------------------------------------|---------| +| Adrien Fillon | `@Adrien F` | [@adrien-f](https://github.com/adrien-f) | | +| Ian Billett | `@billett` | [@ianbillett](https://github.com/ianbillett) | Red Hat | +| Martin Chodur | `@FUSAKLA` | [@fusakla](https://github.com/fusakla) | | +| Michael Dai | `@jojohappy` | [@jojohappy](https://github.com/jojohappy) | | +| Xiang Dai | `@daixiang0` | [@daixiang0](https://github.com/daixiang0) | | +| Wiard van Rij | `@wiard van Rij` | [@wiardvanrij](https://github.com/wiardvanrij) | Roku | +| Jimmie Han | `@hanjm` | [@hanjm](https://github.com/hanjm) | Tencent | Please reach any of the maintainer on slack or email if you want to help as well. diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 35ce92703e..b7d69307dc 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -229,6 +229,7 @@ func runStore( httpserver.WithListen(conf.httpConfig.bindAddress), httpserver.WithGracePeriod(time.Duration(conf.httpConfig.gracePeriod)), httpserver.WithTLSConfig(conf.httpConfig.tlsConfig), + httpserver.WithEnableH2C(true), // For groupcache. ) g.Add(func() error { diff --git a/docs/components/store.md b/docs/components/store.md index 79fee0960a..ff4937b002 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -246,10 +246,11 @@ Check more [here](../sharding.md). ## Index cache -Thanos Store Gateway supports an index cache to speed up postings and series lookups from TSDB blocks indexes. Two types of caches are supported: +Thanos Store Gateway supports an index cache to speed up postings and series lookups from TSDB blocks indexes. Three types of caches are supported: - `in-memory` (*default*) - `memcached` +- `redis` ### In-memory index cache diff --git a/go.mod b/go.mod index 17f70129da..797cc1d321 100644 --- a/go.mod +++ b/go.mod @@ -76,7 +76,7 @@ require ( go.uber.org/automaxprocs v1.4.0 go.uber.org/goleak v1.1.12 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 - golang.org/x/net v0.0.0-20211020060615-d418f374d309 + golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/text v0.3.7 diff --git a/go.sum b/go.sum index dc23735812..abca77e891 100644 --- a/go.sum +++ b/go.sum @@ -1987,8 +1987,9 @@ golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211020060615-d418f374d309 h1:A0lJIi+hcTR6aajJH4YqKWwohY4aW9RO7oRMcdv+HKI= golang.org/x/net v0.0.0-20211020060615-d418f374d309/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d h1:1n1fc535VhN8SYtD4cDUyNlfpAF2ROMM9+11equK3hs= +golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/pkg/cache/groupcache.go b/pkg/cache/groupcache.go index 0bbf596755..3456ee5df6 100644 --- a/pkg/cache/groupcache.go +++ b/pkg/cache/groupcache.go @@ -5,9 +5,12 @@ package cache import ( "context" + "crypto/tls" "encoding/json" "io/ioutil" + "net" "net/http" + "path/filepath" "strconv" "time" @@ -23,6 +26,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/cache/cachekey" "github.com/vimeo/galaxycache" galaxyhttp "github.com/vimeo/galaxycache/http" + "golang.org/x/net/http2" "gopkg.in/yaml.v2" ) @@ -92,6 +96,12 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf cfg *CachingBucketConfig) (*Groupcache, error) { httpProto := galaxyhttp.NewHTTPFetchProtocol(&galaxyhttp.HTTPOptions{ BasePath: basepath, + Transport: &http2.Transport{ + AllowHTTP: true, + DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) { + return net.Dial(network, addr) + }, + }, }) universe := galaxycache.NewUniverse(httpProto, conf.SelfURL) @@ -121,7 +131,7 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf galaxyhttp.RegisterHTTPHandler(universe, &galaxyhttp.HTTPOptions{ BasePath: basepath, }, mux) - r.Get(basepath, mux.ServeHTTP) + r.Get(filepath.Join(basepath, conf.GroupcacheGroup, ":key"), mux.ServeHTTP) galaxy := universe.NewGalaxy(conf.GroupcacheGroup, int64(conf.MaxSize), galaxycache.GetterFunc( func(ctx context.Context, id string, dest galaxycache.Codec) error { diff --git a/pkg/cache/groupcache_test.go b/pkg/cache/groupcache_test.go new file mode 100644 index 0000000000..292513bbaa --- /dev/null +++ b/pkg/cache/groupcache_test.go @@ -0,0 +1,235 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package cache + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/route" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/model" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/prober" + httpserver "github.com/thanos-io/thanos/pkg/server/http" + "github.com/thanos-io/thanos/pkg/store/cache/cachekey" + "github.com/thanos-io/thanos/pkg/testutil" + galaxyhttp "github.com/vimeo/galaxycache/http" + "golang.org/x/net/http2" +) + +const basePath = `/_groupcache/` +const groupName = `groupName` +const selfURLH1 = "http://localhost:12345" +const selfURLH2C = "http://localhost:12346" + +func TestMain(m *testing.M) { + reg := prometheus.NewRegistry() + router := route.New() + + bkt := objstore.NewInMemBucket() + defer bkt.Close() + + payload := strings.Repeat("foobar", 16*1024/6) + + if err := bkt.Upload(context.Background(), "test", strings.NewReader(payload)); err != nil { + fmt.Printf("failed to upload: %s\n", err.Error()) + os.Exit(1) + } + + httpServer := httpserver.New(log.NewNopLogger(), nil, component.Bucket, &prober.HTTPProbe{}, + httpserver.WithListen("0.0.0.0:12345")) + httpServer.Handle("/", router) + httpServerH2C := httpserver.New(log.NewNopLogger(), nil, component.Bucket, &prober.HTTPProbe{}, + httpserver.WithListen("0.0.0.0:12346"), httpserver.WithEnableH2C(true)) + httpServerH2C.Handle("/", router) + + cachingBucketConfig := NewCachingBucketConfig() + cachingBucketConfig.CacheGet("test", nil, func(s string) bool { + return true + }, 16*1024, 0*time.Second, 0*time.Second, 0*time.Second) + + groupCache, err := NewGroupcacheWithConfig( + log.NewJSONLogger(os.Stderr), + reg, + GroupcacheConfig{ + Peers: []string{selfURLH1}, + SelfURL: selfURLH1, + GroupcacheGroup: groupName, + DNSSDResolver: dns.MiekgdnsResolverType, + DNSInterval: 30 * time.Second, + MaxSize: model.Bytes(16 * 1024), + }, + basePath, + router, + bkt, + cachingBucketConfig, + ) + if err != nil { + fmt.Printf("failed creating group cache: %s\n", err.Error()) + os.Exit(1) + } + + go func() { + if err = httpServer.ListenAndServe(); err != nil { + fmt.Printf("failed to listen: %s\n", err.Error()) + } + }() + go func() { + if err = httpServerH2C.ListenAndServe(); err != nil { + fmt.Printf("failed to listen: %s\n", err.Error()) + } + }() + + defer httpServer.Shutdown(nil) + defer httpServerH2C.Shutdown(nil) + + cachingBucketConfig.SetCacheImplementation(groupCache) + + exitVal := m.Run() + + os.Exit(exitVal) +} + +// Benchmark retrieval of one key from one groupcache node. +func BenchmarkGroupcacheRetrieval(b *testing.B) { + b.Run("h2c", func(b *testing.B) { + fetcher := galaxyhttp.NewHTTPFetchProtocol(&galaxyhttp.HTTPOptions{ + Transport: &http2.Transport{ + AllowHTTP: true, + DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) { + return net.Dial(network, addr) + }, + }, + BasePath: basePath, + }) + + f, err := fetcher.NewFetcher(selfURLH2C) + testutil.Ok(b, err) + + b.Run("seq", func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + _, _, err = f.Fetch(context.Background(), groupName, cachekey.BucketCacheKey{Verb: "content", Name: "test"}.String()) + testutil.Ok(b, err) + } + }) + b.Run("parallel=500", func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + + ch := make(chan struct{}) + + for i := 0; i < 500; i++ { + go func() { + for range ch { + _, _, err = f.Fetch(context.Background(), groupName, cachekey.BucketCacheKey{Verb: "content", Name: "test"}.String()) + testutil.Ok(b, err) + } + }() + } + + for i := 0; i < b.N; i++ { + ch <- struct{}{} + } + close(ch) + }) + }) + b.Run("h1, max one TCP connection", func(b *testing.B) { + fetcher := galaxyhttp.NewHTTPFetchProtocol(&galaxyhttp.HTTPOptions{ + BasePath: basePath, + Transport: &http.Transport{ + MaxConnsPerHost: 1, + MaxIdleConnsPerHost: 1, + }, + }) + + f, err := fetcher.NewFetcher(selfURLH1) + testutil.Ok(b, err) + + b.Run("seq", func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + _, _, err = f.Fetch(context.Background(), groupName, cachekey.BucketCacheKey{Verb: "content", Name: "test"}.String()) + testutil.Ok(b, err) + } + }) + + b.Run("parallel=500", func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + + ch := make(chan struct{}) + + for i := 0; i < 500; i++ { + go func() { + for range ch { + _, _, err = f.Fetch(context.Background(), groupName, cachekey.BucketCacheKey{Verb: "content", Name: "test"}.String()) + testutil.Ok(b, err) + } + }() + } + + for i := 0; i < b.N; i++ { + ch <- struct{}{} + } + close(ch) + }) + }) + b.Run("h1, unlimited TCP connections", func(b *testing.B) { + fetcher := galaxyhttp.NewHTTPFetchProtocol(&galaxyhttp.HTTPOptions{ + BasePath: basePath, + Transport: &http.Transport{}, + }) + + f, err := fetcher.NewFetcher(selfURLH1) + testutil.Ok(b, err) + + b.Run("seq", func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + _, _, err = f.Fetch(context.Background(), groupName, cachekey.BucketCacheKey{Verb: "content", Name: "test"}.String()) + testutil.Ok(b, err) + } + }) + + b.Run("parallel=500", func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + + ch := make(chan struct{}) + + for i := 0; i < 500; i++ { + go func() { + for range ch { + _, _, err = f.Fetch(context.Background(), groupName, cachekey.BucketCacheKey{Verb: "content", Name: "test"}.String()) + testutil.Ok(b, err) + } + }() + } + + for i := 0; i < b.N; i++ { + ch <- struct{}{} + } + close(ch) + }) + }) + +} diff --git a/pkg/objstore/s3/s3_e2e_test.go b/pkg/objstore/s3/s3_e2e_test.go index f31f276ddc..97ef4a86c4 100644 --- a/pkg/objstore/s3/s3_e2e_test.go +++ b/pkg/objstore/s3/s3_e2e_test.go @@ -10,7 +10,6 @@ import ( "testing" "github.com/efficientgo/e2e" - e2edb "github.com/efficientgo/e2e/db" "github.com/go-kit/log" "github.com/thanos-io/thanos/pkg/objstore/s3" @@ -29,16 +28,12 @@ func BenchmarkUpload(b *testing.B) { b.Cleanup(e2ethanos.CleanScenario(b, e)) const bucket = "benchmark" - m := e2ethanos.NewMinio(e, "benchmark", bucket) + m, err := e2ethanos.NewMinio(e, "benchmark", bucket) + testutil.Ok(b, err) testutil.Ok(b, e2e.StartAndWaitReady(m)) - bkt, err := s3.NewBucketWithConfig(log.NewNopLogger(), s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.Endpoint("http"), - Insecure: true, - }, "test-feed") + bkt, err := s3.NewBucketWithConfig(log.NewNopLogger(), + e2ethanos.NewS3Config(bucket, m.Endpoint("https"), e.SharedDir()), "test-feed") testutil.Ok(b, err) buf := bytes.Buffer{} diff --git a/pkg/server/http/http.go b/pkg/server/http/http.go index f829047d9f..256304fe11 100644 --- a/pkg/server/http/http.go +++ b/pkg/server/http/http.go @@ -15,6 +15,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" toolkit_web "github.com/prometheus/exporter-toolkit/web" + "golang.org/x/net/http2" + "golang.org/x/net/http2/h2c" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/prober" @@ -48,12 +50,20 @@ func New(logger log.Logger, reg *prometheus.Registry, comp component.Component, registerProbes(mux, prober, logger) registerProfiler(mux) + var h http.Handler + if options.enableH2C { + h2s := &http2.Server{} + h = h2c.NewHandler(mux, h2s) + } else { + h = mux + } + return &Server{ logger: log.With(logger, "service", "http/server", "component", comp.String()), comp: comp, prober: prober, mux: mux, - srv: &http.Server{Addr: options.listen, Handler: mux}, + srv: &http.Server{Addr: options.listen, Handler: h}, opts: options, } } diff --git a/pkg/server/http/option.go b/pkg/server/http/option.go index 693a9ac79b..350caf1635 100644 --- a/pkg/server/http/option.go +++ b/pkg/server/http/option.go @@ -13,6 +13,7 @@ type options struct { listen string tlsConfigPath string mux *http.ServeMux + enableH2C bool } // Option overrides behavior of Server. @@ -48,6 +49,12 @@ func WithTLSConfig(tls string) Option { }) } +func WithEnableH2C(enableH2C bool) Option { + return optionFunc(func(o *options) { + o.enableH2C = enableH2C + }) +} + // WithMux overrides the the server's default mux. func WithMux(mux *http.ServeMux) Option { return optionFunc(func(o *options) { diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index aecbe50e6a..96afd971cd 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -17,7 +17,6 @@ import ( "time" "github.com/efficientgo/e2e" - e2edb "github.com/efficientgo/e2e/db" "github.com/efficientgo/e2e/matchers" "github.com/go-kit/log" "github.com/oklog/ulid" @@ -345,16 +344,12 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) { testutil.Ok(t, os.MkdirAll(dir, os.ModePerm)) const bucket = "compact_test" - m := e2ethanos.NewMinio(e, "minio", bucket) + m, err := e2ethanos.NewMinio(e, "minio", bucket) + testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(m)) - bkt, err := s3.NewBucketWithConfig(logger, s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.Endpoint("http"), // We need separate client config, when connecting to minio from outside. - Insecure: true, - }, "test-feed") + bkt, err := s3.NewBucketWithConfig(logger, + e2ethanos.NewS3Config(bucket, m.Endpoint("https"), e.SharedDir()), "test-feed") testutil.Ok(t, err) ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) @@ -456,14 +451,8 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) { } svcConfig := client.BucketConfig{ - Type: client.S3, - Config: s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.InternalEndpoint("http"), - Insecure: true, - }, + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), e2ethanos.ContainerSharedDir), } // Crank down the deletion mark delay since deduplication can miss blocks in the presence of replica labels it doesn't know about. diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index df72308feb..10101ae8b8 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -4,9 +4,15 @@ package e2ethanos import ( + "crypto/rand" + "crypto/rsa" + "crypto/x509" "encoding/json" + "encoding/pem" "fmt" "io/ioutil" + "math/big" + "net" "os" "path/filepath" "strconv" @@ -25,7 +31,9 @@ import ( "github.com/thanos-io/thanos/pkg/alert" "github.com/thanos-io/thanos/pkg/httpconfig" + "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" + "github.com/thanos-io/thanos/pkg/objstore/s3" "github.com/thanos-io/thanos/pkg/queryfrontend" "github.com/thanos-io/thanos/pkg/receive" ) @@ -814,29 +822,43 @@ http { // NewMinio returns minio server, used as a local replacement for S3. // TODO(@matej-g): This is a temporary workaround for https://github.com/efficientgo/e2e/issues/11; // after this is addresses fixed all calls should be replaced with e2edb.NewMinio. -func NewMinio(env e2e.Environment, name, bktName string) *e2e.InstrumentedRunnable { +func NewMinio(env e2e.Environment, name, bktName string) (*e2e.InstrumentedRunnable, error) { image := "minio/minio:RELEASE.2019-12-30T05-45-39Z" minioKESGithubContent := "https://raw.githubusercontent.com/minio/kes/master" commands := []string{ "curl -sSL --tlsv1.2 -O '%s/root.key' -O '%s/root.cert'", - "mkdir -p /data/%s && minio server --address :%v --quiet /data", + "mkdir -p /data/%s && minio server --certs-dir /shared/data/certs --address :%v --quiet /data", + } + + if err := os.MkdirAll(filepath.Join(env.SharedDir(), "data", "certs", "CAs"), 0750); err != nil { + return nil, errors.Wrap(err, "create certs dir") + } + + if err := genCerts( + filepath.Join(env.SharedDir(), "data", "certs", "public.crt"), + filepath.Join(env.SharedDir(), "data", "certs", "private.key"), + filepath.Join(env.SharedDir(), "data", "certs", "CAs", "ca.crt"), + env.Name()+"-"+name); err != nil { + return nil, errors.Wrap(err, "fail to generate certs") } return e2e.NewInstrumentedRunnable( env, name, - map[string]int{"http": 8090}, - "http").Init( + map[string]int{"https": 8090}, + "https").Init( e2e.StartOptions{ Image: image, // Create the required bucket before starting minio. - Command: e2e.NewCommandWithoutEntrypoint("sh", "-c", fmt.Sprintf(strings.Join(commands, " && "), minioKESGithubContent, minioKESGithubContent, bktName, 8090)), - Readiness: e2e.NewHTTPReadinessProbe("http", "/minio/health/ready", 200, 200), + Command: e2e.NewCommandWithoutEntrypoint("sh", "-c", fmt.Sprintf(strings.Join(commands, " && "), minioKESGithubContent, minioKESGithubContent, bktName, 8090)), + //TODO(@clyang82): This is a temporary workaround for https://github.com/efficientgo/e2e/issues/9 + //Readiness: e2e.NewHTTPReadinessProbe("http", "/minio/health/ready", 200, 200), + Readiness: e2e.NewCmdReadinessProbe(e2e.NewCommand("sh", "-c", "sleep 1 && curl -k https://127.0.0.1:8090/minio/health/ready")), EnvVars: map[string]string{ "MINIO_ACCESS_KEY": e2edb.MinioAccessKey, "MINIO_SECRET_KEY": e2edb.MinioSecretKey, "MINIO_BROWSER": "off", - "ENABLE_HTTPS": "0", + "ENABLE_HTTPS": "1", // https://docs.min.io/docs/minio-kms-quickstart-guide.html "MINIO_KMS_KES_ENDPOINT": "https://play.min.io:7373", "MINIO_KMS_KES_KEY_FILE": "root.key", @@ -844,7 +866,7 @@ func NewMinio(env e2e.Environment, name, bktName string) *e2e.InstrumentedRunnab "MINIO_KMS_KES_KEY_NAME": "my-minio-key", }, }, - ) + ), nil } func NewMemcached(e e2e.Environment, name string) *e2e.InstrumentedRunnable { @@ -914,3 +936,90 @@ func NewToolsBucketWeb( return toolsBucketWeb, nil } + +// genCerts generates certificates and writes those to the provided paths. +func genCerts(certPath, privkeyPath, caPath, serverName string) error { + var caRoot = &x509.Certificate{ + SerialNumber: big.NewInt(2019), + NotAfter: time.Now().AddDate(10, 0, 0), + IsCA: true, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign, + BasicConstraintsValid: true, + } + + var cert = &x509.Certificate{ + SerialNumber: big.NewInt(1658), + DNSNames: []string{serverName}, + IPAddresses: []net.IP{net.ParseIP("127.0.0.1"), net.ParseIP("::1")}, + NotAfter: time.Now().AddDate(10, 0, 0), + SubjectKeyId: []byte{1, 2, 3}, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + KeyUsage: x509.KeyUsageDigitalSignature, + } + + caPrivKey, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return err + } + + certPrivKey, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return err + } + // Generate CA cert. + caBytes, err := x509.CreateCertificate(rand.Reader, caRoot, caRoot, &caPrivKey.PublicKey, caPrivKey) + if err != nil { + return err + } + caPEM := pem.EncodeToMemory(&pem.Block{ + Type: "CERTIFICATE", + Bytes: caBytes, + }) + err = ioutil.WriteFile(caPath, caPEM, 0644) + if err != nil { + return err + } + + // Sign the cert with the CA private key. + certBytes, err := x509.CreateCertificate(rand.Reader, cert, caRoot, &certPrivKey.PublicKey, caPrivKey) + if err != nil { + return err + } + certPEM := pem.EncodeToMemory(&pem.Block{ + Type: "CERTIFICATE", + Bytes: certBytes, + }) + err = ioutil.WriteFile(certPath, certPEM, 0644) + if err != nil { + return err + } + + certPrivKeyPEM := pem.EncodeToMemory(&pem.Block{ + Type: "RSA PRIVATE KEY", + Bytes: x509.MarshalPKCS1PrivateKey(certPrivKey), + }) + err = ioutil.WriteFile(privkeyPath, certPrivKeyPEM, 0644) + if err != nil { + return err + } + + return nil +} + +func NewS3Config(bucket, endpoint, basePath string) s3.Config { + return s3.Config{ + Bucket: bucket, + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + Endpoint: endpoint, + Insecure: false, + HTTPConfig: s3.HTTPConfig{ + TLSConfig: objstore.TLSConfig{ + CAFile: filepath.Join(basePath, "data", "certs", "CAs", "ca.crt"), + CertFile: filepath.Join(basePath, "data", "certs", "public.crt"), + KeyFile: filepath.Join(basePath, "data", "certs", "private.key"), + }, + }, + } +} diff --git a/test/e2e/info_api_test.go b/test/e2e/info_api_test.go index 35ca2a51c9..2040549ea7 100644 --- a/test/e2e/info_api_test.go +++ b/test/e2e/info_api_test.go @@ -13,12 +13,10 @@ import ( "testing" "time" - e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/efficientgo/e2e" "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/thanos/pkg/objstore/client" - "github.com/thanos-io/thanos/pkg/objstore/s3" "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/testutil" @@ -41,20 +39,15 @@ func TestInfo(t *testing.T) { testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2, prom3, sidecar3)) const bucket = "info-api-test" - m := e2ethanos.NewMinio(e, "thanos-minio", bucket) + m, err := e2ethanos.NewMinio(e, "thanos-minio", bucket) + testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(m)) store, err := e2ethanos.NewStoreGW( e, "1", client.BucketConfig{ - Type: client.S3, - Config: s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.InternalEndpoint("http"), - Insecure: true, - }, + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), e2ethanos.ContainerSharedDir), }, "", nil, diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index ea8bcf136b..3577f2f7bd 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -26,7 +26,6 @@ import ( "github.com/chromedp/cdproto/network" "github.com/chromedp/chromedp" "github.com/efficientgo/e2e" - e2edb "github.com/efficientgo/e2e/db" "github.com/go-kit/log" "github.com/pkg/errors" "github.com/prometheus/common/model" @@ -655,7 +654,8 @@ func TestSidecarStorePushdown(t *testing.T) { testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1)) const bucket = "store_gateway_test" - m := e2ethanos.NewMinio(e, "thanos-minio", bucket) + m, err := e2ethanos.NewMinio(e, "thanos-minio", bucket) + testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(m)) dir := filepath.Join(e.SharedDir(), "tmp") @@ -672,13 +672,7 @@ func TestSidecarStorePushdown(t *testing.T) { testutil.Ok(t, err) l := log.NewLogfmtLogger(os.Stdout) - bkt, err := s3.NewBucketWithConfig(l, s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.Endpoint("http"), - Insecure: true, - }, "test") + bkt, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket, m.Endpoint("https"), e.SharedDir()), "test") testutil.Ok(t, err) testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id1.String()), id1.String())) @@ -687,14 +681,8 @@ func TestSidecarStorePushdown(t *testing.T) { e, "1", client.BucketConfig{ - Type: client.S3, - Config: s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.InternalEndpoint("http"), - Insecure: true, - }, + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), e2ethanos.ContainerSharedDir), }, "", nil, diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index fad943e569..13eb6d07ef 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -14,7 +14,6 @@ import ( "time" "github.com/efficientgo/e2e" - e2edb "github.com/efficientgo/e2e/db" "github.com/efficientgo/e2e/matchers" "github.com/go-kit/log" "github.com/prometheus/common/model" @@ -44,7 +43,8 @@ func TestStoreGateway(t *testing.T) { t.Cleanup(e2ethanos.CleanScenario(t, e)) const bucket = "store_gateway_test" - m := e2ethanos.NewMinio(e, "thanos-minio", bucket) + m, err := e2ethanos.NewMinio(e, "thanos-minio", bucket) + testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(m)) memcached := e2ethanos.NewMemcached(e, "1") @@ -62,14 +62,8 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) e, "1", client.BucketConfig{ - Type: client.S3, - Config: s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.InternalEndpoint("http"), - Insecure: true, - }, + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), e2ethanos.ContainerSharedDir), }, memcachedConfig, nil, @@ -110,13 +104,8 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached")) id4, err := e2eutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset, 0, metadata.NoneFunc) testutil.Ok(t, err) l := log.NewLogfmtLogger(os.Stdout) - bkt, err := s3.NewBucketWithConfig(l, s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.Endpoint("http"), // We need separate client config, when connecting to minio from outside. - Insecure: true, - }, "test-feed") + bkt, err := s3.NewBucketWithConfig(l, + e2ethanos.NewS3Config(bucket, m.Endpoint("https"), e.SharedDir()), "test-feed") testutil.Ok(t, err) testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id1.String()), id1.String())) @@ -295,7 +284,8 @@ func TestStoreGatewayMemcachedCache(t *testing.T) { t.Cleanup(e2ethanos.CleanScenario(t, e)) const bucket = "store_gateway_memcached_cache_test" - m := e2ethanos.NewMinio(e, "thanos-minio", bucket) + m, err := e2ethanos.NewMinio(e, "thanos-minio", bucket) + testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(m)) memcached := e2ethanos.NewMemcached(e, "1") @@ -310,14 +300,8 @@ blocks_iter_ttl: 0s`, memcached.InternalEndpoint("memcached")) e, "1", client.BucketConfig{ - Type: client.S3, - Config: s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.InternalEndpoint("http"), - Insecure: true, - }, + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), e2ethanos.ContainerSharedDir), }, memcachedConfig, nil, @@ -343,13 +327,8 @@ blocks_iter_ttl: 0s`, memcached.InternalEndpoint("memcached")) testutil.Ok(t, err) l := log.NewLogfmtLogger(os.Stdout) - bkt, err := s3.NewBucketWithConfig(l, s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.Endpoint("http"), // We need separate client config, when connecting to minio from outside. - Insecure: true, - }, "test-feed") + bkt, err := s3.NewBucketWithConfig(l, + e2ethanos.NewS3Config(bucket, m.Endpoint("https"), e.SharedDir()), "test-feed") testutil.Ok(t, err) testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id.String()), id.String())) @@ -410,17 +389,19 @@ func TestStoreGatewayGroupCache(t *testing.T) { t.Cleanup(e2ethanos.CleanScenario(t, e)) const bucket = "store_gateway_groupcache_test" - m := e2ethanos.NewMinio(e, "thanos-minio", bucket) + m, err := e2ethanos.NewMinio(e, "thanos-minio", bucket) + testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(m)) groupcacheConfig := `type: GROUPCACHE config: - self_url: http://store-gw-%d:42/ + self_url: http://e2e_test_store_gateway_groupcache-store-gw-%d:8080 peers: - - http://store-gw-1:42/ - - http://store-gw-2:42/ - - http://store-gw-3:42/ + - http://e2e_test_store_gateway_groupcache-store-gw-1:8080 + - http://e2e_test_store_gateway_groupcache-store-gw-2:8080 + - http://e2e_test_store_gateway_groupcache-store-gw-3:8080 groupcache_group: groupcache_test_group + dns_interval: 1s blocks_iter_ttl: 0s metafile_exists_ttl: 0s metafile_doesnt_exist_ttl: 0s @@ -430,14 +411,8 @@ metafile_content_ttl: 0s` e, "1", client.BucketConfig{ - Type: client.S3, - Config: s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.InternalEndpoint("http"), - Insecure: true, - }, + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), e2ethanos.ContainerSharedDir), }, fmt.Sprintf(groupcacheConfig, 1), nil, @@ -449,14 +424,8 @@ metafile_content_ttl: 0s` e, "2", client.BucketConfig{ - Type: client.S3, - Config: s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.InternalEndpoint("http"), - Insecure: true, - }, + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), e2ethanos.ContainerSharedDir), }, fmt.Sprintf(groupcacheConfig, 2), nil, @@ -468,14 +437,8 @@ metafile_content_ttl: 0s` e, "3", client.BucketConfig{ - Type: client.S3, - Config: s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.InternalEndpoint("http"), - Insecure: true, - }, + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), e2ethanos.ContainerSharedDir), }, fmt.Sprintf(groupcacheConfig, 3), nil, @@ -506,13 +469,7 @@ metafile_content_ttl: 0s` testutil.Ok(t, err) l := log.NewLogfmtLogger(os.Stdout) - bkt, err := s3.NewBucketWithConfig(l, s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.Endpoint("http"), // We need separate client config, when connecting to minio from outside. - Insecure: true, - }, "test-feed") + bkt, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket, m.Endpoint("https"), e.SharedDir()), "test-feed") testutil.Ok(t, err) testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id.String()), id.String())) @@ -552,9 +509,9 @@ metafile_content_ttl: 0s` }) t.Run("query with cache hit", func(t *testing.T) { - retrievedMetrics, err := store1.SumMetrics([]string{`thanos_cache_groupcache_hits_total`, `thanos_cache_groupcache_loads_total`}) + retrievedMetrics, err := store1.SumMetrics([]string{`thanos_cache_groupcache_hits_total`, `thanos_cache_groupcache_loads_total`, `thanos_cache_groupcache_get_requests_total`}) testutil.Ok(t, err) - testutil.Assert(t, len(retrievedMetrics) == 2) + testutil.Assert(t, len(retrievedMetrics) == 3) queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery }, time.Now, promclient.QueryOptions{ @@ -572,5 +529,7 @@ metafile_content_ttl: 0s` testutil.Ok(t, store1.WaitSumMetricsWithOptions(e2e.Greater(retrievedMetrics[0]), []string{`thanos_cache_groupcache_hits_total`})) testutil.Ok(t, store1.WaitSumMetricsWithOptions(e2e.Equals(retrievedMetrics[1]), []string{`thanos_cache_groupcache_loads_total`})) + testutil.Ok(t, store1.WaitSumMetricsWithOptions(e2e.Greater(retrievedMetrics[2]), []string{`thanos_cache_groupcache_get_requests_total`})) + testutil.Ok(t, store2.WaitSumMetricsWithOptions(e2e.Greater(0), []string{`thanos_cache_groupcache_peer_loads_total`})) }) } diff --git a/test/e2e/tools_bucket_web_test.go b/test/e2e/tools_bucket_web_test.go index 2c8b23c7af..5ce4621484 100644 --- a/test/e2e/tools_bucket_web_test.go +++ b/test/e2e/tools_bucket_web_test.go @@ -16,7 +16,6 @@ import ( "time" "github.com/efficientgo/e2e" - e2edb "github.com/efficientgo/e2e/db" "github.com/go-kit/log" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" @@ -40,18 +39,13 @@ func TestToolsBucketWebExternalPrefixWithoutReverseProxy(t *testing.T) { externalPrefix := "testThanos" const bucket = "compact_test" - m := e2ethanos.NewMinio(e, "thanos", bucket) + m, err := e2ethanos.NewMinio(e, "thanos", bucket) + testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(m)) svcConfig := client.BucketConfig{ - Type: client.S3, - Config: s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.Endpoint("http"), - Insecure: true, - }, + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.Endpoint("https"), e2ethanos.ContainerSharedDir), } b, err := e2ethanos.NewToolsBucketWeb( @@ -79,18 +73,13 @@ func TestToolsBucketWebExternalPrefix(t *testing.T) { externalPrefix := "testThanos" const bucket = "toolsBucketWeb_test" - m := e2ethanos.NewMinio(e, "thanos", bucket) + m, err := e2ethanos.NewMinio(e, "thanos", bucket) + testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(m)) svcConfig := client.BucketConfig{ - Type: client.S3, - Config: s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.Endpoint("http"), - Insecure: true, - }, + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.Endpoint("https"), e2ethanos.ContainerSharedDir), } b, err := e2ethanos.NewToolsBucketWeb( @@ -124,18 +113,13 @@ func TestToolsBucketWebExternalPrefixAndRoutePrefix(t *testing.T) { externalPrefix := "testThanos" routePrefix := "test" const bucket = "toolsBucketWeb_test" - m := e2ethanos.NewMinio(e, "thanos", bucket) + m, err := e2ethanos.NewMinio(e, "thanos", bucket) + testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(m)) svcConfig := client.BucketConfig{ - Type: client.S3, - Config: s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.Endpoint("http"), - Insecure: true, - }, + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.Endpoint("https"), e2ethanos.ContainerSharedDir), } b, err := e2ethanos.NewToolsBucketWeb( @@ -167,17 +151,13 @@ func TestToolsBucketWebWithTimeAndRelabelFilter(t *testing.T) { t.Cleanup(e2ethanos.CleanScenario(t, e)) // Create Minio. const bucket = "toolsBucketWeb_test" - m := e2ethanos.NewMinio(e, "thanos", bucket) + m, err := e2ethanos.NewMinio(e, "thanos", bucket) + testutil.Ok(t, err) testutil.Ok(t, e2e.StartAndWaitReady(m)) // Create bucket. logger := log.NewLogfmtLogger(os.Stdout) - bkt, err := s3.NewBucketWithConfig(logger, s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.Endpoint("http"), - Insecure: true, - }, "tools") + bkt, err := s3.NewBucketWithConfig(logger, + e2ethanos.NewS3Config(bucket, m.Endpoint("https"), e.SharedDir()), "tools") testutil.Ok(t, err) // Create share dir for upload. dir := filepath.Join(e.SharedDir(), "tmp") @@ -217,14 +197,8 @@ func TestToolsBucketWebWithTimeAndRelabelFilter(t *testing.T) { } // Start thanos tool bucket web. svcConfig := client.BucketConfig{ - Type: client.S3, - Config: s3.Config{ - Bucket: bucket, - AccessKey: e2edb.MinioAccessKey, - SecretKey: e2edb.MinioSecretKey, - Endpoint: m.InternalEndpoint("http"), - Insecure: true, - }, + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), e2ethanos.ContainerSharedDir), } b, err := e2ethanos.NewToolsBucketWeb( e,