diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index 15fed0297e..6ed9a54408 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -191,10 +191,15 @@ public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, } public Map getClusterNodes() { - return ((ClusterCommandExecutor) executor).provider.getNodes(); + return ((ClusterConnectionProvider) provider).getNodes(); } public Connection getConnectionFromSlot(int slot) { - return ((ClusterCommandExecutor) executor).provider.getConnectionFromSlot(slot); + return ((ClusterConnectionProvider) provider).getConnectionFromSlot(slot); + } + + @Override + public ClusterPipeline pipelined() { + return new ClusterPipeline((ClusterConnectionProvider) provider); } } diff --git a/src/main/java/redis/clients/jedis/JedisPooled.java b/src/main/java/redis/clients/jedis/JedisPooled.java index 767ce6b065..f573341868 100644 --- a/src/main/java/redis/clients/jedis/JedisPooled.java +++ b/src/main/java/redis/clients/jedis/JedisPooled.java @@ -394,4 +394,9 @@ public JedisPooled(PooledConnectionProvider provider) { public final Pool getPool() { return ((PooledConnectionProvider) provider).getPool(); } + + @Override + public Pipeline pipelined() { + return (Pipeline) super.pipelined(); + } } diff --git a/src/main/java/redis/clients/jedis/JedisSharding.java b/src/main/java/redis/clients/jedis/JedisSharding.java index a1cb7371e4..1f7f1700cf 100644 --- a/src/main/java/redis/clients/jedis/JedisSharding.java +++ b/src/main/java/redis/clients/jedis/JedisSharding.java @@ -39,4 +39,9 @@ public JedisSharding(ShardedConnectionProvider provider) { public JedisSharding(ShardedConnectionProvider provider, Pattern tagPattern) { super(provider, tagPattern); } + + @Override + public ShardedPipeline pipelined() { + return new ShardedPipeline((ShardedConnectionProvider) provider); + } } diff --git a/src/main/java/redis/clients/jedis/Pipeline.java b/src/main/java/redis/clients/jedis/Pipeline.java index 1cb0d8005e..398950a18c 100644 --- a/src/main/java/redis/clients/jedis/Pipeline.java +++ b/src/main/java/redis/clients/jedis/Pipeline.java @@ -33,22 +33,21 @@ public class Pipeline extends Queable implements PipelineCommands, PipelineBinar DatabasePipelineCommands, RedisModulePipelineCommands, Closeable { protected final Connection connection; -// private final Jedis jedis; + private final boolean closeConnection; private final CommandObjects commandObjects; private final GraphCommandObjects graphCommandObjects; + public Pipeline(Jedis jedis) { + this(jedis.getConnection(), false); + } + public Pipeline(Connection connection) { -// super(connection); - this.connection = connection; -// this.jedis = null; - this.commandObjects = new CommandObjects(); - this.graphCommandObjects = new GraphCommandObjects(this.connection); + this(connection, false); } - public Pipeline(Jedis jedis) { -// super(jedis.getConnection()); - this.connection = jedis.getConnection(); -// this.jedis = jedis; + public Pipeline(Connection connection, boolean closeConnection) { + this.connection = connection; + this.closeConnection = closeConnection; this.commandObjects = new CommandObjects(); this.graphCommandObjects = new GraphCommandObjects(this.connection); } @@ -61,6 +60,10 @@ public final Response appendCommand(CommandObject commandObject) { @Override public void close() { sync(); + + if (closeConnection) { + connection.close(); + } } /** diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index 0a48fc0b6b..bef6a91f6c 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -4534,6 +4534,14 @@ public Map graphConfigGet(String configName) { } // RedisGraph commands + public Object pipelined() { + if (provider == null) { + throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass()); + } + Connection connection = provider.getConnection(); + return new Pipeline(connection, true); + } + public Object sendCommand(ProtocolCommand cmd) { return executeCommand(commandObjects.commandArguments(cmd)); } diff --git a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java index e02808266a..b8db4f3c12 100644 --- a/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java +++ b/src/test/java/redis/clients/jedis/ClusterPipeliningTest.java @@ -944,4 +944,37 @@ public void testEvalshaKeyAndArgWithBinary() { private Matcher> listWithItem(T expected) { return CoreMatchers.hasItem(equalTo(expected)); } + + @Test + public void simple() { // TODO: move into 'redis.clients.jedis.commands.unified.cluster' package + try (JedisCluster jedis = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) { + final int count = 10; + int totalCount = 0; + for (int i = 0; i < count; i++) { + jedis.set("foo" + i, "bar" + i); + } + totalCount += count; + for (int i = 0; i < count; i++) { + jedis.rpush("foobar" + i, "foo" + i, "bar" + i); + } + totalCount += count; + + List> responses = new ArrayList<>(totalCount); + List expected = new ArrayList<>(totalCount); + try (ClusterPipeline pipeline = jedis.pipelined()) { + for (int i = 0; i < count; i++) { + responses.add(pipeline.get("foo" + i)); + expected.add("bar" + i); + } + for (int i = 0; i < count; i++) { + responses.add(pipeline.lrange("foobar" + i, 0, -1)); + expected.add(Arrays.asList("foo" + i, "bar" + i)); + } + } + + for (int i = 0; i < totalCount; i++) { + assertEquals(expected.get(i), responses.get(i).get()); + } + } + } } diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledBinaryValuesCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledBinaryValuesCommandsTest.java index 63adddb561..7c94f0165a 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledBinaryValuesCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledBinaryValuesCommandsTest.java @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException { } @AfterClass - public static void closeCluster() { + public static void cleanUp() { jedis.close(); } diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledBitCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledBitCommandsTest.java index 0d0522f282..60ce669cd2 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledBitCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledBitCommandsTest.java @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException { } @AfterClass - public static void closeCluster() { + public static void cleanUp() { jedis.close(); } diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledGeoCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledGeoCommandsTest.java index 95e698d790..d94753f32e 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledGeoCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledGeoCommandsTest.java @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException { } @AfterClass - public static void closeCluster() { + public static void cleanUp() { jedis.close(); } diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledHashesCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledHashesCommandsTest.java index 9ccf4cb479..a628336a97 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledHashesCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledHashesCommandsTest.java @@ -1,29 +1,24 @@ -//package redis.clients.jedis.commands.unified.cluster; -// -//import org.junit.AfterClass; -//import org.junit.Before; -//import org.junit.BeforeClass; -//import redis.clients.jedis.commands.unified.HashesCommandsTestBase; -// -//public class ClusterHashesCommandsTest extends HashesCommandsTestBase { -// -// @BeforeClass -// public static void prepare() throws InterruptedException { -// jedis = ClusterCommandsTestHelper.initAndGetCluster(); -// } -// -// @AfterClass -// public static void closeCluster() { -// jedis.close(); -// } -// -// @AfterClass -// public static void resetCluster() { -// ClusterCommandsTestHelper.tearClusterDown(); -// } -// -// @Before -// public void setUp() { -// ClusterCommandsTestHelper.clearClusterData(); -// } -//} +package redis.clients.jedis.commands.unified.pooled; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import redis.clients.jedis.commands.unified.HashesCommandsTestBase; + +public class PooledHashesCommandsTest extends HashesCommandsTestBase { + + @BeforeClass + public static void prepare() throws InterruptedException { + jedis = PooledCommandsTestHelper.getPooled(); + } + + @AfterClass + public static void cleanUp() { + jedis.close(); + } + + @Before + public void setUp() { + PooledCommandsTestHelper.clearData(); + } +} diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledHyperLogLogCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledHyperLogLogCommandsTest.java index 9519a81c81..aca21523da 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledHyperLogLogCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledHyperLogLogCommandsTest.java @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException { } @AfterClass - public static void closeCluster() { + public static void cleanUp() { jedis.close(); } diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledListCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledListCommandsTest.java index 414cd52c64..4ccd3de00a 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledListCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledListCommandsTest.java @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException { } @AfterClass - public static void closeCluster() { + public static void cleanUp() { jedis.close(); } diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledPipelinedTest.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledPipelinedTest.java new file mode 100644 index 0000000000..fd96efa23e --- /dev/null +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledPipelinedTest.java @@ -0,0 +1,74 @@ +package redis.clients.jedis.commands.unified.pooled; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import redis.clients.jedis.JedisPooled; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Response; + +import redis.clients.jedis.commands.unified.UnifiedJedisCommandsTestBase; + +public class PooledPipelinedTest extends UnifiedJedisCommandsTestBase { + + protected Pipeline pipeline; + + @BeforeClass + public static void prepare() throws InterruptedException { + jedis = PooledCommandsTestHelper.getPooled(); + } + + @AfterClass + public static void cleanUp() { + jedis.close(); + } + + @Before + public void setUp() { + PooledCommandsTestHelper.clearData(); + pipeline = ((JedisPooled) jedis).pipelined(); + } + + @After + public void tearDown() { + pipeline.close(); + } + + @Test + public void simple() { + final int count = 10; + int totalCount = 0; + for (int i = 0; i < count; i++) { + jedis.set("foo" + i, "bar" + i); + } + totalCount += count; + for (int i = 0; i < count; i++) { + jedis.rpush("foobar" + i, "foo" + i, "bar" + i); + } + totalCount += count; + + List> responses = new ArrayList<>(totalCount); + List expected = new ArrayList<>(totalCount); + for (int i = 0; i < count; i++) { + responses.add(pipeline.get("foo" + i)); + expected.add("bar" + i); + } + for (int i = 0; i < count; i++) { + responses.add(pipeline.lrange("foobar" + i, 0, -1)); + expected.add(Arrays.asList("foo" + i, "bar" + i)); + } + pipeline.sync(); + + for (int i = 0; i < totalCount; i++) { + assertEquals(expected.get(i), responses.get(i).get()); + } + } +} diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledSetCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledSetCommandsTest.java index c2fcf96bf2..00b7420e11 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledSetCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledSetCommandsTest.java @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException { } @AfterClass - public static void closeCluster() { + public static void cleanUp() { jedis.close(); } diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledSortedSetCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledSortedSetCommandsTest.java index 9868cbb927..38b4f95450 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledSortedSetCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledSortedSetCommandsTest.java @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException { } @AfterClass - public static void closeCluster() { + public static void cleanUp() { jedis.close(); } diff --git a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStringValuesCommandsTest.java b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStringValuesCommandsTest.java index d7b510b050..34fd50c776 100644 --- a/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStringValuesCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/unified/pooled/PooledStringValuesCommandsTest.java @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException { } @AfterClass - public static void closeCluster() { + public static void cleanUp() { jedis.close(); }