Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pipelining from UnifiedJedis #3221

Merged
merged 1 commit into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,15 @@ public JedisCluster(ClusterConnectionProvider provider, int maxAttempts,
}

public Map<String, ConnectionPool> getClusterNodes() {
return ((ClusterCommandExecutor) executor).provider.getNodes();
return ((ClusterConnectionProvider) provider).getNodes();
}

public Connection getConnectionFromSlot(int slot) {
return ((ClusterCommandExecutor) executor).provider.getConnectionFromSlot(slot);
return ((ClusterConnectionProvider) provider).getConnectionFromSlot(slot);
}

@Override
public ClusterPipeline pipelined() {
return new ClusterPipeline((ClusterConnectionProvider) provider);
}
}
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/JedisPooled.java
Original file line number Diff line number Diff line change
Expand Up @@ -394,4 +394,9 @@ public JedisPooled(PooledConnectionProvider provider) {
public final Pool<Connection> getPool() {
return ((PooledConnectionProvider) provider).getPool();
}

@Override
public Pipeline pipelined() {
return (Pipeline) super.pipelined();
}
}
5 changes: 5 additions & 0 deletions src/main/java/redis/clients/jedis/JedisSharding.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ public JedisSharding(ShardedConnectionProvider provider) {
public JedisSharding(ShardedConnectionProvider provider, Pattern tagPattern) {
super(provider, tagPattern);
}

@Override
public ShardedPipeline pipelined() {
return new ShardedPipeline((ShardedConnectionProvider) provider);
}
}
23 changes: 13 additions & 10 deletions src/main/java/redis/clients/jedis/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,21 @@ public class Pipeline extends Queable implements PipelineCommands, PipelineBinar
DatabasePipelineCommands, RedisModulePipelineCommands, Closeable {

protected final Connection connection;
// private final Jedis jedis;
private final boolean closeConnection;
private final CommandObjects commandObjects;
private final GraphCommandObjects graphCommandObjects;

public Pipeline(Jedis jedis) {
this(jedis.getConnection(), false);
}

public Pipeline(Connection connection) {
// super(connection);
this.connection = connection;
// this.jedis = null;
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this.connection);
this(connection, false);
}

public Pipeline(Jedis jedis) {
// super(jedis.getConnection());
this.connection = jedis.getConnection();
// this.jedis = jedis;
public Pipeline(Connection connection, boolean closeConnection) {
this.connection = connection;
this.closeConnection = closeConnection;
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this.connection);
}
Expand All @@ -61,6 +60,10 @@ public final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
@Override
public void close() {
sync();

if (closeConnection) {
connection.close();
}
}

/**
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4534,6 +4534,14 @@ public Map<String, Object> graphConfigGet(String configName) {
}
// RedisGraph commands

public Object pipelined() {
if (provider == null) {
throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass());
}
Connection connection = provider.getConnection();
return new Pipeline(connection, true);
}

public Object sendCommand(ProtocolCommand cmd) {
return executeCommand(commandObjects.commandArguments(cmd));
}
Expand Down
33 changes: 33 additions & 0 deletions src/test/java/redis/clients/jedis/ClusterPipeliningTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -944,4 +944,37 @@ public void testEvalshaKeyAndArgWithBinary() {
private <T> Matcher<Iterable<? super T>> listWithItem(T expected) {
return CoreMatchers.<T>hasItem(equalTo(expected));
}

@Test
public void simple() { // TODO: move into 'redis.clients.jedis.commands.unified.cluster' package
try (JedisCluster jedis = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
final int count = 10;
int totalCount = 0;
for (int i = 0; i < count; i++) {
jedis.set("foo" + i, "bar" + i);
}
totalCount += count;
for (int i = 0; i < count; i++) {
jedis.rpush("foobar" + i, "foo" + i, "bar" + i);
}
totalCount += count;

List<Response<?>> responses = new ArrayList<>(totalCount);
List<Object> expected = new ArrayList<>(totalCount);
try (ClusterPipeline pipeline = jedis.pipelined()) {
for (int i = 0; i < count; i++) {
responses.add(pipeline.get("foo" + i));
expected.add("bar" + i);
}
for (int i = 0; i < count; i++) {
responses.add(pipeline.lrange("foobar" + i, 0, -1));
expected.add(Arrays.asList("foo" + i, "bar" + i));
}
}

for (int i = 0; i < totalCount; i++) {
assertEquals(expected.get(i), responses.get(i).get());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,24 @@
//package redis.clients.jedis.commands.unified.cluster;
//
//import org.junit.AfterClass;
//import org.junit.Before;
//import org.junit.BeforeClass;
//import redis.clients.jedis.commands.unified.HashesCommandsTestBase;
//
//public class ClusterHashesCommandsTest extends HashesCommandsTestBase {
//
// @BeforeClass
// public static void prepare() throws InterruptedException {
// jedis = ClusterCommandsTestHelper.initAndGetCluster();
// }
//
// @AfterClass
// public static void closeCluster() {
// jedis.close();
// }
//
// @AfterClass
// public static void resetCluster() {
// ClusterCommandsTestHelper.tearClusterDown();
// }
//
// @Before
// public void setUp() {
// ClusterCommandsTestHelper.clearClusterData();
// }
//}
package redis.clients.jedis.commands.unified.pooled;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import redis.clients.jedis.commands.unified.HashesCommandsTestBase;

public class PooledHashesCommandsTest extends HashesCommandsTestBase {

@BeforeClass
public static void prepare() throws InterruptedException {
jedis = PooledCommandsTestHelper.getPooled();
}

@AfterClass
public static void cleanUp() {
jedis.close();
}

@Before
public void setUp() {
PooledCommandsTestHelper.clearData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package redis.clients.jedis.commands.unified.pooled;

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;

import redis.clients.jedis.commands.unified.UnifiedJedisCommandsTestBase;

public class PooledPipelinedTest extends UnifiedJedisCommandsTestBase {

protected Pipeline pipeline;

@BeforeClass
public static void prepare() throws InterruptedException {
jedis = PooledCommandsTestHelper.getPooled();
}

@AfterClass
public static void cleanUp() {
jedis.close();
}

@Before
public void setUp() {
PooledCommandsTestHelper.clearData();
pipeline = ((JedisPooled) jedis).pipelined();
}

@After
public void tearDown() {
pipeline.close();
}

@Test
public void simple() {
final int count = 10;
int totalCount = 0;
for (int i = 0; i < count; i++) {
jedis.set("foo" + i, "bar" + i);
}
totalCount += count;
for (int i = 0; i < count; i++) {
jedis.rpush("foobar" + i, "foo" + i, "bar" + i);
}
totalCount += count;

List<Response<?>> responses = new ArrayList<>(totalCount);
List<Object> expected = new ArrayList<>(totalCount);
for (int i = 0; i < count; i++) {
responses.add(pipeline.get("foo" + i));
expected.add("bar" + i);
}
for (int i = 0; i < count; i++) {
responses.add(pipeline.lrange("foobar" + i, 0, -1));
expected.add(Arrays.asList("foo" + i, "bar" + i));
}
pipeline.sync();

for (int i = 0; i < totalCount; i++) {
assertEquals(expected.get(i), responses.get(i).get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public static void prepare() throws InterruptedException {
}

@AfterClass
public static void closeCluster() {
public static void cleanUp() {
jedis.close();
}

Expand Down