From 6617e9cd84dad290e772e81214d89874dc6c9e4c Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Sun, 6 Nov 2022 12:26:22 +0600 Subject: [PATCH 1/8] Introduce JedisBroadcast to broadcast commands --- .../clients/jedis/BroadcastResponse.java | 25 ++++ .../redis/clients/jedis/JedisBroadcast.java | 130 ++++++++++++++++++ .../providers/ClusterConnectionProvider.java | 5 + .../jedis/providers/ConnectionProvider.java | 7 + .../providers/PooledConnectionProvider.java | 11 ++ .../providers/ShardedConnectionProvider.java | 5 + .../redis/clients/jedis/JedisClusterTest.java | 14 ++ .../jedis/modules/search/BroadcastTest.java | 59 ++++++++ 8 files changed, 256 insertions(+) create mode 100644 src/main/java/redis/clients/jedis/BroadcastResponse.java create mode 100644 src/main/java/redis/clients/jedis/JedisBroadcast.java create mode 100644 src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java diff --git a/src/main/java/redis/clients/jedis/BroadcastResponse.java b/src/main/java/redis/clients/jedis/BroadcastResponse.java new file mode 100644 index 0000000000..915ae50784 --- /dev/null +++ b/src/main/java/redis/clients/jedis/BroadcastResponse.java @@ -0,0 +1,25 @@ +package redis.clients.jedis; + +import java.util.function.Supplier; + +public class BroadcastResponse implements Supplier { + + private T response = null; + private RuntimeException exception = null; + + public BroadcastResponse(T response) { + this.response = response; + } + + public BroadcastResponse(RuntimeException exception) { + this.exception = exception; + } + + @Override + public T get() { + if (exception != null) { + throw exception; + } + return response; + } +} diff --git a/src/main/java/redis/clients/jedis/JedisBroadcast.java b/src/main/java/redis/clients/jedis/JedisBroadcast.java new file mode 100644 index 0000000000..9bbc45011c --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisBroadcast.java @@ -0,0 +1,130 @@ +package redis.clients.jedis; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import redis.clients.jedis.Protocol.Command; +import redis.clients.jedis.Protocol.Keyword; +import redis.clients.jedis.args.FlushMode; +import redis.clients.jedis.providers.ConnectionProvider; +import redis.clients.jedis.search.FTCreateParams; +import redis.clients.jedis.search.IndexOptions; +import redis.clients.jedis.search.Schema; +import redis.clients.jedis.search.SearchProtocol.SearchCommand; +import redis.clients.jedis.search.SearchProtocol.SearchKeyword; +import redis.clients.jedis.search.schemafields.SchemaField; +import redis.clients.jedis.util.Pool; + +public class JedisBroadcast { + + private final ConnectionProvider provider; + + public JedisBroadcast(UnifiedJedis jedis) { + this(jedis.provider); + } + + public JedisBroadcast(ConnectionProvider provider) { + if (provider == null) { + throw new NullPointerException("ConnectionProvider is null."); + } + this.provider = provider; + } + + public final Map> broadcastCommand(CommandObject commandObject) { + Map connectionMap = provider.getConnectionMap(); + Map> responseMap = new HashMap<>(connectionMap.size(), 1f); + for (Map.Entry entry : connectionMap.entrySet()) { + Object key = entry.getKey(); + Object connection = entry.getValue(); + try { + responseMap.put(key, new BroadcastResponse<>(executeCommand(connection, commandObject))); + } catch (RuntimeException re) { + responseMap.put(key, new BroadcastResponse<>(re)); + } + } + return responseMap; + } + + private T executeCommand(Object connection, CommandObject commandObject) { + if (connection instanceof Connection) { + return ((Connection) connection).executeCommand(commandObject); + } + if (connection instanceof Pool) { + try (Connection _conn = ((Pool) connection).getResource()) { + return _conn.executeCommand(commandObject); + } + } + throw new IllegalStateException(connection.getClass() + "is not supported."); + } + + public Map>> scriptExists(String... sha1) { + CommandObject> command = new CommandObject<>(new CommandArguments(Command.SCRIPT) + .add(Keyword.EXISTS).addObjects((Object[]) sha1), BuilderFactory.BOOLEAN_LIST); + return broadcastCommand(command); + } + + public Map>> scriptExists(byte[]... sha1) { + CommandObject> command = new CommandObject<>(new CommandArguments(Command.SCRIPT) + .add(Keyword.EXISTS).addObjects((Object[]) sha1), BuilderFactory.BOOLEAN_LIST); + return broadcastCommand(command); + } + + public Map> scriptLoad(String script) { + CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) + .add(Keyword.LOAD).add(script), BuilderFactory.STRING); + return broadcastCommand(command); + } + + public Map> scriptLoad(byte[] script) { + CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) + .add(Keyword.LOAD).add(script), BuilderFactory.BINARY); + return broadcastCommand(command); + } + + public Map> scriptFlush() { + CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) + .add(Keyword.FLUSH), BuilderFactory.STRING); + return broadcastCommand(command); + } + + public Map> scriptFlush(FlushMode flushMode) { + CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) + .add(Keyword.FLUSH).add(flushMode), BuilderFactory.STRING); + return broadcastCommand(command); + } + + public Map> scriptKill() { + CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) + .add(Keyword.KILL), BuilderFactory.STRING); + return broadcastCommand(command); + } + + public Map> ftCreate(String indexName, IndexOptions indexOptions, Schema schema) { + CommandArguments args = new CommandArguments(SearchCommand.CREATE).add(indexName) + .addParams(indexOptions).add(SearchKeyword.SCHEMA); + schema.fields.forEach(field -> args.addParams(field)); + return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING)); + } + + public Map> ftCreate(String indexName, SchemaField... schemaFields) { + return ftCreate(indexName, Arrays.asList(schemaFields)); + } + + public Map> ftCreate(String indexName, FTCreateParams createParams, SchemaField... schemaFields) { + return ftCreate(indexName, createParams, Arrays.asList(schemaFields)); + } + + public Map> ftCreate(String indexName, Iterable schemaFields) { + return ftCreate(indexName, FTCreateParams.createParams(), schemaFields); + } + + public Map> ftCreate(String indexName, FTCreateParams createParams, + Iterable schemaFields) { + CommandArguments args = new CommandArguments(SearchCommand.CREATE).add(indexName) + .addParams(createParams).add(SearchKeyword.SCHEMA); + schemaFields.forEach(field -> args.addParams(field)); + return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING)); + } +} diff --git a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java index 1fc0138e30..a1d5e1f4cb 100644 --- a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java @@ -135,4 +135,9 @@ public Connection getConnectionFromSlot(int slot) { } } } + + @Override + public Map getConnectionMap() { + return Collections.unmodifiableMap(getNodes()); + } } diff --git a/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java index 8efafac33a..48543dd5cb 100644 --- a/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java @@ -1,5 +1,7 @@ package redis.clients.jedis.providers; +import java.util.Collections; +import java.util.Map; import redis.clients.jedis.CommandArguments; import redis.clients.jedis.Connection; @@ -8,4 +10,9 @@ public interface ConnectionProvider extends AutoCloseable { Connection getConnection(); Connection getConnection(CommandArguments args); + + default Map getConnectionMap() { + final Connection c = getConnection(); + return Collections.singletonMap(c.toString(), c); + } } diff --git a/src/main/java/redis/clients/jedis/providers/PooledConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/PooledConnectionProvider.java index 415160f97f..7c091c58f3 100644 --- a/src/main/java/redis/clients/jedis/providers/PooledConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/PooledConnectionProvider.java @@ -1,5 +1,7 @@ package redis.clients.jedis.providers; +import java.util.Collections; +import java.util.Map; import org.apache.commons.pool2.PooledObjectFactory; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -14,18 +16,22 @@ public class PooledConnectionProvider implements ConnectionProvider { private final Pool pool; + private Object connectionMapKey = ""; public PooledConnectionProvider(HostAndPort hostAndPort) { this(new ConnectionFactory(hostAndPort)); + this.connectionMapKey = hostAndPort; } public PooledConnectionProvider(HostAndPort hostAndPort, JedisClientConfig clientConfig) { this(new ConnectionPool(hostAndPort, clientConfig)); + this.connectionMapKey = hostAndPort; } public PooledConnectionProvider(HostAndPort hostAndPort, JedisClientConfig clientConfig, GenericObjectPoolConfig poolConfig) { this(new ConnectionFactory(hostAndPort, clientConfig), poolConfig); + this.connectionMapKey = hostAndPort; } public PooledConnectionProvider(PooledObjectFactory factory) { @@ -59,4 +65,9 @@ public Connection getConnection() { public Connection getConnection(CommandArguments args) { return pool.getResource(); } + + @Override + public Map> getConnectionMap() { + return Collections.singletonMap("", pool); + } } diff --git a/src/main/java/redis/clients/jedis/providers/ShardedConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ShardedConnectionProvider.java index f934c59f5a..63d6c7946f 100644 --- a/src/main/java/redis/clients/jedis/providers/ShardedConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ShardedConnectionProvider.java @@ -158,4 +158,9 @@ private HostAndPort getNodeFromHash(Long hash) { } return tail.get(tail.firstKey()); } + + @Override + public Map getConnectionMap() { + return Collections.unmodifiableMap(resources); + } } diff --git a/src/test/java/redis/clients/jedis/JedisClusterTest.java b/src/test/java/redis/clients/jedis/JedisClusterTest.java index b93fa2409e..ca58924138 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/JedisClusterTest.java @@ -13,6 +13,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; @@ -780,4 +781,17 @@ private boolean isAnyNodeHandshaking(Jedis node) { } return false; } + + @Test + public void broadcast() { + try (JedisCluster cluster = new JedisCluster(Collections.singleton(new HostAndPort(LOCAL_IP, 7379)), + DefaultJedisClientConfig.builder().password("cluster").build())) { + JedisBroadcast broadcast = new JedisBroadcast(cluster); + + Map> replies = broadcast.broadcastCommand(new CommandObject<>( + new CommandArguments(Protocol.Command.PING), BuilderFactory.STRING)); + assertEquals(3, replies.size()); + replies.values().forEach(reply -> assertEquals("PONG", reply.get())); + } + } } diff --git a/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java b/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java new file mode 100644 index 0000000000..433a8bd755 --- /dev/null +++ b/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java @@ -0,0 +1,59 @@ +package redis.clients.jedis.modules.search; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static redis.clients.jedis.util.AssertUtil.assertOK; + +import java.util.Map; +import org.junit.BeforeClass; +import org.junit.Test; + +import redis.clients.jedis.BroadcastResponse; +import redis.clients.jedis.CommandArguments; +import redis.clients.jedis.Connection; +import redis.clients.jedis.JedisBroadcast; +import redis.clients.jedis.modules.RedisModuleCommandsTestBase; +import redis.clients.jedis.providers.ConnectionProvider; +import redis.clients.jedis.search.schemafields.TextField; + +public class BroadcastTest extends RedisModuleCommandsTestBase { + + private static final String index = "broadcast"; + + @BeforeClass + public static void prepare() { + RedisModuleCommandsTestBase.prepare(); + } +// +// @AfterClass +// public static void tearDown() { +//// RedisModuleCommandsTestBase.tearDown(); +// } + + @Test + public void broadcast() throws Exception { + final Connection conn = new Connection(hnp); + try (ConnectionProvider provider = new ConnectionProvider() { + @Override + public Connection getConnection() { + return conn; + } + + @Override + public Connection getConnection(CommandArguments args) { + return getConnection(); + } + + @Override + public void close() throws Exception { + conn.close(); + } + }) { + JedisBroadcast broadcast = new JedisBroadcast(provider); + Map> reply = broadcast.ftCreate(index, TextField.of("t")); + assertEquals(1, reply.size()); + assertOK(reply.values().stream().findAny().get().get()); + } + assertFalse(conn.isConnected()); + } +} From 5bbeca907f6f349197728ea76be8e449a3fe7ea5 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Mon, 7 Nov 2022 22:00:48 +0600 Subject: [PATCH 2/8] edit --- .../clients/jedis/providers/PooledConnectionProvider.java | 4 +++- src/test/java/redis/clients/jedis/JedisClusterTest.java | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/redis/clients/jedis/providers/PooledConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/PooledConnectionProvider.java index 7c091c58f3..f7b90e2953 100644 --- a/src/main/java/redis/clients/jedis/providers/PooledConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/PooledConnectionProvider.java @@ -36,11 +36,13 @@ public PooledConnectionProvider(HostAndPort hostAndPort, JedisClientConfig clien public PooledConnectionProvider(PooledObjectFactory factory) { this(new ConnectionPool(factory)); + this.connectionMapKey = factory; } public PooledConnectionProvider(PooledObjectFactory factory, GenericObjectPoolConfig poolConfig) { this(new ConnectionPool(factory, poolConfig)); + this.connectionMapKey = factory; } private PooledConnectionProvider(Pool pool) { @@ -68,6 +70,6 @@ public Connection getConnection(CommandArguments args) { @Override public Map> getConnectionMap() { - return Collections.singletonMap("", pool); + return Collections.singletonMap(connectionMapKey, pool); } } diff --git a/src/test/java/redis/clients/jedis/JedisClusterTest.java b/src/test/java/redis/clients/jedis/JedisClusterTest.java index ca58924138..12b0e3ae08 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/JedisClusterTest.java @@ -13,7 +13,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; From 7bfa784f15291e78bc475cf90b5656d9490abc4a Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Sun, 13 Nov 2022 15:27:17 +0600 Subject: [PATCH 3/8] Use Supplier in broadcast method signatures instead of BroadcastResponse --- .../redis/clients/jedis/JedisBroadcast.java | 29 ++++++++++--------- .../redis/clients/jedis/JedisClusterTest.java | 3 +- .../jedis/modules/search/BroadcastTest.java | 4 +-- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/src/main/java/redis/clients/jedis/JedisBroadcast.java b/src/main/java/redis/clients/jedis/JedisBroadcast.java index 9bbc45011c..84fe401e91 100644 --- a/src/main/java/redis/clients/jedis/JedisBroadcast.java +++ b/src/main/java/redis/clients/jedis/JedisBroadcast.java @@ -4,6 +4,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import redis.clients.jedis.Protocol.Command; import redis.clients.jedis.Protocol.Keyword; @@ -32,9 +33,9 @@ public JedisBroadcast(ConnectionProvider provider) { this.provider = provider; } - public final Map> broadcastCommand(CommandObject commandObject) { + public final Map> broadcastCommand(CommandObject commandObject) { Map connectionMap = provider.getConnectionMap(); - Map> responseMap = new HashMap<>(connectionMap.size(), 1f); + Map> responseMap = new HashMap<>(connectionMap.size(), 1f); for (Map.Entry entry : connectionMap.entrySet()) { Object key = entry.getKey(); Object connection = entry.getValue(); @@ -59,68 +60,68 @@ private T executeCommand(Object connection, CommandObject commandObject) throw new IllegalStateException(connection.getClass() + "is not supported."); } - public Map>> scriptExists(String... sha1) { + public Map>> scriptExists(String... sha1) { CommandObject> command = new CommandObject<>(new CommandArguments(Command.SCRIPT) .add(Keyword.EXISTS).addObjects((Object[]) sha1), BuilderFactory.BOOLEAN_LIST); return broadcastCommand(command); } - public Map>> scriptExists(byte[]... sha1) { + public Map>> scriptExists(byte[]... sha1) { CommandObject> command = new CommandObject<>(new CommandArguments(Command.SCRIPT) .add(Keyword.EXISTS).addObjects((Object[]) sha1), BuilderFactory.BOOLEAN_LIST); return broadcastCommand(command); } - public Map> scriptLoad(String script) { + public Map> scriptLoad(String script) { CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) .add(Keyword.LOAD).add(script), BuilderFactory.STRING); return broadcastCommand(command); } - public Map> scriptLoad(byte[] script) { + public Map> scriptLoad(byte[] script) { CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) .add(Keyword.LOAD).add(script), BuilderFactory.BINARY); return broadcastCommand(command); } - public Map> scriptFlush() { + public Map> scriptFlush() { CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) .add(Keyword.FLUSH), BuilderFactory.STRING); return broadcastCommand(command); } - public Map> scriptFlush(FlushMode flushMode) { + public Map> scriptFlush(FlushMode flushMode) { CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) .add(Keyword.FLUSH).add(flushMode), BuilderFactory.STRING); return broadcastCommand(command); } - public Map> scriptKill() { + public Map> scriptKill() { CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) .add(Keyword.KILL), BuilderFactory.STRING); return broadcastCommand(command); } - public Map> ftCreate(String indexName, IndexOptions indexOptions, Schema schema) { + public Map> ftCreate(String indexName, IndexOptions indexOptions, Schema schema) { CommandArguments args = new CommandArguments(SearchCommand.CREATE).add(indexName) .addParams(indexOptions).add(SearchKeyword.SCHEMA); schema.fields.forEach(field -> args.addParams(field)); return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING)); } - public Map> ftCreate(String indexName, SchemaField... schemaFields) { + public Map> ftCreate(String indexName, SchemaField... schemaFields) { return ftCreate(indexName, Arrays.asList(schemaFields)); } - public Map> ftCreate(String indexName, FTCreateParams createParams, SchemaField... schemaFields) { + public Map> ftCreate(String indexName, FTCreateParams createParams, SchemaField... schemaFields) { return ftCreate(indexName, createParams, Arrays.asList(schemaFields)); } - public Map> ftCreate(String indexName, Iterable schemaFields) { + public Map> ftCreate(String indexName, Iterable schemaFields) { return ftCreate(indexName, FTCreateParams.createParams(), schemaFields); } - public Map> ftCreate(String indexName, FTCreateParams createParams, + public Map> ftCreate(String indexName, FTCreateParams createParams, Iterable schemaFields) { CommandArguments args = new CommandArguments(SearchCommand.CREATE).add(indexName) .addParams(createParams).add(SearchKeyword.SCHEMA); diff --git a/src/test/java/redis/clients/jedis/JedisClusterTest.java b/src/test/java/redis/clients/jedis/JedisClusterTest.java index 12b0e3ae08..24c2c01b76 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/JedisClusterTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.junit.Test; @@ -787,7 +788,7 @@ public void broadcast() { DefaultJedisClientConfig.builder().password("cluster").build())) { JedisBroadcast broadcast = new JedisBroadcast(cluster); - Map> replies = broadcast.broadcastCommand(new CommandObject<>( + Map> replies = broadcast.broadcastCommand(new CommandObject<>( new CommandArguments(Protocol.Command.PING), BuilderFactory.STRING)); assertEquals(3, replies.size()); replies.values().forEach(reply -> assertEquals("PONG", reply.get())); diff --git a/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java b/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java index 433a8bd755..711e100be2 100644 --- a/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java +++ b/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java @@ -5,10 +5,10 @@ import static redis.clients.jedis.util.AssertUtil.assertOK; import java.util.Map; +import java.util.function.Supplier; import org.junit.BeforeClass; import org.junit.Test; -import redis.clients.jedis.BroadcastResponse; import redis.clients.jedis.CommandArguments; import redis.clients.jedis.Connection; import redis.clients.jedis.JedisBroadcast; @@ -50,7 +50,7 @@ public void close() throws Exception { } }) { JedisBroadcast broadcast = new JedisBroadcast(provider); - Map> reply = broadcast.ftCreate(index, TextField.of("t")); + Map> reply = broadcast.ftCreate(index, TextField.of("t")); assertEquals(1, reply.size()); assertOK(reply.values().stream().findAny().get().get()); } From 322990563964f839c4cdf335fdfecbc0149e77e2 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Sun, 13 Nov 2022 15:30:18 +0600 Subject: [PATCH 4/8] Address review --- src/main/java/redis/clients/jedis/BroadcastResponse.java | 3 +++ src/main/java/redis/clients/jedis/JedisBroadcast.java | 5 ++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/java/redis/clients/jedis/BroadcastResponse.java b/src/main/java/redis/clients/jedis/BroadcastResponse.java index 915ae50784..481d7fa63c 100644 --- a/src/main/java/redis/clients/jedis/BroadcastResponse.java +++ b/src/main/java/redis/clients/jedis/BroadcastResponse.java @@ -2,6 +2,9 @@ import java.util.function.Supplier; +/** + * Represents the response from a single node in broadcast mode. + */ public class BroadcastResponse implements Supplier { private T response = null; diff --git a/src/main/java/redis/clients/jedis/JedisBroadcast.java b/src/main/java/redis/clients/jedis/JedisBroadcast.java index 84fe401e91..8aa9b848d4 100644 --- a/src/main/java/redis/clients/jedis/JedisBroadcast.java +++ b/src/main/java/redis/clients/jedis/JedisBroadcast.java @@ -28,7 +28,7 @@ public JedisBroadcast(UnifiedJedis jedis) { public JedisBroadcast(ConnectionProvider provider) { if (provider == null) { - throw new NullPointerException("ConnectionProvider is null."); + throw new NullPointerException("ConnectionProvider cannot be null."); } this.provider = provider; } @@ -51,8 +51,7 @@ public final Map> broadcastCommand(CommandObject commandOb private T executeCommand(Object connection, CommandObject commandObject) { if (connection instanceof Connection) { return ((Connection) connection).executeCommand(commandObject); - } - if (connection instanceof Pool) { + } else if (connection instanceof Pool) { try (Connection _conn = ((Pool) connection).getResource()) { return _conn.executeCommand(commandObject); } From 13345f91810dca078f177ee67d8ea3a37b920874 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Wed, 16 Nov 2022 12:13:54 +0600 Subject: [PATCH 5/8] more commands --- .../redis/clients/jedis/JedisBroadcast.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/main/java/redis/clients/jedis/JedisBroadcast.java b/src/main/java/redis/clients/jedis/JedisBroadcast.java index 8aa9b848d4..ad47a9c78b 100644 --- a/src/main/java/redis/clients/jedis/JedisBroadcast.java +++ b/src/main/java/redis/clients/jedis/JedisBroadcast.java @@ -59,6 +59,33 @@ private T executeCommand(Object connection, CommandObject commandObject) throw new IllegalStateException(connection.getClass() + "is not supported."); } + public Map> configSet(final String... parameterValues) { + if (parameterValues.length > 0 && parameterValues.length % 2 == 0) { + // ok + } else { + throw new IllegalStateException("It requires 'pair's of config parameter-values."); + } + CommandArguments args = new CommandArguments(Command.CONFIG).add(Keyword.SET) + .addObjects((Object[]) parameterValues); + return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING)); + } + + public Map> flushAll() { + return broadcastCommand(new CommandObject<>(new CommandArguments(Command.FLUSHALL), + BuilderFactory.STRING)); + } + + public Map> flushAll(FlushMode flushMode) { + return broadcastCommand(new CommandObject<>(new CommandArguments(Command.FLUSHALL) + .add(flushMode), BuilderFactory.STRING)); + } + + public Map> waitReplicas(final int replicas, final long timeout) { + CommandArguments args = new CommandArguments(Command.WAIT) + .add(Protocol.toByteArray(replicas)).add(Protocol.toByteArray(timeout)); + return broadcastCommand(new CommandObject<>(args, BuilderFactory.LONG)); + } + public Map>> scriptExists(String... sha1) { CommandObject> command = new CommandObject<>(new CommandArguments(Command.SCRIPT) .add(Keyword.EXISTS).addObjects((Object[]) sha1), BuilderFactory.BOOLEAN_LIST); @@ -127,4 +154,14 @@ public Map> ftCreate(String indexName, FTCreateParams create schemaFields.forEach(field -> args.addParams(field)); return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING)); } + + public Map> ftDropIndex(String indexName) { + return broadcastCommand(new CommandObject<>(new CommandArguments(SearchCommand.DROPINDEX) + .add(indexName), BuilderFactory.STRING)); + } + + public Map> ftDropIndexDD(String indexName) { + return broadcastCommand(new CommandObject<>(new CommandArguments(SearchCommand.DROPINDEX) + .add(indexName).add(SearchKeyword.DD), BuilderFactory.STRING)); + } } From 3c79413ea216df903845653b35d9103b1548d46a Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Mon, 21 Nov 2022 21:40:25 +0600 Subject: [PATCH 6/8] Include broadcast commands in UnifiedJedis --- .../redis/clients/jedis/CommandObjects.java | 30 ++++ src/main/java/redis/clients/jedis/Jedis.java | 24 +-- .../redis/clients/jedis/JedisBroadcast.java | 167 ------------------ .../redis/clients/jedis/UnifiedJedis.java | 114 ++++++++++++ .../redis/clients/jedis/JedisClusterTest.java | 14 -- .../jedis/ClusterScriptingCommandsTest.java | 30 ++++ .../jedis/ClusterValuesCommandsTest.java | 22 +++ .../jedis/modules/search/BroadcastTest.java | 2 +- 8 files changed, 205 insertions(+), 198 deletions(-) delete mode 100644 src/main/java/redis/clients/jedis/JedisBroadcast.java diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index 9c615780d5..0f634324d4 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -2754,32 +2754,62 @@ public final CommandObject evalshaReadonly(byte[] sha1, List key BuilderFactory.RAW_OBJECT); } + public final CommandObject> scriptExists(String... sha1s) { + return new CommandObject<>(commandArguments(SCRIPT).add(Keyword.EXISTS).addObjects((Object[]) sha1s), + BuilderFactory.BOOLEAN_LIST); + } + public final CommandObject> scriptExists(String sampleKey, String... sha1s) { return new CommandObject<>(commandArguments(SCRIPT).add(Keyword.EXISTS).addObjects((Object[]) sha1s) .processKey(sampleKey), BuilderFactory.BOOLEAN_LIST); } + public final CommandObject scriptLoad(String script) { + return new CommandObject<>(commandArguments(SCRIPT).add(LOAD).add(script), BuilderFactory.STRING); + } + public final CommandObject scriptLoad(String script, String sampleKey) { return new CommandObject<>(commandArguments(SCRIPT).add(LOAD).add(script).processKey(sampleKey), BuilderFactory.STRING); } + public final CommandObject scriptFlush() { + return new CommandObject<>(commandArguments(SCRIPT).add(FLUSH), BuilderFactory.STRING); + } + public final CommandObject scriptFlush(String sampleKey) { return new CommandObject<>(commandArguments(SCRIPT).add(FLUSH).processKey(sampleKey), BuilderFactory.STRING); } + public final CommandObject scriptFlush(FlushMode flushMode) { + return new CommandObject<>(commandArguments(SCRIPT).add(FLUSH).add(flushMode), BuilderFactory.STRING); + } + public final CommandObject scriptFlush(String sampleKey, FlushMode flushMode) { return new CommandObject<>(commandArguments(SCRIPT).add(FLUSH).add(flushMode).processKey(sampleKey), BuilderFactory.STRING); } + public final CommandObject scriptKill() { + return new CommandObject<>(commandArguments(SCRIPT).add(KILL), BuilderFactory.STRING); + } + public final CommandObject scriptKill(String sampleKey) { return new CommandObject<>(commandArguments(SCRIPT).add(KILL).processKey(sampleKey), BuilderFactory.STRING); } + public final CommandObject> scriptExists(byte[]... sha1s) { + return new CommandObject<>(commandArguments(SCRIPT).add(Keyword.EXISTS).addObjects((Object[]) sha1s), + BuilderFactory.BOOLEAN_LIST); + } + public final CommandObject> scriptExists(byte[] sampleKey, byte[]... sha1s) { return new CommandObject<>(commandArguments(SCRIPT).add(Keyword.EXISTS).addObjects((Object[]) sha1s) .processKey(sampleKey), BuilderFactory.BOOLEAN_LIST); } + public final CommandObject scriptLoad(byte[] script) { + return new CommandObject<>(commandArguments(SCRIPT).add(LOAD).add(script), BuilderFactory.BINARY); + } + public final CommandObject scriptLoad(byte[] script, byte[] sampleKey) { return new CommandObject<>(commandArguments(SCRIPT).add(LOAD).add(script).processKey(sampleKey), BuilderFactory.BINARY); } diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 5e5385f65c..702af9936c 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -3860,14 +3860,12 @@ public Object evalsha(final byte[] sha1, final int keyCount, final byte[]... par @Override public String scriptFlush() { - connection.sendCommand(SCRIPT, FLUSH); - return connection.getStatusCodeReply(); + return connection.executeCommand(commandObjects.scriptFlush()); } @Override public String scriptFlush(final FlushMode flushMode) { - connection.sendCommand(SCRIPT, FLUSH.getRaw(), flushMode.getRaw()); - return connection.getStatusCodeReply(); + return connection.executeCommand(commandObjects.scriptFlush(flushMode)); } @Override @@ -3879,20 +3877,17 @@ public Boolean scriptExists(final byte[] sha1) { @Override public List scriptExists(final byte[]... sha1) { - connection.sendCommand(SCRIPT, joinParameters(Keyword.EXISTS.getRaw(), sha1)); - return BuilderFactory.BOOLEAN_LIST.build(connection.getOne()); + return connection.executeCommand(commandObjects.scriptExists(sha1)); } @Override public byte[] scriptLoad(final byte[] script) { - connection.sendCommand(SCRIPT, LOAD.getRaw(), script); - return connection.getBinaryBulkReply(); + return connection.executeCommand(commandObjects.scriptLoad(script)); } @Override public String scriptKill() { - connection.sendCommand(SCRIPT, KILL); - return connection.getStatusCodeReply(); + return connection.executeCommand(commandObjects.scriptKill()); } @Override @@ -4335,8 +4330,7 @@ public String migrate(String host, int port, int timeout, MigrateParams params, @Override public long waitReplicas(final int replicas, final long timeout) { checkIsInMultiOrPipeline(); - connection.sendCommand(WAIT, toByteArray(replicas), toByteArray(timeout)); - return connection.getIntegerReply(); + return connection.executeCommand(commandObjects.waitReplicas(replicas, timeout)); } @Override @@ -7993,14 +7987,12 @@ public Boolean scriptExists(final String sha1) { @Override public List scriptExists(final String... sha1) { - connection.sendCommand(SCRIPT, joinParameters(Keyword.EXISTS.name(), sha1)); - return BuilderFactory.BOOLEAN_LIST.build(connection.getOne()); + return connection.executeCommand(commandObjects.scriptExists(sha1)); } @Override public String scriptLoad(final String script) { - connection.sendCommand(SCRIPT, LOAD.name(), script); - return connection.getBulkReply(); + return connection.executeCommand(commandObjects.scriptLoad(script)); } @Override diff --git a/src/main/java/redis/clients/jedis/JedisBroadcast.java b/src/main/java/redis/clients/jedis/JedisBroadcast.java deleted file mode 100644 index ad47a9c78b..0000000000 --- a/src/main/java/redis/clients/jedis/JedisBroadcast.java +++ /dev/null @@ -1,167 +0,0 @@ -package redis.clients.jedis; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Supplier; - -import redis.clients.jedis.Protocol.Command; -import redis.clients.jedis.Protocol.Keyword; -import redis.clients.jedis.args.FlushMode; -import redis.clients.jedis.providers.ConnectionProvider; -import redis.clients.jedis.search.FTCreateParams; -import redis.clients.jedis.search.IndexOptions; -import redis.clients.jedis.search.Schema; -import redis.clients.jedis.search.SearchProtocol.SearchCommand; -import redis.clients.jedis.search.SearchProtocol.SearchKeyword; -import redis.clients.jedis.search.schemafields.SchemaField; -import redis.clients.jedis.util.Pool; - -public class JedisBroadcast { - - private final ConnectionProvider provider; - - public JedisBroadcast(UnifiedJedis jedis) { - this(jedis.provider); - } - - public JedisBroadcast(ConnectionProvider provider) { - if (provider == null) { - throw new NullPointerException("ConnectionProvider cannot be null."); - } - this.provider = provider; - } - - public final Map> broadcastCommand(CommandObject commandObject) { - Map connectionMap = provider.getConnectionMap(); - Map> responseMap = new HashMap<>(connectionMap.size(), 1f); - for (Map.Entry entry : connectionMap.entrySet()) { - Object key = entry.getKey(); - Object connection = entry.getValue(); - try { - responseMap.put(key, new BroadcastResponse<>(executeCommand(connection, commandObject))); - } catch (RuntimeException re) { - responseMap.put(key, new BroadcastResponse<>(re)); - } - } - return responseMap; - } - - private T executeCommand(Object connection, CommandObject commandObject) { - if (connection instanceof Connection) { - return ((Connection) connection).executeCommand(commandObject); - } else if (connection instanceof Pool) { - try (Connection _conn = ((Pool) connection).getResource()) { - return _conn.executeCommand(commandObject); - } - } - throw new IllegalStateException(connection.getClass() + "is not supported."); - } - - public Map> configSet(final String... parameterValues) { - if (parameterValues.length > 0 && parameterValues.length % 2 == 0) { - // ok - } else { - throw new IllegalStateException("It requires 'pair's of config parameter-values."); - } - CommandArguments args = new CommandArguments(Command.CONFIG).add(Keyword.SET) - .addObjects((Object[]) parameterValues); - return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING)); - } - - public Map> flushAll() { - return broadcastCommand(new CommandObject<>(new CommandArguments(Command.FLUSHALL), - BuilderFactory.STRING)); - } - - public Map> flushAll(FlushMode flushMode) { - return broadcastCommand(new CommandObject<>(new CommandArguments(Command.FLUSHALL) - .add(flushMode), BuilderFactory.STRING)); - } - - public Map> waitReplicas(final int replicas, final long timeout) { - CommandArguments args = new CommandArguments(Command.WAIT) - .add(Protocol.toByteArray(replicas)).add(Protocol.toByteArray(timeout)); - return broadcastCommand(new CommandObject<>(args, BuilderFactory.LONG)); - } - - public Map>> scriptExists(String... sha1) { - CommandObject> command = new CommandObject<>(new CommandArguments(Command.SCRIPT) - .add(Keyword.EXISTS).addObjects((Object[]) sha1), BuilderFactory.BOOLEAN_LIST); - return broadcastCommand(command); - } - - public Map>> scriptExists(byte[]... sha1) { - CommandObject> command = new CommandObject<>(new CommandArguments(Command.SCRIPT) - .add(Keyword.EXISTS).addObjects((Object[]) sha1), BuilderFactory.BOOLEAN_LIST); - return broadcastCommand(command); - } - - public Map> scriptLoad(String script) { - CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) - .add(Keyword.LOAD).add(script), BuilderFactory.STRING); - return broadcastCommand(command); - } - - public Map> scriptLoad(byte[] script) { - CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) - .add(Keyword.LOAD).add(script), BuilderFactory.BINARY); - return broadcastCommand(command); - } - - public Map> scriptFlush() { - CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) - .add(Keyword.FLUSH), BuilderFactory.STRING); - return broadcastCommand(command); - } - - public Map> scriptFlush(FlushMode flushMode) { - CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) - .add(Keyword.FLUSH).add(flushMode), BuilderFactory.STRING); - return broadcastCommand(command); - } - - public Map> scriptKill() { - CommandObject command = new CommandObject<>(new CommandArguments(Command.SCRIPT) - .add(Keyword.KILL), BuilderFactory.STRING); - return broadcastCommand(command); - } - - public Map> ftCreate(String indexName, IndexOptions indexOptions, Schema schema) { - CommandArguments args = new CommandArguments(SearchCommand.CREATE).add(indexName) - .addParams(indexOptions).add(SearchKeyword.SCHEMA); - schema.fields.forEach(field -> args.addParams(field)); - return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING)); - } - - public Map> ftCreate(String indexName, SchemaField... schemaFields) { - return ftCreate(indexName, Arrays.asList(schemaFields)); - } - - public Map> ftCreate(String indexName, FTCreateParams createParams, SchemaField... schemaFields) { - return ftCreate(indexName, createParams, Arrays.asList(schemaFields)); - } - - public Map> ftCreate(String indexName, Iterable schemaFields) { - return ftCreate(indexName, FTCreateParams.createParams(), schemaFields); - } - - public Map> ftCreate(String indexName, FTCreateParams createParams, - Iterable schemaFields) { - CommandArguments args = new CommandArguments(SearchCommand.CREATE).add(indexName) - .addParams(createParams).add(SearchKeyword.SCHEMA); - schemaFields.forEach(field -> args.addParams(field)); - return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING)); - } - - public Map> ftDropIndex(String indexName) { - return broadcastCommand(new CommandObject<>(new CommandArguments(SearchCommand.DROPINDEX) - .add(indexName), BuilderFactory.STRING)); - } - - public Map> ftDropIndexDD(String indexName) { - return broadcastCommand(new CommandObject<>(new CommandArguments(SearchCommand.DROPINDEX) - .add(indexName).add(SearchKeyword.DD), BuilderFactory.STRING)); - } -} diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index 0a48fc0b6b..d2309658a9 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -2,9 +2,12 @@ import java.net.URI; import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import java.util.regex.Pattern; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.json.JSONArray; @@ -34,6 +37,7 @@ import redis.clients.jedis.util.IOUtils; import redis.clients.jedis.util.JedisURIHelper; import redis.clients.jedis.util.KeyValue; +import redis.clients.jedis.util.Pool; public class UnifiedJedis implements JedisCommands, JedisBinaryCommands, SampleKeyedCommands, SampleBinaryKeyedCommands, RedisModuleCommands, @@ -167,6 +171,33 @@ public final T executeCommand(CommandObject commandObject) { return executor.executeCommand(commandObject); } + public final Map> broadcastCommand(CommandObject commandObject) { + // TODO: Push this implementation in executor interface/classes. + Map connectionMap = provider.getConnectionMap(); + Map> responseMap = new HashMap<>(connectionMap.size(), 1f); + for (Map.Entry entry : connectionMap.entrySet()) { + Object key = entry.getKey(); + Object connection = entry.getValue(); + try { + responseMap.put(key, new BroadcastResponse<>(executeBroadcastCommand(connection, commandObject))); + } catch (RuntimeException re) { + responseMap.put(key, new BroadcastResponse<>(re)); + } + } + return responseMap; + } + + private T executeBroadcastCommand(Object connection, CommandObject commandObject) { + if (connection instanceof Connection) { + return ((Connection) connection).executeCommand(commandObject); + } else if (connection instanceof Pool) { + try (Connection _conn = ((Pool) connection).getResource()) { + return _conn.executeCommand(commandObject); + } + } + throw new IllegalStateException(connection.getClass() + "is not supported."); + } + // Key commands @Override public boolean exists(String key) { @@ -3322,6 +3353,10 @@ public long waitReplicas(byte[] sampleKey, int replicas, long timeout) { return executeCommand(commandObjects.waitReplicas(sampleKey, replicas, timeout)); } + public Map> waitReplicasBroadcast(final int replicas, final long timeout) { + return broadcastCommand(commandObjects.waitReplicas(replicas, timeout)); + } + @Override public Object eval(String script, String sampleKey) { return executeCommand(commandObjects.eval(script, sampleKey)); @@ -3352,6 +3387,10 @@ public List scriptExists(String sampleKey, String... sha1s) { return executeCommand(commandObjects.scriptExists(sampleKey, sha1s)); } + public Map>> scriptExistsBroadcast(String... sha1s) { + return broadcastCommand(commandObjects.scriptExists(sha1s)); + } + @Override public Boolean scriptExists(byte[] sha1, byte[] sampleKey) { return scriptExists(sampleKey, new byte[][]{sha1}).get(0); @@ -3362,11 +3401,19 @@ public List scriptExists(byte[] sampleKey, byte[]... sha1s) { return executeCommand(commandObjects.scriptExists(sampleKey, sha1s)); } + public Map>> scriptExistsBroadcast(byte[]... sha1s) { + return broadcastCommand(commandObjects.scriptExists(sha1s)); + } + @Override public String scriptLoad(String script, String sampleKey) { return executeCommand(commandObjects.scriptLoad(script, sampleKey)); } + public Map> scriptLoadBroadcast(String script) { + return broadcastCommand(commandObjects.scriptLoad(script)); + } + @Override public String scriptFlush(String sampleKey) { return executeCommand(commandObjects.scriptFlush(sampleKey)); @@ -3377,16 +3424,32 @@ public String scriptFlush(String sampleKey, FlushMode flushMode) { return executeCommand(commandObjects.scriptFlush(sampleKey, flushMode)); } + public Map> scriptFlushBroadcast() { + return broadcastCommand(commandObjects.scriptFlush()); + } + + public Map> scriptFlushBroadcast(FlushMode flushMode) { + return broadcastCommand(commandObjects.scriptFlush(flushMode)); + } + @Override public String scriptKill(String sampleKey) { return executeCommand(commandObjects.scriptKill(sampleKey)); } + public Map> scriptKillBroadcast() { + return broadcastCommand(commandObjects.scriptKill()); + } + @Override public byte[] scriptLoad(byte[] script, byte[] sampleKey) { return executeCommand(commandObjects.scriptLoad(script, sampleKey)); } + public Map> scriptLoadBroadcast(byte[] script) { + return broadcastCommand(commandObjects.scriptLoad(script)); + } + @Override public String scriptFlush(byte[] sampleKey) { return executeCommand(commandObjects.scriptFlush(sampleKey)); @@ -3403,6 +3466,27 @@ public String scriptKill(byte[] sampleKey) { } // Sample key commands + public Map> configSetBroadcast(final String... parameterValues) { + if (parameterValues.length > 0 && parameterValues.length % 2 == 0) { + // ok + } else { + throw new IllegalStateException("It requires 'pair's of config parameter-values."); + } + CommandArguments args = new CommandArguments(Protocol.Command.CONFIG).add(Protocol.Keyword.SET) + .addObjects((Object[]) parameterValues); + return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING)); + } + + public Map> flushAllBroadcast() { + return broadcastCommand(new CommandObject<>(new CommandArguments(Protocol.Command.FLUSHALL), + BuilderFactory.STRING)); + } + + public Map> flushAllBroadcast(FlushMode flushMode) { + return broadcastCommand(new CommandObject<>(new CommandArguments(Protocol.Command.FLUSHALL) + .add(flushMode), BuilderFactory.STRING)); + } + // Random node commands public long publish(String channel, String message) { return executeCommand(commandObjects.publish(channel, message)); @@ -3451,11 +3535,33 @@ public String ftCreate(String indexName, IndexOptions indexOptions, Schema schem return executeCommand(commandObjects.ftCreate(indexName, indexOptions, schema)); } + public Map> ftCreateBroadcast(String indexName, IndexOptions indexOptions, Schema schema) { + return broadcastCommand(commandObjects.ftCreate(indexName, indexOptions, schema)); + } + @Override public String ftCreate(String indexName, FTCreateParams createParams, Iterable schemaFields) { return executeCommand(commandObjects.ftCreate(indexName, createParams, schemaFields)); } + public Map> ftCreateBroadcast(String indexName, SchemaField... schemaFields) { + return ftCreateBroadcast(indexName, Arrays.asList(schemaFields)); + } + + public Map> ftCreateBroadcast(String indexName, FTCreateParams createParams, + SchemaField... schemaFields) { + return ftCreateBroadcast(indexName, createParams, Arrays.asList(schemaFields)); + } + + public Map> ftCreateBroadcast(String indexName, Iterable schemaFields) { + return ftCreateBroadcast(indexName, FTCreateParams.createParams(), schemaFields); + } + + public Map> ftCreateBroadcast(String indexName, FTCreateParams createParams, + Iterable schemaFields) { + return broadcastCommand(commandObjects.ftCreate(indexName, createParams, schemaFields)); + } + @Override public String ftAlter(String indexName, Schema schema) { return executeCommand(commandObjects.ftAlter(indexName, schema)); @@ -3539,6 +3645,14 @@ public String ftDropIndexDD(String indexName) { return executeCommand(commandObjects.ftDropIndexDD(indexName)); } + public Map> ftDropIndexBroadcast(String indexName) { + return broadcastCommand(commandObjects.ftDropIndex(indexName)); + } + + public Map> ftDropIndexDDBroadcast(String indexName) { + return broadcastCommand(commandObjects.ftDropIndexDD(indexName)); + } + @Override public String ftSynUpdate(String indexName, String synonymGroupId, String... terms) { return executeCommand(commandObjects.ftSynUpdate(indexName, synonymGroupId, terms)); diff --git a/src/test/java/redis/clients/jedis/JedisClusterTest.java b/src/test/java/redis/clients/jedis/JedisClusterTest.java index 24c2c01b76..b93fa2409e 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/JedisClusterTest.java @@ -25,7 +25,6 @@ import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.junit.Test; @@ -781,17 +780,4 @@ private boolean isAnyNodeHandshaking(Jedis node) { } return false; } - - @Test - public void broadcast() { - try (JedisCluster cluster = new JedisCluster(Collections.singleton(new HostAndPort(LOCAL_IP, 7379)), - DefaultJedisClientConfig.builder().password("cluster").build())) { - JedisBroadcast broadcast = new JedisBroadcast(cluster); - - Map> replies = broadcast.broadcastCommand(new CommandObject<>( - new CommandArguments(Protocol.Command.PING), BuilderFactory.STRING)); - assertEquals(3, replies.size()); - replies.values().forEach(reply -> assertEquals("PONG", reply.get())); - } - } } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/ClusterScriptingCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/ClusterScriptingCommandsTest.java index 4045e6a6ed..321fda88ad 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/ClusterScriptingCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/ClusterScriptingCommandsTest.java @@ -4,8 +4,11 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.function.Supplier; import org.junit.Test; import redis.clients.jedis.args.FlushMode; @@ -91,4 +94,31 @@ public void testBinaryScriptExists() { byte[][] arraySha1 = { sha1 }; assertEquals(Collections.singletonList(Boolean.TRUE), cluster.scriptExists(byteKey, arraySha1)); } + + @Test + public void broadcast() { + Map> stringReplies; + String script_1 = "return 'jedis'", script_2 = "return 79", sha1_1, sha1_2; + + stringReplies = cluster.scriptLoadBroadcast(script_1); + assertEquals(3, stringReplies.size()); + sha1_1 = stringReplies.values().stream().findAny().get().get(); + stringReplies.values().forEach(reply -> assertEquals(sha1_1, reply.get())); + + stringReplies = cluster.scriptLoadBroadcast(script_2); + assertEquals(3, stringReplies.size()); + sha1_2 = stringReplies.values().stream().findAny().get().get(); + stringReplies.values().forEach(reply -> assertEquals(sha1_2, reply.get())); + + Map>> booleanListReplies; + booleanListReplies = cluster.scriptExistsBroadcast(sha1_1, sha1_2); + assertEquals(3, booleanListReplies.size()); + booleanListReplies.values().forEach(reply -> assertEquals(Arrays.asList(true, true), reply.get())); + + cluster.scriptFlushBroadcast(); + + booleanListReplies = cluster.scriptExistsBroadcast(sha1_1, sha1_2); + assertEquals(3, booleanListReplies.size()); + booleanListReplies.values().forEach(reply -> assertEquals(Arrays.asList(false, false), reply.get())); + } } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/ClusterValuesCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/ClusterValuesCommandsTest.java index 6eed412078..ad3c18c8dd 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/ClusterValuesCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/ClusterValuesCommandsTest.java @@ -1,16 +1,22 @@ package redis.clients.jedis.commands.jedis; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import org.junit.Test; +import redis.clients.jedis.BuilderFactory; +import redis.clients.jedis.CommandArguments; +import redis.clients.jedis.CommandObject; import redis.clients.jedis.GeoCoordinate; import redis.clients.jedis.JedisPubSub; +import redis.clients.jedis.Protocol; import redis.clients.jedis.args.GeoUnit; import redis.clients.jedis.params.GeoRadiusParam; import redis.clients.jedis.params.GeoRadiusStoreParam; @@ -106,4 +112,20 @@ public void onUnsubscribe(String channel, int subscribedChannels) { } }, "foo"); } + + @Test + public void broadcastPing() { + Map> replies = cluster.broadcastCommand(new CommandObject<>( + new CommandArguments(Protocol.Command.PING), BuilderFactory.STRING)); + assertEquals(3, replies.size()); + replies.values().forEach(reply -> assertEquals("PONG", reply.get())); + } + + @Test + public void broadcastFlushAll() { + assertEquals("OK", cluster.set("foo", "bar")); + assertEquals("bar", cluster.get("foo")); + cluster.flushAllBroadcast(); + assertNull(cluster.get("foo")); + } } diff --git a/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java b/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java index 711e100be2..b4353e2006 100644 --- a/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java +++ b/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java @@ -50,7 +50,7 @@ public void close() throws Exception { } }) { JedisBroadcast broadcast = new JedisBroadcast(provider); - Map> reply = broadcast.ftCreate(index, TextField.of("t")); + Map> reply = broadcast.ftCreateBroadcast(index, TextField.of("t")); assertEquals(1, reply.size()); assertOK(reply.values().stream().findAny().get().get()); } From ec31320d6001280a2ca2fd6260b50d41568cd409 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Mon, 21 Nov 2022 21:47:49 +0600 Subject: [PATCH 7/8] fix search broadcast test --- .../jedis/modules/search/BroadcastTest.java | 59 ------------------- .../modules/search/SearchWithParamsTest.java | 9 +++ 2 files changed, 9 insertions(+), 59 deletions(-) delete mode 100644 src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java diff --git a/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java b/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java deleted file mode 100644 index b4353e2006..0000000000 --- a/src/test/java/redis/clients/jedis/modules/search/BroadcastTest.java +++ /dev/null @@ -1,59 +0,0 @@ -package redis.clients.jedis.modules.search; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static redis.clients.jedis.util.AssertUtil.assertOK; - -import java.util.Map; -import java.util.function.Supplier; -import org.junit.BeforeClass; -import org.junit.Test; - -import redis.clients.jedis.CommandArguments; -import redis.clients.jedis.Connection; -import redis.clients.jedis.JedisBroadcast; -import redis.clients.jedis.modules.RedisModuleCommandsTestBase; -import redis.clients.jedis.providers.ConnectionProvider; -import redis.clients.jedis.search.schemafields.TextField; - -public class BroadcastTest extends RedisModuleCommandsTestBase { - - private static final String index = "broadcast"; - - @BeforeClass - public static void prepare() { - RedisModuleCommandsTestBase.prepare(); - } -// -// @AfterClass -// public static void tearDown() { -//// RedisModuleCommandsTestBase.tearDown(); -// } - - @Test - public void broadcast() throws Exception { - final Connection conn = new Connection(hnp); - try (ConnectionProvider provider = new ConnectionProvider() { - @Override - public Connection getConnection() { - return conn; - } - - @Override - public Connection getConnection(CommandArguments args) { - return getConnection(); - } - - @Override - public void close() throws Exception { - conn.close(); - } - }) { - JedisBroadcast broadcast = new JedisBroadcast(provider); - Map> reply = broadcast.ftCreateBroadcast(index, TextField.of("t")); - assertEquals(1, reply.size()); - assertOK(reply.values().stream().findAny().get().get()); - } - assertFalse(conn.isConnected()); - } -} diff --git a/src/test/java/redis/clients/jedis/modules/search/SearchWithParamsTest.java b/src/test/java/redis/clients/jedis/modules/search/SearchWithParamsTest.java index 611d80e274..be93b61ab4 100644 --- a/src/test/java/redis/clients/jedis/modules/search/SearchWithParamsTest.java +++ b/src/test/java/redis/clients/jedis/modules/search/SearchWithParamsTest.java @@ -4,6 +4,7 @@ import static redis.clients.jedis.util.AssertUtil.assertOK; import java.util.*; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.junit.BeforeClass; import org.junit.Test; @@ -17,6 +18,7 @@ import redis.clients.jedis.search.schemafields.*; import redis.clients.jedis.search.schemafields.VectorField.VectorAlgorithm; import redis.clients.jedis.modules.RedisModuleCommandsTestBase; +import static redis.clients.jedis.util.AssertUtil.assertOK; public class SearchWithParamsTest extends RedisModuleCommandsTestBase { @@ -138,6 +140,13 @@ public void createWithFieldNames() { assertEquals(1, asFamily.getTotalResults()); } + @Test + public void createBroadcast() { + Map> reply = client.ftCreateBroadcast(index, TextField.of("t")); + assertEquals(1, reply.size()); + assertOK(reply.values().stream().findFirst().get().get()); + } + @Test public void alterAdd() { assertOK(client.ftCreate(index, TextField.of("title"))); From 81463cf67b6d83b2909f278cf41e424390d4ac1b Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Mon, 21 Nov 2022 22:09:41 +0600 Subject: [PATCH 8/8] remove double import --- .../redis/clients/jedis/modules/search/SearchWithParamsTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/redis/clients/jedis/modules/search/SearchWithParamsTest.java b/src/test/java/redis/clients/jedis/modules/search/SearchWithParamsTest.java index be93b61ab4..098e2a4d89 100644 --- a/src/test/java/redis/clients/jedis/modules/search/SearchWithParamsTest.java +++ b/src/test/java/redis/clients/jedis/modules/search/SearchWithParamsTest.java @@ -18,7 +18,6 @@ import redis.clients.jedis.search.schemafields.*; import redis.clients.jedis.search.schemafields.VectorField.VectorAlgorithm; import redis.clients.jedis.modules.RedisModuleCommandsTestBase; -import static redis.clients.jedis.util.AssertUtil.assertOK; public class SearchWithParamsTest extends RedisModuleCommandsTestBase {