Skip to content

Commit

Permalink
Implement kick-job (#13)
Browse files Browse the repository at this point in the history
* feat: implement kick job

* tests: add tests cases for kick job

* feat: make kick job public

* tests: add test cases for kick

* just return err

* rename to kick and accept job as param
  • Loading branch information
jalerson authored Mar 10, 2021
1 parent 3b24fc7 commit 887d002
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 8 deletions.
40 changes: 32 additions & 8 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,20 @@ func (conn *Conn) command(_ context.Context, format string, params ...interface{
return id, body[:size], nil

case "KICKED":
if len(parts) != 2 {
switch len(parts) {
case 1: // response of kick-job: KICKED
return 1, nil, nil
case 2: // response of kick: KICKED <count>
count, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
return 0, nil, err
}

return count, nil, nil
default:
return 0, nil, ErrUnexpected
}

count, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
return 0, nil, err
}

return count, nil, nil

case "DELETED", "RELEASED", "TOUCHED", "USING", "WATCHING":
return 0, nil, nil
case "BURIED":
Expand Down Expand Up @@ -260,6 +263,27 @@ func (conn *Conn) Kick(ctx context.Context, tube string, bound int) (int64, erro
return int64(count), nil
}

func (conn *Conn) kick(ctx context.Context, job *Job) error {
ctx, span := trace.StartSpan(ctx, "github.com/prep/beanstalk/Conn.kick")
defer span.End()

if job == nil {
return ErrNotFound
}

// If the tube is different than the last time, switch tubes.
if job.Stats.Tube != conn.lastTube {
if _, _, err := conn.command(ctx, "use %s", job.Stats.Tube); err != nil {
return err
}

conn.lastTube = job.Stats.Tube
}

_, _, err := conn.lcommand(ctx, "kick-job %d", job.ID)
return err
}

// ListTubes returns a list of available tubes.
func (conn *Conn) ListTubes(ctx context.Context) ([]string, error) {
_, body, err := conn.lcommand(ctx, "list-tubes")
Expand Down
89 changes: 89 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,95 @@ func TestConn(t *testing.T) {
})
})

t.Run("Kick", func(t *testing.T) {
server.HandleFunc(func(line Line) string {
switch {
case line.At(1, "use default"):
return "USING default"
case line.At(2, "kick 10"):
return "KICKED 10"
default:
t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line)
}

return ""
})

count, err := conn.Kick(ctx, "default", 10)
if err != nil {
t.Fatalf("Error kicking jobs: %s", err)
}

if count != 10 {
t.Fatalf("Unexpected number of kicked jobs, expected %d, actual %d", 10, count)
}

t.Run("NoBuriedJobs", func(t *testing.T) {
server.HandleFunc(func(line Line) string {
switch {
case line.At(1, "kick 10"):
return "KICKED 0"
default:
t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line)
}

return ""
})

count, err := conn.Kick(ctx, "default", 10)
switch {
case err != nil:
t.Fatalf("Error kicking job: %s", err)
case count != 0:
t.Fatalf("Unexpected number of kicked jobs, expected %d, actual %d", 0, count)
}
})
})

t.Run("kick", func(t *testing.T) {
server.HandleFunc(func(line Line) string {
switch {
case line.At(1, "kick-job 1"):
return "KICKED"
default:
t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line)
}

return ""
})

job := Job{ID: 1}
job.Stats.Tube = "default"

if err := conn.kick(ctx, &job); err != nil {
t.Fatalf("Error kicking job: %s", err)
}

// NotFound tests what happens when the NOT_FOUND error is returned.
t.Run("JobNotFound", func(t *testing.T) {
server.HandleFunc(func(line Line) string {
switch {
case line.At(1, "kick-job 1"):
return "NOT_FOUND"
default:
t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line)
}

return ""
})

job := Job{ID: 1}
job.Stats.Tube = "default"

err := conn.kick(ctx, &job)
switch {
case err == ErrNotFound:
case err != nil:
t.Fatalf("Error kicking job: %s", err)
}
})
})

t.Run("PeekDelayed", func(t *testing.T) {
server.HandleFunc(func(line Line) string {
switch {
Expand Down
9 changes: 9 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,12 @@ func (job *Job) TouchAfter() time.Duration {

return time.Until(job.ReservedAt.Add(job.Stats.TimeLeft))
}

// Kick moves the job into the ready queue.
func (job *Job) Kick(ctx context.Context) error {
if job.conn == nil {
return ErrJobFinished
}

return job.conn.kick(ctx, job)
}

0 comments on commit 887d002

Please sign in to comment.