diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go
index eb370481f5b8..d6c3e0c60749 100644
--- a/pkg/kv/kvserver/consistency_queue_test.go
+++ b/pkg/kv/kvserver/consistency_queue_test.go
@@ -231,7 +231,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	// This test prints a consistency checker diff, so it's
-	// good to make sure we're overly redacting said diff.
+	// good to make sure we're not overly redacting said diff.
 	defer log.TestingSetRedactable(true)()
 
 	// Test uses sticky registry to have persistent pebble state that could
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index d0ddb55950af..f3e65fd558b8 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -370,6 +370,8 @@ type Replica struct {
 	// timestamp, independent of the source.
 	sideTransportClosedTimestamp sidetransportAccess
 
+	checksumStorage checksumStorage
+
 	mu struct {
 		// Protects all fields in the mu struct.
 		syncutil.RWMutex
@@ -556,9 +558,6 @@ type Replica struct {
 		// live node will not lose leaseholdership.
 		lastUpdateTimes lastUpdateTimesMap
 
-		// Computed checksum at a snapshot UUID.
-		checksums map[uuid.UUID]replicaChecksum
-
 		// proposalQuota is the quota pool maintained by the lease holder where
 		// incoming writes acquire quota from a fixed quota pool before going
 		// through. If there is no quota available, the write is throttled
diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go
index 3fc943c63155..2328a387ef61 100644
--- a/pkg/kv/kvserver/replica_consistency.go
+++ b/pkg/kv/kvserver/replica_consistency.go
@@ -17,6 +17,7 @@ import (
 	"fmt"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -37,6 +38,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
@@ -65,19 +67,6 @@ const replicaChecksumGCInterval = time.Hour
 // know old CRDB versions (<19.1 at time of writing) were not involved.
 var fatalOnStatsMismatch = envutil.EnvOrDefaultBool("COCKROACH_ENFORCE_CONSISTENT_STATS", false)
 
-// replicaChecksum contains progress on a replica checksum computation.
-type replicaChecksum struct {
-	CollectChecksumResponse
-	// started is true if the checksum computation has started.
-	started bool
-	// If gcTimestamp is nonzero, GC this checksum after gcTimestamp. gcTimestamp
-	// is zero if and only if the checksum computation is in progress.
-	gcTimestamp time.Time
-	// This channel is closed after the checksum is computed, and is used
-	// as a notification.
-	notify chan struct{}
-}
-
 // CheckConsistency runs a consistency check on the range. It first applies a
 // ComputeChecksum through Raft and then issues CollectChecksum commands to the
 // other replicas. These are inspected and a CheckConsistencyResponse is assembled.
@@ -393,6 +382,15 @@ func (r *Replica) RunConsistencyCheck(
 			if len(results) > 0 {
 				masterChecksum = results[0].Response.Checksum
 			}
+			if ctx.Err() != nil {
+				// If our Context has timed out, we still want to visit all of the
+				// replicas to terminate their checksum computations.
+				// A 1s timeout is enough to get the RPC onto the other Replica, where
+				// the cancellation will then short-circuit the in-progress computation.
+				var cancel func()
+				ctx, cancel = context.WithTimeout(r.AnnotateCtx(context.Background()), time.Second) // nolint:context
+				defer cancel()
+			}
 			resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum)
 			resultCh <- ConsistencyCheckResult{
 				Replica:  replica,
@@ -401,7 +399,7 @@ func (r *Replica) RunConsistencyCheck(
 		}
 	}); err != nil {
 		wg.Done()
-		// If we can't start tasks, the node is likely draining. Just return the error verbatim.
+		// If we can't start tasks, the store is draining. Just return the error verbatim.
 		return nil, err
 	}
 
@@ -411,10 +409,10 @@ func (r *Replica) RunConsistencyCheck(
 		if len(results) == 0 {
 			wg.Wait()
 			result := <-resultCh
-			if err := result.Err; err != nil {
-				// If we can't compute the local checksum, give up.
-				return nil, errors.Wrap(err, "computing own checksum")
-			}
+			// It's tempting to return early if we can't even collect the local
+			// checksum, but we still want to reach the other replicas to make sure
+			// no checksum computations are left running when this consistency
+			// check wraps up.
 			results = append(results, result)
 		}
 	}
@@ -430,146 +428,174 @@ func (r *Replica) RunConsistencyCheck(
 	return results, nil
 }
 
-func (r *Replica) gcOldChecksumEntriesLocked(now time.Time) {
-	for id, val := range r.mu.checksums {
-		// The timestamp is valid only if set.
-		if !val.gcTimestamp.IsZero() && now.After(val.gcTimestamp) {
-			delete(r.mu.checksums, id)
+// replicaChecksum contains progress on a replica checksum computation.
+type replicaChecksum struct {
+	deadline time.Time // once in the past, entry is considered abandoned
+
+	cancel atomic.Value // func(); non-nil iff computation started
+
+	notify chan struct{}    // closed when computation done/failed
+	CollectChecksumResponse // immutable on close(notify)
+}
+
+// SignalStart must be called only once. It indicates that the consistency
+// checksum computation is now ongoing, signalling WaitStarted. The provided
+// cancel function aborts the computation.
+func (c *replicaChecksum) SignalStart(cancel func()) {
+	c.cancel.Store(cancel)
+}
+
+// SignalResult must be called only once, and after SignalStart. It populates
+// the replicaChecksum with the result of the checksum computation, and signals
+// any waiter(s) on WaitResult.
+func (c *replicaChecksum) SignalResult(res *replicaHash, snapData *roachpb.RaftSnapshotData) {
+	c.deadline = timeutil.Now().Add(replicaChecksumGCInterval)
+	if res != nil {
+		c.Checksum = res.SHA512[:]
+		delta := res.PersistedMS
+		delta.Subtract(res.RecomputedMS)
+		c.Delta = enginepb.MVCCStatsDelta(delta)
+		c.Persisted = res.PersistedMS
+		c.Snapshot = snapData
+	}
+	close(c.notify)
+}
+
+// WaitStarted waits for up to maxWait for the checksum computation to begin. If
+// this does not occur, or the Context is canceled, an error is returned. As a
+// special case, a zero maxWait disables the corresponding upper limit.
+func (c *replicaChecksum) WaitStarted(ctx context.Context, maxWait time.Duration) error {
+	var ch <-chan time.Time
+	if maxWait > 0 {
+		ch = time.After(maxWait)
+	}
+	// Wait
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-ch:
+		if c.cancel.Load() == nil {
+			return errors.Errorf("max wait of %.2fs exceeded", maxWait.Seconds())
 		}
+		return nil
+	case <-c.notify:
+		// Not only did it start, it finished.
+		return nil
 	}
 }
 
-// getChecksum waits for the result of ComputeChecksum and returns it.
-// It returns false if there is no checksum being computed for the id,
-// or it has already been GCed.
-func (r *Replica) getChecksum(ctx context.Context, id uuid.UUID) (replicaChecksum, error) {
-	now := timeutil.Now()
-	r.mu.Lock()
-	r.gcOldChecksumEntriesLocked(now)
-	c, ok := r.mu.checksums[id]
-	if !ok {
-		// TODO(tbg): we need to unconditionally set a gcTimestamp or this
-		// request can simply get stuck forever or cancel anyway and leak an
-		// entry in r.mu.checksums.
-		if d, dOk := ctx.Deadline(); dOk {
-			c.gcTimestamp = d
-		}
-		c.notify = make(chan struct{})
-		r.mu.checksums[id] = c
+// WaitResult waits until the result of the checksum computation is available.
+// Callers should invoke WaitStarted before (and receive a nil error from it) to
+// avoid waiting for a computation that will never even begin. WaitResult
+// respects context cancellation.
+func (c *replicaChecksum) WaitResult(ctx context.Context) (*CollectChecksumResponse, error) {
+	if err := c.WaitStarted(ctx, 0 /* maxWait */); err != nil {
+		return nil, err
+	}
+	if len(c.CollectChecksumResponse.Checksum) == 0 {
+		return nil, errors.New("no checksum found")
 	}
-	r.mu.Unlock()
+	return &c.CollectChecksumResponse, nil
+}
 
-	// Wait for the checksum to compute or at least to start.
-	computed, err := r.checksumInitialWait(ctx, id, c.notify)
-	if err != nil {
-		return replicaChecksum{}, err
+type checksumStorage struct {
+	mu struct {
+		syncutil.Mutex
+		m map[uuid.UUID]*replicaChecksum
 	}
-	// If the checksum started, but has not completed commit
-	// to waiting the full deadline.
-	if !computed {
-		_, err = r.checksumWait(ctx, id, c.notify, nil)
-		if err != nil {
-			return replicaChecksum{}, err
-		}
+}
+
+// GetOrInit returns the checksum computation, creating it with the supplied
+// deadline if necessary. Once the deadline is elapsed, calls to GC will remove
+// the computation. When the entry already exists and the provided deadline is
+// stricter, it will replace the previous deadline.
+func (css *checksumStorage) GetOrInit(id uuid.UUID, deadline time.Time) *replicaChecksum {
+	css.mu.Lock()
+	defer css.mu.Unlock()
+
+	if css.mu.m == nil {
+		// "Make the zero value useful".
+		css.mu.m = map[uuid.UUID]*replicaChecksum{}
 	}
-	if log.V(1) {
-		log.Infof(ctx, "waited for compute checksum for %s", timeutil.Since(now))
+	c, ok := css.mu.m[id]
+	if !ok {
+		c = &replicaChecksum{
+			notify:   make(chan struct{}),
+			deadline: deadline,
+		}
+		css.mu.m[id] = c
 	}
-	r.mu.RLock()
-	c, ok = r.mu.checksums[id]
-	r.mu.RUnlock()
-	// If the checksum wasn't found or the checksum could not be computed, error out.
-	// The latter case can occur when there's a version mismatch or, more generally,
-	// when the (async) checksum computation fails.
-	if !ok || c.Checksum == nil {
-		return replicaChecksum{}, errors.Errorf("no checksum found (ID = %s)", id)
+	if c.deadline.After(deadline) {
+		c.deadline = deadline
 	}
-	return c, nil
+	return c
 }
 
-// Waits for the checksum to be available or for the checksum to start computing.
-// If we waited for 10% of the deadline and it has not started, then it's
-// unlikely to start because this replica is most likely being restored from
-// snapshots.
-func (r *Replica) checksumInitialWait(
-	ctx context.Context, id uuid.UUID, notify chan struct{},
-) (bool, error) {
-	d, dOk := ctx.Deadline()
-	// The max wait time should be 5 seconds, so we dont end up waiting for
-	// minutes for a huge range.
-	maxInitialWait := 5 * time.Second
-	var initialWait <-chan time.Time
-	if dOk {
-		duration := time.Duration(timeutil.Until(d).Nanoseconds() / 10)
-		if duration > maxInitialWait {
-			duration = maxInitialWait
-		}
-		initialWait = time.After(duration)
-	} else {
-		initialWait = time.After(maxInitialWait)
+// Delete removes a checksum computation, canceling it if it has already started.
+func (css *checksumStorage) Delete(id uuid.UUID) (existed bool) {
+	css.mu.Lock()
+	defer css.mu.Unlock()
+	return css.delLocked(id)
+}
+
+func (css *checksumStorage) delLocked(id uuid.UUID) (existed bool) {
+	c, existed := css.mu.m[id]
+	if !existed {
+		return false
 	}
-	return r.checksumWait(ctx, id, notify, initialWait)
+	delete(css.mu.m, id)
+	if f := c.cancel.Load(); f != nil {
+		f.(func())()
+	}
+	return true // existed
 }
 
-// checksumWait waits for the checksum to be available or for the computation
-// to start within the initialWait time. The bool return flag is used to
-// indicate if a checksum is available (true) or if the initial wait has expired
-// and the caller should wait more, since the checksum computation started.
-func (r *Replica) checksumWait(
-	ctx context.Context, id uuid.UUID, notify chan struct{}, initialWait <-chan time.Time,
-) (bool, error) {
-	// Wait
-	select {
-	case <-r.store.Stopper().ShouldQuiesce():
-		return false,
-			errors.Errorf("store quiescing while waiting for compute checksum (ID = %s)", id)
-	case <-ctx.Done():
-		return false,
-			errors.Wrapf(ctx.Err(), "while waiting for compute checksum (ID = %s)", id)
-	case <-initialWait:
-		{
-			r.mu.Lock()
-			started := r.mu.checksums[id].started
-			r.mu.Unlock()
-			if !started {
-				return false,
-					errors.Errorf("checksum computation did not start in time for (ID = %s)", id)
-			}
-			return false, nil
+func (css *checksumStorage) GC(now time.Time) {
+	css.mu.Lock()
+	defer css.mu.Unlock()
+
+	for id, c := range css.mu.m {
+		if c.deadline.Before(now) {
+			css.delLocked(id)
 		}
-	case <-notify:
-		return true, nil
 	}
 }
 
-// computeChecksumDone adds the computed checksum, sets a deadline for GCing the
-// checksum, and sends out a notification.
-func (r *Replica) computeChecksumDone(
-	ctx context.Context, id uuid.UUID, result *replicaHash, snapshot *roachpb.RaftSnapshotData,
-) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	if c, ok := r.mu.checksums[id]; ok {
-		if result != nil {
-			c.Checksum = result.SHA512[:]
-
-			delta := result.PersistedMS
-			delta.Subtract(result.RecomputedMS)
-			c.Delta = enginepb.MVCCStatsDelta(delta)
-			c.Persisted = result.PersistedMS
-		}
-		c.gcTimestamp = timeutil.Now().Add(replicaChecksumGCInterval)
-		c.Snapshot = snapshot
-		r.mu.checksums[id] = c
-		// Notify
-		close(c.notify)
-	} else {
-		// ComputeChecksum adds an entry into the map, and the entry can
-		// only be GCed once the gcTimestamp is set above. Something
-		// really bad happened.
-		log.Errorf(ctx, "no map entry for checksum (ID = %s)", id)
+// getChecksum waits for the result of ComputeChecksum and returns it. An error
+// is returned if the checksum could not be obtained. This method should only be
+// called once per computation as it cleans up the in-memory state, including
+// cancelling the inflight computation upon return if necessary (in the case of
+// ctx cancellation).
+func getChecksum(
+	ctx context.Context, css *checksumStorage, id uuid.UUID,
+) (*CollectChecksumResponse, error) {
+	tBegin := timeutil.Now()
+	defer css.GC(tBegin)
+
+	deadline := tBegin.Add(replicaChecksumGCInterval)
+	if t, ok := ctx.Deadline(); ok && t.Before(deadline) {
+		deadline = t
+	}
+
+	c := css.GetOrInit(id, deadline)
+	// Whether the consistency check is still running or will
+	// never happen is immaterial; if we are no longer waiting
+	// for the result, stop tracking the computation. This is
+	// important, see:
+	//
+	// https://github.com/cockroachdb/cockroach/pull/75448
+	defer css.Delete(id)
+
+	maxWait := 5 * time.Second
+	if dur := deadline.Sub(tBegin); dur < maxWait && dur > 0 {
+		maxWait = dur
 	}
+	if err := c.WaitStarted(ctx, maxWait); err != nil {
+		return nil, errors.Wrapf(err, "waiting for checksum %s to start", id.Short())
+	}
+	// Checksum computation has started, so commit to waiting for its completion.
+	return c.WaitResult(ctx)
 }
 
 type replicaHash struct {
@@ -577,9 +603,9 @@ type replicaHash struct {
 	PersistedMS, RecomputedMS enginepb.MVCCStats
 }
 
-// sha512 computes the SHA512 hash of all the replica data at the snapshot.
+// sha512Checksum computes the SHA512 hash of all the replica data at the snapshot.
 // It will dump all the kv data into snapshot if it is provided.
-func (*Replica) sha512(
+func sha512Checksum(
 	ctx context.Context,
 	desc roachpb.RangeDescriptor,
 	snap storage.Reader,
@@ -706,38 +732,24 @@ func (*Replica) sha512(
 }
 
 func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.ComputeChecksum) {
-	stopper := r.store.Stopper()
-	now := timeutil.Now()
-	r.mu.Lock()
-	var notify chan struct{}
-	if c, ok := r.mu.checksums[cc.ChecksumID]; !ok {
-		// There is no record of this ID. Make a new notification.
-		notify = make(chan struct{})
-	} else if !c.started {
-		// A CollectChecksumRequest is waiting on the existing notification.
-		notify = c.notify
-	} else {
-		log.Fatalf(ctx, "attempted to apply ComputeChecksum command with duplicated checksum ID %s",
-			cc.ChecksumID)
-	}
-
-	r.gcOldChecksumEntriesLocked(now)
+	css := &r.checksumStorage
+	tBegin := timeutil.Now()
+	css.GC(tBegin)
 
-	// Create an entry with checksum == nil and gcTimestamp unset.
-	r.mu.checksums[cc.ChecksumID] = replicaChecksum{started: true, notify: notify}
-	desc := *r.mu.state.Desc
-	r.mu.Unlock()
+	// NB: we are careful in this method to not tie `ctx` to the computation,
+	// as we are intentionally spawning a possibly long-running task below.
+	c := css.GetOrInit(cc.ChecksumID, tBegin.Add(replicaChecksumGCInterval))
 
 	if cc.Version != batcheval.ReplicaChecksumVersion {
-		r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil)
-		log.Infof(ctx, "incompatible ComputeChecksum versions (requested: %d, have: %d)",
-			cc.Version, batcheval.ReplicaChecksumVersion)
+		c.SignalResult(nil /* hash */, nil /* snap */)
 		return
 	}
 
 	// Caller is holding raftMu, so an engine snapshot is automatically
-	// Raft-consistent (i.e. not in the middle of an AddSSTable).
+	// Raft-consistent (i.e. not in the middle of an AddSSTable or descriptor
+	// change).
 	snap := r.store.engine.NewSnapshot()
+	desc := *r.Desc()
 	if cc.Checkpoint {
 		sl := stateloader.Make(r.RangeID)
 		as, err := sl.LoadRangeAppliedState(ctx, snap)
@@ -756,41 +768,45 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co
 	// Compute SHA asynchronously and store it in a map by UUID.
 	// Don't use the proposal's context for this, as it likely to be canceled very
 	// soon.
-	if err := stopper.RunAsyncTask(r.AnnotateCtx(context.Background()), "storage.Replica: computing checksum", func(ctx context.Context) {
-		func() {
-			defer snap.Close()
-			var snapshot *roachpb.RaftSnapshotData
-			if cc.SaveSnapshot {
-				snapshot = &roachpb.RaftSnapshotData{}
-			}
+	if err := r.store.stopper.RunAsyncTask(
+		r.AnnotateCtx(context.Background()), "storage.Replica: computing checksum", func(ctx context.Context) {
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			c.SignalStart(cancel)
+			func() {
+				defer snap.Close()
+				var raftSnapData *roachpb.RaftSnapshotData
+				if cc.SaveSnapshot {
+					raftSnapData = &roachpb.RaftSnapshotData{}
+				}
 
-			result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, r.store.consistencyLimiter)
-			if err != nil {
-				log.Errorf(ctx, "%v", err)
-				result = nil
-			}
-			r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot)
-		}()
+				hash, err := sha512Checksum(ctx, desc, snap, raftSnapData, cc.Mode, r.store.consistencyLimiter)
+				if err != nil {
+					log.Errorf(ctx, "%v", err)
+					hash, raftSnapData = nil, nil
+				}
+				c.SignalResult(hash, raftSnapData)
+			}()
 
-		var shouldFatal bool
-		for _, rDesc := range cc.Terminate {
-			if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.replicaID {
-				shouldFatal = true
+			var shouldFatal bool
+			for _, rDesc := range cc.Terminate {
+				if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.replicaID {
+					shouldFatal = true
+				}
 			}
-		}
-		if shouldFatal {
-			// This node should fatal as a result of a previous consistency
-			// check (i.e. this round is carried out only to obtain a diff).
-			// If we fatal too early, the diff won't make it back to the lease-
-			// holder and thus won't be printed to the logs. Since we're already
-			// in a goroutine that's about to end, simply sleep for a few seconds
-			// and then terminate.
-			auxDir := r.store.engine.GetAuxiliaryDir()
-			_ = r.store.engine.MkdirAll(auxDir)
-			path := base.PreventedStartupFile(auxDir)
+			if shouldFatal {
+				// This node should fatal as a result of a previous consistency
+				// check (i.e. this round is carried out only to obtain a diff).
+				// If we fatal too early, the diff won't make it back to the lease-
+				// holder and thus won't be printed to the logs. Since we're already
+				// in a goroutine that's about to end, simply sleep for a few seconds
+				// and then terminate.
+				auxDir := r.store.engine.GetAuxiliaryDir()
+				_ = r.store.engine.MkdirAll(auxDir)
+				path := base.PreventedStartupFile(auxDir)
 
-			const attentionFmt = `ATTENTION:
+				const attentionFmt = `ATTENTION:
 
 this node is terminating because a replica
 inconsistency was detected between %s and its other replicas.
 Please check your cluster-wide log files for more
@@ -803,23 +819,22 @@ A checkpoints directory to aid (expert) debugging should be present in:
 %s
 
 A file preventing this node from restarting was placed at:
 %s
 `
-			preventStartupMsg := fmt.Sprintf(attentionFmt, r, auxDir, path)
-			if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil {
-				log.Warningf(ctx, "%v", err)
-			}
+				preventStartupMsg := fmt.Sprintf(attentionFmt, r, auxDir, path)
+				if err := fs.WriteFile(r.store.engine, path, []byte(preventStartupMsg)); err != nil {
+					log.Warningf(ctx, "%v", err)
+				}
 
-			if p := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil {
-				p(*r.store.Ident)
-			} else {
-				time.Sleep(10 * time.Second)
-				log.Fatalf(r.AnnotateCtx(context.Background()), attentionFmt, r, auxDir, path)
+				if p := r.store.cfg.TestingKnobs.ConsistencyTestingKnobs.OnBadChecksumFatal; p != nil {
+					p(*r.store.Ident)
+				} else {
+					time.Sleep(10 * time.Second)
+					log.Fatalf(r.AnnotateCtx(context.Background()), attentionFmt, r, auxDir, path)
+				}
 			}
-		}
-	}); err != nil {
-		defer snap.Close()
+		}); err != nil {
+		snap.Close()
 		log.Errorf(ctx, "could not run async checksum computation (ID = %s): %v", cc.ChecksumID, err)
-		// Set checksum to nil.
-		r.computeChecksumDone(ctx, cc.ChecksumID, nil, nil)
+		c.SignalResult(nil, nil /* snap */)
 	}
 }
diff --git a/pkg/kv/kvserver/replica_consistency_test.go b/pkg/kv/kvserver/replica_consistency_test.go
index 66afdee534ff..4e97917596ba 100644
--- a/pkg/kv/kvserver/replica_consistency_test.go
+++ b/pkg/kv/kvserver/replica_consistency_test.go
@@ -18,10 +18,12 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/stretchr/testify/require"
 )
@@ -47,12 +49,12 @@ func TestReplicaChecksumVersion(t *testing.T) {
 			cc.Version = 1
 		}
 		tc.repl.computeChecksumPostApply(ctx, cc)
-		rc, err := tc.repl.getChecksum(ctx, cc.ChecksumID)
+		rc, err := getChecksum(ctx, &tc.repl.checksumStorage, cc.ChecksumID)
 		if !matchingVersion {
 			if !testutils.IsError(err, "no checksum found") {
 				t.Fatal(err)
 			}
-			require.Nil(t, rc.Checksum)
+			require.Nil(t, rc)
 		} else {
 			require.NoError(t, err)
 			require.NotNil(t, rc.Checksum)
@@ -60,61 +62,69 @@ func TestReplicaChecksumVersion(t *testing.T) {
 	})
 }
 
-func TestGetChecksumNotSuccessfulExitConditions(t *testing.T) {
+func TestChecksumStorage(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
-	defer cancel()
+	ctx := context.Background()
+	css := &checksumStorage{}
 
-	tc := testContext{}
-	stopper := stop.NewStopper()
-	defer stopper.Stop(ctx)
-	tc.Start(ctx, t, stopper)
+	id, err := uuid.NewV4()
+	require.NoError(t, err)
 
-	id := uuid.FastMakeV4()
-	notify := make(chan struct{})
-	close(notify)
+	t0 := timeutil.Unix(0, 0)
 
-	// Simple condition, the checksum is notified, but not computed.
-	tc.repl.mu.Lock()
-	tc.repl.mu.checksums[id] = replicaChecksum{notify: notify}
-	tc.repl.mu.Unlock()
-	rc, err := tc.repl.getChecksum(ctx, id)
-	if !testutils.IsError(err, "no checksum found") {
-		t.Fatal(err)
-	}
-	require.Nil(t, rc.Checksum)
-	// Next condition, the initial wait expires and checksum is not started,
-	// this will take 10ms.
-	id = uuid.FastMakeV4()
-	tc.repl.mu.Lock()
-	tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{})}
-	tc.repl.mu.Unlock()
-	rc, err = tc.repl.getChecksum(ctx, id)
-	if !testutils.IsError(err, "checksum computation did not start") {
-		t.Fatal(err)
-	}
-	require.Nil(t, rc.Checksum)
-	// Next condition, initial wait expired and we found the started flag,
-	// so next step is for context deadline.
-	id = uuid.FastMakeV4()
-	tc.repl.mu.Lock()
-	tc.repl.mu.checksums[id] = replicaChecksum{notify: make(chan struct{}), started: true}
-	tc.repl.mu.Unlock()
-	rc, err = tc.repl.getChecksum(ctx, id)
-	if !testutils.IsError(err, "context deadline exceeded") {
-		t.Fatal(err)
-	}
-	require.Nil(t, rc.Checksum)
+	// Expected usage.
+	t.Run("basic", func(t *testing.T) {
+		c := css.GetOrInit(id, t0.Add(time.Minute))
+		var canceled bool
+		c.SignalStart(func() {
+			canceled = true
+		})
+		require.NoError(t, c.WaitStarted(ctx, time.Nanosecond))
+		hash := &replicaHash{PersistedMS: enginepb.MVCCStats{AbortSpanBytes: 123}}
+		raftData := &roachpb.RaftSnapshotData{}
+		c.SignalResult(hash, raftData)
+		ccr, err := c.WaitResult(ctx)
+		require.NoError(t, err)
+		require.EqualValues(t, 123, ccr.Persisted.AbortSpanBytes)
+		require.Equal(t, raftData, ccr.Snapshot)
+		require.True(t, css.Delete(id))
+		require.True(t, canceled)
+	})
 
-	// Need to reset the context, since we deadlined it above.
-	ctx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
-	defer cancel()
-	// Next condition, node should quiesce.
-	tc.repl.store.Stopper().Quiesce(ctx)
-	rc, err = tc.repl.getChecksum(ctx, uuid.FastMakeV4())
-	if !testutils.IsError(err, "store quiescing") {
-		t.Fatal(err)
-	}
-	require.Nil(t, rc.Checksum)
+	// Context deadline respected in getChecksum.
+	t.Run("deadline-getchecksum", func(t *testing.T) {
+		c := css.GetOrInit(id, timeutil.Now().Add(24*time.Hour))
+		require.NotNil(t, c)
+		ctx, cancel := context.WithTimeout(ctx, time.Millisecond)
+		ccr, err := getChecksum(ctx, css, id)
+		// We don't verify the error because it could either be Canceled
+		// or DeadlineExceeded.
+		t.Log(err)
+		cancel()
+		require.Error(t, err)
+		require.Nil(t, ccr)
+		require.False(t, css.Delete(id)) // getChecksum deleted it
+	})
+
+	// Deadline on the computation itself is observed by GC.
+	t.Run("deadline-gc", func(t *testing.T) {
+		tBegin := timeutil.Now()
+		c := css.GetOrInit(id, tBegin.Add(time.Millisecond))
+		require.NotNil(t, c)
+		css.GC(tBegin.Add(2 * time.Millisecond))
+		require.False(t, css.Delete(id)) // GC deleted it
+	})
+
+	// Ongoing computation is canceled on deletion.
+	t.Run("delete-cancels", func(t *testing.T) {
+		c := css.GetOrInit(id, timeutil.Now().Add(24*time.Hour))
+		require.NotNil(t, c)
+		var canceled bool
+		c.SignalStart(func() {
+			canceled = true
+		})
+		require.True(t, css.Delete(id))
+		require.True(t, canceled)
+	})
 }
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go
index 2545b21644d7..debd515cd866 100644
--- a/pkg/kv/kvserver/replica_init.go
+++ b/pkg/kv/kvserver/replica_init.go
@@ -29,7 +29,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
-	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 	"go.etcd.io/etcd/raft/v3"
@@ -100,7 +99,6 @@ func newUnloadedReplica(
 		return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV)
 	})
 	r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{}
-	r.mu.checksums = map[uuid.UUID]replicaChecksum{}
 	r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings())
 	r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader
 	r.mu.proposalBuf.testing.dontCloseTimestamps = store.cfg.TestingKnobs.DontCloseTimestamps
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index f25851e85496..079d09b06b34 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -10409,7 +10409,7 @@ func TestReplicaServersideRefreshes(t *testing.T) {
 			// Regression test for #31870.
 			snap := tc.engine.NewSnapshot()
 			defer snap.Close()
-			res, err := tc.repl.sha512(ctx, *tc.repl.Desc(), tc.engine,
+			res, err := sha512Checksum(ctx, *tc.repl.Desc(), tc.engine,
 				nil /* diff */, roachpb.ChecksumMode_CHECK_FULL,
 				quotapool.NewRateLimiter("ConsistencyQueue", quotapool.Limit(math.MaxFloat64), math.MaxInt64))
 			if err != nil {
diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go
index d14c89dbbfe5..4f7d70e249fc 100644
--- a/pkg/kv/kvserver/stores_server.go
+++ b/pkg/kv/kvserver/stores_server.go
@@ -56,15 +56,16 @@ func (is Server) CollectChecksum(
 	resp := &CollectChecksumResponse{}
 	err := is.execStoreCommand(ctx, req.StoreRequestHeader,
 		func(ctx context.Context, s *Store) error {
+			ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx)
+			defer cancel()
 			r, err := s.GetReplica(req.RangeID)
 			if err != nil {
 				return err
 			}
-			c, err := r.getChecksum(ctx, req.ChecksumID)
+			ccr, err := getChecksum(ctx, &r.checksumStorage, req.ChecksumID)
 			if err != nil {
 				return err
 			}
-			ccr := c.CollectChecksumResponse
 			if !bytes.Equal(req.Checksum, ccr.Checksum) {
 				// If this check is false, then this request is the replica carrying out
 				// the consistency check. The message is spurious, but we want to leave the
@@ -78,7 +79,7 @@ func (is Server) CollectChecksum(
 			} else {
 				ccr.Snapshot = nil
 			}
-			resp = &ccr
+			resp = ccr
 			return nil
 		})
 	return resp, err
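For readers who want to see the moving parts of the new API in isolation, below is a minimal, self-contained Go sketch of the start/result hand-off that replicaChecksum implements in the diff above. This is not CockroachDB code: checksumEntry and the plain byte-slice result are hypothetical stand-ins for replicaChecksum and CollectChecksumResponse, and the deadline/GC bookkeeping of checksumStorage is deliberately omitted.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// checksumEntry is a simplified stand-in for kvserver's replicaChecksum.
type checksumEntry struct {
	cancel atomic.Value  // func(); non-nil once the computation has started
	notify chan struct{} // closed once the result (or failure) is recorded
	sum    []byte        // immutable once notify is closed
}

func newChecksumEntry() *checksumEntry {
	return &checksumEntry{notify: make(chan struct{})}
}

// SignalStart records the cancel function, marking the computation as started.
func (c *checksumEntry) SignalStart(cancel func()) { c.cancel.Store(cancel) }

// SignalResult publishes the checksum and wakes up all waiters.
func (c *checksumEntry) SignalResult(sum []byte) {
	c.sum = sum
	close(c.notify)
}

// WaitStarted returns once the computation has started (or already finished),
// or errors out if that does not happen within maxWait or before ctx is done.
func (c *checksumEntry) WaitStarted(ctx context.Context, maxWait time.Duration) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(maxWait):
		if c.cancel.Load() == nil {
			return fmt.Errorf("computation did not start within %s", maxWait)
		}
		return nil
	case <-c.notify:
		return nil // not only did it start, it finished
	}
}

// WaitResult blocks until the result is available or ctx is done.
func (c *checksumEntry) WaitResult(ctx context.Context) ([]byte, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-c.notify:
		if len(c.sum) == 0 {
			return nil, errors.New("no checksum found")
		}
		return c.sum, nil
	}
}

func main() {
	ctx := context.Background()
	c := newChecksumEntry()

	// Computation side (in the diff: the replica applying ComputeChecksum):
	// register a cancel func, do the work, publish the result.
	go func() {
		_, cancel := context.WithCancel(ctx)
		defer cancel()
		c.SignalStart(cancel)             // unblocks WaitStarted
		time.Sleep(50 * time.Millisecond) // pretend to hash the range
		c.SignalResult([]byte{0xde, 0xad, 0xbe, 0xef})
	}()

	// Collector side (in the diff: the CollectChecksum RPC via getChecksum):
	// a bounded wait for the start, then a context-bounded wait for the result.
	if err := c.WaitStarted(ctx, time.Second); err != nil {
		panic(err)
	}
	sum, err := c.WaitResult(ctx)
	fmt.Println(sum, err)
}

The design point mirrored here is that WaitStarted bounds only the wait for the computation to begin, while WaitResult is bounded solely by the caller's context — the same two-phase wait that getChecksum performs in the diff.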