Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Receiver: Fix quorum handling for all hashing algorithms #5791

Closed
wants to merge 13 commits into from
Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
## Unreleased

### Fixed
- [#5791](https://github.com/thanos-io/thanos/pull/5791) Receive: Fix quorum validation for `ketama` hashing.

### Added

Expand Down
78 changes: 74 additions & 4 deletions examples/interactive/interactive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/thanos-io/objstore/client"
"github.com/thanos-io/objstore/providers/s3"
"github.com/thanos-io/thanos/pkg/receive"
"github.com/thanos-io/thanos/pkg/testutil"
tracingclient "github.com/thanos-io/thanos/pkg/tracing/client"
"github.com/thanos-io/thanos/pkg/tracing/jaeger"
Expand Down Expand Up @@ -248,8 +249,6 @@ func TestReadOnlyThanosSetup(t *testing.T) {
sidecarHA1 := e2edb.NewThanosSidecar(e, "sidecar-prom-ha1", promHA1, e2edb.WithImage("thanos:latest"), e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)}))
sidecar2 := e2edb.NewThanosSidecar(e, "sidecar2", prom2, e2edb.WithImage("thanos:latest"))

receive1 := e2ethanos.NewReceiveBuilder(e, "receiver-1").WithIngestionEnabled().Init()

testutil.Ok(t, exec("cp", "-r", prom1Data+"/.", promHA0.Dir()))
testutil.Ok(t, exec("sh", "-c", "find "+prom1Data+"/ -maxdepth 1 -type d | tail -5 | xargs -I {} cp -r {} "+promHA1.Dir())) // Copy only 5 blocks from 9 to mimic replica 1 with partial data set.
testutil.Ok(t, exec("cp", "-r", prom2Data+"/.", prom2.Dir()))
Expand Down Expand Up @@ -280,7 +279,7 @@ func TestReadOnlyThanosSetup(t *testing.T) {
}))

testutil.Ok(t, e2e.StartAndWaitReady(m1))
testutil.Ok(t, e2e.StartAndWaitReady(promHA0, promHA1, prom2, sidecarHA0, sidecarHA1, sidecar2, store1, store2, receive1))
testutil.Ok(t, e2e.StartAndWaitReady(promHA0, promHA1, prom2, sidecarHA0, sidecarHA1, sidecar2, store1, store2))

// Let's start query on top of all those 6 store APIs (global query engine).
//
Expand Down Expand Up @@ -333,7 +332,6 @@ func TestReadOnlyThanosSetup(t *testing.T) {
sidecarHA0.InternalEndpoint("grpc"),
sidecarHA1.InternalEndpoint("grpc"),
sidecar2.InternalEndpoint("grpc"),
receive1.InternalEndpoint("grpc"),
},
e2edb.WithImage("thanos:latest"),
e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)}),
Expand All @@ -353,3 +351,75 @@ func TestReadOnlyThanosSetup(t *testing.T) {
testutil.Ok(t, m.OpenUserInterfaceInBrowser())
testutil.Ok(t, e2einteractive.RunUntilEndpointHit())
}

// TestReadWriteThanosSetup sets up a Thanos deployment with 6 receiver replicas and replication factor 3 for writing.
// On top of that, the written metrics are queryable via querier.
func TestReadWriteThanosSetup(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some automatic avalanche/Prometheus to this setup to continuously write and then we could manually (or automatically within e2e test if we don't to this yet) play with killing one container.

t.Skip("This is interactive test - it will run until you will kill it or curl 'finish' endpoint. Comment and run as normal test to use it!")

e, err := e2e.NewDockerEnvironment("interactive")
testutil.Ok(t, err)
t.Cleanup(e.Close)

// Setup Jaeger.
j := e.Runnable("tracing").WithPorts(map[string]int{"http-front": 16686, "jaeger.thrift": 14268}).Init(e2e.StartOptions{Image: "jaegertracing/all-in-one:1.25"})
testutil.Ok(t, e2e.StartAndWaitReady(j))

jaegerConfig, err := yaml.Marshal(tracingclient.TracingConfig{
Type: tracingclient.Jaeger,
Config: jaeger.Config{
ServiceName: "thanos",
SamplerType: "const",
SamplerParam: 1,
Endpoint: "http://" + j.InternalEndpoint("jaeger.thrift") + "/api/traces",
},
})
testutil.Ok(t, err)

receive1 := e2ethanos.NewReceiveBuilder(e, "receiver-1").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig))
receive2 := e2ethanos.NewReceiveBuilder(e, "receiver-2").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig))
receive3 := e2ethanos.NewReceiveBuilder(e, "receiver-3").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig))
receive4 := e2ethanos.NewReceiveBuilder(e, "receiver-4").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig))
receive5 := e2ethanos.NewReceiveBuilder(e, "receiver-5").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig))
receive6 := e2ethanos.NewReceiveBuilder(e, "receiver-6").WithHashingAlgorithm(receive.AlgorithmKetama).WithIngestionEnabled().WithTracingConfig(string(jaegerConfig))

h := receive.HashringConfig{
Endpoints: []string{
receive1.InternalEndpoint("grpc"),
receive2.InternalEndpoint("grpc"),
receive3.InternalEndpoint("grpc"),
receive4.InternalEndpoint("grpc"),
receive5.InternalEndpoint("grpc"),
receive6.InternalEndpoint("grpc"),
},
}

testutil.Ok(t, e2e.StartAndWaitReady(
receive1.WithRouting(3, h).Init(), receive2.WithRouting(3, h).Init(), receive3.WithRouting(3, h).Init(),
receive4.WithRouting(3, h).Init(), receive5.WithRouting(3, h).Init(), receive6.WithRouting(3, h).Init(),
))

query1 := e2edb.NewThanosQuerier(
e,
"query1",
[]string{
receive1.InternalEndpoint("grpc"),
receive2.InternalEndpoint("grpc"),
receive3.InternalEndpoint("grpc"),
receive4.InternalEndpoint("grpc"),
receive5.InternalEndpoint("grpc"),
receive6.InternalEndpoint("grpc"),
},
e2edb.WithImage("thanos:latest"),
e2edb.WithFlagOverride(map[string]string{"--tracing.config": string(jaegerConfig)}),
)
testutil.Ok(t, e2e.StartAndWaitReady(query1))

// Wait until we have 6 gRPC connections.
testutil.Ok(t, query1.WaitSumMetricsWithOptions(e2emon.Equals(6), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics()))
testutil.Ok(t, e2einteractive.OpenInBrowser(fmt.Sprintf("http://%s", query1.Endpoint("http"))))

// Tracing endpoint.
testutil.Ok(t, e2einteractive.OpenInBrowser("http://"+j.Endpoint("http-front")))
testutil.Ok(t, e2einteractive.RunUntilEndpointHit())
}
Loading