Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker/server: update metrics on requeue #4461

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/osbuild-service-maintenance/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func testDeleteJob(t *testing.T, d db, q *dbjobqueue.DBJobQueue) {

res, err := json.Marshal(result)
require.NoError(t, err)
err = q.RequeueOrFinishJob(id, 0, res)
requeued, err := q.RequeueOrFinishJob(id, 0, res)
require.NoError(t, err)
require.False(t, requeued)

_, _, r, _, _, _, _, _, _, err := q.JobStatus(id)
require.NoError(t, err)
Expand Down
24 changes: 12 additions & 12 deletions internal/jobqueue/fsjobqueue/fsjobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func New(dir string) (*fsJobQueue, error) {
if !j.StartedAt.IsZero() && j.FinishedAt.IsZero() && !j.Canceled {
// Fail older running jobs which don't have a token stored
if j.Token == uuid.Nil {
err = q.RequeueOrFinishJob(j.Id, 0, nil)
_, err = q.RequeueOrFinishJob(j.Id, 0, nil)
if err != nil {
return nil, fmt.Errorf("Error finishing job '%s' without a token: %v", j.Id, err)
}
Expand Down Expand Up @@ -297,21 +297,21 @@ func (q *fsJobQueue) DequeueByID(ctx context.Context, id, wID uuid.UUID) (uuid.U
return j.Token, j.Dependencies, j.Type, j.Args, nil
}

func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error {
func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) (bool, error) {
q.mu.Lock()
defer q.mu.Unlock()

j, err := q.readJob(id)
if err != nil {
return err
return false, err
}

if j.Canceled {
return jobqueue.ErrCanceled
return false, jobqueue.ErrCanceled
}

if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() {
return jobqueue.ErrNotRunning
return false, jobqueue.ErrNotRunning
}

delete(q.jobIdByToken, j.Token)
Expand All @@ -326,26 +326,27 @@ func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result

j.Result, err = json.Marshal(result)
if err != nil {
return fmt.Errorf("error marshaling result: %v", err)
return false, fmt.Errorf("error marshaling result: %v", err)
}

// Write before notifying dependants, because it will be read again.
err = q.db.Write(id.String(), j)
if err != nil {
return fmt.Errorf("error writing job %s: %v", id, err)
return false, fmt.Errorf("error writing job %s: %v", id, err)
}

for _, depid := range q.dependants[id] {
dep, err := q.readJob(depid)
if err != nil {
return err
return false, err
}
err = q.maybeEnqueue(dep, false)
if err != nil {
return err
return false, err
}
}
delete(q.dependants, id)
return false, nil
} else {
j.Token = uuid.Nil
j.StartedAt = time.Time{}
Expand All @@ -355,7 +356,7 @@ func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result
// doesn't become corrupt when writing fails.
err = q.db.Write(j.Id.String(), j)
if err != nil {
return fmt.Errorf("cannot write job: %v", err)
return false, fmt.Errorf("cannot write job: %v", err)
}

// add the job to the list of pending ones
Expand All @@ -368,9 +369,8 @@ func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result
default:
}
}
return true, nil
}

return nil
}

func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
Expand Down
43 changes: 27 additions & 16 deletions internal/jobqueue/jobqueuetest/jobqueuetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result
require.Equal(t, jobType, typ)
require.NotNil(t, args)

err = q.RequeueOrFinishJob(id, 0, result)
requeued, err := q.RequeueOrFinishJob(id, 0, result)
require.NoError(t, err)
require.False(t, requeued)

return id
}
Expand All @@ -91,8 +92,9 @@ func testErrors(t *testing.T, q jobqueue.JobQueue) {
require.NoError(t, err)
require.Equal(t, id, idFromT)

err = q.RequeueOrFinishJob(id, 0, nil)
requeued, err := q.RequeueOrFinishJob(id, 0, nil)
require.NoError(t, err)
require.False(t, requeued)

// Make sure the token gets removed
id, err = q.IdFromToken(tok)
Expand Down Expand Up @@ -366,7 +368,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, jobType, "clownfish")
require.True(t, canceled)
require.Nil(t, result)
err = q.RequeueOrFinishJob(id, 0, &testResult{})
_, err = q.RequeueOrFinishJob(id, 0, &testResult{})
require.Error(t, err)

// Cancel a running job, which should not dequeue the canceled job from above
Expand All @@ -386,7 +388,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, jobType, "clownfish")
require.True(t, canceled)
require.Nil(t, result)
err = q.RequeueOrFinishJob(id, 0, &testResult{})
_, err = q.RequeueOrFinishJob(id, 0, &testResult{})
require.Error(t, err)

