Skip to content

Commit

Permalink
Support suite level thread pools for data provider
Browse files Browse the repository at this point in the history
Closes testng-team#2980

We can now configure TestNG such that it uses a
Suite level thread pool when running data driven
Tests in parallel.

This can be enabled via the configuration 

“-shareThreadPoolForDataProviders” 
with a value of “true”

Alternatively one can also use the suite level 
attribute “share-thread-pool-for-data-providers”
  • Loading branch information
krmahadevan committed Oct 1, 2023
1 parent a9c0174 commit 4c332ba
Show file tree
Hide file tree
Showing 19 changed files with 357 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
Current
Fixed: GITHUB-2991: Suite attributes map should be thread safe (Krishnan Mahadevan)
Fixed: GITHUB-2980: Data Provider Threads configuration in the suite don't match the documentation (Krishnan Mahadevan)
Fixed: GITHUB-2974: Command line arguments -groups and -excludegroups override defined groups in a suite xml file (dr29bart)
Fixed: GITHUB-2961: "Unexpected value: 16" error when multiple beforeMethod config methods with firstTimeOnly property run before a test (Krishnan Mahadevan)
Fixed: GITHUB-2904: Add location of docs Github to readme and contributions page (Mohsin Sackeer)
Expand Down
27 changes: 27 additions & 0 deletions testng-core-api/src/main/java/org/testng/IObjectBag.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.testng;

import java.io.Closeable;
import java.util.function.Supplier;

/**
* Represents a bean bag which is scoped to a specific {@link ISuite}. After a {@link ISuite} wraps
* up its execution, the contents of this bean bag will be cleaned-up.
*/
public interface IObjectBag {

/**
* @param type - The type of the object to be created
* @param supplier - A {@link Supplier} that should be used to produce a new instance
* @param <T> - The type of the object
* @return - Either the newly produced instance or the existing instance.
*/
<T> T createIfRequired(Class<T> type, Supplier<T> supplier);

/**
* In-case the object bag has bean objects that implement {@link java.io.Closeable} then the
* {@link Closeable#close()} will be automatically invoked after a suite has run to completion but
* before the test reports are generated. This will also empty the bean bag. So querying this bean
* bag during reporting phase is most likely to yield only an empty value
*/
void cleanup();
}
13 changes: 13 additions & 0 deletions testng-core-api/src/main/java/org/testng/ISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.testng.internal.annotations.IAnnotationFinder;
import org.testng.xml.XmlSuite;

