Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support JMX monitoring for worker pool status #103

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 140 additions & 7 deletions src/main/java/net/greghaines/jesque/worker/WorkerPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,81 @@
*/
package net.greghaines.jesque.worker;

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* WorkerPool creates a fixed number of identical <code>Workers</code>, each on a separate <code>Thread</code>.
*/
public class WorkerPool implements Worker {

public class WorkerPool implements Worker, WorkerPoolMXBean {
private static final Logger LOG = LoggerFactory.getLogger(WorkerPool.class);
private static final String JMX_WORKER_POOL_NAME = "net.greghaines.jesque:type=WorkerPool";

private final List<Worker> workers;
private final List<Thread> threads;
private final WorkerEventEmitter eventEmitter;

private boolean isRegisterMbeans;

/**
* Create a WorkerPool with the given number of Workers and the default <code>ThreadFactory</code>.
* Create a WorkerPool with the given number of Workers, the default <code>ThreadFactory</code>
* and disable JMX monitoring.
*
* @param workerFactory a Callable that returns an implementation of Worker
* @param numWorkers the number of Workers to create
*/
public WorkerPool(final Callable<? extends Worker> workerFactory, final int numWorkers) {
this(workerFactory, numWorkers, Executors.defaultThreadFactory());
this(workerFactory, numWorkers, false);
}

/**
* Create a WorkerPool with the given number of Workers, the default <code>ThreadFactory</code>
* and whether to monitor by JMX or not.
*
* @param workerFactory a Callable that returns an implementation of Worker
* @param numWorkers the number of Workers to create
* @param isRegisterMbeans whether to monitor by JMX or not.
*/
public WorkerPool(final Callable<? extends Worker> workerFactory, final int numWorkers,
final boolean isRegisterMbeans) {
this(workerFactory, numWorkers, Executors.defaultThreadFactory(), isRegisterMbeans);
}

/**
* Create a WorkerPool with the given number of Workers and the given <code>ThreadFactory</code>.
* Create a WorkerPool with the given number of Workers, the given <code>ThreadFactory</code>
* and disable JMX monitoring.
*
* @param workerFactory a Callable that returns an implementation of Worker
* @param numWorkers the number of Workers to create
* @param threadFactory the factory to create pre-configured Threads
*/
public WorkerPool(final Callable<? extends Worker> workerFactory, final int numWorkers,
final ThreadFactory threadFactory) {
this(workerFactory, numWorkers, threadFactory, false);
}

/**
* Create a WorkerPool with the given number of Workers, the given <code>ThreadFactory</code>
* and whether to monitor by JMX or not.
*
* @param workerFactory a Callable that returns an implementation of Worker
* @param numWorkers the number of Workers to create
* @param threadFactory the factory to create pre-configured Threads
* @param isRegisterMbeans whether to monitor by JMX or not.
*/
public WorkerPool(final Callable<? extends Worker> workerFactory, final int numWorkers,
final ThreadFactory threadFactory, final boolean isRegisterMbeans) {
this.workers = new ArrayList<Worker>(numWorkers);
this.threads = new ArrayList<Thread>(numWorkers);
this.eventEmitter = new WorkerPoolEventEmitter(this.workers);
Expand All @@ -62,13 +104,16 @@ public WorkerPool(final Callable<? extends Worker> workerFactory, final int numW
throw new RuntimeException(e);
}
}

this.isRegisterMbeans = isRegisterMbeans;
registerMBeans();
}

/**
* Shutdown this pool and wait millis time per thread or until all threads are finished if millis is 0.
* @param now if true, an effort will be made to stop any jobs in progress
* @param millis the time to wait in milliseconds for the threads to join; a timeout of 0 means to wait forever.
* @throws InterruptedException if any thread has interrupted the current thread.
* @throws InterruptedException if any thread has interrupted the current thread.
* The interrupted status of the current thread is cleared when this exception is thrown.
*/
public void endAndJoin(final boolean now, final long millis) throws InterruptedException {
Expand Down Expand Up @@ -248,7 +293,95 @@ public void setExceptionHandler(final ExceptionHandler exceptionHandler) {
worker.setExceptionHandler(exceptionHandler);
}
}


/**
* {@inheritDoc}
*/
@Override
public int getTotalWorkers() {
return this.workers.size();
}

/**
* {@inheritDoc}
*/
@Override
public int getActiveWorkers() {
int activeWorkers = 0;
for (final Worker worker : this.workers) {
if (worker.isProcessingJob()) {
activeWorkers++;
}
}
return activeWorkers;
}

/**
* {@inheritDoc}
*/
@Override
public int getIdleWorkers() {
return getTotalWorkers() - getActiveWorkers();
}

/**
* Enable JMX monitoring.
*/
public void registerMBeans() {
if (!this.isRegisterMbeans) {
return;
}

final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();

final ObjectName beanWorkerPoolName;
try {
beanWorkerPoolName = new ObjectName(JMX_WORKER_POOL_NAME);
if (!mBeanServer.isRegistered(beanWorkerPoolName)) {
mBeanServer.registerMBean(this, beanWorkerPoolName);
}
} catch (Exception e) {
LOG.warn("Failed to register management beans.", e);
}
}

/**
* Disable JMX monitoring.
*/
public void unregisterMBeans() {
if (!this.isRegisterMbeans) {
return;
}

final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();

final ObjectName beanWorkerPoolName;
try {
beanWorkerPoolName = new ObjectName(JMX_WORKER_POOL_NAME);
if (mBeanServer.isRegistered(beanWorkerPoolName)) {
mBeanServer.unregisterMBean(beanWorkerPoolName);
}
} catch (Exception e) {
LOG.warn("Failed to unregister management beans.", e);
}
}

/**
* @return whether to monitor by JMX or not
*/
public boolean isRegisterMbeans() {
return this.isRegisterMbeans;
}

/**
* Set whether to monitor by JMX or not
*
* @param isRegisterMbeans whether to monitor by JMX or not
*/
public void setRegisterMbeans(final boolean isRegisterMbeans) {
this.isRegisterMbeans = isRegisterMbeans;
}

private static class WorkerPoolEventEmitter implements WorkerEventEmitter {

private final List<Worker> workers;
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/net/greghaines/jesque/worker/WorkerPoolMXBean.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package net.greghaines.jesque.worker;

public interface WorkerPoolMXBean {
/**
* @return number of total workers
*/
int getTotalWorkers();

/**
* @return number of active (processing) workers
*/
int getActiveWorkers();

/**
* @return number of idle workers
*/
int getIdleWorkers();
}