Skip to content

Commit

Permalink
add runAt property to job
Browse files Browse the repository at this point in the history
  • Loading branch information
Florian Langenhahn committed Aug 4, 2018
1 parent 12884f9 commit f21a108
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
11 changes: 11 additions & 0 deletions src/main/java/net/greghaines/jesque/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class Job implements Serializable {
private Object[] args;
private Map<String,Object> vars;
private Map<String,Object> unknownFields = new HashMap<String,Object>();
private Double runAt; // only set if this job belongs to a delayed queue

/**
* No-argument constructor.
Expand Down Expand Up @@ -194,6 +195,16 @@ public void setVars(final Map<String, ? extends Object> vars) {
this.vars = (Map<String, Object>)vars;
}

@JsonIgnore
public Double getRunAt() {
return runAt;
}

@JsonIgnore
public void setRunAt(Double runAt) {
this.runAt = runAt;
}

/**
* @return true if this Job has a valid class name and arguments
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
import static net.greghaines.jesque.utils.ResqueConstants.QUEUES;
import static net.greghaines.jesque.utils.ResqueConstants.STAT;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.*;

import net.greghaines.jesque.Config;
import net.greghaines.jesque.Job;
Expand All @@ -35,6 +32,7 @@
import net.greghaines.jesque.utils.PoolUtils;
import net.greghaines.jesque.utils.PoolUtils.PoolWork;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import redis.clients.util.Pool;

/**
Expand Down Expand Up @@ -161,10 +159,12 @@ public QueueInfo doWork(final Jedis jedis) throws Exception {
queueInfo.setName(name);
queueInfo.setSize(size(jedis, name));
queueInfo.setDelayed(delayed(jedis, name));
final Collection<String> payloads = paylods(jedis, name, jobOffset, jobCount);
final List<Job> jobs = new ArrayList<Job>(payloads.size());
for (final String payload : payloads) {
jobs.add(ObjectMapperFactory.get().readValue(payload, Job.class));
final Collection<JobPayload> payloads = getJobPayloads(jedis, name, jobOffset, jobCount);
final List<Job> jobs = new ArrayList<>(payloads.size());
for (final JobPayload jobPayload : payloads) {
final Job job = ObjectMapperFactory.get().readValue(jobPayload.element, Job.class);
job.setRunAt(jobPayload.score);
jobs.add(job);
}
queueInfo.setJobs(jobs);
return queueInfo;
Expand Down Expand Up @@ -225,22 +225,39 @@ private long size(final Jedis jedis, final String queueName) {
}

/**
* Get list of payload from a queue.
* Get list of JobPayload from a queue.
*
* @param jedis
* @param queueName
* @param jobOffset
* @param jobCount
* @return
*/
private Collection<String> paylods(final Jedis jedis, final String queueName, final long jobOffset, final long jobCount) {
private Collection<JobPayload> getJobPayloads(final Jedis jedis, final String queueName, final long jobOffset, final long jobCount) {
final String key = key(QUEUE, queueName);
final Collection<String> payloads;
if (JedisUtils.isDelayedQueue(jedis, key)) { // If delayed queue, use ZRANGE
payloads = jedis.zrange(key, jobOffset, jobOffset + jobCount - 1);
final Collection<JobPayload> jobPayloads = new ArrayList<>();
if (JedisUtils.isDelayedQueue(jedis, key)) { // If delayed queue, use ZRANGEWITHSCORES
final Set<Tuple> elements = jedis.zrangeWithScores(key, jobOffset, jobOffset + jobCount - 1);
for (final Tuple elementWithScore : elements) {
jobPayloads.add(new JobPayload(elementWithScore.getElement(), elementWithScore.getScore()));
}
} else { // Else, use LRANGE
payloads = jedis.lrange(key, jobOffset, jobOffset + jobCount - 1);
final List<String> elements = jedis.lrange(key, jobOffset, jobOffset + jobCount - 1);
for (final String element : elements) {
jobPayloads.add(new JobPayload(element, null));
}
}
return payloads;
return jobPayloads;
}

private class JobPayload {
String element;
Double score;

JobPayload(String element, Double score) {
this.element = element;
this.score = score;
}
}

}

0 comments on commit f21a108

Please sign in to comment.