diff --git a/src/main/java/net/greghaines/jesque/worker/WorkerImpl.java b/src/main/java/net/greghaines/jesque/worker/WorkerImpl.java index 3b114b03..1cf66e75 100644 --- a/src/main/java/net/greghaines/jesque/worker/WorkerImpl.java +++ b/src/main/java/net/greghaines/jesque/worker/WorkerImpl.java @@ -464,7 +464,7 @@ protected void poll() { } } catch (JsonParseException | JsonMappingException e) { // If the job JSON is not deserializable, we never want to submit it again... - removeInFlight(curQueue); + removeInFlight(curQueue, true); recoverFromException(curQueue, e); } catch (Exception e) { recoverFromException(curQueue, e); @@ -594,6 +594,7 @@ protected void checkPaused() throws IOException { * @param curQueue the queue the payload came from */ protected void process(final Job job, final String curQueue) { + boolean success = false; try { this.processingJob.set(true); if (threadNameChangingEnabled) { @@ -604,17 +605,18 @@ protected void process(final Job job, final String curQueue) { final Object instance = this.jobFactory.materializeJob(job); final Object result = execute(job, curQueue, instance); success(job, instance, result, curQueue); + success = true; } catch (Throwable thrwbl) { failure(thrwbl, job, curQueue); } finally { - removeInFlight(curQueue); + removeInFlight(curQueue, success); this.jedis.del(key(WORKER, this.name)); this.processingJob.set(false); } } - private void removeInFlight(final String curQueue) { - if (SHUTDOWN_IMMEDIATE.equals(this.state.get())) { + private void removeInFlight(final String curQueue, boolean skipRequeue) { + if (SHUTDOWN_IMMEDIATE.equals(this.state.get()) && !skipRequeue) { lpoplpush(key(INFLIGHT, this.name, curQueue), key(QUEUE, curQueue)); } else { this.jedis.lpop(key(INFLIGHT, this.name, curQueue)); diff --git a/src/main/java/net/greghaines/jesque/worker/WorkerPoolImpl.java b/src/main/java/net/greghaines/jesque/worker/WorkerPoolImpl.java index 4860b47e..f35011d7 100644 --- a/src/main/java/net/greghaines/jesque/worker/WorkerPoolImpl.java +++ b/src/main/java/net/greghaines/jesque/worker/WorkerPoolImpl.java @@ -482,7 +482,7 @@ protected void poll() { */ @Override public Void doWork(final Jedis jedis) { - removeInFlight(jedis, fCurQueue); + removeInFlight(jedis, fCurQueue, true); return null; } }); @@ -621,6 +621,7 @@ public Void doWork(final Jedis jedis) { * @param curQueue the queue the payload came from */ protected void process(final Job job, final String curQueue) { + boolean success = false; try { this.processingJob.set(true); if (threadNameChangingEnabled) { @@ -640,16 +641,18 @@ public Void doWork(final Jedis jedis) throws IOException { final Object instance = this.jobFactory.materializeJob(job); final Object result = execute(job, curQueue, instance); success(job, instance, result, curQueue); + success = true; } catch (Throwable thrwbl) { failure(thrwbl, job, curQueue); } finally { + final boolean skipReque = success; PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork() { /** * {@inheritDoc} */ @Override public Void doWork(final Jedis jedis) { - removeInFlight(jedis, curQueue); + removeInFlight(jedis, curQueue, skipReque); jedis.del(key(WORKER, name)); return null; } @@ -658,8 +661,8 @@ public Void doWork(final Jedis jedis) { } } - private void removeInFlight(final Jedis jedis, final String curQueue) { - if (SHUTDOWN_IMMEDIATE.equals(this.state.get())) { + private void removeInFlight(final Jedis jedis, final String curQueue, boolean skipRequeue) { + if (SHUTDOWN_IMMEDIATE.equals(this.state.get()) && !skipRequeue) { lpoplpush(jedis, key(INFLIGHT, this.name, curQueue), key(QUEUE, curQueue)); } else { jedis.lpop(key(INFLIGHT, this.name, curQueue)); diff --git a/src/test/java/net/greghaines/jesque/DurabilityTest.java b/src/test/java/net/greghaines/jesque/DurabilityTest.java index 4d0bf3bf..630623a0 100644 --- a/src/test/java/net/greghaines/jesque/DurabilityTest.java +++ b/src/test/java/net/greghaines/jesque/DurabilityTest.java @@ -26,10 +26,12 @@ * * * @author Daniƫl de Kok + * @author Florian Langenhahn */ public class DurabilityTest { private static final Job sleepJob = new Job("SleepAction", 3000L); + private static final Job sleepWithExceptionJob = new Job("SleepWithExceptionAction", 3000L); private static final Config config = new ConfigBuilder().build(); @Before @@ -65,7 +67,7 @@ public void testNotInterrupted() throws InterruptedException, JsonProcessingExce } @Test - public void testInterrupted() throws InterruptedException, JsonProcessingException { + public void testInterruptedNoExceptionJobSucceeds() { final String queue = "bar"; TestUtils.enqueueJobs(queue, Arrays.asList(sleepJob), config); @@ -76,15 +78,32 @@ public void testInterrupted() throws InterruptedException, JsonProcessingExcepti TestUtils.stopWorker(worker, workerThread, true); + final Jedis jedis = TestUtils.createJedis(config); + Assert.assertTrue("Job should not be requeued", jedis.llen(JesqueUtils.createKey(config.getNamespace(), QUEUE, queue)) == 0L); + Assert.assertTrue("In-flight list should be empty when finishing a job", jedis.llen(inFlightKey(worker, queue)) == 0L); + } + + @Test + public void testInterrupted() throws JsonProcessingException { + final String queue = "bar"; + TestUtils.enqueueJobs(queue, Arrays.asList(sleepWithExceptionJob), config); + + final Worker worker = new WorkerImpl(config, Arrays.asList(queue), + new MapBasedJobFactory(JesqueUtils.map(JesqueUtils.entry("SleepWithExceptionAction", SleepWithExceptionAction.class)))); + final Thread workerThread = new Thread(worker); + workerThread.start(); + + TestUtils.stopWorker(worker, workerThread, true); + final Jedis jedis = TestUtils.createJedis(config); Assert.assertTrue("Job should be requeued", jedis.llen(JesqueUtils.createKey(config.getNamespace(), QUEUE, queue)) == 1L); - Assert.assertEquals("Incorrect job was requeued", ObjectMapperFactory.get().writeValueAsString(sleepJob), + Assert.assertEquals("Incorrect job was requeued", ObjectMapperFactory.get().writeValueAsString(sleepWithExceptionJob), jedis.lindex(JesqueUtils.createKey(config.getNamespace(), QUEUE, queue), 0)); Assert.assertTrue("In-flight list should be empty when finishing a job", jedis.llen(inFlightKey(worker, queue)) == 0L); } @Test - public void testJSONException() throws InterruptedException, JsonProcessingException { + public void testJSONException() throws InterruptedException { final String queue = "baz"; final Jedis jedis = TestUtils.createJedis(config); diff --git a/src/test/java/net/greghaines/jesque/SleepWithExceptionAction.java b/src/test/java/net/greghaines/jesque/SleepWithExceptionAction.java new file mode 100644 index 00000000..254a2751 --- /dev/null +++ b/src/test/java/net/greghaines/jesque/SleepWithExceptionAction.java @@ -0,0 +1,28 @@ +package net.greghaines.jesque; + +import java.util.concurrent.Callable; + +/** + * An action that sleeps for the given number of milliseconds and may throw an exception. + * + * @author Florian Langenhahn + */ +public class SleepWithExceptionAction implements Callable { + + private final int millis; + + /** + * Construct a sleep action. + * + * @param millis The number of milliseconds to sleep. + */ + public SleepWithExceptionAction(int millis) { + this.millis = millis; + } + + @Override + public Void call() throws Exception { + Thread.sleep(millis); + return null; + } +}