Hedgin Bets and Requests #750

Merged 21 commits on Jun 14, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
## main / unreleased

* [FEATURE] Added the ability to hedge requests with all backends [#750](https://github.com/grafana/tempo/pull/750) (@joe-elliott)

## v1.0.0

21 changes: 21 additions & 0 deletions docs/tempo/website/configuration/_index.md
@@ -248,6 +248,13 @@ storage:
# Set to true to enable authentication and certificate checks on gcs requests
[insecure: <bool>]

# Optional. Default is 0 (disabled)
# Example: "hedge_requests_at: 500ms"
# If set to a non-zero value a second request will be issued at the provided duration. Recommended to
# be set to p99 of GCS requests to reduce long tail latency. This setting is most impactful when
# used with queriers and has minimal to no impact on other pieces.
[hedge_requests_at: <duration>]

# S3 configuration. Will be used only if value of backend is "s3"
# Check the S3 doc within this folder for information on s3 specific permissions.
s3:
@@ -283,6 +290,13 @@ storage:
# enable to use path-style requests.
[forcepathstyle: <bool>]

# Optional. Default is 0 (disabled)
# Example: "hedge_requests_at: 500ms"
# If set to a non-zero value a second request will be issued at the provided duration. Recommended to
# be set to p99 of S3 requests to reduce long tail latency. This setting is most impactful when
# used with queriers and has minimal to no impact on other pieces.
[hedge_requests_at: <duration>]

# azure configuration. Will be used only if value of backend is "azure"
# EXPERIMENTAL
azure:
@@ -305,6 +319,13 @@ storage:
# access key when using access key credentials.
[storage-account-key: <string>]

# Optional. Default is 0 (disabled)
# Example: "hedge-requests-at: 500ms"
# If set to a non-zero value a second request will be issued at the provided duration. Recommended to
# be set to p99 of Azure Blob Storage requests to reduce long tail latency. This setting is most impactful when
# used with queriers and has minimal to no impact on other pieces.
[hedge-requests-at: <duration>]

# How often to repoll the backend for new blocks. Default is 5m
[blocklist_poll: <duration>]

3 changes: 3 additions & 0 deletions docs/tempo/website/configuration/manifest.md
@@ -307,6 +307,7 @@ storage:
chunk_buffer_size: 10485760
endpoint: ""
insecure: false
hedge_requests_at: 0
s3:
bucket: ""
endpoint: ""
@@ -317,13 +318,15 @@ storage:
part_size: 0
signature_v2: false
forcepathstyle: false
hedge_requests_at: 0
azure:
storage-account-name: ""
storage-account-key: ""
container-name: ""
endpoint-suffix: blob.core.windows.net
max-buffers: 4
buffer-size: 3145728
hedge-requests-at: 0
cache: ""
background_cache:
writeback_goroutines: 10
2 changes: 2 additions & 0 deletions go.mod
@@ -5,10 +5,12 @@ go 1.16
require (
cloud.google.com/go/storage v1.15.0
contrib.go.opencensus.io/exporter/prometheus v0.2.0
github.com/Azure/azure-pipeline-go v0.2.2
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/alecthomas/kong v0.2.11
github.com/cespare/xxhash v1.1.0
github.com/cortexproject/cortex v1.8.1-0.20210422151339-cf1c444e0905
github.com/cristalhq/hedgedhttp v0.4.0
github.com/dustin/go-humanize v1.0.0
github.com/go-kit/kit v0.10.0
github.com/gogo/protobuf v1.3.2
2 changes: 2 additions & 0 deletions go.sum
@@ -313,6 +313,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cristalhq/hedgedhttp v0.4.0 h1:J1z1zKJ1bEFpMLjZWgtX0inUWlWecNyouWRIQknGzgM=
github.com/cristalhq/hedgedhttp v0.4.0/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b/go.mod h1:v9FBN7gdVTpiD/+LZ7Po0UKvROyT87uLVxTHVky/dlQ=
github.com/cznic/b v0.0.0-20180115125044-35e9bbe41f07/go.mod h1:URriBxXwVq5ijiJ12C7iIZqlA69nTlI+LgI6/pwftG8=
github.com/cznic/fileutil v0.0.0-20180108211300-6a051e75936f/go.mod h1:8S58EK26zhXSxzv7NQFpnliaOQsmDUxvoQO3rt154Vg=
21 changes: 14 additions & 7 deletions tempodb/backend/azure/azure.go
@@ -27,8 +27,9 @@ const (
)

type readerWriter struct {
- cfg *Config
- containerURL blob.ContainerURL
+ cfg *Config
+ containerURL blob.ContainerURL
+ hedgedContainerURL blob.ContainerURL
}

type appendTracker struct {
@@ -39,14 +40,20 @@ type appendTracker struct {
func New(cfg *Config) (backend.Reader, backend.Writer, backend.Compactor, error) {
ctx := context.Background()

- container, err := GetContainer(ctx, cfg)
+ container, err := GetContainer(ctx, cfg, false)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "getting storage container")
}

+ hedgedContainer, err := GetContainer(ctx, cfg, true)
+ if err != nil {
+ return nil, nil, nil, errors.Wrap(err, "getting hedged storage container")
+ }

rw := &readerWriter{
- cfg: cfg,
- containerURL: container,
+ cfg: cfg,
+ containerURL: container,
+ hedgedContainerURL: hedgedContainer,
}

return rw, rw, rw, nil
@@ -280,7 +287,7 @@ func (rw *readerWriter) writer(ctx context.Context, src io.Reader, name string)
}

func (rw *readerWriter) readRange(ctx context.Context, name string, offset int64, destBuffer []byte) error {
- blobURL := rw.containerURL.NewBlockBlobURL(name)
+ blobURL := rw.hedgedContainerURL.NewBlockBlobURL(name)

var props *blob.BlobGetPropertiesResponse
props, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{})
@@ -319,7 +326,7 @@ func (rw *readerWriter) readRange(ctx context.Context, name string, offset int64
}

func (rw *readerWriter) readAll(ctx context.Context, name string) ([]byte, error) {
- blobURL := rw.containerURL.NewBlockBlobURL(name)
+ blobURL := rw.hedgedContainerURL.NewBlockBlobURL(name)

var props *blob.BlobGetPropertiesResponse
props, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{})
36 changes: 28 additions & 8 deletions tempodb/backend/azure/azure_helpers.go
@@ -7,12 +7,17 @@ import (
"strings"
"time"

+ "github.com/Azure/azure-pipeline-go/pipeline"
blob "github.com/Azure/azure-storage-blob-go/azblob"
+ "github.com/cristalhq/hedgedhttp"
)

- const maxRetries = 3
+ const (
+ maxRetries = 1
+ uptoHedgedRequests = 2
+ )

- func GetContainerURL(ctx context.Context, conf *Config) (blob.ContainerURL, error) {
+ func GetContainerURL(ctx context.Context, conf *Config, hedge bool) (blob.ContainerURL, error) {
c, err := blob.NewSharedKeyCredential(conf.StorageAccountName.String(), conf.StorageAccountKey.String())
if err != nil {
return blob.ContainerURL{}, err
@@ -26,9 +31,24 @@ func GetContainerURL(ctx context.Context, conf *Config) (blob.ContainerURL, erro
retryOptions.TryTimeout = time.Until(deadline)
}

+ var httpSender pipeline.Factory
+ if hedge && conf.HedgeRequestsAt != 0 {
+ httpSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
+ return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
+ client := hedgedhttp.NewClient(conf.HedgeRequestsAt, uptoHedgedRequests, nil)

+ // Send the request over the network
+ resp, err := client.Do(request.WithContext(ctx))

+ return pipeline.NewHTTPResponse(resp), err
+ }
+ })
+ }

p := blob.NewPipeline(c, blob.PipelineOptions{
- Retry: retryOptions,
- Telemetry: blob.TelemetryOptions{Value: "Tempo"},
+ Retry: retryOptions,
+ Telemetry: blob.TelemetryOptions{Value: "Tempo"},
+ HTTPSender: httpSender,
})

u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint))
@@ -48,8 +68,8 @@ func GetContainerURL(ctx context.Context, conf *Config) (blob.ContainerURL, erro
return service.NewContainerURL(conf.ContainerName), nil
}

