From 6b3ef8717ce9c9f8215abea37ebdba75c763aeae Mon Sep 17 00:00:00 2001 From: Florian Langenhahn Date: Fri, 8 Jun 2018 22:17:50 +0200 Subject: [PATCH] add runAt property to job --- src/main/java/net/greghaines/jesque/Job.java | 11 +++++++ .../meta/dao/impl/QueueInfoDAORedisImpl.java | 32 +++++++++++-------- .../dao/impl/TestQueueInfoDAORedisImpl.java | 9 +++--- 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/src/main/java/net/greghaines/jesque/Job.java b/src/main/java/net/greghaines/jesque/Job.java index 02668d5b..b8f64203 100644 --- a/src/main/java/net/greghaines/jesque/Job.java +++ b/src/main/java/net/greghaines/jesque/Job.java @@ -43,6 +43,7 @@ public class Job implements Serializable { private Object[] args; private Map vars; private Map unknownFields = new HashMap(); + private Double runAt; // only set if this job belongs to a delayed queue /** * No-argument constructor. @@ -194,6 +195,16 @@ public void setVars(final Map vars) { this.vars = (Map)vars; } + @JsonIgnore + public Double getRunAt() { + return runAt; + } + + @JsonIgnore + public void setRunAt(Double runAt) { + this.runAt = runAt; + } + /** * @return true if this Job has a valid class name and arguments */ diff --git a/src/main/java/net/greghaines/jesque/meta/dao/impl/QueueInfoDAORedisImpl.java b/src/main/java/net/greghaines/jesque/meta/dao/impl/QueueInfoDAORedisImpl.java index d868c85a..69b0769a 100644 --- a/src/main/java/net/greghaines/jesque/meta/dao/impl/QueueInfoDAORedisImpl.java +++ b/src/main/java/net/greghaines/jesque/meta/dao/impl/QueueInfoDAORedisImpl.java @@ -21,9 +21,9 @@ import static net.greghaines.jesque.utils.ResqueConstants.STAT; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import net.greghaines.jesque.Config; import net.greghaines.jesque.Job; @@ -35,6 +35,7 @@ import net.greghaines.jesque.utils.PoolUtils; import net.greghaines.jesque.utils.PoolUtils.PoolWork; import redis.clients.jedis.Jedis; +import redis.clients.jedis.Tuple; import redis.clients.util.Pool; /** @@ -161,11 +162,7 @@ public QueueInfo doWork(final Jedis jedis) throws Exception { queueInfo.setName(name); queueInfo.setSize(size(jedis, name)); queueInfo.setDelayed(delayed(jedis, name)); - final Collection payloads = paylods(jedis, name, jobOffset, jobCount); - final List jobs = new ArrayList(payloads.size()); - for (final String payload : payloads) { - jobs.add(ObjectMapperFactory.get().readValue(payload, Job.class)); - } + final List jobs = getJobs(jedis, name, jobOffset, jobCount); queueInfo.setJobs(jobs); return queueInfo; } @@ -225,7 +222,7 @@ private long size(final Jedis jedis, final String queueName) { } /** - * Get list of payload from a queue. + * Get list of Jobs from a queue. * * @param jedis * @param queueName @@ -233,14 +230,23 @@ private long size(final Jedis jedis, final String queueName) { * @param jobCount * @return */ - private Collection paylods(final Jedis jedis, final String queueName, final long jobOffset, final long jobCount) { + private List getJobs(final Jedis jedis, final String queueName, final long jobOffset, final long jobCount) throws Exception { final String key = key(QUEUE, queueName); - final Collection payloads; - if (JedisUtils.isDelayedQueue(jedis, key)) { // If delayed queue, use ZRANGE - payloads = jedis.zrange(key, jobOffset, jobOffset + jobCount - 1); + final List jobs = new ArrayList<>(); + if (JedisUtils.isDelayedQueue(jedis, key)) { // If delayed queue, use ZRANGEWITHSCORES + final Set elements = jedis.zrangeWithScores(key, jobOffset, jobOffset + jobCount - 1); + for (final Tuple elementWithScore : elements) { + final Job job = ObjectMapperFactory.get().readValue(elementWithScore.getElement(), Job.class); + job.setRunAt(elementWithScore.getScore()); + jobs.add(job); + } } else { // Else, use LRANGE - payloads = jedis.lrange(key, jobOffset, jobOffset + jobCount - 1); + final List elements = jedis.lrange(key, jobOffset, jobOffset + jobCount - 1); + for (final String element : elements) { + jobs.add(ObjectMapperFactory.get().readValue(element, Job.class)); + } } - return payloads; + return jobs; } + } diff --git a/src/test/java/net/greghaines/jesque/meta/dao/impl/TestQueueInfoDAORedisImpl.java b/src/test/java/net/greghaines/jesque/meta/dao/impl/TestQueueInfoDAORedisImpl.java index e04780c1..4a0a9c31 100644 --- a/src/test/java/net/greghaines/jesque/meta/dao/impl/TestQueueInfoDAORedisImpl.java +++ b/src/test/java/net/greghaines/jesque/meta/dao/impl/TestQueueInfoDAORedisImpl.java @@ -35,6 +35,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import redis.clients.jedis.Jedis; +import redis.clients.jedis.Tuple; import redis.clients.util.Pool; public class TestQueueInfoDAORedisImpl { @@ -215,14 +216,14 @@ public void testGetQueueInfo_ZSet() throws JsonProcessingException { final long jobOffset = 1; final long jobCount = 2; final long size = 4; - final Collection payloads = new HashSet(); - payloads.add(ObjectMapperFactory.get().writeValueAsString(new Job("foo"))); - payloads.add(ObjectMapperFactory.get().writeValueAsString(new Job("bar"))); + final Set payloads = new HashSet<>(); + payloads.add(new Tuple(ObjectMapperFactory.get().writeValueAsString(new Job("foo")), 1d)); + payloads.add(new Tuple(ObjectMapperFactory.get().writeValueAsString(new Job("bar")), 1d)); this.mockCtx.checking(new Expectations(){{ oneOf(pool).getResource(); will(returnValue(jedis)); exactly(3).of(jedis).type(queueKey); will(returnValue(KeyType.ZSET.toString())); oneOf(jedis).zcard(queueKey); will(returnValue(size)); - oneOf(jedis).zrange(queueKey, jobOffset, jobOffset + jobCount - 1); will(returnValue(payloads)); + oneOf(jedis).zrangeWithScores(queueKey, jobOffset, jobOffset + jobCount - 1); will(returnValue(payloads)); oneOf(jedis).close(); }}); final QueueInfo queueInfo = this.qInfoDAO.getQueueInfo(name, jobOffset, jobCount);