diff --git a/src/main/java/net/greghaines/jesque/worker/WorkerImpl.java b/src/main/java/net/greghaines/jesque/worker/WorkerImpl.java index 3ba046c2..8786c5a0 100644 --- a/src/main/java/net/greghaines/jesque/worker/WorkerImpl.java +++ b/src/main/java/net/greghaines/jesque/worker/WorkerImpl.java @@ -508,14 +508,16 @@ protected String getNextQueue() throws InterruptedException { */ protected String pop(final String curQueue) { final String key = key(QUEUE, curQueue); + final String now = Long.toString(System.currentTimeMillis()); + final String inflightKey = key(INFLIGHT, this.name, curQueue); switch (nextQueueStrategy) { - case DRAIN_WHILE_MESSAGES_EXISTS: - return (String) this.jedis.evalsha(this.popScriptHash.get(), 3, key, key(INFLIGHT, this.name, curQueue), - JesqueUtils.createRecurringHashKey(key), Long.toString(System.currentTimeMillis())); - case RESET_TO_HIGHEST_PRIORITY: - return (String) this.jedis.evalsha(this.multiPriorityQueuesScriptHash.get(), 1, curQueue); - default: - throw new RuntimeException("Unimplemented 'nextQueueStrategy'"); + case DRAIN_WHILE_MESSAGES_EXISTS: + return (String) this.jedis.evalsha(this.popScriptHash.get(), 3, key, inflightKey, + JesqueUtils.createRecurringHashKey(key), now); + case RESET_TO_HIGHEST_PRIORITY: + return (String) this.jedis.evalsha(this.multiPriorityQueuesScriptHash.get(), 2, curQueue, inflightKey, now); + default: + throw new RuntimeException("Unimplemented 'nextQueueStrategy'"); } } diff --git a/src/main/java/net/greghaines/jesque/worker/WorkerPoolImpl.java b/src/main/java/net/greghaines/jesque/worker/WorkerPoolImpl.java index 9aadfe1f..cd01d598 100644 --- a/src/main/java/net/greghaines/jesque/worker/WorkerPoolImpl.java +++ b/src/main/java/net/greghaines/jesque/worker/WorkerPoolImpl.java @@ -526,6 +526,8 @@ protected String getNextQueue() throws InterruptedException { */ protected String pop(final String curQueue) { final String key = key(QUEUE, curQueue); + final String now = Long.toString(System.currentTimeMillis()); + final String inflightKey = key(INFLIGHT, this.name, curQueue); return PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork() { /** * {@inheritDoc} @@ -534,10 +536,10 @@ protected String pop(final String curQueue) { public String doWork(final Jedis jedis) { switch (nextQueueStrategy) { case DRAIN_WHILE_MESSAGES_EXISTS: - return (String) jedis.evalsha(popScriptHash.get(), 3, key, key(INFLIGHT, name, curQueue), - JesqueUtils.createRecurringHashKey(key), Long.toString(System.currentTimeMillis())); + return (String) jedis.evalsha(popScriptHash.get(), 3, key, inflightKey, + JesqueUtils.createRecurringHashKey(key), now); case RESET_TO_HIGHEST_PRIORITY: - return (String) jedis.evalsha(multiPriorityQueuesScriptHash.get(), 1, curQueue); + return (String) jedis.evalsha(multiPriorityQueuesScriptHash.get(), 2, curQueue, inflightKey, now); default: throw new RuntimeException("Unimplemented 'nextQueueStrategy'"); } diff --git a/src/main/resources/workerScripts/fromMultiplePriorityQueues.lua b/src/main/resources/workerScripts/fromMultiplePriorityQueues.lua index 4a06f6a0..1db6afdb 100644 --- a/src/main/resources/workerScripts/fromMultiplePriorityQueues.lua +++ b/src/main/resources/workerScripts/fromMultiplePriorityQueues.lua @@ -4,8 +4,11 @@ -- Time: 2:47 PM -- local queues = KEYS[1] +local inFlightKey = KEYS[2] --local debug = {'Hooray!', "'"..queues.."'"} +local now = ARGV[1] + local QUEUE_NAME_CAPTURING_REGEX = '([^,]+)' local OPTIONAL_COMMA_SEPARATOR = ',?' local OPTIONAL_SPACE_SEPARATOR = '%s*' @@ -15,8 +18,9 @@ for q in queues.gmatch(queues, NEXT_QUEUE_REGEX) do local queueName = 'resque:queue:' .. q local status, queueType = next(redis.call('TYPE', queueName)) local payload + if queueType == 'zset' then - local firstMsg = redis.call('ZRANGE', queueName, '0', '0') + local firstMsg = redis.call('ZRANGEBYSCORE', queueName, '-inf', now, 'LIMIT', '0', '1') if firstMsg ~= nil then payload = firstMsg[1] if payload ~= nil then @@ -28,6 +32,7 @@ for q in queues.gmatch(queues, NEXT_QUEUE_REGEX) do end if payload ~= nil then + redis.call('LPUSH', inFlightKey, payload) return payload end end diff --git a/src/main/resources/workerScripts/jesque_pop.lua b/src/main/resources/workerScripts/jesque_pop.lua index 7d9dca3d..234a281a 100644 --- a/src/main/resources/workerScripts/jesque_pop.lua +++ b/src/main/resources/workerScripts/jesque_pop.lua @@ -11,7 +11,7 @@ end local ok, queueType = next(redis.call('TYPE', queueKey)) if queueType == 'zset' then - local i, lPayload = next(redis.call('ZRANGEBYSCORE', queueKey, '-inf', now, 'WITHSCORES', 'LIMIT' , '0' , '1')) + local i, lPayload = next(redis.call('ZRANGEBYSCORE', queueKey, '-inf', now, 'LIMIT' , '0' , '1')) if lPayload then payload = lPayload local frequency = redis.call('HGET', freqKey, payload)