- func GetContainer(ctx context.Context, conf *Config) (blob.ContainerURL, error) {
- c, err := GetContainerURL(ctx, conf)
+ func GetContainer(ctx context.Context, conf *Config, hedge bool) (blob.ContainerURL, error) {
+ c, err := GetContainerURL(ctx, conf, hedge)
if err != nil {
return blob.ContainerURL{}, err
}
@@ -59,15 +79,15 @@ func GetContainer(ctx context.Context, conf *Config) (blob.ContainerURL, error)
}

func GetBlobURL(ctx context.Context, conf *Config, blobName string) (blob.BlockBlobURL, error) {
- c, err := GetContainerURL(ctx, conf)
+ c, err := GetContainerURL(ctx, conf, false)
if err != nil {
return blob.BlockBlobURL{}, err
}
return c.NewBlockBlobURL(blobName), nil
}

func CreateContainer(ctx context.Context, conf *Config) (blob.ContainerURL, error) {
- c, err := GetContainerURL(ctx, conf)
+ c, err := GetContainerURL(ctx, conf, false)
if err != nil {
return blob.ContainerURL{}, err
}
110 changes: 110 additions & 0 deletions tempodb/backend/azure/azure_test.go
@@ -0,0 +1,110 @@
package azure

import (
"context"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestHedge(t *testing.T) {
tests := []struct {
name string
returnIn time.Duration
hedgeAt time.Duration
expectedHedgedRequests int32
}{
{
name: "hedge disabled",
expectedHedgedRequests: 1,
},
{
name: "hedge enabled doesn't hit",
hedgeAt: time.Hour,
expectedHedgedRequests: 1,
},
{
name: "hedge enabled and hits",
hedgeAt: time.Millisecond,
returnIn: 100 * time.Millisecond,
expectedHedgedRequests: 2,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
count := int32(0)
server := fakeServer(t, tc.returnIn, &count)

r, w, _, err := New(&Config{
MaxBuffers: 3,
BufferSize: 1000,
ContainerName: "blerg",
Endpoint: server.URL[7:], // [7:] -> strip http://,
HedgeRequestsAt: tc.hedgeAt,
})
require.NoError(t, err)

ctx := context.Background()

// the first call on each client initiates an extra http request
// clearing that here
_, _ = r.Read(ctx, "object", uuid.New(), "tenant")
time.Sleep(tc.returnIn)
atomic.StoreInt32(&count, 0)

// calls that should hedge
_, _ = r.Read(ctx, "object", uuid.New(), "tenant")
time.Sleep(tc.returnIn)
assert.Equal(t, tc.expectedHedgedRequests*2, atomic.LoadInt32(&count)) // *2 b/c reads execute a HEAD and GET
atomic.StoreInt32(&count, 0)

// this panics with the garbage test setup. todo: make it not panic
// _ = r.ReadRange(ctx, "object", uuid.New(), "tenant", 10, make([]byte, 100))
// time.Sleep(tc.returnIn)
// assert.Equal(t, tc.expectedHedgedRequests, atomic.LoadInt32(&count))
// atomic.StoreInt32(&count, 0)

_, _ = r.BlockMeta(ctx, uuid.New(), "tenant") // *2 b/c reads execute a HEAD and GET
time.Sleep(tc.returnIn)
assert.Equal(t, tc.expectedHedgedRequests*2, atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)

// calls that should not hedge
_, _ = r.Tenants(ctx)
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)

_, _ = r.Blocks(ctx, "tenant")
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)

_ = w.Write(ctx, "object", uuid.New(), "tenant", make([]byte, 10))
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)

_ = w.WriteBlockMeta(ctx, &backend.BlockMeta{})
assert.Equal(t, int32(1), atomic.LoadInt32(&count))
atomic.StoreInt32(&count, 0)
})
}
}

