Receiver: Fix quorum handling for all hashing algorithms #5791

Closed
wants to merge 13 commits

Conversation

Collaborator

@matej-g matej-g commented Oct 14, 2022

  • I added CHANGELOG entry for this change.
  • Change is not relevant to the end user.

Changes

This PR suggests a way to fix #5784, which was discovered with ketama hashing.

The presumption that one replication = one request held for hashmod hashing; with ketama hashing this is no longer the case (and the same could be true for other algorithms in the future). There can now be multiple requests made to multiple endpoints as part of a single replicated request (i.e. one replication = multiple requests). As described in #5784, this renders the current quorum handling incorrect.

To fix quorum validation, I suggest the following:

  • Introduce an additional way to validate quorum - instead of counting individual requests, the focus should be on confirming whether all requests made on behalf of a replica have succeeded (for simplicity I refer to these as 'replica group requests'). In this way we can confirm that if the number of successful replica group requests >= success threshold, quorum has been achieved. For example: with replication factor 3 we end up with three replica group requests (1, 2, 3) - to guarantee quorum (2), at least two of these groups need to be written successfully (see the sketch after this list).

  • Additionally validate quorum by counting endpoint failures - besides knowing that we need at least a quorum of replica group requests to succeed, we also know that, because of how series are hashed, each replica of a series must land on a distinct node. This means that even if not all requests for a given replica group succeed, we still know quorum was reached if all requests succeeded except for those going to a specific set of endpoints. The number of such failed endpoints that can be tolerated is determined by the quorum.

  • This necessitates sending the replica number back together with the error (or a nil error in case of success), in order to make it possible to track the success of a replica group request as well as endpoint failures. This PR makes the necessary changes to the errors channel in the forward fanout method.

  • Some adjustments to the method determining write success were also made - when determining the error for requests going to a particular replica, we need to either find a single cause for all errors or return a single error signaling that this replication failed.

  • Another simplification concerns determining the error cause - if the multi-error includes only conflict errors, we do not need to care about the threshold, as we can assume the other requests succeeded and can directly return conflict.

  • I also added read / write interactive tests (this makes it easier to manually play with receiver changes).
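
To make the two success modes above easier to follow, here is a minimal Go sketch of the validation logic they describe. It is illustrative only: the function and variable names (quorumReached, requestsPerReplica, failedEndpoints, etc.) are assumptions, not the actual handler code in this PR.

```go
// quorumReached sketches the two success modes: (1) enough whole replica
// groups succeeded, or (2) all failures are confined to a small enough set
// of endpoints. All names here are illustrative.
func quorumReached(
	replicationFactor, successThreshold int, // e.g. threshold = (factor / 2) + 1
	requestsPerReplica, successesPerReplica []int, // indexed by replica number
	failedEndpoints map[string]struct{}, // endpoints with at least one failed request
) bool {
	// First mode: count replica groups for which every request succeeded.
	replicaGroupSuccess := 0
	for replica := 0; replica < replicationFactor; replica++ {
		if successesPerReplica[replica] >= requestsPerReplica[replica] {
			replicaGroupSuccess++
		}
	}
	if replicaGroupSuccess >= successThreshold {
		return true
	}

	// Second mode: since each replica of a series lands on a distinct node,
	// losing at most (replicationFactor - successThreshold) endpoints still
	// leaves a quorum, no matter how the failures are spread across replica
	// groups.
	maxEndpointFailures := replicationFactor - successThreshold
	return len(failedEndpoints) <= maxEndpointFailures
}
```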

(Diagram: replication flow diagram, drawio attachment in the PR description.)

Verification

  • Added and extended unit / E2E tests for write error determination and quorum verification
  • Also ran manual tests and comparisons with / without the changes, with replication, with / without an outage

If there are only conflict errors present during the error determination, it can be safely assumed that the other requests succeeded, and therefore we can directly return conflict.

Signed-off-by: Matej Gera <[email protected]>
…ests

Instead of deciding on the basis of individual requests, this change takes into consideration the fact that there can be multiple requests per replica number (for simplicity I refer to these as 'replica groups'). To confirm quorum it is then decisive to assert whether ALL requests succeeded for AT LEAST a number of replica groups equal to the write quorum.

Signed-off-by: Matej Gera <[email protected]>
Extends the E2E tests to cover both hashing algorithms.

Signed-off-by: Matej Gera <[email protected]>
Member

@saswatamcode saswatamcode left a comment

Thanks for the awesome work! 🌟

Some minor comments.

pkg/receive/handler.go (outdated review thread, resolved)
if success[i] >= replicaGroupReqs[i] {
// If enough replica groups succeed, we can finish early (quorum
// is guaranteed).
replicaGroupSuccess++
Member

So each replicaGroup signifies all the requests made to replicate a batch of series to a replica-endpoint? And if all the requests within that replicaGroup succeed, we count that as a success towards ensuring quorum?

Collaborator Author

Yes, that's the gist 👍 - there is now also a second way to verify quorum; I added more details in the description.

test/e2e/receive_test.go (review thread, resolved)
var success int
var (
// success counts how many requests per replica have succeeded.
success = make([]int, h.options.ReplicationFactor)
Member

This change initializes some slices. Maybe it would be a good idea to benchmark and see changes to hot path?

Collaborator Author

I ran BenchmarkHandlerReceiveHTTP. I think we won't be able to avoid those few extra allocs, but it should still be acceptable?

name                                                                                old time/op    new time/op    delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                         891µs ± 0%     942µs ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           1.07ms ± 0%    1.12ms ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       9.42ms ± 0%    9.68ms ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          11.5ms ± 0%    11.2ms ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_20000_of_them/OK-12                      39.7ms ± 0%    39.3ms ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_20000_of_them/conflict_errors-12         49.7ms ± 0%    49.7ms ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                 85.7ms ± 0%   105.6ms ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12    87.6ms ± 0%    89.9ms ± 0%   ~     (p=1.000 n=1+1)

name                                                                                old alloc/op   new alloc/op   delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                        1.15MB ± 0%    1.15MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           1.41MB ± 0%    1.41MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       13.1MB ± 0%    13.1MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          15.5MB ± 0%    15.5MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_20000_of_them/OK-12                      53.4MB ± 0%    53.4MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_20000_of_them/conflict_errors-12         62.0MB ± 0%    62.0MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                  110MB ± 0%     110MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12     110MB ± 0%     110MB ± 0%   ~     (p=1.000 n=1+1)

name                                                                                old allocs/op  new allocs/op  delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                         3.10k ± 0%     3.10k ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12            6.63k ± 0%     6.64k ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                        30.3k ± 0%     30.3k ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12           65.2k ± 0%     65.2k ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_20000_of_them/OK-12                        121k ± 0%      121k ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_20000_of_them/conflict_errors-12           260k ± 0%      260k ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                   89.0 ± 0%      93.0 ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12       217 ± 0%       223 ± 0%   ~     (p=1.000 n=1+1)

Member

Yup! Thanks!

Collaborator Author

With latest changes, still looking fine:

name                                                                                old time/op    new time/op    delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                        1.20ms ± 0%    0.87ms ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           1.25ms ± 0%    1.05ms ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       10.6ms ± 0%     9.3ms ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          12.8ms ± 0%    10.8ms ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_20000_of_them/OK-12                      46.3ms ± 0%    36.8ms ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_20000_of_them/conflict_errors-12         55.5ms ± 0%    43.6ms ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                  102ms ± 0%      82ms ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12     101ms ± 0%      86ms ± 0%   ~     (p=1.000 n=1+1)

name                                                                                old alloc/op   new alloc/op   delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                        1.15MB ± 0%    1.15MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12           1.41MB ± 0%    1.41MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                       13.1MB ± 0%    13.1MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12          15.5MB ± 0%    15.5MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_20000_of_them/OK-12                      53.3MB ± 0%    53.2MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_20000_of_them/conflict_errors-12         62.0MB ± 0%    62.0MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                  110MB ± 0%     110MB ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12     110MB ± 0%     110MB ± 0%   ~     (p=1.000 n=1+1)

name                                                                                old allocs/op  new allocs/op  delta
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/OK-12                         3.10k ± 0%     3.10k ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_500_of_them/conflict_errors-12            6.63k ± 0%     6.64k ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/OK-12                        30.3k ± 0%     30.3k ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_5000_of_them/conflict_errors-12           65.2k ± 0%     65.2k ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_20000_of_them/OK-12                        121k ± 0%      121k ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/typical_labels_under_1KB,_20000_of_them/conflict_errors-12           260k ± 0%      260k ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/OK-12                   89.0 ± 0%      94.0 ± 0%   ~     (p=1.000 n=1+1)
HandlerReceiveHTTP/extremely_large_label_value_10MB,_10_of_them/conflict_errors-12       218 ± 0%       224 ± 0%   ~     (p=1.000 n=1+1)

pkg/receive/handler.go (outdated review thread, resolved)
@philipgough
Contributor

@matej-g I am not familiar enough with this code path to determine correctness here, but determining quorum in this fashion makes sense to me for ketama.

if err != nil {
level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
}
}
}()
}()

var success int
var (
// success counts how many requests per replica have succeeded.
Contributor

Could we reflect this comment and all the others that follow into their respective variable names? This pattern of a comment explaining "what" and not "why" is often a sign that something deserves a better name.

Collaborator Author

@matej-g matej-g Oct 21, 2022

I tried to name it a bit better, although I'm finding it challenging to find a name clear enough to encapsulate the complexity of the replication logic and be obvious without additional commentary. Suggestions welcome.

@@ -976,6 +1011,16 @@ func determineWriteErrorCause(err error, threshold int) error {
return err
}

// errWithReplicaFunc is a type to enable sending replica number and err over channel.
type errWithReplicaFunc func() (uint64, error)
Contributor

Why does this have to be a function and can't be a struct { uint64, error }?

Collaborator Author

It does not have to be; I found it simpler to "send" it via a function, but no strong preference here.

Contributor

If we don't need a function, I think we should "downgrade" the type to a struct to be as simple as possible.
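
For illustration, a hedged sketch of the struct alternative being suggested here; the type, field, and channel names are assumptions, not the final code:

```go
// errWithReplica carries the replica number alongside the (possibly nil)
// error, so the reader of the channel can attribute failures per replica.
type errWithReplica struct {
	replica uint64
	err     error
}

// The fanout would then send a plain value instead of a closure, e.g.:
//   errCh <- errWithReplica{replica: replicaNumber, err: err}
```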

@@ -226,6 +226,15 @@ func TestDetermineWriteErrorCause(t *testing.T) {
threshold: 1,
exp: errors.New("baz: 3 errors: 3 errors: qux; rpc error: code = AlreadyExists desc = conflict; rpc error: code = AlreadyExists desc = conflict; foo; bar"),
},
{
Contributor

I wonder why this issue was not caught by tests. Do we run unit tests for Ketama?

Collaborator Author

We do have tests to verify quorum, but they all run just with hashmod; I'm extending those as well to run with both algorithms.

Contributor

Are you referring to the e2e tests that were extended? Can we also do the same for unit tests?

Collaborator Author

Yes, I meant I'm working on extending those unit tests in handler_test.go

Contributor

Ah ok, sorry for misunderstanding :)

Collaborator Author

All relevant unit tests now have been expanded to run with both algorithms.

Contributor

Out of curiosity, did tests fail before your fix?

Collaborator Author

@matej-g matej-g Oct 24, 2022

Not really, which I believe was mainly because:

  • With replication factor 3 and 3 nodes (which was the setup in all the test cases), even with ketama the issue probably cannot be caught, since the number of requests is small (but even on main, expanding the tests to simulate a 6-node deployment makes the failures show up)
  • We were running all tests with a single time series, which is probably more of an edge case (I now run the tests with 50 series, which should give us good enough test data)
  • The node addresses were randomized; I believe on main with ketama this was causing some inconsistent results (I pushed a fix for this as well, generating the addresses in a predictable manner - there is no need for randomized addresses, I believe; see the sketch below)
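
As a rough illustration of the last point (this is not the actual test helper; the function name and address scheme are assumptions), deterministic test endpoints could be generated along these lines:

```go
import "fmt"

// testEndpoints returns predictable receiver addresses for n nodes, avoiding
// the flakiness that randomized addresses can introduce with ketama hashing.
func testEndpoints(n int) []string {
	endpoints := make([]string, 0, n)
	for i := 0; i < n; i++ {
		endpoints = append(endpoints, fmt.Sprintf("http://node-%d:%d", i, 19000+i))
	}
	return endpoints
}
```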

@douglascamata
Contributor

There can now be multiple requests made to multiple endpoints as part of a single replicated request (i.e. one replication = multiple requests).

I think that to avoid confusion we should clarify one fundamental difference between ketama and hashmod: what ketama really does is replicate each timeseries inside a request. This is independent of the number of requests required to achieve it. Meanwhile, hashmod was replicating a single request.

For example, 2 timeseries with ketama using a replication factor 3 will require between 3 and 6 requests to be fully replicated. This will depend on their hash.
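
To make the arithmetic concrete (an illustrative scenario, not numbers from the PR): with replication factor 3, each of the 2 series gets 3 replicas. If both series' replica sets happen to land on the same 3 nodes, the write fans out into 3 requests; if their replica sets are fully disjoint, it fans out into 6.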

In fact, I am even curious whether our code can prevent 2 replicas of a given series from landing on the same node. It could be a low-probability scenario, but on systems running millions of timeseries even a low probability might have a big impact. Do you know if this could happen, @fpetkovski or @matej-g?

@fpetkovski
Contributor

This cannot happen anymore. It used to, but it is now fixed by ensuring that the replica for each section in the ring is covered by a different node: https://github.com/thanos-io/thanos/blob/main/pkg/receive/hashring.go#L138-L154

- Adds a second success mode - even if requests from multiple replica groups fail, we can still claim quorum as long as the failed requests go to at most the permissible number of endpoints (which depends on quorum).
- Fix the replica group success count from the previous change - the count should be local to the success-counting loop
- Adjust write error determination for requests counted per replica - we need to either ensure there is only a single error cause or fall back to a single error signaling failed replication; if there is a mixed bag of errors, we cannot guarantee write success and simply need to return that this particular replication failed
- Add more tags to traces

Signed-off-by: Matej Gera <[email protected]>
- Add new unit tests for error determination
- Simplify and unify quorum unit tests; run them for both hashing algorithms; extend unit tests (add tests with 6 nodes and a higher replication factor)

Signed-off-by: Matej Gera <[email protected]>
Signed-off-by: Matej Gera <[email protected]>
@matej-g
Collaborator Author

matej-g commented Oct 21, 2022

Sorry for the delay folks.

I fixed a few more things and updated the description accordingly (I know there are quite a few changes - if something's not clear, or if I can make things easier to review by splitting / reorganizing this, please let me know).

I have also expanded all unit tests to verify quorum to run for both algorithms.

@matej-g matej-g marked this pull request as ready for review October 21, 2022 12:02
var success int
var (
// perReplicaSuccess is used to determine if enough requests succeeded for given replica number.
preReplicaSuccess = make([]int, h.options.ReplicationFactor)
Contributor

Small typo here:

Suggested change
preReplicaSuccess = make([]int, h.options.ReplicationFactor)
perReplicaSuccess = make([]int, h.options.ReplicationFactor)

Contributor

Shouldn't we be tracking per-shard success rate?

Contributor

@douglascamata douglascamata Oct 25, 2022

What do you mean by "shard", @fpetkovski? Is this the same as the "replica group request" mentioned in the description of this pull request?

Contributor

I think we might be talking about the same thing, but I still think this variable is very confusing. Requests get split into independent partitions (shards), and then each shard gets replicated. So I would expect us to track how many successful replications each shard has, e.g. perShardSuccessfulReplicas.

Collaborator Author

I'm not sure I fully follow either, but throwing in yet another term is even more confusing for me. Are we talking about the initial batching of requests in https://github.com/thanos-io/thanos/blob/main/pkg/receive/handler.go#L555 when you say 'shards'? Since most of the docs refer to these as 'batches', I'd either stick to that or refer to them as 'shards' everywhere (I don't have a strong preference, but I associate shards more with actual databases than with requests).

Other than that you're right, but I named it 'per replica' since each index in the slice tracks the success of requests going to a given replica (for a single shard / batch); if it's not obvious enough I can prefix it as you suggested.

Contributor

@fpetkovski fpetkovski Oct 25, 2022

Calling it a batch makes sense as well; I agree that sharding is not mentioned anywhere else. What I wanted to say is that a receiver is not a replica, because each receiver can hold a unique portion of the data.

Contributor

@douglascamata douglascamata Oct 25, 2022

If I'm not mistaken, in most of the places in this part of the code we could literally replace the word "replica" with "replicationIndex". This confuses me a lot.

Collaborator Author

Agreed @douglascamata, I think this is where a lot of the confusion originates; as @fpetkovski also mentions, this creates the impression that replica = receiver node, but it's more of a logical replica (since parts of that replica will end up on different nodes). I'd be happy to rename this, and I'll also try to improve the other variables and get rid of the "group request" naming.

Collaborator Author

I tried to improve the naming somewhat, and I also added a diagram to the PR description as @douglascamata suggested - I hope it will make things a bit clearer. What I'm still struggling with, it seems, is how to properly name / encapsulate the logic on the very right side of the diagram, i.e. confirming the success of replication requests.

Contributor

Amaaaaaazing, @matej-g!

pkg/receive/handler.go (outdated review thread, resolved)
Signed-off-by: Matej Gera <[email protected]>
Member

@bwplotka bwplotka left a comment

Nice work, thanks! I think I need to spend more time on understanding those two success modes. Something does not add up to me... but maybe it's late here (:


// TestReadWriteThanosSetup sets up a Thanos deployment with 6 receiver replicas and replication factor 3 for writing.
// On top of that, the written metrics are queryable via querier.
func TestReadWriteThanosSetup(t *testing.T) {
Member

Can we add some automated avalanche/Prometheus writer to this setup to continuously write, so that we could then manually (or automatically within the e2e test, if we don't do this yet) play with killing one container?

// replica encapsulates the replica number of a request and if the request is
// already replicated.
// replica encapsulates the replication index of a request and if the request is
// already replicated. Replication index represents the number of logical replica to which
Member

Suggested change
// already replicated. Replication index represents the number of logical replica to which
// already replicated. Replication index represents the number of logical replicas to which

}
finalErr := errs.Err()
// Second success mode - it is permissible to fail all requests to specific endpoints,
// up to maxEndpointFailures, in which case we still know quorum is reached.
Member

Hm, I read through the descriptions, and I still can't tell why we need this. Even despite your great diagrams and TWO definitions 🙈 sorry.

Essentially, do we need the first success mode if we have such a trivial second mode? Can we have just the second mode? I think we should be safe if one receiver is down - that's the purpose of the hashring - but maybe our hashring algorithm is making the wrong decision here and we don't have this invariant:

(image attachment)

I guess I need to spend more time to understand this..

})

t.Run("replication_with_outage", func(t *testing.T) {
t.Run("replication_hashmod_with_outage", func(t *testing.T) {
Member

Suggested change
t.Run("replication_hashmod_with_outage", func(t *testing.T) {
t.Run("replication_hashmod_with_acceptable_outage", func(t *testing.T) {

It would be nice to add a test with an unacceptable outage.

@bwplotka
Member

bwplotka commented Nov 1, 2022

Furthermore, to me the e2e test receive_ketama_outage is inconclusive. It passes with:

  • Old Thanos version before this PR
  • With Thanos code from your PR
  • With second success mode disabled 🙈

(image attachment)

Member

@bwplotka bwplotka left a comment

Amazing work, but I feel there is a chance to push the #5807 idea further and simplify the whole flow tremendously. 🤔 Let's investigate it more; I will try to help, plus we can talk at PromCon!

@fpetkovski
Contributor

I agree that the current way replication works is not ideal, but if this is an issue we should probably prioritize fixing it. So unless the idea from #5807 is easy to implement, I don't think we should delay the fix until we have a pretty solution.

@matej-g
Collaborator Author

matej-g commented Nov 16, 2022

@fpetkovski we finally had some F2F time on this yesterday and we're considering whether there's a better alternative, but I'd agree with prioritizing the fix. I'll try to have this resolved by the end of this week.

// It will inspect the error's cause if the error is a MultiError. It will return cause of each contained error but will not traverse any deeper.
// If no cause can be determined, the original error is returned. When forReplication is true,
// it will return failed replication error on any unknown error cause.
func determineWriteErrorCause(err error, threshold int, forReplication bool) error {
Contributor

Is this fixing a separate issue?

@fpetkovski
Contributor

fpetkovski commented Nov 20, 2022

I tried solving the issue independently to see if we converge to the same solution: #5910

I think we're pretty much aligned, with the exception of sending a function over a channel, which I found a bit confusing 😄
I also still cannot wrap my head around counting successes by replica; maybe the math works out, but I cannot do it easily in my head. If we count successes by batch instead, we can simply calculate quorum on each individual batch separately and fail if any of those quorums are not met.
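
A hedged sketch of the per-batch alternative described here (illustrative names only, not the code from #5910):

```go
// batchesReachQuorum checks quorum for every batch independently: the write
// succeeds only if each batch was replicated to at least successThreshold
// endpoints.
func batchesReachQuorum(successfulReplicasPerBatch []int, successThreshold int) bool {
	for _, successes := range successfulReplicasPerBatch {
		if successes < successThreshold {
			return false
		}
	}
	return true
}
```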

@matej-g
Collaborator Author

matej-g commented Nov 21, 2022

I tried solving the issue independently to see if we converge to the same solution: #5910

Thanks, I took a look, it makes sense to me 👍.

I think we're pretty much aligned, with the exception of sending a function over a channel which I found a bit confusing 😄. I also still cannot wrap my head around counting successes by replica, maybe the math works out but I cannot do it easily in my head. If we count successes by batch instead, we can simply calculate quorum on each individual batch separately and fail if any of those quorums are not met.

As for sending a function over the channel, I think @douglascamata also pointed this out, and I don't have a strong stance on that; we can send a struct as well, but I thought that for sending two independent values this was a common solution (but perhaps not).

As for the replication success counting, yes, I think we're on a similar page, but in #5910 you do it for each batch, whereas here I try to achieve it globally; your way seems less complex.

@matej-g
Collaborator Author

matej-g commented Nov 29, 2022

Closing as this has been superseded by #5910

@douglascamata
Contributor

@matej-g I think you forgot to press the "Close" button.

@fpetkovski
Contributor

Thanks @douglascamata for keeping track of this. Since @matej-g is away, let's close the issue for now. It's also out of sync with main. We can always ask for forgiveness from Matej if he objects :)

@fpetkovski fpetkovski closed this Dec 19, 2022

Successfully merging this pull request may close these issues.

Receive: Ketama replication quorum handling is incorrect
6 participants