Skip to content

Commit

Permalink
Merge pull request #149 from uberall/reload-scripts-on-reconnect
Browse files Browse the repository at this point in the history
Reload redis scripts on reconnect
  • Loading branch information
gresrun authored Aug 27, 2018
2 parents 7e40392 + b510425 commit 88bab76
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2012 Greg Haines
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.greghaines.jesque.worker;

import com.fasterxml.jackson.core.JsonProcessingException;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisNoScriptException;

import static net.greghaines.jesque.worker.RecoveryStrategy.*;

/**
* DefaultPoolExceptionHandler reconnects if there is a connection exception, proceeds if the exception was
* JSON-related or a thread interrupt and terminates if the executor is shutdown.
*/
public class DefaultPoolExceptionHandler implements ExceptionHandler {

/**
* {@inheritDoc}
*/
@Override
public RecoveryStrategy onException(final JobExecutor jobExecutor, final Exception exception,
final String curQueue) {
final boolean isLoadingDataset = exception instanceof JedisDataException
&& exception.getMessage().equals("LOADING Redis is loading the dataset in memory");

if (exception instanceof JedisConnectionException
|| exception instanceof JedisNoScriptException
|| isLoadingDataset) {
return RECONNECT;
} else if (exception instanceof JsonProcessingException
|| (exception instanceof InterruptedException && !jobExecutor.isShutdown())) {
return PROCEED;
} else {
return TERMINATE;
}
}
}
39 changes: 33 additions & 6 deletions src/main/java/net/greghaines/jesque/worker/WorkerPoolImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.exceptions.JedisNoScriptException;
import redis.clients.util.Pool;

import com.fasterxml.jackson.core.JsonParseException;
Expand All @@ -65,6 +66,7 @@ public class WorkerPoolImpl implements Worker {
private static final Logger LOG = LoggerFactory.getLogger(WorkerPoolImpl.class);
private static final AtomicLong WORKER_COUNTER = new AtomicLong(0);
protected static final long EMPTY_QUEUE_SLEEP_TIME = 500; // 500 ms
protected static final long RECONNECT_SLEEP_TIME = 5000; // 5 sec
private static final String LPOPLPUSH_LUA = "/workerScripts/jesque_lpoplpush.lua";
private static final String POP_LUA = "/workerScripts/jesque_pop.lua";
private static final String POP_FROM_MULTIPLE_PRIO_QUEUES = "/workerScripts/fromMultiplePriorityQueues.lua";
Expand Down Expand Up @@ -126,7 +128,7 @@ protected static void checkQueues(final Iterable<String> queues) {
private final String threadNameBase = "Worker-" + this.workerId + " Jesque-" + VersionUtils.getVersion() + ": ";
private final AtomicReference<Thread> threadRef = new AtomicReference<Thread>(null);
private final AtomicReference<ExceptionHandler> exceptionHandlerRef = new AtomicReference<ExceptionHandler>(
new DefaultExceptionHandler());
new DefaultPoolExceptionHandler());
private final AtomicReference<FailQueueStrategy> failQueueStrategyRef;
private final JobFactory jobFactory;

Expand Down Expand Up @@ -209,10 +211,7 @@ public Void doWork(final Jedis jedis) throws IOException {
jedis.sadd(key(WORKERS), name);
jedis.set(key(WORKER, name, STARTED), new SimpleDateFormat(DATE_FORMAT).format(new Date()));
listenerDelegate.fireEvent(WORKER_START, WorkerPoolImpl.this, null, null, null, null, null);
popScriptHash.set(jedis.scriptLoad(ScriptUtils.readScript(POP_LUA)));
lpoplpushScriptHash.set(jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA)));
multiPriorityQueuesScriptHash
.set(jedis.scriptLoad(ScriptUtils.readScript(POP_FROM_MULTIPLE_PRIO_QUEUES)));
loadRedisScripts(jedis);
return null;
}
});
Expand Down Expand Up @@ -557,7 +556,16 @@ protected void recoverFromException(final String curQueue, final Exception ex) {
final RecoveryStrategy recoveryStrategy = this.exceptionHandlerRef.get().onException(this, ex, curQueue);
switch (recoveryStrategy) {
case RECONNECT:
LOG.info("Ignoring RECONNECT strategy in response to exception because this is a pool", ex);
if (ex instanceof JedisNoScriptException) {
LOG.info("Got JedisNoScriptException while reconnecting, reloading Redis scripts");
loadRedisScripts();
} else {
LOG.info("Waiting " + RECONNECT_SLEEP_TIME + "ms for pool to reconnect to redis", ex);
try {
Thread.sleep(RECONNECT_SLEEP_TIME);
} catch (InterruptedException e) {
}
}
break;
case TERMINATE:
LOG.warn("Terminating in response to exception", ex);
Expand Down Expand Up @@ -864,6 +872,25 @@ protected String lpoplpush(final Jedis jedis, final String from, final String to
return (String) jedis.evalsha(this.lpoplpushScriptHash.get(), 2, from, to);
}

protected void loadRedisScripts(Jedis jedis) throws IOException {
popScriptHash.set(jedis.scriptLoad(ScriptUtils.readScript(POP_LUA)));
lpoplpushScriptHash.set(jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA)));
multiPriorityQueuesScriptHash.set(jedis.scriptLoad(ScriptUtils.readScript(POP_FROM_MULTIPLE_PRIO_QUEUES)));
}

protected void loadRedisScripts() {
PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Void>() {
/**
* {@inheritDoc}
*/
@Override
public Void doWork(final Jedis jedis) throws IOException {
loadRedisScripts(jedis);
return null;
}
});
}

/**
* {@inheritDoc}
*/
Expand Down

0 comments on commit 88bab76

Please sign in to comment.