Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix sleep adapter behavior #1309

Merged
merged 4 commits into from
Jun 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,15 +557,20 @@ func FindServiceAgreement(t testing.TB, s *strpkg.Store, id string) models.Servi

// CreateJobSpecViaWeb creates a jobspec via web using /v2/specs
func CreateJobSpecViaWeb(t testing.TB, app *TestApplication, job models.JobSpec) models.JobSpec {
client := app.NewHTTPClient()
marshaled, err := json.Marshal(&job)
assert.NoError(t, err)
resp, cleanup := client.Post("/v2/specs", bytes.NewBuffer(marshaled))
return CreateSpecViaWeb(t, app, string(marshaled))
}

// CreateJobSpecViaWeb creates a jobspec via web using /v2/specs
func CreateSpecViaWeb(t testing.TB, app *TestApplication, spec string) models.JobSpec {
client := app.NewHTTPClient()
resp, cleanup := client.Post("/v2/specs", bytes.NewBufferString(spec))
defer cleanup()
AssertServerResponse(t, resp, 200)

var createdJob models.JobSpec
err = ParseJSONAPIResponse(t, resp, &createdJob)
err := ParseJSONAPIResponse(t, resp, &createdJob)
require.NoError(t, err)
return createdJob
}
Expand Down Expand Up @@ -672,6 +677,15 @@ func WaitForJobRunToPendConfirmations(
return WaitForJobRunStatus(t, store, jr, models.RunStatusPendingConfirmations)
}

// WaitForJobRunToPendSleep waits for a JobRun to reach PendingBridge Status
func WaitForJobRunToPendSleep(
t testing.TB,
store *strpkg.Store,
jr models.JobRun,
) models.JobRun {
return WaitForJobRunStatus(t, store, jr, models.RunStatusPendingSleep)
}

// WaitForJobRunStatus waits for a JobRun to reach given status
func WaitForJobRunStatus(
t testing.TB,
Expand All @@ -695,14 +709,21 @@ func JobRunStays(
store *strpkg.Store,
jr models.JobRun,
status models.RunStatus,
optionalDuration ...time.Duration,
) models.JobRun {
t.Helper()

duration := time.Second
if len(optionalDuration) > 0 {
duration = optionalDuration[0]
}

var err error
gomega.NewGomegaWithT(t).Consistently(func() models.RunStatus {
jr, err = store.FindJobRun(jr.ID)
assert.NoError(t, err)
return jr.Status
}).Should(gomega.Equal(status))
}, duration).Should(gomega.Equal(status))
return jr
}

Expand Down
10 changes: 1 addition & 9 deletions core/internal/cltest/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,7 @@ const (

// FixtureCreateJobViaWeb creates a job from a fixture using /v2/specs
func FixtureCreateJobViaWeb(t *testing.T, app *TestApplication, path string) models.JobSpec {
client := app.NewHTTPClient()
resp, cleanup := client.Post("/v2/specs", bytes.NewBuffer(MustReadFile(t, path)))
defer cleanup()
AssertServerResponse(t, resp, 200)

var job models.JobSpec
err := ParseJSONAPIResponse(t, resp, &job)
require.NoError(t, err)
return job
return CreateSpecViaWeb(t, app, string(MustReadFile(t, path)))
}

// FixtureCreateServiceAgreementViaWeb creates a service agreement from a fixture using /v2/service_agreements
Expand Down
18 changes: 18 additions & 0 deletions core/internal/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,3 +648,21 @@ func TestIntegration_SyncJobRuns(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, j.ID, run.JobSpecID)
}

func TestIntegration_SleepAdapter(t *testing.T) {
t.Parallel()

sleepSeconds := 3
app, cleanup := cltest.NewApplication(t)
defer cleanup()
app.Start()

j := cltest.FixtureCreateJobViaWeb(t, app, "./testdata/sleep_job.json")

runInput := fmt.Sprintf("{\"until\": \"%s\"}", time.Now().Local().Add(time.Second*time.Duration(sleepSeconds)))
jr := cltest.CreateJobRunViaWeb(t, app, j, runInput)

cltest.WaitForJobRunToPendSleep(t, app.Store, jr)
cltest.JobRunStays(t, app.Store, jr, models.RunStatusPendingSleep, time.Second*2)
cltest.WaitForJobRunToComplete(t, app.Store, jr)
}
9 changes: 9 additions & 0 deletions core/internal/testdata/sleep_job.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"initiators": [
{"type": "web"}
],
"tasks": [
{"type": "sleep"},
{"type": "noop"}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's fix this in another PR.

]
}
31 changes: 20 additions & 11 deletions core/services/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,33 +200,42 @@ func prepareTaskInput(run *models.JobRun, input models.JSON) (models.JSON, error
return input, nil
}

func executeTask(run *models.JobRun, currentTaskRun *models.TaskRun, store *store.Store) models.RunResult {
taskCopy := currentTaskRun.TaskSpec // deliberately copied to keep mutations local

var err error
if taskCopy.Params, err = taskCopy.Params.Merge(run.Overrides.Data); err != nil {
currentTaskRun.Result.SetError(err)
return currentTaskRun.Result
func prepareAdapter(
taskRun *models.TaskRun,
data models.JSON,
store *store.Store,
) (*adapters.PipelineAdapter, error) {
taskCopy := taskRun.TaskSpec // deliberately copied to keep mutations local

merged, err := taskCopy.Params.Merge(data)
if err != nil {
return nil, err
}
taskCopy.Params = merged

adapter, err := adapters.For(taskCopy, store)
return adapters.For(taskCopy, store)
}

func executeTask(run *models.JobRun, currentTaskRun *models.TaskRun, store *store.Store) models.RunResult {
adapter, err := prepareAdapter(currentTaskRun, run.Overrides.Data, store)
if err != nil {
currentTaskRun.Result.SetError(err)
return currentTaskRun.Result
}

logger.Infow(fmt.Sprintf("Processing task %s", taskCopy.Type), []interface{}{"task", currentTaskRun.ID}...)

data, err := prepareTaskInput(run, currentTaskRun.Result.Data)
if err != nil {
currentTaskRun.Result.SetError(err)
return currentTaskRun.Result
}

taskSpec := currentTaskRun.TaskSpec
logger.Infow(fmt.Sprintf("Processing task %s", taskSpec.Type), []interface{}{"task", currentTaskRun.ID}...)

currentTaskRun.Result.Data = data
result := adapter.Perform(currentTaskRun.Result, store)

logger.Infow(fmt.Sprintf("Finished processing task %s", taskCopy.Type), []interface{}{
logger.Infow(fmt.Sprintf("Finished processing task %s", taskSpec.Type), []interface{}{
"task", currentTaskRun.ID,
"result", result.Status,
"result_data", result.Data,
Expand Down
4 changes: 1 addition & 3 deletions core/services/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ func ResumePendingTask(
store *store.Store,
input models.RunResult,
) error {

logger.Debugw("External adapter resuming job", []interface{}{
"run", run.ID,
"job", run.JobSpecID,
Expand Down Expand Up @@ -260,8 +259,7 @@ func QueueSleepingTask(
return fmt.Errorf("Attempting to resume sleeping run with non sleeping task %s", run.ID)
}

adapter, err := adapters.For(currentTaskRun.TaskSpec, store)

adapter, err := prepareAdapter(currentTaskRun, run.Overrides.Data, store)
if err != nil {
currentTaskRun.SetError(err)
run.SetError(err)
Expand Down