Skip to content

Commit

Permalink
Pipelining from UnifiedJedis (#3221)
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 authored Dec 5, 2022
1 parent 9f029c3 commit ed57047
Show file tree
Hide file tree
Showing 16 changed files with 177 additions and 49 deletions.
9 changes: 7 additions & 2 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,15 @@ public JedisCluster(ClusterConnectionProvider provider, int maxAttempts,
}

public Map<String, ConnectionPool> getClusterNodes() {
return ((ClusterCommandExecutor) executor).provider.getNodes();
return ((ClusterConnectionProvider) provider).getNodes();
}

public Connection getConnectionFromSlot(int slot) {
return ((ClusterCommandExecutor) executor).provider.getConnectionFromSlot(slot);
return ((ClusterConnectionProvider) provider).getConnectionFromSlot(slot);
}

@Override
public ClusterPipeline pipelined() {
return new ClusterPipeline((ClusterConnectionProvider) provider);
}
}
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/JedisPooled.java
Original file line number Diff line number Diff line change
Expand Up @@ -394,4 +394,9 @@ public JedisPooled(PooledConnectionProvider provider) {
public final Pool<Connection> getPool() {
return ((PooledConnectionProvider) provider).getPool();
}

@Override
public Pipeline pipelined() {
return (Pipeline) super.pipelined();
}
}
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/JedisSharding.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ public JedisSharding(ShardedConnectionProvider provider) {
public JedisSharding(ShardedConnectionProvider provider, Pattern tagPattern) {
super(provider, tagPattern);
}

@Override
public ShardedPipeline pipelined() {
return new ShardedPipeline((ShardedConnectionProvider) provider);
}
}
23 changes: 13 additions & 10 deletions src/main/java/redis/clients/jedis/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,21 @@ public class Pipeline extends Queable implements PipelineCommands, PipelineBinar
DatabasePipelineCommands, RedisModulePipelineCommands, Closeable {

protected final Connection connection;
// private final Jedis jedis;
private final boolean closeConnection;
private final CommandObjects commandObjects;
private final GraphCommandObjects graphCommandObjects;

public Pipeline(Jedis jedis) {
this(jedis.getConnection(), false);
}

public Pipeline(Connection connection) {
// super(connection);
this.connection = connection;
// this.jedis = null;
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this.connection);
this(connection, false);
}

public Pipeline(Jedis jedis) {
// super(jedis.getConnection());
this.connection = jedis.getConnection();
// this.jedis = jedis;
public Pipeline(Connection connection, boolean closeConnection) {
this.connection = connection;
this.closeConnection = closeConnection;
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this.connection);
}
Expand All @@ -61,6 +60,10 @@ public final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
@Override
public void close() {
sync();

if (closeConnection) {
connection.close();
}
}

/**
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4534,6 +4534,14 @@ public Map<String, Object> graphConfigGet(String configName) {
}
// RedisGraph commands

public Object pipelined() {
if (provider == null) {
throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass());
}
Connection connection = provider.getConnection();
return new Pipeline(connection, true);
}

public Object sendCommand(ProtocolCommand cmd) {
return executeCommand(commandObjects.commandArguments(cmd));
}
Expand Down
33 changes: 33 additions & 0 deletions src/test/java/redis/clients/jedis/ClusterPipeliningTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -944,4 +944,37 @@ public void testEvalshaKeyAndArgWithBinary() {
private <T> Matcher<Iterable<? super T>> listWithItem(T expected) {
return CoreMatchers.<T>hasItem(equalTo(expected));
}

@Test
public void simple() { // TODO: move into 'redis.clients.jedis.commands.unified.cluster' package
try (JedisCluster jedis = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
final int count = 10;
int totalCount = 0;
for (int i = 0; i < count; i++) {
jedis.set("foo" + i, "bar" + i);
}
totalCount += count;
for (int i = 0; i < count; i++) {
jedis.rpush("foobar" + i, "foo" + i, "bar" + i);
}
totalCount += count;

List<Response<?>> responses = new ArrayList<>(totalCount);
List<Object> expected = new ArrayList<>(totalCount);
try (ClusterPipeline pipeline = jedis.pipelined()) {
for (int i = 0; i < count; i++) {
responses.add(pipeline.get("foo" + i));
expected.add("bar" + i);
}
for (int i = 0; i < count; i++) {
responses.add(pipeline.lrange("foobar" + i, 0, -1));
expected.add(Arrays.asList("foo" + i, "bar" + i));
}
}

for (int i = 0; i < totalCount; i++) {
assertEquals(expected.get(i), responses.get(i).get());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,24 @@
//package redis.clients.jedis.commands.unified.cluster;
//
//import org.junit.AfterClass;
//import org.junit.Before;
//import org.junit.BeforeClass;
//import redis.clients.jedis.commands.unified.HashesCommandsTestBase;
//
//public class ClusterHashesCommandsTest extends HashesCommandsTestBase {
//
// @BeforeClass
// public static void prepare() throws InterruptedException {
// jedis = ClusterCommandsTestHelper.initAndGetCluster();
// }
//
// @AfterClass
// public static void closeCluster() {
// jedis.close();
// }
//
// @AfterClass
// public static void resetCluster() {
// ClusterCommandsTestHelper.tearClusterDown();
// }
//
// @Before
// public void setUp() {
// ClusterCommandsTestHelper.clearClusterData();
// }
//}
package redis.clients.jedis.commands.unified.pooled;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import redis.clients.jedis.commands.unified.HashesCommandsTestBase;

public class PooledHashesCommandsTest extends HashesCommandsTestBase {

@BeforeClass
public static void prepare() throws InterruptedException {
jedis = PooledCommandsTestHelper.getPooled();
}

@AfterClass
public static void cleanUp() {
jedis.close();
}

@Before
public void setUp() {
PooledCommandsTestHelper.clearData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package redis.clients.jedis.commands.unified.pooled;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;

import redis.clients.jedis.commands.unified.UnifiedJedisCommandsTestBase;

public class PooledPipelinedTest extends UnifiedJedisCommandsTestBase {

protected Pipeline pipeline;

@BeforeClass
public static void prepare() throws InterruptedException {
jedis = PooledCommandsTestHelper.getPooled();
}

@AfterClass
public static void cleanUp() {
jedis.close();
}

@Before
public void setUp() {
PooledCommandsTestHelper.clearData();
pipeline = ((JedisPooled) jedis).pipelined();
}

@After
public void tearDown() {
pipeline.close();
}

@Test
public void simple() {
final int count = 10;
int totalCount = 0;
for (int i = 0; i < count; i++) {
jedis.set("foo" + i, "bar" + i);
}
totalCount += count;
for (int i = 0; i < count; i++) {
jedis.rpush("foobar" + i, "foo" + i, "bar" + i);
}
totalCount += count;

List<Response<?>> responses = new ArrayList<>(totalCount);
List<Object> expected = new ArrayList<>(totalCount);
for (int i = 0; i < count; i++) {
responses.add(pipeline.get("foo" + i));
expected.add("bar" + i);
}
for (int i = 0; i < count; i++) {
responses.add(pipeline.lrange("foobar" + i, 0, -1));
expected.add(Arrays.asList("foo" + i, "bar" + i));
}
pipeline.sync();

for (int i = 0; i < totalCount; i++) {
assertEquals(expected.get(i), responses.get(i).get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down

0 comments on commit ed57047

Please sign in to comment.