Skip to content

Commit

Permalink
add runAt property to job
Browse files Browse the repository at this point in the history
  • Loading branch information
Florian Langenhahn committed Aug 13, 2018
1 parent 12884f9 commit 6b3ef87
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 17 deletions.
11 changes: 11 additions & 0 deletions src/main/java/net/greghaines/jesque/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class Job implements Serializable {
private Object[] args;
private Map<String,Object> vars;
private Map<String,Object> unknownFields = new HashMap<String,Object>();
private Double runAt; // only set if this job belongs to a delayed queue

/**
* No-argument constructor.
Expand Down Expand Up @@ -194,6 +195,16 @@ public void setVars(final Map<String, ? extends Object> vars) {
this.vars = (Map<String, Object>)vars;
}

@JsonIgnore
public Double getRunAt() {
return runAt;
}

@JsonIgnore
public void setRunAt(Double runAt) {
this.runAt = runAt;
}

/**
* @return true if this Job has a valid class name and arguments
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import static net.greghaines.jesque.utils.ResqueConstants.STAT;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import net.greghaines.jesque.Config;
import net.greghaines.jesque.Job;
Expand All @@ -35,6 +35,7 @@
import net.greghaines.jesque.utils.PoolUtils;
import net.greghaines.jesque.utils.PoolUtils.PoolWork;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import redis.clients.util.Pool;

/**
Expand Down Expand Up @@ -161,11 +162,7 @@ public QueueInfo doWork(final Jedis jedis) throws Exception {
queueInfo.setName(name);
queueInfo.setSize(size(jedis, name));
queueInfo.setDelayed(delayed(jedis, name));
final Collection<String> payloads = paylods(jedis, name, jobOffset, jobCount);
final List<Job> jobs = new ArrayList<Job>(payloads.size());
for (final String payload : payloads) {
jobs.add(ObjectMapperFactory.get().readValue(payload, Job.class));
}
final List<Job> jobs = getJobs(jedis, name, jobOffset, jobCount);
queueInfo.setJobs(jobs);
return queueInfo;
}
Expand Down Expand Up @@ -225,22 +222,31 @@ private long size(final Jedis jedis, final String queueName) {
}

/**
* Get list of payload from a queue.
* Get list of Jobs from a queue.
*
* @param jedis
* @param queueName
* @param jobOffset
* @param jobCount
* @return
*/
private Collection<String> paylods(final Jedis jedis, final String queueName, final long jobOffset, final long jobCount) {
private List<Job> getJobs(final Jedis jedis, final String queueName, final long jobOffset, final long jobCount) throws Exception {
final String key = key(QUEUE, queueName);
final Collection<String> payloads;
if (JedisUtils.isDelayedQueue(jedis, key)) { // If delayed queue, use ZRANGE
payloads = jedis.zrange(key, jobOffset, jobOffset + jobCount - 1);
final List<Job> jobs = new ArrayList<>();
if (JedisUtils.isDelayedQueue(jedis, key)) { // If delayed queue, use ZRANGEWITHSCORES
final Set<Tuple> elements = jedis.zrangeWithScores(key, jobOffset, jobOffset + jobCount - 1);
for (final Tuple elementWithScore : elements) {
final Job job = ObjectMapperFactory.get().readValue(elementWithScore.getElement(), Job.class);
job.setRunAt(elementWithScore.getScore());
jobs.add(job);
}
} else { // Else, use LRANGE
payloads = jedis.lrange(key, jobOffset, jobOffset + jobCount - 1);
final List<String> elements = jedis.lrange(key, jobOffset, jobOffset + jobCount - 1);
for (final String element : elements) {
jobs.add(ObjectMapperFactory.get().readValue(element, Job.class));
}
}
return payloads;
return jobs;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import redis.clients.util.Pool;

public class TestQueueInfoDAORedisImpl {
Expand Down Expand Up @@ -215,14 +216,14 @@ public void testGetQueueInfo_ZSet() throws JsonProcessingException {
final long jobOffset = 1;
final long jobCount = 2;
final long size = 4;
final Collection<String> payloads = new HashSet<String>();
payloads.add(ObjectMapperFactory.get().writeValueAsString(new Job("foo")));
payloads.add(ObjectMapperFactory.get().writeValueAsString(new Job("bar")));
final Set<Tuple> payloads = new HashSet<>();
payloads.add(new Tuple(ObjectMapperFactory.get().writeValueAsString(new Job("foo")), 1d));
payloads.add(new Tuple(ObjectMapperFactory.get().writeValueAsString(new Job("bar")), 1d));
this.mockCtx.checking(new Expectations(){{
oneOf(pool).getResource(); will(returnValue(jedis));
exactly(3).of(jedis).type(queueKey); will(returnValue(KeyType.ZSET.toString()));
oneOf(jedis).zcard(queueKey); will(returnValue(size));
oneOf(jedis).zrange(queueKey, jobOffset, jobOffset + jobCount - 1); will(returnValue(payloads));
oneOf(jedis).zrangeWithScores(queueKey, jobOffset, jobOffset + jobCount - 1); will(returnValue(payloads));
oneOf(jedis).close();
}});
final QueueInfo queueInfo = this.qInfoDAO.getQueueInfo(name, jobOffset, jobCount);
Expand Down

0 comments on commit 6b3ef87

Please sign in to comment.