From 487afe348b5f0617f78403c4cacb0f10edd79721 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 21 Oct 2021 12:34:15 +0200 Subject: [PATCH] kv: circuit-break requests to unavailable replicas Fixes #33007. Closes #61311. This PR uses the circuit breaker package introduced in #73641 to address issue #33007: When a replica becomes unavailable, it should eagerly refuse traffic that it believes would simply hang. Concretely, every request to the Replica gets a cancelable context that is sensitive to the circuit breaker tripping (relying on context cancellation makes sure we don't end up with a second channel that needs to be plumbed to everyone and their dog; Go makes it really difficult to join two cancellation chains); if the breaker is tripped when the request arrives, it is refused right away. In either case, the outgoing error is augmented to carry information about the tripped breaker. This isn't 100% trivial, since retreating from in-flight replication typically causes an `AmbiguousResultError`, and while we could avoid it in some cases we can't in all. A similar problem occurs when the canceled request was waiting on a lease, in which case the result is a NotLeaseholderError. For any request that made it in, if it enters replication but does not manage to succeed within the timeout specified by the `kv.replica_circuit_breaker.slow_replication_threshold` cluster setting, the breaker is tripped, canceling all in-flight and future requests until the breaker heals. Perhaps surprisingly, the existing "slowness" detection (the informational "have been waiting ... for proposal" warning in `executeWriteBatch`) was moved deeper into replication (`refreshProposalsLocked`), where it now trips the breaker. This has the advantage of providing a unified path for lease requests (which don't go through `executeWriteBatch`) and pipelined writes (which return before waiting on the in-flight replication process). To make this work, we need to pick a little fight with how leases are (not) refreshed (#74711) and we need to remember the ticks at which a proposal was first inserted (despite potential reproposals). Note that when the breaker is tripped, *all* traffic to the Replica gets the fail-fast behavior, not just mutations. This is because even though a read may look good to serve based on the lease, we would also need to check for latches, and in particular we would need to fail fast if there is a transitive dependency on any write (since that write is presumably stuck). This is not trivial and so we don't do it in this first iteration (see #74799). A tripped breaker deploys a background probe which sends a `ProbeRequest` (introduced in #72972) to determine when to heal; this is roughly the case whenever replicating a command through Raft is possible for the Replica, either by appending to the log as the Raft leader, or by forwarding to the Raft leader. A tiny bit of special casing allows requests made by the probe to bypass the breaker. As of this PR, the cluster setting defaults to zero (disabling the entire mechanism) until some necessary follow-up items have been addressed (see #74705). For example, the breaker-sensitive context cancellation is a toy implementation that comes with too much of a performance overhead (one extra goroutine per request); #74707 will address that. Other things not done here that we certainly want in the 22.1 release are UI work (#74713) and metrics (#74505). The release note is deferred to #74705 (where breakers are turned on).
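To make the fail-fast plumbing concrete, here is a minimal sketch of how a request's context is tied to the breaker, simplified from the `checkCircuitBreaker` helper this PR adds to `replica_send.go` (it assumes that file's `context` import and the `signaller` interface from `replica_circuit_breaker.go`); the per-request goroutine is exactly the overhead that #74707 aims to eliminate:

    // withBreakerCancellation derives a context that is canceled either by the
    // parent or by the breaker tripping, joining the two cancellation chains.
    func withBreakerCancellation(ctx context.Context, brSig signaller) (context.Context, context.CancelFunc) {
        ctx, cancel := context.WithCancel(ctx)
        if ch := brSig.C(); ch != nil { // a nil channel means breakers are disabled
            go func() {
                select {
                case <-ctx.Done(): // request finished or parent canceled
                case <-ch: // breaker tripped: fail the request fast
                    cancel()
                }
            }()
        }
        return ctx, cancel
    }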
Release note: None --- docs/generated/settings/settings.html | 1 + pkg/kv/kvserver/BUILD.bazel | 6 + .../client_replica_circuit_breaker_test.go | 600 ++++++++++++++++++ pkg/kv/kvserver/helpers_test.go | 5 + pkg/kv/kvserver/kvserverpb/state.proto | 3 + pkg/kv/kvserver/replica.go | 13 + pkg/kv/kvserver/replica_circuit_breaker.go | 229 +++++++ .../kvserver/replica_circuit_breaker_test.go | 42 ++ pkg/kv/kvserver/replica_init.go | 2 + pkg/kv/kvserver/replica_proposal.go | 4 + pkg/kv/kvserver/replica_proposal_buf.go | 3 + pkg/kv/kvserver/replica_raft.go | 61 +- pkg/kv/kvserver/replica_range_lease.go | 5 + pkg/kv/kvserver/replica_send.go | 76 ++- pkg/kv/kvserver/replica_test.go | 39 -- pkg/kv/kvserver/replica_write.go | 80 --- pkg/kv/kvserver/store.go | 7 - .../testdata/replica_unavailable_error.txt | 3 + pkg/kv/kvserver/testing_knobs.go | 4 + pkg/roachpb/errors.go | 3 + pkg/testutils/lint/lint_test.go | 1 + pkg/util/hlc/hlc.go | 10 + 22 files changed, 1068 insertions(+), 129 deletions(-) create mode 100644 pkg/kv/kvserver/client_replica_circuit_breaker_test.go create mode 100644 pkg/kv/kvserver/replica_circuit_breaker.go create mode 100644 pkg/kv/kvserver/replica_circuit_breaker_test.go create mode 100644 pkg/kv/kvserver/testdata/replica_unavailable_error.txt diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 6d804c80b0e4..c000c4280f56 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -40,6 +40,7 @@ kv.range_split.by_load_enabledbooleantrueallow automatic splits of ranges based on where load is concentrated kv.range_split.load_qps_thresholdinteger2500the QPS over which, the range becomes a candidate for load based splitting kv.rangefeed.enabledbooleanfalseif set, rangefeed registration is enabled +kv.replica_circuit_breaker.slow_replication_thresholdduration0sduration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers) kv.replication_reports.intervalduration1m0sthe frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable) kv.snapshot_rebalance.max_ratebyte size32 MiBthe rate limit (bytes/sec) to use for rebalance and upreplication snapshots kv.snapshot_recovery.max_ratebyte size32 MiBthe rate limit (bytes/sec) to use for recovery snapshots diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index cb45e57eb7f4..cb174368921c 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "replica_application_state_machine.go", "replica_backpressure.go", "replica_batch_updates.go", + "replica_circuit_breaker.go", "replica_closedts.go", "replica_command.go", "replica_consistency.go", @@ -162,6 +163,7 @@ go_library( "//pkg/util", "//pkg/util/admission", "//pkg/util/bufalloc", + "//pkg/util/circuit", "//pkg/util/contextutil", "//pkg/util/ctxgroup", "//pkg/util/encoding", @@ -221,6 +223,7 @@ go_test( "client_rangefeed_test.go", "client_relocate_range_test.go", "client_replica_backpressure_test.go", + "client_replica_circuit_breaker_test.go", "client_replica_gc_test.go", "client_replica_test.go", "client_spanconfigs_test.go", @@ -252,6 +255,7 @@ go_test( "replica_application_cmd_buf_test.go", "replica_application_state_machine_test.go", "replica_batch_updates_test.go", + "replica_circuit_breaker_test.go", "replica_closedts_internal_test.go", "replica_closedts_test.go", "replica_command_test.go", @@ -364,6
+368,7 @@ go_test( "//pkg/storage/enginepb", "//pkg/storage/fs", "//pkg/testutils", + "//pkg/testutils/echotest", "//pkg/testutils/gossiputil", "//pkg/testutils/kvclientutils", "//pkg/testutils/serverutils", @@ -374,6 +379,7 @@ go_test( "//pkg/ts/tspb", "//pkg/util", "//pkg/util/caller", + "//pkg/util/circuit", "//pkg/util/contextutil", "//pkg/util/ctxgroup", "//pkg/util/encoding", diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go new file mode 100644 index 000000000000..0636e622b0ad --- /dev/null +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -0,0 +1,600 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver_test + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/circuit" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// In all scenarios below, we are starting out with our range on n1 and n2, +// and all other ranges (in particular the liveness range) on n1. +// +// TODO(tbg): via tracing, test that when the breaker is tripped, requests fail +// fast right upon entering the replica. + +const ( + n1 = 0 + n2 = 1 + + pauseHeartbeats = true + keepHeartbeats = false +) + +// This is a sanity check in which the breaker plays no role. +func TestReplicaCircuitBreaker_NotTripped(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + tc := setupCircuitBreakerTest(t) + defer tc.Stopper().Stop(context.Background()) + + // Circuit breaker doesn't get in the way of anything unless + // something trips it. + require.NoError(t, tc.Write(n1)) + tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) + require.NoError(t, tc.Read(n1)) + tc.RequireIsNotLeaseholderError(t, tc.Read(n2)) +} + +// In this test, n1 holds the lease and we disable the probe and trip the +// breaker. While the breaker is tripped, requests fail fast with either a +// breaker or lease error. When the probe is re-enabled, everything heals. +func TestReplicaCircuitBreaker_LeaseholderTripped(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + tc := setupCircuitBreakerTest(t) + defer tc.Stopper().Stop(context.Background()) + + // Get lease on n1. + require.NoError(t, tc.Write(n1)) + // Disable the probe so that when the breaker trips, it stays tripped.
+ tc.SetProbeEnabled(n1, false) + tc.Report(n1, errors.New("boom")) + + // n1 could theoretically still serve reads (there is a valid lease + // and none of the latches are taken), but since it is hard to determine + // that upfront we currently fail all reads as well. + tc.RequireIsBreakerOpen(t, tc.Read(n1)) + tc.RequireIsBreakerOpen(t, tc.Write(n1)) + + // When we go through the KV client stack, we still get the breaker error + // back. + tc.RequireIsBreakerOpen(t, tc.WriteDS(n1)) + tc.RequireIsBreakerOpen(t, tc.WriteDS(n2)) + + // n2 does not have the lease so all it does is redirect to the leaseholder + // n1. + tc.RequireIsNotLeaseholderError(t, tc.Read(n2)) + tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) + + // Enable the probe. Even a read should trigger the probe + // and within due time the breaker should heal. + tc.SetProbeEnabled(n1, true) + tc.UntripsSoon(t, tc.Read, n1) + // Same behavior on writes. + tc.Report(n1, errors.New("boom again")) + tc.UntripsSoon(t, tc.Write, n1) +} + +// In this scenario we have n1 holding the lease and we permanently trip the +// breaker on follower n2. Before the breaker is tripped, we see +// NotLeaseholderError. When it's tripped, those are supplanted by the breaker +// errors. Once we allow the breaker to probe, the breaker untrips. In +// particular, this tests that the probe can succeed even when run on a +// follower (which would not be true if it required the local Replica to +// execute an operation that requires the lease). +func TestReplicaCircuitBreaker_FollowerTripped(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + tc := setupCircuitBreakerTest(t) + defer tc.Stopper().Stop(context.Background()) + + // Get lease on n1. + require.NoError(t, tc.Write(n1)) + // Disable the probe on n2 so that when the breaker trips, it stays tripped. + tc.SetProbeEnabled(n2, false) + tc.Report(n2, errors.New("boom")) + + // We didn't trip the leaseholder n1, so it is unaffected. + require.NoError(t, tc.Read(n1)) + require.NoError(t, tc.Write(n1)) + // Even if we go through DistSender, we reliably reach the leaseholder. + // TODO(tbg): I think this relies on the leaseholder being cached. If + // DistSender tried to contact the follower and got the breaker error, at + // time of writing it would propagate it. + require.NoError(t, tc.WriteDS(n1)) + + tc.RequireIsBreakerOpen(t, tc.Read(n2)) + tc.RequireIsBreakerOpen(t, tc.Write(n2)) + + // Enable the probe. Even a read should trigger the probe + // and within due time the breaker should heal, giving us + // NotLeaseholderErrors again. + // + // TODO(tbg): this test would be more meaningful with follower reads. They + // should succeed when the breaker is open and fail if the breaker is + // tripped. However knowing that the circuit breaker check sits at the top + // of Replica.sendWithRangeID, it's clear that it won't make a difference. + tc.SetProbeEnabled(n2, true) + testutils.SucceedsSoon(t, func() error { + if err := tc.Read(n2); !errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { + return err + } + return nil + }) + // Same behavior on writes. + tc.Report(n2, errors.New("boom again")) + testutils.SucceedsSoon(t, func() error { + if err := tc.Write(n2); !errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) { + return err + } + return nil + }) +} + +// In this scenario, the breaker is tripped and the probe is disabled and +// additionally, the liveness records for both nodes have expired. Soon after +// the probe is re-enabled, the breaker heals. 
In particular, the probe isn't +// doing anything that requires the lease (or whatever it does that requires +// the lease is sufficiently special cased; at time of writing it's the former +// but as the probe learns deeper checks, the plan is ultimately the latter). +func TestReplicaCircuitBreaker_LeaselessTripped(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + tc := setupCircuitBreakerTest(t) + defer tc.Stopper().Stop(context.Background()) + + // Put the lease on n1 but then trip the breaker with the probe + // disabled. + require.NoError(t, tc.Write(n1)) + tc.SetProbeEnabled(n1, false) + tc.Report(n1, errors.New("boom")) + resumeHeartbeats := tc.ExpireAllLeases(t, pauseHeartbeats) + + // n2 (not n1) will return a NotLeaseholderError. This may be surprising - + // why isn't it trying and succeeding to acquire a lease - but it does + // not do that because it sees that the new leaseholder (n2) is not live + // itself. We'll revisit this after re-enabling liveness later in the test. + { + err := tc.Read(n2) + // At time of writing: not incrementing epoch on n1 because next + // leaseholder (n2) not live. + t.Log(err) + tc.RequireIsNotLeaseholderError(t, err) + // Same behavior for write on n2. + tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) + } + // On n1, run into the circuit breaker when requesting lease. + { + tc.RequireIsBreakerOpen(t, tc.Read(n1)) + tc.RequireIsBreakerOpen(t, tc.Write(n1)) + } + + // Let the breaker heal and things should go back to normal. This is not + // trivial to get right, as the probe needs to go through for this, and if + // we're not careful, the probe itself is held up by the breaker as well, or + // the probe will try to acquire a lease (which we're currently careful to + // avoid). + resumeHeartbeats() + tc.SetProbeEnabled(n1, true) + tc.UntripsSoon(t, tc.Read, n1) + tc.UntripsSoon(t, tc.Write, n1) + tc.RequireIsNotLeaseholderError(t, tc.Read(n2)) + tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) +} + +// In this test, the range is on n1 and n2 and we take down the follower n2, +// thus losing quorum (but not the lease or leaseholder). After the +// SlowReplicationThreshold (which is reduced suitably to keep the test +// snappy) has passed, the breaker on n1's Replica trips. When n2 comes back, +// the probe on n1 succeeds and requests to n1 can acquire a lease and +// succeed. +func TestReplicaCircuitBreaker_Leaseholder_QuorumLoss(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + tc := setupCircuitBreakerTest(t) + defer tc.Stopper().Stop(context.Background()) + + // Get lease on n1. + require.NoError(t, tc.Write(n1)) + tc.StopServer(n2) // lose quorum + + // We didn't lose the liveness range (which is only on n1). + require.NoError(t, tc.Server(n1).HeartbeatNodeLiveness()) + tc.SetSlowThreshold(10 * time.Millisecond) + { + err := tc.Write(n1) + var ae *roachpb.AmbiguousResultError + require.True(t, errors.As(err, &ae), "%+v", err) + t.Log(err) + } + tc.RequireIsBreakerOpen(t, tc.Read(n1)) + + // Bring n2 back and service should be restored. + tc.SetSlowThreshold(0) // reset + require.NoError(t, tc.RestartServer(n2)) + tc.UntripsSoon(t, tc.Read, n1) + require.NoError(t, tc.Write(n1)) +} + +// In this test, the range is on n1 and n2 and we place the lease on n2 and +// shut down n2 and expire the lease. n1 will be a non-leaseholder without +// quorum, and requests to it should trip the circuit breaker.
This is an +// interesting test case internally because here, the request that trips the +// breaker is the slow lease request, and not the test's actual write. Since +// leases have lots of special casing internally, this is easy to get wrong. +func TestReplicaCircuitBreaker_Follower_QuorumLoss(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + tc := setupCircuitBreakerTest(t) + defer tc.Stopper().Stop(context.Background()) + + // Transfer the lease to n2 so that we can lose it without taking down the system ranges. + desc := tc.LookupRangeOrFatal(t, tc.ScratchRange(t)) + tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(n2)) + resumeHeartbeats := tc.ExpireAllLeases(t, keepHeartbeats) + tc.StopServer(n2) // lose quorum and leaseholder + resumeHeartbeats() + + // We didn't lose the liveness range (which is only on n1). + require.NoError(t, tc.Server(n1).HeartbeatNodeLiveness()) + tc.SetSlowThreshold(10 * time.Millisecond) + tc.RequireIsBreakerOpen(t, tc.Write(n1)) + tc.RequireIsBreakerOpen(t, tc.Read(n1)) + + // Bring n2 back and service should be restored. + tc.SetSlowThreshold(0) // reset + require.NoError(t, tc.RestartServer(n2)) + tc.UntripsSoon(t, tc.Read, n1) + require.NoError(t, tc.Write(n1)) +} + +// This test is skipped but documents that the current circuit breakers cannot +// prevent hung requests when the *liveness* range is down. +// +// The liveness range is usually 5x-replicated and so is less likely to lose +// quorum, but for resilience against asymmetric partitions it would be nice to +// also trip the local breaker if liveness updates cannot be performed. We +// can't rely on receiving an error from the liveness range back, as we may not +// be able to reach any of its Replicas (and in fact all of its Replicas may +// have been lost, in extreme cases), so we would need to guard all +// interactions with the liveness range in a timeout, which is unsatisfying. +// +// A somewhat related problem needs to be solved for general loss of all +// Replicas of a Range. In that case the request will never reach a +// per-Replica circuit breaker and it will thus fail slow. Instead, we would +// need DistSender to detect this scenario (for example, by cross-checking +// liveness against the available targets, but this gets complicated again +// due to our having bootstrapped liveness on top of the KV stack). +// +// Solving the general problem, however, wouldn't obviate the need for +// special-casing of lease-related liveness interactions, since we also want +// to protect against the case in which the liveness range is "there" but +// simply will not make progress for whatever reason. +// +// An argument can be made that in such a case it is likely that the cluster +// is unavailable in its entirety. +func TestReplicaCircuitBreaker_Liveness_QuorumLoss(t *testing.T) { + defer leaktest.AfterTest(t)() + + skip.IgnoreLint(t, "See: https://github.com/cockroachdb/cockroach/issues/74616") + + defer log.Scope(t).Close(t) + tc := setupCircuitBreakerTest(t) + defer tc.Stopper().Stop(context.Background()) + + // Up-replicate liveness range and move lease to n2. + tc.AddVotersOrFatal(t, keys.NodeLivenessPrefix, tc.Target(n2)) + tc.TransferRangeLeaseOrFatal(t, tc.LookupRangeOrFatal(t, keys.NodeLivenessPrefix), tc.Target(n2)) + // Sanity check that things still work. + require.NoError(t, tc.Write(n1)) + tc.RequireIsNotLeaseholderError(t, tc.Write(n2)) + // Remove the second replica for our main range.
+ tc.RemoveVotersOrFatal(t, tc.ScratchRange(t), tc.Target(n2)) + + // Now stop n2. This will lose the liveness range only since the other + // ranges are on n1 only. + tc.StopServer(n2) + + // Expire all leases. We also pause all heartbeats but that doesn't really + // matter since the liveness range is unavailable anyway. + resume := tc.ExpireAllLeases(t, pauseHeartbeats) + defer resume() + + // Since there isn't a lease, and the liveness range is down, the circuit + // breaker should kick into gear. + tc.SetSlowThreshold(10 * time.Millisecond) + + // This is what fails, as the lease acquisition hangs on the liveness range + // but nothing will ever report a problem to the breaker. + tc.RequireIsBreakerOpen(t, tc.Read(n1)) + tc.RequireIsBreakerOpen(t, tc.Write(n1)) + + tc.SetSlowThreshold(0) // reset + require.NoError(t, tc.RestartServer(n2)) + + tc.UntripsSoon(t, tc.Read, n1) + require.NoError(t, tc.Write(n1)) +} + +// Test infrastructure below. + +func makeBreakerToggleable(b *circuit.Breaker) (setProbeEnabled func(bool)) { + opts := b.Opts() + origProbe := opts.AsyncProbe + var disableProbe int32 + opts.AsyncProbe = func(report func(error), done func()) { + if atomic.LoadInt32(&disableProbe) == 1 { + done() + return + } + origProbe(report, done) + } + b.Reconfigure(opts) + return func(to bool) { + var n int32 + if !to { + n = 1 + } + atomic.StoreInt32(&disableProbe, n) + } +} + +type replWithKnob struct { + *kvserver.Replica + setProbeEnabled func(bool) +} + +type circuitBreakerTest struct { + *testcluster.TestCluster + slowThresh *atomic.Value // time.Duration + ManualClock *hlc.HybridManualClock + repls []replWithKnob // 0 -> repl on Servers[0], etc +} + +func runCircuitBreakerTest( + t *testing.T, name string, f func(*testing.T, context.Context, *circuitBreakerTest), +) { + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 4*testutils.DefaultSucceedsSoonDuration) + defer cancel() + tc := setupCircuitBreakerTest(t) + defer tc.Stopper().Stop(ctx) + f(t, ctx, tc) + }) +} + +func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { + manualClock := hlc.NewHybridManualClock() + var rangeID int64 // atomic + slowThresh := &atomic.Value{} // supports .SetSlowThreshold(x) + slowThresh.Store(time.Duration(0)) + storeKnobs := &kvserver.StoreTestingKnobs{ + SlowReplicationThresholdOverride: func(ba *roachpb.BatchRequest) time.Duration { + t.Helper() + if rid := roachpb.RangeID(atomic.LoadInt64(&rangeID)); rid == 0 || ba == nil || ba.RangeID != rid { + return 0 + } + dur := slowThresh.Load().(time.Duration) + if dur > 0 { + t.Logf("%s: using slow replication threshold %s", ba.Summary(), dur) + } + return dur // 0 = default + }, + // The test will often check individual replicas and the lease will always be on + // n1. However, we don't control raft leadership placement and without this knob, + // n1 may refuse to acquire the lease, which we don't want. + AllowLeaseRequestProposalsWhenNotLeader: true, + // The TestingApplyFilter prevents n2 from requesting a lease (or from the lease + // being transferred to n2). The test seems to pass pretty reliably without this + // but it can't hurt. 
+ TestingApplyFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) { + if !args.IsLeaseRequest { + return 0, nil + } + lease := args.State.Lease + if lease == nil { + return 0, nil + } + if lease.Replica.NodeID != 2 { + return 0, nil + } + pErr := roachpb.NewErrorf("test prevents lease acquisition by n2") + return 0, pErr + }, + } + // In some tests we'll restart servers, which means that we will be waiting + // for raft elections. Speed this up by campaigning aggressively. This also + // speeds up the test by calling refreshProposalsLocked more frequently, which + // is where the logic to trip the breaker sits. Together, this cuts most tests + // involving a restart from >4s to ~300ms. + var raftCfg base.RaftConfig + raftCfg.SetDefaults() + raftCfg.RaftHeartbeatIntervalTicks = 1 + raftCfg.RaftElectionTimeoutTicks = 2 + reg := server.NewStickyInMemEnginesRegistry() + args := base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + RaftConfig: raftCfg, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + StickyEngineRegistry: reg, + }, + Store: storeKnobs, + }, + }, + } + tc := testcluster.StartTestCluster(t, 2, args) + tc.Stopper().AddCloser(stop.CloserFn(reg.CloseAllStickyInMemEngines)) + + _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '45s'`) + require.NoError(t, err) + + k := tc.ScratchRange(t) + atomic.StoreInt64(&rangeID, int64(tc.LookupRangeOrFatal(t, k).RangeID)) + + tc.AddVotersOrFatal(t, k, tc.Target(1)) + + var repls []replWithKnob + for i := range tc.Servers { + repl := tc.GetFirstStoreFromServer(t, i).LookupReplica(keys.MustAddr(k)) + enableProbe := makeBreakerToggleable(repl.Breaker()) + repls = append(repls, replWithKnob{repl, enableProbe}) + } + return &circuitBreakerTest{ + TestCluster: tc, + ManualClock: manualClock, + repls: repls, + slowThresh: slowThresh, + } +} + +func (cbt *circuitBreakerTest) SetProbeEnabled(idx int, to bool) { + cbt.repls[idx].setProbeEnabled(to) +} + +func (cbt *circuitBreakerTest) Report(idx int, err error) { + cbt.repls[idx].Replica.Breaker().Report(err) +} + +func (cbt *circuitBreakerTest) UntripsSoon(t *testing.T, method func(idx int) error, idx int) { + t.Helper() + testutils.SucceedsSoon(t, func() error { + t.Helper() + err := method(idx) + // All errors coming out should be annotated as coming from + // the circuit breaker. 
+ if err != nil && !errors.Is(err, circuit.ErrBreakerOpen) { + t.Fatalf("saw unexpected error %+v", err) + } + return err + }) +} + +func (cbt *circuitBreakerTest) ExpireAllLeases(t *testing.T, pauseHeartbeats bool) (undo func()) { + t.Helper() + var maxWT int64 + var fs []func() + for _, srv := range cbt.Servers { + lv := srv.NodeLiveness().(*liveness.NodeLiveness) + if pauseHeartbeats { + undo := lv.PauseAllHeartbeatsForTest() + fs = append(fs, undo) + } + self, ok := lv.Self() + require.True(t, ok) + if maxWT < self.Expiration.WallTime { + maxWT = self.Expiration.WallTime + } + } + cbt.ManualClock.Forward(maxWT + 1) + return func() { + for _, f := range fs { + f() + } + } +} + +func (*circuitBreakerTest) sendViaRepl(repl *kvserver.Replica, req roachpb.Request) error { + var ba roachpb.BatchRequest + ba.RangeID = repl.Desc().RangeID + ba.Timestamp = repl.Clock().Now() + ba.Add(req) + ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration) + defer cancel() + _, pErr := repl.Send(ctx, ba) + // If our context got canceled, return an opaque error regardless of presence or + // absence of actual error. This makes sure we don't accidentally pass tests as + // a result of our context cancellation. + if err := ctx.Err(); err != nil { + pErr = roachpb.NewErrorf("timed out waiting for batch response: %v", pErr) + } + return pErr.GoError() +} + +func (*circuitBreakerTest) sendViaDistSender(ds *kvcoord.DistSender, req roachpb.Request) error { + var ba roachpb.BatchRequest + ba.Add(req) + ctx, cancel := context.WithTimeout(context.Background(), testutils.DefaultSucceedsSoonDuration) + defer cancel() + _, pErr := ds.Send(ctx, ba) + // If our context got canceled, return an opaque error regardless of presence or + // absence of actual error. This makes sure we don't accidentally pass tests as + // a result of our context cancellation. + if err := ctx.Err(); err != nil { + pErr = roachpb.NewErrorf("timed out waiting for batch response: %v", pErr) + } + return pErr.GoError() +} + +func (*circuitBreakerTest) RequireIsBreakerOpen(t *testing.T, err error) { + t.Helper() + require.True(t, errors.Is(err, circuit.ErrBreakerOpen), "%+v", err) +} + +func (*circuitBreakerTest) RequireIsNotLeaseholderError(t *testing.T, err error) { + t.Helper() + ok := errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) + require.True(t, ok, "%+v", err) +} + +func (cbt *circuitBreakerTest) Write(idx int) error { + return cbt.writeViaRepl(cbt.repls[idx].Replica) +} + +func (cbt *circuitBreakerTest) WriteDS(idx int) error { + put := roachpb.NewPut(cbt.repls[idx].Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) + return cbt.sendViaDistSender(cbt.Servers[idx].DistSender(), put) +} + +// SetSlowThreshold sets the SlowReplicationThreshold for requests sent through the +// test harness (i.e. via Write) to the provided duration. The zero value restores +// the default. 
+func (cbt *circuitBreakerTest) SetSlowThreshold(dur time.Duration) { + cbt.slowThresh.Store(dur) +} + +func (cbt *circuitBreakerTest) Read(idx int) error { + return cbt.readViaRepl(cbt.repls[idx].Replica) +} + +func (cbt *circuitBreakerTest) writeViaRepl(repl *kvserver.Replica) error { + put := roachpb.NewPut(repl.Desc().StartKey.AsRawKey(), roachpb.MakeValueFromString("hello")) + return cbt.sendViaRepl(repl, put) +} + +func (cbt *circuitBreakerTest) readViaRepl(repl *kvserver.Replica) error { + get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */) + return cbt.sendViaRepl(repl, get) +} diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 057e6d2abb43..42b4ec4e6f78 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util" + circuit2 "github.com/cockroachdb/cockroach/pkg/util/circuit" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/quotapool" @@ -221,6 +222,10 @@ func NewTestStorePool(cfg StoreConfig) *StorePool { ) } +func (r *Replica) Breaker() *circuit2.Breaker { + return r.breaker.wrapped +} + func (r *Replica) AssertState(ctx context.Context, reader storage.Reader) { r.raftMu.Lock() defer r.raftMu.Unlock() diff --git a/pkg/kv/kvserver/kvserverpb/state.proto b/pkg/kv/kvserver/kvserverpb/state.proto index 8f484529721f..08034e04dfb3 100644 --- a/pkg/kv/kvserver/kvserverpb/state.proto +++ b/pkg/kv/kvserver/kvserverpb/state.proto @@ -177,6 +177,9 @@ message RangeInfo { // Closed timestamp info communicated through the side-transport. See also // state.raft_closed_timestamp. RangeSideTransportInfo closed_timestamp_sidetransport_info = 19 [(gogoproto.customname) = "ClosedTimestampSideTransportInfo", (gogoproto.nullable) = false ]; + // The circuit breaker error, if any. This is nonzero if and only if the + // circuit breaker on the source Replica is tripped. + string circuit_breaker_error = 20; } // RangeSideTransportInfo describes a range's closed timestamp info communicated diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 10b365396255..16da2b78753f 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -264,6 +264,16 @@ type Replica struct { // miss out on anything. raftCtx context.Context + // breaker is a per-Replica circuit breaker. Its purpose is to avoid incurring + // large (infinite) latencies on client requests when the Replica is unable to + // serve commands. This circuit breaker does *not* recruit the occasional + // request to determine whether it is safe to heal the breaker. Instead, it + // has its own probe that is executed asynchronously and determines when the + // Replica is healthy again. + // + // See replica_circuit_breaker.go for details. + breaker *replicaCircuitBreaker + // raftMu protects Raft processing the replica. 
// // Locking notes: Replica.raftMu < Replica.mu @@ -1227,6 +1237,9 @@ func (r *Replica) State(ctx context.Context) kvserverpb.RangeInfo { ctx, r.RangeID, r.mu.state.Lease.Replica.NodeID) ri.ClosedTimestampSideTransportInfo.CentralClosed = centralClosed ri.ClosedTimestampSideTransportInfo.CentralLAI = centralLAI + if err := r.breaker.Signal().Err(); err != nil { + ri.CircuitBreakerError = err.Error() + } return ri } diff --git a/pkg/kv/kvserver/replica_circuit_breaker.go b/pkg/kv/kvserver/replica_circuit_breaker.go new file mode 100644 index 000000000000..fa5918d63442 --- /dev/null +++ b/pkg/kv/kvserver/replica_circuit_breaker.go @@ -0,0 +1,229 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/circuit" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" + "github.com/cockroachdb/cockroach/pkg/util/envutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" + "go.etcd.io/etcd/raft/v3" +) + +type replicaInCircuitBreaker interface { + Clock() *hlc.Clock + Desc() *roachpb.RangeDescriptor + Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) + slowReplicationThreshold(ba *roachpb.BatchRequest) (time.Duration, bool) + replicaUnavailableError() error +} + +var defaultReplicaCircuitBreakerSlowReplicationThreshold = envutil.EnvOrDefaultDuration( + "COCKROACH_REPLICA_CIRCUIT_BREAKER_SLOW_REPLICATION_THRESHOLD", 0, +) + +var replicaCircuitBreakerSlowReplicationThreshold = settings.RegisterPublicDurationSettingWithExplicitUnit( + settings.SystemOnly, + "kv.replica_circuit_breaker.slow_replication_threshold", + "duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers)", + defaultReplicaCircuitBreakerSlowReplicationThreshold, + settings.NonNegativeDuration, +) + +// replicaCircuitBreaker is a wrapper around *circuit.Breaker that makes it +// convenient for use as a per-Replica circuit breaker. 
+type replicaCircuitBreaker struct { + ambCtx log.AmbientContext + stopper *stop.Stopper + r replicaInCircuitBreaker + st *cluster.Settings + wrapped *circuit.Breaker +} + +func (br *replicaCircuitBreaker) enabled() bool { + return replicaCircuitBreakerSlowReplicationThreshold.Get(&br.st.SV) > 0 && + br.st.Version.IsActive(context.Background(), clusterversion.ProbeRequest) +} + +func (br *replicaCircuitBreaker) newError() error { + return br.r.replicaUnavailableError() +} + +func (br *replicaCircuitBreaker) TripAsync() { + if !br.enabled() { + return + } + + _ = br.stopper.RunAsyncTask( + br.ambCtx.AnnotateCtx(context.Background()), "trip-breaker", + func(ctx context.Context) { + br.wrapped.Report(br.newError()) + }, + ) +} + +type signaller interface { + Err() error + C() <-chan struct{} +} + +type neverTripSignaller struct{} + +func (s neverTripSignaller) Err() error { return nil } +func (s neverTripSignaller) C() <-chan struct{} { return nil } + +func (br *replicaCircuitBreaker) Signal() signaller { + if !br.enabled() { + return neverTripSignaller{} + } + return br.wrapped.Signal() +} + +func newReplicaCircuitBreaker( + cs *cluster.Settings, + stopper *stop.Stopper, + ambientCtx log.AmbientContext, + r replicaInCircuitBreaker, +) *replicaCircuitBreaker { + br := &replicaCircuitBreaker{ + stopper: stopper, + ambCtx: ambientCtx, + r: r, + st: cs, + } + + br.wrapped = circuit.NewBreaker(circuit.Options{ + Name: "breaker", // log bridge has ctx tags + AsyncProbe: br.asyncProbe, + EventHandler: &circuit.EventLogger{ + Log: func(buf redact.StringBuilder) { + log.Infof(ambientCtx.AnnotateCtx(context.Background()), "%s", buf) + }, + }, + }) + + return br +} + +type probeKey struct{} + +func isCircuitBreakerProbe(ctx context.Context) bool { + return ctx.Value(probeKey{}) != nil +} + +func withCircuitBreakerProbeMarker(ctx context.Context) context.Context { + return context.WithValue(ctx, probeKey{}, probeKey{}) +} + +func (br *replicaCircuitBreaker) asyncProbe(report func(error), done func()) { + bgCtx := br.ambCtx.AnnotateCtx(context.Background()) + if err := br.stopper.RunAsyncTask(bgCtx, "replica-probe", func(ctx context.Context) { + defer done() + + if !br.enabled() { + report(nil) + return + } + + err := sendProbe(ctx, br.r) + report(err) + }); err != nil { + done() + } +} + +func sendProbe(ctx context.Context, r replicaInCircuitBreaker) error { + ctx = withCircuitBreakerProbeMarker(ctx) + desc := r.Desc() + if !desc.IsInitialized() { + return nil + } + ba := roachpb.BatchRequest{} + ba.Timestamp = r.Clock().Now() + ba.RangeID = r.Desc().RangeID + probeReq := &roachpb.ProbeRequest{} + probeReq.Key = desc.StartKey.AsRawKey() + ba.Add(probeReq) + thresh, ok := r.slowReplicationThreshold(&ba) + if !ok { + // Breakers are disabled now. 
+ return nil + } + if err := contextutil.RunWithTimeout(ctx, "probe", thresh, + func(ctx context.Context) error { + _, pErr := r.Send(ctx, ba) + return pErr.GoError() + }, + ); err != nil { + return errors.CombineErrors(r.replicaUnavailableError(), err) + } + return nil +} + +func replicaUnavailableError( + desc *roachpb.RangeDescriptor, + replDesc roachpb.ReplicaDescriptor, + lm liveness.IsLiveMap, + rs *raft.Status, +) error { + nonLiveRepls := roachpb.MakeReplicaSet(nil) + for _, rDesc := range desc.Replicas().Descriptors() { + if lm[rDesc.NodeID].IsLive { + continue + } + nonLiveRepls.AddReplica(rDesc) + } + + canMakeProgress := desc.Replicas().CanMakeProgress( + func(replDesc roachpb.ReplicaDescriptor) bool { + return lm[replDesc.NodeID].IsLive + }, + ) + + // Ensure good redaction. + var _ redact.SafeFormatter = nonLiveRepls + var _ redact.SafeFormatter = desc + var _ redact.SafeFormatter = replDesc + + err := errors.Errorf("replica %s of %s is unavailable", desc, replDesc) + err = errors.Wrapf( + err, + "raft status: %+v", redact.Safe(rs), // raft status contains no PII + ) + if len(nonLiveRepls.AsProto()) > 0 { + err = errors.Wrapf(err, "replicas on non-live nodes: %v (lost quorum: %t)", nonLiveRepls, !canMakeProgress) + } + + return err +} + +func (r *Replica) replicaUnavailableError() error { + desc := r.Desc() + replDesc, _ := desc.GetReplicaDescriptor(r.store.StoreID()) + + var isLiveMap liveness.IsLiveMap + if nl := r.store.cfg.NodeLiveness; nl != nil { // exclude unit test + isLiveMap = nl.GetIsLiveMap() + } + return replicaUnavailableError(desc, replDesc, isLiveMap, r.RaftStatus()) +} diff --git a/pkg/kv/kvserver/replica_circuit_breaker_test.go b/pkg/kv/kvserver/replica_circuit_breaker_test.go new file mode 100644 index 000000000000..79426fb0a1da --- /dev/null +++ b/pkg/kv/kvserver/replica_circuit_breaker_test.go @@ -0,0 +1,42 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvserver + +import ( + "path/filepath" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/echotest" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/redact" + "go.etcd.io/etcd/raft/v3" +) + +func TestReplicaUnavailableError(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + var repls roachpb.ReplicaSet + repls.AddReplica(roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 10, ReplicaID: 100}) + repls.AddReplica(roachpb.ReplicaDescriptor{NodeID: 2, StoreID: 20, ReplicaID: 200}) + desc := roachpb.NewRangeDescriptor(10, roachpb.RKey("a"), roachpb.RKey("z"), repls) + var ba roachpb.BatchRequest + ba.Add(&roachpb.RequestLeaseRequest{}) + lm := liveness.IsLiveMap{ + 1: liveness.IsLiveMapEntry{IsLive: true}, + } + rs := raft.Status{} + err := replicaUnavailableError(desc, desc.Replicas().AsProto()[0], lm, &rs) + echotest.Require(t, string(redact.Sprint(err)), filepath.Join("testdata", "replica_unavailable_error.txt")) +} diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index fbb1b1dc0c69..5771e6f3e651 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -127,6 +127,8 @@ func newUnloadedReplica( r.splitQueueThrottle = util.Every(splitQueueThrottleDuration) r.mergeQueueThrottle = util.Every(mergeQueueThrottleDuration) + + r.breaker = newReplicaCircuitBreaker(store.cfg.Settings, store.stopper, r.AmbientContext, r) return r } diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index bfafefb9ebdf..8bdd0bce36f4 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -68,6 +68,10 @@ type ProposalData struct { // last (re-)proposed. proposedAtTicks int + // createdAtTicks is the (logical) time at which this command was + // *first* proposed. + createdAtTicks int + // command is serialized and proposed to raft. In the event of // reproposals its MaxLeaseIndex field is mutated. command *kvserverpb.RaftCommand diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index e4435633ee43..28209fb97332 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -1021,6 +1021,9 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) { // Record when the proposal was submitted to Raft so that we can later // decide if/when to re-propose it. p.proposedAtTicks = rp.mu.ticks + if p.createdAtTicks == 0 { + p.createdAtTicks = rp.mu.ticks + } rp.mu.proposals[p.idKey] = p } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 52fbd1f27941..a125963b71c4 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1057,6 +1057,21 @@ func (r *Replica) hasRaftReadyRLocked() bool { return r.mu.internalRaftGroup.HasReady() } +// slowReplicationThreshold returns the threshold after which in-flight +// replicated commands should be considered "stuck" and should trip the +// per-Replica circuit breaker. The boolean indicates whether this +// mechanism is enabled; if it isn't, no action should be taken.
+func (r *Replica) slowReplicationThreshold(ba *roachpb.BatchRequest) (time.Duration, bool) { + if knobs := r.store.TestingKnobs(); knobs != nil && knobs.SlowReplicationThresholdOverride != nil { + if dur := knobs.SlowReplicationThresholdOverride(ba); dur > 0 { + return dur, true + } + // Fall through. + } + dur := replicaCircuitBreakerSlowReplicationThreshold.Get(&r.store.cfg.Settings.SV) + return dur, dur > 0 +} + //go:generate stringer -type refreshRaftReason type refreshRaftReason int @@ -1092,9 +1107,29 @@ func (r *Replica) refreshProposalsLocked( log.Fatalf(ctx, "refreshAtDelta specified for reason %s != reasonTicks", reason) } + var maxSlowProposalDurationRequest *roachpb.BatchRequest + var maxSlowProposalDuration time.Duration + var slowProposalCount int64 var reproposals pendingCmdSlice for _, p := range r.mu.proposals { - if p.command.MaxLeaseIndex == 0 { + slowReplicationThreshold, ok := r.slowReplicationThreshold(p.Request) + // NB: ticks can be delayed, in which case this detection would kick in too late + // as well. This is unlikely to become a concern since the configured + // durations here should be very large compared to the refresh interval, and + // so delays shouldn't dramatically change the detection latency. + inflightDuration := r.store.cfg.RaftTickInterval * time.Duration(r.mu.ticks-p.createdAtTicks) + if ok && inflightDuration > slowReplicationThreshold { + slowProposalCount++ + if maxSlowProposalDuration < inflightDuration { + maxSlowProposalDuration = inflightDuration + maxSlowProposalDurationRequest = p.Request + } + } + // TODO(tbg): the enabled() call is a hack until we've figured out what to + // do about #74711. If leases are finished instead of reproposed, they can't + // ever trigger the breaker, which is bad as there usually isn't anything + // else around that will. + if p.command.MaxLeaseIndex == 0 && !r.breaker.enabled() { // Commands without a MaxLeaseIndex cannot be reproposed, as they might // apply twice. We also don't want to ask the proposer to retry these // special commands. @@ -1141,6 +1176,30 @@ } } + r.store.metrics.SlowRaftRequests.Update(slowProposalCount) + + // If the breaker isn't tripped yet but we've detected commands that have + // taken too long to replicate, trip the breaker now. + // + // NB: we still keep reproposing commands on this and subsequent ticks + // even though this seems strictly counter-productive, except perhaps + // for the probe's proposals. We could consider being more strict here + // which could avoid build-up of raft log entries during outages, see + // for example: + // https://github.com/cockroachdb/cockroach/issues/60612 + if r.breaker.Signal().Err() == nil && maxSlowProposalDuration > 0 { + log.Warningf(ctx, + "have been waiting %.2fs for slow proposal %s", + maxSlowProposalDuration.Seconds(), maxSlowProposalDurationRequest, + ) + // NB: this is async because we're holding lots of locks here, and we want + // to avoid having to pass all the information about the replica into the + // breaker (since the breaker needs access to this information at will to + // power the probe anyway). Over time, we anticipate there being multiple + // mechanisms which trip the breaker.
+ r.breaker.TripAsync() + } + if log.V(1) && len(reproposals) > 0 { log.Infof(ctx, "pending commands: reproposing %d (at %d.%d) %s", diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index a0482910b9c2..7eadd4a8084c 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -436,6 +436,11 @@ func (p *pendingLeaseRequest) requestLeaseAsync( // Send the RequestLeaseRequest or TransferLeaseRequest and wait for the new // lease to be applied. if pErr == nil { + // The Replica circuit breakers together with round-tripping a ProbeRequest + // here before asking for the lease could provide an alternative, simpler + // solution to the below issue: + // + // https://github.com/cockroachdb/cockroach/issues/37906 ba := roachpb.BatchRequest{} ba.Timestamp = p.repl.store.Clock().Now() ba.RangeID = p.repl.RangeID diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 86956fb884ea..a7bd3e7f2776 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -96,6 +96,68 @@ func (r *Replica) Send( return r.sendWithRangeID(ctx, r.RangeID, &ba) } +func (r *Replica) checkCircuitBreaker( + ctx context.Context, +) (context.Context, func(), signaller, error) { + ctx, cancel := context.WithCancel(ctx) + // NB: brSig will never trip if circuit breakers are not enabled. + brSig := r.breaker.Signal() + if isCircuitBreakerProbe(ctx) { + brSig = neverTripSignaller{} + } + + if err := brSig.Err(); err != nil { + // TODO(tbg): we may want to exclude some requests from this check, or allow + // requests to exclude themselves from the check (via their header). + return ctx, nil, nil, err + } + + // NB: this is a total crutch, see: + // https://github.com/cockroachdb/cockroach/issues/74707 + // It will do until breakers default to on: + // https://github.com/cockroachdb/cockroach/issues/74705 + if ch := brSig.C(); ch != nil { + _ = r.store.Stopper().RunAsyncTask(ctx, "watch", func(ctx context.Context) { + select { + case <-ctx.Done(): + return + case <-ch: + cancel() + } + }) + } + + return ctx, cancel, brSig, nil +} + +func maybeAdjustWithBreakerError(pErr *roachpb.Error, brErr error) *roachpb.Error { + if pErr == nil || brErr == nil { + return pErr + } + err := pErr.GoError() + if ae := (&roachpb.AmbiguousResultError{}); errors.As(err, &ae) { + // The breaker tripped while a command was in-flight, so we have to + // propagate an ambiguous result. We don't want to replace it, but there + // is a way to stash an Error in it so we use that. + // + // TODO(tbg): could also wrap it; there is no other write to WrappedErr + // in the codebase and it might be better to remove it. Nested *Errors + // are not a good idea. + wrappedErr := brErr + if ae.WrappedErr != nil { + wrappedErr = errors.Wrapf(brErr, "%v", ae.WrappedErr) + } + ae.WrappedErr = roachpb.NewError(wrappedErr) + return roachpb.NewError(ae) + } else if le := (&roachpb.NotLeaseHolderError{}); errors.As(err, &le) { + // When a lease acquisition triggered by this request is short-circuited + // by the breaker, it will return an opaque NotLeaseholderError, which we + // replace with the breaker's error. + return roachpb.NewError(errors.CombineErrors(brErr, le)) + } + return pErr +} + // sendWithRangeID takes an unused rangeID argument so that the range // ID will be accessible in stack traces (both in panics and when // sampling goroutines from a live server).
This line is subject to @@ -109,7 +171,7 @@ func (r *Replica) Send( // github.com/cockroachdb/cockroach/pkg/storage.(*Replica).sendWithRangeID(0xc420d1a000, 0x64bfb80, 0xc421564b10, 0x15, 0x153fd4634aeb0193, 0x0, 0x100000001, 0x1, 0x15, 0x0, ...) func (r *Replica) sendWithRangeID( ctx context.Context, _forStacks roachpb.RangeID, ba *roachpb.BatchRequest, -) (*roachpb.BatchResponse, *roachpb.Error) { +) (_ *roachpb.BatchResponse, rErr *roachpb.Error) { var br *roachpb.BatchResponse if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 { r.leaseholderStats.record(ba.Header.GatewayNodeID) @@ -126,6 +188,16 @@ func (r *Replica) sendWithRangeID( return nil, roachpb.NewError(err) } + // Circuit breaker handling. + ctx, cancel, brSig, err := r.checkCircuitBreaker(ctx) + if err != nil { + return nil, roachpb.NewError(err) + } + defer func() { + rErr = maybeAdjustWithBreakerError(rErr, brSig.Err()) + cancel() + }() + if err := r.maybeBackpressureBatch(ctx, ba); err != nil { return nil, roachpb.NewError(err) } @@ -137,7 +209,7 @@ func (r *Replica) sendWithRangeID( } // NB: must be performed before collecting request spans. - ba, err := maybeStripInFlightWrites(ba) + ba, err = maybeStripInFlightWrites(ba) if err != nil { return nil, roachpb.NewError(err) } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index a572f76f7356..2b6a7d52143e 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -66,7 +66,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" - "github.com/cockroachdb/redact" "github.com/kr/pretty" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -12990,44 +12989,6 @@ func enableTraceDebugUseAfterFree() (restore func()) { return func() { trace.DebugUseAfterFinish = prev } } -func TestRangeUnavailableMessage(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - var repls roachpb.ReplicaSet - repls.AddReplica(roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 10, ReplicaID: 100}) - repls.AddReplica(roachpb.ReplicaDescriptor{NodeID: 2, StoreID: 20, ReplicaID: 200}) - desc := roachpb.NewRangeDescriptor(10, roachpb.RKey("a"), roachpb.RKey("z"), repls) - dur := time.Minute - var ba roachpb.BatchRequest - ba.Add(&roachpb.RequestLeaseRequest{}) - lm := liveness.IsLiveMap{ - 1: liveness.IsLiveMapEntry{IsLive: true}, - } - rs := raft.Status{} - var s redact.StringBuilder - rangeUnavailableMessage(&s, desc, lm, &rs, &ba, dur) - const exp = `have been waiting 60.00s for proposing command RequestLease [‹/Min›,‹/Min›). -This range is likely unavailable. -Please submit this message to Cockroach Labs support along with the following information: - -Descriptor: r10:‹{a-z}› [(n1,s10):1, (n2,s20):2, next=3, gen=0] -Live: (n1,s10):1 -Non-live: (n2,s20):2 -Raft Status: {"id":"0","term":0,"vote":"0","commit":0,"lead":"0","raftState":"StateFollower","applied":0,"progress":{},"leadtransferee":"0"} - -and a copy of https://yourhost:8080/#/reports/range/10 - -If you are using CockroachDB Enterprise, reach out through your -support contract. Otherwise, please open an issue at: - - https://github.com/cockroachdb/cockroach/issues/new/choose -` - act := s.RedactableString() - t.Log(act) - require.EqualValues(t, exp, act) -} - // Test that, depending on the request's ClientRangeInfo, descriptor and lease // updates are returned. 
func TestRangeInfoReturned(t *testing.T) { diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 723a3c7eca68..c2965c8cce5b 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -33,8 +32,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" - "github.com/cockroachdb/redact" - "go.etcd.io/etcd/raft/v3" ) // migrateApplicationTimeout is the duration to wait for a Migrate command @@ -173,26 +170,6 @@ func (r *Replica) executeWriteBatch( // If the command was accepted by raft, wait for the range to apply it. ctxDone := ctx.Done() shouldQuiesce := r.store.stopper.ShouldQuiesce() - startPropTime := timeutil.Now() - slowTimer := timeutil.NewTimer() - defer slowTimer.Stop() - slowTimer.Reset(r.store.cfg.SlowReplicationThreshold) - // NOTE: this defer was moved from a case in the select statement to here - // because escape analysis does a better job avoiding allocations to the - // heap when defers are unconditional. When this was in the slowTimer select - // case, it was causing pErr to escape. - defer func() { - if slowTimer.Read { - r.store.metrics.SlowRaftRequests.Dec(1) - log.Infof( - ctx, - "slow command %s finished after %.2fs with error %v", - ba, - timeutil.Since(startPropTime).Seconds(), - pErr, - ) - } - }() for { select { @@ -286,15 +263,6 @@ func (r *Replica) executeWriteBatch( return propResult.Reply, nil, propResult.Err - case <-slowTimer.C: - slowTimer.Read = true - r.store.metrics.SlowRaftRequests.Inc(1) - - var s redact.StringBuilder - rangeUnavailableMessage(&s, r.Desc(), r.store.cfg.NodeLiveness.GetIsLiveMap(), - r.RaftStatus(), ba, timeutil.Since(startPropTime)) - log.Errorf(ctx, "range unavailable: %v", s) - case <-ctxDone: // If our context was canceled, return an AmbiguousResultError, // which indicates to the caller that the command may have executed. @@ -345,54 +313,6 @@ func (r *Replica) executeWriteBatch( } } -func rangeUnavailableMessage( - s *redact.StringBuilder, - desc *roachpb.RangeDescriptor, - lm liveness.IsLiveMap, - rs *raft.Status, - ba *roachpb.BatchRequest, - dur time.Duration, -) { - var liveReplicas, otherReplicas []roachpb.ReplicaDescriptor - for _, rDesc := range desc.Replicas().Descriptors() { - if lm[rDesc.NodeID].IsLive { - liveReplicas = append(liveReplicas, rDesc) - } else { - otherReplicas = append(otherReplicas, rDesc) - } - } - - // Ensure that these are going to redact nicely. - var _ redact.SafeFormatter = ba - var _ redact.SafeFormatter = desc - var _ redact.SafeFormatter = roachpb.ReplicaSet{} - - s.Printf(`have been waiting %.2fs for proposing command %s. -This range is likely unavailable. -Please submit this message to Cockroach Labs support along with the following information: - -Descriptor: %s -Live: %s -Non-live: %s -Raft Status: %+v - -and a copy of https://yourhost:8080/#/reports/range/%d - -If you are using CockroachDB Enterprise, reach out through your -support contract. 
Otherwise, please open an issue at: - - https://github.com/cockroachdb/cockroach/issues/new/choose -`, - dur.Seconds(), - ba, - desc, - roachpb.MakeReplicaSet(liveReplicas), - roachpb.MakeReplicaSet(otherReplicas), - redact.Safe(rs), // raft status contains no PII - desc.RangeID, - ) -} - // canAttempt1PCEvaluation looks at the batch and decides whether it can be // executed as 1PC. func (r *Replica) canAttempt1PCEvaluation( diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 1e43b2ce8b5a..a6af7e40f153 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1059,10 +1059,6 @@ type StoreConfig struct { // KVAdmissionController is an optional field used for admission control. KVAdmissionController KVAdmissionController - - // SlowReplicationThreshold is the duration after which an in-flight proposal - // is tracked in the requests.slow.raft metric. - SlowReplicationThreshold time.Duration } // ConsistencyTestingKnobs is a BatchEvalTestingKnobs struct used to control the @@ -1109,9 +1105,6 @@ func (sc *StoreConfig) SetDefaults() { if sc.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction == 0 { sc.TestingKnobs.GossipWhenCapacityDeltaExceedsFraction = defaultGossipWhenCapacityDeltaExceedsFraction } - if sc.SlowReplicationThreshold == 0 { - sc.SlowReplicationThreshold = base.SlowRequestThreshold - } } // GetStoreConfig exposes the config used for this store. diff --git a/pkg/kv/kvserver/testdata/replica_unavailable_error.txt b/pkg/kv/kvserver/testdata/replica_unavailable_error.txt new file mode 100644 index 000000000000..b11beef6d0e1 --- /dev/null +++ b/pkg/kv/kvserver/testdata/replica_unavailable_error.txt @@ -0,0 +1,3 @@ +echo +---- +replicas on non-live nodes: (n2,s20):2 (lost quorum: true): raft status: {"id":"0","term":0,"vote":"0","commit":0,"lead":"0","raftState":"StateFollower","applied":0,"progress":{},"leadtransferee":"0"}: replica r10:‹{a-z}› [(n1,s10):1, (n2,s20):2, next=3, gen=0] of (n1,s10):1 is unavailable diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 9ec3168eecb3..8e21061a3373 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -81,6 +81,10 @@ type StoreTestingKnobs struct { // error returned to the client, or to simulate network failures. TestingResponseFilter kvserverbase.ReplicaResponseFilter + // SlowReplicationThresholdOverride is an interceptor that allows setting a + // per-Batch SlowReplicationThreshold. + SlowReplicationThresholdOverride func(ba *roachpb.BatchRequest) time.Duration + // TestingRangefeedFilter is called before a replica processes a rangefeed // in order for unit tests to modify the request, error returned to the client // or data. 
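To illustrate the new SlowReplicationThresholdOverride knob just added above, here is a hedged sketch of how a test might wire a per-range override (mirroring setupCircuitBreakerTest earlier in this patch; scratchRangeID is a hypothetical variable holding the range under test):

    storeKnobs := &kvserver.StoreTestingKnobs{
        // Lower the threshold only for the range under test; returning 0
        // falls back to the kv.replica_circuit_breaker.slow_replication_threshold
        // cluster setting.
        SlowReplicationThresholdOverride: func(ba *roachpb.BatchRequest) time.Duration {
            if ba != nil && ba.RangeID == scratchRangeID {
                return 10 * time.Millisecond
            }
            return 0
        },
    }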
diff --git a/pkg/roachpb/errors.go b/pkg/roachpb/errors.go index d319da006261..7ecb8f742743 100644 --- a/pkg/roachpb/errors.go +++ b/pkg/roachpb/errors.go @@ -650,6 +650,9 @@ func (e *AmbiguousResultError) Error() string { } func (e *AmbiguousResultError) message(_ *Error) string { + if e.WrappedErr != nil { + return fmt.Sprintf("result is ambiguous (%v)", e.WrappedErr) + } return fmt.Sprintf("result is ambiguous (%s)", e.Message) } diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index 9cace4a7866c..4ec69e9cced0 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -1167,6 +1167,7 @@ func TestLint(t *testing.T) { ":!*.pb.go", ":!*.pb.gw.go", ":!kv/kvclient/kvcoord/lock_spans_over_budget_error.go", + ":!roachpb/replica_unavailable_error.go", ":!sql/pgwire/pgerror/constraint_name.go", ":!sql/pgwire/pgerror/severity.go", ":!sql/pgwire/pgerror/with_candidate_code.go", diff --git a/pkg/util/hlc/hlc.go b/pkg/util/hlc/hlc.go index 794ec7b753d6..075b461ecbcc 100644 --- a/pkg/util/hlc/hlc.go +++ b/pkg/util/hlc/hlc.go @@ -160,6 +160,16 @@ func (m *HybridManualClock) Increment(nanos int64) { m.mu.Unlock() } +// Forward sets the wall time to the supplied timestamp if this moves the clock +// forward in time. +func (m *HybridManualClock) Forward(nanos int64) { + m.mu.Lock() + if nanos > m.mu.nanos { + m.mu.nanos = nanos + } + m.mu.Unlock() +} + // Pause pauses the hybrid manual clock; the passage of time no longer causes // the clock to tick. Increment can still be used, though. func (m *HybridManualClock) Pause() {
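A minimal sketch (helper name hypothetical) of the Forward usage pattern that the test harness's ExpireAllLeases relies on: compute the latest liveness expiration and jump the clock just past it, counting on Forward being a no-op when the clock already reads later:

    // expirePast forwards the hybrid manual clock just past the given wall times.
    func expirePast(clock *hlc.HybridManualClock, wallTimes ...int64) {
        var maxWT int64
        for _, wt := range wallTimes {
            if wt > maxWT {
                maxWT = wt
            }
        }
        clock.Forward(maxWT + 1) // no-op if the clock already reads past maxWT
    }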