// Cancel a finished job, which is a no-op
Expand All @@ -399,8 +401,9 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
require.Empty(t, deps)
require.Equal(t, "clownfish", typ)
require.Equal(t, json.RawMessage("null"), args)
err = q.RequeueOrFinishJob(id, 0, &testResult{})
requeued, err := q.RequeueOrFinishJob(id, 0, &testResult{})
require.NoError(t, err)
require.False(t, requeued)
err = q.CancelJob(id)
require.Error(t, err)
require.Equal(t, jobqueue.ErrNotRunning, err)
Expand All @@ -414,13 +417,13 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {

func testRequeue(t *testing.T, q jobqueue.JobQueue) {
// Requeue a non-existing job
err := q.RequeueOrFinishJob(uuid.New(), 1, nil)
_, err := q.RequeueOrFinishJob(uuid.New(), 1, nil)
require.Error(t, err)

// Requeue a pending job
id := pushTestJob(t, q, "clownfish", nil, nil, "")
require.NotEmpty(t, id)
err = q.RequeueOrFinishJob(id, 1, nil)
_, err = q.RequeueOrFinishJob(id, 1, nil)
require.Error(t, err)

// Requeue a running job
Expand All @@ -431,8 +434,9 @@ func testRequeue(t *testing.T, q jobqueue.JobQueue) {
require.Empty(t, deps)
require.Equal(t, "clownfish", typ)
require.Equal(t, json.RawMessage("null"), args)
err = q.RequeueOrFinishJob(id, 1, nil)
requeued, err := q.RequeueOrFinishJob(id, 1, nil)
require.NoError(t, err)
require.True(t, requeued)
r, tok2, deps, typ, args, err := q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""})
require.NoError(t, err)
require.Equal(t, id, r)
Expand All @@ -446,11 +450,12 @@ func testRequeue(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, jobType, "clownfish")
require.False(t, canceled)
require.Nil(t, result)
err = q.RequeueOrFinishJob(id, 0, &testResult{})
requeued, err = q.RequeueOrFinishJob(id, 0, &testResult{})
require.NoError(t, err)
require.False(t, requeued)

// Requeue a finished job
err = q.RequeueOrFinishJob(id, 1, nil)
_, err = q.RequeueOrFinishJob(id, 1, nil)
require.Error(t, err)
}

