Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow batch enqueueing of jobs using Redis Pipelining #154

Merged
merged 3 commits into from
Jan 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ worker.end(true);
try { workerThread.join(); } catch (Exception e){ e.printStackTrace(); }
```

If enqueueing multiple jobs at the same time, there is `client.batchEnqueue(String queue, List<Job> jobs)` which does it
in an optimized way.

### Delayed jobs
Delayed jobs can be executed at sometime in the future.
```java
Expand Down
90 changes: 84 additions & 6 deletions src/main/java/net/greghaines/jesque/client/AbstractClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@
*/
package net.greghaines.jesque.client;

import static net.greghaines.jesque.utils.ResqueConstants.QUEUE;
import static net.greghaines.jesque.utils.ResqueConstants.QUEUES;

import net.greghaines.jesque.Config;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.json.ObjectMapperFactory;
import net.greghaines.jesque.utils.JedisUtils;
import net.greghaines.jesque.utils.JesqueUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Transaction;

import java.util.ArrayList;
import java.util.List;

import static net.greghaines.jesque.utils.ResqueConstants.QUEUE;
import static net.greghaines.jesque.utils.ResqueConstants.QUEUES;

/**
* Common logic for Client implementations.
*
Expand Down Expand Up @@ -82,6 +86,31 @@ public void enqueue(final String queue, final Job job) {
}
}

/**
* {@inheritDoc}
*/
@Override
public void batchEnqueue(String queue, List<Job> jobs) {
if (jobs == null) {
throw new IllegalArgumentException("job list must not be null");
}
validateQueue(queue);
for (Job job : jobs) {
validateJob(job);
}
List<String> serializedJobs = new ArrayList<>(jobs.size());
try {
for (Job job : jobs) {
serializedJobs.add(ObjectMapperFactory.get().writeValueAsString(job));
}
doBatchEnqueue(queue, serializedJobs);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -132,6 +161,18 @@ public boolean acquireLock(final String lockName, final String lockHolder, final
*/
protected abstract void doEnqueue(String queue, String msg) throws Exception;

/**
* Actually enqueue the serialized jobs.
*
* @param queue
* the queue to add the Jobs to
* @param msgs
* the serialized Jobs
* @throws Exception
* in case something goes wrong
*/
protected abstract void doBatchEnqueue(String queue, List<String> msgs) throws Exception;

/**
* Actually enqueue the serialized job with high priority.
*
Expand Down Expand Up @@ -178,6 +219,27 @@ public static void doEnqueue(final Jedis jedis, final String namespace, final St
jedis.rpush(JesqueUtils.createKey(namespace, QUEUE, queue), jobJson);
}

/**
* Helper method that encapsulates the minimum logic for adding jobs to a queue.
*
* @param jedis
* the connection to Redis
* @param namespace
* the Resque namespace
* @param queue
* the Resque queue name
* @param jobJsons
* a list of jobs serialized as JSON
*/
public static void doBatchEnqueue(final Jedis jedis, final String namespace, final String queue, final List<String> jobJsons) {
Pipeline pipelined = jedis.pipelined();
pipelined.sadd(JesqueUtils.createKey(namespace, QUEUES), queue);
for (String jobJson : jobJsons) {
pipelined.rpush(JesqueUtils.createKey(namespace, QUEUE, queue), jobJson);
}
pipelined.sync();
}

/**
* Helper method that encapsulates the minimum logic for adding a high
* priority job to a queue.
Expand Down Expand Up @@ -387,9 +449,11 @@ public void removeRecurringEnqueue(String queue, Job job) {
}

private static void validateArguments(final String queue, final Job job) {
if (queue == null || "".equals(queue)) {
throw new IllegalArgumentException("queue must not be null or empty: " + queue);
}
validateQueue(queue);
validateJob(job);
}

private static void validateJob(Job job) {
if (job == null) {
throw new IllegalArgumentException("job must not be null");
}
Expand All @@ -398,15 +462,29 @@ private static void validateArguments(final String queue, final Job job) {
}
}

private static void validateQueue(String queue) {
if (queue == null || "".equals(queue)) {
throw new IllegalArgumentException("queue must not be null or empty: " + queue);
}
}

private static void validateArguments(final String queue, final Job job, final long future) {
validateArguments(queue, job);
validateFuture(future);
}

private static void validateFuture(long future) {
if (System.currentTimeMillis() > future) {
throw new IllegalArgumentException("future must be after current time");
}
}

private static void validateArguments(final String queue, final Job job, final long future, final long frequency) {
validateArguments(queue, job, future);
validateFrequency(frequency);
}

private static void validateFrequency(long frequency) {
if (frequency < 1) {
throw new IllegalArgumentException("frequency must be greater than one second");
}
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/net/greghaines/jesque/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import net.greghaines.jesque.Job;

import java.util.List;

/**
* A Client allows Jobs to be enqueued for execution by Workers.
*
Expand All @@ -37,6 +39,20 @@ public interface Client {
*/
void enqueue(String queue, Job job);

/**
* Queues jobs in a given queue to be run. It uses Redis Pipelining (https://redis.io/topics/pipelining).
* Check out the "Important Note" here: https://redis.io/topics/pipelining#redis-pipelining. => Consider splitting
* long lists of jobs into chunks of 10,000 or so.
*
* @param queue
* the queue to add the Job to
* @param jobs
* the jobs to be enqueued
* @throws IllegalArgumentException
* if the queue is null or empty or if the list of jobs is null
*/
void batchEnqueue(String queue, List<Job> jobs);

/**
* Queues a job with high priority in a given queue to be run.
*
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/net/greghaines/jesque/client/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package net.greghaines.jesque.client;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -109,6 +110,15 @@ protected void doEnqueue(final String queue, final String jobJson) {
doEnqueue(this.jedis, getNamespace(), queue, jobJson);
}

/**
* {@inheritDoc}
*/
@Override
protected void doBatchEnqueue(final String queue, final List<String> jobsJson) {
ensureJedisConnection();
doBatchEnqueue(this.jedis, getNamespace(), queue, jobsJson);
}

/**
* {@inheritDoc}
*/
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/net/greghaines/jesque/client/ClientPoolImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import redis.clients.jedis.Jedis;
import redis.clients.util.Pool;

import java.util.List;

/**
* A Client implementation that gets its connection to Redis from a connection
* pool.
Expand Down Expand Up @@ -65,6 +67,20 @@ public Void doWork(final Jedis jedis) {
});
}

@Override
protected void doBatchEnqueue(final String queue, final List<String> jobsJson) throws Exception {
PoolUtils.doWorkInPool(this.jedisPool, new PoolWork<Jedis, Void>() {
/**
* {@inheritDoc}
*/
@Override
public Void doWork(final Jedis jedis) {
doBatchEnqueue(jedis, getNamespace(), queue, jobsJson);
return null;
}
});
}

