Skip to content

Commit

Permalink
Merge branch 'main' into dedup-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nberkley committed Jan 18, 2022
2 parents 81dd0a0 + ba9b36d commit 1455910
Show file tree
Hide file tree
Showing 16 changed files with 463 additions and 191 deletions.
18 changes: 9 additions & 9 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ We also have some nice souls that help triaging issues and PRs. See [here](https

Full list of triage persons is displayed below:

| Name | Slack | GitHub | Company |
|---------------|------------------|------------------------------------------------|----------|
| Adrien Fillon | `@Adrien F` | [@adrien-f](https://github.com/adrien-f) | |
| Ian Billett | `@billett` | [@ianbillett](https://github.com/ianbillett) | Red Hat |
| Martin Chodur | `@FUSAKLA` | [@fusakla](https://github.com/fusakla) | |
| Michael Dai | `@jojohappy` | [@jojohappy](https://github.com/jojohappy) | |
| Xiang Dai | `@daixiang0` | [@daixiang0](https://github.com/daixiang0) | |
| Wiard van Rij | `@wiard van Rij` | [@wiardvanrij](https://github.com/wiardvanrij) | Fullstaq |
| Jimmie Han | `@hanjm` | [@hanjm](https://github.com/hanjm) | Tencent |
| Name | Slack | GitHub | Company |
|---------------|------------------|------------------------------------------------|---------|
| Adrien Fillon | `@Adrien F` | [@adrien-f](https://github.com/adrien-f) | |
| Ian Billett | `@billett` | [@ianbillett](https://github.com/ianbillett) | Red Hat |
| Martin Chodur | `@FUSAKLA` | [@fusakla](https://github.com/fusakla) | |
| Michael Dai | `@jojohappy` | [@jojohappy](https://github.com/jojohappy) | |
| Xiang Dai | `@daixiang0` | [@daixiang0](https://github.com/daixiang0) | |
| Wiard van Rij | `@wiard van Rij` | [@wiardvanrij](https://github.com/wiardvanrij) | Roku |
| Jimmie Han | `@hanjm` | [@hanjm](https://github.com/hanjm) | Tencent |

Please reach any of the maintainer on slack or email if you want to help as well.

Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func runStore(
httpserver.WithListen(conf.httpConfig.bindAddress),
httpserver.WithGracePeriod(time.Duration(conf.httpConfig.gracePeriod)),
httpserver.WithTLSConfig(conf.httpConfig.tlsConfig),
httpserver.WithEnableH2C(true), // For groupcache.
)

g.Add(func() error {
Expand Down
3 changes: 2 additions & 1 deletion docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,11 @@ Check more [here](../sharding.md).
## Index cache

Thanos Store Gateway supports an index cache to speed up postings and series lookups from TSDB blocks indexes. Two types of caches are supported:
Thanos Store Gateway supports an index cache to speed up postings and series lookups from TSDB blocks indexes. Three types of caches are supported:

- `in-memory` (*default*)
- `memcached`
- `redis`

### In-memory index cache

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ require (
go.uber.org/automaxprocs v1.4.0
go.uber.org/goleak v1.1.12
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
golang.org/x/net v0.0.0-20211020060615-d418f374d309
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/text v0.3.7
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1987,8 +1987,9 @@ golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211020060615-d418f374d309 h1:A0lJIi+hcTR6aajJH4YqKWwohY4aW9RO7oRMcdv+HKI=
golang.org/x/net v0.0.0-20211020060615-d418f374d309/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d h1:1n1fc535VhN8SYtD4cDUyNlfpAF2ROMM9+11equK3hs=
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down
12 changes: 11 additions & 1 deletion pkg/cache/groupcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package cache

import (
"context"
"crypto/tls"
"encoding/json"
"io/ioutil"
"net"
"net/http"
"path/filepath"
"strconv"
"time"

Expand All @@ -23,6 +26,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/cache/cachekey"
"github.com/vimeo/galaxycache"
galaxyhttp "github.com/vimeo/galaxycache/http"
"golang.org/x/net/http2"
"gopkg.in/yaml.v2"
)

Expand Down Expand Up @@ -92,6 +96,12 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf
cfg *CachingBucketConfig) (*Groupcache, error) {
httpProto := galaxyhttp.NewHTTPFetchProtocol(&galaxyhttp.HTTPOptions{
BasePath: basepath,
Transport: &http2.Transport{
AllowHTTP: true,
DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) {
return net.Dial(network, addr)
},
},
})
universe := galaxycache.NewUniverse(httpProto, conf.SelfURL)

Expand Down Expand Up @@ -121,7 +131,7 @@ func NewGroupcacheWithConfig(logger log.Logger, reg prometheus.Registerer, conf
galaxyhttp.RegisterHTTPHandler(universe, &galaxyhttp.HTTPOptions{
BasePath: basepath,
}, mux)
r.Get(basepath, mux.ServeHTTP)
r.Get(filepath.Join(basepath, conf.GroupcacheGroup, ":key"), mux.ServeHTTP)

galaxy := universe.NewGalaxy(conf.GroupcacheGroup, int64(conf.MaxSize), galaxycache.GetterFunc(
func(ctx context.Context, id string, dest galaxycache.Codec) error {
Expand Down
235 changes: 235 additions & 0 deletions pkg/cache/groupcache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"os"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/prober"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store/cache/cachekey"
"github.com/thanos-io/thanos/pkg/testutil"
galaxyhttp "github.com/vimeo/galaxycache/http"
"golang.org/x/net/http2"
)

const basePath = `/_groupcache/`
const groupName = `groupName`
const selfURLH1 = "http://localhost:12345"
const selfURLH2C = "http://localhost:12346"

func TestMain(m *testing.M) {
reg := prometheus.NewRegistry()
router := route.New()

bkt := objstore.NewInMemBucket()
defer bkt.Close()

payload := strings.Repeat("foobar", 16*1024/6)

if err := bkt.Upload(context.Background(), "test", strings.NewReader(payload)); err != nil {
fmt.Printf("failed to upload: %s\n", err.Error())
os.Exit(1)
}

httpServer := httpserver.New(log.NewNopLogger(), nil, component.Bucket, &prober.HTTPProbe{},
httpserver.WithListen("0.0.0.0:12345"))
httpServer.Handle("/", router)
httpServerH2C := httpserver.New(log.NewNopLogger(), nil, component.Bucket, &prober.HTTPProbe{},
httpserver.WithListen("0.0.0.0:12346"), httpserver.WithEnableH2C(true))
httpServerH2C.Handle("/", router)

cachingBucketConfig := NewCachingBucketConfig()
cachingBucketConfig.CacheGet("test", nil, func(s string) bool {
return true
}, 16*1024, 0*time.Second, 0*time.Second, 0*time.Second)

groupCache, err := NewGroupcacheWithConfig(
log.NewJSONLogger(os.Stderr),
reg,
GroupcacheConfig{
Peers: []string{selfURLH1},
SelfURL: selfURLH1,
GroupcacheGroup: groupName,
DNSSDResolver: dns.MiekgdnsResolverType,
DNSInterval: 30 * time.Second,
MaxSize: model.Bytes(16 * 1024),
},
basePath,
router,
bkt,
cachingBucketConfig,
)
if err != nil {
fmt.Printf("failed creating group cache: %s\n", err.Error())
os.Exit(1)
}

go func() {
if err = httpServer.ListenAndServe(); err != nil {
fmt.Printf("failed to listen: %s\n", err.Error())
}
}()
go func() {
if err = httpServerH2C.ListenAndServe(); err != nil {
fmt.Printf("failed to listen: %s\n", err.Error())
}
}()

defer httpServer.Shutdown(nil)
defer httpServerH2C.Shutdown(nil)

cachingBucketConfig.SetCacheImplementation(groupCache)

exitVal := m.Run()

os.Exit(exitVal)
}

// Benchmark retrieval of one key from one groupcache node.
func BenchmarkGroupcacheRetrieval(b *testing.B) {
b.Run("h2c", func(b *testing.B) {
fetcher := galaxyhttp.NewHTTPFetchProtocol(&galaxyhttp.HTTPOptions{
Transport: &http2.Transport{
AllowHTTP: true,
DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) {
return net.Dial(network, addr)
},
},
BasePath: basePath,
})

f, err := fetcher.NewFetcher(selfURLH2C)
testutil.Ok(b, err)

b.Run("seq", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
_, _, err = f.Fetch(context.Background(), groupName, cachekey.BucketCacheKey{Verb: "content", Name: "test"}.String())
testutil.Ok(b, err)
}
})
b.Run("parallel=500", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

ch := make(chan struct{})

for i := 0; i < 500; i++ {
go func() {
for range ch {
_, _, err = f.Fetch(context.Background(), groupName, cachekey.BucketCacheKey{Verb: "content", Name: "test"}.String())
testutil.Ok(b, err)
}
}()
}

for i := 0; i < b.N; i++ {
ch <- struct{}{}
}
close(ch)
})
})
b.Run("h1, max one TCP connection", func(b *testing.B) {
fetcher := galaxyhttp.NewHTTPFetchProtocol(&galaxyhttp.HTTPOptions{
BasePath: basePath,
Transport: &http.Transport{
MaxConnsPerHost: 1,
MaxIdleConnsPerHost: 1,
},
})

f, err := fetcher.NewFetcher(selfURLH1)
testutil.Ok(b, err)

b.Run("seq", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
_, _, err = f.Fetch(context.Background(), groupName, cachekey.BucketCacheKey{Verb: "content", Name: "test"}.String())
testutil.Ok(b, err)
}
})

b.Run("parallel=500", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

ch := make(chan struct{})

for i := 0; i < 500; i++ {
go func() {
for range ch {
_, _, err = f.Fetch(context.Background(), groupName, cachekey.BucketCacheKey{Verb: "content", Name: "test"}.String())
testutil.Ok(b, err)
}
}()
}

for i := 0; i < b.N; i++ {
ch <- struct{}{}
}
close(ch)
})
})
b.Run("h1, unlimited TCP connections", func(b *testing.B) {
fetcher := galaxyhttp.NewHTTPFetchProtocol(&galaxyhttp.HTTPOptions{
BasePath: basePath,
Transport: &http.Transport{},
})

f, err := fetcher.NewFetcher(selfURLH1)
testutil.Ok(b, err)

b.Run("seq", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
_, _, err = f.Fetch(context.Background(), groupName, cachekey.BucketCacheKey{Verb: "content", Name: "test"}.String())
testutil.Ok(b, err)
}
})

b.Run("parallel=500", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()

ch := make(chan struct{})

for i := 0; i < 500; i++ {
go func() {
for range ch {
_, _, err = f.Fetch(context.Background(), groupName, cachekey.BucketCacheKey{Verb: "content", Name: "test"}.String())
testutil.Ok(b, err)
}
}()
}

for i := 0; i < b.N; i++ {
ch <- struct{}{}
}
close(ch)
})
})

}
13 changes: 4 additions & 9 deletions pkg/objstore/s3/s3_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"testing"

"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
"github.com/go-kit/log"

"github.com/thanos-io/thanos/pkg/objstore/s3"
Expand All @@ -29,16 +28,12 @@ func BenchmarkUpload(b *testing.B) {
b.Cleanup(e2ethanos.CleanScenario(b, e))

const bucket = "benchmark"
m := e2ethanos.NewMinio(e, "benchmark", bucket)
m, err := e2ethanos.NewMinio(e, "benchmark", bucket)
testutil.Ok(b, err)
testutil.Ok(b, e2e.StartAndWaitReady(m))

bkt, err := s3.NewBucketWithConfig(log.NewNopLogger(), s3.Config{
Bucket: bucket,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
Endpoint: m.Endpoint("http"),
Insecure: true,
}, "test-feed")
bkt, err := s3.NewBucketWithConfig(log.NewNopLogger(),
e2ethanos.NewS3Config(bucket, m.Endpoint("https"), e.SharedDir()), "test-feed")
testutil.Ok(b, err)

buf := bytes.Buffer{}
Expand Down
Loading

0 comments on commit 1455910

Please sign in to comment.