Expand All @@ -461,8 +466,9 @@ func testRequeueLimit(t *testing.T, q jobqueue.JobQueue) {
_, _, _, _, _, err := q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""})
require.NoError(t, err)
// Requeue once
err = q.RequeueOrFinishJob(id, 1, nil)
requeued, err := q.RequeueOrFinishJob(id, 1, nil)
require.NoError(t, err)
require.True(t, requeued)
// Start again
_, _, _, _, _, err = q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""})
require.NoError(t, err)
Expand All @@ -471,8 +477,9 @@ func testRequeueLimit(t *testing.T, q jobqueue.JobQueue) {
require.True(t, finished.IsZero())
require.Nil(t, result)
// Requeue a second time, this time finishing it
err = q.RequeueOrFinishJob(id, 1, &testResult{})
requeued, err = q.RequeueOrFinishJob(id, 1, &testResult{})
require.NoError(t, err)
require.False(t, requeued)
_, _, result, _, _, finished, _, _, _, err = q.JobStatus(id)
require.NoError(t, err)
require.False(t, finished.IsZero())
Expand Down Expand Up @@ -504,8 +511,9 @@ func testHeartbeats(t *testing.T, q jobqueue.JobQueue) {
require.NoError(t, err)
require.Equal(t, id2, id)

err = q.RequeueOrFinishJob(id, 0, &testResult{})
requeued, err := q.RequeueOrFinishJob(id, 0, &testResult{})
require.NoError(t, err)
require.False(t, requeued)

// No heartbeats for finished job
require.Empty(t, q.Heartbeats(time.Second*0))
Expand All @@ -526,8 +534,9 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, "octopus", typ)
require.NotNil(t, args)

err = q.RequeueOrFinishJob(one, 0, nil)
requeued, err := q.RequeueOrFinishJob(one, 0, nil)
require.NoError(t, err)
require.False(t, requeued)

require.Equal(t, two, finishNextTestJob(t, q, "octopus", testResult{}, nil))
})
Expand All @@ -552,8 +561,9 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) {
_, _, _, _, err = q.DequeueByID(context.Background(), one, uuid.Nil)
require.Equal(t, jobqueue.ErrNotPending, err)

err = q.RequeueOrFinishJob(one, 0, nil)
requeued, err := q.RequeueOrFinishJob(one, 0, nil)
require.NoError(t, err)
require.False(t, requeued)

_, _, _, _, err = q.DequeueByID(context.Background(), one, uuid.Nil)
require.Equal(t, jobqueue.ErrNotPending, err)
Expand Down Expand Up @@ -732,8 +742,9 @@ func testWorkers(t *testing.T, q jobqueue.JobQueue) {
err = q.UpdateWorkerStatus(uuid.New())
require.Equal(t, err, jobqueue.ErrWorkerNotExist)

err = q.RequeueOrFinishJob(one, 0, &testResult{})
requeued, err := q.RequeueOrFinishJob(one, 0, &testResult{})
require.NoError(t, err)
require.False(t, requeued)

err = q.DeleteWorker(w1)
require.NoError(t, err)
Expand Down
10 changes: 9 additions & 1 deletion internal/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ func (s *Server) RequeueOrFinishJob(token uuid.UUID, maxRetries uint64, result j
}
}

err = s.jobs.RequeueOrFinishJob(jobId, maxRetries, result)
requeued, err := s.jobs.RequeueOrFinishJob(jobId, maxRetries, result)
if err != nil {
switch err {
case jobqueue.ErrNotRunning:
Expand All @@ -718,6 +718,14 @@ func (s *Server) RequeueOrFinishJob(token uuid.UUID, maxRetries uint64, result j
}
}

if requeued {
jobInfo, err := s.jobInfo(jobId, nil)
if err != nil {
return fmt.Errorf("error requeueing job: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more like an error getting the job details for the Prometheus metrics than an error requeueing the job.
Should we skip the Prometheus metrics here and try to continue, rather than fail?
Although the request would then probably fail a few lines below anyway…

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's very unlikely that this call will fail if the job exists.

}
prometheus.EnqueueJobMetrics(jobInfo.JobType, jobInfo.Channel)
}

jobType, err := s.JobType(jobId)
if err != nil {
return err
Expand Down
31 changes: 16 additions & 15 deletions pkg/jobqueue/dbjobqueue/dbjobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,16 +487,16 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id, workerID uuid.UUID) (u
return token, dependencies, jobType, args, nil
}

func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error {
func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) (bool, error) {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return fmt.Errorf("error connecting to database: %w", err)
return false, fmt.Errorf("error connecting to database: %w", err)
}
defer conn.Release()

tx, err := conn.Begin(context.Background())
if err != nil {
return fmt.Errorf("error starting database transaction: %w", err)
return false, fmt.Errorf("error starting database transaction: %w", err)
}
defer func() {
err = tx.Rollback(context.Background())
Expand All @@ -513,60 +513,61 @@ func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result
canceled := false
err = tx.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, nil, &started, &finished, &retries, &canceled)
if err == pgx.ErrNoRows {
return jobqueue.ErrNotExist
return false, jobqueue.ErrNotExist
}
if canceled {
return jobqueue.ErrCanceled
return false, jobqueue.ErrCanceled
}
if started == nil || finished != nil {
return jobqueue.ErrNotRunning
return false, jobqueue.ErrNotRunning
}

// Remove from heartbeats if token is null
tag, err := tx.Exec(context.Background(), sqlDeleteHeartbeat, id)
if err != nil {
return fmt.Errorf("error removing job %s from heartbeats: %w", id, err)
return false, fmt.Errorf("error removing job %s from heartbeats: %w", id, err)
}

if tag.RowsAffected() != 1 {
return jobqueue.ErrNotExist
return false, jobqueue.ErrNotExist
}

if retries >= maxRetries {
err = tx.QueryRow(context.Background(), sqlFinishJob, result, id).Scan(&finished)
if err == pgx.ErrNoRows {
return jobqueue.ErrNotExist
return false, jobqueue.ErrNotExist
}
if err != nil {
return fmt.Errorf("error finishing job %s: %w", id, err)
return false, fmt.Errorf("error finishing job %s: %w", id, err)
}
} else {
tag, err = tx.Exec(context.Background(), sqlRequeue, id)
if err != nil {
return fmt.Errorf("error requeueing job %s: %w", id, err)
return false, fmt.Errorf("error requeueing job %s: %w", id, err)
}

if tag.RowsAffected() != 1 {
return jobqueue.ErrNotExist
return false, jobqueue.ErrNotExist
}
}

_, err = tx.Exec(context.Background(), sqlNotify)
if err != nil {
return fmt.Errorf("error notifying jobs channel: %w", err)
return false, fmt.Errorf("error notifying jobs channel: %w", err)
}

err = tx.Commit(context.Background())
if err != nil {
return fmt.Errorf("unable to commit database transaction: %w", err)
return false, fmt.Errorf("unable to commit database transaction: %w", err)
}

if retries >= maxRetries {
q.logger.Info("Finished job", "job_type", jobType, "job_id", id.String())
return false, nil
} else {
q.logger.Info("Requeued job", "job_type", jobType, "job_id", id.String())
return true, nil
}
return nil
}

func (q *DBJobQueue) CancelJob(id uuid.UUID) error {
Expand Down
6 changes: 3 additions & 3 deletions pkg/jobqueue/jobqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ type JobQueue interface {

// Tries to requeue a running job by its ID
//
// Returns the given job to the pending state. If the job has reached
// the maxRetries number of retries already, finish the job instead.
// If the job has reached the maxRetries number of retries already, finish the job instead.
// `result` must fit the associated job type and must be serializable to JSON.
RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error
// Fills in result, and returns whether the job was requeued, or an error.
RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) (bool, error)

// Cancel a job. Does nothing if the job has already finished.
CancelJob(id uuid.UUID) error
Expand Down
Loading