/**
* {@inheritDoc}
*/
Expand Down
45 changes: 45 additions & 0 deletions src/test/java/net/greghaines/jesque/IntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ public void jobMixed() throws Exception {
assertMixed(null);
}

@Test
public void jobBatchMixed() throws Exception {
LOG.info("Running jobBatchMixed()...");
assertBatchMixed(null);
}

@Test
public void successInSpiteOfListenerFailPoll() {
LOG.info("Running successInSpiteOfListenerFailPoll()...");
Expand Down Expand Up @@ -128,6 +134,12 @@ public void mixedInSpiteOfListenerFailAll() {
assertMixed(new FailingWorkerListener(), WorkerEvent.values());
}

@Test
public void batchMixedInSpiteOfListenerFailAll() {
LOG.info("Running batchMixedInSpiteOfListenerFailAll()...");
assertBatchMixed(new FailingWorkerListener(), WorkerEvent.values());
}

@Test
public void acquireLockSuccess() {
LOG.info("Running acquireLockSuccess()...");
Expand Down Expand Up @@ -217,6 +229,24 @@ private static void assertMixed(final WorkerListener listener, final WorkerEvent
}
}

private static void assertBatchMixed(final WorkerListener listener, final WorkerEvent... events) {
final Job job1 = new Job("FailAction");
final Job job2 = new Job("TestAction", new Object[] { 1, 2.3, true, "test", Arrays.asList("inner", 4.5) });
final Job job3 = new Job("FailAction");
final Job job4 = new Job("TestAction", new Object[] { 1, 2.3, true, "test", Arrays.asList("inner", 4.5) });

doBatchWork(Arrays.asList(job1, job2, job3, job4),
map(entry("FailAction", FailAction.class), entry("TestAction", TestAction.class)), listener, events);

final Jedis jedis = createJedis(CONFIG);
try {
Assert.assertEquals("2", jedis.get(createKey(CONFIG.getNamespace(), STAT, FAILED)));
Assert.assertEquals("2", jedis.get(createKey(CONFIG.getNamespace(), STAT, PROCESSED)));
} finally {
jedis.quit();
}
}

private static void doWork(final List<Job> jobs, final Map<String, ? extends Class<? extends Runnable>> jobTypes,
final WorkerListener listener, final WorkerEvent... events) {
final Worker worker = new WorkerImpl(CONFIG, Arrays.asList(TEST_QUEUE), new MapBasedJobFactory(jobTypes));
Expand All @@ -232,6 +262,21 @@ private static void doWork(final List<Job> jobs, final Map<String, ? extends Cla
}
}

private static void doBatchWork(final List<Job> jobs, final Map<String, ? extends Class<? extends Runnable>> jobTypes,
final WorkerListener listener, final WorkerEvent... events) {
final Worker worker = new WorkerImpl(CONFIG, Arrays.asList(TEST_QUEUE), new MapBasedJobFactory(jobTypes));
if (listener != null && events.length > 0) {
worker.getWorkerEventEmitter().addListener(listener, events);
}
final Thread workerThread = new Thread(worker);
workerThread.start();
try {
TestUtils.batchEnqueueJobs(TEST_QUEUE, jobs, CONFIG);
} finally {
TestUtils.stopWorker(worker, workerThread);
}
}

private static class FailingWorkerListener implements WorkerListener {
public void onEvent(final WorkerEvent event, final Worker worker, final String queue, final Job job,
final Object runner, final Object result, final Throwable t) {
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/net/greghaines/jesque/Issue56Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testZREM() {
}

public static void enqueue() {
final long future = System.currentTimeMillis() + 5;
final long future = System.currentTimeMillis() + 500;
final Job job = new Job(TestAction.class.getSimpleName());
CLIENT.delayedEnqueue(QUEUE, job, future);
}
Expand Down
9 changes: 9 additions & 0 deletions src/test/java/net/greghaines/jesque/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ public static void enqueueJobs(final String queue, final List<Job> jobs, final C
}
}

public static void batchEnqueueJobs(String queue, List<Job> jobs, Config config) {
final Client client = new ClientImpl(config);
try {
client.batchEnqueue(queue, jobs);
} finally {
client.end();
}
}

public static void stopWorker(final JobExecutor worker, final Thread workerThread) {
stopWorker(worker, workerThread, false);
}
Expand Down