Skip to content

Commit

Permalink
kvserver: replace circuit breaker cancel goroutine with per-Replica r…
Browse files Browse the repository at this point in the history
…egistry

Total WIP for cockroachdb#74707. Don't review until out of draft.

Release note: None
  • Loading branch information
tbg committed Jan 25, 2022
1 parent 44b2ba8 commit d25ac5c
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 5 deletions.
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Reque
ba.Timestamp = repl.Clock().Now()
ba.Add(req)
ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration)
// Tag the breaker with the request. Once Send returns, we'll check that it's
// no longer tracked by the breaker. This gives good coverage that we're not
// going to leak memory.
ctx = context.WithValue(ctx, req, struct{}{})

defer cancel()
_, pErr := repl.Send(ctx, ba)
// If our context got canceled, return an opaque error regardless of presence or
Expand All @@ -548,6 +553,20 @@ func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Reque
if err := ctx.Err(); err != nil {
pErr = roachpb.NewErrorf("timed out waiting for batch response: %v", pErr)
}
{
var err error
repl.VisitBreakerContexts(func(ctx context.Context) {
if err == nil && ctx.Value(req) != nil {
err = errors.Errorf(
"request %s returned but context still tracked in breaker", req,
)
}
})
if err != nil {
pErr = roachpb.NewErrorf("%s; after %v", err, pErr)
}
}

return pErr.GoError()
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ func (r *Replica) Breaker() *circuit2.Breaker {
return r.breaker.wrapped
}

func (r *Replica) VisitBreakerContexts(fn func(ctx context.Context)) {
r.breaker.visitCancels(fn)
}

