Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobs: allow blocking job adoption via sentinel file #44786

Merged
merged 1 commit into from
Feb 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -98,6 +99,9 @@ type Registry struct {
planFn planHookMaker
metrics Metrics

// if non-empty, indicates path to file that prevents any job adoptions.
preventAdoptionFile string

mu struct {
syncutil.Mutex
// epoch is present to support older nodes that are not using
Expand Down Expand Up @@ -132,6 +136,10 @@ type Registry struct {
// back into the sql package. There's maybe a better way that I'm unaware of.
type planHookMaker func(opName, user string) (interface{}, func())

// PreventAdoptionFile is the name of the file which, if present in the first
// on-disk store, will prevent the adoption of background jobs by that node.
const PreventAdoptionFile = "DISABLE_STARTING_BACKGROUND_JOBS"

// MakeRegistry creates a new Registry. planFn is a wrapper around
// sql.newInternalPlanner. It returns a sql.PlanHookState, but must be
// coerced into that in the Resumer functions.
Expand All @@ -145,16 +153,18 @@ func MakeRegistry(
settings *cluster.Settings,
histogramWindowInterval time.Duration,
planFn planHookMaker,
preventAdoptionFile string,
) *Registry {
r := &Registry{
ac: ac,
stopper: stopper,
clock: clock,
db: db,
ex: ex,
nodeID: nodeID,
settings: settings,
planFn: planFn,
ac: ac,
stopper: stopper,
clock: clock,
db: db,
ex: ex,
nodeID: nodeID,
settings: settings,
planFn: planFn,
preventAdoptionFile: preventAdoptionFile,
}
r.mu.epoch = 1
r.mu.jobs = make(map[int64]context.CancelFunc)
Expand Down Expand Up @@ -376,6 +386,10 @@ func (r *Registry) Start(
case <-stopper.ShouldStop():
return
case <-time.After(adoptInterval):
if r.adoptionDisabled(ctx) {
r.cancelAll(ctx)
continue
}
if err := r.maybeAdoptJob(ctx, nl); err != nil {
log.Errorf(ctx, "error while adopting jobs: %s", err)
}
Expand Down Expand Up @@ -784,6 +798,20 @@ func (r *Registry) resume(
return errCh, nil
}

func (r *Registry) adoptionDisabled(ctx context.Context) bool {
if r.preventAdoptionFile != "" {
if _, err := os.Stat(r.preventAdoptionFile); err != nil {
if !os.IsNotExist(err) {
log.Warning(ctx, "error checking if job adoption is currently disabled", err)
}
return false
}
log.Warningf(ctx, "job adoption is currently disabled by existence of %s", r.preventAdoptionFile)
return true
}
return false
}

func (r *Registry) maybeAdoptJob(ctx context.Context, nl NodeLiveness) error {
const stmt = `
SELECT id, payload, progress IS NULL, status
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestRegistryResumeExpiredLease(t *testing.T) {
nodeID.Reset(id)
r := jobs.MakeRegistry(
ac, s.Stopper(), clock, db, s.InternalExecutor().(sqlutil.InternalExecutor),
nodeID, s.ClusterSettings(), server.DefaultHistogramWindowInterval, jobs.FakePHS,
nodeID, s.ClusterSettings(), server.DefaultHistogramWindowInterval, jobs.FakePHS, "",
)
if err := r.Start(ctx, s.Stopper(), nodeLiveness, cancelInterval, adoptInterval); err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestRegistryCancelation(t *testing.T) {
clock := hlc.NewClock(mClock.UnixNano, time.Nanosecond)
registry := MakeRegistry(
log.AmbientContext{}, stopper, clock, db, nil /* ex */, FakeNodeID, cluster.NoSettings,
histogramWindowInterval, FakePHS)
histogramWindowInterval, FakePHS, "")

const nodeCount = 1
nodeLiveness := NewFakeNodeLiveness(nodeCount)
Expand Down
8 changes: 8 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
s.ClusterSettings(), s.nodeLiveness, internalExecutor)

s.sessionRegistry = sql.NewSessionRegistry()
var jobAdoptionStopFile string
for _, spec := range s.cfg.Stores.Specs {
if !spec.InMemory && spec.Path != "" {
jobAdoptionStopFile = filepath.Join(spec.Path, jobs.PreventAdoptionFile)
break
}
}
s.jobRegistry = jobs.MakeRegistry(
s.cfg.AmbientCtx,
s.stopper,
Expand All @@ -572,6 +579,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
// in sql/jobs/registry.go on planHookMaker.
return sql.NewInternalPlanner(opName, nil, user, &sql.MemoryMetrics{}, &execCfg)
},
jobAdoptionStopFile,
)
s.registry.AddMetricStruct(s.jobRegistry.MetricsStruct())

Expand Down