From d25ac5c3e356ec5ad7149a95f760b086264fafa3 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 20 Jan 2022 18:04:15 +0100 Subject: [PATCH] kvserver: replace circuit breaker cancel goroutine with per-Replica registry Total WIP for #74707. Don't review until out of draft. Release note: None --- .../client_replica_circuit_breaker_test.go | 19 +++ pkg/kv/kvserver/helpers_test.go | 4 + pkg/kv/kvserver/replica_circuit_breaker.go | 112 +++++++++++++++++- .../kvserver/replica_circuit_breaker_test.go | 65 ++++++++++ pkg/kv/kvserver/replica_init.go | 1 - pkg/kv/kvserver/replica_send.go | 4 +- 6 files changed, 200 insertions(+), 5 deletions(-) diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index f75dec6bb1f0..5a5c106f5e8f 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -540,6 +540,11 @@ func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Reque ba.Timestamp = repl.Clock().Now() ba.Add(req) ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration) + // Tag the breaker with the request. Once Send returns, we'll check that it's + // no longer tracked by the breaker. This gives good coverage that we're not + // going to leak memory. + ctx = context.WithValue(ctx, req, struct{}{}) + defer cancel() _, pErr := repl.Send(ctx, ba) // If our context got canceled, return an opaque error regardless of presence or @@ -548,6 +553,20 @@ func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Reque if err := ctx.Err(); err != nil { pErr = roachpb.NewErrorf("timed out waiting for batch response: %v", pErr) } + { + var err error + repl.VisitBreakerContexts(func(ctx context.Context) { + if err == nil && ctx.Value(req) != nil { + err = errors.Errorf( + "request %s returned but context still tracked in breaker", req, + ) + } + }) + if err != nil { + pErr = roachpb.NewErrorf("%s; after %v", err, pErr) + } + } + return pErr.GoError() } diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 42b4ec4e6f78..0057bc748e0c 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -226,6 +226,10 @@ func (r *Replica) Breaker() *circuit2.Breaker { return r.breaker.wrapped } +func (r *Replica) VisitBreakerContexts(fn func(ctx context.Context)) { + r.breaker.visitCancels(fn) +} + func (r *Replica) AssertState(ctx context.Context, reader storage.Reader) { r.raftMu.Lock() defer r.raftMu.Unlock() diff --git a/pkg/kv/kvserver/replica_circuit_breaker.go b/pkg/kv/kvserver/replica_circuit_breaker.go index 11815e0d9b41..0b4cab58131b 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker.go +++ b/pkg/kv/kvserver/replica_circuit_breaker.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" "go.etcd.io/etcd/raft/v3" @@ -61,9 +62,112 @@ type replicaCircuitBreaker struct { stopper *stop.Stopper r replicaInCircuitBreaker st *cluster.Settings + cancels struct { + syncutil.Mutex + m map[context.Context]func() + } wrapped *circuit.Breaker } +// Register takes a cancelable context and its cancel function. The +// context is cancelled when the circuit breaker trips. If the breaker is +// already tripped, its error is returned immediately and the caller should not +// continue processing the request. Otherwise, the caller is provided with a +// signaller for use in a deferred call to maybeAdjustWithBreakerError, which +// will annotate the outgoing error in the event of the breaker tripping while +// the request is processing. +func (br *replicaCircuitBreaker) Register( + ctx context.Context, cancel func(), +) (_token interface{}, _ signaller, _ error) { + // We intentionally lock over the Signal() call and Err() check to avoid races + // where the breaker trips but the cancel() function is not invoked. Both of + // these calls are cheap and do not allocate. + br.cancels.Lock() + defer br.cancels.Unlock() + + brSig := br.Signal() + if isCircuitBreakerProbe(ctx) { + brSig = neverTripSignaller{} + } + + if err := brSig.Err(); err != nil { + // TODO(tbg): we may want to exclude some requests from this check, or allow + // requests to exclude themselves from the check (via their header). + cancel() + return nil, nil, err + } + br.cancels.m[ctx] = cancel + + // The token is the context, saving allocations. + return ctx, brSig, nil +} + +func (br *replicaCircuitBreaker) Unregister( + tok interface{}, sig signaller, pErr *roachpb.Error, +) *roachpb.Error { + brErr := sig.Err() + if sig.C() == nil { + // Breakers were disabled and we never put the cancel in the registry. + return pErr + } + + br.cancels.Lock() + delete(br.cancels.m, tok.(context.Context)) + br.cancels.Unlock() + + err := pErr.GoError() + if ae := (&roachpb.AmbiguousResultError{}); errors.As(err, &ae) { + // The breaker tripped while a command was inflight, so we have to + // propagate an ambiguous result. We don't want to replace it, but there + // is a way to stash an Error in it so we use that. + // + // TODO(tbg): could also wrap it; there is no other write to WrappedErr + // in the codebase and it might be better to remove it. Nested *Errors + // are not a good idea. + wrappedErr := brErr + if ae.WrappedErr != nil { + wrappedErr = errors.Wrapf(brErr, "%v", ae.WrappedErr) + } + ae.WrappedErr = roachpb.NewError(wrappedErr) + return roachpb.NewError(ae) + } else if le := (&roachpb.NotLeaseHolderError{}); errors.As(err, &le) { + // When a lease acquisition triggered by this request is short-circuited + // by the breaker, it will return an opaque NotLeaseholderError, which we + // replace with the breaker's error. + return roachpb.NewError(errors.CombineErrors(brErr, le)) + } + return pErr +} + +func (br *replicaCircuitBreaker) visitCancels(f func(context.Context)) { + br.cancels.Lock() + for ctx := range br.cancels.m { + f(ctx) + } + br.cancels.Unlock() +} + +func (br *replicaCircuitBreaker) cancelAllTrackedProposals() error { + br.cancels.Lock() + defer br.cancels.Unlock() + // NB: this intentionally consults the wrapped Signal(), which does not check + // the breaker cluster setting. + if br.wrapped.Signal().Err() == nil { + // TODO(tbg): in the future, we may want to trigger the probe even if + // the breaker is not tripped. For example, if we *suspect* that there + // might be something wrong with the range but we're not quite sure, + // we would like to let the probe decide whether to trip. Once/if we + // do this, this code may need to become more permissive and we'll + // have to re-work where the cancellation actually occurs. + return errors.AssertionFailedf("asked to cancel all proposals, but breaker is not tripped") + } + for _, cancel := range br.cancels.m { + cancel() + } + br.cancels.m = map[context.Context]func(){} + return nil +} + func (br *replicaCircuitBreaker) enabled() bool { return replicaCircuitBreakerSlowReplicationThreshold.Get(&br.st.SV) > 0 && br.st.Version.IsActive(context.Background(), clusterversion.ProbeRequest) @@ -96,7 +200,7 @@ type neverTripSignaller struct{} func (s neverTripSignaller) Err() error { return nil } func (s neverTripSignaller) C() <-chan struct{} { return nil } -func (br *replicaCircuitBreaker) Signal() signaller { +func (br *replicaCircuitBreaker) Signal() signaller { // TODO(tbg): unexport? if !br.enabled() { return neverTripSignaller{} } @@ -117,7 +221,7 @@ func newReplicaCircuitBreaker( r: r, st: cs, } - + br.cancels.m = map[context.Context]func(){} br.wrapped = circuit.NewBreaker(circuit.Options{ Name: "breaker", // log bridge has ctx tags AsyncProbe: br.asyncProbe, @@ -173,6 +277,10 @@ func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) { return } + if err := br.cancelAllTrackedProposals(); err != nil { + log.Errorf(ctx, "%s", err) + // Proceed. + } err := sendProbe(ctx, br.r) report(err) }); err != nil { diff --git a/pkg/kv/kvserver/replica_circuit_breaker_test.go b/pkg/kv/kvserver/replica_circuit_breaker_test.go index b056713c36a2..88eec56b197a 100644 --- a/pkg/kv/kvserver/replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/replica_circuit_breaker_test.go @@ -11,14 +11,20 @@ package kvserver import ( + "context" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/echotest" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" "go.etcd.io/etcd/raft/v3" ) @@ -40,3 +46,62 @@ func TestReplicaUnavailableError(t *testing.T) { err := replicaUnavailableError(desc, desc.Replicas().AsProto()[0], lm, &rs) echotest.Require(t, string(redact.Sprint(err)), testutils.TestDataPath(t, "replica_unavailable_error.txt")) } + +type circuitBreakerReplicaMock struct { + clock *hlc.Clock +} + +func (c *circuitBreakerReplicaMock) Clock() *hlc.Clock { + return c.clock +} + +func (c *circuitBreakerReplicaMock) Desc() *roachpb.RangeDescriptor { + //TODO implement me + panic("implement me") +} + +func (c *circuitBreakerReplicaMock) Send( + ctx context.Context, ba roachpb.BatchRequest, +) (*roachpb.BatchResponse, *roachpb.Error) { + return ba.CreateReply(), nil +} + +func (c *circuitBreakerReplicaMock) slowReplicationThreshold( + ba *roachpb.BatchRequest, +) (time.Duration, bool) { + return 0, false +} + +func (c *circuitBreakerReplicaMock) replicaUnavailableError() error { + return errors.New("unavailable") +} + +func BenchmarkReplicaCircuitBreaker_Register(b *testing.B) { + defer leaktest.AfterTest(b)() + defer log.Scope(b).Close(b) + ctx := context.Background() + + st := cluster.MakeTestingClusterSettings() + // Enable circuit breakers. + replicaCircuitBreakerSlowReplicationThreshold.Override(ctx, &st.SV, time.Hour) + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + r := &circuitBreakerReplicaMock{clock: hlc.NewClock(hlc.UnixNano, 500*time.Millisecond)} + onTrip := func() {} + onReset := func() {} + br := newReplicaCircuitBreaker(st, stopper, log.AmbientContext{}, r, onTrip, onReset) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + ctx, cancel := context.WithCancel(ctx) + tok, sig, err := br.Register(ctx, cancel) + if err != nil { + b.Error(err) + } + if pErr := br.Unregister(tok, sig, nil); pErr != nil { + b.Error(pErr) + } + cancel() + } + }) +} diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 44a810af6798..b5347562618c 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -125,7 +125,6 @@ func newUnloadedReplica( // replica GC issues, but is a distraction at the moment. // r.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(r))) r.raftMu.stateLoader = stateloader.Make(desc.RangeID) - r.splitQueueThrottle = util.Every(splitQueueThrottleDuration) r.mergeQueueThrottle = util.Every(mergeQueueThrottleDuration) diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index a0b4394a7a03..a2ae163aa4b3 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -204,12 +204,12 @@ func (r *Replica) sendWithoutRangeID( // Circuit breaker handling. ctx, cancel := context.WithCancel(ctx) - brSig, err := r.checkCircuitBreaker(ctx, cancel) + tok, brSig, err := r.breaker.Register(ctx, cancel) if err != nil { return nil, roachpb.NewError(err) } defer func() { - rErr = maybeAdjustWithBreakerError(rErr, brSig.Err()) + rErr = r.breaker.Unregister(tok, brSig, rErr) cancel() }()