From 254f86b46a54fc9d4f99d548fa1cbc86dc75a1c1 Mon Sep 17 00:00:00 2001 From: Steve Ellis Date: Sun, 2 Jun 2019 15:14:50 -0400 Subject: [PATCH 1/4] add helper to easily create a spec with a string --- core/internal/cltest/cltest.go | 11 ++++++++--- core/internal/cltest/fixtures.go | 10 +--------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 754e112d52e..1ab4c6a4672 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -557,15 +557,20 @@ func FindServiceAgreement(t testing.TB, s *strpkg.Store, id string) models.Servi // CreateJobSpecViaWeb creates a jobspec via web using /v2/specs func CreateJobSpecViaWeb(t testing.TB, app *TestApplication, job models.JobSpec) models.JobSpec { - client := app.NewHTTPClient() marshaled, err := json.Marshal(&job) assert.NoError(t, err) - resp, cleanup := client.Post("/v2/specs", bytes.NewBuffer(marshaled)) + return CreateSpecViaWeb(t, app, string(marshaled)) +} + +// CreateJobSpecViaWeb creates a jobspec via web using /v2/specs +func CreateSpecViaWeb(t testing.TB, app *TestApplication, spec string) models.JobSpec { + client := app.NewHTTPClient() + resp, cleanup := client.Post("/v2/specs", bytes.NewBufferString(spec)) defer cleanup() AssertServerResponse(t, resp, 200) var createdJob models.JobSpec - err = ParseJSONAPIResponse(t, resp, &createdJob) + err := ParseJSONAPIResponse(t, resp, &createdJob) require.NoError(t, err) return createdJob } diff --git a/core/internal/cltest/fixtures.go b/core/internal/cltest/fixtures.go index 6bba9d4167a..e9c3dd34235 100644 --- a/core/internal/cltest/fixtures.go +++ b/core/internal/cltest/fixtures.go @@ -16,15 +16,7 @@ const ( // FixtureCreateJobViaWeb creates a job from a fixture using /v2/specs func FixtureCreateJobViaWeb(t *testing.T, app *TestApplication, path string) models.JobSpec { - client := app.NewHTTPClient() - resp, cleanup := client.Post("/v2/specs", bytes.NewBuffer(MustReadFile(t, path))) - defer cleanup() - AssertServerResponse(t, resp, 200) - - var job models.JobSpec - err := ParseJSONAPIResponse(t, resp, &job) - require.NoError(t, err) - return job + return CreateSpecViaWeb(t, app, string(MustReadFile(t, path))) } // FixtureCreateServiceAgreementViaWeb creates a service agreement from a fixture using /v2/service_agreements From 65564113d259ef440de72fd57b6304fdc8c7d8f2 Mon Sep 17 00:00:00 2001 From: Steve Ellis Date: Sun, 2 Jun 2019 18:43:43 -0400 Subject: [PATCH 2/4] add an optional duration to run status test helper --- core/internal/cltest/cltest.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 1ab4c6a4672..2f3de76c3cb 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -700,14 +700,21 @@ func JobRunStays( store *strpkg.Store, jr models.JobRun, status models.RunStatus, + optionalDuration ...time.Duration, ) models.JobRun { t.Helper() + + duration := time.Second + if len(optionalDuration) > 0 { + duration = optionalDuration[0] + } + var err error gomega.NewGomegaWithT(t).Consistently(func() models.RunStatus { jr, err = store.FindJobRun(jr.ID) assert.NoError(t, err) return jr.Status - }).Should(gomega.Equal(status)) + }, duration).Should(gomega.Equal(status)) return jr } From 8cc3f46200d713b5646addedf219ac4eedd19f8d Mon Sep 17 00:00:00 2001 From: Steve Ellis Date: Sun, 2 Jun 2019 18:48:14 -0400 Subject: [PATCH 3/4] fix templatized sleep adapter - sleep adapter was not working when the `until` parameter was specified on as a run request parameter --- core/internal/cltest/cltest.go | 9 +++++++++ core/internal/features_test.go | 19 +++++++++++++++++++ core/services/job_runner.go | 31 ++++++++++++++++++++----------- core/services/runs.go | 4 +--- 4 files changed, 49 insertions(+), 14 deletions(-) diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 2f3de76c3cb..95e64e1d1b5 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -677,6 +677,15 @@ func WaitForJobRunToPendConfirmations( return WaitForJobRunStatus(t, store, jr, models.RunStatusPendingConfirmations) } +// WaitForJobRunToPendSleep waits for a JobRun to reach PendingBridge Status +func WaitForJobRunToPendSleep( + t testing.TB, + store *strpkg.Store, + jr models.JobRun, +) models.JobRun { + return WaitForJobRunStatus(t, store, jr, models.RunStatusPendingSleep) +} + // WaitForJobRunStatus waits for a JobRun to reach given status func WaitForJobRunStatus( t testing.TB, diff --git a/core/internal/features_test.go b/core/internal/features_test.go index ebaa5d52020..7ca1646b555 100644 --- a/core/internal/features_test.go +++ b/core/internal/features_test.go @@ -648,3 +648,22 @@ func TestIntegration_SyncJobRuns(t *testing.T) { require.NoError(t, err) assert.Equal(t, j.ID, run.JobSpecID) } + +func TestIntegration_SleepAdapter(t *testing.T) { + t.Parallel() + + sleepSeconds := 3 + specJSON := "{\"initiators\":[{\"type\":\"web\"}],\"tasks\":[{\"type\":\"sleep\"},{\"type\":\"noop\"}]}" + app, cleanup := cltest.NewApplication(t) + defer cleanup() + app.Start() + + j := cltest.CreateSpecViaWeb(t, app, specJSON) + + runInput := fmt.Sprintf("{\"until\": \"%s\"}", time.Now().Local().Add(time.Second*time.Duration(sleepSeconds))) + jr := cltest.CreateJobRunViaWeb(t, app, j, runInput) + + cltest.WaitForJobRunToPendSleep(t, app.Store, jr) + cltest.JobRunStays(t, app.Store, jr, models.RunStatusPendingSleep, time.Second*2) + cltest.WaitForJobRunToComplete(t, app.Store, jr) +} diff --git a/core/services/job_runner.go b/core/services/job_runner.go index 52a4cbd43d3..15f85ea817f 100644 --- a/core/services/job_runner.go +++ b/core/services/job_runner.go @@ -200,33 +200,42 @@ func prepareTaskInput(run *models.JobRun, input models.JSON) (models.JSON, error return input, nil } -func executeTask(run *models.JobRun, currentTaskRun *models.TaskRun, store *store.Store) models.RunResult { - taskCopy := currentTaskRun.TaskSpec // deliberately copied to keep mutations local - - var err error - if taskCopy.Params, err = taskCopy.Params.Merge(run.Overrides.Data); err != nil { - currentTaskRun.Result.SetError(err) - return currentTaskRun.Result +func prepareAdapter( + taskRun *models.TaskRun, + data models.JSON, + store *store.Store, +) (*adapters.PipelineAdapter, error) { + taskCopy := taskRun.TaskSpec // deliberately copied to keep mutations local + + merged, err := taskCopy.Params.Merge(data) + if err != nil { + return nil, err } + taskCopy.Params = merged - adapter, err := adapters.For(taskCopy, store) + return adapters.For(taskCopy, store) +} + +func executeTask(run *models.JobRun, currentTaskRun *models.TaskRun, store *store.Store) models.RunResult { + adapter, err := prepareAdapter(currentTaskRun, run.Overrides.Data, store) if err != nil { currentTaskRun.Result.SetError(err) return currentTaskRun.Result } - logger.Infow(fmt.Sprintf("Processing task %s", taskCopy.Type), []interface{}{"task", currentTaskRun.ID}...) - data, err := prepareTaskInput(run, currentTaskRun.Result.Data) if err != nil { currentTaskRun.Result.SetError(err) return currentTaskRun.Result } + taskSpec := currentTaskRun.TaskSpec + logger.Infow(fmt.Sprintf("Processing task %s", taskSpec.Type), []interface{}{"task", currentTaskRun.ID}...) + currentTaskRun.Result.Data = data result := adapter.Perform(currentTaskRun.Result, store) - logger.Infow(fmt.Sprintf("Finished processing task %s", taskCopy.Type), []interface{}{ + logger.Infow(fmt.Sprintf("Finished processing task %s", taskSpec.Type), []interface{}{ "task", currentTaskRun.ID, "result", result.Status, "result_data", result.Data, diff --git a/core/services/runs.go b/core/services/runs.go index 6ea837e4d31..efc32611dd8 100644 --- a/core/services/runs.go +++ b/core/services/runs.go @@ -208,7 +208,6 @@ func ResumePendingTask( store *store.Store, input models.RunResult, ) error { - logger.Debugw("External adapter resuming job", []interface{}{ "run", run.ID, "job", run.JobSpecID, @@ -260,8 +259,7 @@ func QueueSleepingTask( return fmt.Errorf("Attempting to resume sleeping run with non sleeping task %s", run.ID) } - adapter, err := adapters.For(currentTaskRun.TaskSpec, store) - + adapter, err := prepareAdapter(currentTaskRun, run.Overrides.Data, store) if err != nil { currentTaskRun.SetError(err) run.SetError(err) From b12c985a3235a12929bd8118e59b5e5c267c43d2 Mon Sep 17 00:00:00 2001 From: Steve Ellis Date: Sun, 2 Jun 2019 19:17:54 -0400 Subject: [PATCH 4/4] create fixture instead of putting spec in test --- core/internal/features_test.go | 3 +-- core/internal/testdata/sleep_job.json | 9 +++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) create mode 100644 core/internal/testdata/sleep_job.json diff --git a/core/internal/features_test.go b/core/internal/features_test.go index 7ca1646b555..12326e91cad 100644 --- a/core/internal/features_test.go +++ b/core/internal/features_test.go @@ -653,12 +653,11 @@ func TestIntegration_SleepAdapter(t *testing.T) { t.Parallel() sleepSeconds := 3 - specJSON := "{\"initiators\":[{\"type\":\"web\"}],\"tasks\":[{\"type\":\"sleep\"},{\"type\":\"noop\"}]}" app, cleanup := cltest.NewApplication(t) defer cleanup() app.Start() - j := cltest.CreateSpecViaWeb(t, app, specJSON) + j := cltest.FixtureCreateJobViaWeb(t, app, "./testdata/sleep_job.json") runInput := fmt.Sprintf("{\"until\": \"%s\"}", time.Now().Local().Add(time.Second*time.Duration(sleepSeconds))) jr := cltest.CreateJobRunViaWeb(t, app, j, runInput) diff --git a/core/internal/testdata/sleep_job.json b/core/internal/testdata/sleep_job.json new file mode 100644 index 00000000000..8efab386258 --- /dev/null +++ b/core/internal/testdata/sleep_job.json @@ -0,0 +1,9 @@ +{ + "initiators": [ + {"type": "web"} + ], + "tasks": [ + {"type": "sleep"}, + {"type": "noop"} + ] +}