-
Notifications
You must be signed in to change notification settings - Fork 131
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Allow tests to be independent. Failed when trying to release.
- Loading branch information
Showing
1 changed file
with
24 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,68 +21,69 @@ | |
* <p/> | ||
* <ul> | ||
* <li>A job should be in the in-flight list while being processed, but not afterwards.</li> | ||
* <li>A job should be requeued when the worker is shut down immediately.</li> | ||
* <li>A job should <i>not</i> be requeued when the worker shuts down after completing the job.</li> | ||
* <li>A job should be re-queued when the worker is shut down immediately.</li> | ||
* <li>A job should <i>not</i> be re-queued when the worker shuts down after completing the job.</li> | ||
* </ul> | ||
* | ||
* @author Daniël de Kok <[email protected]> | ||
*/ | ||
public class DurabilityTest { | ||
private static final Job sleepJob = new Job("SleepAction", (Long) 3000L); | ||
|
||
|
||
private static final Job sleepJob = new Job("SleepAction", 3000L); | ||
private static final Config config = new ConfigBuilder().build(); | ||
|
||
private static final String testQueue = "foo"; | ||
|
||
@Before | ||
public void resetRedis() { | ||
TestUtils.resetRedis(config); | ||
} | ||
|
||
@Test | ||
public void testNotInterrupted() throws InterruptedException, JsonProcessingException { | ||
TestUtils.enqueueJobs(testQueue, Arrays.asList(sleepJob), config); | ||
final String queue = "foo"; | ||
TestUtils.enqueueJobs(queue, Arrays.asList(sleepJob), config); | ||
|
||
final Worker worker = new WorkerImpl(config, Arrays.asList(testQueue), | ||
final Worker worker = new WorkerImpl(config, Arrays.asList(queue), | ||
new MapBasedJobFactory(JesqueUtils.map(JesqueUtils.entry("SleepAction", SleepAction.class)))); | ||
final Thread workerThread = new Thread(worker); | ||
workerThread.start(); | ||
|
||
Thread.sleep(1000); | ||
|
||
Jedis jedis = TestUtils.createJedis(config); | ||
Assert.assertTrue("In-flight list should have length one when running the job", | ||
jedis.llen(inFlightKey(worker)) == 1L); | ||
final Jedis jedis = TestUtils.createJedis(config); | ||
Assert.assertEquals("In-flight list should have length one when running the job", | ||
jedis.llen(inFlightKey(worker, queue)), (Long)1L); | ||
Assert.assertEquals("Object on the in-flight list should be the first job", | ||
ObjectMapperFactory.get().writeValueAsString(sleepJob), jedis.lindex(inFlightKey(worker), 0)); | ||
ObjectMapperFactory.get().writeValueAsString(sleepJob), | ||
jedis.lindex(inFlightKey(worker, queue), 0)); | ||
|
||
TestUtils.stopWorker(worker, workerThread, false); | ||
|
||
Assert.assertTrue("The job should not be requeued after succesful processing", | ||
jedis.llen(JesqueUtils.createKey(config.getNamespace(), QUEUE, testQueue)) == 0L); | ||
Assert.assertTrue("In-flight list should be empty when finishing a job", jedis.llen(inFlightKey(worker)) == 0L); | ||
jedis.llen(JesqueUtils.createKey(config.getNamespace(), QUEUE, queue)) == 0L); | ||
Assert.assertEquals("In-flight list should be empty when finishing a job", | ||
jedis.llen(inFlightKey(worker, queue)), (Long)0L); | ||
} | ||
|
||
@Test | ||
public void testInterrupted() throws InterruptedException, JsonProcessingException { | ||
TestUtils.enqueueJobs(testQueue, Arrays.asList(sleepJob), config); | ||
final String queue = "bar"; | ||
TestUtils.enqueueJobs(queue, Arrays.asList(sleepJob), config); | ||
|
||
final Worker worker = new WorkerImpl(config, Arrays.asList(testQueue), | ||
final Worker worker = new WorkerImpl(config, Arrays.asList(queue), | ||
new MapBasedJobFactory(JesqueUtils.map(JesqueUtils.entry("SleepAction", SleepAction.class)))); | ||
final Thread workerThread = new Thread(worker); | ||
workerThread.start(); | ||
|
||
TestUtils.stopWorker(worker, workerThread, true); | ||
|
||
Jedis jedis = TestUtils.createJedis(config); | ||
Assert.assertTrue("Job should be requeued", jedis.llen(JesqueUtils.createKey(config.getNamespace(), QUEUE, testQueue)) == 1L); | ||
final Jedis jedis = TestUtils.createJedis(config); | ||
Assert.assertTrue("Job should be requeued", jedis.llen(JesqueUtils.createKey(config.getNamespace(), QUEUE, queue)) == 1L); | ||
Assert.assertEquals("Incorrect job was requeued", ObjectMapperFactory.get().writeValueAsString(sleepJob), | ||
jedis.lindex(JesqueUtils.createKey(config.getNamespace(), QUEUE, testQueue), 0)); | ||
Assert.assertTrue("In-flight list should be empty when finishing a job", jedis.llen(inFlightKey(worker)) == 0L); | ||
jedis.lindex(JesqueUtils.createKey(config.getNamespace(), QUEUE, queue), 0)); | ||
Assert.assertTrue("In-flight list should be empty when finishing a job", jedis.llen(inFlightKey(worker, queue)) == 0L); | ||
} | ||
|
||
private String inFlightKey(Worker worker) { | ||
return JesqueUtils.createKey(config.getNamespace(), ResqueConstants.INFLIGHT, worker.getName(), testQueue); | ||
private static String inFlightKey(final Worker worker, final String queue) { | ||
return JesqueUtils.createKey(config.getNamespace(), ResqueConstants.INFLIGHT, worker.getName(), queue); | ||
} | ||
|
||
} |