Skip to content

Commit

Permalink
Shutdown ExecutorService(s) in multi node pipelines (#3467)
Browse files Browse the repository at this point in the history
* Shutdown ExecutorService(s) in multi node pipelines

* Use only shutdownNow()

* format import
  • Loading branch information
sazzad16 committed Jun 14, 2023
1 parent f9a8575 commit c33bdf7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
6 changes: 4 additions & 2 deletions src/main/java/redis/clients/jedis/MultiNodePipelineBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ public abstract class MultiNodePipelineBase extends PipelineBase
private final Map<HostAndPort, Connection> connections;
private volatile boolean syncing = false;

private final ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);

public MultiNodePipelineBase(CommandObjects commandObjects) {
super(commandObjects);
pipelinedResponses = new LinkedHashMap<>();
Expand Down Expand Up @@ -104,6 +102,8 @@ public final void sync() {
}
syncing = true;

ExecutorService executorService = Executors.newFixedThreadPool(MULTI_NODE_PIPELINE_SYNC_WORKERS);

CountDownLatch countDownLatch = new CountDownLatch(pipelinedResponses.size());
Iterator<Map.Entry<HostAndPort, Queue<Response<?>>>> pipelinedResponsesIterator
= pipelinedResponses.entrySet().iterator();
Expand Down Expand Up @@ -136,6 +136,8 @@ public final void sync() {
log.error("Thread is interrupted during sync.", e);
}

executorService.shutdownNow();

syncing = false;
}

Expand Down
36 changes: 36 additions & 0 deletions src/test/java/redis/clients/jedis/ClusterPipeliningTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1061,4 +1061,40 @@ public void transaction() {
assertThrows(UnsupportedOperationException.class, () -> cluster.multi());
}
}

@Test(timeout = 10_000L)
public void multiple() {
final int maxTotal = 100;
ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
poolConfig.setMaxTotal(maxTotal);
try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG, 5, poolConfig)) {
for (int i = 0; i < maxTotal; i++) {
assertThreadsCount();
String s = Integer.toString(i);
try (ClusterPipeline pipeline = cluster.pipelined()) {
pipeline.set(s, s);
pipeline.sync();
}
assertThreadsCount();
}
}
}

private static void assertThreadsCount() {
// Get the root thread group
final ThreadGroup rootGroup = Thread.currentThread().getThreadGroup().getParent();

// Create a buffer to store the thread information
final Thread[] threads = new Thread[rootGroup.activeCount()];

// Enumerate all threads into the buffer
rootGroup.enumerate(threads);

// Assert information about threads
final int count = (int) Arrays.stream(threads)
.filter(thread -> thread != null && thread.getName() != null
&& thread.getName().startsWith("pool-"))
.count();
assertTrue(count < 9);
}
}

0 comments on commit c33bdf7

Please sign in to comment.