Skip to content

Commit

Permalink
Allow custom thread pool executors to be wired in. (#3075)
Browse files Browse the repository at this point in the history
* Allow custom thread pool executors to be wired in.

Closes #3066

Following should be done.

* Implement `org.testng.IExecutorServiceFactory`
* plugin the fully qualified class name of the 
above implementation via the configuration parameter
"-threadpoolfactoryclass"

If using Maven surefire plugin then it can be wired
in as below: 

<configuration>
  <properties>
      <property>
          <name>threadpoolfactoryclass</name>
          <value>test.thread.MyExecutorServiceFactory</value>
      </property>
  </properties>
</configuration>

If using TestNG API, then it can be wired in as 
below:

```java
TestNG testng = new TestNG();
testng.setExecutorServiceFactory(new MyExecutorServiceFactory());
```
  • Loading branch information
krmahadevan authored Feb 22, 2024
1 parent 38910de commit 2cc332d
Show file tree
Hide file tree
Showing 22 changed files with 298 additions and 326 deletions.
3 changes: 2 additions & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
Current (7.10.0)
New: GITHUB-2916: Allow users to define ordering for TestNG listeners (Krishnan Mahadevan)
Fixed: GITHUB-3066: How to dynamically adjust the number of TestNG threads after IExecutorFactory is deprecated? (Krishnan Mahadevan)
New: GITHUB-2874: Allow users to define ordering for TestNG listeners (Krishnan Mahadevan)
Fixed: GITHUB-3033: Moved ant support under own repository https://github.com/testng-team/testng-ant (Julien Herr)
Fixed: GITHUB-3064: TestResult lost if failure creating RetryAnalyzer (Krishnan Mahadevan)
Fixed: GITHUB-3048: ConcurrentModificationException when injecting values (Krishnan Mahadevan)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.testng;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
* Represents the capability to create a custom {@link ExecutorService} by downstream consumers. The
* implementation can be plugged in via the configuration parameter <code>-threadpoolfactoryclass
* </code>
*/
@FunctionalInterface
public interface IExecutorServiceFactory {

/**
* @param corePoolSize the number of threads to keep in the pool, even if they are idle, unless
* {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the pool
* @param keepAliveTime when the number of threads is greater than the core, this is the maximum
* time that excess idle threads will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are executed. This queue will
* hold only the {@code Runnable} tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor creates a new thread *
* @return - An implementation of {@link ExecutorService}
*/
ExecutorService create(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ public static boolean ignoreCallbackInvocationSkips() {
return Boolean.getBoolean(IGNORE_CALLBACK_INVOCATION_SKIPS);
}

/**
* @return - <code>true</code> if TestNG is to be using its custom implementation of {@link
* java.util.concurrent.ThreadPoolExecutor} for running concurrent tests. Defaults to <code>
* false</code>
*/
public static boolean favourCustomThreadPoolExecutor() {
return Boolean.getBoolean(FAVOR_CUSTOM_THREAD_POOL_EXECUTOR);
}

/**
* @return - A comma separated list of packages that represent special listeners which users will
* expect to be executed after executing the regular listeners. Here special listeners can be
Expand Down
6 changes: 5 additions & 1 deletion testng-core/src/main/java/org/testng/SuiteRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,11 @@ private void runInParallelTestMode() {
}

ThreadUtil.execute(
"tests", tasks, xmlSuite.getThreadCount(), xmlSuite.getTimeOut(XmlTest.DEFAULT_TIMEOUT_MS));
configuration,
"tests",
tasks,
xmlSuite.getThreadCount(),
xmlSuite.getTimeOut(XmlTest.DEFAULT_TIMEOUT_MS));
}

private class SuiteWorker implements Runnable {
Expand Down
43 changes: 12 additions & 31 deletions testng-core/src/main/java/org/testng/SuiteTaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.testng.internal.IConfiguration;
import org.testng.internal.RuntimeBehavior;
import org.testng.internal.Utils;
import org.testng.internal.thread.TestNGThreadFactory;
import org.testng.internal.thread.graph.GraphOrchestrator;
import org.testng.log4testng.Logger;
import org.testng.thread.IExecutorFactory;
import org.testng.thread.ITestNGThreadPoolExecutor;
import org.testng.thread.IThreadWorkerFactory;

class SuiteTaskExecutor {
Expand Down Expand Up @@ -41,33 +37,18 @@ public SuiteTaskExecutor(

public void execute() {
String name = "suites-";
if (RuntimeBehavior.favourCustomThreadPoolExecutor()) {
IExecutorFactory execFactory = configuration.getExecutorFactory();
ITestNGThreadPoolExecutor executor =
execFactory.newSuiteExecutor(
name,
graph,
factory,
threadPoolSize,
threadPoolSize,
Integer.MAX_VALUE,
TimeUnit.MILLISECONDS,
queue,
null);
executor.run();
service = executor;
} else {
service =
new ThreadPoolExecutor(
threadPoolSize,
threadPoolSize,
Integer.MAX_VALUE,
TimeUnit.MILLISECONDS,
queue,
new TestNGThreadFactory(name));
GraphOrchestrator<ISuite> executor = new GraphOrchestrator<>(service, factory, graph, null);
executor.run();
}
service =
this.configuration
.getExecutorServiceFactory()
.create(
threadPoolSize,
threadPoolSize,
Integer.MAX_VALUE,
TimeUnit.MILLISECONDS,
queue,
new TestNGThreadFactory(name));
GraphOrchestrator<ISuite> executor = new GraphOrchestrator<>(service, factory, graph, null);
executor.run();
}

public void awaitCompletion() {
Expand Down
56 changes: 11 additions & 45 deletions testng-core/src/main/java/org/testng/TestNG.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
Expand Down Expand Up @@ -58,7 +59,6 @@
import org.testng.reporters.VerboseReporter;
import org.testng.reporters.XMLReporter;
import org.testng.reporters.jq.Main;
import org.testng.thread.IExecutorFactory;
import org.testng.thread.IThreadWorkerFactory;
import org.testng.util.Strings;
import org.testng.xml.IPostProcessor;
Expand Down Expand Up @@ -151,8 +151,6 @@ public class TestNG {
private final Map<Class<? extends IDataProviderInterceptor>, IDataProviderInterceptor>
m_dataProviderInterceptors = Maps.newLinkedHashMap();

private IExecutorFactory m_executorFactory = null;

public static final Integer DEFAULT_VERBOSE = 1;

// Command line suite parameters
Expand Down Expand Up @@ -843,10 +841,9 @@ public void setVerbose(int verbose) {
m_verbose = verbose;
}

/** This method stands deprecated as of TestNG <code>v7.9.0</code>. */
@Deprecated
public void setExecutorFactoryClass(String clazzName) {
this.m_executorFactory = createExecutorFactoryInstanceUsing(clazzName);
public void setExecutorServiceFactory(IExecutorServiceFactory factory) {
m_configuration.setExecutorServiceFactory(
Objects.requireNonNull(factory, "ExecutorServiceFactory cannot be null"));
}

public void setListenerFactory(ITestNGListenerFactory factory) {
Expand All @@ -857,31 +854,6 @@ public void setGenerateResultsPerSuite(boolean generateResultsPerSuite) {
this.m_generateResultsPerSuite = generateResultsPerSuite;
}

private IExecutorFactory createExecutorFactoryInstanceUsing(String clazzName) {
Class<?> cls = ClassHelper.forName(clazzName);
Object instance = m_objectFactory.newInstance(cls);
if (instance instanceof IExecutorFactory) {
return (IExecutorFactory) instance;
}
throw new IllegalArgumentException(
clazzName + " does not implement " + IExecutorFactory.class.getName());
}

/** This method stands deprecated as of TestNG <code>v7.9.0</code>. */
@Deprecated
public void setExecutorFactory(IExecutorFactory factory) {
this.m_executorFactory = factory;
}

/** This method stands deprecated as of TestNG <code>v7.9.0</code>. */
@Deprecated
public IExecutorFactory getExecutorFactory() {
if (this.m_executorFactory == null) {
this.m_executorFactory = createExecutorFactoryInstanceUsing(DEFAULT_THREADPOOL_FACTORY);
}
return this.m_executorFactory;
}

private void initializeCommandLineSuites() {
if (m_commandLineTestClasses != null || m_commandLineMethods != null) {
if (null != m_commandLineMethods) {
Expand Down Expand Up @@ -1018,7 +990,6 @@ private void initializeConfiguration() {
m_configuration.setConfigurable(m_configurable);
m_configuration.setObjectFactory(factory);
m_configuration.setAlwaysRunListeners(this.m_alwaysRun);
m_configuration.setExecutorFactory(getExecutorFactory());
}

private void addListeners(XmlSuite s) {
Expand Down Expand Up @@ -1217,11 +1188,9 @@ public List<ISuite> runSuitesLocally() {
// Create a map with XmlSuite as key and corresponding SuiteRunner as value
for (XmlSuite xmlSuite : m_suites) {
if (m_configuration.isShareThreadPoolForDataProviders()) {
abortIfUsingGraphThreadPoolExecutor("Shared thread-pool for data providers");
xmlSuite.setShareThreadPoolForDataProviders(true);
}
if (m_configuration.useGlobalThreadPool()) {
abortIfUsingGraphThreadPoolExecutor("Global thread-pool");
xmlSuite.shouldUseGlobalThreadPool(true);
}
createSuiteRunners(suiteRunnerMap, xmlSuite);
Expand Down Expand Up @@ -1272,13 +1241,6 @@ private static void error(String s) {
LOGGER.error(s);
}

private static void abortIfUsingGraphThreadPoolExecutor(String prefix) {
if (RuntimeBehavior.favourCustomThreadPoolExecutor()) {
throw new UnsupportedOperationException(
prefix + " is NOT COMPATIBLE with TestNG's custom thread pool executor");
}
}

/**
* @return the verbose level, checking in order: the verbose level on the suite, the verbose level
* on the TestNG object, or 1.
Expand Down Expand Up @@ -1514,9 +1476,13 @@ protected void configure(CommandLineArgs cla) {
m_objectFactory.newInstance((Class<IInjectorFactory>) clazz));
}
}
if (cla.threadPoolFactoryClass != null) {
setExecutorFactoryClass(cla.threadPoolFactoryClass);
}
Optional.ofNullable(cla.threadPoolFactoryClass)
.map(ClassHelper::forName)
.filter(IExecutorServiceFactory.class::isAssignableFrom)
.map(it -> m_objectFactory.newInstance(it))
.map(it -> (IExecutorServiceFactory) it)
.ifPresent(this::setExecutorServiceFactory);

setOutputDirectory(cla.outputDirectory);

String testClasses = cla.testClass;
Expand Down
57 changes: 19 additions & 38 deletions testng-core/src/main/java/org/testng/TestTaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,14 @@
import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.testng.internal.IConfiguration;
import org.testng.internal.ObjectBag;
import org.testng.internal.RuntimeBehavior;
import org.testng.internal.Utils;
import org.testng.internal.thread.TestNGThreadFactory;
import org.testng.internal.thread.graph.GraphOrchestrator;
import org.testng.log4testng.Logger;
import org.testng.thread.IExecutorFactory;
import org.testng.thread.ITestNGThreadPoolExecutor;
import org.testng.thread.IThreadWorkerFactory;
import org.testng.xml.XmlTest;

Expand Down Expand Up @@ -50,42 +46,27 @@ public TestTaskExecutor(
public void execute() {
String name = "test-" + xmlTest.getName();
int threadCount = Math.max(xmlTest.getThreadCount(), 1);
if (RuntimeBehavior.favourCustomThreadPoolExecutor()) {
IExecutorFactory execFactory = configuration.getExecutorFactory();
ITestNGThreadPoolExecutor executor =
execFactory.newTestMethodExecutor(
name,
graph,
factory,
threadCount,
threadCount,
0,
TimeUnit.MILLISECONDS,
queue,
comparator);
executor.run();
service = executor;
boolean reUse = xmlTest.getSuite().useGlobalThreadPool();
Supplier<Object> supplier =
() ->
configuration
.getExecutorServiceFactory()
.create(
threadCount,
threadCount,
0,
TimeUnit.MILLISECONDS,
queue,
new TestNGThreadFactory(name));
if (reUse) {
ObjectBag bag = ObjectBag.getInstance(xmlTest.getSuite());
service = (ExecutorService) bag.createIfRequired(ExecutorService.class, supplier);
} else {
boolean reUse = xmlTest.getSuite().useGlobalThreadPool();
Supplier<Object> supplier =
() ->
new ThreadPoolExecutor(
threadCount,
threadCount,
0,
TimeUnit.MILLISECONDS,
queue,
new TestNGThreadFactory(name));
if (reUse) {
ObjectBag bag = ObjectBag.getInstance(xmlTest.getSuite());
service = (ExecutorService) bag.createIfRequired(ExecutorService.class, supplier);
} else {
service = (ExecutorService) supplier.get();
}
GraphOrchestrator<ITestNGMethod> executor =
new GraphOrchestrator<>(service, factory, graph, comparator);
executor.run();
service = (ExecutorService) supplier.get();
}
GraphOrchestrator<ITestNGMethod> executor =
new GraphOrchestrator<>(service, factory, graph, comparator);
executor.run();
}

public void awaitCompletion() {
Expand Down
15 changes: 8 additions & 7 deletions testng-core/src/main/java/org/testng/internal/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
import org.testng.IConfigurable;
import org.testng.IConfigurationListener;
import org.testng.IExecutionListener;
import org.testng.IExecutorServiceFactory;
import org.testng.IHookable;
import org.testng.IInjectorFactory;
import org.testng.ITestNGListenerFactory;
Expand All @@ -16,8 +19,6 @@
import org.testng.internal.annotations.IAnnotationFinder;
import org.testng.internal.annotations.JDK15AnnotationFinder;
import org.testng.internal.objects.GuiceBackedInjectorFactory;
import org.testng.internal.thread.DefaultThreadPoolExecutorFactory;
import org.testng.thread.IExecutorFactory;

public class Configuration implements IConfiguration {

Expand All @@ -34,7 +35,7 @@ public class Configuration implements IConfiguration {
private final Map<Class<? extends IConfigurationListener>, IConfigurationListener>
m_configurationListeners = Maps.newLinkedHashMap();
private boolean alwaysRunListeners = true;
private IExecutorFactory m_executorFactory = new DefaultThreadPoolExecutorFactory();
private IExecutorServiceFactory executorServiceFactory = ThreadPoolExecutor::new;

private IInjectorFactory injectorFactory = new GuiceBackedInjectorFactory();

Expand Down Expand Up @@ -145,13 +146,13 @@ public void setAlwaysRunListeners(boolean alwaysRunListeners) {
}

@Override
public void setExecutorFactory(IExecutorFactory factory) {
this.m_executorFactory = factory;
public void setExecutorServiceFactory(IExecutorServiceFactory executorServiceFactory) {
this.executorServiceFactory = Objects.requireNonNull(executorServiceFactory);
}

@Override
public IExecutorFactory getExecutorFactory() {
return this.m_executorFactory;
public IExecutorServiceFactory getExecutorServiceFactory() {
return executorServiceFactory;
}

@Override
Expand Down
Loading

0 comments on commit 2cc332d

Please sign in to comment.