Skip to content

Commit

Permalink
Merge pull request #133 from uberall/fix-fromMultiplePriorityQueues
Browse files Browse the repository at this point in the history
properly handle delayed queues when using RESET_TO_HIGHEST_PRIORITY s…
  • Loading branch information
gresrun authored Jan 16, 2018
2 parents 51760f4 + 5b0327e commit f67c8ff
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 12 deletions.
16 changes: 9 additions & 7 deletions src/main/java/net/greghaines/jesque/worker/WorkerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -508,14 +508,16 @@ protected String getNextQueue() throws InterruptedException {
*/
protected String pop(final String curQueue) {
final String key = key(QUEUE, curQueue);
final String now = Long.toString(System.currentTimeMillis());
final String inflightKey = key(INFLIGHT, this.name, curQueue);
switch (nextQueueStrategy) {
case DRAIN_WHILE_MESSAGES_EXISTS:
return (String) this.jedis.evalsha(this.popScriptHash.get(), 3, key, key(INFLIGHT, this.name, curQueue),
JesqueUtils.createRecurringHashKey(key), Long.toString(System.currentTimeMillis()));
case RESET_TO_HIGHEST_PRIORITY:
return (String) this.jedis.evalsha(this.multiPriorityQueuesScriptHash.get(), 1, curQueue);
default:
throw new RuntimeException("Unimplemented 'nextQueueStrategy'");
case DRAIN_WHILE_MESSAGES_EXISTS:
return (String) this.jedis.evalsha(this.popScriptHash.get(), 3, key, inflightKey,
JesqueUtils.createRecurringHashKey(key), now);
case RESET_TO_HIGHEST_PRIORITY:
return (String) this.jedis.evalsha(this.multiPriorityQueuesScriptHash.get(), 2, curQueue, inflightKey, now);
default:
throw new RuntimeException("Unimplemented 'nextQueueStrategy'");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,8 @@ protected String getNextQueue() throws InterruptedException {
*/
protected String pop(final String curQueue) {
final String key = key(QUEUE, curQueue);
final String now = Long.toString(System.currentTimeMillis());
final String inflightKey = key(INFLIGHT, this.name, curQueue);
return PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, String>() {
/**
* {@inheritDoc}
Expand All @@ -534,10 +536,10 @@ protected String pop(final String curQueue) {
public String doWork(final Jedis jedis) {
switch (nextQueueStrategy) {
case DRAIN_WHILE_MESSAGES_EXISTS:
return (String) jedis.evalsha(popScriptHash.get(), 3, key, key(INFLIGHT, name, curQueue),
JesqueUtils.createRecurringHashKey(key), Long.toString(System.currentTimeMillis()));
return (String) jedis.evalsha(popScriptHash.get(), 3, key, inflightKey,
JesqueUtils.createRecurringHashKey(key), now);
case RESET_TO_HIGHEST_PRIORITY:
return (String) jedis.evalsha(multiPriorityQueuesScriptHash.get(), 1, curQueue);
return (String) jedis.evalsha(multiPriorityQueuesScriptHash.get(), 2, curQueue, inflightKey, now);
default:
throw new RuntimeException("Unimplemented 'nextQueueStrategy'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
-- Time: 2:47 PM
--
local queues = KEYS[1]
local inFlightKey = KEYS[2]
--local debug = {'Hooray!', "'"..queues.."'"}

local now = ARGV[1]

local QUEUE_NAME_CAPTURING_REGEX = '([^,]+)'
local OPTIONAL_COMMA_SEPARATOR = ',?'
local OPTIONAL_SPACE_SEPARATOR = '%s*'
Expand All @@ -15,8 +18,9 @@ for q in queues.gmatch(queues, NEXT_QUEUE_REGEX) do
local queueName = 'resque:queue:' .. q
local status, queueType = next(redis.call('TYPE', queueName))
local payload

if queueType == 'zset' then
local firstMsg = redis.call('ZRANGE', queueName, '0', '0')
local firstMsg = redis.call('ZRANGEBYSCORE', queueName, '-inf', now, 'LIMIT', '0', '1')
if firstMsg ~= nil then
payload = firstMsg[1]
if payload ~= nil then
Expand All @@ -28,6 +32,7 @@ for q in queues.gmatch(queues, NEXT_QUEUE_REGEX) do
end

if payload ~= nil then
redis.call('LPUSH', inFlightKey, payload)
return payload
end
end
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/workerScripts/jesque_pop.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ end

local ok, queueType = next(redis.call('TYPE', queueKey))
if queueType == 'zset' then
local i, lPayload = next(redis.call('ZRANGEBYSCORE', queueKey, '-inf', now, 'WITHSCORES', 'LIMIT' , '0' , '1'))
local i, lPayload = next(redis.call('ZRANGEBYSCORE', queueKey, '-inf', now, 'LIMIT' , '0' , '1'))
if lPayload then
payload = lPayload
local frequency = redis.call('HGET', freqKey, payload)
Expand Down

0 comments on commit f67c8ff

Please sign in to comment.