From 3e0a0282aaaa2d71f136ac565b88efff4bcbcb0b Mon Sep 17 00:00:00 2001 From: Sajjad Rizvi Date: Thu, 24 Jun 2021 16:20:16 -0400 Subject: [PATCH] jobs: retry failed jobs with exponential-backoff Failed jobs were being retried with a constant interval in the previous implementation. This commit enables jobs to be retried with exponentially increasing delays with an upper bound. This change enables to retry the jobs that are not currently retried when they fail due to transient problems. Release note: None Fixes: #44594 --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/clusterversion/cockroach_versions.go | 7 + pkg/jobs/BUILD.bazel | 1 + pkg/jobs/adopt.go | 58 +- pkg/jobs/config.go | 68 +- pkg/jobs/jobs.go | 12 +- pkg/jobs/jobspb/jobs.pb.go | 2 +- pkg/jobs/jobspb/jobs.proto | 2 +- pkg/jobs/registry.go | 7 + pkg/jobs/registry_test.go | 227 +++++++ pkg/jobs/testing_knobs.go | 10 +- pkg/jobs/update.go | 70 +- pkg/migration/migrations/BUILD.bazel | 1 + pkg/migration/migrations/helpers_test.go | 29 + pkg/migration/migrations/migrations.go | 4 + .../retry_jobs_with_exponential_backoff.go | 244 +++++++ ..._with_exponential_backoff_external_test.go | 603 ++++++++++++++++++ pkg/sql/catalog/systemschema/system.go | 55 +- 19 files changed, 1350 insertions(+), 54 deletions(-) create mode 100644 pkg/migration/migrations/helpers_test.go create mode 100644 pkg/migration/migrations/retry_jobs_with_exponential_backoff.go create mode 100644 pkg/migration/migrations/retry_jobs_with_exponential_backoff_external_test.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 2caddc91426c..6becf3dbb529 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -139,4 +139,4 @@ trace.datadog.project string CockroachDB the project under which traces will be trace.debug.enable boolean false if set, traces for recent requests can be seen at https:///debug/requests trace.lightstep.token string if set, traces go to Lightstep using this token trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time. -version version 21.1-116 set the active cluster version in the format '.' +version version 21.1-118 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index e9325ba76cab..e6f120f76a6b 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -140,6 +140,6 @@ trace.debug.enablebooleanfalseif set, traces for recent requests can be seen at https:///debug/requests trace.lightstep.tokenstringif set, traces go to Lightstep using this token trace.zipkin.collectorstringif set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time. -versionversion21.1-116set the active cluster version in the format '.' +versionversion21.1-118set the active cluster version in the format '.' diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index ab74c51eca83..f5c4634bf9bd 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -309,6 +309,8 @@ const ( // SQLStatsTable adds the system tables for storing persisted SQL statistics // for statements and transactions. SQLStatsTable + // RetryJobsWithExponentialBackoff retries failed jobs with exponential delays. + RetryJobsWithExponentialBackoff // Step (1): Add new versions here. ) @@ -530,6 +532,11 @@ var versionsSingleton = keyedVersions{ Key: SQLStatsTable, Version: roachpb.Version{Major: 21, Minor: 1, Internal: 116}, }, + { + Key: RetryJobsWithExponentialBackoff, + Version: roachpb.Version{Major: 21, Minor: 1, Internal: 118}, + }, + // Step (2): Add new versions here. } diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel index 0fd169d9cbb7..4c7fabc7b356 100644 --- a/pkg/jobs/BUILD.bazel +++ b/pkg/jobs/BUILD.bazel @@ -23,6 +23,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/clusterversion", "//pkg/jobs/jobspb", "//pkg/kv", "//pkg/roachpb", diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 67d37620d2fb..be8ed2071fe5 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -15,6 +15,7 @@ import ( "fmt" "sync" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -42,16 +43,16 @@ const ( // NonTerminalStatusTupleString is a sql tuple corresponding to statuses of // non-terminal jobs. NonTerminalStatusTupleString = `(` + nonTerminalStatusList + `)` -) -const claimQuery = ` + claimQuery = ` UPDATE system.jobs SET claim_session_id = $1, claim_instance_id = $2 - WHERE (claim_session_id IS NULL) - AND (status IN ` + claimableStatusTupleString + `) + WHERE ((claim_session_id IS NULL) + AND (status IN ` + claimableStatusTupleString + `)) ORDER BY created DESC LIMIT $3 RETURNING id;` +) // claimJobs places a claim with the given SessionID to job rows that are // available. @@ -64,8 +65,7 @@ func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error { } numRows, err := r.ex.Exec( ctx, "claim-jobs", txn, claimQuery, - s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop, - ) + s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop) if err != nil { return errors.Wrap(err, "could not query jobs table") } @@ -76,14 +76,49 @@ func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error { }) } +const ( + processQuery = ` +SELECT id FROM system.jobs +WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance_id = $4) +` + + // Select only those jobs that have their next retry time before the current time. + // 2^62 is the max positive number. + // $5 is registry's current timestamp. + // $6 is the starting value in exponential backoff calculation. + // $7 is the max retry delay. + nextRunClause = ` +AND +($5 > (COALESCE(last_run, created) ++ +least(($6::INTERVAL * ((1 << least(62, COALESCE(num_runs, 0))) - 1)), $7::INTERVAL)))` +) + +// getProcessQuery returns the query that selects the jobs that are claimed +// by this node. +func getProcessQuery( + ctx context.Context, s sqlliveness.Session, r *Registry, +) (string, []interface{}) { + // Select the running or reverting jobs that this node has claimed. + query := processQuery + args := []interface{}{StatusRunning, StatusReverting, s.ID().UnsafeBytes(), r.ID()} + // Gating the version that introduced job retries with exponential backoff. + if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) { + // Select only those jobs that can be executed right now. + query += nextRunClause + retryStartVal := retryStartValSetting.Get(&r.settings.SV).Seconds() + retryMax := retryMaxDelaySetting.Get(&r.settings.SV).Seconds() + args = append(args, []interface{}{r.clock.Now().GoTime(), retryStartVal, retryMax}...) + } + return query, args +} + // processClaimedJobs processes all jobs currently claimed by the registry. func (r *Registry) processClaimedJobs(ctx context.Context, s sqlliveness.Session) error { + query, args := getProcessQuery(ctx, s, r) it, err := r.ex.QueryIteratorEx( ctx, "select-running/get-claimed-jobs", nil, - sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, ` -SELECT id FROM system.jobs -WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance_id = $4)`, - StatusRunning, StatusReverting, s.ID().UnsafeBytes(), r.ID(), + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, query, args..., ) if err != nil { return errors.Wrapf(err, "could not query for claimed jobs") @@ -219,6 +254,9 @@ FROM system.jobs WHERE id = $1 AND claim_session_id = $2`, if err := r.stopper.RunAsyncTask(ctx, job.taskName(), func(ctx context.Context) { // Wait for the job to finish. No need to print the error because if there // was one it's been set in the job status already. + // TODO(sajjad): To discuss: In some cases, the error may not be set. For + // example, errors in update() called in runJob() will result in + // silently dropping the error. _ = r.runJob(resumeCtx, resumer, job, status, job.taskName()) }); err != nil { r.removeAdoptedJob(jobID) diff --git a/pkg/jobs/config.go b/pkg/jobs/config.go index 1d759ae96cdc..13eb2573c226 100644 --- a/pkg/jobs/config.go +++ b/pkg/jobs/config.go @@ -20,32 +20,45 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) -const intervalBaseSettingKey = "jobs.registry.interval.base" -const adoptIntervalSettingKey = "jobs.registry.interval.adopt" -const cancelIntervalSettingKey = "jobs.registry.interval.cancel" -const gcIntervalSettingKey = "jobs.registry.interval.gc" -const retentionTimeSettingKey = "jobs.retention_time" -const cancelUpdateLimitKey = "jobs.cancel_update_limit" +const ( + intervalBaseSettingKey = "jobs.registry.interval.base" + adoptIntervalSettingKey = "jobs.registry.interval.adopt" + cancelIntervalSettingKey = "jobs.registry.interval.cancel" + gcIntervalSettingKey = "jobs.registry.interval.gc" + retentionTimeSettingKey = "jobs.retention_time" + cancelUpdateLimitKey = "jobs.cancel_update_limit" + retryStartValSettingKey = "jobs.registry.retry.backoff_start" + retryMaxDelaySettingKey = "jobs.registry.retry.backoff_max" +) + +var ( + // defaultAdoptInterval is the default adopt interval. + defaultAdoptInterval = 30 * time.Second -// defaultAdoptInterval is the default adopt interval. -var defaultAdoptInterval = 30 * time.Second + // defaultCancelInterval is the default cancel interval. + defaultCancelInterval = 10 * time.Second -// defaultCancelInterval is the default cancel interval. -var defaultCancelInterval = 10 * time.Second + // defaultGcInterval is the default GC Interval. + defaultGcInterval = 1 * time.Hour -// defaultGcInterval is the default GC Interval. -var defaultGcInterval = 1 * time.Hour + // defaultIntervalBase is the default interval base. + defaultIntervalBase = 1.0 -// defaultIntervalBase is the default interval base. -var defaultIntervalBase = 1.0 + // defaultRetentionTime is the default duration for which terminal jobs are + // kept in the records. + defaultRetentionTime = 14 * 24 * time.Hour -// defaultRetentionTime is the default duration for which terminal jobs are -// kept in the records. -var defaultRetentionTime = 14 * 24 * time.Hour + // defaultCancellationsUpdateLimit is the default number of jobs that can be + // updated when canceling jobs concurrently from dead sessions. + defaultCancellationsUpdateLimit int64 = 1000 -// defaultCancellationsUpdateLimit is the default number of jobs that can be -// updated when canceling jobs concurrently from dead sessions. -var defaultCancellationsUpdateLimit int64 = 1000 + // defaultRetryStartVal is the starting value in the calculation of retry interval + // of a failed job using exponential backoff. + defaultRetryStartVal = 30 * time.Second + + // defaultRetryMaxDelay is the maximum backoff delay to retry a failed job. + defaultRetryMaxDelay = 24 * time.Hour +) var ( intervalBaseSetting = settings.RegisterFloatSetting( @@ -93,6 +106,21 @@ var ( defaultCancellationsUpdateLimit, settings.NonNegativeInt, ) + + retryStartValSetting = settings.RegisterDurationSetting( + retryStartValSettingKey, + "the starting value of the exponential-backoff delay "+ + " in the next retry of a failed job", + defaultRetryStartVal, + settings.NonNegativeDuration, + ) + + retryMaxDelaySetting = settings.RegisterDurationSetting( + retryMaxDelaySettingKey, + "the maximum delay to retry a failed job", + defaultRetryMaxDelay, + settings.PositiveDuration, + ) ) // jitter adds a small jitter in the given duration. diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 6ff7f57d75c0..9d902ac758ef 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -16,6 +16,7 @@ import ( "reflect" "sync/atomic" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security" @@ -237,7 +238,14 @@ func (j *Job) started(ctx context.Context, txn *kv.Txn) error { // TODO(spaskob): Remove this status change after we stop supporting // pending job states. ju.UpdateStatus(StatusRunning) - md.Payload.StartedMicros = timeutil.ToUnixMicros(j.registry.clock.Now().GoTime()) + if md.Payload.StartedMicros == 0 { + md.Payload.StartedMicros = timeutil.ToUnixMicros(j.registry.clock.Now().GoTime()) + } + + if j.registry.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) { + ju.UpdateRunStats(md.RunStats.NumRuns+1, j.registry.clock.Now().GoTime()) + } + ju.UpdatePayload(md.Payload) return nil }) @@ -514,6 +522,8 @@ func (j *Job) pauseRequested(ctx context.Context, txn *kv.Txn, fn onPauseRequest }) } +// TODO(sajjad): Update runInfo in reverted + // reverted sets the status of the tracked job to reverted. func (j *Job) reverted( ctx context.Context, txn *kv.Txn, err error, fn func(context.Context, *kv.Txn) error, diff --git a/pkg/jobs/jobspb/jobs.pb.go b/pkg/jobs/jobspb/jobs.pb.go index f550721387d8..6bf8169ebaf9 100644 --- a/pkg/jobs/jobspb/jobs.pb.go +++ b/pkg/jobs/jobspb/jobs.pb.go @@ -2520,7 +2520,7 @@ func (*Progress) XXX_OneofWrappers() []interface{} { type Job struct { Id JobID `protobuf:"varint,1,opt,name=id,proto3,customtype=JobID" json:"id"` - // Keep progress first as it may bre more relevant to see when looking at a + // Keep progress first as it may be more relevant to see when looking at a // running job. Progress *Progress `protobuf:"bytes,2,opt,name=progress,proto3" json:"progress,omitempty"` Payload *Payload `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"` diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 73ea25c11b75..f012a70b2d7a 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -746,7 +746,7 @@ enum Type { message Job { int64 id = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "JobID"]; - // Keep progress first as it may bre more relevant to see when looking at a + // Keep progress first as it may be more relevant to see when looking at a // running job. Progress progress = 2; Payload payload = 3; diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 7769ad47aa9c..6186eef16b69 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -191,6 +191,9 @@ func MakeRegistry( } if knobs != nil { r.knobs = *knobs + if knobs.TimeSource != nil { + r.clock = knobs.TimeSource + } } r.mu.adoptedJobs = make(map[jobspb.JobID]*adoptedJob) r.metrics.init(histogramWindowInterval) @@ -1006,6 +1009,7 @@ func (r *Registry) stepThroughStateMachine( jobType := payload.Type() log.Infof(ctx, "%s job %d: stepping through state %s with error: %+v", jobType, job.ID(), status, jobErr) jm := r.metrics.JobMetrics[jobType] + switch status { case StatusRunning: if jobErr != nil { @@ -1022,6 +1026,9 @@ func (r *Registry) stepThroughStateMachine( func() { jm.CurrentlyRunning.Inc(1) defer jm.CurrentlyRunning.Dec(1) + if err != nil { + return + } err = resumer.Resume(resumeCtx, execCtx) }() if err == nil { diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go index 62d1e5884b02..c8d16fbc73ab 100644 --- a/pkg/jobs/registry_test.go +++ b/pkg/jobs/registry_test.go @@ -13,7 +13,9 @@ package jobs import ( "context" "fmt" + "math" "strconv" + "sync/atomic" "testing" "time" @@ -21,18 +23,22 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -276,3 +282,224 @@ func TestRegistryGCPagination(t *testing.T) { db.QueryRow(t, `SELECT count(1) FROM system.jobs`).Scan(&count) require.Zero(t, count) } + +// TestRetriesWithExponentialBackoff tests job retries with exponentially growing +// intervals. Moreover, it tests the effectiveness of the upper bound on the +// retry delay. +func TestRetriesWithExponentialBackoff(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + unitTime := 1 * time.Millisecond + // Interval to process claimed jobs. + adoptInterval := unitTime + + clusterSettings := func( + ctx context.Context, retryBase time.Duration, retryMax time.Duration, + ) *cluster.Settings { + s := cluster.MakeTestingClusterSettings() + // Set a small adopt interval to reduce test time. + adoptIntervalSetting.Override(ctx, &s.SV, adoptInterval) + // Set exponential backoff base and max retry interval based on the tests. + retryStartValSetting.Override(ctx, &s.SV, retryBase) + retryMaxDelaySetting.Override(ctx, &s.SV, retryMax) + return s + } + + for _, test := range [...]struct { + name string // Test case ID. + maxRetries int + retryInitVal time.Duration + retryMax time.Duration + }{ + { + name: "exponential backoff", + retryInitVal: 2 * time.Millisecond, + retryMax: 100 * time.Millisecond, + // Number of retries should be large enough such that the delay becomes + // larger than the error margin. Moreover, it should be large enough + // to exceed the backoff time from retryMax. + maxRetries: 6, + }, + } { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + var registry *Registry + const ( + running = iota + passed + failed + ) + status := int32(running) + retries := -1 // -1 because the counter has to start from zero. + var lastRetry time.Time + // We use a manual clock to control and evaluate job execution times. + t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) + timeSource := timeutil.NewManualTime(t0) + clock := hlc.NewClock(func() int64 { + return timeSource.Now().UnixNano() + }, base.DefaultMaxClockOffset) + // Intercept intercepts the update call to mark the job as succeeded and + // prevents the job from succeeding for a fixed number of runs. In each + // run, it validates whether the job is run at the expected time, which + // follows an exponentially increasing retry delays. + intercept := func(orig, updated JobMetadata) error { + // If updated is not marking as succeeded or if the test has already failed. + if updated.Status != StatusSucceeded || atomic.LoadInt32(&status) == failed { + return nil + } + retries++ + now := clock.Now().GoTime() + if retries == 0 { // If the job is run the first time. + lastRetry = now + } + // Expected next retry time. + backoff := test.retryInitVal * ((1 << int(math.Min(float64(retries), 62))) - 1) + delay := time.Duration(math.Min(float64(backoff), float64(test.retryMax))) + expected := lastRetry.Add(delay) + + require.Equal(t, expected, now, "job executed at an unexpected time: "+ + "expected = %v, now = %v", expected, now) + + // The test passes if the job keeps running at expected times for a sufficient + // number of times. + if retries >= test.maxRetries { + atomic.StoreInt32(&status, passed) + return nil + } + lastRetry = now + return errors.Errorf( + "Preventing the job from succeeding in try %d, delayed by %d ms", retries, delay/1e6) + } + + // Setup the test cluster. + cs := clusterSettings(ctx, test.retryInitVal, test.retryMax) + args := base.TestServerArgs{ + Settings: cs, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: &TestingKnobs{ + BeforeUpdate: intercept, + TimeSource: clock, + }, + }, + } + s, _, kvDB := serverutils.StartServer(t, args) + defer s.Stopper().Stop(ctx) + // Create and run a dummy job. + RegisterConstructor(jobspb.TypeImport, func(_ *Job, cs *cluster.Settings) Resumer { + return FakeResumer{} + }) + registry = s.JobRegistry().(*Registry) + id := registry.MakeJobID() + require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + _, err := registry.CreateJobWithTxn(ctx, Record{ + // Job does not accept an empty Details field, so arbitrarily provide + // ImportDetails. + Details: jobspb.ImportDetails{}, + Progress: jobspb.ImportProgress{}, + }, id, txn) + return err + })) + + // Wait for the job to be succeed. + testutils.SucceedsSoon(t, func() error { + if atomic.LoadInt32(&status) != running { + return nil + } + return errors.Errorf("waiting for the job to complete") + }) + require.Equal(t, int32(passed), atomic.LoadInt32(&status)) + }) + } +} + +func TestExponentialBackoffSettings(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, test := range [...]struct { + name string // Test case ID. + // The setting to test. + settingKey string + // The value of the setting to set. + value time.Duration + }{ + { + name: "backoff base setting", + settingKey: retryStartValSettingKey, + value: 2 * time.Millisecond, + }, + { + name: "backoff max setting", + settingKey: retryMaxDelaySettingKey, + value: 2 * time.Millisecond, + }, + } { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + var tdb *sqlutils.SQLRunner + var finished atomic.Value + finished.Store(false) + var intercepted atomic.Value + intercepted.Store(false) + intercept := func(orig, updated JobMetadata) error { + // If this updated is not to mark as succeeded or the test has already failed. + if updated.Status != StatusSucceeded { + return nil + } + + // If marking the first time, prevent the marking and update the cluster + // setting based on test params. The setting value should be reduced + // from a large value to a small value. + if !intercepted.Load().(bool) { + tdb.Exec(t, fmt.Sprintf("SET CLUSTER SETTING %s = '%v'", test.settingKey, test.value)) + intercepted.Store(true) + return errors.Errorf("preventing the job from succeeding") + } + // Let the job to succeed. As we began with a long interval and prevented + // the job than succeeding in the first attempt, its re-execution + // indicates that the setting is updated successfully and is in effect. + finished.Store(true) + return nil + } + + // Setup the test cluster. + cs := cluster.MakeTestingClusterSettings() + // Set a small adopt interval to reduce test time. + adoptIntervalSetting.Override(ctx, &cs.SV, 2*time.Millisecond) + // Begin with a very long delay. + retryStartValSetting.Override(ctx, &cs.SV, time.Hour) + retryMaxDelaySetting.Override(ctx, &cs.SV, time.Hour) + args := base.TestServerArgs{ + Settings: cs, + Knobs: base.TestingKnobs{JobsTestingKnobs: &TestingKnobs{BeforeUpdate: intercept}}, + } + s, sdb, kvDB := serverutils.StartServer(t, args) + defer s.Stopper().Stop(ctx) + tdb = sqlutils.MakeSQLRunner(sdb) + // Create and run a dummy job. + RegisterConstructor(jobspb.TypeImport, func(_ *Job, cs *cluster.Settings) Resumer { + return FakeResumer{} + }) + registry := s.JobRegistry().(*Registry) + id := registry.MakeJobID() + require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + _, err := registry.CreateJobWithTxn(ctx, Record{ + // Job does not accept an empty Details field, so arbitrarily provide + // ImportDetails. + Details: jobspb.ImportDetails{}, + Progress: jobspb.ImportProgress{}, + }, id, txn) + return err + })) + + // Wait for the job to be succeed. + testutils.SucceedsSoon(t, func() error { + if finished.Load().(bool) { + return nil + } + return errors.Errorf("waiting for the job to complete") + }) + }) + } +} diff --git a/pkg/jobs/testing_knobs.go b/pkg/jobs/testing_knobs.go index 003d2e080914..4df72b370f9c 100644 --- a/pkg/jobs/testing_knobs.go +++ b/pkg/jobs/testing_knobs.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // TestingKnobs are base.ModuleTestingKnobs for testing jobs related infra. @@ -54,8 +55,14 @@ type TestingKnobs struct { // IntervalOverrides consists of override knobs for job intervals. IntervalOverrides TestingIntervalOverrides + + // TimeSource replaces registry's clock. + TimeSource *hlc.Clock } +// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. +func (*TestingKnobs) ModuleTestingKnobs() {} + // TestingIntervalOverrides contains variables to override the intervals and // settings of periodic tasks. type TestingIntervalOverrides struct { @@ -75,9 +82,6 @@ type TestingIntervalOverrides struct { RetentionTime *time.Duration } -// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. -func (*TestingKnobs) ModuleTestingKnobs() {} - // NewTestingKnobsWithShortIntervals return a TestingKnobs structure with // overrides for short adopt and cancel intervals. func NewTestingKnobsWithShortIntervals() *TestingKnobs { diff --git a/pkg/jobs/update.go b/pkg/jobs/update.go index d0c45045c27c..1a5a8d119d7d 100644 --- a/pkg/jobs/update.go +++ b/pkg/jobs/update.go @@ -15,7 +15,9 @@ import ( "context" "fmt" "strings" + "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security" @@ -36,12 +38,19 @@ import ( // changes will be ignored unless JobUpdater is used). type UpdateFn func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error +// RunStats consists of job-run statistics: num of runs and last-run timestamp. +type RunStats struct { + LastRun time.Time + NumRuns int +} + // JobMetadata groups the job metadata values passed to UpdateFn. type JobMetadata struct { ID jobspb.JobID Status Status Payload *jobspb.Payload Progress *jobspb.Progress + RunStats *RunStats } // CheckRunningOrReverting returns an InvalidStatusError if md.Status is not @@ -80,6 +89,13 @@ func (ju *JobUpdater) hasUpdates() bool { return ju.md != JobMetadata{} } +func (ju *JobUpdater) UpdateRunStats(numRuns int, lastRun time.Time) { + ju.md.RunStats = &RunStats{ + NumRuns: numRuns, + LastRun: lastRun, + } +} + // UpdateHighwaterProgressed updates job updater progress with the new high water mark. func UpdateHighwaterProgressed(highWater hlc.Timestamp, md JobMetadata, ju *JobUpdater) error { if err := md.CheckRunningOrReverting(); err != nil { @@ -119,18 +135,27 @@ func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error var payload *jobspb.Payload var progress *jobspb.Progress if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error { - stmt := "SELECT status, payload, progress FROM system.jobs WHERE id = $1 FOR UPDATE" + sessionIDClause := "" if j.sessionID != "" { - stmt = "SELECT status, payload, progress, claim_session_id FROM system." + - "jobs WHERE id = $1 FOR UPDATE" + sessionIDClause = ", claim_session_id" + } + // Retrieve run stats if the jobs table version supports it. + statsClause := "" + if j.registry.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) { + statsClause = ", COALESCE(last_run, created), COALESCE(num_runs, 0)" } + stmt := fmt.Sprintf("SELECT status, payload, progress%s%s FROM system.jobs WHERE id = $1 FOR UPDATE", + sessionIDClause, statsClause) + var err error var row tree.Datums - row, err = j.registry.ex.QueryRowEx( - ctx, "log-job", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, + if row, err = j.registry.ex.QueryRowEx( + ctx, + "log-job", + txn, + sessiondata.InternalExecutorOverride{User: security.RootUserName()}, stmt, j.ID(), - ) - if err != nil { + ); err != nil { return err } if row == nil { @@ -142,6 +167,7 @@ func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error return errors.AssertionFailedf("job %d: expected string status, but got %T", j.ID(), statusString) } + status := Status(*statusString) if j.sessionID != "" { if row[3] == tree.DNull { return errors.Errorf( @@ -155,8 +181,6 @@ func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error j.ID(), statusString, j.sessionID, storedSession) } } - - status := Status(*statusString) if payload, err = UnmarshalPayload(row[1]); err != nil { return err } @@ -170,6 +194,28 @@ func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error Payload: payload, Progress: progress, } + + if j.registry.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) { + offset := 0 + if j.sessionID != "" { + offset = 1 + } + var lastRun *tree.DTimestamp + lastRun, ok = row[3+offset].(*tree.DTimestamp) + if !ok { + return errors.AssertionFailedf("job %d: expected timestamp last_run, but got %T", j.ID(), lastRun) + } + var numRuns *tree.DInt + numRuns, ok = row[4+offset].(*tree.DInt) + if !ok { + return errors.AssertionFailedf("job %d: expected int num_runs, but got %T", j.ID(), numRuns) + } + md.RunStats = &RunStats{ + NumRuns: int(*numRuns), + LastRun: lastRun.Time, + } + } + var ju JobUpdater if err := updateFn(txn, md, &ju); err != nil { return err @@ -179,6 +225,7 @@ func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error return err } } + if !ju.hasUpdates() { return nil } @@ -224,6 +271,11 @@ func (j *Job) Update(ctx context.Context, txn *kv.Txn, updateFn UpdateFn) error addSetter("progress", progressBytes) } + if ju.md.RunStats != nil { + addSetter("last_run", ju.md.RunStats.LastRun) + addSetter("num_runs", ju.md.RunStats.NumRuns) + } + updateStmt := fmt.Sprintf( "UPDATE system.jobs SET %s WHERE id = $1", strings.Join(setters, ", "), diff --git a/pkg/migration/migrations/BUILD.bazel b/pkg/migration/migrations/BUILD.bazel index d476a11a97a2..6895d09498a9 100644 --- a/pkg/migration/migrations/BUILD.bazel +++ b/pkg/migration/migrations/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "migrations.go", "protected_ts_meta_migration.go", "sql_stats.go", + "retry_jobs_with_exponential_backoff.go", "truncated_state.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/migration/migrations", diff --git a/pkg/migration/migrations/helpers_test.go b/pkg/migration/migrations/helpers_test.go new file mode 100644 index 000000000000..03777936fabe --- /dev/null +++ b/pkg/migration/migrations/helpers_test.go @@ -0,0 +1,29 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations + +import "github.com/cockroachdb/cockroach/pkg/sql/catalog" + +func HasBackoffCols(jobsTable catalog.TableDescriptor, col string) (bool, error) { + return hasColumn(jobsTable, col) +} + +func HasBackoffIndex(jobsTable catalog.TableDescriptor, index string) (bool, error) { + return hasIndex(jobsTable, index) +} + +func NeutralizeSQL(stmt string) string { + return neutralizeSQL(stmt) +} + +const NumRunsQuery = numRunsQuery +const LastRunQuery = lastRunQuery +const AddIndexQuery = addIndexQuery diff --git a/pkg/migration/migrations/migrations.go b/pkg/migration/migrations/migrations.go index b328d061ee97..3819148708a3 100644 --- a/pkg/migration/migrations/migrations.go +++ b/pkg/migration/migrations/migrations.go @@ -77,6 +77,10 @@ var migrations = []migration.Migration{ toCV(clusterversion.SQLStatsTable), sqlTransactionStatsTableMigration, ), + migration.NewTenantMigration( + "add last_run and num_runs columns to system.jobs", + toCV(clusterversion.RetryJobsWithExponentialBackoff), + retryJobsWithExponentialBackoff), } func init() { diff --git a/pkg/migration/migrations/retry_jobs_with_exponential_backoff.go b/pkg/migration/migrations/retry_jobs_with_exponential_backoff.go new file mode 100644 index 000000000000..ca49360a0b20 --- /dev/null +++ b/pkg/migration/migrations/retry_jobs_with_exponential_backoff.go @@ -0,0 +1,244 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations + +import ( + "bytes" + "context" + "fmt" + "regexp" + "strings" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" + "github.com/kr/pretty" +) + +const ( + numRunsQuery = `ALTER TABLE system.jobs ADD COLUMN num_runs INT8 FAMILY claim` + lastRunQuery = `ALTER TABLE system.jobs ADD COLUMN last_run TIMESTAMP FAMILY claim;` + addIndexQuery = `CREATE INDEX jobs_run_stats_idx + ON system.jobs (claim_session_id, status, created) + STORING (last_run, num_runs, claim_instance_id) + WHERE + status + IN ( + 'running', + 'reverting', + 'pending', + 'pause-requested', + 'cancel-requested' + )` +) + +// retryJobsWithExponentialBackoff changes the schema of system.jobs table in +// two steps. It first adds two new columns and then an index. +func retryJobsWithExponentialBackoff( + ctx context.Context, cs clusterversion.ClusterVersion, d migration.TenantDeps, +) error { + ops := [...]struct { + // Schema name. + name string + // Schema change query. + query string + // Function to check existing schema. + schemaExists func(catalog.TableDescriptor, string) (bool, error) + }{ + {"num_runs", numRunsQuery, hasColumn}, + {"last_run", lastRunQuery, hasColumn}, + {"jobs_run_stats_idx", addIndexQuery, hasIndex}, + } + for _, op := range ops { + if err := runJobsTableMigration(ctx, cs, d, "jobs-migration-"+op.name, op.query, + func(table catalog.TableDescriptor) (bool, error) { + return op.schemaExists(table, op.name) + }); err != nil { + return err + } + } + return nil +} + +// runJobsTableMigration changes the jobs table schema based on the given query. +// The change is ignored if the table already has the required changes, which are +// explicitly limited to two new columns (last_run and num_runs) and the index +// jobs_run_stats_idx. +func runJobsTableMigration( + ctx context.Context, + _ clusterversion.ClusterVersion, + d migration.TenantDeps, + opTag string, + schemaChangeQuery string, + schemaExists func(descriptor catalog.TableDescriptor) (bool, error), +) error { + for { + // Fetch the jobs table + // Check if mutation job exists for system.jobs + // if exists + // wait for job to finish, continue the loop + // otherwise + // check if migration is complete (check new cols and ix), return + // run schema change + // on success continue the loop + + // Retrieve the jobs table. + var jt catalog.TableDescriptor + if err := descs.Txn(ctx, d.Settings, d.LeaseManager, d.InternalExecutor, d.DB, func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) (err error) { + jt, err = descriptors.GetImmutableTableByID(ctx, txn, keys.JobsTableID, tree.ObjectLookupFlags{ + CommonLookupFlags: tree.CommonLookupFlags{ + AvoidCached: true, + Required: true, + }, + }) + return err + }); err != nil { + return err + } + + // Wait for any in-flight schema changes to complete. + if mutations := jt.GetMutationJobs(); len(mutations) > 0 { + for _, mutation := range mutations { + log.Infof(ctx, "[SR] waiting for a mutation job %v to complete before "+ + "modify the jobs table", mutation.JobID) + if _, err := d.InternalExecutor.Exec(ctx, "migration-mutations-wait", + nil, "SHOW JOB WHEN COMPLETE $1", mutation.JobID); err != nil { + return err + } + } + continue + } + + // Ignore the schema change if the jobs table already has the required schema. + if ok, err := schemaExists(jt); err != nil { + return errors.Wrapf(err, "[SR] Error while validating descriptors during"+ + " operation %s", opTag) + } else if ok { + log.Info(ctx, fmt.Sprintf("[SR] skipping %s operation as the schema change already exists.", opTag)) + return nil + } + + // Modify the jobs table. + log.Info(ctx, fmt.Sprintf("[SR] Performing operation: %s", opTag)) + if _, err := d.InternalExecutor.ExecEx( + ctx, + "migration-alter-jobs-table", + nil, /* txn */ + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, + schemaChangeQuery); err != nil { + return err + } + return nil + } +} + +// hasColumn return true if the table has the given column. It returns +// an error if the column exists but doesn't match with the table's +// descriptor defined in systemschema/system.go. The comparison is not strict +// as several descriptor fields are ignored. +func hasColumn(jobsTable catalog.TableDescriptor, colName string) (bool, error) { + storedCol, err := jobsTable.FindColumnWithName(tree.Name(colName)) + if err != nil { + if strings.Contains(err.Error(), "does not exist") { + return false, nil + } + return false, err + } + + expectedCol, err := systemschema.JobsTable.FindColumnWithName(tree.Name(colName)) + if err != nil { + return false, errors.Wrapf(err, "[SR] columns name %s is invalid.", colName) + } + + expectedCopy := expectedCol.ColumnDescDeepCopy() + storedCopy := storedCol.ColumnDescDeepCopy() + + storedCopy.ID = 0 + expectedCopy.ID = 0 + + if err = mustBeEqual(&expectedCopy, &storedCopy); err != nil { + return false, err + } + return true, nil +} + +// hasIndex return true if the table has the given index. It returns +// an error if the index exists but doesn't match with the table's +// descriptor defined in systemschema/system.go. The comparison is not strict +// as several descriptor fields are ignored. +func hasIndex(jobsTable catalog.TableDescriptor, indexName string) (bool, error) { + storedIdx, err := jobsTable.FindIndexWithName(indexName) + if err != nil { + if strings.Contains(err.Error(), "does not exist") { + return false, nil + } + return false, err + } + expectedIdx, err := systemschema.JobsTable.FindIndexWithName(indexName) + if err != nil { + return false, errors.Wrapf(err, "[SR] index name %s is invalid", indexName) + } + storedCopy := storedIdx.IndexDescDeepCopy() + expectedCopy := expectedIdx.IndexDescDeepCopy() + // Ignore the fields that don't matter in the comparison. + // TODO(sajjad): FIXME: Please check am I ingoring the right fields. + storedCopy.ID = 0 + expectedCopy.ID = 0 + storedCopy.Version = 0 + expectedCopy.Version = 0 + storedCopy.CreatedExplicitly = false + expectedCopy.CreatedExplicitly = false + expectedCopy.StoreColumnIDs = []descpb.ColumnID{0, 0, 0} + storedCopy.StoreColumnIDs = []descpb.ColumnID{0, 0, 0} + expectedCopy.Predicate = neutralizeSQL(expectedCopy.Predicate) + + if err = mustBeEqual(&expectedCopy, &storedCopy); err != nil { + return false, err + } + return true, nil +} + +func mustBeEqual(expected, found protoutil.Message) error { + expectedBytes, err := protoutil.Marshal(expected) + if err != nil { + return err + } + foundBytes, err := protoutil.Marshal(found) + if err != nil { + return err + } + if bytes.Equal(expectedBytes, foundBytes) { + return nil + } + return errors.Errorf("[SR] expected descriptor doesn't match "+ + "with found descriptor: %s", strings.Join(pretty.Diff(expected, found), "\n")) +} + +func neutralizeSQL(stmt string) string { + stmt = strings.Replace(stmt, "system.jobs", "system.public.jobs", -1) + stmt = strings.TrimSpace(regexp.MustCompile(`(\s+|;+)`).ReplaceAllString(stmt, " ")) + stmt = strings.ReplaceAll(stmt, "( ", "(") + stmt = strings.ReplaceAll(stmt, " )", ")") + return stmt +} diff --git a/pkg/migration/migrations/retry_jobs_with_exponential_backoff_external_test.go b/pkg/migration/migrations/retry_jobs_with_exponential_backoff_external_test.go new file mode 100644 index 000000000000..a65f10ea3fde --- /dev/null +++ b/pkg/migration/migrations/retry_jobs_with_exponential_backoff_external_test.go @@ -0,0 +1,603 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations_test + +import ( + "context" + gosql "database/sql" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/migration/migrations" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestExponentialBackoffMigration(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + BinaryVersionOverride: clusterversion.ByKey( + clusterversion.RetryJobsWithExponentialBackoff - 1), + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }, + } + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, clusterArgs) + defer tc.Stopper().Stop(ctx) + s := tc.Server(0) + sqlDB := tc.ServerConn(0) + tdb := sqlutils.MakeSQLRunner(sqlDB) + + // Inject the old copy of the descriptor. + injectLegacyTable(t, ctx, s) + // Validate that the jobs table has old schema. + validateSchemaExists(t, ctx, s, sqlDB, false) + // Run the migration. + migrate(t, sqlDB, false, nil) + // Validate that the jobs table has new schema. + validateSchemaExists(t, ctx, s, sqlDB, true) + // Make sure that jobs work by running a job. + validateJobExec(t, tdb) +} + +func TestMigrationWithFailures(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, test := range []struct { + name string + status jobs.Status + query string + }{ + { + "fail num_run before update", + jobs.StatusRunning, + migrations.NumRunsQuery, + }, + { + "fail num_run after update", + jobs.StatusSucceeded, + migrations.NumRunsQuery, + }, + { + "fail last_run before update", + jobs.StatusRunning, + migrations.LastRunQuery, + }, + { + "fail last_run after update", + jobs.StatusSucceeded, + migrations.LastRunQuery, + }, + { + "fail index before update", + jobs.StatusRunning, + migrations.AddIndexQuery, + }, + { + "fail index after update", + jobs.StatusSucceeded, + migrations.AddIndexQuery, + }, + } { + ctx := context.Background() + var s serverutils.TestServerInterface + var injectedError atomic.Value + injectedError.Store(false) + beforeUpdate := func(orig, updated jobs.JobMetadata) error { + fmt.Println("[SR] BU: ", orig.ID, orig.Status, orig.Payload.Type(), updated.Status) + if orig.Payload.Type() == jobspb.TypeMigration { + //migrationJob = orig.ID + return nil + } + if orig.Payload.Description != migrations.NeutralizeSQL(test.query) || + injectedError.Load().(bool) || + updated.Status != test.status { + return nil + } + injectedError.Store(true) + fmt.Println("[SR] injecting error ...") + assert.NoError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if err := txn.SetUserPriority(roachpb.MaxUserPriority); err != nil { + return err + } + return s.JobRegistry().(*jobs.Registry).CancelRequested(ctx, txn, orig.ID) + })) + return nil + } + + shortInterval := 10 * time.Millisecond + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + BinaryVersionOverride: clusterversion.ByKey( + clusterversion.RetryJobsWithExponentialBackoff - 1), + }, + JobsTestingKnobs: &jobs.TestingKnobs{ + IntervalOverrides: jobs.TestingIntervalOverrides{ + Adopt: &shortInterval, + Cancel: &shortInterval, + }, + BeforeUpdate: beforeUpdate, + }, + }, + }, + } + tc := testcluster.StartTestCluster(t, 1, clusterArgs) + defer tc.Stopper().Stop(ctx) + s = tc.Server(0) + sqlDB := tc.ServerConn(0) + tdb := sqlutils.MakeSQLRunner(sqlDB) + + // Inject the old copy of the descriptor. + injectLegacyTable(t, ctx, s) + // Validate that the jobs table has old schema. + validateSchemaExists(t, ctx, s, sqlDB, false) + // Run the migration, expecting failure. + migrate(t, sqlDB, true, nil) + // Run the migration again, expecting success. + migrate(t, sqlDB, false, nil) + // Validate that the jobs table has new schema. + validateSchemaExists(t, ctx, s, sqlDB, true) + // Make sure that jobs work by running a job. + validateJobExec(t, tdb) + } +} + +func TestWaitForMutations(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Test to ensure that the migration waits for existing mutation jobs to + // reach a terminal state before it performs the migration. + // + // Start migration, which create schema change jobs. Intercept a schema change + // jobs, pause the job, cancel the migration, restart the migration, ensure + // that the migration is waiting, resume the schema-change job, leading to + // test completion with successful migration. + + for _, test := range []struct { + name string + status jobs.Status + query string + waitForMigrationRestart bool + cancelSchemaJob bool + }{ + { + "fail index after update", + jobs.StatusRunning, + migrations.NumRunsQuery, + true, + false, + }, + } { + test.query = migrations.NeutralizeSQL(test.query) + type updateEvent struct { + orig, updated jobs.JobMetadata + errChan chan error + } + updateEventChan := make(chan updateEvent) + beforeUpdate := func(orig, updated jobs.JobMetadata) error { + ue := updateEvent{ + orig: orig, + updated: updated, + errChan: make(chan error), + } + updateEventChan <- ue + return <-ue.errChan + } + + shortInterval := 10 * time.Millisecond + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + BinaryVersionOverride: clusterversion.ByKey( + clusterversion.RetryJobsWithExponentialBackoff - 1), + }, + JobsTestingKnobs: &jobs.TestingKnobs{ + IntervalOverrides: jobs.TestingIntervalOverrides{ + Adopt: &shortInterval, + Cancel: &shortInterval, + }, + BeforeUpdate: beforeUpdate, + }, + }, + }, + } + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, clusterArgs) + defer tc.Stopper().Stop(ctx) + s := tc.Server(0) + sqlDB := tc.ServerConn(0) + tdb := sqlutils.MakeSQLRunner(sqlDB) + + // Inject the old copy of the descriptor. + injectLegacyTable(t, ctx, s) + // Validate that the jobs table has old schema. + validateSchemaExists(t, ctx, s, sqlDB, false) + // Run the migration, expecting failure. + log.Info(ctx, "[SR] Trying migration, expecting to fail") + finishChan := make(chan struct{}) + go migrate(t, sqlDB, true, finishChan) + + var migJobID jobspb.JobID + var schemaEvent updateEvent + // Intercept the target schema-change job and get migration-job's ID. + log.Info(ctx, "[SR] Intercepting the schema job") + for { + e := <-updateEventChan + // The migration job creates schema-change jobs. Therefore, we are guaranteed + // to get the migration-job's ID before cancelling the job later on. + if e.orig.Payload.Type() == jobspb.TypeMigration { + migJobID = e.orig.ID + log.Infof(ctx, "[SR] Migration job ID: %v", migJobID) + } else if e.orig.Payload.Description == test.query { + schemaEvent = e + log.Infof(ctx, "[SR] Intercepted schema change job: %v", e.orig.ID) + break + } + e.errChan <- nil + } + // Cancel the migration job + log.Info(ctx, "[SR] cancelling the migration job") + cancelJob(t, ctx, s, migJobID) + + // several cases after canceling the migration + // 1. Wait for the migration to resume and then continue + // 2. Wait for the migration to resume and then fail + // 3. continue without waiting for the migration to resume + // 4. fail without waiting for the migration to resume + + // Wait for the migration job to finish. + log.Info(ctx, "[SR] waiting for the migration job to finish.") + testutils.SucceedsSoon(t, func() error { + select { + case <-finishChan: + return nil + case e := <-updateEventChan: + e.errChan <- nil + default: + } + return errors.Errorf("Waiting for the migration job to finish.") + }) + + // Restart the migration job. + log.Info(ctx, "[SR] restarting the migration job") + go migrate(t, sqlDB, true, finishChan) + + // Wait until the migration job restarts. + log.Info(ctx, "[SR] retrying migration, expecting to succeed") + if test.waitForMigrationRestart { + for { + e := <-updateEventChan + e.errChan <- nil + if e.orig.Payload.Type() == jobspb.TypeMigration && e.updated.Status == jobs.StatusRunning { + break + } + } + } + + // Resume the schema change. + log.Info(ctx, "[SR] resuming the schema change jobs") + schemaEvent.errChan <- nil + done := make(chan struct{}) + go func() { + for { + select { + case e := <-updateEventChan: + e.errChan <- nil + case <-done: + break + } + } + }() + + // Let all jobs to finish. + log.Infof(ctx, "[SR] waiting for the new migration job to complete.") + testutils.SucceedsSoon(t, func() error { + select { + case <-finishChan: + return nil + default: + } + return errors.Errorf("Waiting for the migration job to finish.") + }) + + // Validate that the jobs table has new schema. + validateSchemaExists(t, ctx, s, sqlDB, true) + // Make sure that jobs work by running a job. + validateJobExec(t, tdb) + done <- struct{}{} + } +} + +func cancelJob( + t *testing.T, ctx context.Context, s serverutils.TestServerInterface, jobID jobspb.JobID, +) { + assert.NoError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if err := txn.SetUserPriority(roachpb.MaxUserPriority); err != nil { + return err + } + log.Infof(ctx, "[SR] cancelling job: %v", jobID) + //return s.JobRegistry().(*jobs.Registry).CancelRequested(ctx, txn, jobID) + return s.JobRegistry().(*jobs.Registry).UpdateJobWithTxn( + ctx, jobID, txn, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater, + ) error { + ju.UpdateStatus(jobs.StatusCancelRequested) + log.Infof(ctx, "[SR] updated the status to cancel-requested for job %v", md.ID) + return nil + }) + })) +} + +func migrate(t *testing.T, sqlDB *gosql.DB, expectError bool, finished chan struct{}) { + defer func() { + fmt.Println("[SR] migration finished") + finished <- struct{}{} + }() + fmt.Println("[SR] Expecting migration to fail: ", expectError) + _, err := sqlDB.Exec(`SET CLUSTER SETTING version = $1`, + clusterversion.ByKey(clusterversion.RetryJobsWithExponentialBackoff).String()) + if expectError { + require.Error(t, err) + return + } + require.NoError(t, err) +} + +func injectLegacyTable(t *testing.T, ctx context.Context, s serverutils.TestServerInterface) { + err := descs.Txn( + ctx, + s.ClusterSettings(), + s.LeaseManager().(*lease.Manager), + s.InternalExecutor().(sqlutil.InternalExecutor), + s.DB(), + func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) error { + id := systemschema.JobsTable.GetID() + tab, err := descriptors.GetMutableTableByID(ctx, txn, id, tree.ObjectLookupFlagsWithRequired()) + if err != nil { + return err + } + builder := tabledesc.NewBuilder(deprecatedDescriptor()) + require.NoError(t, builder.RunPostDeserializationChanges(ctx, nil)) + tab.TableDescriptor = builder.BuildCreatedMutableTable().TableDescriptor + tab.Version = tab.ClusterVersion.Version + 1 + return descriptors.WriteDesc(ctx, false, tab, txn) + }) + require.NoError(t, err) +} + +/** +FIXME [SR] +- Test failing the jobs before they start, using BeforeResume in schemachanger +- Test failing the jobs after they succeed, using BeforeUpdate in jobs +- Test failing a migration but succeeding schema changes +- Test do we wait for a mutation job to complete or not +- Test whether the schema changes are idempotent or not +*/ + +func validateSchemaExists( + t *testing.T, + ctx context.Context, + s serverutils.TestServerInterface, + sqlDB *gosql.DB, + expectExists bool, +) { + for _, stmt := range []string{ + "SELECT last_run, num_runs FROM system.jobs LIMIT 0", + "SELECT num_runs, last_run, claim_instance_id from system.jobs@jobs_run_stats_idx LIMIT 0", + } { + _, err := sqlDB.Exec(stmt) + if expectExists { + require.NoError( + t, err, "expected schema to exist, but unable to query it, using statement: %s", stmt, + ) + } else { + require.Error( + t, err, "expected schema to not exist, but queried it successfully, using statement: %s", stmt, + ) + } + } + + table := jobsTable(t, ctx, s) + str := "not have" + if expectExists { + str = "have" + } + for _, schema := range [...]struct { + name string + validationFn func(catalog.TableDescriptor, string) (bool, error) + }{ + {"num_runs", migrations.HasBackoffCols}, + {"last_run", migrations.HasBackoffCols}, + {"jobs_run_stats_idx", migrations.HasBackoffIndex}, + } { + updated, err := schema.validationFn(table, schema.name) + require.NoError(t, err) + require.Equal(t, expectExists, updated, + "expected jobs table to %s %s", str, schema) + } +} + +func jobsTable( + t *testing.T, ctx context.Context, s serverutils.TestServerInterface, +) catalog.TableDescriptor { + var table catalog.TableDescriptor + // Retrieve the jobs table. + err := descs.Txn(ctx, + s.ClusterSettings(), + s.LeaseManager().(*lease.Manager), + s.InternalExecutor().(sqlutil.InternalExecutor), + s.DB(), + func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ) (err error) { + table, err = descriptors.GetImmutableTableByID(ctx, txn, keys.JobsTableID, tree.ObjectLookupFlags{ + CommonLookupFlags: tree.CommonLookupFlags{ + AvoidCached: true, + Required: true, + }, + }) + return err + }) + require.NoError(t, err) + return table +} + +// validateJobExec creates and alters a dummy table to trigger jobs machinery, +// which validates its working. +func validateJobExec(t *testing.T, tdb *sqlutils.SQLRunner) { + tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)") + tdb.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = 1;") + tdb.Exec(t, "DROP TABLE foo CASCADE;") + var jobID int64 + tdb.QueryRow(t, ` +SELECT job_id + FROM [SHOW JOBS] + WHERE job_type = 'SCHEMA CHANGE GC' AND description LIKE '%foo%';`, + ).Scan(&jobID) + var status jobs.Status + tdb.QueryRow(t, + "SELECT status FROM [SHOW JOB WHEN COMPLETE $1]", jobID, + ).Scan(&status) + require.Equal(t, jobs.StatusSucceeded, status) +} + +// deprecatedDescriptor returns the system.jobs table descriptor that was being used +// before adding two new columns and an index in the current version. +func deprecatedDescriptor() *descpb.TableDescriptor { + uniqueRowIDString := "unique_rowid()" + nowString := "now():::TIMESTAMP" + pk := func(name string) descpb.IndexDescriptor { + return descpb.IndexDescriptor{ + Name: tabledesc.PrimaryKeyIndexName, + ID: 1, + Unique: true, + KeyColumnNames: []string{name}, + KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC}, + KeyColumnIDs: []descpb.ColumnID{1}, + } + } + + return &descpb.TableDescriptor{ + Name: "jobs", + ID: keys.JobsTableID, + ParentID: keys.SystemDatabaseID, + UnexposedParentSchemaID: keys.PublicSchemaID, + Version: 1, + Columns: []descpb.ColumnDescriptor{ + {Name: "id", ID: 1, Type: types.Int, DefaultExpr: &uniqueRowIDString}, + {Name: "status", ID: 2, Type: types.String}, + {Name: "created", ID: 3, Type: types.Timestamp, DefaultExpr: &nowString}, + {Name: "payload", ID: 4, Type: types.Bytes}, + {Name: "progress", ID: 5, Type: types.Bytes, Nullable: true}, + {Name: "created_by_type", ID: 6, Type: types.String, Nullable: true}, + {Name: "created_by_id", ID: 7, Type: types.Int, Nullable: true}, + {Name: "claim_session_id", ID: 8, Type: types.Bytes, Nullable: true}, + {Name: "claim_instance_id", ID: 9, Type: types.Int, Nullable: true}, + }, + NextColumnID: 10, + Families: []descpb.ColumnFamilyDescriptor{ + { + Name: "fam_0_id_status_created_payload", + ID: 0, + ColumnNames: []string{"id", "status", "created", "payload", "created_by_type", "created_by_id"}, + ColumnIDs: []descpb.ColumnID{1, 2, 3, 4, 6, 7}, + }, + { + Name: "progress", + ID: 1, + ColumnNames: []string{"progress"}, + ColumnIDs: []descpb.ColumnID{5}, + DefaultColumnID: 5, + }, + { + Name: "claim", + ID: 2, + ColumnNames: []string{"claim_session_id", "claim_instance_id"}, + ColumnIDs: []descpb.ColumnID{8, 9}, + }, + }, + NextFamilyID: 3, + PrimaryIndex: pk("id"), + Indexes: []descpb.IndexDescriptor{ + { + Name: "jobs_status_created_idx", + ID: 2, + Unique: false, + KeyColumnNames: []string{"status", "created"}, + KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC, descpb.IndexDescriptor_ASC}, + KeyColumnIDs: []descpb.ColumnID{2, 3}, + KeySuffixColumnIDs: []descpb.ColumnID{1}, + Version: descpb.StrictIndexColumnIDGuaranteesVersion, + }, + { + Name: "jobs_created_by_type_created_by_id_idx", + ID: 3, + Unique: false, + KeyColumnNames: []string{"created_by_type", "created_by_id"}, + KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC, descpb.IndexDescriptor_ASC}, + KeyColumnIDs: []descpb.ColumnID{6, 7}, + StoreColumnIDs: []descpb.ColumnID{2}, + StoreColumnNames: []string{"status"}, + KeySuffixColumnIDs: []descpb.ColumnID{1}, + Version: descpb.StrictIndexColumnIDGuaranteesVersion, + }, + }, + NextIndexID: 4, + Privileges: descpb.NewCustomSuperuserPrivilegeDescriptor( + descpb.SystemAllowedPrivileges[keys.JobsTableID], security.NodeUserName()), + FormatVersion: descpb.InterleavedFormatVersion, + NextMutationID: 1, + } +} diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go index 55e1360857f1..77cfdde454fc 100644 --- a/pkg/sql/catalog/systemschema/system.go +++ b/pkg/sql/catalog/systemschema/system.go @@ -181,12 +181,27 @@ CREATE TABLE system.jobs ( created_by_id INT, claim_session_id BYTES, claim_instance_id INT8, + num_runs INT8, + last_run TIMESTAMP, INDEX (status, created), INDEX (created_by_type, created_by_id) STORING (status), + INDEX jobs_run_stats_idx ( + claim_session_id, + status, + created + ) STORING(last_run, num_runs, claim_instance_id) + WHERE status + IN ( + 'running', + 'reverting', + 'pending', + 'pause-requested', + 'cancel-requested' + ), FAMILY fam_0_id_status_created_payload (id, status, created, payload, created_by_type, created_by_id), FAMILY progress (progress), - FAMILY claim (claim_session_id, claim_instance_id) + FAMILY claim (claim_session_id, claim_instance_id, num_runs, last_run) );` // web_sessions are used to track authenticated user actions over stateless @@ -883,8 +898,17 @@ var ( NextMutationID: 1, }) - nowString = "now():::TIMESTAMP" - nowTZString = "now():::TIMESTAMPTZ" + nowString = "now():::TIMESTAMP" + // The predicate to create a partial index + statsIdxPred = ` +status + IN ( + 'running':::STRING, + 'reverting':::STRING, + 'pending':::STRING, + 'pause-requested':::STRING, + 'cancel-requested':::STRING + )` // JobsTable is the descriptor for the jobs table. JobsTable = makeTable(descpb.TableDescriptor{ @@ -903,8 +927,10 @@ var ( {Name: "created_by_id", ID: 7, Type: types.Int, Nullable: true}, {Name: "claim_session_id", ID: 8, Type: types.Bytes, Nullable: true}, {Name: "claim_instance_id", ID: 9, Type: types.Int, Nullable: true}, + {Name: "num_runs", ID: 10, Type: types.Int, Nullable: true}, + {Name: "last_run", ID: 11, Type: types.Timestamp, Nullable: true}, }, - NextColumnID: 10, + NextColumnID: 12, Families: []descpb.ColumnFamilyDescriptor{ { // NB: We are using family name that existed prior to adding created_by_type and @@ -925,8 +951,8 @@ var ( { Name: "claim", ID: 2, - ColumnNames: []string{"claim_session_id", "claim_instance_id"}, - ColumnIDs: []descpb.ColumnID{8, 9}, + ColumnNames: []string{"claim_session_id", "claim_instance_id", "num_runs", "last_run"}, + ColumnIDs: []descpb.ColumnID{8, 9, 10, 11}, }, }, NextFamilyID: 3, @@ -954,8 +980,21 @@ var ( KeySuffixColumnIDs: []descpb.ColumnID{1}, Version: descpb.StrictIndexColumnIDGuaranteesVersion, }, + { + Name: "jobs_run_stats_idx", + ID: 4, + Unique: false, + KeyColumnNames: []string{"claim_session_id", "status", "created"}, + KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC, descpb.IndexDescriptor_ASC, descpb.IndexDescriptor_ASC}, + KeyColumnIDs: []descpb.ColumnID{8, 2, 3}, + StoreColumnNames: []string{"last_run", "num_runs", "claim_instance_id"}, + StoreColumnIDs: []descpb.ColumnID{11, 10, 9}, + KeySuffixColumnIDs: []descpb.ColumnID{1}, + Version: descpb.StrictIndexColumnIDGuaranteesVersion, + Predicate: statsIdxPred, + }, }, - NextIndexID: 4, + NextIndexID: 5, Privileges: descpb.NewCustomSuperuserPrivilegeDescriptor( descpb.SystemAllowedPrivileges[keys.JobsTableID], security.NodeUserName()), FormatVersion: descpb.InterleavedFormatVersion, @@ -1650,6 +1689,8 @@ var ( NextMutationID: 1, }) + nowTZString = "now():::TIMESTAMPTZ" + // ScheduledJobsTable is the descriptor for the scheduled jobs table. ScheduledJobsTable = makeTable(descpb.TableDescriptor{ Name: "scheduled_jobs",