44458: storage/engine: MVCC Metamorphic test suite, first phase r=itsbilal a=itsbilal

This PR adds a new test-only sub-package of engine, metamorphic,
which contains one test, TestMeta, that generates and runs random MVCC
operations on rocksdb and pebble instances with default settings.
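
A rough sketch of where the suite is headed (cross-engine diffing is still listed as future work below; generateOps, runOps, and the engine constructors here are hypothetical placeholders, not the actual TestMeta code):

```go
// Generate one random operation stream, replay it on both engines, and
// require identical output; any divergence points at an engine bug or an
// under-specified MVCC contract.
func runMetamorphicComparison(t *testing.T, seed int64) {
	rng := rand.New(rand.NewSource(seed))
	ops := generateOps(rng, 10000) // random MVCC puts, gets, scans, intent ops, ...

	rocksOutput := runOps(t, newRocksDBEngine(t), ops)
	pebbleOutput := runOps(t, newPebbleEngine(t), ops)

	if rocksOutput != pebbleOutput {
		t.Fatalf("engine outputs diverged for seed %d", seed)
	}
}
```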

Future additions to this test suite could include:

- [x]  A "check" mode that takes an output file as input, parses it, runs the operations in that sequence, and compares output strings.
- [ ]  Diffing test output between rocksdb and pebble and failing if there's a difference
- [ ]  Adding support for more operations
- [ ]  Adding a "restart" operation that closes the engine and restarts a different kind of engine in the store directory, then confirming operations after that point generate the same output.

First (but biggest) part of #43762.

Release note: None

44730: storage: fix some tests that were fooling themselves r=andreimatei a=andreimatei

A couple of tests expected multiple write-too-old errors, but they were
running at the wrong timestamp and so were really only getting a single
one.
Also add a test showing a funky scenario where 1PC batches behave
differently from non-1PC batches.

Release note: None

44786: jobs: allow blocking job adoption via sentinel file r=dt a=dt

This change adds a check to the job adoption loop for a sentinel file,
DISABLE_STARTING_BACKGROUND_JOBS, in the first on-disk store directory. If the
file is found, that node will not adopt jobs and its currently running job
executions are cancelled.

Operators can thus touch this file when a misbehaving job is causing problems
that prevent traditional job control, which requires being able to write to
the database.
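
For illustration, the operator action amounts to creating (and later removing) this file in the node's first on-disk store directory; a minimal Go sketch of the equivalent of touch and rm (the store path below is a placeholder):

```go
package main

import (
	"log"
	"os"
	"path/filepath"
)

const sentinel = "DISABLE_STARTING_BACKGROUND_JOBS"

// disableJobAdoption creates the sentinel file; on its next pass the node's
// job adoption loop sees it, stops adopting jobs, and cancels running ones.
func disableJobAdoption(storeDir string) error {
	f, err := os.Create(filepath.Join(storeDir, sentinel))
	if err != nil {
		return err
	}
	return f.Close()
}

// enableJobAdoption removes the sentinel file, re-enabling adoption.
func enableJobAdoption(storeDir string) error {
	return os.Remove(filepath.Join(storeDir, sentinel))
}

func main() {
	// Placeholder path; use the node's actual first on-disk store directory.
	if err := disableJobAdoption("/mnt/data1/cockroach"); err != nil {
		log.Fatal(err)
	}
}
```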

Release note (general change): background job execution is disabled on nodes where the file DISABLE_STARTING_BACKGROUND_JOBS exists in the first store directory, providing an emergency tool for disabling job execution.

Co-authored-by: Bilal Akhtar <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: David Taylor <[email protected]>
4 people committed Feb 6, 2020
4 parents c3bd5eb + b03ca7f + 9095b1f + d19f12a commit ece0b94
Showing 10 changed files with 1,699 additions and 18 deletions.
44 changes: 36 additions & 8 deletions pkg/jobs/registry.go
@@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"os"
"strings"
"time"

@@ -98,6 +99,9 @@ type Registry struct {
planFn planHookMaker
metrics Metrics

// if non-empty, indicates path to file that prevents any job adoptions.
preventAdoptionFile string

mu struct {
syncutil.Mutex
// epoch is present to support older nodes that are not using
@@ -132,6 +136,10 @@ type Registry struct {
// back into the sql package. There's maybe a better way that I'm unaware of.
type planHookMaker func(opName, user string) (interface{}, func())

// PreventAdoptionFile is the name of the file which, if present in the first
// on-disk store, will prevent the adoption of background jobs by that node.
const PreventAdoptionFile = "DISABLE_STARTING_BACKGROUND_JOBS"

// MakeRegistry creates a new Registry. planFn is a wrapper around
// sql.newInternalPlanner. It returns a sql.PlanHookState, but must be
// coerced into that in the Resumer functions.
@@ -145,16 +153,18 @@ func MakeRegistry(
settings *cluster.Settings,
histogramWindowInterval time.Duration,
planFn planHookMaker,
preventAdoptionFile string,
) *Registry {
r := &Registry{
-ac: ac,
-stopper: stopper,
-clock: clock,
-db: db,
-ex: ex,
-nodeID: nodeID,
-settings: settings,
-planFn: planFn,
+ac: ac,
+stopper: stopper,
+clock: clock,
+db: db,
+ex: ex,
+nodeID: nodeID,
+settings: settings,
+planFn: planFn,
+preventAdoptionFile: preventAdoptionFile,
}
r.mu.epoch = 1
r.mu.jobs = make(map[int64]context.CancelFunc)
@@ -376,6 +386,10 @@ func (r *Registry) Start(
case <-stopper.ShouldStop():
	return
case <-time.After(adoptInterval):
	if r.adoptionDisabled(ctx) {
		r.cancelAll(ctx)
		continue
	}
	if err := r.maybeAdoptJob(ctx, nl); err != nil {
		log.Errorf(ctx, "error while adopting jobs: %s", err)
	}
@@ -784,6 +798,20 @@ func (r *Registry) resume(
return errCh, nil
}

func (r *Registry) adoptionDisabled(ctx context.Context) bool {
	if r.preventAdoptionFile != "" {
		if _, err := os.Stat(r.preventAdoptionFile); err != nil {
			if !os.IsNotExist(err) {
				log.Warning(ctx, "error checking if job adoption is currently disabled", err)
			}
			return false
		}
		log.Warningf(ctx, "job adoption is currently disabled by existence of %s", r.preventAdoptionFile)
		return true
	}
	return false
}

func (r *Registry) maybeAdoptJob(ctx context.Context, nl NodeLiveness) error {
const stmt = `
SELECT id, payload, progress IS NULL, status
2 changes: 1 addition & 1 deletion pkg/jobs/registry_external_test.go
@@ -88,7 +88,7 @@ func TestRegistryResumeExpiredLease(t *testing.T) {
nodeID.Reset(id)
r := jobs.MakeRegistry(
ac, s.Stopper(), clock, db, s.InternalExecutor().(sqlutil.InternalExecutor),
-nodeID, s.ClusterSettings(), server.DefaultHistogramWindowInterval, jobs.FakePHS,
+nodeID, s.ClusterSettings(), server.DefaultHistogramWindowInterval, jobs.FakePHS, "",
)
if err := r.Start(ctx, s.Stopper(), nodeLiveness, cancelInterval, adoptInterval); err != nil {
t.Fatal(err)
2 changes: 1 addition & 1 deletion pkg/jobs/registry_test.go
@@ -53,7 +53,7 @@ func TestRegistryCancelation(t *testing.T) {
clock := hlc.NewClock(mClock.UnixNano, time.Nanosecond)
registry := MakeRegistry(
log.AmbientContext{}, stopper, clock, db, nil /* ex */, FakeNodeID, cluster.NoSettings,
-histogramWindowInterval, FakePHS)
+histogramWindowInterval, FakePHS, "")

const nodeCount = 1
nodeLiveness := NewFakeNodeLiveness(nodeCount)
8 changes: 8 additions & 0 deletions pkg/server/server.go
@@ -558,6 +558,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
s.ClusterSettings(), s.nodeLiveness, internalExecutor)

s.sessionRegistry = sql.NewSessionRegistry()
var jobAdoptionStopFile string
for _, spec := range s.cfg.Stores.Specs {
	if !spec.InMemory && spec.Path != "" {
		jobAdoptionStopFile = filepath.Join(spec.Path, jobs.PreventAdoptionFile)
		break
	}
}
s.jobRegistry = jobs.MakeRegistry(
s.cfg.AmbientCtx,
s.stopper,
@@ -572,6 +579,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
// in sql/jobs/registry.go on planHookMaker.
return sql.NewInternalPlanner(opName, nil, user, &sql.MemoryMetrics{}, &execCfg)
},
jobAdoptionStopFile,
)
s.registry.AddMetricStruct(s.jobRegistry.MetricsStruct())

68 changes: 68 additions & 0 deletions pkg/storage/engine/metamorphic/deck.go
@@ -0,0 +1,68 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package metamorphic

import (
	"math/rand"

	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// deck is a random number generator that generates numbers in the range
// [0,len(weights)-1] where the probability of i is
// weights(i)/sum(weights). Unlike Weighted, the weights are specified as
// integers and used in a deck-of-cards style random number selection which
// ensures that each element is returned with a desired frequency within the
// size of the deck.
type deck struct {
	rng *rand.Rand
	mu  struct {
		syncutil.Mutex
		index int
		deck  []int
	}
}

// newDeck returns a new deck random number generator.
func newDeck(rng *rand.Rand, weights ...int) *deck {
	var sum int
	for i := range weights {
		sum += weights[i]
	}
	expandedDeck := make([]int, 0, sum)
	for i := range weights {
		for j := 0; j < weights[i]; j++ {
			expandedDeck = append(expandedDeck, i)
		}
	}
	d := &deck{
		rng: rng,
	}
	d.mu.index = len(expandedDeck)
	d.mu.deck = expandedDeck
	return d
}

// Int returns a random number in the range [0,len(weights)-1] where the
// probability of i is weights(i)/sum(weights).
func (d *deck) Int() int {
	d.mu.Lock()
	if d.mu.index == len(d.mu.deck) {
		d.rng.Shuffle(len(d.mu.deck), func(i, j int) {
			d.mu.deck[i], d.mu.deck[j] = d.mu.deck[j], d.mu.deck[i]
		})
		d.mu.index = 0
	}
	result := d.mu.deck[d.mu.index]
	d.mu.index++
	d.mu.Unlock()
	return result
}
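
For intuition, a hedged usage sketch of the deck sampler (not part of this commit; since newDeck is unexported, it would have to live alongside deck.go in the same package):

```go
package metamorphic

import (
	"math/rand"
	"testing"
)

// Each full pass through the shuffled deck yields every value exactly as
// often as its weight: per ten draws, 0 appears five times, 1 three times,
// and 2 twice, in a freshly shuffled order.
func TestDeckSketch(t *testing.T) {
	rng := rand.New(rand.NewSource(42))
	d := newDeck(rng, 5, 3, 2)
	counts := make([]int, 3)
	for i := 0; i < 10; i++ { // 10 == 5+3+2, i.e. one full pass
		counts[d.Int()]++
	}
	if counts[0] != 5 || counts[1] != 3 || counts[2] != 2 {
		t.Fatalf("unexpected counts: %v", counts)
	}
}
```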