Skip to content

Commit

Permalink
Merge pull request #147 from uberall/successful-job-re-queue
Browse files Browse the repository at this point in the history
don't re-enqueue a successful job if worker is in SHUTDOWN_IMMEDIATE …
  • Loading branch information
gresrun committed Aug 13, 2018
2 parents 12884f9 + 27519ee commit 9ac3111
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 11 deletions.
10 changes: 6 additions & 4 deletions src/main/java/net/greghaines/jesque/worker/WorkerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ protected void poll() {
}
} catch (JsonParseException | JsonMappingException e) {
// If the job JSON is not deserializable, we never want to submit it again...
removeInFlight(curQueue);
removeInFlight(curQueue, true);
recoverFromException(curQueue, e);
} catch (Exception e) {
recoverFromException(curQueue, e);
Expand Down Expand Up @@ -594,6 +594,7 @@ protected void checkPaused() throws IOException {
* @param curQueue the queue the payload came from
*/
protected void process(final Job job, final String curQueue) {
boolean success = false;
try {
this.processingJob.set(true);
if (threadNameChangingEnabled) {
Expand All @@ -604,17 +605,18 @@ protected void process(final Job job, final String curQueue) {
final Object instance = this.jobFactory.materializeJob(job);
final Object result = execute(job, curQueue, instance);
success(job, instance, result, curQueue);
success = true;
} catch (Throwable thrwbl) {
failure(thrwbl, job, curQueue);
} finally {
removeInFlight(curQueue);
removeInFlight(curQueue, success);
this.jedis.del(key(WORKER, this.name));
this.processingJob.set(false);
}
}

private void removeInFlight(final String curQueue) {
if (SHUTDOWN_IMMEDIATE.equals(this.state.get())) {
private void removeInFlight(final String curQueue, boolean skipRequeue) {
if (SHUTDOWN_IMMEDIATE.equals(this.state.get()) && !skipRequeue) {
lpoplpush(key(INFLIGHT, this.name, curQueue), key(QUEUE, curQueue));
} else {
this.jedis.lpop(key(INFLIGHT, this.name, curQueue));
Expand Down
11 changes: 7 additions & 4 deletions src/main/java/net/greghaines/jesque/worker/WorkerPoolImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ protected void poll() {
*/
@Override
public Void doWork(final Jedis jedis) {
removeInFlight(jedis, fCurQueue);
removeInFlight(jedis, fCurQueue, true);
return null;
}
});
Expand Down Expand Up @@ -621,6 +621,7 @@ public Void doWork(final Jedis jedis) {
* @param curQueue the queue the payload came from
*/
protected void process(final Job job, final String curQueue) {
boolean success = false;
try {
this.processingJob.set(true);
if (threadNameChangingEnabled) {
Expand All @@ -640,16 +641,18 @@ public Void doWork(final Jedis jedis) throws IOException {
final Object instance = this.jobFactory.materializeJob(job);
final Object result = execute(job, curQueue, instance);
success(job, instance, result, curQueue);
success = true;
} catch (Throwable thrwbl) {
failure(thrwbl, job, curQueue);
} finally {
final boolean skipReque = success;
PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Void>() {
/**
* {@inheritDoc}
*/
@Override
public Void doWork(final Jedis jedis) {
removeInFlight(jedis, curQueue);
removeInFlight(jedis, curQueue, skipReque);
jedis.del(key(WORKER, name));
return null;
}
Expand All @@ -658,8 +661,8 @@ public Void doWork(final Jedis jedis) {
}
}

private void removeInFlight(final Jedis jedis, final String curQueue) {
if (SHUTDOWN_IMMEDIATE.equals(this.state.get())) {
private void removeInFlight(final Jedis jedis, final String curQueue, boolean skipRequeue) {
if (SHUTDOWN_IMMEDIATE.equals(this.state.get()) && !skipRequeue) {
lpoplpush(jedis, key(INFLIGHT, this.name, curQueue), key(QUEUE, curQueue));
} else {
jedis.lpop(key(INFLIGHT, this.name, curQueue));
Expand Down
25 changes: 22 additions & 3 deletions src/test/java/net/greghaines/jesque/DurabilityTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
* </ul>
*
* @author Daniël de Kok
* @author Florian Langenhahn
*/
public class DurabilityTest {

private static final Job sleepJob = new Job("SleepAction", 3000L);
private static final Job sleepWithExceptionJob = new Job("SleepWithExceptionAction", 3000L);
private static final Config config = new ConfigBuilder().build();

@Before
Expand Down Expand Up @@ -65,7 +67,7 @@ public void testNotInterrupted() throws InterruptedException, JsonProcessingExce
}

@Test
public void testInterrupted() throws InterruptedException, JsonProcessingException {
public void testInterruptedNoExceptionJobSucceeds() {
final String queue = "bar";
TestUtils.enqueueJobs(queue, Arrays.asList(sleepJob), config);

Expand All @@ -76,15 +78,32 @@ public void testInterrupted() throws InterruptedException, JsonProcessingExcepti

TestUtils.stopWorker(worker, workerThread, true);

final Jedis jedis = TestUtils.createJedis(config);
Assert.assertTrue("Job should not be requeued", jedis.llen(JesqueUtils.createKey(config.getNamespace(), QUEUE, queue)) == 0L);
Assert.assertTrue("In-flight list should be empty when finishing a job", jedis.llen(inFlightKey(worker, queue)) == 0L);
}

@Test
public void testInterrupted() throws JsonProcessingException {
final String queue = "bar";
TestUtils.enqueueJobs(queue, Arrays.asList(sleepWithExceptionJob), config);

final Worker worker = new WorkerImpl(config, Arrays.asList(queue),
new MapBasedJobFactory(JesqueUtils.map(JesqueUtils.entry("SleepWithExceptionAction", SleepWithExceptionAction.class))));
final Thread workerThread = new Thread(worker);
workerThread.start();

TestUtils.stopWorker(worker, workerThread, true);

final Jedis jedis = TestUtils.createJedis(config);
Assert.assertTrue("Job should be requeued", jedis.llen(JesqueUtils.createKey(config.getNamespace(), QUEUE, queue)) == 1L);
Assert.assertEquals("Incorrect job was requeued", ObjectMapperFactory.get().writeValueAsString(sleepJob),
Assert.assertEquals("Incorrect job was requeued", ObjectMapperFactory.get().writeValueAsString(sleepWithExceptionJob),
jedis.lindex(JesqueUtils.createKey(config.getNamespace(), QUEUE, queue), 0));
Assert.assertTrue("In-flight list should be empty when finishing a job", jedis.llen(inFlightKey(worker, queue)) == 0L);
}

@Test
public void testJSONException() throws InterruptedException, JsonProcessingException {
public void testJSONException() throws InterruptedException {
final String queue = "baz";

final Jedis jedis = TestUtils.createJedis(config);
Expand Down
28 changes: 28 additions & 0 deletions src/test/java/net/greghaines/jesque/SleepWithExceptionAction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package net.greghaines.jesque;

import java.util.concurrent.Callable;

/**
* An action that sleeps for the given number of milliseconds and may throw an exception.
*
* @author Florian Langenhahn
*/
public class SleepWithExceptionAction implements Callable<Void> {

private final int millis;

/**
* Construct a sleep action.
*
* @param millis The number of milliseconds to sleep.
*/
public SleepWithExceptionAction(int millis) {
this.millis = millis;
}

@Override
public Void call() throws Exception {
Thread.sleep(millis);
return null;
}
}

0 comments on commit 9ac3111

Please sign in to comment.