Expand All @@ -23,6 +24,18 @@ public interface ISuite extends IAttributes {
/** @return The object factory used to create all test instances. */
ITestObjectFactory getObjectFactory();

default IObjectBag getObjectBag() {
return new IObjectBag() {
@Override
public <T> T createIfRequired(Class<T> type, Supplier<T> supplier) {
return supplier.get();
}

@Override
public void cleanup() {}
};
}

@Deprecated
/** @deprecated - This interface stands deprecated as of TestNG 7.5.0 */
default IObjectFactory2 getObjectFactory2() {
Expand Down
10 changes: 10 additions & 0 deletions testng-core-api/src/main/java/org/testng/xml/XmlSuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ public String toString() {
public static final Boolean DEFAULT_SKIP_FAILED_INVOCATION_COUNTS = Boolean.FALSE;
private Boolean m_skipFailedInvocationCounts = DEFAULT_SKIP_FAILED_INVOCATION_COUNTS;

private boolean shareThreadPoolForDataProviders = false;

/** The thread count. */
public static final Integer DEFAULT_THREAD_COUNT = 5;

Expand Down Expand Up @@ -243,6 +245,14 @@ public Class<? extends ITestObjectFactory> getObjectFactoryClass() {
return m_objectFactoryClass;
}

public void setShareThreadPoolForDataProviders(boolean shareThreadPoolForDataProviders) {
this.shareThreadPoolForDataProviders = shareThreadPoolForDataProviders;
}

public boolean isShareThreadPoolForDataProviders() {
return shareThreadPoolForDataProviders;
}

@Deprecated
public void setObjectFactory(ITestObjectFactory objectFactory) {
setObjectFactoryClass(objectFactory.getClass());
Expand Down
9 changes: 9 additions & 0 deletions testng-core/src/main/java/org/testng/CommandLineArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,4 +274,13 @@ public class CommandLineArgs {
names = GENERATE_RESULTS_PER_SUITE,
description = "Should TestNG consider failures in Data Providers as test failures.")
public Boolean generateResultsPerSuite = false;

public static final String SHARE_THREAD_POOL_FOR_DATA_PROVIDERS =
"-shareThreadPoolForDataProviders";

@Parameter(
names = SHARE_THREAD_POOL_FOR_DATA_PROVIDERS,
description =
"Should TestNG use a global Shared ThreadPool (At suite level) for running data providers.")
public Boolean shareThreadPoolForDataProviders = false;
}
7 changes: 7 additions & 0 deletions testng-core/src/main/java/org/testng/SuiteRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ public class SuiteRunner implements ISuite, ISuiteRunnerListener {
private final Set<IExecutionVisualiser> visualisers = Sets.newHashSet();
private ITestListener exitCodeListener;

private final IObjectBag objectBag = new ObjectBag();

@Override
public IObjectBag getObjectBag() {
return objectBag;
}

public SuiteRunner(
IConfiguration configuration,
XmlSuite suite,
Expand Down
14 changes: 14 additions & 0 deletions testng-core/src/main/java/org/testng/TestNG.java
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,14 @@ public boolean isPropagateDataProviderFailureAsTestFailure() {
return this.m_configuration.isPropagateDataProviderFailureAsTestFailure();
}

public void shareThreadPoolForDataProviders(boolean flag) {
this.m_configuration.shareThreadPoolForDataProviders(flag);
}

public boolean isShareThreadPoolForDataProviders() {
return this.m_configuration.isShareThreadPoolForDataProviders();
}

/**
* Set the suites file names to be run by this TestNG object. This method tries to load and parse
* the specified TestNG suite xml files. If a file is missing, it is ignored.
Expand Down Expand Up @@ -1082,6 +1090,7 @@ public void run() {
m_end = System.currentTimeMillis();

if (null != suiteRunners) {
suiteRunners.forEach(it -> it.getObjectBag().cleanup());
generateReports(suiteRunners);
}

Expand Down Expand Up @@ -1186,6 +1195,9 @@ public List<ISuite> runSuitesLocally() {
// First initialize the suite runners to ensure there are no configuration issues.
// Create a map with XmlSuite as key and corresponding SuiteRunner as value
for (XmlSuite xmlSuite : m_suites) {
if (m_configuration.isShareThreadPoolForDataProviders()) {
xmlSuite.setShareThreadPoolForDataProviders(true);
}
createSuiteRunners(suiteRunnerMap, xmlSuite);
}

Expand Down Expand Up @@ -1454,6 +1466,8 @@ public static TestNG privateMain(String[] argv, ITestListener listener) {
* @param cla The command line parameters
*/
protected void configure(CommandLineArgs cla) {
Optional.ofNullable(cla.shareThreadPoolForDataProviders)
.ifPresent(this::shareThreadPoolForDataProviders);
Optional.ofNullable(cla.propagateDataProviderFailureAsTestFailure)
.ifPresent(value -> propagateDataProviderFailureAsTestFailure());
setReportAllDataDrivenTestsAsSkipped(cla.includeAllDataDrivenTestsWhenSkipping);
Expand Down
12 changes: 12 additions & 0 deletions testng-core/src/main/java/org/testng/internal/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class Configuration implements IConfiguration {
private ITestObjectFactory m_objectFactory;
private IHookable m_hookable;
private IConfigurable m_configurable;

private boolean shareThreadPoolForDataProviders = false;
private final Map<Class<? extends IExecutionListener>, IExecutionListener> m_executionListeners =
Maps.newLinkedHashMap();
private final Map<Class<? extends IConfigurationListener>, IConfigurationListener>
Expand Down Expand Up @@ -168,4 +170,14 @@ public void propagateDataProviderFailureAsTestFailure() {
public boolean isPropagateDataProviderFailureAsTestFailure() {
return propagateDataProviderFailureAsTestFailure;
}

@Override
public boolean isShareThreadPoolForDataProviders() {
return this.shareThreadPoolForDataProviders;
}

@Override
public void shareThreadPoolForDataProviders(boolean flag) {
this.shareThreadPoolForDataProviders = flag;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,8 @@ default boolean getReportAllDataDrivenTestsAsSkipped() {
void propagateDataProviderFailureAsTestFailure();

boolean isPropagateDataProviderFailureAsTestFailure();

boolean isShareThreadPoolForDataProviders();

void shareThreadPoolForDataProviders(boolean flag);
}
39 changes: 39 additions & 0 deletions testng-core/src/main/java/org/testng/internal/ObjectBag.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.testng.internal;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.testng.IObjectBag;
import org.testng.log4testng.Logger;

/**
* A simple bean bag that is intended to help share objects during the lifetime of TestNG without
* needing it to be a singleton.
*/
public final class ObjectBag implements IObjectBag {

private static final Logger logger = Logger.getLogger(ObjectBag.class);
private final Map<Class<?>, Object> bag = new ConcurrentHashMap<>();

@SuppressWarnings("unchecked")
public <T> T createIfRequired(Class<T> type, Supplier<T> supplier) {
return (T) bag.computeIfAbsent(type, t -> supplier.get());
}

public void cleanup() {
bag.values().stream()
.filter(it -> it instanceof Closeable)
.map(it -> (Closeable) it)
.forEach(
it -> {
try {
it.close();
} catch (IOException e) {
logger.debug("Could not clean-up " + it, e);
}
});
bag.clear();
}
}
22 changes: 20 additions & 2 deletions testng-core/src/main/java/org/testng/internal/PoolService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.testng.internal;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -15,12 +17,18 @@
import org.testng.internal.thread.ThreadUtil;

/** Simple wrapper for an ExecutorCompletionService. */
public class PoolService<FutureType> {
public class PoolService<FutureType> implements Closeable {

private final ExecutorCompletionService<FutureType> m_completionService;
private final ExecutorService m_executor;

private final boolean shutdownAfterExecution;

public PoolService(int threadPoolSize) {
this(threadPoolSize, true);
}

public PoolService(int threadPoolSize, boolean shutdownAfterExecution) {

ThreadFactory threadFactory =
new ThreadFactory() {
Expand All @@ -35,6 +43,7 @@ public Thread newThread(@Nonnull Runnable r) {
};
m_executor = Executors.newFixedThreadPool(threadPoolSize, threadFactory);
m_completionService = new ExecutorCompletionService<>(m_executor);
this.shutdownAfterExecution = shutdownAfterExecution;
}

public List<FutureType> submitTasksAndWait(List<? extends Callable<FutureType>> tasks) {
Expand All @@ -53,7 +62,16 @@ public List<FutureType> submitTasksAndWait(List<? extends Callable<FutureType>>
}
}

m_executor.shutdown();
if (shutdownAfterExecution) {
m_executor.shutdown();
}
return result;
}

@Override
public void close() throws IOException {
if (!shutdownAfterExecution) {
m_executor.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.testng.IObjectBag;
import org.testng.ITestContext;
import org.testng.ITestResult;
import org.testng.collections.CollectionUtils;
Expand Down Expand Up @@ -129,7 +130,17 @@ public List<ITestResult> runInParallel(
// testng387: increment the param index in the bag.
parametersIndex += 1;
}
PoolService<List<ITestResult>> ps = new PoolService<>(suite.getDataProviderThreadCount());

IObjectBag objectBag = context.getSuite().getObjectBag();
boolean sharedThreadPool = context.getSuite().getXmlSuite().isShareThreadPoolForDataProviders();

@SuppressWarnings("unchecked")
PoolService<List<ITestResult>> ps =
sharedThreadPool
? objectBag.createIfRequired(
PoolService.class,
() -> new PoolService<>(suite.getDataProviderThreadCount(), false))
: new PoolService<>(suite.getDataProviderThreadCount());
List<List<ITestResult>> r = ps.submitTasksAndWait(workers);
for (List<ITestResult> l2 : r) {
result.addAll(l2);
Expand Down
10 changes: 10 additions & 0 deletions testng-core/src/main/java/org/testng/xml/TestNGContentHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Stack;
import org.testng.ITestObjectFactory;
import org.testng.TestNGException;
Expand Down Expand Up @@ -267,6 +268,15 @@ private void xmlSuite(boolean start, Attributes attributes) {
if (null != dataProviderThreadCount) {
m_currentSuite.setDataProviderThreadCount(Integer.parseInt(dataProviderThreadCount));
}

String shareThreadPoolForDataProviders =
attributes.getValue("share-thread-pool-for-data-providers");
Optional.ofNullable(shareThreadPoolForDataProviders)
.ifPresent(
it ->
m_currentSuite.setShareThreadPoolForDataProviders(
Boolean.parseBoolean(shareThreadPoolForDataProviders)));

String timeOut = attributes.getValue("time-out");
if (null != timeOut) {
m_currentSuite.setTimeOut(timeOut);
Expand Down
5 changes: 4 additions & 1 deletion testng-core/src/main/resources/testng-1.0.dtd
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ Cedric Beust & Alexandru Popescu
@attr skipfailedinvocationcounts Whether to skip failed invocations.
@attr data-provider-thread-count An integer giving the size of the thread pool to use
for parallel data providers.
@attr share-thread-pool-for-data-providers - Whether TestNG should use a common thread pool
for running parallel data providers. (Works only with TestNG versions 7.9.0 or higher)
@attr object-factory A class that implements IObjectFactory that will be used to
instantiate the test objects.
@attr allow-return-values If true, tests that return a value will be run as well
-->
<!ATTLIST suite
<!ATTLIST suite
name CDATA #REQUIRED
junit (true | false) "false"
verbose CDATA #IMPLIED
Expand All @@ -73,6 +75,7 @@ Cedric Beust & Alexandru Popescu
time-out CDATA #IMPLIED
skipfailedinvocationcounts (true | false) "false"
data-provider-thread-count CDATA "10"
share-thread-pool-for-data-providers (true | false) "false"
object-factory CDATA #IMPLIED
group-by-instances (true | false) "false"
preserve-order (true | false) "true"
Expand Down
Loading

0 comments on commit 4c332ba

Please sign in to comment.