Skip to content

Commit

Permalink
jobs: allow blocking job adoption via sentinel file
Browse files Browse the repository at this point in the history
This change adds a check for a sentinel file, DISABLE_STARTING_BACKGROUND_JOBS,
in the first on-disk store directory to the job adoption loop. If it is found,
jobs will not be adopted by that node and current job executions are cancelled.

Operators can thus touch this file if a misbehaving job is causing problems
and otherwise preventing traditional job control that requires being able to
write to the database.

Release note (general change): background job execution is disabled on nodes where the file DISABLE_STARTING_BACKGROUND_JOBS exists in the first store directory providing an emergency tool for disabling job execution.
  • Loading branch information
dt committed Feb 6, 2020
1 parent 9026f88 commit d19f12a
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 10 deletions.
44 changes: 36 additions & 8 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -98,6 +99,9 @@ type Registry struct {
planFn planHookMaker
metrics Metrics

// if non-empty, indicates path to file that prevents any job adoptions.
preventAdoptionFile string

mu struct {
syncutil.Mutex
// epoch is present to support older nodes that are not using
Expand Down Expand Up @@ -132,6 +136,10 @@ type Registry struct {
// back into the sql package. There's maybe a better way that I'm unaware of.
type planHookMaker func(opName, user string) (interface{}, func())

// PreventAdoptionFile is the name of the file which, if present in the first
// on-disk store, will prevent the adoption of background jobs by that node.
const PreventAdoptionFile = "DISABLE_STARTING_BACKGROUND_JOBS"

// MakeRegistry creates a new Registry. planFn is a wrapper around
// sql.newInternalPlanner. It returns a sql.PlanHookState, but must be
// coerced into that in the Resumer functions.
Expand All @@ -145,16 +153,18 @@ func MakeRegistry(
settings *cluster.Settings,
histogramWindowInterval time.Duration,
planFn planHookMaker,
preventAdoptionFile string,
) *Registry {
r := &Registry{
ac: ac,
stopper: stopper,
clock: clock,
db: db,
ex: ex,
nodeID: nodeID,
settings: settings,
planFn: planFn,
ac: ac,
stopper: stopper,
clock: clock,
db: db,
ex: ex,
nodeID: nodeID,
settings: settings,
planFn: planFn,
preventAdoptionFile: preventAdoptionFile,
}
r.mu.epoch = 1
r.mu.jobs = make(map[int64]context.CancelFunc)
Expand Down Expand Up @@ -376,6 +386,10 @@ func (r *Registry) Start(
case <-stopper.ShouldStop():
return
case <-time.After(adoptInterval):
if r.adoptionDisabled(ctx) {
r.cancelAll(ctx)
continue
}
if err := r.maybeAdoptJob(ctx, nl); err != nil {
log.Errorf(ctx, "error while adopting jobs: %s", err)
}
Expand Down Expand Up @@ -784,6 +798,20 @@ func (r *Registry) resume(
return errCh, nil
}

func (r *Registry) adoptionDisabled(ctx context.Context) bool {
if r.preventAdoptionFile != "" {
if _, err := os.Stat(r.preventAdoptionFile); err != nil {
if !os.IsNotExist(err) {
log.Warning(ctx, "error checking if job adoption is currently disabled", err)
}
return false
}
log.Warningf(ctx, "job adoption is currently disabled by existence of %s", r.preventAdoptionFile)
return true
}
return false
}

func (r *Registry) maybeAdoptJob(ctx context.Context, nl NodeLiveness) error {
const stmt = `
SELECT id, payload, progress IS NULL, status
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestRegistryResumeExpiredLease(t *testing.T) {
nodeID.Reset(id)
r := jobs.MakeRegistry(
ac, s.Stopper(), clock, db, s.InternalExecutor().(sqlutil.InternalExecutor),
nodeID, s.ClusterSettings(), server.DefaultHistogramWindowInterval, jobs.FakePHS,
nodeID, s.ClusterSettings(), server.DefaultHistogramWindowInterval, jobs.FakePHS, "",
)
if err := r.Start(ctx, s.Stopper(), nodeLiveness, cancelInterval, adoptInterval); err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestRegistryCancelation(t *testing.T) {
clock := hlc.NewClock(mClock.UnixNano, time.Nanosecond)
registry := MakeRegistry(
log.AmbientContext{}, stopper, clock, db, nil /* ex */, FakeNodeID, cluster.NoSettings,
histogramWindowInterval, FakePHS)
histogramWindowInterval, FakePHS, "")

const nodeCount = 1
nodeLiveness := NewFakeNodeLiveness(nodeCount)
Expand Down
8 changes: 8 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
s.ClusterSettings(), s.nodeLiveness, internalExecutor)

s.sessionRegistry = sql.NewSessionRegistry()
var jobAdoptionStopFile string
for _, spec := range s.cfg.Stores.Specs {
if !spec.InMemory && spec.Path != "" {
jobAdoptionStopFile = filepath.Join(spec.Path, jobs.PreventAdoptionFile)
break
}
}
s.jobRegistry = jobs.MakeRegistry(
s.cfg.AmbientCtx,
s.stopper,
Expand All @@ -572,6 +579,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
// in sql/jobs/registry.go on planHookMaker.
return sql.NewInternalPlanner(opName, nil, user, &sql.MemoryMetrics{}, &execCfg)
},
jobAdoptionStopFile,
)
s.registry.AddMetricStruct(s.jobRegistry.MetricsStruct())

Expand Down

0 comments on commit d19f12a

Please sign in to comment.