diff --git a/src/main/java/redis/clients/jedis/BroadcastResponse.java b/src/main/java/redis/clients/jedis/BroadcastResponse.java new file mode 100644 index 0000000000..481d7fa63c --- /dev/null +++ b/src/main/java/redis/clients/jedis/BroadcastResponse.java @@ -0,0 +1,28 @@ +package redis.clients.jedis; + +import java.util.function.Supplier; + +/** + * Represents the response from a single node in broadcast mode. + */ +public class BroadcastResponse implements Supplier { + + private T response = null; + private RuntimeException exception = null; + + public BroadcastResponse(T response) { + this.response = response; + } + + public BroadcastResponse(RuntimeException exception) { + this.exception = exception; + } + + @Override + public T get() { + if (exception != null) { + throw exception; + } + return response; + } +} diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index 9c615780d5..0f634324d4 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -2754,32 +2754,62 @@ public final CommandObject evalshaReadonly(byte[] sha1, List key BuilderFactory.RAW_OBJECT); } + public final CommandObject> scriptExists(String... sha1s) { + return new CommandObject<>(commandArguments(SCRIPT).add(Keyword.EXISTS).addObjects((Object[]) sha1s), + BuilderFactory.BOOLEAN_LIST); + } + public final CommandObject> scriptExists(String sampleKey, String... sha1s) { return new CommandObject<>(commandArguments(SCRIPT).add(Keyword.EXISTS).addObjects((Object[]) sha1s) .processKey(sampleKey), BuilderFactory.BOOLEAN_LIST); } + public final CommandObject scriptLoad(String script) { + return new CommandObject<>(commandArguments(SCRIPT).add(LOAD).add(script), BuilderFactory.STRING); + } + public final CommandObject scriptLoad(String script, String sampleKey) { return new CommandObject<>(commandArguments(SCRIPT).add(LOAD).add(script).processKey(sampleKey), BuilderFactory.STRING); } + public final CommandObject scriptFlush() { + return new CommandObject<>(commandArguments(SCRIPT).add(FLUSH), BuilderFactory.STRING); + } + public final CommandObject scriptFlush(String sampleKey) { return new CommandObject<>(commandArguments(SCRIPT).add(FLUSH).processKey(sampleKey), BuilderFactory.STRING); } + public final CommandObject scriptFlush(FlushMode flushMode) { + return new CommandObject<>(commandArguments(SCRIPT).add(FLUSH).add(flushMode), BuilderFactory.STRING); + } + public final CommandObject scriptFlush(String sampleKey, FlushMode flushMode) { return new CommandObject<>(commandArguments(SCRIPT).add(FLUSH).add(flushMode).processKey(sampleKey), BuilderFactory.STRING); } + public final CommandObject scriptKill() { + return new CommandObject<>(commandArguments(SCRIPT).add(KILL), BuilderFactory.STRING); + } + public final CommandObject scriptKill(String sampleKey) { return new CommandObject<>(commandArguments(SCRIPT).add(KILL).processKey(sampleKey), BuilderFactory.STRING); } + public final CommandObject> scriptExists(byte[]... sha1s) { + return new CommandObject<>(commandArguments(SCRIPT).add(Keyword.EXISTS).addObjects((Object[]) sha1s), + BuilderFactory.BOOLEAN_LIST); + } + public final CommandObject> scriptExists(byte[] sampleKey, byte[]... sha1s) { return new CommandObject<>(commandArguments(SCRIPT).add(Keyword.EXISTS).addObjects((Object[]) sha1s) .processKey(sampleKey), BuilderFactory.BOOLEAN_LIST); } + public final CommandObject scriptLoad(byte[] script) { + return new CommandObject<>(commandArguments(SCRIPT).add(LOAD).add(script), BuilderFactory.BINARY); + } + public final CommandObject scriptLoad(byte[] script, byte[] sampleKey) { return new CommandObject<>(commandArguments(SCRIPT).add(LOAD).add(script).processKey(sampleKey), BuilderFactory.BINARY); } diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 5e5385f65c..702af9936c 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -3860,14 +3860,12 @@ public Object evalsha(final byte[] sha1, final int keyCount, final byte[]... par @Override public String scriptFlush() { - connection.sendCommand(SCRIPT, FLUSH); - return connection.getStatusCodeReply(); + return connection.executeCommand(commandObjects.scriptFlush()); } @Override public String scriptFlush(final FlushMode flushMode) { - connection.sendCommand(SCRIPT, FLUSH.getRaw(), flushMode.getRaw()); - return connection.getStatusCodeReply(); + return connection.executeCommand(commandObjects.scriptFlush(flushMode)); } @Override @@ -3879,20 +3877,17 @@ public Boolean scriptExists(final byte[] sha1) { @Override public List scriptExists(final byte[]... sha1) { - connection.sendCommand(SCRIPT, joinParameters(Keyword.EXISTS.getRaw(), sha1)); - return BuilderFactory.BOOLEAN_LIST.build(connection.getOne()); + return connection.executeCommand(commandObjects.scriptExists(sha1)); } @Override public byte[] scriptLoad(final byte[] script) { - connection.sendCommand(SCRIPT, LOAD.getRaw(), script); - return connection.getBinaryBulkReply(); + return connection.executeCommand(commandObjects.scriptLoad(script)); } @Override public String scriptKill() { - connection.sendCommand(SCRIPT, KILL); - return connection.getStatusCodeReply(); + return connection.executeCommand(commandObjects.scriptKill()); } @Override @@ -4335,8 +4330,7 @@ public String migrate(String host, int port, int timeout, MigrateParams params, @Override public long waitReplicas(final int replicas, final long timeout) { checkIsInMultiOrPipeline(); - connection.sendCommand(WAIT, toByteArray(replicas), toByteArray(timeout)); - return connection.getIntegerReply(); + return connection.executeCommand(commandObjects.waitReplicas(replicas, timeout)); } @Override @@ -7993,14 +7987,12 @@ public Boolean scriptExists(final String sha1) { @Override public List scriptExists(final String... sha1) { - connection.sendCommand(SCRIPT, joinParameters(Keyword.EXISTS.name(), sha1)); - return BuilderFactory.BOOLEAN_LIST.build(connection.getOne()); + return connection.executeCommand(commandObjects.scriptExists(sha1)); } @Override public String scriptLoad(final String script) { - connection.sendCommand(SCRIPT, LOAD.name(), script); - return connection.getBulkReply(); + return connection.executeCommand(commandObjects.scriptLoad(script)); } @Override diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index 0a48fc0b6b..d2309658a9 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -2,9 +2,12 @@ import java.net.URI; import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import java.util.regex.Pattern; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.json.JSONArray; @@ -34,6 +37,7 @@ import redis.clients.jedis.util.IOUtils; import redis.clients.jedis.util.JedisURIHelper; import redis.clients.jedis.util.KeyValue; +import redis.clients.jedis.util.Pool; public class UnifiedJedis implements JedisCommands, JedisBinaryCommands, SampleKeyedCommands, SampleBinaryKeyedCommands, RedisModuleCommands, @@ -167,6 +171,33 @@ public final T executeCommand(CommandObject commandObject) { return executor.executeCommand(commandObject); } + public final Map> broadcastCommand(CommandObject commandObject) { + // TODO: Push this implementation in executor interface/classes. + Map connectionMap = provider.getConnectionMap(); + Map> responseMap = new HashMap<>(connectionMap.size(), 1f); + for (Map.Entry entry : connectionMap.entrySet()) { + Object key = entry.getKey(); + Object connection = entry.getValue(); + try { + responseMap.put(key, new BroadcastResponse<>(executeBroadcastCommand(connection, commandObject))); + } catch (RuntimeException re) { + responseMap.put(key, new BroadcastResponse<>(re)); + } + } + return responseMap; + } + + private T executeBroadcastCommand(Object connection, CommandObject commandObject) { + if (connection instanceof Connection) { + return ((Connection) connection).executeCommand(commandObject); + } else if (connection instanceof Pool) { + try (Connection _conn = ((Pool) connection).getResource()) { + return _conn.executeCommand(commandObject); + } + } + throw new IllegalStateException(connection.getClass() + "is not supported."); + } + // Key commands @Override public boolean exists(String key) { @@ -3322,6 +3353,10 @@ public long waitReplicas(byte[] sampleKey, int replicas, long timeout) { return executeCommand(commandObjects.waitReplicas(sampleKey, replicas, timeout)); } + public Map> waitReplicasBroadcast(final int replicas, final long timeout) { + return broadcastCommand(commandObjects.waitReplicas(replicas, timeout)); + } + @Override public Object eval(String script, String sampleKey) { return executeCommand(commandObjects.eval(script, sampleKey)); @@ -3352,6 +3387,10 @@ public List scriptExists(String sampleKey, String... sha1s) { return executeCommand(commandObjects.scriptExists(sampleKey, sha1s)); } + public Map>> scriptExistsBroadcast(String... sha1s) { + return broadcastCommand(commandObjects.scriptExists(sha1s)); + } + @Override public Boolean scriptExists(byte[] sha1, byte[] sampleKey) { return scriptExists(sampleKey, new byte[][]{sha1}).get(0); @@ -3362,11 +3401,19 @@ public List scriptExists(byte[] sampleKey, byte[]... sha1s) { return executeCommand(commandObjects.scriptExists(sampleKey, sha1s)); } + public Map>> scriptExistsBroadcast(byte[]... sha1s) { + return broadcastCommand(commandObjects.scriptExists(sha1s)); + } + @Override public String scriptLoad(String script, String sampleKey) { return executeCommand(commandObjects.scriptLoad(script, sampleKey)); } + public Map> scriptLoadBroadcast(String script) { + return broadcastCommand(commandObjects.scriptLoad(script)); + } + @Override public String scriptFlush(String sampleKey) { return executeCommand(commandObjects.scriptFlush(sampleKey)); @@ -3377,16 +3424,32 @@ public String scriptFlush(String sampleKey, FlushMode flushMode) { return executeCommand(commandObjects.scriptFlush(sampleKey, flushMode)); } + public Map> scriptFlushBroadcast() { + return broadcastCommand(commandObjects.scriptFlush()); + } + + public Map> scriptFlushBroadcast(FlushMode flushMode) { + return broadcastCommand(commandObjects.scriptFlush(flushMode)); + } + @Override public String scriptKill(String sampleKey) { return executeCommand(commandObjects.scriptKill(sampleKey)); } + public Map> scriptKillBroadcast() { + return broadcastCommand(commandObjects.scriptKill()); + } + @Override public byte[] scriptLoad(byte[] script, byte[] sampleKey) { return executeCommand(commandObjects.scriptLoad(script, sampleKey)); } + public Map> scriptLoadBroadcast(byte[] script) { + return broadcastCommand(commandObjects.scriptLoad(script)); + } + @Override public String scriptFlush(byte[] sampleKey) { return executeCommand(commandObjects.scriptFlush(sampleKey)); @@ -3403,6 +3466,27 @@ public String scriptKill(byte[] sampleKey) { } // Sample key commands + public Map> configSetBroadcast(final String... parameterValues) { + if (parameterValues.length > 0 && parameterValues.length % 2 == 0) { + // ok + } else { + throw new IllegalStateException("It requires 'pair's of config parameter-values."); + } + CommandArguments args = new CommandArguments(Protocol.Command.CONFIG).add(Protocol.Keyword.SET) + .addObjects((Object[]) parameterValues); + return broadcastCommand(new CommandObject<>(args, BuilderFactory.STRING)); + } + + public Map> flushAllBroadcast() { + return broadcastCommand(new CommandObject<>(new CommandArguments(Protocol.Command.FLUSHALL), + BuilderFactory.STRING)); + } + + public Map> flushAllBroadcast(FlushMode flushMode) { + return broadcastCommand(new CommandObject<>(new CommandArguments(Protocol.Command.FLUSHALL) + .add(flushMode), BuilderFactory.STRING)); + } + // Random node commands public long publish(String channel, String message) { return executeCommand(commandObjects.publish(channel, message)); @@ -3451,11 +3535,33 @@ public String ftCreate(String indexName, IndexOptions indexOptions, Schema schem return executeCommand(commandObjects.ftCreate(indexName, indexOptions, schema)); } + public Map> ftCreateBroadcast(String indexName, IndexOptions indexOptions, Schema schema) { + return broadcastCommand(commandObjects.ftCreate(indexName, indexOptions, schema)); + } + @Override public String ftCreate(String indexName, FTCreateParams createParams, Iterable schemaFields) { return executeCommand(commandObjects.ftCreate(indexName, createParams, schemaFields)); } + public Map> ftCreateBroadcast(String indexName, SchemaField... schemaFields) { + return ftCreateBroadcast(indexName, Arrays.asList(schemaFields)); + } + + public Map> ftCreateBroadcast(String indexName, FTCreateParams createParams, + SchemaField... schemaFields) { + return ftCreateBroadcast(indexName, createParams, Arrays.asList(schemaFields)); + } + + public Map> ftCreateBroadcast(String indexName, Iterable schemaFields) { + return ftCreateBroadcast(indexName, FTCreateParams.createParams(), schemaFields); + } + + public Map> ftCreateBroadcast(String indexName, FTCreateParams createParams, + Iterable schemaFields) { + return broadcastCommand(commandObjects.ftCreate(indexName, createParams, schemaFields)); + } + @Override public String ftAlter(String indexName, Schema schema) { return executeCommand(commandObjects.ftAlter(indexName, schema)); @@ -3539,6 +3645,14 @@ public String ftDropIndexDD(String indexName) { return executeCommand(commandObjects.ftDropIndexDD(indexName)); } + public Map> ftDropIndexBroadcast(String indexName) { + return broadcastCommand(commandObjects.ftDropIndex(indexName)); + } + + public Map> ftDropIndexDDBroadcast(String indexName) { + return broadcastCommand(commandObjects.ftDropIndexDD(indexName)); + } + @Override public String ftSynUpdate(String indexName, String synonymGroupId, String... terms) { return executeCommand(commandObjects.ftSynUpdate(indexName, synonymGroupId, terms)); diff --git a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java index 24ee287d99..4ac0470ad1 100644 --- a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java @@ -135,4 +135,9 @@ public Connection getConnectionFromSlot(int slot) { } } } + + @Override + public Map getConnectionMap() { + return Collections.unmodifiableMap(getNodes()); + } } diff --git a/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java index 8efafac33a..48543dd5cb 100644 --- a/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ConnectionProvider.java @@ -1,5 +1,7 @@ package redis.clients.jedis.providers; +import java.util.Collections; +import java.util.Map; import redis.clients.jedis.CommandArguments; import redis.clients.jedis.Connection; @@ -8,4 +10,9 @@ public interface ConnectionProvider extends AutoCloseable { Connection getConnection(); Connection getConnection(CommandArguments args); + + default Map getConnectionMap() { + final Connection c = getConnection(); + return Collections.singletonMap(c.toString(), c); + } } diff --git a/src/main/java/redis/clients/jedis/providers/PooledConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/PooledConnectionProvider.java index 415160f97f..f7b90e2953 100644 --- a/src/main/java/redis/clients/jedis/providers/PooledConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/PooledConnectionProvider.java @@ -1,5 +1,7 @@ package redis.clients.jedis.providers; +import java.util.Collections; +import java.util.Map; import org.apache.commons.pool2.PooledObjectFactory; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -14,27 +16,33 @@ public class PooledConnectionProvider implements ConnectionProvider { private final Pool pool; + private Object connectionMapKey = ""; public PooledConnectionProvider(HostAndPort hostAndPort) { this(new ConnectionFactory(hostAndPort)); + this.connectionMapKey = hostAndPort; } public PooledConnectionProvider(HostAndPort hostAndPort, JedisClientConfig clientConfig) { this(new ConnectionPool(hostAndPort, clientConfig)); + this.connectionMapKey = hostAndPort; } public PooledConnectionProvider(HostAndPort hostAndPort, JedisClientConfig clientConfig, GenericObjectPoolConfig poolConfig) { this(new ConnectionFactory(hostAndPort, clientConfig), poolConfig); + this.connectionMapKey = hostAndPort; } public PooledConnectionProvider(PooledObjectFactory factory) { this(new ConnectionPool(factory)); + this.connectionMapKey = factory; } public PooledConnectionProvider(PooledObjectFactory factory, GenericObjectPoolConfig poolConfig) { this(new ConnectionPool(factory, poolConfig)); + this.connectionMapKey = factory; } private PooledConnectionProvider(Pool pool) { @@ -59,4 +67,9 @@ public Connection getConnection() { public Connection getConnection(CommandArguments args) { return pool.getResource(); } + + @Override + public Map> getConnectionMap() { + return Collections.singletonMap(connectionMapKey, pool); + } } diff --git a/src/main/java/redis/clients/jedis/providers/ShardedConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ShardedConnectionProvider.java index f934c59f5a..63d6c7946f 100644 --- a/src/main/java/redis/clients/jedis/providers/ShardedConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ShardedConnectionProvider.java @@ -158,4 +158,9 @@ private HostAndPort getNodeFromHash(Long hash) { } return tail.get(tail.firstKey()); } + + @Override + public Map getConnectionMap() { + return Collections.unmodifiableMap(resources); + } } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/ClusterScriptingCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/ClusterScriptingCommandsTest.java index 4045e6a6ed..321fda88ad 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/ClusterScriptingCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/ClusterScriptingCommandsTest.java @@ -4,8 +4,11 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.function.Supplier; import org.junit.Test; import redis.clients.jedis.args.FlushMode; @@ -91,4 +94,31 @@ public void testBinaryScriptExists() { byte[][] arraySha1 = { sha1 }; assertEquals(Collections.singletonList(Boolean.TRUE), cluster.scriptExists(byteKey, arraySha1)); } + + @Test + public void broadcast() { + Map> stringReplies; + String script_1 = "return 'jedis'", script_2 = "return 79", sha1_1, sha1_2; + + stringReplies = cluster.scriptLoadBroadcast(script_1); + assertEquals(3, stringReplies.size()); + sha1_1 = stringReplies.values().stream().findAny().get().get(); + stringReplies.values().forEach(reply -> assertEquals(sha1_1, reply.get())); + + stringReplies = cluster.scriptLoadBroadcast(script_2); + assertEquals(3, stringReplies.size()); + sha1_2 = stringReplies.values().stream().findAny().get().get(); + stringReplies.values().forEach(reply -> assertEquals(sha1_2, reply.get())); + + Map>> booleanListReplies; + booleanListReplies = cluster.scriptExistsBroadcast(sha1_1, sha1_2); + assertEquals(3, booleanListReplies.size()); + booleanListReplies.values().forEach(reply -> assertEquals(Arrays.asList(true, true), reply.get())); + + cluster.scriptFlushBroadcast(); + + booleanListReplies = cluster.scriptExistsBroadcast(sha1_1, sha1_2); + assertEquals(3, booleanListReplies.size()); + booleanListReplies.values().forEach(reply -> assertEquals(Arrays.asList(false, false), reply.get())); + } } diff --git a/src/test/java/redis/clients/jedis/commands/jedis/ClusterValuesCommandsTest.java b/src/test/java/redis/clients/jedis/commands/jedis/ClusterValuesCommandsTest.java index 6eed412078..ad3c18c8dd 100644 --- a/src/test/java/redis/clients/jedis/commands/jedis/ClusterValuesCommandsTest.java +++ b/src/test/java/redis/clients/jedis/commands/jedis/ClusterValuesCommandsTest.java @@ -1,16 +1,22 @@ package redis.clients.jedis.commands.jedis; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import org.junit.Test; +import redis.clients.jedis.BuilderFactory; +import redis.clients.jedis.CommandArguments; +import redis.clients.jedis.CommandObject; import redis.clients.jedis.GeoCoordinate; import redis.clients.jedis.JedisPubSub; +import redis.clients.jedis.Protocol; import redis.clients.jedis.args.GeoUnit; import redis.clients.jedis.params.GeoRadiusParam; import redis.clients.jedis.params.GeoRadiusStoreParam; @@ -106,4 +112,20 @@ public void onUnsubscribe(String channel, int subscribedChannels) { } }, "foo"); } + + @Test + public void broadcastPing() { + Map> replies = cluster.broadcastCommand(new CommandObject<>( + new CommandArguments(Protocol.Command.PING), BuilderFactory.STRING)); + assertEquals(3, replies.size()); + replies.values().forEach(reply -> assertEquals("PONG", reply.get())); + } + + @Test + public void broadcastFlushAll() { + assertEquals("OK", cluster.set("foo", "bar")); + assertEquals("bar", cluster.get("foo")); + cluster.flushAllBroadcast(); + assertNull(cluster.get("foo")); + } } diff --git a/src/test/java/redis/clients/jedis/modules/search/SearchWithParamsTest.java b/src/test/java/redis/clients/jedis/modules/search/SearchWithParamsTest.java index 611d80e274..098e2a4d89 100644 --- a/src/test/java/redis/clients/jedis/modules/search/SearchWithParamsTest.java +++ b/src/test/java/redis/clients/jedis/modules/search/SearchWithParamsTest.java @@ -4,6 +4,7 @@ import static redis.clients.jedis.util.AssertUtil.assertOK; import java.util.*; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.junit.BeforeClass; import org.junit.Test; @@ -138,6 +139,13 @@ public void createWithFieldNames() { assertEquals(1, asFamily.getTotalResults()); } + @Test + public void createBroadcast() { + Map> reply = client.ftCreateBroadcast(index, TextField.of("t")); + assertEquals(1, reply.size()); + assertOK(reply.values().stream().findFirst().get().get()); + } + @Test public void alterAdd() { assertOK(client.ftCreate(index, TextField.of("title")));