func fakeServer(t *testing.T, returnIn time.Duration, counter *int32) *httptest.Server {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(returnIn)

atomic.AddInt32(counter, 1)
_, _ = w.Write([]byte(`{}`))
}))
t.Cleanup(server.Close)

return server
}
7 changes: 6 additions & 1 deletion tempodb/backend/azure/config.go
@@ -1,6 +1,10 @@
package azure

- import "github.com/cortexproject/cortex/pkg/util/flagext"
+ import (
+ "time"

+ "github.com/cortexproject/cortex/pkg/util/flagext"
+ )

type Config struct {
StorageAccountName flagext.Secret `yaml:"storage-account-name"`
@@ -9,4 +13,5 @@ type Config struct {
Endpoint string `yaml:"endpoint-suffix"`
MaxBuffers int `yaml:"max-buffers"`
BufferSize int `yaml:"buffer-size"`
+ HedgeRequestsAt time.Duration `yaml:"hedge-requests-at"`

Review comment (Member):
Not specific to this PR: I noticed this config uses kebab-case instead of snake_case like the other configs. Is this intentional, or is it debt we can no longer get rid of?

Reply (Member Author):
Yeah, I noticed this for the first time when adding this config option. I think everything is snake_case except for this Azure config. We should consider a breaking-change PR that moves Azure to the same standard as everything else.

}
11 changes: 7 additions & 4 deletions tempodb/backend/gcs/config.go
@@ -1,8 +1,11 @@
package gcs

+ import "time"

type Config struct {
- BucketName string `yaml:"bucket_name"`
- ChunkBufferSize int `yaml:"chunk_buffer_size"`
- Endpoint string `yaml:"endpoint"`
- Insecure bool `yaml:"insecure"`
+ BucketName string `yaml:"bucket_name"`
+ ChunkBufferSize int `yaml:"chunk_buffer_size"`
+ Endpoint string `yaml:"endpoint"`
+ Insecure bool `yaml:"insecure"`
+ HedgeRequestsAt time.Duration `yaml:"hedge_requests_at"`
}