func (r *Replica) AssertState(ctx context.Context, reader storage.Reader) {
r.raftMu.Lock()
defer r.raftMu.Unlock()
Expand Down
112 changes: 110 additions & 2 deletions pkg/kv/kvserver/replica_circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"go.etcd.io/etcd/raft/v3"
Expand Down Expand Up @@ -61,9 +62,112 @@ type replicaCircuitBreaker struct {
stopper *stop.Stopper
r replicaInCircuitBreaker
st *cluster.Settings
cancels struct {
syncutil.Mutex
m map[context.Context]func()
}
wrapped *circuit.Breaker
}

// Register takes a cancelable context and its cancel function. The
// context is cancelled when the circuit breaker trips. If the breaker is
// already tripped, its error is returned immediately and the caller should not
// continue processing the request. Otherwise, the caller is provided with a
// signaller for use in a deferred call to maybeAdjustWithBreakerError, which
// will annotate the outgoing error in the event of the breaker tripping while
// the request is processing.
func (br *replicaCircuitBreaker) Register(
ctx context.Context, cancel func(),
) (_token interface{}, _ signaller, _ error) {
// We intentionally lock over the Signal() call and Err() check to avoid races
// where the breaker trips but the cancel() function is not invoked. Both of
// these calls are cheap and do not allocate.
br.cancels.Lock()
defer br.cancels.Unlock()

brSig := br.Signal()
if isCircuitBreakerProbe(ctx) {
brSig = neverTripSignaller{}
}

if err := brSig.Err(); err != nil {
// TODO(tbg): we may want to exclude some requests from this check, or allow
// requests to exclude themselves from the check (via their header).
cancel()
return nil, nil, err
}
br.cancels.m[ctx] = cancel

// The token is the context, saving allocations.
return ctx, brSig, nil
}

func (br *replicaCircuitBreaker) Unregister(
tok interface{}, sig signaller, pErr *roachpb.Error,
) *roachpb.Error {
brErr := sig.Err()
if sig.C() == nil {
// Breakers were disabled and we never put the cancel in the registry.
return pErr
}

br.cancels.Lock()
delete(br.cancels.m, tok.(context.Context))
br.cancels.Unlock()

err := pErr.GoError()
if ae := (&roachpb.AmbiguousResultError{}); errors.As(err, &ae) {
// The breaker tripped while a command was inflight, so we have to
// propagate an ambiguous result. We don't want to replace it, but there
// is a way to stash an Error in it so we use that.
//
// TODO(tbg): could also wrap it; there is no other write to WrappedErr
// in the codebase and it might be better to remove it. Nested *Errors
// are not a good idea.
wrappedErr := brErr
if ae.WrappedErr != nil {
wrappedErr = errors.Wrapf(brErr, "%v", ae.WrappedErr)
}
ae.WrappedErr = roachpb.NewError(wrappedErr)
return roachpb.NewError(ae)
} else if le := (&roachpb.NotLeaseHolderError{}); errors.As(err, &le) {
// When a lease acquisition triggered by this request is short-circuited
// by the breaker, it will return an opaque NotLeaseholderError, which we
// replace with the breaker's error.
return roachpb.NewError(errors.CombineErrors(brErr, le))
}
return pErr
}

func (br *replicaCircuitBreaker) visitCancels(f func(context.Context)) {
br.cancels.Lock()
for ctx := range br.cancels.m {
f(ctx)
}
br.cancels.Unlock()
}

func (br *replicaCircuitBreaker) cancelAllTrackedProposals() error {
br.cancels.Lock()
defer br.cancels.Unlock()
// NB: this intentionally consults the wrapped Signal(), which does not check
// the breaker cluster setting.
if br.wrapped.Signal().Err() == nil {
// TODO(tbg): in the future, we may want to trigger the probe even if
// the breaker is not tripped. For example, if we *suspect* that there
// might be something wrong with the range but we're not quite sure,
// we would like to let the probe decide whether to trip. Once/if we
// do this, this code may need to become more permissive and we'll
// have to re-work where the cancellation actually occurs.
return errors.AssertionFailedf("asked to cancel all proposals, but breaker is not tripped")
}
for _, cancel := range br.cancels.m {
cancel()
}
br.cancels.m = map[context.Context]func(){}
return nil
}

func (br *replicaCircuitBreaker) enabled() bool {
return replicaCircuitBreakerSlowReplicationThreshold.Get(&br.st.SV) > 0 &&
br.st.Version.IsActive(context.Background(), clusterversion.ProbeRequest)
Expand Down Expand Up @@ -96,7 +200,7 @@ type neverTripSignaller struct{}
func (s neverTripSignaller) Err() error { return nil }
func (s neverTripSignaller) C() <-chan struct{} { return nil }

func (br *replicaCircuitBreaker) Signal() signaller {
func (br *replicaCircuitBreaker) Signal() signaller { // TODO(tbg): unexport?
if !br.enabled() {
return neverTripSignaller{}
}
Expand All @@ -117,7 +221,7 @@ func newReplicaCircuitBreaker(
r: r,
st: cs,
}

br.cancels.m = map[context.Context]func(){}
br.wrapped = circuit.NewBreaker(circuit.Options{
Name: "breaker", // log bridge has ctx tags
AsyncProbe: br.asyncProbe,
Expand Down Expand Up @@ -173,6 +277,10 @@ func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) {
return
}

if err := br.cancelAllTrackedProposals(); err != nil {
log.Errorf(ctx, "%s", err)
// Proceed.
}
err := sendProbe(ctx, br.r)
report(err)
}); err != nil {
Expand Down
65 changes: 65 additions & 0 deletions pkg/kv/kvserver/replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
package kvserver

import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"go.etcd.io/etcd/raft/v3"
)
Expand All @@ -40,3 +46,62 @@ func TestReplicaUnavailableError(t *testing.T) {
err := replicaUnavailableError(desc, desc.Replicas().AsProto()[0], lm, &rs)
echotest.Require(t, string(redact.Sprint(err)), testutils.TestDataPath(t, "replica_unavailable_error.txt"))
}

type circuitBreakerReplicaMock struct {
clock *hlc.Clock
}

func (c *circuitBreakerReplicaMock) Clock() *hlc.Clock {
return c.clock
}

func (c *circuitBreakerReplicaMock) Desc() *roachpb.RangeDescriptor {
//TODO implement me
panic("implement me")
}

func (c *circuitBreakerReplicaMock) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
return ba.CreateReply(), nil
}

func (c *circuitBreakerReplicaMock) slowReplicationThreshold(
ba *roachpb.BatchRequest,
) (time.Duration, bool) {
return 0, false
}

func (c *circuitBreakerReplicaMock) replicaUnavailableError() error {
return errors.New("unavailable")
}

func BenchmarkReplicaCircuitBreaker_Register(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)
ctx := context.Background()

st := cluster.MakeTestingClusterSettings()
// Enable circuit breakers.
replicaCircuitBreakerSlowReplicationThreshold.Override(ctx, &st.SV, time.Hour)
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
r := &circuitBreakerReplicaMock{clock: hlc.NewClock(hlc.UnixNano, 500*time.Millisecond)}
onTrip := func() {}
onReset := func() {}
br := newReplicaCircuitBreaker(st, stopper, log.AmbientContext{}, r, onTrip, onReset)

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ctx, cancel := context.WithCancel(ctx)
tok, sig, err := br.Register(ctx, cancel)
if err != nil {
b.Error(err)
}
if pErr := br.Unregister(tok, sig, nil); pErr != nil {
b.Error(pErr)
}
cancel()
}
})
}
1 change: 0 additions & 1 deletion pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ func newUnloadedReplica(
// replica GC issues, but is a distraction at the moment.
// r.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(r)))
r.raftMu.stateLoader = stateloader.Make(desc.RangeID)

r.splitQueueThrottle = util.Every(splitQueueThrottleDuration)
r.mergeQueueThrottle = util.Every(mergeQueueThrottleDuration)

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,12 @@ func (r *Replica) sendWithoutRangeID(

// Circuit breaker handling.
ctx, cancel := context.WithCancel(ctx)
brSig, err := r.checkCircuitBreaker(ctx, cancel)
tok, brSig, err := r.breaker.Register(ctx, cancel)
if err != nil {
return nil, roachpb.NewError(err)
}
defer func() {
rErr = maybeAdjustWithBreakerError(rErr, brSig.Err())
rErr = r.breaker.Unregister(tok, brSig, rErr)
cancel()
}()

Expand Down

0 comments on commit d25ac5c

Please sign in to comment.